diff options
| author | Brian Anderson <banderson@mozilla.com> | 2013-05-17 17:47:10 -0700 |
|---|---|---|
| committer | Brian Anderson <banderson@mozilla.com> | 2013-05-17 17:54:32 -0700 |
| commit | df9e41278eb1e3e653ccd6b4dfab4d7303f64c02 (patch) | |
| tree | 440984f7869286692b266c5a9d95dced081f9882 | |
| parent | 26becc308e4b9a0f5be1c7c2895c7761b778e01f (diff) | |
| download | rust-df9e41278eb1e3e653ccd6b4dfab4d7303f64c02.tar.gz rust-df9e41278eb1e3e653ccd6b4dfab4d7303f64c02.zip | |
core: Wire up `stream` to newsched
| -rw-r--r-- | src/libcore/comm.rs | 349 | ||||
| -rw-r--r-- | src/libcore/rt/comm.rs | 248 | ||||
| -rw-r--r-- | src/libcore/rt/local_services.rs | 2 |
3 files changed, 357 insertions, 242 deletions
diff --git a/src/libcore/comm.rs b/src/libcore/comm.rs index da3ae0e6c5d..59eb915c239 100644 --- a/src/libcore/comm.rs +++ b/src/libcore/comm.rs @@ -25,7 +25,7 @@ use unstable::sync::{Exclusive, exclusive}; use rtcomm = rt::comm; use rt; -use pipes::{recv, try_recv, wait_many, peek, PacketHeader}; +use pipes::{wait_many, PacketHeader}; // FIXME #5160: Making this public exposes some plumbing from // pipes. Needs some refactoring @@ -61,76 +61,14 @@ pub trait Peekable<T> { fn peek(&self) -> bool; } - -// Streams - Make pipes a little easier in general. - -/*proto! streamp ( - Open:send<T: Owned> { - data(T) -> Open<T> - } -)*/ - -#[allow(non_camel_case_types)] -pub mod streamp { - priv use core::kinds::Owned; - - pub fn init<T: Owned>() -> (client::Open<T>, server::Open<T>) { - pub use core::pipes::HasBuffer; - ::core::pipes::entangle() - } - - #[allow(non_camel_case_types)] - pub enum Open<T> { pub data(T, server::Open<T>), } - - #[allow(non_camel_case_types)] - pub mod client { - priv use core::kinds::Owned; - - #[allow(non_camel_case_types)] - pub fn try_data<T: Owned>(pipe: Open<T>, x_0: T) -> - ::core::option::Option<Open<T>> { - { - use super::data; - let (c, s) = ::core::pipes::entangle(); - let message = data(x_0, s); - if ::core::pipes::send(pipe, message) { - ::core::pipes::rt::make_some(c) - } else { ::core::pipes::rt::make_none() } - } - } - - #[allow(non_camel_case_types)] - pub fn data<T: Owned>(pipe: Open<T>, x_0: T) -> Open<T> { - { - use super::data; - let (c, s) = ::core::pipes::entangle(); - let message = data(x_0, s); - ::core::pipes::send(pipe, message); - c - } - } - - #[allow(non_camel_case_types)] - pub type Open<T> = ::core::pipes::SendPacket<super::Open<T>>; - } - - #[allow(non_camel_case_types)] - pub mod server { - #[allow(non_camel_case_types)] - pub type Open<T> = ::core::pipes::RecvPacket<super::Open<T>>; - } -} - /// An endpoint that can send many messages. -#[unsafe_mut_field(endp)] pub struct Chan<T> { - endp: Option<streamp::client::Open<T>> + inner: Either<pipesy::Chan<T>, rtcomm::Chan<T>> } /// An endpoint that can receive many messages. -#[unsafe_mut_field(endp)] pub struct Port<T> { - endp: Option<streamp::server::Open<T>>, + inner: Either<pipesy::Port<T>, rtcomm::Port<T>> } /** Creates a `(Port, Chan)` pair. @@ -139,100 +77,75 @@ These allow sending or receiving an unlimited number of messages. */ pub fn stream<T:Owned>() -> (Port<T>, Chan<T>) { - let (c, s) = streamp::init(); - - (Port { - endp: Some(s) - }, Chan { - endp: Some(c) - }) + let (port, chan) = match rt::context() { + rt::OldTaskContext => match pipesy::stream() { + (p, c) => (Left(p), Left(c)) + }, + _ => match rtcomm::stream() { + (p, c) => (Right(p), Right(c)) + } + }; + let port = Port { inner: port }; + let chan = Chan { inner: chan }; + return (port, chan); } impl<T: Owned> GenericChan<T> for Chan<T> { - #[inline(always)] fn send(&self, x: T) { - unsafe { - let self_endp = transmute_mut(&self.endp); - let endp = replace(self_endp, None); - *self_endp = Some(streamp::client::data(endp.unwrap(), x)) + match self.inner { + Left(ref chan) => chan.send(x), + Right(ref chan) => chan.send(x) } } } impl<T: Owned> GenericSmartChan<T> for Chan<T> { - #[inline(always)] fn try_send(&self, x: T) -> bool { - unsafe { - let self_endp = transmute_mut(&self.endp); - let endp = replace(self_endp, None); - match streamp::client::try_data(endp.unwrap(), x) { - Some(next) => { - *self_endp = Some(next); - true - } - None => false - } + match self.inner { + Left(ref chan) => chan.try_send(x), + Right(ref chan) => chan.try_send(x) } } } impl<T: Owned> GenericPort<T> for Port<T> { - #[inline(always)] fn recv(&self) -> T { - unsafe { - let self_endp = transmute_mut(&self.endp); - let endp = replace(self_endp, None); - let streamp::data(x, endp) = recv(endp.unwrap()); - *self_endp = Some(endp); - x + match self.inner { + Left(ref port) => port.recv(), + Right(ref port) => port.recv() } } - #[inline(always)] fn try_recv(&self) -> Option<T> { - unsafe { - let self_endp = transmute_mut(&self.endp); - let endp = replace(self_endp, None); - match try_recv(endp.unwrap()) { - Some(streamp::data(x, endp)) => { - *self_endp = Some(endp); - Some(x) - } - None => None - } + match self.inner { + Left(ref port) => port.try_recv(), + Right(ref port) => port.try_recv() } } } impl<T: Owned> Peekable<T> for Port<T> { - #[inline(always)] fn peek(&self) -> bool { - unsafe { - let self_endp = transmute_mut(&self.endp); - let mut endp = replace(self_endp, None); - let peek = match endp { - Some(ref mut endp) => peek(endp), - None => fail!("peeking empty stream") - }; - *self_endp = endp; - peek + match self.inner { + Left(ref port) => port.peek(), + Right(ref port) => port.peek() } } } impl<T: Owned> Selectable for Port<T> { fn header(&mut self) -> *mut PacketHeader { - match self.endp { - Some(ref mut endp) => endp.header(), - None => fail!("peeking empty stream") - } + match self.inner { + Left(ref mut port) => port.header(), + Right(_) => fail!("can't select on newsched ports") + } } } /// Treat many ports as one. #[unsafe_mut_field(ports)] pub struct PortSet<T> { - ports: ~[Port<T>], + ports: ~[pipesy::Port<T>], } pub impl<T: Owned> PortSet<T> { @@ -243,6 +156,11 @@ pub impl<T: Owned> PortSet<T> { } fn add(&self, port: Port<T>) { + let Port { inner } = port; + let port = match inner { + Left(p) => p, + Right(_) => fail!("PortSet not implemented") + }; unsafe { let self_ports = transmute_mut(&self.ports); self_ports.push(port) @@ -290,7 +208,7 @@ impl<T: Owned> Peekable<T> for PortSet<T> { // It'd be nice to use self.port.each, but that version isn't // pure. for uint::range(0, vec::uniq_len(&const self.ports)) |i| { - let port: &Port<T> = &self.ports[i]; + let port: &pipesy::Port<T> = &self.ports[i]; if port.peek() { return true; } @@ -301,12 +219,17 @@ impl<T: Owned> Peekable<T> for PortSet<T> { /// A channel that can be shared between many senders. pub struct SharedChan<T> { - ch: Exclusive<Chan<T>> + ch: Exclusive<pipesy::Chan<T>> } impl<T: Owned> SharedChan<T> { /// Converts a `chan` into a `shared_chan`. pub fn new(c: Chan<T>) -> SharedChan<T> { + let Chan { inner } = c; + let c = match inner { + Left(c) => c, + Right(_) => fail!("SharedChan not implemented") + }; SharedChan { ch: exclusive(c) } } } @@ -354,12 +277,8 @@ pub fn oneshot<T: Owned>() -> (PortOne<T>, ChanOne<T>) { (p, c) => (Right(p), Right(c)) } }; - let port = PortOne { - inner: port - }; - let chan = ChanOne { - inner: chan - }; + let port = PortOne { inner: port }; + let chan = ChanOne { inner: chan }; return (port, chan); } @@ -435,7 +354,10 @@ mod pipesy { use kinds::Owned; use option::{Option, Some, None}; - use pipes::{recv, try_recv}; + use pipes::{recv, try_recv, peek, PacketHeader}; + use super::{GenericChan, GenericSmartChan, GenericPort, Peekable, Selectable}; + use cast::transmute_mut; + use util::replace; /*proto! oneshot ( Oneshot:send<T:Owned> { @@ -610,6 +532,173 @@ mod pipesy { } } + // Streams - Make pipes a little easier in general. + + /*proto! streamp ( + Open:send<T: Owned> { + data(T) -> Open<T> + } + )*/ + + #[allow(non_camel_case_types)] + pub mod streamp { + priv use core::kinds::Owned; + + pub fn init<T: Owned>() -> (client::Open<T>, server::Open<T>) { + pub use core::pipes::HasBuffer; + ::core::pipes::entangle() + } + + #[allow(non_camel_case_types)] + pub enum Open<T> { pub data(T, server::Open<T>), } + + #[allow(non_camel_case_types)] + pub mod client { + priv use core::kinds::Owned; + + #[allow(non_camel_case_types)] + pub fn try_data<T: Owned>(pipe: Open<T>, x_0: T) -> + ::core::option::Option<Open<T>> { + { + use super::data; + let (c, s) = ::core::pipes::entangle(); + let message = data(x_0, s); + if ::core::pipes::send(pipe, message) { + ::core::pipes::rt::make_some(c) + } else { ::core::pipes::rt::make_none() } + } + } + + #[allow(non_camel_case_types)] + pub fn data<T: Owned>(pipe: Open<T>, x_0: T) -> Open<T> { + { + use super::data; + let (c, s) = ::core::pipes::entangle(); + let message = data(x_0, s); + ::core::pipes::send(pipe, message); + c + } + } + + #[allow(non_camel_case_types)] + pub type Open<T> = ::core::pipes::SendPacket<super::Open<T>>; + } + + #[allow(non_camel_case_types)] + pub mod server { + #[allow(non_camel_case_types)] + pub type Open<T> = ::core::pipes::RecvPacket<super::Open<T>>; + } + } + + /// An endpoint that can send many messages. + #[unsafe_mut_field(endp)] + pub struct Chan<T> { + endp: Option<streamp::client::Open<T>> + } + + /// An endpoint that can receive many messages. + #[unsafe_mut_field(endp)] + pub struct Port<T> { + endp: Option<streamp::server::Open<T>>, + } + + /** Creates a `(Port, Chan)` pair. + + These allow sending or receiving an unlimited number of messages. + + */ + pub fn stream<T:Owned>() -> (Port<T>, Chan<T>) { + let (c, s) = streamp::init(); + + (Port { + endp: Some(s) + }, Chan { + endp: Some(c) + }) + } + + impl<T: Owned> GenericChan<T> for Chan<T> { + #[inline(always)] + fn send(&self, x: T) { + unsafe { + let self_endp = transmute_mut(&self.endp); + let endp = replace(self_endp, None); + *self_endp = Some(streamp::client::data(endp.unwrap(), x)) + } + } + } + + impl<T: Owned> GenericSmartChan<T> for Chan<T> { + #[inline(always)] + fn try_send(&self, x: T) -> bool { + unsafe { + let self_endp = transmute_mut(&self.endp); + let endp = replace(self_endp, None); + match streamp::client::try_data(endp.unwrap(), x) { + Some(next) => { + *self_endp = Some(next); + true + } + None => false + } + } + } + } + + impl<T: Owned> GenericPort<T> for Port<T> { + #[inline(always)] + fn recv(&self) -> T { + unsafe { + let self_endp = transmute_mut(&self.endp); + let endp = replace(self_endp, None); + let streamp::data(x, endp) = recv(endp.unwrap()); + *self_endp = Some(endp); + x + } + } + + #[inline(always)] + fn try_recv(&self) -> Option<T> { + unsafe { + let self_endp = transmute_mut(&self.endp); + let endp = replace(self_endp, None); + match try_recv(endp.unwrap()) { + Some(streamp::data(x, endp)) => { + *self_endp = Some(endp); + Some(x) + } + None => None + } + } + } + } + + impl<T: Owned> Peekable<T> for Port<T> { + #[inline(always)] + fn peek(&self) -> bool { + unsafe { + let self_endp = transmute_mut(&self.endp); + let mut endp = replace(self_endp, None); + let peek = match endp { + Some(ref mut endp) => peek(endp), + None => fail!("peeking empty stream") + }; + *self_endp = endp; + peek + } + } + } + + impl<T: Owned> Selectable for Port<T> { + fn header(&mut self) -> *mut PacketHeader { + match self.endp { + Some(ref mut endp) => endp.header(), + None => fail!("peeking empty stream") + } + } +} + } /// Returns the index of an endpoint that is ready to receive. diff --git a/src/libcore/rt/comm.rs b/src/libcore/rt/comm.rs index 9fcb70cfc7d..4b5732b2d3a 100644 --- a/src/libcore/rt/comm.rs +++ b/src/libcore/rt/comm.rs @@ -8,6 +8,13 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +//! Ports and channels. +//! +//! XXX: Carefully consider whether the sequentially consistent +//! atomics here can be converted to acq/rel. I'm not sure they can, +//! because there is data being transerred in both directions (the payload +//! goes from sender to receiver and the task pointer goes the other way). + use option::*; use cast; use util; @@ -29,33 +36,37 @@ use cell::Cell; /// /// * 2 - both endpoints are alive /// * 1 - either the sender or the receiver is dead, determined by context -/// * <ptr> - A pointer to a Task that can be transmuted to ~Task +/// * <ptr> - A pointer to a blocked Task that can be transmuted to ~Task type State = int; static STATE_BOTH: State = 2; static STATE_ONE: State = 1; +/// The heap-allocated structure shared between two endpoints. struct Packet<T> { state: State, payload: Option<T>, } -pub struct PortOne<T> { +/// A one-shot channel. +pub struct ChanOne<T> { // XXX: Hack extra allocation to make by-val self work - inner: ~PortOneHack<T> + inner: ~ChanOneHack<T> } -pub struct ChanOne<T> { + +/// A one-shot port. +pub struct PortOne<T> { // XXX: Hack extra allocation to make by-val self work - inner: ~ChanOneHack<T> + inner: ~PortOneHack<T> } -pub struct PortOneHack<T> { +pub struct ChanOneHack<T> { void_packet: *mut Void, suppress_finalize: bool } -pub struct ChanOneHack<T> { +pub struct PortOneHack<T> { void_packet: *mut Void, suppress_finalize: bool } @@ -84,6 +95,54 @@ pub fn oneshot<T: Owned>() -> (PortOne<T>, ChanOne<T>) { } } +impl<T> ChanOne<T> { + + pub fn send(self, val: T) { + self.try_send(val); + } + + pub fn try_send(self, val: T) -> bool { + let mut this = self; + let mut recvr_active = true; + let packet = this.inner.packet(); + + unsafe { + + // Install the payload + assert!((*packet).payload.is_none()); + (*packet).payload = Some(val); + + // Atomically swap out the old state to figure out what + // the port's up to, issuing a release barrier to prevent + // reordering of the payload write. This also issues an + // acquire barrier that keeps the subsequent access of the + // ~Task pointer from being reordered. + let oldstate = atomic_xchg(&mut (*packet).state, STATE_ONE); + match oldstate { + STATE_BOTH => { + // Port is not waiting yet. Nothing to do + } + STATE_ONE => { + // Port has closed. Need to clean up. + let _packet: ~Packet<T> = cast::transmute(this.inner.void_packet); + recvr_active = false; + } + task_as_state => { + // Port is blocked. Wake it up. + let recvr: ~Coroutine = cast::transmute(task_as_state); + let sched = local_sched::take(); + sched.schedule_task(recvr); + } + } + } + + // Suppress the synchronizing actions in the finalizer. We're done with the packet. + this.inner.suppress_finalize = true; + return recvr_active; + } +} + + impl<T> PortOne<T> { pub fn recv(self) -> T { match self.try_recv() { @@ -96,30 +155,31 @@ impl<T> PortOne<T> { pub fn try_recv(self) -> Option<T> { let mut this = self; - - { - let self_ptr: *mut PortOne<T> = &mut this; - - // XXX: Optimize this to not require the two context switches when data is available - - // Switch to the scheduler - let sched = local_sched::take(); - do sched.deschedule_running_task_and_then |task| { - unsafe { - let task_as_state: State = cast::transmute(task); - let oldstate = atomic_xchg(&mut (*(*self_ptr).inner.packet()).state, task_as_state); - match oldstate { - STATE_BOTH => { - // Data has not been sent. Now we're blocked. - } - STATE_ONE => { - // Channel is closed. Switch back and check the data. - let task: ~Coroutine = cast::transmute(task_as_state); - let sched = local_sched::take(); - sched.resume_task_immediately(task); - } - _ => util::unreachable() + let packet = this.inner.packet(); + + // XXX: Optimize this to not require the two context switches when data is available + + // Switch to the scheduler to put the ~Task into the Packet state. + let sched = local_sched::take(); + do sched.deschedule_running_task_and_then |task| { + unsafe { + // Atomically swap the task pointer into the Packet state, issuing + // an acquire barrier to prevent reordering of the subsequent read + // of the payload. Also issues a release barrier to prevent reordering + // of any previous writes to the task structure. + let task_as_state: State = cast::transmute(task); + let oldstate = atomic_xchg(&mut (*packet).state, task_as_state); + match oldstate { + STATE_BOTH => { + // Data has not been sent. Now we're blocked. + } + STATE_ONE => { + // Channel is closed. Switch back and check the data. + let task: ~Coroutine = cast::transmute(task_as_state); + let sched = local_sched::take(); + sched.resume_task_immediately(task); } + _ => util::unreachable() } } } @@ -130,20 +190,20 @@ impl<T> PortOne<T> { // payload. Some scenarios: // // 1) We encountered STATE_ONE above - the atomic_xchg was the acq barrier. We're fine. - // 2) We encountered STATE_BOTH above and blocked. The sending task work-stole us - // and ran on its thread. The work stealing had a memory barrier. + // 2) We encountered STATE_BOTH above and blocked. The sending task then ran us + // and ran on its thread. The sending task issued a read barrier when taking the + // pointer to the receiving task. // 3) We encountered STATE_BOTH above and blocked, but the receiving task (this task) // is pinned to some other scheduler, so the sending task had to give us to // a different scheduler for resuming. That send synchronized memory. unsafe { - let payload = util::replace(&mut (*this.inner.packet()).payload, None); + let payload = util::replace(&mut (*packet).payload, None); // The sender has closed up shop. Drop the packet. let _packet: ~Packet<T> = cast::transmute(this.inner.void_packet); - // Supress the finalizer. We're done here. + // Suppress the synchronizing actions in the finalizer. We're done with the packet. this.inner.suppress_finalize = true; - return payload; } } @@ -167,47 +227,8 @@ impl<T> Peekable<T> for PortOne<T> { } } -impl<T> ChanOne<T> { - - pub fn send(self, val: T) { - self.try_send(val); - } - - pub fn try_send(self, val: T) -> bool { - let mut this = self; - let mut recvr_active = true; - - unsafe { - assert!((*this.inner.packet()).payload.is_none()); - (*this.inner.packet()).payload = Some(val); - - let oldstate = atomic_xchg(&mut (*this.inner.packet()).state, STATE_ONE); - match oldstate { - STATE_BOTH => { - // Port is not recving yet. Nothing to do - } - STATE_ONE => { - // Port has closed. Need to clean up. - let _packet: ~Packet<T> = cast::transmute(this.inner.void_packet); - recvr_active = false; - } - _ => { - // Port is blocked. Wake it up. - let recvr: ~Coroutine = cast::transmute(oldstate); - let sched = local_sched::take(); - sched.schedule_task(recvr); - } - } - } - - // Suppress the finalizer. We're done here. - this.inner.suppress_finalize = true; - return recvr_active; - } -} - #[unsafe_destructor] -impl<T> Drop for PortOneHack<T> { +impl<T> Drop for ChanOneHack<T> { fn finalize(&self) { if self.suppress_finalize { return } @@ -216,13 +237,17 @@ impl<T> Drop for PortOneHack<T> { let oldstate = atomic_xchg(&mut (*this.packet()).state, STATE_ONE); match oldstate { STATE_BOTH => { - /* cleanup is the chan's responsibility */ + // Port still active. It will destroy the Packet. }, STATE_ONE => { let _packet: ~Packet<T> = cast::transmute(this.void_packet); - } - _ => { - util::unreachable() + }, + task_as_state => { + // The port is blocked waiting for a message we will never send. Wake it. + assert!((*this.packet()).payload.is_none()); + let recvr: ~Coroutine = cast::transmute(task_as_state); + let sched = local_sched::take(); + sched.schedule_task(recvr); } } } @@ -230,7 +255,7 @@ impl<T> Drop for PortOneHack<T> { } #[unsafe_destructor] -impl<T> Drop for ChanOneHack<T> { +impl<T> Drop for PortOneHack<T> { fn finalize(&self) { if self.suppress_finalize { return } @@ -239,24 +264,20 @@ impl<T> Drop for ChanOneHack<T> { let oldstate = atomic_xchg(&mut (*this.packet()).state, STATE_ONE); match oldstate { STATE_BOTH => { - /* cleanup is the port's responsibility */ + // Chan still active. It will destroy the packet. }, STATE_ONE => { let _packet: ~Packet<T> = cast::transmute(this.void_packet); - }, + } _ => { - // The port is blocked recving for a message we will never send. Wake it. - assert!((*this.packet()).payload.is_none()); - let recvr: ~Coroutine = cast::transmute(oldstate); - let sched = local_sched::take(); - sched.schedule_task(recvr); + util::unreachable() } } } } } -impl<T> PortOneHack<T> { +impl<T> ChanOneHack<T> { fn packet(&self) -> *mut Packet<T> { unsafe { let p: *mut ~Packet<T> = cast::transmute(&self.void_packet); @@ -266,7 +287,7 @@ impl<T> PortOneHack<T> { } } -impl<T> ChanOneHack<T> { +impl<T> PortOneHack<T> { fn packet(&self) -> *mut Packet<T> { unsafe { let p: *mut ~Packet<T> = cast::transmute(&self.void_packet); @@ -276,18 +297,23 @@ impl<T> ChanOneHack<T> { } } -struct StreamPayload<T>(T, PortOne<StreamPayload<T>>); - -pub struct Port<T> { - // FIXME #5372. Using Cell because we don't take &mut self - next: Cell<PortOne<StreamPayload<T>>> +struct StreamPayload<T> { + val: T, + next: PortOne<StreamPayload<T>> } +/// A channel with unbounded size. pub struct Chan<T> { // FIXME #5372. Using Cell because we don't take &mut self next: Cell<ChanOne<StreamPayload<T>>> } +/// An port with unbounded size. +pub struct Port<T> { + // FIXME #5372. Using Cell because we don't take &mut self + next: Cell<PortOne<StreamPayload<T>>> +} + pub fn stream<T: Owned>() -> (Port<T>, Chan<T>) { let (pone, cone) = oneshot(); let port = Port { next: Cell(pone) }; @@ -295,6 +321,21 @@ pub fn stream<T: Owned>() -> (Port<T>, Chan<T>) { return (port, chan); } +impl<T: Owned> GenericChan<T> for Chan<T> { + fn send(&self, val: T) { + self.try_send(val); + } +} + +impl<T: Owned> GenericSmartChan<T> for Chan<T> { + fn try_send(&self, val: T) -> bool { + let (next_pone, next_cone) = oneshot(); + let cone = self.next.take(); + self.next.put_back(next_cone); + cone.try_send(StreamPayload { val: val, next: next_pone }) + } +} + impl<T> GenericPort<T> for Port<T> { fn recv(&self) -> T { match self.try_recv() { @@ -308,7 +349,7 @@ impl<T> GenericPort<T> for Port<T> { fn try_recv(&self) -> Option<T> { let pone = self.next.take(); match pone.try_recv() { - Some(StreamPayload(val, next)) => { + Some(StreamPayload { val, next }) => { self.next.put_back(next); Some(val) } @@ -323,21 +364,6 @@ impl<T> Peekable<T> for Port<T> { } } -impl<T: Owned> GenericChan<T> for Chan<T> { - fn send(&self, val: T) { - self.try_send(val); - } -} - -impl<T: Owned> GenericSmartChan<T> for Chan<T> { - fn try_send(&self, val: T) -> bool { - let (next_pone, next_cone) = oneshot(); - let cone = self.next.take(); - self.next.put_back(next_cone); - cone.try_send(StreamPayload(val, next_pone)) - } -} - #[cfg(test)] mod test { use super::*; @@ -563,7 +589,7 @@ mod test { } #[test] - fn stream_send_recv() { + fn stream_send_recv_stress() { for stress_factor().times { do run_in_newsched_task { let (port, chan) = stream::<~int>(); diff --git a/src/libcore/rt/local_services.rs b/src/libcore/rt/local_services.rs index 35c703bb350..8d6873be8cd 100644 --- a/src/libcore/rt/local_services.rs +++ b/src/libcore/rt/local_services.rs @@ -261,7 +261,7 @@ mod test { use comm::*; do run_in_newsched_task() { - let (port, chan) = oneshot(); + let (port, chan) = stream(); chan.send(10); assert!(port.recv() == 10); } |
