about summary refs log tree commit diff
diff options
context:
space:
mode:
authorBrian Anderson <banderson@mozilla.com>2013-05-17 17:47:10 -0700
committerBrian Anderson <banderson@mozilla.com>2013-05-17 17:54:32 -0700
commitdf9e41278eb1e3e653ccd6b4dfab4d7303f64c02 (patch)
tree440984f7869286692b266c5a9d95dced081f9882
parent26becc308e4b9a0f5be1c7c2895c7761b778e01f (diff)
downloadrust-df9e41278eb1e3e653ccd6b4dfab4d7303f64c02.tar.gz
rust-df9e41278eb1e3e653ccd6b4dfab4d7303f64c02.zip
core: Wire up `stream` to newsched
-rw-r--r--src/libcore/comm.rs349
-rw-r--r--src/libcore/rt/comm.rs248
-rw-r--r--src/libcore/rt/local_services.rs2
3 files changed, 357 insertions, 242 deletions
diff --git a/src/libcore/comm.rs b/src/libcore/comm.rs
index da3ae0e6c5d..59eb915c239 100644
--- a/src/libcore/comm.rs
+++ b/src/libcore/comm.rs
@@ -25,7 +25,7 @@ use unstable::sync::{Exclusive, exclusive};
 use rtcomm = rt::comm;
 use rt;
 
-use pipes::{recv, try_recv, wait_many, peek, PacketHeader};
+use pipes::{wait_many, PacketHeader};
 
 // FIXME #5160: Making this public exposes some plumbing from
 // pipes. Needs some refactoring
@@ -61,76 +61,14 @@ pub trait Peekable<T> {
     fn peek(&self) -> bool;
 }
 
-
-// Streams - Make pipes a little easier in general.
-
-/*proto! streamp (
-    Open:send<T: Owned> {
-        data(T) -> Open<T>
-    }
-)*/
-
-#[allow(non_camel_case_types)]
-pub mod streamp {
-    priv use core::kinds::Owned;
-
-    pub fn init<T: Owned>() -> (client::Open<T>, server::Open<T>) {
-        pub use core::pipes::HasBuffer;
-        ::core::pipes::entangle()
-    }
-
-    #[allow(non_camel_case_types)]
-    pub enum Open<T> { pub data(T, server::Open<T>), }
-
-    #[allow(non_camel_case_types)]
-    pub mod client {
-        priv use core::kinds::Owned;
-
-        #[allow(non_camel_case_types)]
-        pub fn try_data<T: Owned>(pipe: Open<T>, x_0: T) ->
-            ::core::option::Option<Open<T>> {
-            {
-                use super::data;
-                let (c, s) = ::core::pipes::entangle();
-                let message = data(x_0, s);
-                if ::core::pipes::send(pipe, message) {
-                    ::core::pipes::rt::make_some(c)
-                } else { ::core::pipes::rt::make_none() }
-            }
-        }
-
-        #[allow(non_camel_case_types)]
-        pub fn data<T: Owned>(pipe: Open<T>, x_0: T) -> Open<T> {
-            {
-                use super::data;
-                let (c, s) = ::core::pipes::entangle();
-                let message = data(x_0, s);
-                ::core::pipes::send(pipe, message);
-                c
-            }
-        }
-
-        #[allow(non_camel_case_types)]
-        pub type Open<T> = ::core::pipes::SendPacket<super::Open<T>>;
-    }
-
-    #[allow(non_camel_case_types)]
-    pub mod server {
-        #[allow(non_camel_case_types)]
-        pub type Open<T> = ::core::pipes::RecvPacket<super::Open<T>>;
-    }
-}
-
 /// An endpoint that can send many messages.
-#[unsafe_mut_field(endp)]
 pub struct Chan<T> {
-    endp: Option<streamp::client::Open<T>>
+    inner: Either<pipesy::Chan<T>, rtcomm::Chan<T>>
 }
 
 /// An endpoint that can receive many messages.
