diff options
Diffstat (limited to 'src/libcore')
| -rw-r--r-- | src/libcore/hashmap.rs | 6 | ||||
| -rw-r--r-- | src/libcore/io.rs | 2 | ||||
| -rw-r--r-- | src/libcore/oldcomm.rs | 8 | ||||
| -rw-r--r-- | src/libcore/pipes.rs | 294 | ||||
| -rw-r--r-- | src/libcore/private.rs | 6 | ||||
| -rw-r--r-- | src/libcore/private/global.rs | 2 | ||||
| -rw-r--r-- | src/libcore/reflect.rs | 2 | ||||
| -rw-r--r-- | src/libcore/task/local_data.rs | 2 | ||||
| -rw-r--r-- | src/libcore/task/spawn.rs | 2 |
9 files changed, 304 insertions, 20 deletions
diff --git a/src/libcore/hashmap.rs b/src/libcore/hashmap.rs index df98e469bbc..3a51a2a212c 100644 --- a/src/libcore/hashmap.rs +++ b/src/libcore/hashmap.rs @@ -35,13 +35,13 @@ pub mod linear { const INITIAL_CAPACITY: uint = 32u; // 2^5 - struct Bucket<K: Eq Hash, V> { + struct Bucket<K,V> { hash: uint, key: K, value: V, } - pub struct LinearMap<K: Eq Hash, V> { + pub struct LinearMap<K,V> { k0: u64, k1: u64, resize_at: uint, @@ -408,7 +408,7 @@ pub mod linear { pure fn ne(&self, other: &LinearMap<K, V>) -> bool { !self.eq(other) } } - pub struct LinearSet<T: Hash IterBytes Eq> { + pub struct LinearSet<T> { priv map: LinearMap<T, ()> } diff --git a/src/libcore/io.rs b/src/libcore/io.rs index cf49ee0becc..6d618627ece 100644 --- a/src/libcore/io.rs +++ b/src/libcore/io.rs @@ -1111,7 +1111,7 @@ pub mod fsync { // Artifacts that need to fsync on destruction - pub struct Res<t: Copy> { + pub struct Res<t> { arg: Arg<t>, } diff --git a/src/libcore/oldcomm.rs b/src/libcore/oldcomm.rs index 89300be9ab4..dc245f5bffd 100644 --- a/src/libcore/oldcomm.rs +++ b/src/libcore/oldcomm.rs @@ -68,7 +68,7 @@ use vec; * transmitted. If a port value is copied, both copies refer to the same * port. Ports may be associated with multiple `chan`s. */ -pub enum Port<T: Owned> { +pub enum Port<T> { Port_(@PortPtr<T>) } @@ -84,7 +84,7 @@ pub enum Port<T: Owned> { * data will be silently dropped. Channels may be duplicated and * themselves transmitted over other channels. */ -pub enum Chan<T: Owned> { +pub enum Chan<T> { Chan_(port_id) } @@ -120,7 +120,7 @@ pub fn listen<T: Owned, U>(f: fn(Chan<T>) -> U) -> U { f(po.chan()) } -struct PortPtr<T:Owned> { +struct PortPtr<T> { po: *rust_port, drop { unsafe { @@ -238,7 +238,7 @@ fn peek_chan<T: Owned>(ch: Chan<T>) -> bool { } /// Receive on a raw port pointer -fn recv_<T: Owned>(p: *rust_port) -> T { +fn recv_<T>(p: *rust_port) -> T { unsafe { let yield = 0; let yieldp = ptr::addr_of(&yield); diff --git a/src/libcore/pipes.rs b/src/libcore/pipes.rs index 33826d10c3c..5af9950ff08 100644 --- a/src/libcore/pipes.rs +++ b/src/libcore/pipes.rs @@ -151,7 +151,7 @@ type Buffer<T: Owned> = { #[cfg(stage1)] #[cfg(stage2)] #[cfg(stage3)] -pub struct Buffer<T: Owned> { +pub struct Buffer<T> { header: BufferHeader, data: T, } @@ -212,10 +212,18 @@ impl PacketHeader { } #[doc(hidden)] +#[cfg(stage0)] pub struct Packet<T: Owned> { header: PacketHeader, mut payload: Option<T>, } +#[doc(hidden)] +#[cfg(stage1)] +#[cfg(stage2)] +pub struct Packet<T> { + header: PacketHeader, + mut payload: Option<T>, +} #[doc(hidden)] pub trait HasBuffer { @@ -256,12 +264,11 @@ fn unibuffer<T: Owned>() -> ~Buffer<Packet<T>> { } move b } - #[doc(hidden)] #[cfg(stage1)] #[cfg(stage2)] #[cfg(stage3)] -fn unibuffer<T: Owned>() -> ~Buffer<Packet<T>> { +fn unibuffer<T>() -> ~Buffer<Packet<T>> { let b = ~Buffer { header: BufferHeader(), data: Packet { @@ -277,6 +284,7 @@ fn unibuffer<T: Owned>() -> ~Buffer<Packet<T>> { } #[doc(hidden)] +#[cfg(stage0)] pub fn packet<T: Owned>() -> *Packet<T> { let b = unibuffer(); let p = ptr::addr_of(&(b.data)); @@ -284,6 +292,16 @@ pub fn packet<T: Owned>() -> *Packet<T> { unsafe { forget(move b) } p } +#[doc(hidden)] +#[cfg(stage1)] +#[cfg(stage2)] +pub fn packet<T>() -> *Packet<T> { + let b = unibuffer(); + let p = ptr::addr_of(&(b.data)); + // We'll take over memory management from here. + unsafe { forget(move b) } + p +} #[doc(hidden)] pub fn entangle_buffer<T: Owned, Tstart: Owned>( @@ -387,11 +405,19 @@ fn swap_state_rel(dst: &mut State, src: State) -> State { } #[doc(hidden)] +#[cfg(stage0)] pub unsafe fn get_buffer<T: Owned>(p: *PacketHeader) -> ~Buffer<T> { transmute((*p).buf_header()) } +#[doc(hidden)] +#[cfg(stage1)] +#[cfg(stage2)] +pub unsafe fn get_buffer<T>(p: *PacketHeader) -> ~Buffer<T> { + transmute((*p).buf_header()) +} // This could probably be done with SharedMutableState to avoid move_it!(). +#[cfg(stage0)] struct BufferResource<T: Owned> { buffer: ~Buffer<T>, @@ -413,7 +439,31 @@ struct BufferResource<T: Owned> { } } } +#[cfg(stage1)] +#[cfg(stage2)] +struct BufferResource<T> { + buffer: ~Buffer<T>, + drop { + unsafe { + let b = move_it!(self.buffer); + //let p = ptr::addr_of(*b); + //error!("drop %?", p); + let old_count = atomic_sub_rel(&mut b.header.ref_count, 1); + //let old_count = atomic_xchng_rel(b.header.ref_count, 0); + if old_count == 1 { + // The new count is 0. + + // go go gadget drop glue + } + else { + forget(move b) + } + } + } +} + +#[cfg(stage0)] fn BufferResource<T: Owned>(b: ~Buffer<T>) -> BufferResource<T> { //let p = ptr::addr_of(*b); //error!("take %?", p); @@ -424,8 +474,21 @@ fn BufferResource<T: Owned>(b: ~Buffer<T>) -> BufferResource<T> { buffer: move b } } +#[cfg(stage1)] +#[cfg(stage2)] +fn BufferResource<T>(b: ~Buffer<T>) -> BufferResource<T> { + //let p = ptr::addr_of(*b); + //error!("take %?", p); + atomic_add_acq(&mut b.header.ref_count, 1); + + BufferResource { + // tjc: ???? + buffer: move b + } +} #[doc(hidden)] +#[cfg(stage0)] pub fn send<T: Owned, Tbuffer: Owned>(p: SendPacketBuffered<T, Tbuffer>, payload: T) -> bool { let header = p.header(); @@ -467,6 +530,49 @@ pub fn send<T: Owned, Tbuffer: Owned>(p: SendPacketBuffered<T, Tbuffer>, } } } +#[doc(hidden)] +#[cfg(stage1)] +#[cfg(stage2)] +pub fn send<T,Tbuffer>(p: SendPacketBuffered<T,Tbuffer>, payload: T) -> bool { + let header = p.header(); + let p_ = p.unwrap(); + let p = unsafe { &*p_ }; + assert ptr::addr_of(&(p.header)) == header; + assert p.payload.is_none(); + p.payload = move Some(move payload); + let old_state = swap_state_rel(&mut p.header.state, Full); + match old_state { + Empty => { + // Yay, fastpath. + + // The receiver will eventually clean this up. + //unsafe { forget(p); } + return true; + } + Full => fail ~"duplicate send", + Blocked => { + debug!("waking up task for %?", p_); + let old_task = swap_task(&mut p.header.blocked_task, ptr::null()); + if !old_task.is_null() { + unsafe { + rustrt::task_signal_event( + old_task, + ptr::addr_of(&(p.header)) as *libc::c_void); + rustrt::rust_task_deref(old_task); + } + } + + // The receiver will eventually clean this up. + //unsafe { forget(p); } + return true; + } + Terminated => { + // The receiver will never receive this. Rely on drop_glue + // to clean everything up. + return false; + } + } +} /** Receives a message from a pipe. @@ -812,13 +918,24 @@ pub fn select<T: Owned, Tb: Owned>(endpoints: ~[RecvPacketBuffered<T, Tb>]) message. */ +#[cfg(stage0)] pub type SendPacket<T: Owned> = SendPacketBuffered<T, Packet<T>>; +#[cfg(stage1)] +#[cfg(stage2)] +pub type SendPacket<T> = SendPacketBuffered<T, Packet<T>>; #[doc(hidden)] +#[cfg(stage0)] pub fn SendPacket<T: Owned>(p: *Packet<T>) -> SendPacket<T> { SendPacketBuffered(p) } +#[cfg(stage1)] +#[cfg(stage2)] +pub fn SendPacket<T>(p: *Packet<T>) -> SendPacket<T> { + SendPacketBuffered(p) +} +#[cfg(stage0)] pub struct SendPacketBuffered<T: Owned, Tbuffer: Owned> { mut p: Option<*Packet<T>>, mut buffer: Option<BufferResource<Tbuffer>>, @@ -837,7 +954,31 @@ pub struct SendPacketBuffered<T: Owned, Tbuffer: Owned> { // } else { "some" }); } } } +#[cfg(stage1)] +#[cfg(stage2)] +pub struct SendPacketBuffered<T, Tbuffer> { + mut p: Option<*Packet<T>>, + mut buffer: Option<BufferResource<Tbuffer>>, +} +impl<T:Owned,Tbuffer:Owned> SendPacketBuffered<T,Tbuffer> : ::ops::Drop { + fn finalize(&self) { + //if self.p != none { + // debug!("drop send %?", option::get(self.p)); + //} + if self.p != None { + let mut p = None; + p <-> self.p; + sender_terminate(option::unwrap(move p)) + } + //unsafe { error!("send_drop: %?", + // if self.buffer == none { + // "none" + // } else { "some" }); } + } +} + +#[cfg(stage0)] pub fn SendPacketBuffered<T: Owned, Tbuffer: Owned>(p: *Packet<T>) -> SendPacketBuffered<T, Tbuffer> { //debug!("take send %?", p); @@ -849,8 +990,50 @@ pub fn SendPacketBuffered<T: Owned, Tbuffer: Owned>(p: *Packet<T>) } } } +#[cfg(stage1)] +#[cfg(stage2)] +pub fn SendPacketBuffered<T,Tbuffer>(p: *Packet<T>) + -> SendPacketBuffered<T, Tbuffer> { + //debug!("take send %?", p); + SendPacketBuffered { + p: Some(p), + buffer: unsafe { + Some(BufferResource( + get_buffer(ptr::addr_of(&((*p).header))))) + } + } +} + +#[cfg(stage0)] +impl<T:Owned,Tbuffer:Owned> SendPacketBuffered<T,Tbuffer> { + fn unwrap() -> *Packet<T> { + let mut p = None; + p <-> self.p; + option::unwrap(move p) + } + + pure fn header() -> *PacketHeader { + match self.p { + Some(packet) => unsafe { + let packet = &*packet; + let header = ptr::addr_of(&(packet.header)); + //forget(packet); + header + }, + None => fail ~"packet already consumed" + } + } -impl<T: Owned, Tbuffer: Owned> SendPacketBuffered<T, Tbuffer> { + fn reuse_buffer() -> BufferResource<Tbuffer> { + //error!("send reuse_buffer"); + let mut tmp = None; + tmp <-> self.buffer; + option::unwrap(move tmp) + } +} +#[cfg(stage1)] +#[cfg(stage2)] +impl<T,Tbuffer> SendPacketBuffered<T,Tbuffer> { fn unwrap() -> *Packet<T> { let mut p = None; p <-> self.p; @@ -879,13 +1062,25 @@ impl<T: Owned, Tbuffer: Owned> SendPacketBuffered<T, Tbuffer> { /// Represents the receive end of a pipe. It can receive exactly one /// message. +#[cfg(stage0)] pub type RecvPacket<T: Owned> = RecvPacketBuffered<T, Packet<T>>; +#[cfg(stage1)] +#[cfg(stage2)] +pub type RecvPacket<T> = RecvPacketBuffered<T, Packet<T>>; #[doc(hidden)] +#[cfg(stage0)] pub fn RecvPacket<T: Owned>(p: *Packet<T>) -> RecvPacket<T> { RecvPacketBuffered(p) } +#[doc(hidden)] +#[cfg(stage1)] +#[cfg(stage2)] +pub fn RecvPacket<T>(p: *Packet<T>) -> RecvPacket<T> { + RecvPacketBuffered(p) +} +#[cfg(stage0)] pub struct RecvPacketBuffered<T: Owned, Tbuffer: Owned> { mut p: Option<*Packet<T>>, mut buffer: Option<BufferResource<Tbuffer>>, @@ -904,6 +1099,29 @@ pub struct RecvPacketBuffered<T: Owned, Tbuffer: Owned> { // } else { "some" }); } } } +#[cfg(stage1)] +#[cfg(stage2)] +pub struct RecvPacketBuffered<T, Tbuffer> { + mut p: Option<*Packet<T>>, + mut buffer: Option<BufferResource<Tbuffer>>, +} + +impl<T:Owned, Tbuffer:Owned> RecvPacketBuffered<T,Tbuffer> : ::ops::Drop { + fn finalize(&self) { + //if self.p != none { + // debug!("drop recv %?", option::get(self.p)); + //} + if self.p != None { + let mut p = None; + p <-> self.p; + receiver_terminate(option::unwrap(move p)) + } + //unsafe { error!("recv_drop: %?", + // if self.buffer == none { + // "none" + // } else { "some" }); } + } +} impl<T: Owned, Tbuffer: Owned> RecvPacketBuffered<T, Tbuffer> { fn unwrap() -> *Packet<T> { @@ -934,6 +1152,7 @@ impl<T: Owned, Tbuffer: Owned> RecvPacketBuffered<T, Tbuffer> : Selectable { } } +#[cfg(stage0)] pub fn RecvPacketBuffered<T: Owned, Tbuffer: Owned>(p: *Packet<T>) -> RecvPacketBuffered<T, Tbuffer> { //debug!("take recv %?", p); @@ -945,12 +1164,33 @@ pub fn RecvPacketBuffered<T: Owned, Tbuffer: Owned>(p: *Packet<T>) } } } +#[cfg(stage1)] +#[cfg(stage2)] +pub fn RecvPacketBuffered<T,Tbuffer>(p: *Packet<T>) + -> RecvPacketBuffered<T,Tbuffer> { + //debug!("take recv %?", p); + RecvPacketBuffered { + p: Some(p), + buffer: unsafe { + Some(BufferResource( + get_buffer(ptr::addr_of(&((*p).header))))) + } + } +} #[doc(hidden)] +#[cfg(stage0)] pub fn entangle<T: Owned>() -> (SendPacket<T>, RecvPacket<T>) { let p = packet(); (SendPacket(p), RecvPacket(p)) } +#[doc(hidden)] +#[cfg(stage1)] +#[cfg(stage2)] +pub fn entangle<T>() -> (SendPacket<T>, RecvPacket<T>) { + let p = packet(); + (SendPacket(p), RecvPacket(p)) +} /** Spawn a task to provide a service. @@ -1042,24 +1282,50 @@ pub trait Peekable<T> { } #[doc(hidden)] +#[cfg(stage0)] struct Chan_<T:Owned> { - mut endp: Option<streamp::client::Open<T>>, + mut endp: Option<streamp::client::Open<T>> +} +#[doc(hidden)] +#[cfg(stage1)] +#[cfg(stage2)] +struct Chan_<T> { + mut endp: Option<streamp::client::Open<T>> } /// An endpoint that can send many messages. +#[cfg(stage0)] pub enum Chan<T:Owned> { Chan_(Chan_<T>) } +#[cfg(stage1)] +#[cfg(stage2)] +pub enum Chan<T> { + Chan_(Chan_<T>) +} #[doc(hidden)] +#[cfg(stage0)] struct Port_<T:Owned> { mut endp: Option<streamp::server::Open<T>>, } +#[doc(hidden)] +#[cfg(stage1)] +#[cfg(stage2)] +struct Port_<T> { + mut endp: Option<streamp::server::Open<T>>, +} /// An endpoint that can receive many messages. +#[cfg(stage0)] pub enum Port<T:Owned> { Port_(Port_<T>) } +#[cfg(stage1)] +#[cfg(stage2)] +pub enum Port<T> { + Port_(Port_<T>) +} /** Creates a `(chan, port)` pair. @@ -1145,9 +1411,15 @@ impl<T: Owned> Port<T>: Selectable { } /// Treat many ports as one. +#[cfg(stage0)] pub struct PortSet<T: Owned> { mut ports: ~[pipes::Port<T>], } +#[cfg(stage1)] +#[cfg(stage2)] +pub struct PortSet<T> { + mut ports: ~[pipes::Port<T>], +} pub fn PortSet<T: Owned>() -> PortSet<T>{ PortSet { @@ -1210,7 +1482,11 @@ impl<T: Owned> PortSet<T> : Peekable<T> { } /// A channel that can be shared between many senders. +#[cfg(stage0)] pub type SharedChan<T: Owned> = private::Exclusive<Chan<T>>; +#[cfg(stage1)] +#[cfg(stage2)] +pub type SharedChan<T> = private::Exclusive<Chan<T>>; impl<T: Owned> SharedChan<T>: GenericChan<T> { fn send(x: T) { @@ -1278,9 +1554,17 @@ proto! oneshot ( ) /// The send end of a oneshot pipe. +#[cfg(stage0)] pub type ChanOne<T: Owned> = oneshot::client::Oneshot<T>; +#[cfg(stage1)] +#[cfg(stage2)] +pub type ChanOne<T> = oneshot::client::Oneshot<T>; /// The receive end of a oneshot pipe. +#[cfg(stage0)] pub type PortOne<T: Owned> = oneshot::server::Oneshot<T>; +#[cfg(stage1)] +#[cfg(stage2)] +pub type PortOne<T> = oneshot::server::Oneshot<T>; /// Initialiase a (send-endpoint, recv-endpoint) oneshot pipe pair. pub fn oneshot<T: Owned>() -> (PortOne<T>, ChanOne<T>) { diff --git a/src/libcore/private.rs b/src/libcore/private.rs index 332c763f151..b6ac711d764 100644 --- a/src/libcore/private.rs +++ b/src/libcore/private.rs @@ -238,7 +238,7 @@ pub unsafe fn unwrap_shared_mutable_state<T: Owned>(rc: SharedMutableState<T>) * Data races between tasks can result in crashes and, with sufficient * cleverness, arbitrary type coercion. */ -pub type SharedMutableState<T: Owned> = ArcDestruct<T>; +pub type SharedMutableState<T> = ArcDestruct<T>; pub unsafe fn shared_mutable_state<T: Owned>(data: T) -> SharedMutableState<T> { @@ -341,11 +341,11 @@ impl LittleLock { } } -struct ExData<T: Owned> { lock: LittleLock, mut failed: bool, mut data: T, } +struct ExData<T> { lock: LittleLock, mut failed: bool, mut data: T, } /** * An arc over mutable data that is protected by a lock. For library use only. */ -pub struct Exclusive<T: Owned> { x: SharedMutableState<ExData<T>> } +pub struct Exclusive<T> { x: SharedMutableState<ExData<T>> } pub fn exclusive<T:Owned >(user_data: T) -> Exclusive<T> { let data = ExData { diff --git a/src/libcore/private/global.rs b/src/libcore/private/global.rs index 69319abc009..ee20fb665be 100644 --- a/src/libcore/private/global.rs +++ b/src/libcore/private/global.rs @@ -41,7 +41,7 @@ use sys::Closure; use task::spawn; use uint; -pub type GlobalDataKey<T: Owned> = &fn(v: T); +pub type GlobalDataKey<T> = &fn(v: T); pub unsafe fn global_data_clone_create<T: Owned Clone>( key: GlobalDataKey<T>, create: &fn() -> ~T) -> T { diff --git a/src/libcore/reflect.rs b/src/libcore/reflect.rs index 55eb53bc026..81a36e1ae13 100644 --- a/src/libcore/reflect.rs +++ b/src/libcore/reflect.rs @@ -41,7 +41,7 @@ pub fn align(size: uint, align: uint) -> uint { } /// Adaptor to wrap around visitors implementing MovePtr. -pub struct MovePtrAdaptor<V: TyVisitor MovePtr> { +pub struct MovePtrAdaptor<V> { inner: V } pub fn MovePtrAdaptor<V: TyVisitor MovePtr>(v: V) -> MovePtrAdaptor<V> { diff --git a/src/libcore/task/local_data.rs b/src/libcore/task/local_data.rs index 05a4e35b249..42765ef139f 100644 --- a/src/libcore/task/local_data.rs +++ b/src/libcore/task/local_data.rs @@ -45,7 +45,7 @@ use task; * * These two cases aside, the interface is safe. */ -pub type LocalDataKey<T: Durable> = &fn(v: @T); +pub type LocalDataKey<T> = &fn(v: @T); /** * Remove a task-local data value from the table, returning the diff --git a/src/libcore/task/spawn.rs b/src/libcore/task/spawn.rs index 0a2f6634214..3db6fa00f16 100644 --- a/src/libcore/task/spawn.rs +++ b/src/libcore/task/spawn.rs @@ -77,7 +77,7 @@ use cast; use container::Map; use oldcomm; use option; -use pipes::{Chan, GenericChan, GenericPort, Port}; +use pipes::{Chan, GenericChan, GenericPort, Port, stream}; use pipes; use prelude::*; use private; |