-#[unsafe_mut_field(endp)]
 pub struct Port<T> {
-    endp: Option<streamp::server::Open<T>>,
+    inner: Either<pipesy::Port<T>, rtcomm::Port<T>>
 }
 
 /** Creates a `(Port, Chan)` pair.
@@ -139,100 +77,75 @@ These allow sending or receiving an unlimited number of messages.
 
 */
 pub fn stream<T:Owned>() -> (Port<T>, Chan<T>) {
-    let (c, s) = streamp::init();
-
-    (Port {
-        endp: Some(s)
-    }, Chan {
-        endp: Some(c)
-    })
+    let (port, chan) = match rt::context() {
+        rt::OldTaskContext => match pipesy::stream() {
+            (p, c) => (Left(p), Left(c))
+        },
+        _ => match rtcomm::stream() {
+            (p, c) => (Right(p), Right(c))
+        }
+    };
+    let port = Port { inner: port };
+    let chan = Chan { inner: chan };
+    return (port, chan);
 }
 
 impl<T: Owned> GenericChan<T> for Chan<T> {
-    #[inline(always)]
     fn send(&self, x: T) {
-        unsafe {
-            let self_endp = transmute_mut(&self.endp);
-            let endp = replace(self_endp, None);
-            *self_endp = Some(streamp::client::data(endp.unwrap(), x))
+        match self.inner {
+            Left(ref chan) => chan.send(x),
+            Right(ref chan) => chan.send(x)
         }
     }
 }
 
 impl<T: Owned> GenericSmartChan<T> for Chan<T> {
-    #[inline(always)]
     fn try_send(&self, x: T) -> bool {
-        unsafe {
-            let self_endp = transmute_mut(&self.endp);
-            let endp = replace(self_endp, None);
-            match streamp::client::try_data(endp.unwrap(), x) {
-                Some(next) => {
-                    *self_endp = Some(next);
-                    true
-                }
-                None => false
-            }
+        match self.inner {
+            Left(ref chan) => chan.try_send(x),
+            Right(ref chan) => chan.try_send(x)
         }
     }
 }
 
 impl<T: Owned> GenericPort<T> for Port<T> {
-    #[inline(always)]
     fn recv(&self) -> T {
-        unsafe {
-            let self_endp = transmute_mut(&self.endp);
-            let endp = replace(self_endp, None);
-            let streamp::data(x, endp) = recv(endp.unwrap());
-            *self_endp = Some(endp);
-            x
+        match self.inner {
+            Left(ref port) => port.recv(),
+            Right(ref port) => port.recv()
         }
     }
 
-    #[inline(always)]
     fn try_recv(&self) -> Option<T> {
-        unsafe {
-            let self_endp = transmute_mut(&self.endp);
-            let endp = replace(self_endp, None);
-            match try_recv(endp.unwrap()) {
-                Some(streamp::data(x, endp)) => {
-                    *self_endp = Some(endp);
-                    Some(x)
-                }
-                None => None
-            }
+        match self.inner {
+            Left(ref port) => port.try_recv(),
+            Right(ref port) => port.try_recv()
         }
     }
 }
 
 impl<T: Owned> Peekable<T> for Port<T> {
-    #[inline(always)]
     fn peek(&self) -> bool {
-        unsafe {
-            let self_endp = transmute_mut(&self.endp);
-            let mut endp = replace(self_endp, None);
-            let peek = match endp {
-                Some(ref mut endp) => peek(endp),
-                None => fail!("peeking empty stream")
-            };
-            *self_endp = endp;
-            peek
+        match self.inner {
+            Left(ref port) => port.peek(),
+            Right(ref port) => port.peek()
         }
     }
 }
 
 impl<T: Owned> Selectable for Port<T> {
     fn header(&mut self) -> *mut PacketHeader {
-            match self.endp {
-                Some(ref mut endp) => endp.header(),
-                None => fail!("peeking empty stream")
-            }
+        match self.inner {
+            Left(ref mut port) => port.header(),
+            Right(_) => fail!("can't select on newsched ports")
+        }
     }
 }
 
 /// Treat many ports as one.
 #[unsafe_mut_field(ports)]
 pub struct PortSet<T> {
-    ports: ~[Port<T>],
+    ports: ~[pipesy::Port<T>],
 }
 
 pub impl<T: Owned> PortSet<T> {
@@ -243,6 +156,11 @@ pub impl<T: Owned> PortSet<T> {
     }
 
     fn add(&self, port: Port<T>) {
+        let Port { inner } = port;
+        let port = match inner {
+            Left(p) => p,
+            Right(_) => fail!("PortSet not implemented")
+        };
         unsafe {
             let self_ports = transmute_mut(&self.ports);
             self_ports.push(port)
@@ -290,7 +208,7 @@ impl<T: Owned> Peekable<T> for PortSet<T> {
         // It'd be nice to use self.port.each, but that version isn't
         // pure.
         for uint::range(0, vec::uniq_len(&const self.ports)) |i| {
-            let port: &Port<T> = &self.ports[i];
+            let port: &pipesy::Port<T> = &self.ports[i];
             if port.peek() {
                 return true;
             }
@@ -301,12 +219,17 @@ impl<T: Owned> Peekable<T> for PortSet<T> {
 
 /// A channel that can be shared between many senders.
 pub struct SharedChan<T> {
-    ch: Exclusive<Chan<T>>
+    ch: Exclusive<pipesy::Chan<T>>
 }
 
 impl<T: Owned> SharedChan<T> {
     /// Converts a `chan` into a `shared_chan`.
     pub fn new(c: Chan<T>) -> SharedChan<T> {
+        let Chan { inner } = c;
+        let c = match inner {
+            Left(c) => c,
+            Right(_) => fail!("SharedChan not implemented")
+        };
         SharedChan { ch: exclusive(c) }
     }
 }
@@ -354,12 +277,8 @@ pub fn oneshot<T: Owned>() -> (PortOne<T>, ChanOne<T>) {
             (p, c) => (Right(p), Right(c))
         }
     };
-    let port = PortOne {
-        inner: port
-    };
-    let chan = ChanOne {
-        inner: chan
-    };
+    let port = PortOne { inner: port };
+    let chan = ChanOne { inner: chan };
     return (port, chan);
 }
 
@@ -435,7 +354,10 @@ mod pipesy {
 
     use kinds::Owned;
     use option::{Option, Some, None};
-    use pipes::{recv, try_recv};
+    use pipes::{recv, try_recv, peek, PacketHeader};
+    use super::{GenericChan, GenericSmartChan, GenericPort, Peekable, Selectable};
+    use cast::transmute_mut;
+    use util::replace;
 
     /*proto! oneshot (
         Oneshot:send<T:Owned> {
@@ -610,6 +532,173 @@ mod pipesy {
         }
     }
 
+    // Streams - Make pipes a little easier in general.
+
+    /*proto! streamp (
+        Open:send<T: Owned> {
+            data(T) -> Open<T>
+        }
+    )*/
+
+    #[allow(non_camel_case_types)]
+    pub mod streamp {
+        priv use core::kinds::Owned;
+
+        pub fn init<T: Owned>() -> (client::Open<T>, server::Open<T>) {
+            pub use core::pipes::HasBuffer;
+            ::core::pipes::entangle()
+        }
+
+        #[allow(non_camel_case_types)]
+        pub enum Open<T> { pub data(T, server::Open<T>), }
+
+        #[allow(non_camel_case_types)]
+        pub mod client {
+            priv use core::kinds::Owned;
+
+            #[allow(non_camel_case_types)]
+            pub fn try_data<T: Owned>(pipe: Open<T>, x_0: T) ->
+                ::core::option::Option<Open<T>> {
+                {
+                    use super::data;
+                    let (c, s) = ::core::pipes::entangle();
+                    let message = data(x_0, s);
+                    if ::core::pipes::send(pipe, message) {
+                        ::core::pipes::rt::make_some(c)
+                    } else { ::core::pipes::rt::make_none() }
+                }
+            }
+
+            #[allow(non_camel_case_types)]
+            pub fn data<T: Owned>(pipe: Open<T>, x_0: T) -> Open<T> {
+                {
+                    use super::data;
+                    let (c, s) = ::core::pipes::entangle();
+                    let message = data(x_0, s);
+                    ::core::pipes::send(pipe, message);
+                    c
+                }
+            }
+
+            #[allow(non_camel_case_types)]
+            pub type Open<T> = ::core::pipes::SendPacket<super::Open<T>>;
+        }
+
+        #[allow(non_camel_case_types)]
+        pub mod server {
+            #[allow(non_camel_case_types)]
+            pub type Open<T> = ::core::pipes::RecvPacket<super::Open<T>>;
+        }
+    }
+
+    /// An endpoint that can send many messages.
+    #[unsafe_mut_field(endp)]
+    pub struct Chan<T> {
+        endp: Option<streamp::client::Open<T>>
+    }
+
+    /// An endpoint that can receive many messages.
+    #[unsafe_mut_field(endp)]
+    pub struct Port<T> {
+        endp: Option<streamp::server::Open<T>>,
+    }
+
+    /** Creates a `(Port, Chan)` pair.
+
+    These allow sending or receiving an unlimited number of messages.
+
+    */
+    pub fn stream<T:Owned>() -> (Port<T>, Chan<T>) {
+        let (c, s) = streamp::init();
+
+        (Port {
+            endp: Some(s)
+        }, Chan {
+            endp: Some(c)
+        })
+    }
+
+    impl<T: Owned> GenericChan<T> for Chan<T> {
+        #[inline(always)]
+        fn send(&self, x: T) {
+            unsafe {
+                let self_endp = transmute_mut(&self.endp);
+                let endp = replace(self_endp, None);
+                *self_endp = Some(streamp::client::data(endp.unwrap(), x))
+            }
+        }
+    }
+
+    impl<T: Owned> GenericSmartChan<T> for Chan<T> {
+        #[inline(always)]
+        fn try_send(&self, x: T) -> bool {
+            unsafe {
+                let self_endp = transmute_mut(&self.endp);
+                let endp = replace(self_endp, None);
+                match streamp::client::try_data(endp.unwrap(), x) {
+                    Some(next) => {
+                        *self_endp = Some(next);
+                        true
+                    }
+                    None => false
+                }
+            }
+        }
+    }
+
+    impl<T: Owned> GenericPort<T> for Port<T> {
+        #[inline(always)]
+        fn recv(&self) -> T {
+            unsafe {
+                let self_endp = transmute_mut(&self.endp);
+                let endp = replace(self_endp, None);
+                let streamp::data(x, endp) = recv(endp.unwrap());
+                *self_endp = Some(endp);
+                x
+            }
+        }
+
+        #[inline(always)]
+        fn try_recv(&self) -> Option<T> {
+            unsafe {
+                let self_endp = transmute_mut(&self.endp);
+                let endp = replace(self_endp, None);
+                match try_recv(endp.unwrap()) {
+                    Some(streamp::data(x, endp)) => {
+                        *self_endp = Some(endp);
+                        Some(x)
+                    }
+                    None => None
+                }
+            }
+        }
+    }
+
+    impl<T: Owned> Peekable<T> for Port<T> {
+        #[inline(always)]
+        fn peek(&self) -> bool {
+            unsafe {
+                let self_endp = transmute_mut(&self.endp);
+                let mut endp = replace(self_endp, None);
+                let peek = match endp {
+                    Some(ref mut endp) => peek(endp),
+                    None => fail!("peeking empty stream")
+                };
+                *self_endp = endp;
+                peek
+            }
+        }
+    }
+
+    impl<T: Owned> Selectable for Port<T> {
+        fn header(&mut self) -> *mut PacketHeader {
+            match self.endp {
+                Some(ref mut endp) => endp.header(),
+                None => fail!("peeking empty stream")
+            }
+    }
+}
+
 }
 
 /// Returns the index of an endpoint that is ready to receive.
diff --git a/src/libcore/rt/comm.rs b/src/libcore/rt/comm.rs
index 9fcb70cfc7d..4b5732b2d3a 100644
--- a/src/libcore/rt/comm.rs
+++ b/src/libcore/rt/comm.rs
@@ -8,6 +8,13 @@
 // option. This file may not be copied, modified, or distributed
 // except according to those terms.
 
+//! Ports and channels.
+//!
+//! XXX: Carefully consider whether the sequentially consistent
+//! atomics here can be converted to acq/rel. I'm not sure they can,
+//! because there is data being transerred in both directions (the payload
+//! goes from sender to receiver and the task pointer goes the other way).
+
 use option::*;
 use cast;
 use util;
@@ -29,33 +36,37 @@ use cell::Cell;
 ///
 /// * 2 - both endpoints are alive
 /// * 1 - either the sender or the receiver is dead, determined by context
-/// * <ptr> - A pointer to a Task that can be transmuted to ~Task
+/// * <ptr> - A pointer to a blocked Task that can be transmuted to ~Task
 type State = int;
 
 static STATE_BOTH: State = 2;
 static STATE_ONE: State = 1;
 
+/// The heap-allocated structure shared between two endpoints.
 struct Packet<T> {
     state: State,
     payload: Option<T>,
 }
 
-pub struct PortOne<T> {
+/// A one-shot channel.
+pub struct ChanOne<T> {
     // XXX: Hack extra allocation to make by-val self work
-    inner: ~PortOneHack<T>
+    inner: ~ChanOneHack<T>
 }
 
-pub struct ChanOne<T> {
+
+/// A one-shot port.
+pub struct PortOne<T> {
     // XXX: Hack extra allocation to make by-val self work
-    inner: ~ChanOneHack<T>
+    inner: ~PortOneHack<T>
 }
 
-pub struct PortOneHack<T> {
+pub struct ChanOneHack<T> {
     void_packet: *mut Void,
     suppress_finalize: bool
 }
 
-pub struct ChanOneHack<T> {
+pub struct PortOneHack<T> {
     void_packet: *mut Void,
     suppress_finalize: bool
 }
@@ -84,6 +95,54 @@ pub fn oneshot<T: Owned>() -> (PortOne<T>, ChanOne<T>) {
     }
 }
 
+impl<T> ChanOne<T> {
+
+    pub fn send(self, val: T) {
+        self.try_send(val);
+    }
+
+    pub fn try_send(self, val: T) -> bool {
+        let mut this = self;
+        let mut recvr_active = true;
+        let packet = this.inner.packet();
+
+        unsafe {
+
+            // Install the payload
+            assert!((*packet).payload.is_none());
+            (*packet).payload = Some(val);
+
+            // Atomically swap out the old state to figure out what
+            // the port's up to, issuing a release barrier to prevent
+            // reordering of the payload write. This also issues an
+            // acquire barrier that keeps the subsequent access of the
+            // ~Task pointer from being reordered.
+            let oldstate = atomic_xchg(&mut (*packet).state, STATE_ONE);
+            match oldstate {
+                STATE_BOTH => {
+                    // Port is not waiting yet. Nothing to do
+                }
+                STATE_ONE => {
+                    // Port has closed. Need to clean up.
+                    let _packet: ~Packet<T> = cast::transmute(this.inner.void_packet);
+                    recvr_active = false;
+                }
+                task_as_state => {
+                    // Port is blocked. Wake it up.
+                    let recvr: ~Coroutine = cast::transmute(task_as_state);
+                    let sched = local_sched::take();
+                    sched.schedule_task(recvr);
+                }
+            }
+        }
+
+        // Suppress the synchronizing actions in the finalizer. We're done with the packet.
+        this.inner.suppress_finalize = true;
+        return recvr_active;
+    }
+}
+
+
 impl<T> PortOne<T> {
     pub fn recv(self) -> T {
         match self.try_recv() {
@@ -96,30 +155,31 @@ impl<T> PortOne<T> {
 
     pub fn try_recv(self) -> Option<T> {
         let mut this = self;
-
-        {
-            let self_ptr: *mut PortOne<T> = &mut this;
-
-            // XXX: Optimize this to not require the two context switches when data is available
-
-            // Switch to the scheduler
-            let sched = local_sched::take();
-            do sched.deschedule_running_task_and_then |task| {
-                unsafe {
-                    let task_as_state: State = cast::transmute(task);
-                    let oldstate = atomic_xchg(&mut (*(*self_ptr).inner.packet()).state, task_as_state);
-                    match oldstate {
-                        STATE_BOTH => {
-                            // Data has not been sent. Now we're blocked.
-                        }
-                        STATE_ONE => {
-                            // Channel is closed. Switch back and check the data.
-                            let task: ~Coroutine = cast::transmute(task_as_state);
-                            let sched = local_sched::take();
-                            sched.resume_task_immediately(task);
-                        }
-                        _ => util::unreachable()
+        let packet = this.inner.packet();
+
+        // XXX: Optimize this to not require the two context switches when data is available
+
+        // Switch to the scheduler to put the ~Task into the Packet state.
+        let sched = local_sched::take();
+        do sched.deschedule_running_task_and_then |task| {
+            unsafe {
+                // Atomically swap the task pointer into the Packet state, issuing
+                // an acquire barrier to prevent reordering of the subsequent read
+                // of the payload. Also issues a release barrier to prevent reordering
+                // of any previous writes to the task structure.
+                let task_as_state: State = cast::transmute(task);
+                let oldstate = atomic_xchg(&mut (*packet).state, task_as_state);
+                match oldstate {
+                    STATE_BOTH => {
+                        // Data has not been sent. Now we're blocked.
+                    }
+                    STATE_ONE => {
+                        // Channel is closed. Switch back and check the data.
+                        let task: ~Coroutine = cast::transmute(task_as_state);
+                        let sched = local_sched::take();
+                        sched.resume_task_immediately(task);
                     }
+                    _ => util::unreachable()
                 }
             }
         }
@@ -130,20 +190,20 @@ impl<T> PortOne<T> {
         // payload. Some scenarios:
         //
         // 1) We encountered STATE_ONE above - the atomic_xchg was the acq barrier. We're fine.
-        // 2) We encountered STATE_BOTH above and blocked. The sending task work-stole us
-        //    and ran on its thread. The work stealing had a memory barrier.
+        // 2) We encountered STATE_BOTH above and blocked. The sending task then ran us
+        //    and ran on its thread. The sending task issued a read barrier when taking the
+        //    pointer to the receiving task.
         // 3) We encountered STATE_BOTH above and blocked, but the receiving task (this task)
         //    is pinned to some other scheduler, so the sending task had to give us to
         //    a different scheduler for resuming. That send synchronized memory.
 
         unsafe {
-            let payload = util::replace(&mut (*this.inner.packet()).payload, None);
+            let payload = util::replace(&mut (*packet).payload, None);
 
             // The sender has closed up shop. Drop the packet.
             let _packet: ~Packet<T> = cast::transmute(this.inner.void_packet);
-            // Supress the finalizer. We're done here.
+            // Suppress the synchronizing actions in the finalizer. We're done with the packet.
             this.inner.suppress_finalize = true;
-
             return payload;
         }
     }
@@ -167,47 +227,8 @@ impl<T> Peekable<T> for PortOne<T> {
     }
 }
 
-impl<T> ChanOne<T> {
-
-    pub fn send(self, val: T) {
-        self.try_send(val);
-    }
-
-    pub fn try_send(self, val: T) -> bool {
-        let mut this = self;
-        let mut recvr_active = true;
-
-        unsafe {
-            assert!((*this.inner.packet()).payload.is_none());
-            (*this.inner.packet()).payload = Some(val);
-
-            let oldstate = atomic_xchg(&mut (*this.inner.packet()).state, STATE_ONE);
-            match oldstate {
-                STATE_BOTH => {
-                    // Port is not recving yet. Nothing to do
-                }
-                STATE_ONE => {
-                    // Port has closed. Need to clean up.
-                    let _packet: ~Packet<T> = cast::transmute(this.inner.void_packet);
-                    recvr_active = false;
-                }
-                _ => {
-                    // Port is blocked. Wake it up.
-                    let recvr: ~Coroutine = cast::transmute(oldstate);
-                    let sched = local_sched::take();
-                    sched.schedule_task(recvr);
-                }
-            }
-        }
-
-        // Suppress the finalizer. We're done here.
-        this.inner.suppress_finalize = true;
-        return recvr_active;
-    }
-}
-
 #[unsafe_destructor]
-impl<T> Drop for PortOneHack<T> {
+impl<T> Drop for ChanOneHack<T> {
     fn finalize(&self) {
         if self.suppress_finalize { return }
 
@@ -216,13 +237,17 @@ impl<T> Drop for PortOneHack<T> {
             let oldstate = atomic_xchg(&mut (*this.packet()).state, STATE_ONE);
             match oldstate {
                 STATE_BOTH => {
-                    /* cleanup is the chan's responsibility */
+                    // Port still active. It will destroy the Packet.
                 },
                 STATE_ONE => {
                     let _packet: ~Packet<T> = cast::transmute(this.void_packet);
-                }
-                _ => {
-                    util::unreachable()
+                },
+                task_as_state => {
+                    // The port is blocked waiting for a message we will never send. Wake it.
+                    assert!((*this.packet()).payload.is_none());
+                    let recvr: ~Coroutine = cast::transmute(task_as_state);
+                    let sched = local_sched::take();
+                    sched.schedule_task(recvr);
                 }
             }
         }
@@ -230,7 +255,7 @@ impl<T> Drop for PortOneHack<T> {
 }
 
 #[unsafe_destructor]
-impl<T> Drop for ChanOneHack<T> {
+impl<T> Drop for PortOneHack<T> {
     fn finalize(&self) {
         if self.suppress_finalize { return }
 
@@ -239,24 +264,20 @@ impl<T> Drop for ChanOneHack<T> {
             let oldstate = atomic_xchg(&mut (*this.packet()).state, STATE_ONE);
             match oldstate {
                 STATE_BOTH => {
-                    /* cleanup is the port's responsibility */
+                    // Chan still active. It will destroy the packet.
                 },
                 STATE_ONE => {
                     let _packet: ~Packet<T> = cast::transmute(this.void_packet);
-                },
+                }
                 _ => {
-                    // The port is blocked recving for a message we will never send. Wake it.
-                    assert!((*this.packet()).payload.is_none());
-                    let recvr: ~Coroutine = cast::transmute(oldstate);
-                    let sched = local_sched::take();
-                    sched.schedule_task(recvr);
+                    util::unreachable()
                 }
             }
         }
     }
 }
 
-impl<T> PortOneHack<T> {
+impl<T> ChanOneHack<T> {
     fn packet(&self) -> *mut Packet<T> {
         unsafe {
             let p: *mut ~Packet<T> = cast::transmute(&self.void_packet);
@@ -266,7 +287,7 @@ impl<T> PortOneHack<T> {
     }
 }
 
-impl<T> ChanOneHack<T> {
+impl<T> PortOneHack<T> {
     fn packet(&self) -> *mut Packet<T> {
         unsafe {
             let p: *mut ~Packet<T> = cast::transmute(&self.void_packet);
@@ -276,18 +297,23 @@ impl<T> ChanOneHack<T> {
     }
 }
 
-struct StreamPayload<T>(T, PortOne<StreamPayload<T>>);
-
-pub struct Port<T> {
-    // FIXME #5372. Using Cell because we don't take &mut self
-    next: Cell<PortOne<StreamPayload<T>>>
+struct StreamPayload<T> {
+    val: T,
+    next: PortOne<StreamPayload<T>>
 }
 
+/// A channel with unbounded size.
 pub struct Chan<T> {
     // FIXME #5372. Using Cell because we don't take &mut self
     next: Cell<ChanOne<StreamPayload<T>>>
 }
 
+/// An port with unbounded size.
+pub struct Port<T> {
+    // FIXME #5372. Using Cell because we don't take &mut self
+    next: Cell<PortOne<StreamPayload<T>>>
+}
+
 pub fn stream<T: Owned>() -> (Port<T>, Chan<T>) {
     let (pone, cone) = oneshot();
     let port = Port { next: Cell(pone) };
@@ -295,6 +321,21 @@ pub fn stream<T: Owned>() -> (Port<T>, Chan<T>) {
     return (port, chan);
 }
 
+impl<T: Owned> GenericChan<T> for Chan<T> {
+    fn send(&self, val: T) {
+        self.try_send(val);
+    }
+}
+
+impl<T: Owned> GenericSmartChan<T> for Chan<T> {
+    fn try_send(&self, val: T) -> bool {
+        let (next_pone, next_cone) = oneshot();
+        let cone = self.next.take();
+        self.next.put_back(next_cone);
+        cone.try_send(StreamPayload { val: val, next: next_pone })
+    }
+}
+
 impl<T> GenericPort<T> for Port<T> {
     fn recv(&self) -> T {
         match self.try_recv() {
@@ -308,7 +349,7 @@ impl<T> GenericPort<T> for Port<T> {
     fn try_recv(&self) -> Option<T> {
         let pone = self.next.take();
         match pone.try_recv() {
-            Some(StreamPayload(val, next)) => {
+            Some(StreamPayload { val, next }) => {
                 self.next.put_back(next);
                 Some(val)
             }
@@ -323,21 +364,6 @@ impl<T> Peekable<T> for Port<T> {
     }
 }
 
-impl<T: Owned> GenericChan<T> for Chan<T> {
-    fn send(&self, val: T) {
-        self.try_send(val);
-    }
-}
-
-impl<T: Owned> GenericSmartChan<T> for Chan<T> {
-    fn try_send(&self, val: T) -> bool {
-        let (next_pone, next_cone) = oneshot();
-        let cone = self.next.take();
-        self.next.put_back(next_cone);
-        cone.try_send(StreamPayload(val, next_pone))
-    }
-}
-
 #[cfg(test)]
 mod test {
     use super::*;
@@ -563,7 +589,7 @@ mod test {
     }
 
     #[test]
-    fn stream_send_recv() {
+    fn stream_send_recv_stress() {
         for stress_factor().times {
             do run_in_newsched_task {
                 let (port, chan) = stream::<~int>();
diff --git a/src/libcore/rt/local_services.rs b/src/libcore/rt/local_services.rs
index 35c703bb350..8d6873be8cd 100644
--- a/src/libcore/rt/local_services.rs
+++ b/src/libcore/rt/local_services.rs
@@ -261,7 +261,7 @@ mod test {
         use comm::*;
 
         do run_in_newsched_task() {
-            let (port, chan) = oneshot();
+            let (port, chan) = stream();
             chan.send(10);
             assert!(port.recv() == 10);
         }