about summary refs log tree commit diff
diff options
context:
space:
mode:
authorAlex Crichton <alex@alexcrichton.com>2013-12-05 18:19:06 -0800
committerAlex Crichton <alex@alexcrichton.com>2013-12-16 17:47:11 -0800
commit529e268ab900f1b6e731af64ce2aeecda3555f4e (patch)
tree7ebb9ed2a7f36455b9550749a442522d45f0dc30
parentbfa9064ba2687eb1d95708f72f41ddd9729a6ba1 (diff)
downloadrust-529e268ab900f1b6e731af64ce2aeecda3555f4e.tar.gz
rust-529e268ab900f1b6e731af64ce2aeecda3555f4e.zip
Fallout of rewriting std::comm
-rw-r--r--src/etc/licenseck.py1
-rw-r--r--src/libextra/arc.rs27
-rw-r--r--src/libextra/comm.rs119
-rw-r--r--src/libextra/future.rs8
-rw-r--r--src/libextra/sync.rs101
-rw-r--r--src/libextra/task_pool.rs6
-rw-r--r--src/libextra/test.rs19
-rw-r--r--src/libextra/workcache.rs11
-rw-r--r--src/librustc/lib.rs7
-rw-r--r--src/librustdoc/html/format.rs112
-rw-r--r--src/librustdoc/html/render.rs266
-rw-r--r--src/librustuv/signal.rs2
-rw-r--r--src/librustuv/timer.rs9
-rw-r--r--src/libstd/comm/select.rs2
-rw-r--r--src/libstd/io/comm_adapters.rs48
-rw-r--r--src/libstd/io/mod.rs3
-rw-r--r--src/libstd/io/net/tcp.rs141
-rw-r--r--src/libstd/io/net/udp.rs67
-rw-r--r--src/libstd/io/net/unix.rs25
-rw-r--r--src/libstd/io/signal.rs11
-rw-r--r--src/libstd/io/timer.rs4
-rw-r--r--src/libstd/lib.rs1
-rw-r--r--src/libstd/prelude.rs2
-rw-r--r--src/libstd/rand/os.rs10
-rw-r--r--src/libstd/rt/comm.rs1141
-rw-r--r--src/libstd/rt/kill.rs37
-rw-r--r--src/libstd/rt/local_ptr.rs3
-rw-r--r--src/libstd/rt/message_queue.rs55
-rw-r--r--src/libstd/rt/mod.rs21
-rw-r--r--src/libstd/rt/rtio.rs4
-rw-r--r--src/libstd/rt/sched.rs105
-rw-r--r--src/libstd/rt/select.rs29
-rw-r--r--src/libstd/rt/task.rs62
-rw-r--r--src/libstd/rt/test.rs22
-rw-r--r--src/libstd/rt/thread.rs187
-rw-r--r--src/libstd/run.rs5
-rw-r--r--src/libstd/select.rs306
-rw-r--r--src/libstd/task/mod.rs72
-rw-r--r--src/libstd/task/spawn.rs21
-rw-r--r--src/libstd/unstable/mod.rs13
-rw-r--r--src/libstd/unstable/sync.rs23
-rw-r--r--src/test/auxiliary/cci_capture_clause.rs2
-rw-r--r--src/test/compile-fail/bind-by-move-no-guards.rs4
-rw-r--r--src/test/compile-fail/builtin-superkinds-self-type.rs6
-rw-r--r--src/test/compile-fail/unsendable-class.rs4
-rw-r--r--src/test/run-pass/builtin-superkinds-capabilities-transitive.rs6
-rw-r--r--src/test/run-pass/builtin-superkinds-capabilities-xc.rs5
-rw-r--r--src/test/run-pass/builtin-superkinds-capabilities.rs6
-rw-r--r--src/test/run-pass/builtin-superkinds-self-type.rs6
-rw-r--r--src/test/run-pass/capture_nil.rs2
-rw-r--r--src/test/run-pass/closure-bounds-can-capture-chan.rs2
-rw-r--r--src/test/run-pass/comm.rs4
-rw-r--r--src/test/run-pass/hashmap-memory.rs18
-rw-r--r--src/test/run-pass/issue-3609.rs1
-rw-r--r--src/test/run-pass/issue-4446.rs2
-rw-r--r--src/test/run-pass/issue-4448.rs3
-rw-r--r--src/test/run-pass/ivec-tag.rs6
-rw-r--r--src/test/run-pass/logging-only-prints-once.rs2
-rw-r--r--src/test/run-pass/send-resource.rs4
-rw-r--r--src/test/run-pass/send-type-inference.rs4
-rw-r--r--src/test/run-pass/sendable-class.rs4
-rw-r--r--src/test/run-pass/spawn-types.rs2
-rw-r--r--src/test/run-pass/task-comm-0.rs6
-rw-r--r--src/test/run-pass/task-comm-10.rs11
-rw-r--r--src/test/run-pass/task-comm-11.rs11
-rw-r--r--src/test/run-pass/task-comm-12.rs2
-rw-r--r--src/test/run-pass/task-comm-13.rs7
-rw-r--r--src/test/run-pass/task-comm-14.rs8
-rw-r--r--src/test/run-pass/task-comm-15.rs10
-rw-r--r--src/test/run-pass/task-comm-16.rs15
-rw-r--r--src/test/run-pass/task-comm-3.rs10
-rw-r--r--src/test/run-pass/task-comm-4.rs4
-rw-r--r--src/test/run-pass/task-comm-5.rs4
-rw-r--r--src/test/run-pass/task-comm-6.rs14
-rw-r--r--src/test/run-pass/task-comm-7.rs7
-rw-r--r--src/test/run-pass/task-comm-9.rs8
-rw-r--r--src/test/run-pass/task-comm-chan-nil.rs4
-rw-r--r--src/test/run-pass/task-spawn-move-and-copy.rs2
-rw-r--r--src/test/run-pass/tempfile.rs2
-rw-r--r--src/test/run-pass/trait-bounds-in-arc.rs7
-rw-r--r--src/test/run-pass/trivial-message.rs4
-rw-r--r--src/test/run-pass/unique-send-2.rs8
-rw-r--r--src/test/run-pass/unique-send.rs2
-rw-r--r--src/test/run-pass/unwind-resource.rs4
-rw-r--r--src/test/run-pass/yield.rs2
-rw-r--r--src/test/run-pass/yield1.rs2
86 files changed, 765 insertions, 2590 deletions
diff --git a/src/etc/licenseck.py b/src/etc/licenseck.py
index 91231430d2a..78d0973fdfe 100644
--- a/src/etc/licenseck.py
+++ b/src/etc/licenseck.py
@@ -77,6 +77,7 @@ exceptions = [
     "rt/isaac/rand.h", # public domain
     "rt/isaac/standard.h", # public domain
     "libstd/rt/mpsc_queue.rs", # BSD
+    "libstd/rt/spsc_queue.rs", # BSD
     "libstd/rt/mpmc_bounded_queue.rs", # BSD
 ]
 
diff --git a/src/libextra/arc.rs b/src/libextra/arc.rs
index 5700a299945..ea8066b786f 100644
--- a/src/libextra/arc.rs
+++ b/src/libextra/arc.rs
@@ -597,7 +597,6 @@ mod tests {
 
     use arc::*;
 
-    use std::comm;
     use std::task;
 
     #[test]
@@ -605,7 +604,7 @@ mod tests {
         let v = ~[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
         let arc_v = Arc::new(v);
 
-        let (p, c) = comm::stream();
+        let (p, c) = Chan::new();
 
         do task::spawn {
             let arc_v: Arc<~[int]> = p.recv();
@@ -626,7 +625,7 @@ mod tests {
     fn test_mutex_arc_condvar() {
         let arc = ~MutexArc::new(false);
         let arc2 = ~arc.clone();
-        let (p,c) = comm::oneshot();
+        let (p,c) = Chan::new();
         do task::spawn {
             // wait until parent gets in
             p.recv();
@@ -638,7 +637,7 @@ mod tests {
 
         let mut c = Some(c);
         arc.access_cond(|state, cond| {
-            c.take_unwrap().send(());
+            c.take_unwrawp().send(());
             assert!(!*state);
             while !*state {
                 cond.wait();
@@ -650,7 +649,7 @@ mod tests {
     fn test_arc_condvar_poison() {
         let arc = ~MutexArc::new(1);
         let arc2 = ~arc.clone();
-        let (p, c) = comm::stream();
+        let (p, c) = Chan::new();
 
         do spawn {
             let _ = p.recv();
@@ -687,7 +686,7 @@ mod tests {
     pub fn test_mutex_arc_unwrap_poison() {
         let arc = MutexArc::new(1);
         let arc2 = ~(&arc).clone();
-        let (p, c) = comm::stream();
+        let (p, c) = Chan::new();
         do task::spawn {
             arc2.access(|one| {
                 c.send(());
@@ -804,7 +803,7 @@ mod tests {
     fn test_rw_arc() {
         let arc = RWArc::new(0);
         let arc2 = arc.clone();
-        let (p, c) = comm::stream();
+        let (p, c) = Chan::new();
 
         do task::spawn {
             arc2.write(|num| {
@@ -832,7 +831,7 @@ mod tests {
         });
 
         // Wait for children to pass their asserts
-        for r in children.iter() {
+        for r in children.mut_iter() {
             r.recv();
         }
 
@@ -855,7 +854,7 @@ mod tests {
         // Reader tasks
         let mut reader_convos = ~[];
         10.times(|| {
-            let ((rp1, rc1), (rp2, rc2)) = (comm::stream(), comm::stream());
+            let ((rp1, rc1), (rp2, rc2)) = (Chan::new(), Chan::new());
             reader_convos.push((rc1, rp2));
             let arcn = arc.clone();
             do task::spawn {
@@ -869,7 +868,7 @@ mod tests {
 
         // Writer task
         let arc2 = arc.clone();
-        let ((wp1, wc1), (wp2, wc2)) = (comm::stream(), comm::stream());
+        let ((wp1, wc1), (wp2, wc2)) = (Chan::new(), Chan::new());
         do task::spawn || {
             wp1.recv();
             arc2.write_cond(|state, cond| {
@@ -897,14 +896,14 @@ mod tests {
                 assert_eq!(*state, 42);
                 *state = 31337;
                 // send to other readers
-                for &(ref rc, _) in reader_convos.iter() {
+                for &(ref mut rc, _) in reader_convos.mut_iter() {
                     rc.send(())
                 }
             });
             let read_mode = arc.downgrade(write_mode);
             read_mode.read(|state| {
                 // complete handshake with other readers
-                for &(_, ref rp) in reader_convos.iter() {
+                for &(_, ref mut rp) in reader_convos.mut_iter() {
                     rp.recv()
                 }
                 wc1.send(()); // tell writer to try again
@@ -926,7 +925,7 @@ mod tests {
         //     "blk(&Condvar { order: opt_lock, ..*cond })"
         // with just "blk(cond)".
         let x = RWArc::new(true);
-        let (wp, wc) = comm::stream();
+        let (wp, wc) = Chan::new();
 
         // writer task
         let xw = x.clone();
@@ -951,7 +950,7 @@ mod tests {
             });
             // make a reader task to trigger the "reader cloud lock" handoff
             let xr = x.clone();
-            let (rp, rc) = comm::stream();
+            let (rp, rc) = Chan::new();
             do task::spawn {
                 rc.send(());
                 xr.read(|_state| { })
diff --git a/src/libextra/comm.rs b/src/libextra/comm.rs
index 42287736ffa..09dd85fe0de 100644
--- a/src/libextra/comm.rs
+++ b/src/libextra/comm.rs
@@ -16,11 +16,6 @@ Higher level communication abstractions.
 
 #[allow(missing_doc)];
 
-
-use std::comm::{GenericChan, GenericSmartChan, GenericPort};
-use std::comm::{Chan, Port, Peekable};
-use std::comm;
-
 /// An extension of `pipes::stream` that allows both sending and receiving.
 pub struct DuplexStream<T, U> {
     priv chan: Chan<T>,
@@ -29,108 +24,73 @@ pub struct DuplexStream<T, U> {
 
 // Allow these methods to be used without import:
 impl<T:Send,U:Send> DuplexStream<T, U> {
+    /// Creates a bidirectional stream.
+    pub fn new() -> (DuplexStream<T, U>, DuplexStream<U, T>) {
+        let (p1, c2) = Chan::new();
+        let (p2, c1) = Chan::new();
+        (DuplexStream { chan: c1, port: p1 },
+         DuplexStream { chan: c2, port: p2 })
+    }
     pub fn send(&self, x: T) {
         self.chan.send(x)
     }
     pub fn try_send(&self, x: T) -> bool {
         self.chan.try_send(x)
     }
-    pub fn recv(&self, ) -> U {
+    pub fn recv(&self) -> U {
         self.port.recv()
     }
     pub fn try_recv(&self) -> Option<U> {
         self.port.try_recv()
     }
-    pub fn peek(&self) -> bool {
-        self.port.peek()
-    }
-}
-
-impl<T:Send,U:Send> GenericChan<T> for DuplexStream<T, U> {
-    fn send(&self, x: T) {
-        self.chan.send(x)
-    }
-}
-
-impl<T:Send,U:Send> GenericSmartChan<T> for DuplexStream<T, U> {
-    fn try_send(&self, x: T) -> bool {
-        self.chan.try_send(x)
-    }
-}
-
-impl<T:Send,U:Send> GenericPort<U> for DuplexStream<T, U> {
-    fn recv(&self) -> U {
-        self.port.recv()
-    }
-
-    fn try_recv(&self) -> Option<U> {
-        self.port.try_recv()
-    }
-}
-
-impl<T:Send,U:Send> Peekable<U> for DuplexStream<T, U> {
-    fn peek(&self) -> bool {
-        self.port.peek()
+    pub fn recv_opt(&self) -> Option<U> {
+        self.port.recv_opt()
     }
 }
 
-/// Creates a bidirectional stream.
-pub fn DuplexStream<T:Send,U:Send>()
-    -> (DuplexStream<T, U>, DuplexStream<U, T>)
-{
-    let (p1, c2) = comm::stream();
-    let (p2, c1) = comm::stream();
-    (DuplexStream {
-        chan: c1,
-        port: p1
-    },
-     DuplexStream {
-         chan: c2,
-         port: p2
-     })
-}
-
 /// An extension of `pipes::stream` that provides synchronous message sending.
 pub struct SyncChan<T> { priv duplex_stream: DuplexStream<T, ()> }
 /// An extension of `pipes::stream` that acknowledges each message received.
 pub struct SyncPort<T> { priv duplex_stream: DuplexStream<(), T> }
 
-impl<T: Send> GenericChan<T> for SyncChan<T> {
-    fn send(&self, val: T) {
+impl<T: Send> SyncChan<T> {
+    pub fn send(&self, val: T) {
         assert!(self.try_send(val), "SyncChan.send: receiving port closed");
     }
-}
 
-impl<T: Send> GenericSmartChan<T> for SyncChan<T> {
-    /// Sends a message, or report if the receiver has closed the connection before receiving.
-    fn try_send(&self, val: T) -> bool {
-        self.duplex_stream.try_send(val) && self.duplex_stream.try_recv().is_some()
+    /// Sends a message, or report if the receiver has closed the connection
+    /// before receiving.
+    pub fn try_send(&self, val: T) -> bool {
+        self.duplex_stream.try_send(val) && self.duplex_stream.recv_opt().is_some()
     }
 }
 
-impl<T: Send> GenericPort<T> for SyncPort<T> {
-    fn recv(&self) -> T {
-        self.try_recv().expect("SyncPort.recv: sending channel closed")
+impl<T: Send> SyncPort<T> {
+    pub fn recv(&self) -> T {
+        self.recv_opt().expect("SyncPort.recv: sending channel closed")
     }
 
-    fn try_recv(&self) -> Option<T> {
-        self.duplex_stream.try_recv().map(|val| {
+    pub fn recv_opt(&self) -> Option<T> {
+        self.duplex_stream.recv_opt().map(|val| {
             self.duplex_stream.try_send(());
             val
         })
     }
-}
 
-impl<T: Send> Peekable<T> for SyncPort<T> {
-    fn peek(&self) -> bool {
-        self.duplex_stream.peek()
+    pub fn try_recv(&self) -> Option<T> {
+        self.duplex_stream.try_recv().map(|val| {
+            self.duplex_stream.try_send(());
+            val
+        })
     }
 }
 
-/// Creates a stream whose channel, upon sending a message, blocks until the message is received.
+/// Creates a stream whose channel, upon sending a message, blocks until the
+/// message is received.
 pub fn rendezvous<T: Send>() -> (SyncPort<T>, SyncChan<T>) {
-    let (chan_stream, port_stream) = DuplexStream();
-    (SyncPort { duplex_stream: port_stream }, SyncChan { duplex_stream: chan_stream })
+    let (chan_stream, port_stream) = DuplexStream::new();
+    (SyncPort { duplex_stream: port_stream },
+     SyncChan { duplex_stream: chan_stream })
 }
 
 #[cfg(test)]
@@ -141,7 +101,7 @@ mod test {
 
     #[test]
     pub fn DuplexStream1() {
-        let (left, right) = DuplexStream();
+        let (mut left, mut right) = DuplexStream::new();
 
         left.send(~"abc");
         right.send(123);
@@ -152,9 +112,10 @@ mod test {
 
     #[test]
     pub fn basic_rendezvous_test() {
-        let (port, chan) = rendezvous();
+        let (mut port, chan) = rendezvous();
 
         do spawn {
+            let mut chan = chan;
             chan.send("abc");
         }
 
@@ -165,8 +126,9 @@ mod test {
     fn recv_a_lot() {
         // Rendezvous streams should be able to handle any number of messages being sent
         do run_in_uv_task {
-            let (port, chan) = rendezvous();
+            let (mut port, chan) = rendezvous();
             do spawn {
+                let mut chan = chan;
                 1000000.times(|| { chan.send(()) })
             }
             1000000.times(|| { port.recv() })
@@ -175,8 +137,9 @@ mod test {
 
     #[test]
     fn send_and_fail_and_try_recv() {
-        let (port, chan) = rendezvous();
+        let (mut port, chan) = rendezvous();
         do spawn {
+            let mut chan = chan;
             chan.duplex_stream.send(()); // Can't access this field outside this module
             fail!()
         }
@@ -185,8 +148,9 @@ mod test {
 
     #[test]
     fn try_send_and_recv_then_fail_before_ack() {
-        let (port, chan) = rendezvous();
+        let (port, mut chan) = rendezvous();
         do spawn {
+            let mut port = port;
             port.duplex_stream.recv();
             fail!()
         }
@@ -196,8 +160,9 @@ mod test {
     #[test]
     #[should_fail]
     fn send_and_recv_then_fail_before_ack() {
-        let (port, chan) = rendezvous();
+        let (port, mut chan) = rendezvous();
         do spawn {
+            let mut port = port;
             port.duplex_stream.recv();
             fail!()
         }
diff --git a/src/libextra/future.rs b/src/libextra/future.rs
index 1a2ac398132..eb61b7781f1 100644
--- a/src/libextra/future.rs
+++ b/src/libextra/future.rs
@@ -25,7 +25,6 @@
 
 #[allow(missing_doc)];
 
-use std::comm::{PortOne, oneshot};
 use std::util::replace;
 
 /// A type encapsulating the result of a computation which may not be complete
@@ -104,7 +103,7 @@ impl<A> Future<A> {
 }
 
 impl<A:Send> Future<A> {
-    pub fn from_port(port: PortOne<A>) -> Future<A> {
+    pub fn from_port(port: Port<A>) -> Future<A> {
         /*!
          * Create a future from a port
          *
@@ -125,7 +124,7 @@ impl<A:Send> Future<A> {
          * value of the future.
          */
 
-        let (port, chan) = oneshot();
+        let (port, chan) = Chan::new();
 
         do spawn {
             chan.send(blk());
@@ -139,7 +138,6 @@ impl<A:Send> Future<A> {
 mod test {
     use future::Future;
 
-    use std::comm::oneshot;
     use std::task;
 
     #[test]
@@ -150,7 +148,7 @@ mod test {
 
     #[test]
     fn test_from_port() {
-        let (po, ch) = oneshot();
+        let (po, ch) = Chan::new();
         ch.send(~"whale");
         let mut f = Future::from_port(po);
         assert_eq!(f.get(), ~"whale");
diff --git a/src/libextra/sync.rs b/src/libextra/sync.rs
index f00c3d8db9a..1cc403c32f4 100644
--- a/src/libextra/sync.rs
+++ b/src/libextra/sync.rs
@@ -19,9 +19,6 @@
 
 
 use std::borrow;
-use std::comm;
-use std::comm::SendDeferred;
-use std::comm::{GenericPort, Peekable};
 use std::unstable::sync::{Exclusive, UnsafeArc};
 use std::unstable::atomics;
 use std::unstable::finally::Finally;
@@ -34,48 +31,53 @@ use std::util::NonCopyable;
 
 // Each waiting task receives on one of these.
 #[doc(hidden)]
-type WaitEnd = comm::PortOne<()>;
+type WaitEnd = Port<()>;
 #[doc(hidden)]
-type SignalEnd = comm::ChanOne<()>;
+type SignalEnd = Chan<()>;
 // A doubly-ended queue of waiting tasks.
 #[doc(hidden)]
-struct WaitQueue { head: comm::Port<SignalEnd>,
-                   tail: comm::Chan<SignalEnd> }
+struct WaitQueue { head: Port<SignalEnd>,
+                   tail: Chan<SignalEnd> }
 
 impl WaitQueue {
     fn new() -> WaitQueue {
-        let (block_head, block_tail) = comm::stream();
+        let (block_head, block_tail) = Chan::new();
         WaitQueue { head: block_head, tail: block_tail }
     }
 
     // Signals one live task from the queue.
     fn signal(&self) -> bool {
-        // The peek is mandatory to make sure recv doesn't block.
-        if self.head.peek() {
-            // Pop and send a wakeup signal. If the waiter was killed, its port
-            // will have closed. Keep trying until we get a live task.
-            if self.head.recv().try_send_deferred(()) {
-                true
-            } else {
-                self.signal()
+        match self.head.try_recv() {
+            Some(ch) => {
+                // Send a wakeup signal. If the waiter was killed, its port will
+                // have closed. Keep trying until we get a live task.
+                if ch.try_send_deferred(()) {
+                    true
+                } else {
+                    self.signal()
+                }
             }
-        } else {
-            false
+            None => false
         }
     }
 
     fn broadcast(&self) -> uint {
         let mut count = 0;
-        while self.head.peek() {
-            if self.head.recv().try_send_deferred(()) {
-                count += 1;
+        loop {
+            match self.head.try_recv() {
+                None => break,
+                Some(ch) => {
+                    if ch.try_send_deferred(()) {
+                        count += 1;
+                    }
+                }
             }
         }
         count
     }
 
     fn wait_end(&self) -> WaitEnd {
-        let (wait_end, signal_end) = comm::oneshot();
+        let (wait_end, signal_end) = Chan::new();
         self.tail.send_deferred(signal_end);
         wait_end
     }
@@ -282,8 +284,7 @@ impl<'a> Condvar<'a> {
                               condvar_id,
                               "cond.signal_on()",
                               || {
-                let queue = queue.take_unwrap();
-                queue.broadcast()
+                queue.take_unwrap().broadcast()
             })
         }
     }
@@ -676,7 +677,6 @@ mod tests {
     use sync::*;
 
     use std::cast;
-    use std::comm;
     use std::result;
     use std::task;
 
@@ -711,7 +711,7 @@ mod tests {
     #[test]
     fn test_sem_as_cvar() {
         /* Child waits and parent signals */
-        let (p, c) = comm::stream();
+        let (p, c) = Chan::new();
         let s = Semaphore::new(0);
         let s2 = s.clone();
         do task::spawn {
@@ -723,7 +723,7 @@ mod tests {
         let _ = p.recv();
 
         /* Parent waits and child signals */
-        let (p, c) = comm::stream();
+        let (p, c) = Chan::new();
         let s = Semaphore::new(0);
         let s2 = s.clone();
         do task::spawn {
@@ -740,8 +740,8 @@ mod tests {
         // time, and shake hands.
         let s = Semaphore::new(2);
         let s2 = s.clone();
-        let (p1,c1) = comm::stream();
-        let (p2,c2) = comm::stream();
+        let (p1,c1) = Chan::new();
+        let (p2,c2) = Chan::new();
         do task::spawn {
             s2.access(|| {
                 let _ = p2.recv();
@@ -760,7 +760,7 @@ mod tests {
         do task::spawn_sched(task::SingleThreaded) {
             let s = Semaphore::new(1);
             let s2 = s.clone();
-            let (p, c) = comm::stream();
+            let (p, c) = Chan::new();
             let mut child_data = Some((s2, c));
             s.access(|| {
                 let (s2, c) = child_data.take_unwrap();
@@ -782,7 +782,7 @@ mod tests {
     fn test_mutex_lock() {
         // Unsafely achieve shared state, and do the textbook
         // "load tmp = move ptr; inc tmp; store ptr <- tmp" dance.
-        let (p, c) = comm::stream();
+        let (p, c) = Chan::new();
         let m = Mutex::new();
         let m2 = m.clone();
         let mut sharedstate = ~0;
@@ -829,7 +829,7 @@ mod tests {
             cond.wait();
         });
         // Parent wakes up child
-        let (port,chan) = comm::stream();
+        let (port,chan) = Chan::new();
         let m3 = m.clone();
         do task::spawn {
             m3.lock_cond(|cond| {
@@ -852,7 +852,7 @@ mod tests {
 
         num_waiters.times(|| {
             let mi = m.clone();
-            let (port, chan) = comm::stream();
+            let (port, chan) = Chan::new();
             ports.push(port);
             do task::spawn {
                 mi.lock_cond(|cond| {
@@ -864,13 +864,13 @@ mod tests {
         });
 
         // wait until all children get in the mutex
-        for port in ports.iter() { let _ = port.recv(); }
+        for port in ports.mut_iter() { let _ = port.recv(); }
         m.lock_cond(|cond| {
             let num_woken = cond.broadcast();
             assert_eq!(num_woken, num_waiters);
         });
         // wait until all children wake up
-        for port in ports.iter() { let _ = port.recv(); }
+        for port in ports.mut_iter() { let _ = port.recv(); }
     }
     #[test]
     fn test_mutex_cond_broadcast() {
@@ -915,8 +915,8 @@ mod tests {
         let m2 = m.clone();
 
         let result: result::Result<(), ~Any> = do task::try {
-            let (p, c) = comm::stream();
-            do task::spawn || { // linked
+            let (p, c) = Chan::new();
+            do task::spawn { // linked
                 let _ = p.recv(); // wait for sibling to get in the mutex
                 task::deschedule();
                 fail!();
@@ -940,19 +940,18 @@ mod tests {
 
         let m = Mutex::new();
         let m2 = m.clone();
-        let (p, c) = comm::stream();
+        let (p, c) = Chan::new();
 
         let result: result::Result<(), ~Any> = do task::try {
             let mut sibling_convos = ~[];
             2.times(|| {
-                let (p, c) = comm::stream();
+                let (p, c) = Chan::new();
                 sibling_convos.push(p);
                 let mi = m2.clone();
                 // spawn sibling task
                 do task::spawn { // linked
                     let mut c = Some(c);
                     mi.lock_cond(|cond| {
-                        let c = c.take_unwrap();
                         c.send(()); // tell sibling to go ahead
                         (|| {
                             cond.wait(); // block forever
@@ -964,7 +963,7 @@ mod tests {
                     })
                 }
             });
-            for p in sibling_convos.iter() {
+            for p in sibling_convos.mut_iter() {
                 let _ = p.recv(); // wait for sibling to get in the mutex
             }
             m2.lock(|| { });
@@ -973,8 +972,8 @@ mod tests {
         };
         assert!(result.is_err());
         // child task must have finished by the time try returns
-        let r = p.recv();
-        for p in r.iter() { p.recv(); } // wait on all its siblings
+        let mut r = p.recv();
+        for p in r.mut_iter() { p.recv(); } // wait on all its siblings
         m.lock_cond(|cond| {
             let woken = cond.broadcast();
             assert_eq!(woken, 0);
@@ -999,7 +998,7 @@ mod tests {
         let result = do task::try {
             let m = Mutex::new_with_condvars(2);
             let m2 = m.clone();
-            let (p, c) = comm::stream();
+            let (p, c) = Chan::new();
             do task::spawn {
                 m2.lock_cond(|cond| {
                     c.send(());
@@ -1060,7 +1059,7 @@ mod tests {
                                  mode2: RWLockMode) {
         // Test mutual exclusion between readers and writers. Just like the
         // mutex mutual exclusion test, a ways above.
-        let (p, c) = comm::stream();
+        let (p, c) = Chan::new();
         let x2 = x.clone();
         let mut sharedstate = ~0;
         {
@@ -1111,8 +1110,8 @@ mod tests {
                                  make_mode2_go_first: bool) {
         // Much like sem_multi_resource.
         let x2 = x.clone();
-        let (p1, c1) = comm::stream();
-        let (p2, c2) = comm::stream();
+        let (p1, c1) = Chan::new();
+        let (p2, c2) = Chan::new();
         do task::spawn {
             if !make_mode2_go_first {
                 let _ = p2.recv(); // parent sends to us once it locks, or ...
@@ -1177,7 +1176,7 @@ mod tests {
             cond.wait();
         });
         // Parent wakes up child
-        let (port, chan) = comm::stream();
+        let (port, chan) = Chan::new();
         let x3 = x.clone();
         do task::spawn {
             x3.write_cond(|cond| {
@@ -1214,7 +1213,7 @@ mod tests {
 
         num_waiters.times(|| {
             let xi = x.clone();
-            let (port, chan) = comm::stream();
+            let (port, chan) = Chan::new();
             ports.push(port);
             do task::spawn {
                 lock_cond(&xi, dg1, |cond| {
@@ -1226,13 +1225,13 @@ mod tests {
         });
 
         // wait until all children get in the mutex
-        for port in ports.iter() { let _ = port.recv(); }
+        for port in ports.mut_iter() { let _ = port.recv(); }
         lock_cond(&x, dg2, |cond| {
             let num_woken = cond.broadcast();
             assert_eq!(num_woken, num_waiters);
         });
         // wait until all children wake up
-        for port in ports.iter() { let _ = port.recv(); }
+        for port in ports.mut_iter() { let _ = port.recv(); }
     }
     #[test]
     fn test_rwlock_cond_broadcast() {
diff --git a/src/libextra/task_pool.rs b/src/libextra/task_pool.rs
index bda6935643f..f0c9833adf8 100644
--- a/src/libextra/task_pool.rs
+++ b/src/libextra/task_pool.rs
@@ -14,8 +14,6 @@
 /// parallelism.
 
 
-use std::comm::{Chan, GenericChan, GenericPort};
-use std::comm;
 use std::task::SchedMode;
 use std::task;
 use std::vec;
@@ -35,7 +33,7 @@ pub struct TaskPool<T> {
 #[unsafe_destructor]
 impl<T> Drop for TaskPool<T> {
     fn drop(&mut self) {
-        for channel in self.channels.iter() {
+        for channel in self.channels.mut_iter() {
             channel.send(Quit);
         }
     }
@@ -54,7 +52,7 @@ impl<T> TaskPool<T> {
         assert!(n_tasks >= 1);
 
         let channels = vec::from_fn(n_tasks, |i| {
-            let (port, chan) = comm::stream::<Msg<T>>();
+            let (port, chan) = Chan::<Msg<T>>::new();
             let init_fn = init_fn_factory();
 
             let task_body: proc() = proc() {
diff --git a/src/libextra/test.rs b/src/libextra/test.rs
index c2b4ff05d5d..974d4dc1dc5 100644
--- a/src/libextra/test.rs
+++ b/src/libextra/test.rs
@@ -29,7 +29,6 @@ use time::precise_time_ns;
 use treemap::TreeMap;
 
 use std::clone::Clone;
-use std::comm::{stream, SharedChan, GenericPort, GenericChan};
 use std::io;
 use std::io::File;
 use std::io::Writer;
@@ -746,8 +745,7 @@ fn run_tests(opts: &TestOpts,
     remaining.reverse();
     let mut pending = 0;
 
-    let (p, ch) = stream();
-    let ch = SharedChan::new(ch);
+    let (p, ch) = SharedChan::new();
 
     while pending > 0 || !remaining.is_empty() {
         while pending < concurrency && !remaining.is_empty() {
@@ -872,7 +870,7 @@ pub fn run_test(force_ignore: bool,
     fn run_test_inner(desc: TestDesc,
                       monitor_ch: SharedChan<MonitorMsg>,
                       testfn: proc()) {
-        do task::spawn {
+        do spawn {
             let mut task = task::task();
             task.name(match desc.name {
                 DynTestName(ref name) => SendStrOwned(name.clone()),
@@ -1206,7 +1204,6 @@ mod tests {
                StaticTestName, DynTestName, DynTestFn};
     use test::{TestOpts, run_test};
 
-    use std::comm::{stream, SharedChan};
     use tempfile::TempDir;
 
     #[test]
@@ -1220,8 +1217,7 @@ mod tests {
             },
             testfn: DynTestFn(proc() f()),
         };
-        let (p, ch) = stream();
-        let ch = SharedChan::new(ch);
+        let (p, ch) = SharedChan::new();
         run_test(false, desc, ch);
         let (_, res) = p.recv();
         assert!(res != TrOk);
@@ -1238,8 +1234,7 @@ mod tests {
             },
             testfn: DynTestFn(proc() f()),
         };
-        let (p, ch) = stream();
-        let ch = SharedChan::new(ch);
+        let (p, ch) = SharedChan::new();
         run_test(false, desc, ch);
         let (_, res) = p.recv();
         assert_eq!(res, TrIgnored);
@@ -1256,8 +1251,7 @@ mod tests {
             },
             testfn: DynTestFn(proc() f()),
         };
-        let (p, ch) = stream();
-        let ch = SharedChan::new(ch);
+        let (p, ch) = SharedChan::new();
         run_test(false, desc, ch);
         let (_, res) = p.recv();
         assert_eq!(res, TrOk);
@@ -1274,8 +1268,7 @@ mod tests {
             },
             testfn: DynTestFn(proc() f()),
         };
-        let (p, ch) = stream();
-        let ch = SharedChan::new(ch);
+        let (p, ch) = SharedChan::new();
         run_test(false, desc, ch);
         let (_, res) = p.recv();
         assert_eq!(res, TrFailed);
diff --git a/src/libextra/workcache.rs b/src/libextra/workcache.rs
index 91f1f1a0d0b..8713dbde920 100644
--- a/src/libextra/workcache.rs
+++ b/src/libextra/workcache.rs
@@ -15,8 +15,7 @@ use json::ToJson;
 use serialize::{Encoder, Encodable, Decoder, Decodable};
 use arc::{Arc,RWArc};
 use treemap::TreeMap;
-use std::comm::{PortOne, oneshot};
-use std::{str, task};
+use std::str;
 use std::io;
 use std::io::{File, Decorator};
 use std::io::mem::MemWriter;
@@ -252,7 +251,7 @@ pub struct Exec {
 
 enum Work<'a, T> {
     WorkValue(T),
-    WorkFromTask(&'a Prep<'a>, PortOne<(Exec, T)>),
+    WorkFromTask(&'a Prep<'a>, Port<(Exec, T)>),
 }
 
 fn json_encode<'a, T:Encodable<json::Encoder<'a>>>(t: &T) -> ~str {
@@ -427,11 +426,11 @@ impl<'a> Prep<'a> {
 
             _ => {
                 debug!("Cache miss!");
-                let (port, chan) = oneshot();
+                let (port, chan) = Chan::new();
                 let blk = bo.take_unwrap();
 
                 // XXX: What happens if the task fails?
-                do task::spawn {
+                do spawn {
                     let mut exe = Exec {
                         discovered_inputs: WorkMap::new(),
                         discovered_outputs: WorkMap::new(),
@@ -453,7 +452,7 @@ impl<'a, T:Send +
     pub fn from_value(elt: T) -> Work<'a, T> {
         WorkValue(elt)
     }
-    pub fn from_task(prep: &'a Prep<'a>, port: PortOne<(Exec, T)>)
+    pub fn from_task(prep: &'a Prep<'a>, port: Port<(Exec, T)>)
         -> Work<'a, T> {
         WorkFromTask(prep, port)
     }
diff --git a/src/librustc/lib.rs b/src/librustc/lib.rs
index a26fd9f5af2..c5be6776eef 100644
--- a/src/librustc/lib.rs
+++ b/src/librustc/lib.rs
@@ -27,6 +27,7 @@ use driver::driver::{compile_input};
 use driver::session;
 use middle::lint;
 
+use std::cast;
 use std::comm;
 use std::io;
 use std::io::Reader;
@@ -303,7 +304,8 @@ impl diagnostic::Emitter for RustcEmitter {
             msg: &str,
             lvl: diagnostic::level) {
         if lvl == diagnostic::fatal {
-            self.ch_capture.send(fatal)
+            let this = unsafe { cast::transmute_mut(self) };
+            this.ch_capture.send(fatal)
         }
 
         diagnostic::DefaultEmitter.emit(cmsp, msg, lvl)
@@ -333,8 +335,7 @@ pub fn monitor(f: proc(@diagnostic::Emitter)) {
     #[cfg(not(rtopt))]
     static STACK_SIZE: uint = 20000000; // 20MB
 
-    let (p, ch) = stream();
-    let ch = SharedChan::new(ch);
+    let (p, ch) = SharedChan::new();
     let ch_capture = ch.clone();
     let mut task_builder = task::task();
     task_builder.name("rustc");
diff --git a/src/librustdoc/html/format.rs b/src/librustdoc/html/format.rs
index 661f96e7c1a..04da17d4ec4 100644
--- a/src/librustdoc/html/format.rs
+++ b/src/librustdoc/html/format.rs
@@ -174,71 +174,70 @@ fn path(w: &mut io::Writer, path: &clean::Path, print_all: bool,
         let loc = loc.unwrap();
 
         local_data::get(cache_key, |cache| {
-            cache.unwrap().read(|cache| {
-                let abs_root = root(cache, loc.as_slice());
-                let rel_root = match path.segments[0].name.as_slice() {
-                    "self" => Some(~"./"),
-                    _ => None,
-                };
-
-                if print_all {
-                    let amt = path.segments.len() - 1;
-                    match rel_root {
-                        Some(root) => {
-                            let mut root = root;
-                            for seg in path.segments.slice_to(amt).iter() {
-                                if "super" == seg.name || "self" == seg.name {
-                                    write!(w, "{}::", seg.name);
-                                } else {
-                                    root.push_str(seg.name);
-                                    root.push_str("/");
-                                    write!(w, "<a class='mod'
-                                                  href='{}index.html'>{}</a>::",
-                                           root,
-                                           seg.name);
-                                }
-                            }
-                        }
-                        None => {
-                            for seg in path.segments.slice_to(amt).iter() {
+            let cache = cache.unwrap().get();
+            let abs_root = root(cache, loc.as_slice());
+            let rel_root = match path.segments[0].name.as_slice() {
+                "self" => Some(~"./"),
+                _ => None,
+            };
+
+            if print_all {
+                let amt = path.segments.len() - 1;
+                match rel_root {
+                    Some(root) => {
+                        let mut root = root;
+                        for seg in path.segments.slice_to(amt).iter() {
+                            if "super" == seg.name || "self" == seg.name {
                                 write!(w, "{}::", seg.name);
+                            } else {
+                                root.push_str(seg.name);
+                                root.push_str("/");
+                                write!(w, "<a class='mod'
+                                              href='{}index.html'>{}</a>::",
+                                       root,
+                                       seg.name);
                             }
                         }
                     }
+                    None => {
+                        for seg in path.segments.slice_to(amt).iter() {
+                            write!(w, "{}::", seg.name);
+                        }
+                    }
                 }
+            }
 
-                match info(cache) {
-                    // This is a documented path, link to it!
-                    Some((ref fqp, shortty)) if abs_root.is_some() => {
-                        let mut url = abs_root.unwrap();
-                        let to_link = fqp.slice_to(fqp.len() - 1);
-                        for component in to_link.iter() {
-                            url.push_str(*component);
-                            url.push_str("/");
+            match info(cache) {
+                // This is a documented path, link to it!
+                Some((ref fqp, shortty)) if abs_root.is_some() => {
+                    let mut url = abs_root.unwrap();
+                    let to_link = fqp.slice_to(fqp.len() - 1);
+                    for component in to_link.iter() {
+                        url.push_str(*component);
+                        url.push_str("/");
+                    }
+                    match shortty {
+                        "mod" => {
+                            url.push_str(*fqp.last());
+                            url.push_str("/index.html");
                         }
-                        match shortty {
-                            "mod" => {
-                                url.push_str(*fqp.last());
-                                url.push_str("/index.html");
-                            }
-                            _ => {
-                                url.push_str(shortty);
-                                url.push_str(".");
-                                url.push_str(*fqp.last());
-                                url.push_str(".html");
-                            }
+                        _ => {
+                            url.push_str(shortty);
+                            url.push_str(".");
+                            url.push_str(*fqp.last());
+                            url.push_str(".html");
                         }
-
-                        write!(w, "<a class='{}' href='{}' title='{}'>{}</a>",
-                               shortty, url, fqp.connect("::"), last.name);
                     }
 
-                    _ => {
-                        write!(w, "{}", last.name);
-                    }
+                    write!(w, "<a class='{}' href='{}' title='{}'>{}</a>",
+                           shortty, url, fqp.connect("::"), last.name);
                 }
-                write!(w, "{}", generics);
-            })
+
+                _ => {
+                    write!(w, "{}", last.name);
+                }
+            }
+            write!(w, "{}", generics);
         })
     })
 }
@@ -263,9 +262,8 @@ impl fmt::Default for clean::Type {
         match *g {
             clean::TyParamBinder(id) | clean::Generic(id) => {
                 local_data::get(cache_key, |cache| {
-                    cache.unwrap().read(|m| {
-                        f.buf.write(m.typarams.get(&id).as_bytes());
-                    })
+                    let m = cache.unwrap().get();
+                    f.buf.write(m.typarams.get(&id).as_bytes());
                 })
             }
             clean::ResolvedPath{id, typarams: ref tp, path: ref path} => {
diff --git a/src/librustdoc/html/render.rs b/src/librustdoc/html/render.rs
index 0c703e780f1..82122c4c32f 100644
--- a/src/librustdoc/html/render.rs
+++ b/src/librustdoc/html/render.rs
@@ -33,8 +33,6 @@
 //! These tasks are not parallelized (they haven't been a bottleneck yet), and
 //! both occur before the crate is rendered.
 
-use std::comm::{SharedPort, SharedChan};
-use std::comm;
 use std::fmt;
 use std::hashmap::{HashMap, HashSet};
 use std::local_data;
@@ -42,12 +40,10 @@ use std::io::buffered::BufferedWriter;
 use std::io;
 use std::io::fs;
 use std::io::File;
-use std::os;
 use std::str;
-use std::task;
 use std::vec;
 
-use extra::arc::RWArc;
+use extra::arc::Arc;
 use extra::json::ToJson;
 use extra::sort;
 
@@ -121,7 +117,7 @@ enum Implementor {
 ///
 /// This structure purposefully does not implement `Clone` because it's intended
 /// to be a fairly large and expensive structure to clone. Instead this adheres
-/// to both `Send` and `Freeze` so it may be stored in a `RWArc` instance and
+/// to both `Send` and `Freeze` so it may be stored in a `Arc` instance and
 /// shared among the various rendering tasks.
 pub struct Cache {
     /// Mapping of typaram ids to the name of the type parameter. This is used
@@ -197,7 +193,7 @@ struct IndexItem {
 
 // TLS keys used to carry information around during rendering.
 
-local_data_key!(pub cache_key: RWArc<Cache>)
+local_data_key!(pub cache_key: Arc<Cache>)
 local_data_key!(pub current_location_key: ~[~str])
 
 /// Generates the documentation for `crate` into the directory `dst`
@@ -640,22 +636,6 @@ impl<'a> Cache {
     }
 }
 
-enum Progress {
-    JobNew,
-    JobDone,
-}
-
-/// A helper object to unconditionally send a value on a chanel.
-struct ChannelGuard {
-    channel: SharedChan<Progress>,
-}
-
-impl Drop for ChannelGuard {
-    fn drop(&mut self) {
-        self.channel.send(JobDone)
-    }
-}
-
 impl Context {
     /// Recurse in the directory structure and change the "root path" to make
     /// sure it always points to the top (relatively)
@@ -680,97 +660,26 @@ impl Context {
         return ret;
     }
 
-    /// Main method for rendering a crate. This parallelizes the task of
-    /// rendering a crate, and requires ownership of the crate in order to break
-    /// it up into its separate components.
-    fn crate(self, mut crate: clean::Crate, cache: Cache) {
-        enum Work {
-            Die,
-            Process(Context, clean::Item),
-        }
-        let workers = match os::getenv("RUSTDOC_WORKERS") {
-            Some(s) => {
-                match from_str::<uint>(s) {
-                    Some(n) => n, None => fail!("{} not a number", s)
-                }
-            }
-            None => 10,
-        };
-
+    /// Main method for rendering a crate.
+    ///
+    /// This currently isn't parallelized, but it'd be pretty easy to add
+    /// parallelization to this function.
+    fn crate(mut self, mut crate: clean::Crate, cache: Cache) {
         let mut item = match crate.module.take() {
             Some(i) => i,
             None => return
         };
         item.name = Some(crate.name);
 
-        let (port, chan) = comm::stream::<Work>();
-        let port = SharedPort::new(port);
-        let chan = SharedChan::new(chan);
-        let (prog_port, prog_chan) = comm::stream();
-        let prog_chan = SharedChan::new(prog_chan);
-        let cache = RWArc::new(cache);
-
-        // Each worker thread receives work from a shared port and publishes
-        // new work onto the corresponding shared port. All of the workers are
-        // using the same channel/port. Through this, the crate is recursed on
-        // in a hierarchical fashion, and parallelization is only achieved if
-        // one node in the hierarchy has more than one child (very common).
-        for i in range(0, workers) {
-            let port = port.clone();
-            let chan = chan.clone();
-            let prog_chan = prog_chan.clone();
-
-            let mut task = task::task();
-            task.name(format!("worker{}", i));
-            let cache = cache.clone();
-            do task.spawn {
-                worker(cache, &port, &chan, &prog_chan);
-            }
-
-            fn worker(cache: RWArc<Cache>,
-                      port: &SharedPort<Work>,
-                      chan: &SharedChan<Work>,
-                      prog_chan: &SharedChan<Progress>) {
-                local_data::set(cache_key, cache);
-
-                loop {
-                    match port.recv() {
-                        Process(cx, item) => {
-                            let mut cx = cx;
-
-                            // If we fail, everything else should still get
-                            // completed.
-                            let _guard = ChannelGuard {
-                                channel: prog_chan.clone(),
-                            };
-                            cx.item(item, |cx, item| {
-                                prog_chan.send(JobNew);
-                                chan.send(Process(cx.clone(), item));
-                            })
-                        }
-                        Die => break,
-                    }
-                }
-            }
-        }
-
-        // Send off the initial job
-        chan.send(Process(self, item));
-        let mut jobs = 1;
+        // using a rwarc makes this parallelizable in the future
+        local_data::set(cache_key, Arc::new(cache));
 
-        // Keep track of the number of jobs active in the system and kill
-        // everything once there are no more jobs remaining.
-        loop {
-            match prog_port.recv() {
-                JobNew => jobs += 1,
-                JobDone => jobs -= 1,
-            }
-
-            if jobs == 0 { break }
-        }
-
-        for _ in range(0, workers) {
-            chan.send(Die);
+        let mut work = ~[item];
+        while work.len() > 0 {
+            let item = work.pop();
+            self.item(item, |_cx, item| {
+                work.push(item);
+            })
         }
     }
 
@@ -1210,29 +1119,28 @@ fn item_trait(w: &mut Writer, it: &clean::Item, t: &clean::Trait) {
     }
 
     local_data::get(cache_key, |cache| {
-        cache.unwrap().read(|cache| {
-            match cache.implementors.find(&it.id) {
-                Some(implementors) => {
-                    write!(w, "
-                        <h2 id='implementors'>Implementors</h2>
-                        <ul class='item-list'>
-                    ");
-                    for i in implementors.iter() {
-                        match *i {
-                            PathType(ref ty) => {
-                                write!(w, "<li><code>{}</code></li>", *ty);
-                            }
-                            OtherType(ref generics, ref trait_, ref for_) => {
-                                write!(w, "<li><code>impl{} {} for {}</code></li>",
-                                       *generics, *trait_, *for_);
-                            }
+        let cache = cache.unwrap().get();
+        match cache.implementors.find(&it.id) {
+            Some(implementors) => {
+                write!(w, "
+                    <h2 id='implementors'>Implementors</h2>
+                    <ul class='item-list'>
+                ");
+                for i in implementors.iter() {
+                    match *i {
+                        PathType(ref ty) => {
+                            write!(w, "<li><code>{}</code></li>", *ty);
+                        }
+                        OtherType(ref generics, ref trait_, ref for_) => {
+                            write!(w, "<li><code>impl{} {} for {}</code></li>",
+                                   *generics, *trait_, *for_);
                         }
                     }
-                    write!(w, "</ul>");
                 }
-                None => {}
+                write!(w, "</ul>");
             }
-        })
+            None => {}
+        }
     })
 }
 
@@ -1422,36 +1330,34 @@ fn render_struct(w: &mut Writer, it: &clean::Item,
 
 fn render_methods(w: &mut Writer, it: &clean::Item) {
     local_data::get(cache_key, |cache| {
-        let cache = cache.unwrap();
-        cache.read(|c| {
-            match c.impls.find(&it.id) {
-                Some(v) => {
-                    let mut non_trait = v.iter().filter(|p| {
-                        p.n0_ref().trait_.is_none()
-                    });
-                    let non_trait = non_trait.to_owned_vec();
-                    let mut traits = v.iter().filter(|p| {
-                        p.n0_ref().trait_.is_some()
-                    });
-                    let traits = traits.to_owned_vec();
-
-                    if non_trait.len() > 0 {
-                        write!(w, "<h2 id='methods'>Methods</h2>");
-                        for &(ref i, ref dox) in non_trait.move_iter() {
-                            render_impl(w, i, dox);
-                        }
+        let c = cache.unwrap().get();
+        match c.impls.find(&it.id) {
+            Some(v) => {
+                let mut non_trait = v.iter().filter(|p| {
+                    p.n0_ref().trait_.is_none()
+                });
+                let non_trait = non_trait.to_owned_vec();
+                let mut traits = v.iter().filter(|p| {
+                    p.n0_ref().trait_.is_some()
+                });
+                let traits = traits.to_owned_vec();
+
+                if non_trait.len() > 0 {
+                    write!(w, "<h2 id='methods'>Methods</h2>");
+                    for &(ref i, ref dox) in non_trait.move_iter() {
+                        render_impl(w, i, dox);
                     }
-                    if traits.len() > 0 {
-                        write!(w, "<h2 id='implementations'>Trait \
-                                   Implementations</h2>");
-                        for &(ref i, ref dox) in traits.move_iter() {
-                            render_impl(w, i, dox);
-                        }
+                }
+                if traits.len() > 0 {
+                    write!(w, "<h2 id='implementations'>Trait \
+                               Implementations</h2>");
+                    for &(ref i, ref dox) in traits.move_iter() {
+                        render_impl(w, i, dox);
                     }
                 }
-                None => {}
             }
-        })
+            None => {}
+        }
     })
 }
 
@@ -1502,27 +1408,26 @@ fn render_impl(w: &mut Writer, i: &clean::Impl, dox: &Option<~str>) {
             Some(id) => id,
         };
         local_data::get(cache_key, |cache| {
-            cache.unwrap().read(|cache| {
-                match cache.traits.find(&trait_id) {
-                    Some(t) => {
-                        let name = meth.name.clone();
-                        match t.methods.iter().find(|t| t.item().name == name) {
-                            Some(method) => {
-                                match method.item().doc_value() {
-                                    Some(s) => {
-                                        write!(w,
-                                               "<div class='docblock'>{}</div>",
-                                               Markdown(s));
-                                    }
-                                    None => {}
+            let cache = cache.unwrap().get();
+            match cache.traits.find(&trait_id) {
+                Some(t) => {
+                    let name = meth.name.clone();
+                    match t.methods.iter().find(|t| t.item().name == name) {
+                        Some(method) => {
+                            match method.item().doc_value() {
+                                Some(s) => {
+                                    write!(w,
+                                           "<div class='docblock'>{}</div>",
+                                           Markdown(s));
                                 }
+                                None => {}
                             }
-                            None => {}
                         }
+                        None => {}
                     }
-                    None => {}
                 }
-            })
+                None => {}
+            }
         })
     }
 
@@ -1532,22 +1437,21 @@ fn render_impl(w: &mut Writer, i: &clean::Impl, dox: &Option<~str>) {
         None => {}
         Some(id) => {
             local_data::get(cache_key, |cache| {
-                cache.unwrap().read(|cache| {
-                    match cache.traits.find(&id) {
-                        Some(t) => {
-                            for method in t.methods.iter() {
-                                let n = method.item().name.clone();
-                                match i.methods.iter().find(|m| m.name == n) {
-                                    Some(..) => continue,
-                                    None => {}
-                                }
-
-                                docmeth(w, method.item());
+                let cache = cache.unwrap().get();
+                match cache.traits.find(&id) {
+                    Some(t) => {
+                        for method in t.methods.iter() {
+                            let n = method.item().name.clone();
+                            match i.methods.iter().find(|m| m.name == n) {
+                                Some(..) => continue,
+                                None => {}
                             }
+
+                            docmeth(w, method.item());
                         }
-                        None => {}
                     }
-                })
+                    None => {}
+                }
             })
         }
     }
diff --git a/src/librustuv/signal.rs b/src/librustuv/signal.rs
index db698f80e38..67777050cf3 100644
--- a/src/librustuv/signal.rs
+++ b/src/librustuv/signal.rs
@@ -11,7 +11,7 @@
 use std::libc::c_int;
 use std::io::signal::Signum;
 use std::rt::sched::{SchedHandle, Scheduler};
-use std::comm::{SharedChan, SendDeferred};
+use std::comm::SharedChan;
 use std::rt::local::Local;
 use std::rt::rtio::RtioSignal;
 
diff --git a/src/librustuv/timer.rs b/src/librustuv/timer.rs
index a229c000066..7efdafd2369 100644
--- a/src/librustuv/timer.rs
+++ b/src/librustuv/timer.rs
@@ -8,7 +8,6 @@
 // option. This file may not be copied, modified, or distributed
 // except according to those terms.
 
-use std::comm::{oneshot, stream, PortOne, ChanOne, SendDeferred};
 use std::libc::c_int;
 use std::rt::BlockedTask;
 use std::rt::local::Local;
@@ -28,7 +27,7 @@ pub struct TimerWatcher {
 
 pub enum NextAction {
     WakeTask(BlockedTask),
-    SendOnce(ChanOne<()>),
+    SendOnce(Chan<()>),
     SendMany(Chan<()>),
 }
 
@@ -95,8 +94,8 @@ impl RtioTimer for TimerWatcher {
         self.stop();
     }
 
-    fn oneshot(&mut self, msecs: u64) -> PortOne<()> {
-        let (port, chan) = oneshot();
+    fn oneshot(&mut self, msecs: u64) -> Port<()> {
+        let (port, chan) = Chan::new();
 
         // similarly to the destructor, we must drop the previous action outside
         // of the homing missile
@@ -111,7 +110,7 @@ impl RtioTimer for TimerWatcher {
     }
 
     fn period(&mut self, msecs: u64) -> Port<()> {
-        let (port, chan) = stream();
+        let (port, chan) = Chan::new();
 
         // similarly to the destructor, we must drop the previous action outside
         // of the homing missile
diff --git a/src/libstd/comm/select.rs b/src/libstd/comm/select.rs
index 81a77000bad..2d9bc6e9c12 100644
--- a/src/libstd/comm/select.rs
+++ b/src/libstd/comm/select.rs
@@ -41,6 +41,8 @@
 //!     }
 //! )
 
+#[allow(dead_code)];
+
 use cast;
 use iter::Iterator;
 use kinds::Send;
diff --git a/src/libstd/io/comm_adapters.rs b/src/libstd/io/comm_adapters.rs
index b3e5a9a0c86..7f94af8307e 100644
--- a/src/libstd/io/comm_adapters.rs
+++ b/src/libstd/io/comm_adapters.rs
@@ -10,7 +10,7 @@
 
 use prelude::*;
 
-use comm::{GenericPort, GenericChan, GenericSmartChan};
+use comm::{Port, Chan};
 use cmp;
 use io;
 use option::{None, Option, Some};
@@ -30,15 +30,15 @@ use vec::{bytes, CopyableVector, MutableVector, ImmutableVector};
 ///     None => println!("At the end of the stream!")
 /// }
 /// ```
-pub struct PortReader<P> {
+pub struct PortReader {
     priv buf: Option<~[u8]>,  // A buffer of bytes received but not consumed.
     priv pos: uint,           // How many of the buffered bytes have already be consumed.
-    priv port: P,             // The port to pull data from.
+    priv port: Port<~[u8]>,   // The port to pull data from.
     priv closed: bool,        // Whether the pipe this port connects to has been closed.
 }
 
-impl<P: GenericPort<~[u8]>> PortReader<P> {
-    pub fn new(port: P) -> PortReader<P> {
+impl PortReader {
+    pub fn new(port: Port<~[u8]>) -> PortReader<P> {
         PortReader {
             buf: None,
             pos: 0,
@@ -48,7 +48,7 @@ impl<P: GenericPort<~[u8]>> PortReader<P> {
     }
 }
 
-impl<P: GenericPort<~[u8]>> Reader for PortReader<P> {
+impl Reader for PortReader {
     fn read(&mut self, buf: &mut [u8]) -> Option<uint> {
         let mut num_read = 0;
         loop {
@@ -67,7 +67,7 @@ impl<P: GenericPort<~[u8]>> Reader for PortReader<P> {
                 break;
             }
             self.pos = 0;
-            self.buf = self.port.try_recv();
+            self.buf = self.port.recv_opt();
             self.closed = self.buf.is_none();
         }
         if self.closed && num_read == 0 {
@@ -89,17 +89,17 @@ impl<P: GenericPort<~[u8]>> Reader for PortReader<P> {
 /// let writer = ChanWriter::new(chan);
 /// writer.write("hello, world".as_bytes());
 /// ```
-pub struct ChanWriter<C> {
-    chan: C,
+pub struct ChanWriter {
+    chan: Chan<~[u8]>,
 }
 
-impl<C: GenericSmartChan<~[u8]>> ChanWriter<C> {
+impl ChanWriter {
     pub fn new(chan: C) -> ChanWriter<C> {
         ChanWriter { chan: chan }
     }
 }
 
-impl<C: GenericSmartChan<~[u8]>> Writer for ChanWriter<C> {
+impl Writer for ChanWriter {
     fn write(&mut self, buf: &[u8]) {
         if !self.chan.try_send(buf.to_owned()) {
             io::io_error::cond.raise(io::IoError {
@@ -111,28 +111,6 @@ impl<C: GenericSmartChan<~[u8]>> Writer for ChanWriter<C> {
     }
 }
 
-pub struct ReaderPort<R>;
-
-impl<R: Reader> ReaderPort<R> {
-    pub fn new(_reader: R) -> ReaderPort<R> { fail!() }
-}
-
-impl<R: Reader> GenericPort<~[u8]> for ReaderPort<R> {
-    fn recv(&self) -> ~[u8] { fail!() }
-
-    fn try_recv(&self) -> Option<~[u8]> { fail!() }
-}
-
-pub struct WriterChan<W>;
-
-impl<W: Writer> WriterChan<W> {
-    pub fn new(_writer: W) -> WriterChan<W> { fail!() }
-}
-
-impl<W: Writer> GenericChan<~[u8]> for WriterChan<W> {
-    fn send(&self, _x: ~[u8]) { fail!() }
-}
-
 
 #[cfg(test)]
 mod test {
@@ -144,7 +122,7 @@ mod test {
 
     #[test]
     fn test_port_reader() {
-        let (port, chan) = comm::stream();
+        let (port, chan) = Chan::new();
         do task::spawn {
           chan.send(~[1u8, 2u8]);
           chan.send(~[]);
@@ -199,7 +177,7 @@ mod test {
 
     #[test]
     fn test_chan_writer() {
-        let (port, chan) = comm::stream();
+        let (port, chan) = Chan::new();
         let mut writer = ChanWriter::new(chan);
         writer.write_be_u32(42);
 
diff --git a/src/libstd/io/mod.rs b/src/libstd/io/mod.rs
index c0bdc2a2014..2e9056a6aee 100644
--- a/src/libstd/io/mod.rs
+++ b/src/libstd/io/mod.rs
@@ -318,9 +318,6 @@ mod option;
 /// Basic stream compression. XXX: Belongs with other flate code
 pub mod flate;
 
-/// Interop between byte streams and pipes. Not sure where it belongs
-pub mod comm_adapters;
-
 /// Extension traits
 pub mod extensions;
 
diff --git a/src/libstd/io/net/tcp.rs b/src/libstd/io/net/tcp.rs
index 3c7582db7b8..a6230ede7e3 100644
--- a/src/libstd/io/net/tcp.rs
+++ b/src/libstd/io/net/tcp.rs
@@ -151,7 +151,6 @@ mod test {
     use io::net::ip::{Ipv4Addr, SocketAddr};
     use io::*;
     use prelude::*;
-    use rt::comm::oneshot;
 
     #[test] #[ignore]
     fn bind_error() {
@@ -195,7 +194,7 @@ mod test {
     fn smoke_test_ip4() {
         do run_in_mt_newsched_task {
             let addr = next_test_ip4();
-            let (port, chan) = oneshot();
+            let (port, chan) = Chan::new();
 
             do spawntask {
                 let mut acceptor = TcpListener::bind(addr).listen();
@@ -206,11 +205,9 @@ mod test {
                 assert!(buf[0] == 99);
             }
 
-            do spawntask {
-                port.recv();
-                let mut stream = TcpStream::connect(addr);
-                stream.write([99]);
-            }
+            port.recv();
+            let mut stream = TcpStream::connect(addr);
+            stream.write([99]);
         }
     }
 
@@ -218,7 +215,7 @@ mod test {
     fn smoke_test_ip6() {
         do run_in_mt_newsched_task {
             let addr = next_test_ip6();
-            let (port, chan) = oneshot();
+            let (port, chan) = Chan::new();
 
             do spawntask {
                 let mut acceptor = TcpListener::bind(addr).listen();
@@ -229,11 +226,9 @@ mod test {
                 assert!(buf[0] == 99);
             }
 
-            do spawntask {
-                port.recv();
-                let mut stream = TcpStream::connect(addr);
-                stream.write([99]);
-            }
+            port.recv();
+            let mut stream = TcpStream::connect(addr);
+            stream.write([99]);
         }
     }
 
@@ -241,7 +236,7 @@ mod test {
     fn read_eof_ip4() {
         do run_in_mt_newsched_task {
             let addr = next_test_ip4();
-            let (port, chan) = oneshot();
+            let (port, chan) = Chan::new();
 
             do spawntask {
                 let mut acceptor = TcpListener::bind(addr).listen();
@@ -252,11 +247,9 @@ mod test {
                 assert!(nread.is_none());
             }
 
-            do spawntask {
-                port.recv();
-                let _stream = TcpStream::connect(addr);
-                // Close
-            }
+            port.recv();
+            let _stream = TcpStream::connect(addr);
+            // Close
         }
     }
 
@@ -264,7 +257,7 @@ mod test {
     fn read_eof_ip6() {
         do run_in_mt_newsched_task {
             let addr = next_test_ip6();
-            let (port, chan) = oneshot();
+            let (port, chan) = Chan::new();
 
             do spawntask {
                 let mut acceptor = TcpListener::bind(addr).listen();
@@ -275,11 +268,9 @@ mod test {
                 assert!(nread.is_none());
             }
 
-            do spawntask {
-                port.recv();
-                let _stream = TcpStream::connect(addr);
-                // Close
-            }
+            port.recv();
+            let _stream = TcpStream::connect(addr);
+            // Close
         }
     }
 
@@ -287,7 +278,7 @@ mod test {
     fn read_eof_twice_ip4() {
         do run_in_mt_newsched_task {
             let addr = next_test_ip4();
-            let (port, chan) = oneshot();
+            let (port, chan) = Chan::new();
 
             do spawntask {
                 let mut acceptor = TcpListener::bind(addr).listen();
@@ -308,11 +299,9 @@ mod test {
                 })
             }
 
-            do spawntask {
-                port.recv();
-                let _stream = TcpStream::connect(addr);
-                // Close
-            }
+            port.recv();
+            let _stream = TcpStream::connect(addr);
+            // Close
         }
     }
 
@@ -320,7 +309,7 @@ mod test {
     fn read_eof_twice_ip6() {
         do run_in_mt_newsched_task {
             let addr = next_test_ip6();
-            let (port, chan) = oneshot();
+            let (port, chan) = Chan::new();
 
             do spawntask {
                 let mut acceptor = TcpListener::bind(addr).listen();
@@ -341,11 +330,9 @@ mod test {
                 })
             }
 
-            do spawntask {
-                port.recv();
-                let _stream = TcpStream::connect(addr);
-                // Close
-            }
+            port.recv();
+            let _stream = TcpStream::connect(addr);
+            // Close
         }
     }
 
@@ -353,7 +340,7 @@ mod test {
     fn write_close_ip4() {
         do run_in_mt_newsched_task {
             let addr = next_test_ip4();
-            let (port, chan) = oneshot();
+            let (port, chan) = Chan::new();
 
             do spawntask {
                 let mut acceptor = TcpListener::bind(addr).listen();
@@ -377,11 +364,9 @@ mod test {
                 }
             }
 
-            do spawntask {
-                port.recv();
-                let _stream = TcpStream::connect(addr);
-                // Close
-            }
+            port.recv();
+            let _stream = TcpStream::connect(addr);
+            // Close
         }
     }
 
@@ -389,7 +374,7 @@ mod test {
     fn write_close_ip6() {
         do run_in_mt_newsched_task {
             let addr = next_test_ip6();
-            let (port, chan) = oneshot();
+            let (port, chan) = Chan::new();
 
             do spawntask {
                 let mut acceptor = TcpListener::bind(addr).listen();
@@ -413,11 +398,9 @@ mod test {
                 }
             }
 
-            do spawntask {
-                port.recv();
-                let _stream = TcpStream::connect(addr);
-                // Close
-            }
+            port.recv();
+            let _stream = TcpStream::connect(addr);
+            // Close
         }
     }
 
@@ -426,7 +409,7 @@ mod test {
         do run_in_mt_newsched_task {
             let addr = next_test_ip4();
             let max = 10;
-            let (port, chan) = oneshot();
+            let (port, chan) = Chan::new();
 
             do spawntask {
                 let mut acceptor = TcpListener::bind(addr).listen();
@@ -438,13 +421,11 @@ mod test {
                 }
             }
 
-            do spawntask {
-                port.recv();
-                max.times(|| {
-                    let mut stream = TcpStream::connect(addr);
-                    stream.write([99]);
-                });
-            }
+            port.recv();
+            max.times(|| {
+                let mut stream = TcpStream::connect(addr);
+                stream.write([99]);
+            });
         }
     }
 
@@ -453,7 +434,7 @@ mod test {
         do run_in_mt_newsched_task {
             let addr = next_test_ip6();
             let max = 10;
-            let (port, chan) = oneshot();
+            let (port, chan) = Chan::new();
 
             do spawntask {
                 let mut acceptor = TcpListener::bind(addr).listen();
@@ -465,13 +446,11 @@ mod test {
                 }
             }
 
-            do spawntask {
-                port.recv();
-                max.times(|| {
-                    let mut stream = TcpStream::connect(addr);
-                    stream.write([99]);
-                });
-            }
+            port.recv();
+            max.times(|| {
+                let mut stream = TcpStream::connect(addr);
+                stream.write([99]);
+            });
         }
     }
 
@@ -480,7 +459,7 @@ mod test {
         do run_in_mt_newsched_task {
             let addr = next_test_ip4();
             static MAX: int = 10;
-            let (port, chan) = oneshot();
+            let (port, chan) = Chan::new();
 
             do spawntask {
                 let mut acceptor = TcpListener::bind(addr).listen();
@@ -520,7 +499,7 @@ mod test {
         do run_in_mt_newsched_task {
             let addr = next_test_ip6();
             static MAX: int = 10;
-            let (port, chan) = oneshot();
+            let (port, chan) = Chan::new();
 
             do spawntask {
                 let mut acceptor = TcpListener::bind(addr).listen();
@@ -560,7 +539,7 @@ mod test {
         do run_in_mt_newsched_task {
             let addr = next_test_ip4();
             static MAX: int = 10;
-            let (port, chan) = oneshot();
+            let (port, chan) = Chan::new();
 
             do spawntask {
                 let mut acceptor = TcpListener::bind(addr).listen();
@@ -599,7 +578,7 @@ mod test {
         do run_in_mt_newsched_task {
             let addr = next_test_ip6();
             static MAX: int = 10;
-            let (port, chan) = oneshot();
+            let (port, chan) = Chan::new();
 
             do spawntask {
                 let mut acceptor = TcpListener::bind(addr).listen();
@@ -653,7 +632,7 @@ mod test {
     #[cfg(test)]
     fn peer_name(addr: SocketAddr) {
         do run_in_mt_newsched_task {
-            let (port, chan) = oneshot();
+            let (port, chan) = Chan::new();
 
             do spawntask {
                 let mut acceptor = TcpListener::bind(addr).listen();
@@ -662,20 +641,18 @@ mod test {
                 acceptor.accept();
             }
 
-            do spawntask {
-                port.recv();
-                let stream = TcpStream::connect(addr);
+            port.recv();
+            let stream = TcpStream::connect(addr);
 
-                assert!(stream.is_some());
-                let mut stream = stream.unwrap();
+            assert!(stream.is_some());
+            let mut stream = stream.unwrap();
 
-                // Make sure peer_name gives us the
-                // address/port of the peer we've
-                // connected to.
-                let peer_name = stream.peer_name();
-                assert!(peer_name.is_some());
-                assert_eq!(addr, peer_name.unwrap());
-            }
+            // Make sure peer_name gives us the
+            // address/port of the peer we've
+            // connected to.
+            let peer_name = stream.peer_name();
+            assert!(peer_name.is_some());
+            assert_eq!(addr, peer_name.unwrap());
         }
     }
 
diff --git a/src/libstd/io/net/udp.rs b/src/libstd/io/net/udp.rs
index 87cf59aba3b..1e56f964bea 100644
--- a/src/libstd/io/net/udp.rs
+++ b/src/libstd/io/net/udp.rs
@@ -107,8 +107,7 @@ mod test {
     use rt::test::*;
     use io::net::ip::{Ipv4Addr, SocketAddr};
     use io::*;
-    use option::{Some, None};
-    use rt::comm::oneshot;
+    use prelude::*;
 
     #[test]  #[ignore]
     fn bind_error() {
@@ -131,7 +130,7 @@ mod test {
         do run_in_mt_newsched_task {
             let server_ip = next_test_ip4();
             let client_ip = next_test_ip4();
-            let (port, chan) = oneshot();
+            let (port, chan) = Chan::new();
 
             do spawntask {
                 match UdpSocket::bind(server_ip) {
@@ -151,14 +150,12 @@ mod test {
                 }
             }
 
-            do spawntask {
-                match UdpSocket::bind(client_ip) {
-                    Some(ref mut client) => {
-                        port.recv();
-                        client.sendto([99], server_ip)
-                    }
-                    None => fail!()
+            match UdpSocket::bind(client_ip) {
+                Some(ref mut client) => {
+                    port.recv();
+                    client.sendto([99], server_ip)
                 }
+                None => fail!()
             }
         }
     }
@@ -168,7 +165,7 @@ mod test {
         do run_in_mt_newsched_task {
             let server_ip = next_test_ip6();
             let client_ip = next_test_ip6();
-            let (port, chan) = oneshot();
+            let (port, chan) = Chan::new();
 
             do spawntask {
                 match UdpSocket::bind(server_ip) {
@@ -188,14 +185,12 @@ mod test {
                 }
             }
 
-            do spawntask {
-                match UdpSocket::bind(client_ip) {
-                    Some(ref mut client) => {
-                        port.recv();
-                        client.sendto([99], server_ip)
-                    }
-                    None => fail!()
+            match UdpSocket::bind(client_ip) {
+                Some(ref mut client) => {
+                    port.recv();
+                    client.sendto([99], server_ip)
                 }
+                None => fail!()
             }
         }
     }
@@ -205,7 +200,7 @@ mod test {
         do run_in_mt_newsched_task {
             let server_ip = next_test_ip4();
             let client_ip = next_test_ip4();
-            let (port, chan) = oneshot();
+            let (port, chan) = Chan::new();
 
             do spawntask {
                 match UdpSocket::bind(server_ip) {
@@ -226,16 +221,14 @@ mod test {
                 }
             }
 
-            do spawntask {
-                match UdpSocket::bind(client_ip) {
-                    Some(client) => {
-                        let client = ~client;
-                        let mut stream = client.connect(server_ip);
-                        port.recv();
-                        stream.write([99]);
-                    }
-                    None => fail!()
+            match UdpSocket::bind(client_ip) {
+                Some(client) => {
+                    let client = ~client;
+                    let mut stream = client.connect(server_ip);
+                    port.recv();
+                    stream.write([99]);
                 }
+                None => fail!()
             }
         }
     }
@@ -245,7 +238,7 @@ mod test {
         do run_in_mt_newsched_task {
             let server_ip = next_test_ip6();
             let client_ip = next_test_ip6();
-            let (port, chan) = oneshot();
+            let (port, chan) = Chan::new();
 
             do spawntask {
                 match UdpSocket::bind(server_ip) {
@@ -266,16 +259,14 @@ mod test {
                 }
             }
 
-            do spawntask {
-                match UdpSocket::bind(client_ip) {
-                    Some(client) => {
-                        let client = ~client;
-                        let mut stream = client.connect(server_ip);
-                        port.recv();
-                        stream.write([99]);
-                    }
-                    None => fail!()
+            match UdpSocket::bind(client_ip) {
+                Some(client) => {
+                    let client = ~client;
+                    let mut stream = client.connect(server_ip);
+                    port.recv();
+                    stream.write([99]);
                 }
+                None => fail!()
             }
         }
     }
diff --git a/src/libstd/io/net/unix.rs b/src/libstd/io/net/unix.rs
index c1f75465d9c..d3fc265cf2a 100644
--- a/src/libstd/io/net/unix.rs
+++ b/src/libstd/io/net/unix.rs
@@ -152,25 +152,22 @@ mod tests {
     use super::*;
     use rt::test::*;
     use io::*;
-    use rt::comm::oneshot;
 
     fn smalltest(server: proc(UnixStream), client: proc(UnixStream)) {
         do run_in_mt_newsched_task {
             let path1 = next_test_unix();
             let path2 = path1.clone();
-            let (port, chan) = oneshot();
             let (client, server) = (client, server);
+            let (port, chan) = Chan::new();
 
             do spawntask {
                 let mut acceptor = UnixListener::bind(&path1).listen();
                 chan.send(());
-                server(acceptor.accept().unwrap());
+                server.take()(acceptor.accept().unwrap());
             }
 
-            do spawntask {
-                port.recv();
-                client(UnixStream::connect(&path2).unwrap());
-            }
+            port.recv();
+            client.take()(UnixStream::connect(&path2).unwrap());
         }
     }
 
@@ -251,7 +248,7 @@ mod tests {
             let times = 10;
             let path1 = next_test_unix();
             let path2 = path1.clone();
-            let (port, chan) = oneshot();
+            let (port, chan) = Chan::new();
 
             do spawntask {
                 let mut acceptor = UnixListener::bind(&path1).listen();
@@ -264,13 +261,11 @@ mod tests {
                 })
             }
 
-            do spawntask {
-                port.recv();
-                times.times(|| {
-                    let mut stream = UnixStream::connect(&path2);
-                    stream.write([100]);
-                })
-            }
+            port.recv();
+            times.times(|| {
+                let mut stream = UnixStream::connect(&path2);
+                stream.write([100]);
+            })
         }
     }
 
diff --git a/src/libstd/io/signal.rs b/src/libstd/io/signal.rs
index 3b6c6013dd2..c568a19dfa2 100644
--- a/src/libstd/io/signal.rs
+++ b/src/libstd/io/signal.rs
@@ -20,7 +20,7 @@ definitions for a number of signals.
 */
 
 use clone::Clone;
-use comm::{Port, SharedChan, stream};
+use comm::{Port, SharedChan};
 use container::{Map, MutableMap};
 use hashmap;
 use io::io_error;
@@ -93,9 +93,9 @@ impl Listener {
     /// Creates a new listener for signals. Once created, signals are bound via
     /// the `register` method (otherwise nothing will ever be received)
     pub fn new() -> Listener {
-        let (port, chan) = stream();
+        let (port, chan) = SharedChan::new();
         Listener {
-            chan: SharedChan::new(chan),
+            chan: chan,
             port: port,
             handles: hashmap::HashMap::new(),
         }
@@ -149,7 +149,6 @@ mod test {
     use libc;
     use io::timer;
     use super::{Listener, Interrupt};
-    use comm::{GenericPort, Peekable};
 
     // kill is only available on Unixes
     #[cfg(unix)]
@@ -198,9 +197,7 @@ mod test {
         s2.unregister(Interrupt);
         sigint();
         timer::sleep(10);
-        if s2.port.peek() {
-            fail!("Unexpected {:?}", s2.port.recv());
-        }
+        assert!(s2.port.try_recv().is_none());
     }
 
     #[cfg(windows)]
diff --git a/src/libstd/io/timer.rs b/src/libstd/io/timer.rs
index 202e02d55d0..5fb64ab3d09 100644
--- a/src/libstd/io/timer.rs
+++ b/src/libstd/io/timer.rs
@@ -38,7 +38,7 @@ loop {
 
 */
 
-use comm::{Port, PortOne};
+use comm::Port;
 use option::{Option, Some, None};
 use result::{Ok, Err};
 use io::io_error;
@@ -86,7 +86,7 @@ impl Timer {
     /// Note that this invalidates any previous port which has been created by
     /// this timer, and that the returned port will be invalidated once the
     /// timer is destroyed (when it falls out of scope).
-    pub fn oneshot(&mut self, msecs: u64) -> PortOne<()> {
+    pub fn oneshot(&mut self, msecs: u64) -> Port<()> {
         self.obj.oneshot(msecs)
     }
 
diff --git a/src/libstd/lib.rs b/src/libstd/lib.rs
index 6948eb60b1f..53e26e435b7 100644
--- a/src/libstd/lib.rs
+++ b/src/libstd/lib.rs
@@ -156,7 +156,6 @@ pub mod trie;
 
 pub mod task;
 pub mod comm;
-pub mod select;
 pub mod local_data;
 
 
diff --git a/src/libstd/prelude.rs b/src/libstd/prelude.rs
index 83439d4c903..201259679cc 100644
--- a/src/libstd/prelude.rs
+++ b/src/libstd/prelude.rs
@@ -84,7 +84,7 @@ pub use vec::{OwnedVector, OwnedCopyableVector,OwnedEqVector, MutableVector};
 pub use vec::{Vector, VectorVector, CopyableVector, ImmutableVector};
 
 // Reexported runtime types
-pub use comm::{stream, Port, Chan, GenericChan, GenericSmartChan, GenericPort, Peekable};
+pub use comm::{Port, Chan, SharedChan};
 pub use task::spawn;
 
 /// Disposes of a value.
diff --git a/src/libstd/rand/os.rs b/src/libstd/rand/os.rs
index 5558b8b3348..1eaf1a29fa8 100644
--- a/src/libstd/rand/os.rs
+++ b/src/libstd/rand/os.rs
@@ -135,8 +135,10 @@ impl Drop for OSRng {
 
 #[cfg(test)]
 mod test {
+    use prelude::*;
     use super::*;
     use rand::Rng;
+    use task;
 
     #[test]
     fn test_os_rng() {
@@ -151,16 +153,10 @@ mod test {
 
     #[test]
     fn test_os_rng_tasks() {
-        use task;
-        use comm;
-        use comm::{GenericChan, GenericPort};
-        use option::{None, Some};
-        use iter::{Iterator, range};
-        use vec::{ImmutableVector, OwnedVector};
 
         let mut chans = ~[];
         for _ in range(0, 20) {
-            let (p, c) = comm::stream();
+            let (p, c) = Chan::new();
             chans.push(c);
             do task::spawn {
                 // wait until all the tasks are ready to go.
diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs
deleted file mode 100644
index 2fa34994292..00000000000
--- a/src/libstd/rt/comm.rs
+++ /dev/null
@@ -1,1141 +0,0 @@
-// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
-// file at the top-level directory of this distribution and at
-// http://rust-lang.org/COPYRIGHT.
-//
-// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
-// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
-// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
-// option. This file may not be copied, modified, or distributed
-// except according to those terms.
-
-//! Ports and channels.
-
-use option::*;
-use cast;
-use ops::Drop;
-use rt::kill::BlockedTask;
-use kinds::Send;
-use rt;
-use rt::sched::Scheduler;
-use rt::local::Local;
-use rt::select::{SelectInner, SelectPortInner};
-use select::{Select, SelectPort};
-use unstable::atomics::{AtomicUint, AtomicOption, Acquire, Relaxed, SeqCst};
-use unstable::sync::UnsafeArc;
-use util;
-use util::Void;
-use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable, SendDeferred};
-use cell::RefCell;
-use clone::Clone;
-use tuple::ImmutableTuple;
-
-/// A combined refcount / BlockedTask-as-uint pointer.
-///
-/// Can be equal to the following values:
-///
-/// * 2 - both endpoints are alive
-/// * 1 - either the sender or the receiver is dead, determined by context
-/// * <ptr> - A pointer to a blocked Task (see BlockedTask::cast_{to,from}_uint)
-type State = uint;
-
-static STATE_BOTH: State = 2;
-static STATE_ONE: State = 1;
-
-/// The heap-allocated structure shared between two endpoints.
-struct Packet<T> {
-    state: AtomicUint,
-    payload: Option<T>,
-}
-
-// A one-shot channel.
-pub struct ChanOne<T> {
-    priv void_packet: *mut Void,
-    priv suppress_finalize: bool
-}
-
-/// A one-shot port.
-pub struct PortOne<T> {
-    priv void_packet: *mut Void,
-    priv suppress_finalize: bool
-}
-
-pub fn oneshot<T: Send>() -> (PortOne<T>, ChanOne<T>) {
-    let packet: ~Packet<T> = ~Packet {
-        state: AtomicUint::new(STATE_BOTH),
-        payload: None
-    };
-
-    unsafe {
-        let packet: *mut Void = cast::transmute(packet);
-        let port = PortOne {
-            void_packet: packet,
-            suppress_finalize: false
-        };
-        let chan = ChanOne {
-            void_packet: packet,
-            suppress_finalize: false
-        };
-        return (port, chan);
-    }
-}
-
-impl<T: Send> ChanOne<T> {
-    #[inline]
-    fn packet(&self) -> *mut Packet<T> {
-        unsafe {
-            let p: *mut ~Packet<T> = cast::transmute(&self.void_packet);
-            let p: *mut Packet<T> = &mut **p;
-            return p;
-        }
-    }
-
-    /// Send a message on the one-shot channel. If a receiver task is blocked
-    /// waiting for the message, will wake it up and reschedule to it.
-    pub fn send(self, val: T) {
-        self.try_send(val);
-    }
-
-    /// As `send`, but also returns whether or not the receiver endpoint is still open.
-    pub fn try_send(self, val: T) -> bool {
-        self.try_send_inner(val, true)
-    }
-
-    /// Send a message without immediately rescheduling to a blocked receiver.
-    /// This can be useful in contexts where rescheduling is forbidden, or to
-    /// optimize for when the sender expects to still have useful work to do.
-    pub fn send_deferred(self, val: T) {
-        self.try_send_deferred(val);
-    }
-
-    /// As `send_deferred` and `try_send` together.
-    pub fn try_send_deferred(self, val: T) -> bool {
-        self.try_send_inner(val, false)
-    }
-
-    // 'do_resched' configures whether the scheduler immediately switches to
-    // the receiving task, or leaves the sending task still running.
-    fn try_send_inner(mut self, val: T, do_resched: bool) -> bool {
-        if do_resched {
-            rtassert!(!rt::in_sched_context());
-        }
-
-        // In order to prevent starvation of other tasks in situations
-        // where a task sends repeatedly without ever receiving, we
-        // occassionally yield instead of doing a send immediately.
-        // Only doing this if we're doing a rescheduling send,
-        // otherwise the caller is expecting not to context switch.
-        if do_resched {
-            // XXX: This TLS hit should be combined with other uses of the scheduler below
-            let sched: ~Scheduler = Local::take();
-            sched.maybe_yield();
-        }
-
-        let mut recvr_active = true;
-        let packet = self.packet();
-
-        unsafe {
-
-            // Install the payload
-            rtassert!((*packet).payload.is_none());
-            (*packet).payload = Some(val);
-
-            // Atomically swap out the old state to figure out what
-            // the port's up to, issuing a release barrier to prevent
-            // reordering of the payload write. This also issues an
-            // acquire barrier that keeps the subsequent access of the
-            // ~Task pointer from being reordered.
-            let oldstate = (*packet).state.swap(STATE_ONE, SeqCst);
-
-            // Suppress the synchronizing actions in the finalizer. We're
-            // done with the packet. NB: In case of do_resched, this *must*
-            // happen before waking up a blocked task (or be unkillable),
-            // because we might get a kill signal during the reschedule.
-            self.suppress_finalize = true;
-
-            match oldstate {
-                STATE_BOTH => {
-                    // Port is not waiting yet. Nothing to do
-                }
-                STATE_ONE => {
-                    // Port has closed. Need to clean up.
-                    let _packet: ~Packet<T> = cast::transmute(self.void_packet);
-                    recvr_active = false;
-                }
-                task_as_state => {
-                    // Port is blocked. Wake it up.
-                    let recvr = BlockedTask::cast_from_uint(task_as_state);
-                    if do_resched {
-                        recvr.wake().map(|woken_task| {
-                            Scheduler::run_task(woken_task);
-                        });
-                    } else {
-                        let mut sched = Local::borrow(None::<Scheduler>);
-                        sched.get().enqueue_blocked_task(recvr);
-                    }
-                }
-            }
-        }
-
-        return recvr_active;
-    }
-}
-
-impl<T: Send> PortOne<T> {
-    fn packet(&self) -> *mut Packet<T> {
-        unsafe {
-            let p: *mut ~Packet<T> = cast::transmute(&self.void_packet);
-            let p: *mut Packet<T> = &mut **p;
-            return p;
-        }
-    }
-
-    /// Wait for a message on the one-shot port. Fails if the send end is closed.
-    pub fn recv(self) -> T {
-        match self.try_recv() {
-            Some(val) => val,
-            None => {
-                fail!("receiving on closed channel");
-            }
-        }
-    }
-
-    /// As `recv`, but returns `None` if the send end is closed rather than failing.
-    pub fn try_recv(mut self) -> Option<T> {
-        // Optimistic check. If data was sent already, we don't even need to block.
-        // No release barrier needed here; we're not handing off our task pointer yet.
-        if !self.optimistic_check() {
-            // No data available yet.
-            // Switch to the scheduler to put the ~Task into the Packet state.
-            let sched: ~Scheduler = Local::take();
-            sched.deschedule_running_task_and_then(|sched, task| {
-                self.block_on(sched, task);
-            })
-        }
-
-        // Task resumes.
-        self.recv_ready()
-    }
-}
-
-impl<T: Send> SelectInner for PortOne<T> {
-    #[inline] #[cfg(not(test))]
-    fn optimistic_check(&mut self) -> bool {
-        unsafe { (*self.packet()).state.load(Acquire) == STATE_ONE }
-    }
-
-    #[inline] #[cfg(test)]
-    fn optimistic_check(&mut self) -> bool {
-        // The optimistic check is never necessary for correctness. For testing
-        // purposes, making it randomly return false simulates a racing sender.
-        use rand::{Rand};
-        let mut sched = Local::borrow(None::<Scheduler>);
-        let actually_check = Rand::rand(&mut sched.get().rng);
-        if actually_check {
-            unsafe { (*self.packet()).state.load(Acquire) == STATE_ONE }
-        } else {
-            false
-        }
-    }
-
-    fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool {
-        unsafe {
-            // Atomically swap the task pointer into the Packet state, issuing
-            // an acquire barrier to prevent reordering of the subsequent read
-            // of the payload. Also issues a release barrier to prevent
-            // reordering of any previous writes to the task structure.
-            let task_as_state = task.cast_to_uint();
-            let oldstate = (*self.packet()).state.swap(task_as_state, SeqCst);
-            match oldstate {
-                STATE_BOTH => {
-                    // Data has not been sent. Now we're blocked.
-                    rtdebug!("non-rendezvous recv");
-                    false
-                }
-                STATE_ONE => {
-                    // Re-record that we are the only owner of the packet.
-                    // No barrier needed, even if the task gets reawoken
-                    // on a different core -- this is analogous to writing a
-                    // payload; a barrier in enqueueing the task protects it.
-                    // NB(#8132). This *must* occur before the enqueue below.
-                    // FIXME(#6842, #8130) This is usually only needed for the
-                    // assertion in recv_ready, except in the case of select().
-                    // This won't actually ever have cacheline contention, but
-                    // maybe should be optimized out with a cfg(test) anyway?
-                    (*self.packet()).state.store(STATE_ONE, Relaxed);
-
-                    rtdebug!("rendezvous recv");
-
-                    // Channel is closed. Switch back and check the data.
-                    // NB: We have to drop back into the scheduler event loop here
-                    // instead of switching immediately back or we could end up
-                    // triggering infinite recursion on the scheduler's stack.
-                    let recvr = BlockedTask::cast_from_uint(task_as_state);
-                    sched.enqueue_blocked_task(recvr);
-                    true
-                }
-                _ => rtabort!("can't block_on; a task is already blocked")
-            }
-        }
-    }
-
-    // This is the only select trait function that's not also used in recv.
-    fn unblock_from(&mut self) -> bool {
-        let packet = self.packet();
-        unsafe {
-            // In case the data is available, the acquire barrier here matches
-            // the release barrier the sender used to release the payload.
-            match (*packet).state.load(Acquire) {
-                // Impossible. We removed STATE_BOTH when blocking on it, and
-                // no self-respecting sender would put it back.
-                STATE_BOTH    => rtabort!("refcount already 2 in unblock_from"),
-                // Here, a sender already tried to wake us up. Perhaps they
-                // even succeeded! Data is available.
-                STATE_ONE     => true,
-                // Still registered as blocked. Need to "unblock" the pointer.
-                task_as_state => {
-                    // In the window between the load and the CAS, a sender
-                    // might take the pointer and set the refcount to ONE. If
-                    // that happens, we shouldn't clobber that with BOTH!
-                    // Acquire barrier again for the same reason as above.
-                    match (*packet).state.compare_and_swap(task_as_state, STATE_BOTH,
-                                                           Acquire) {
-                        STATE_BOTH => rtabort!("refcount became 2 in unblock_from"),
-                        STATE_ONE  => true, // Lost the race. Data available.
-                        same_ptr   => {
-                            // We successfully unblocked our task pointer.
-                            rtassert!(task_as_state == same_ptr);
-                            let handle = BlockedTask::cast_from_uint(task_as_state);
-                            // Because we are already awake, the handle we
-                            // gave to this port shall already be empty.
-                            handle.assert_already_awake();
-                            false
-                        }
-                    }
-                }
-            }
-        }
-    }
-}
-
-impl<T: Send> Select for PortOne<T> { }
-
-impl<T: Send> SelectPortInner<T> for PortOne<T> {
-    fn recv_ready(mut self) -> Option<T> {
-        let packet = self.packet();
-
-        // No further memory barrier is needed here to access the
-        // payload. Some scenarios:
-        //
-        // 1) We encountered STATE_ONE above - the atomic_xchg was the acq barrier. We're fine.
-        // 2) We encountered STATE_BOTH above and blocked. The sending task then ran us
-        //    and ran on its thread. The sending task issued a read barrier when taking the
-        //    pointer to the receiving task.
-        // 3) We encountered STATE_BOTH above and blocked, but the receiving task (this task)
-        //    is pinned to some other scheduler, so the sending task had to give us to
-        //    a different scheduler for resuming. That send synchronized memory.
-        unsafe {
-            // See corresponding store() above in block_on for rationale.
-            // FIXME(#8130) This can happen only in test builds.
-            // This load is not required for correctness and may be compiled out.
-            rtassert!((*packet).state.load(Relaxed) == STATE_ONE);
-
-            let payload = (*packet).payload.take();
-
-            // The sender has closed up shop. Drop the packet.
-            let _packet: ~Packet<T> = cast::transmute(self.void_packet);
-            // Suppress the synchronizing actions in the finalizer. We're done with the packet.
-            self.suppress_finalize = true;
-            return payload;
-        }
-    }
-}
-
-impl<T: Send> SelectPort<T> for PortOne<T> { }
-
-impl<T: Send> Peekable<T> for PortOne<T> {
-    fn peek(&self) -> bool {
-        unsafe {
-            let packet: *mut Packet<T> = self.packet();
-            let oldstate = (*packet).state.load(SeqCst);
-            match oldstate {
-                STATE_BOTH => false,
-                STATE_ONE => (*packet).payload.is_some(),
-                _ => rtabort!("peeked on a blocked task")
-            }
-        }
-    }
-}
-
-#[unsafe_destructor]
-impl<T: Send> Drop for ChanOne<T> {
-    fn drop(&mut self) {
-        if self.suppress_finalize { return }
-
-        unsafe {
-            let oldstate = (*self.packet()).state.swap(STATE_ONE, SeqCst);
-            match oldstate {
-                STATE_BOTH => {
-                    // Port still active. It will destroy the Packet.
-                },
-                STATE_ONE => {
-                    let _packet: ~Packet<T> = cast::transmute(self.void_packet);
-                },
-                task_as_state => {
-                    // The port is blocked waiting for a message we will never send. Wake it.
-                    rtassert!((*self.packet()).payload.is_none());
-                    let recvr = BlockedTask::cast_from_uint(task_as_state);
-                    recvr.wake().map(|woken_task| {
-                        Scheduler::run_task(woken_task);
-                    });
-                }
-            }
-        }
-    }
-}
-
-#[unsafe_destructor]
-impl<T: Send> Drop for PortOne<T> {
-    fn drop(&mut self) {
-        if self.suppress_finalize { return }
-
-        unsafe {
-            let oldstate = (*self.packet()).state.swap(STATE_ONE, SeqCst);
-            match oldstate {
-                STATE_BOTH => {
-                    // Chan still active. It will destroy the packet.
-                },
-                STATE_ONE => {
-                    let _packet: ~Packet<T> = cast::transmute(self.void_packet);
-                }
-                task_as_state => {
-                    // This case occurs during unwinding, when the blocked
-                    // receiver was killed awake. The task can't still be
-                    // blocked (we are it), but we need to free the handle.
-                    let recvr = BlockedTask::cast_from_uint(task_as_state);
-                    recvr.assert_already_awake();
-                }
-            }
-        }
-    }
-}
-
-struct StreamPayload<T> {
-    val: T,
-    next: PortOne<StreamPayload<T>>
-}
-
-type StreamChanOne<T> = ChanOne<StreamPayload<T>>;
-type StreamPortOne<T> = PortOne<StreamPayload<T>>;
-
-/// A channel with unbounded size.
-pub struct Chan<T> {
-    // FIXME #5372. Using RefCell because we don't take &mut self
-    next: RefCell<StreamChanOne<T>>
-}
-
-/// An port with unbounded size.
-pub struct Port<T> {
-    // FIXME #5372. Using RefCell because we don't take &mut self
-    next: RefCell<Option<StreamPortOne<T>>>
-}
-
-pub fn stream<T: Send>() -> (Port<T>, Chan<T>) {
-    let (pone, cone) = oneshot();
-    let port = Port { next: RefCell::new(Some(pone)) };
-    let chan = Chan { next: RefCell::new(cone) };
-    return (port, chan);
-}
-
-impl<T: Send> Chan<T> {
-    fn try_send_inner(&self, val: T, do_resched: bool) -> bool {
-        let (next_pone, mut cone) = oneshot();
-        let mut b = self.next.borrow_mut();
-        util::swap(&mut cone, b.get());
-        cone.try_send_inner(StreamPayload { val: val, next: next_pone }, do_resched)
-    }
-}
-
-impl<T: Send> GenericChan<T> for Chan<T> {
-    fn send(&self, val: T) {
-        self.try_send(val);
-    }
-}
-
-impl<T: Send> GenericSmartChan<T> for Chan<T> {
-    fn try_send(&self, val: T) -> bool {
-        self.try_send_inner(val, true)
-    }
-}
-
-impl<T: Send> SendDeferred<T> for Chan<T> {
-    fn send_deferred(&self, val: T) {
-        self.try_send_deferred(val);
-    }
-    fn try_send_deferred(&self, val: T) -> bool {
-        self.try_send_inner(val, false)
-    }
-}
-
-impl<T: Send> GenericPort<T> for Port<T> {
-    fn recv(&self) -> T {
-        match self.try_recv() {
-            Some(val) => val,
-            None => {
-                fail!("receiving on closed channel");
-            }
-        }
-    }
-
-    fn try_recv(&self) -> Option<T> {
-        let mut b = self.next.borrow_mut();
-        b.get().take().map_default(None, |pone| {
-            match pone.try_recv() {
-                Some(StreamPayload { val, next }) => {
-                    *b.get() = Some(next);
-                    Some(val)
-                }
-                None => None
-            }
-        })
-    }
-}
-
-impl<T: Send> Peekable<T> for Port<T> {
-    fn peek(&self) -> bool {
-        self.next.with_mut(|p| p.get_mut_ref().peek())
-    }
-}
-
-// XXX: Kind of gross. A Port<T> should be selectable so you can make an array
-// of them, but a &Port<T> should also be selectable so you can select2 on it
-// alongside a PortOne<U> without passing the port by value in recv_ready.
-
-impl<'a, T: Send> SelectInner for &'a Port<T> {
-    #[inline]
-    fn optimistic_check(&mut self) -> bool {
-        self.next.with_mut(|pone| { pone.get_mut_ref().optimistic_check() })
-    }
-
-    #[inline]
-    fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool {
-        let mut b = self.next.borrow_mut();
-        b.get().get_mut_ref().block_on(sched, task)
-    }
-
-    #[inline]
-    fn unblock_from(&mut self) -> bool {
-        self.next.with_mut(|pone| { pone.get_mut_ref().unblock_from() })
-    }
-}
-
-impl<'a, T: Send> Select for &'a Port<T> { }
-
-impl<T: Send> SelectInner for Port<T> {
-    #[inline]
-    fn optimistic_check(&mut self) -> bool {
-        (&*self).optimistic_check()
-    }
-
-    #[inline]
-    fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool {
-        (&*self).block_on(sched, task)
-    }
-
-    #[inline]
-    fn unblock_from(&mut self) -> bool {
-        (&*self).unblock_from()
-    }
-}
-
-impl<T: Send> Select for Port<T> { }
-
-impl<'a, T: Send> SelectPortInner<T> for &'a Port<T> {
-    fn recv_ready(self) -> Option<T> {
-        let mut b = self.next.borrow_mut();
-        match b.get().take_unwrap().recv_ready() {
-            Some(StreamPayload { val, next }) => {
-                *b.get() = Some(next);
-                Some(val)
-            }
-            None => None
-        }
-    }
-}
-
-impl<'a, T: Send> SelectPort<T> for &'a Port<T> { }
-
-pub struct SharedChan<T> {
-    // Just like Chan, but a shared AtomicOption
-    priv next: UnsafeArc<AtomicOption<StreamChanOne<T>>>
-}
-
-impl<T: Send> SharedChan<T> {
-    pub fn new(chan: Chan<T>) -> SharedChan<T> {
-        let next = chan.next.unwrap();
-        let next = AtomicOption::new(~next);
-        SharedChan { next: UnsafeArc::new(next) }
-    }
-}
-
-impl<T: Send> SharedChan<T> {
-    fn try_send_inner(&self, val: T, do_resched: bool) -> bool {
-        unsafe {
-            let (next_pone, next_cone) = oneshot();
-            let cone = (*self.next.get()).swap(~next_cone, SeqCst);
-            cone.unwrap().try_send_inner(StreamPayload { val: val, next: next_pone },
-                                         do_resched)
-        }
-    }
-}
-
-impl<T: Send> GenericChan<T> for SharedChan<T> {
-    fn send(&self, val: T) {
-        self.try_send(val);
-    }
-}
-
-impl<T: Send> GenericSmartChan<T> for SharedChan<T> {
-    fn try_send(&self, val: T) -> bool {
-        self.try_send_inner(val, true)
-    }
-}
-
-impl<T: Send> SendDeferred<T> for SharedChan<T> {
-    fn send_deferred(&self, val: T) {
-        self.try_send_deferred(val);
-    }
-    fn try_send_deferred(&self, val: T) -> bool {
-        self.try_send_inner(val, false)
-    }
-}
-
-impl<T: Send> Clone for SharedChan<T> {
-    fn clone(&self) -> SharedChan<T> {
-        SharedChan {
-            next: self.next.clone()
-        }
-    }
-}
-
-pub struct SharedPort<T> {
-    // The next port on which we will receive the next port on which we will receive T
-    priv next_link: UnsafeArc<AtomicOption<PortOne<StreamPortOne<T>>>>
-}
-
-impl<T: Send> SharedPort<T> {
-    pub fn new(port: Port<T>) -> SharedPort<T> {
-        // Put the data port into a new link pipe
-        let next_data_port = port.next.unwrap().unwrap();
-        let (next_link_port, next_link_chan) = oneshot();
-        next_link_chan.send(next_data_port);
-        let next_link = AtomicOption::new(~next_link_port);
-        SharedPort { next_link: UnsafeArc::new(next_link) }
-    }
-}
-
-impl<T: Send> GenericPort<T> for SharedPort<T> {
-    fn recv(&self) -> T {
-        match self.try_recv() {
-            Some(val) => val,
-            None => {
-                fail!("receiving on a closed channel");
-            }
-        }
-    }
-
-    fn try_recv(&self) -> Option<T> {
-        unsafe {
-            let (next_link_port, next_link_chan) = oneshot();
-            let link_port = (*self.next_link.get()).swap(~next_link_port, SeqCst);
-            let link_port = link_port.unwrap();
-            let data_port = link_port.recv();
-            let (next_data_port, res) = match data_port.try_recv() {
-                Some(StreamPayload { val, next }) => {
-                    (next, Some(val))
-                }
-                None => {
-                    let (next_data_port, _) = oneshot();
-                    (next_data_port, None)
-                }
-            };
-            next_link_chan.send(next_data_port);
-            return res;
-        }
-    }
-}
-
-impl<T: Send> Clone for SharedPort<T> {
-    fn clone(&self) -> SharedPort<T> {
-        SharedPort {
-            next_link: self.next_link.clone()
-        }
-    }
-}
-
-// FIXME #7760: Need better name
-type MegaPipe<T> = (SharedPort<T>, SharedChan<T>);
-
-pub fn megapipe<T: Send>() -> MegaPipe<T> {
-    let (port, chan) = stream();
-    (SharedPort::new(port), SharedChan::new(chan))
-}
-
-impl<T: Send> GenericChan<T> for MegaPipe<T> {
-    fn send(&self, val: T) {
-        self.second_ref().send(val)
-    }
-}
-
-impl<T: Send> GenericSmartChan<T> for MegaPipe<T> {
-    fn try_send(&self, val: T) -> bool {
-        self.second_ref().try_send(val)
-    }
-}
-
-impl<T: Send> GenericPort<T> for MegaPipe<T> {
-    fn recv(&self) -> T {
-        self.first_ref().recv()
-    }
-
-    fn try_recv(&self) -> Option<T> {
-        self.first_ref().try_recv()
-    }
-}
-
-impl<T: Send> SendDeferred<T> for MegaPipe<T> {
-    fn send_deferred(&self, val: T) {
-        self.second_ref().send_deferred(val)
-    }
-    fn try_send_deferred(&self, val: T) -> bool {
-        self.second_ref().try_send_deferred(val)
-    }
-}
-
-#[cfg(test)]
-mod test {
-    use super::*;
-    use option::*;
-    use rt::test::*;
-    use num::Times;
-    use rt::util;
-
-    #[test]
-    fn oneshot_single_thread_close_port_first() {
-        // Simple test of closing without sending
-        do run_in_newsched_task {
-            let (port, _chan) = oneshot::<int>();
-            { let _p = port; }
-        }
-    }
-
-    #[test]
-    fn oneshot_single_thread_close_chan_first() {
-        // Simple test of closing without sending
-        do run_in_newsched_task {
-            let (_port, chan) = oneshot::<int>();
-            { let _c = chan; }
-        }
-    }
-
-    #[test]
-    fn oneshot_single_thread_send_port_close() {
-        // Testing that the sender cleans up the payload if receiver is closed
-        do run_in_newsched_task {
-            let (port, chan) = oneshot::<~int>();
-            { let _p = port; }
-            chan.send(~0);
-        }
-    }
-
-    #[test]
-    fn oneshot_single_thread_recv_chan_close() {
-        // Receiving on a closed chan will fail
-        do run_in_newsched_task {
-            let res = do spawntask_try {
-                let (port, chan) = oneshot::<~int>();
-                { let _c = chan; }
-                port.recv();
-            };
-            // What is our res?
-            rtdebug!("res is: {:?}", res.is_err());
-            assert!(res.is_err());
-        }
-    }
-
-    #[test]
-    fn oneshot_single_thread_send_then_recv() {
-        do run_in_newsched_task {
-            let (port, chan) = oneshot::<~int>();
-            chan.send(~10);
-            assert!(port.recv() == ~10);
-        }
-    }
-
-    #[test]
-    fn oneshot_single_thread_try_send_open() {
-        do run_in_newsched_task {
-            let (port, chan) = oneshot::<int>();
-            assert!(chan.try_send(10));
-            assert!(port.recv() == 10);
-        }
-    }
-
-    #[test]
-    fn oneshot_single_thread_try_send_closed() {
-        do run_in_newsched_task {
-            let (port, chan) = oneshot::<int>();
-            { let _p = port; }
-            assert!(!chan.try_send(10));
-        }
-    }
-
-    #[test]
-    fn oneshot_single_thread_try_recv_open() {
-        do run_in_newsched_task {
-            let (port, chan) = oneshot::<int>();
-            chan.send(10);
-            assert!(port.try_recv() == Some(10));
-        }
-    }
-
-    #[test]
-    fn oneshot_single_thread_try_recv_closed() {
-        do run_in_newsched_task {
-            let (port, chan) = oneshot::<int>();
-            { let _c = chan; }
-            assert!(port.try_recv() == None);
-        }
-    }
-
-    #[test]
-    fn oneshot_single_thread_peek_data() {
-        do run_in_newsched_task {
-            let (port, chan) = oneshot::<int>();
-            assert!(!port.peek());
-            chan.send(10);
-            assert!(port.peek());
-        }
-    }
-
-    #[test]
-    fn oneshot_single_thread_peek_close() {
-        do run_in_newsched_task {
-            let (port, chan) = oneshot::<int>();
-            { let _c = chan; }
-            assert!(!port.peek());
-            assert!(!port.peek());
-        }
-    }
-
-    #[test]
-    fn oneshot_single_thread_peek_open() {
-        do run_in_newsched_task {
-            let (port, _) = oneshot::<int>();
-            assert!(!port.peek());
-        }
-    }
-
-    #[test]
-    fn oneshot_multi_task_recv_then_send() {
-        do run_in_newsched_task {
-            let (port, chan) = oneshot::<~int>();
-            do spawntask {
-                assert!(port.recv() == ~10);
-            }
-
-            chan.send(~10);
-        }
-    }
-
-    #[test]
-    fn oneshot_multi_task_recv_then_close() {
-        do run_in_newsched_task {
-            let (port, chan) = oneshot::<~int>();
-            do spawntask_later {
-                let _ = chan;
-            }
-            let res = do spawntask_try {
-                assert!(port.recv() == ~10);
-            };
-            assert!(res.is_err());
-        }
-    }
-
-    #[test]
-    fn oneshot_multi_thread_close_stress() {
-        if util::limit_thread_creation_due_to_osx_and_valgrind() { return; }
-        stress_factor().times(|| {
-            do run_in_newsched_task {
-                let (port, chan) = oneshot::<int>();
-                let thread = do spawntask_thread {
-                    let _ = port;
-                };
-                let _chan = chan;
-                thread.join();
-            }
-        })
-    }
-
-    #[test]
-    fn oneshot_multi_thread_send_close_stress() {
-        if util::limit_thread_creation_due_to_osx_and_valgrind() { return; }
-        stress_factor().times(|| {
-            do run_in_newsched_task {
-                let (port, chan) = oneshot::<int>();
-                let thread1 = do spawntask_thread {
-                    let _ = port;
-                };
-                let thread2 = do spawntask_thread {
-                    chan.send(1);
-                };
-                thread1.join();
-                thread2.join();
-            }
-        })
-    }
-
-    #[test]
-    fn oneshot_multi_thread_recv_close_stress() {
-        if util::limit_thread_creation_due_to_osx_and_valgrind() { return; }
-        stress_factor().times(|| {
-            do run_in_newsched_task {
-                let (port, chan) = oneshot::<int>();
-                let thread1 = do spawntask_thread {
-                    let port = port;
-                    let res = do spawntask_try {
-                        port.recv();
-                    };
-                    assert!(res.is_err());
-                };
-                let thread2 = do spawntask_thread {
-                    let chan = chan;
-                    do spawntask {
-                        let _ = chan;
-                    }
-                };
-                thread1.join();
-                thread2.join();
-            }
-        })
-    }
-
-    #[test]
-    fn oneshot_multi_thread_send_recv_stress() {
-        if util::limit_thread_creation_due_to_osx_and_valgrind() { return; }
-        stress_factor().times(|| {
-            do run_in_newsched_task {
-                let (port, chan) = oneshot::<~int>();
-                let thread1 = do spawntask_thread {
-                    chan.send(~10);
-                };
-                let thread2 = do spawntask_thread {
-                    assert!(port.recv() == ~10);
-                };
-                thread1.join();
-                thread2.join();
-            }
-        })
-    }
-
-    #[test]
-    fn stream_send_recv_stress() {
-        if util::limit_thread_creation_due_to_osx_and_valgrind() { return; }
-        stress_factor().times(|| {
-            do run_in_mt_newsched_task {
-                let (port, chan) = stream::<~int>();
-
-                send(chan, 0);
-                recv(port, 0);
-
-                fn send(chan: Chan<~int>, i: int) {
-                    if i == 10 { return }
-
-                    do spawntask_random {
-                        chan.send(~i);
-                        send(chan, i + 1);
-                    }
-                }
-
-                fn recv(port: Port<~int>, i: int) {
-                    if i == 10 { return }
-
-                    do spawntask_random {
-                        assert!(port.recv() == ~i);
-                        recv(port, i + 1);
-                    };
-                }
-            }
-        })
-    }
-
-    #[test]
-    fn recv_a_lot() {
-        // Regression test that we don't run out of stack in scheduler context
-        do run_in_newsched_task {
-            let (port, chan) = stream();
-            10000.times(|| { chan.send(()) });
-            10000.times(|| { port.recv() });
-        }
-    }
-
-    #[test]
-    fn shared_chan_stress() {
-        if util::limit_thread_creation_due_to_osx_and_valgrind() { return; }
-        do run_in_mt_newsched_task {
-            let (port, chan) = stream();
-            let chan = SharedChan::new(chan);
-            let total = stress_factor() + 100;
-            total.times(|| {
-                let chan_clone = chan.clone();
-                do spawntask_random {
-                    chan_clone.send(());
-                }
-            });
-
-            total.times(|| {
-                port.recv();
-            });
-        }
-    }
-
-    #[test]
-    fn shared_port_stress() {
-        if util::limit_thread_creation_due_to_osx_and_valgrind() { return; }
-        do run_in_mt_newsched_task {
-            let (end_port, end_chan) = stream();
-            let (port, chan) = stream();
-            let end_chan = SharedChan::new(end_chan);
-            let port = SharedPort::new(port);
-            let total = stress_factor() + 100;
-            total.times(|| {
-                let end_chan_clone = end_chan.clone();
-                let port_clone = port.clone();
-                do spawntask_random {
-                    port_clone.recv();
-                    end_chan_clone.send(());
-                }
-            });
-
-            total.times(|| {
-                chan.send(());
-            });
-
-            total.times(|| {
-                end_port.recv();
-            });
-        }
-    }
-
-    #[test]
-    fn shared_port_close_simple() {
-        do run_in_mt_newsched_task {
-            let (port, chan) = stream::<()>();
-            let port = SharedPort::new(port);
-            { let _chan = chan; }
-            assert!(port.try_recv().is_none());
-        }
-    }
-
-    #[test]
-    fn shared_port_close() {
-        do run_in_mt_newsched_task {
-            let (end_port, end_chan) = stream::<bool>();
-            let (port, chan) = stream::<()>();
-            let end_chan = SharedChan::new(end_chan);
-            let port = SharedPort::new(port);
-            let chan = SharedChan::new(chan);
-            let send_total = 10;
-            let recv_total = 20;
-            do spawntask_random {
-                send_total.times(|| {
-                    let chan_clone = chan.clone();
-                    do spawntask_random {
-                        chan_clone.send(());
-                    }
-                });
-            }
-            let end_chan_clone = end_chan.clone();
-            do spawntask_random {
-                recv_total.times(|| {
-                    let port_clone = port.clone();
-                    let end_chan_clone = end_chan_clone.clone();
-                    do spawntask_random {
-                        let recvd = port_clone.try_recv().is_some();
-                        end_chan_clone.send(recvd);
-                    }
-                });
-            }
-
-            let mut recvd = 0;
-            recv_total.times(|| {
-                recvd += if end_port.recv() { 1 } else { 0 };
-            });
-
-            assert!(recvd == send_total);
-        }
-    }
-
-    #[test]
-    fn megapipe_stress() {
-        use rand;
-        use rand::Rng;
-
-        if util::limit_thread_creation_due_to_osx_and_valgrind() { return; }
-
-        do run_in_mt_newsched_task {
-            let (end_port, end_chan) = stream::<()>();
-            let end_chan = SharedChan::new(end_chan);
-            let pipe = megapipe();
-            let total = stress_factor() + 10;
-            let mut rng = rand::rng();
-            total.times(|| {
-                let msgs = rng.gen_range(0u, 10);
-                let pipe_clone = pipe.clone();
-                let end_chan_clone = end_chan.clone();
-                do spawntask_random {
-                    msgs.times(|| {
-                        pipe_clone.send(());
-                    });
-                    msgs.times(|| {
-                        pipe_clone.recv();
-                    });
-                }
-
-                end_chan_clone.send(());
-            });
-
-            total.times(|| {
-                end_port.recv();
-            });
-        }
-    }
-
-    #[test]
-    fn send_deferred() {
-        use unstable::sync::atomic;
-
-        // Tests no-rescheduling of send_deferred on all types of channels.
-        do run_in_newsched_task {
-            let (pone, cone) = oneshot();
-            let (pstream, cstream) = stream();
-            let (pshared, cshared) = stream();
-            let cshared = SharedChan::new(cshared);
-            let mp = megapipe();
-
-            do spawntask { pone.recv(); }
-            do spawntask { pstream.recv(); }
-            do spawntask { pshared.recv(); }
-            let p_mp = mp.clone();
-            do spawntask { p_mp.recv(); }
-
-            unsafe {
-                let _guard = atomic();
-                cone.send_deferred(());
-                cstream.send_deferred(());
-                cshared.send_deferred(());
-                mp.send_deferred(());
-            }
-        }
-    }
-
-}
diff --git a/src/libstd/rt/kill.rs b/src/libstd/rt/kill.rs
index e3f9cd09632..f4f128cf5aa 100644
--- a/src/libstd/rt/kill.rs
+++ b/src/libstd/rt/kill.rs
@@ -153,8 +153,9 @@ There are two known issues with the current scheme for exit code propagation.
 use cast;
 use option::{Option, Some, None};
 use prelude::*;
+use iter;
+use task::TaskResult;
 use rt::task::Task;
-use rt::task::UnwindResult;
 use unstable::atomics::{AtomicUint, SeqCst};
 use unstable::sync::UnsafeArc;
 
@@ -169,11 +170,21 @@ pub enum BlockedTask {
 pub struct Death {
     // Action to be done with the exit code. If set, also makes the task wait
     // until all its watched children exit before collecting the status.
-    on_exit:         Option<proc(UnwindResult)>,
+    on_exit:         Option<proc(TaskResult)>,
     // nesting level counter for unstable::atomically calls (0 == can deschedule).
     priv wont_sleep:      int,
 }
 
+pub struct BlockedTaskIterator {
+    priv inner: UnsafeArc<AtomicUint>,
+}
+
+impl Iterator<BlockedTask> for BlockedTaskIterator {
+    fn next(&mut self) -> Option<BlockedTask> {
+        Some(Shared(self.inner.clone()))
+    }
+}
+
 impl BlockedTask {
     /// Returns Some if the task was successfully woken; None if already killed.
     pub fn wake(self) -> Option<~Task> {
@@ -194,19 +205,17 @@ impl BlockedTask {
     }
 
     /// Converts one blocked task handle to a list of many handles to the same.
-    pub fn make_selectable(self, num_handles: uint) -> ~[BlockedTask] {
-        let handles = match self {
+    pub fn make_selectable(self, num_handles: uint)
+        -> iter::Take<BlockedTaskIterator>
+    {
+        let arc = match self {
             Owned(task) => {
-                let flag = unsafe {
-                    AtomicUint::new(cast::transmute(task))
-                };
-                UnsafeArc::newN(flag, num_handles)
+                let flag = unsafe { AtomicUint::new(cast::transmute(task)) };
+                UnsafeArc::new(flag)
             }
-            Shared(arc) => arc.cloneN(num_handles),
+            Shared(arc) => arc.clone(),
         };
-        // Even if the task was unkillable before, we use 'Killable' because
-        // multiple pipes will have handles. It does not really mean killable.
-        handles.move_iter().map(|x| Shared(x)).collect()
+        BlockedTaskIterator{ inner: arc }.take(num_handles)
     }
 
     // This assertion has two flavours because the wake involves an atomic op.
@@ -254,10 +263,10 @@ impl Death {
     }
 
     /// Collect failure exit codes from children and propagate them to a parent.
-    pub fn collect_failure(&mut self, result: UnwindResult) {
+    pub fn collect_failure(&mut self, result: TaskResult) {
         match self.on_exit.take() {
+            Some(f) => f(result),
             None => {}
-            Some(on_exit) => on_exit(result),
         }
     }
 
diff --git a/src/libstd/rt/local_ptr.rs b/src/libstd/rt/local_ptr.rs
index 66fe9742121..925aa802ad5 100644
--- a/src/libstd/rt/local_ptr.rs
+++ b/src/libstd/rt/local_ptr.rs
@@ -77,10 +77,9 @@ pub unsafe fn borrow<T>() -> Borrowed<T> {
 /// it wherever possible.
 #[cfg(not(windows), not(target_os = "android"))]
 pub mod compiled {
-    #[cfg(not(test))]
-    use libc::c_void;
     use cast;
     use option::{Option, Some, None};
+    #[cfg(not(test))] use libc::c_void;
 
     #[cfg(test)]
     pub use realstd::rt::shouldnt_be_public::RT_TLS_PTR;
diff --git a/src/libstd/rt/message_queue.rs b/src/libstd/rt/message_queue.rs
deleted file mode 100644
index 10e457368f0..00000000000
--- a/src/libstd/rt/message_queue.rs
+++ /dev/null
@@ -1,55 +0,0 @@
-// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
-// file at the top-level directory of this distribution and at
-// http://rust-lang.org/COPYRIGHT.
-//
-// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
-// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
-// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
-// option. This file may not be copied, modified, or distributed
-// except according to those terms.
-
-//! A concurrent queue that supports multiple producers and a
-//! single consumer.
-
-use kinds::Send;
-use vec::OwnedVector;
-use option::Option;
-use clone::Clone;
-use rt::mpsc_queue::Queue;
-
-pub struct MessageQueue<T> {
-    priv queue: Queue<T>
-}
-
-impl<T: Send> MessageQueue<T> {
-    pub fn new() -> MessageQueue<T> {
-        MessageQueue {
-            queue: Queue::new()
-        }
-    }
-
-    #[inline]
-    pub fn push(&mut self, value: T) {
-        self.queue.push(value)
-    }
-
-    #[inline]
-    pub fn pop(&mut self) -> Option<T> {
-        self.queue.pop()
-    }
-
-    /// A pop that may sometimes miss enqueued elements, but is much faster
-    /// to give up without doing any synchronization
-    #[inline]
-    pub fn casual_pop(&mut self) -> Option<T> {
-        self.queue.pop()
-    }
-}
-
-impl<T: Send> Clone for MessageQueue<T> {
-    fn clone(&self) -> MessageQueue<T> {
-        MessageQueue {
-            queue: self.queue.clone()
-        }
-    }
-}
diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs
index ce8d1ab1983..5d2179e8b96 100644
--- a/src/libstd/rt/mod.rs
+++ b/src/libstd/rt/mod.rs
@@ -65,7 +65,7 @@ use ptr::RawPtr;
 use rt::local::Local;
 use rt::sched::{Scheduler, Shutdown};
 use rt::sleeper_list::SleeperList;
-use rt::task::UnwindResult;
+use task::TaskResult;
 use rt::task::{Task, SchedTask, GreenTask, Sched};
 use send_str::SendStrStatic;
 use unstable::atomics::{AtomicInt, AtomicBool, SeqCst};
@@ -91,8 +91,6 @@ pub use self::kill::BlockedTask;
 // XXX: these probably shouldn't be public...
 #[doc(hidden)]
 pub mod shouldnt_be_public {
-    pub use super::select::SelectInner;
-    pub use super::select::{SelectInner, SelectPortInner};
     pub use super::local_ptr::native::maybe_tls_key;
     #[cfg(not(windows), not(target_os = "android"))]
     pub use super::local_ptr::compiled::RT_TLS_PTR;
@@ -123,11 +121,11 @@ pub mod rtio;
 /// or task-local storage.
 pub mod local;
 
-/// A parallel queue.
-pub mod message_queue;
-
 /// A mostly lock-free multi-producer, single consumer queue.
-mod mpsc_queue;
+pub mod mpsc_queue;
+
+/// A lock-free single-producer, single consumer queue.
+pub mod spsc_queue;
 
 /// A lock-free multi-producer, multi-consumer bounded queue.
 mod mpmc_bounded_queue;
@@ -169,11 +167,6 @@ pub mod rc;
 /// scheduler and task context
 pub mod tube;
 
-/// Simple reimplementation of std::comm
-pub mod comm;
-
-mod select;
-
 /// The runtime needs to be able to put a pointer into thread-local storage.
 mod local_ptr;
 
@@ -349,7 +342,7 @@ fn run_(main: proc(), use_main_sched: bool) -> int {
     // When the main task exits, after all the tasks in the main
     // task tree, shut down the schedulers and set the exit code.
     let handles = handles;
-    let on_exit: proc(UnwindResult) = proc(exit_success) {
+    let on_exit: proc(TaskResult) = proc(exit_success) {
         unsafe {
             assert!(!(*exited_already.get()).swap(true, SeqCst),
                     "the runtime already exited");
@@ -361,7 +354,7 @@ fn run_(main: proc(), use_main_sched: bool) -> int {
         }
 
         unsafe {
-            let exit_code = if exit_success.is_success() {
+            let exit_code = if exit_success.is_ok() {
                 use rt::util;
 
                 // If we're exiting successfully, then return the global
diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs
index 557d9c998ca..b54231421e3 100644
--- a/src/libstd/rt/rtio.rs
+++ b/src/libstd/rt/rtio.rs
@@ -10,7 +10,7 @@
 
 use c_str::CString;
 use cast;
-use comm::{SharedChan, PortOne, Port};
+use comm::{SharedChan, Port};
 use libc::c_int;
 use libc;
 use ops::Drop;
@@ -222,7 +222,7 @@ pub trait RtioUdpSocket : RtioSocket {
 
 pub trait RtioTimer {
     fn sleep(&mut self, msecs: u64);
-    fn oneshot(&mut self, msecs: u64) -> PortOne<()>;
+    fn oneshot(&mut self, msecs: u64) -> Port<()>;
     fn period(&mut self, msecs: u64) -> Port<()>;
 }
 
diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs
index fa17efc833b..ac3aeb5a4bb 100644
--- a/src/libstd/rt/sched.rs
+++ b/src/libstd/rt/sched.rs
@@ -17,7 +17,6 @@ use super::stack::{StackPool};
 use super::rtio::EventLoop;
 use super::context::Context;
 use super::task::{Task, AnySched, Sched};
-use super::message_queue::MessageQueue;
 use rt::kill::BlockedTask;
 use rt::deque;
 use rt::local_ptr;
@@ -29,6 +28,7 @@ use iter::range;
 use unstable::mutex::Mutex;
 use vec::{OwnedVector};
 
+use mpsc = super::mpsc_queue;
 
 /// A scheduler is responsible for coordinating the execution of Tasks
 /// on a single thread. The scheduler runs inside a slightly modified
@@ -47,7 +47,9 @@ pub struct Scheduler {
     /// The queue of incoming messages from other schedulers.
     /// These are enqueued by SchedHandles after which a remote callback
     /// is triggered to handle the message.
-    message_queue: MessageQueue<SchedMessage>,
+    message_queue: mpsc::Consumer<SchedMessage, ()>,
+    /// Producer used to clone sched handles from
+    message_producer: mpsc::Producer<SchedMessage, ()>,
     /// A shared list of sleeping schedulers. We'll use this to wake
     /// up schedulers when pushing work onto the work queue.
     sleeper_list: SleeperList,
@@ -104,7 +106,7 @@ enum EffortLevel {
     GiveItYourBest
 }
 
-static MAX_YIELD_CHECKS: uint = 200;
+static MAX_YIELD_CHECKS: uint = 20000;
 
 fn reset_yield_check(rng: &mut XorShiftRng) -> uint {
     let r: uint = Rand::rand(rng);
@@ -135,9 +137,11 @@ impl Scheduler {
                        friend: Option<SchedHandle>)
         -> Scheduler {
 
+        let (consumer, producer) = mpsc::queue(());
         let mut sched = Scheduler {
             sleeper_list: sleeper_list,
-            message_queue: MessageQueue::new(),
+            message_queue: consumer,
+            message_producer: producer,
             sleepy: false,
             no_sleep: false,
             event_loop: event_loop,
@@ -218,7 +222,7 @@ impl Scheduler {
 
         // Should not have any messages
         let message = stask.sched.get_mut_ref().message_queue.pop();
-        rtassert!(message.is_none());
+        rtassert!(match message { mpsc::Empty => true, _ => false });
 
         stask.destroyed = true;
     }
@@ -315,10 +319,27 @@ impl Scheduler {
     fn interpret_message_queue(mut ~self, effort: EffortLevel) -> Option<~Scheduler> {
 
         let msg = if effort == DontTryTooHard {
-            // Do a cheap check that may miss messages
             self.message_queue.casual_pop()
         } else {
-            self.message_queue.pop()
+            // When popping our message queue, we could see an "inconsistent"
+            // state which means that we *should* be able to pop data, but we
+            // are unable to at this time. Our options are:
+            //
+            //  1. Spin waiting for data
+            //  2. Ignore this and pretend we didn't find a message
+            //
+            // If we choose route 1, then if the pusher in question is currently
+            // pre-empted, we're going to take up our entire time slice just
+            // spinning on this queue. If we choose route 2, then the pusher in
+            // question is still guaranteed to make a send() on its async
+            // handle, so we will guaranteed wake up and see its message at some
+            // point.
+            //
+            // I have chosen to take route #2.
+            match self.message_queue.pop() {
+                mpsc::Data(t) => Some(t),
+                mpsc::Empty | mpsc::Inconsistent => None
+            }
         };
 
         match msg {
@@ -793,7 +814,7 @@ impl Scheduler {
 
         return SchedHandle {
             remote: remote,
-            queue: self.message_queue.clone(),
+            queue: self.message_producer.clone(),
             sched_id: self.sched_id()
         };
     }
@@ -813,7 +834,7 @@ pub enum SchedMessage {
 
 pub struct SchedHandle {
     priv remote: ~RemoteCallback,
-    priv queue: MessageQueue<SchedMessage>,
+    priv queue: mpsc::Producer<SchedMessage, ()>,
     sched_id: uint
 }
 
@@ -915,17 +936,17 @@ fn new_sched_rng() -> XorShiftRng {
 #[cfg(test)]
 mod test {
     use prelude::*;
-    use rt::test::*;
-    use unstable::run_in_bare_thread;
+
     use borrow::to_uint;
-    use rt::sched::{Scheduler};
     use rt::deque::BufferPool;
-    use rt::thread::Thread;
-    use rt::task::{Task, Sched};
     use rt::basic;
+    use rt::sched::{Scheduler};
+    use rt::task::{Task, Sched};
+    use rt::test::*;
+    use rt::thread::Thread;
     use rt::util;
-    use option::{Some};
-    use rt::task::UnwindResult;
+    use task::TaskResult;
+    use unstable::run_in_bare_thread;
 
     #[test]
     fn trivial_run_in_newsched_task_test() {
@@ -1010,8 +1031,8 @@ mod test {
                 assert!(Task::on_appropriate_sched());
             };
 
-            let on_exit: proc(UnwindResult) = proc(exit_status) {
-                rtassert!(exit_status.is_success())
+            let on_exit: proc(TaskResult) = proc(exit_status) {
+                rtassert!(exit_status.is_ok())
             };
             task.death.on_exit = Some(on_exit);
 
@@ -1027,7 +1048,6 @@ mod test {
         use rt::sleeper_list::SleeperList;
         use rt::sched::Shutdown;
         use borrow;
-        use rt::comm::*;
 
         do run_in_bare_thread {
 
@@ -1089,7 +1109,7 @@ mod test {
             rtdebug!("task4 id: **{}**", borrow::to_uint(task4));
 
             // Signal from the special task that we are done.
-            let (port, chan) = oneshot::<()>();
+            let (port, chan) = Chan::<()>::new();
 
             let normal_task = ~do Task::new_root(&mut normal_sched.stack_pool, None) {
                 rtdebug!("*about to submit task2*");
@@ -1160,10 +1180,8 @@ mod test {
 
     #[test]
     fn handle() {
-        use rt::comm::*;
-
         do run_in_bare_thread {
-            let (port, chan) = oneshot::<()>();
+            let (port, chan) = Chan::new();
 
             let thread_one = do Thread::start {
                 let chan = chan;
@@ -1230,7 +1248,6 @@ mod test {
 
     #[test]
     fn multithreading() {
-        use rt::comm::*;
         use num::Times;
         use vec::OwnedVector;
         use container::Container;
@@ -1238,7 +1255,7 @@ mod test {
         do run_in_mt_newsched_task {
             let mut ports = ~[];
             10.times(|| {
-                let (port, chan) = oneshot();
+                let (port, chan) = Chan::new();
                 do spawntask_later {
                     chan.send(());
                 }
@@ -1253,21 +1270,17 @@ mod test {
 
      #[test]
     fn thread_ring() {
-        use rt::comm::*;
-        use comm::{GenericPort, GenericChan};
-
         do run_in_mt_newsched_task {
-            let (end_port, end_chan) = oneshot();
+            let (end_port, end_chan) = Chan::new();
 
             let n_tasks = 10;
             let token = 2000;
 
-            let (p, ch1) = stream();
-            let mut p = p;
+            let (mut p, ch1) = Chan::new();
             ch1.send((token, end_chan));
             let mut i = 2;
             while i <= n_tasks {
-                let (next_p, ch) = stream();
+                let (next_p, ch) = Chan::new();
                 let imm_i = i;
                 let imm_p = p;
                 do spawntask_random {
@@ -1276,23 +1289,23 @@ mod test {
                 p = next_p;
                 i += 1;
             }
-            let imm_p = p;
-            let imm_ch = ch1;
+            let p = p;
             do spawntask_random {
-                roundtrip(1, n_tasks, &imm_p, &imm_ch);
+                roundtrip(1, n_tasks, &p, &ch1);
             }
 
             end_port.recv();
         }
 
         fn roundtrip(id: int, n_tasks: int,
-                     p: &Port<(int, ChanOne<()>)>, ch: &Chan<(int, ChanOne<()>)>) {
+                     p: &Port<(int, Chan<()>)>,
+                     ch: &Chan<(int, Chan<()>)>) {
             while (true) {
                 match p.recv() {
                     (1, end_chan) => {
-                                debug!("{}\n", id);
-                                end_chan.send(());
-                                return;
+                        debug!("{}\n", id);
+                        end_chan.send(());
+                        return;
                     }
                     (token, end_chan) => {
                         debug!("thread: {}   got token: {}", id, token);
@@ -1331,16 +1344,14 @@ mod test {
 
     // FIXME: #9407: xfail-test
     fn dont_starve_1() {
-        use rt::comm::oneshot;
-
         stress_factor().times(|| {
             do run_in_mt_newsched_task {
-                let (port, chan) = oneshot();
+                let (port, chan) = Chan::new();
 
                 // This task should not be able to starve the sender;
                 // The sender should get stolen to another thread.
                 do spawntask {
-                    while !port.peek() { }
+                    while port.try_recv().is_none() { }
                 }
 
                 chan.send(());
@@ -1350,17 +1361,15 @@ mod test {
 
     #[test]
     fn dont_starve_2() {
-        use rt::comm::oneshot;
-
         stress_factor().times(|| {
             do run_in_newsched_task {
-                let (port, chan) = oneshot();
-                let (_port2, chan2) = stream();
+                let (port, chan) = Chan::new();
+                let (_port2, chan2) = Chan::new();
 
                 // This task should not be able to starve the other task.
                 // The sends should eventually yield.
                 do spawntask {
-                    while !port.peek() {
+                    while port.try_recv().is_none() {
                         chan2.send(());
                     }
                 }
diff --git a/src/libstd/rt/select.rs b/src/libstd/rt/select.rs
deleted file mode 100644
index 6cde0a1f216..00000000000
--- a/src/libstd/rt/select.rs
+++ /dev/null
@@ -1,29 +0,0 @@
-// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
-// file at the top-level directory of this distribution and at
-// http://rust-lang.org/COPYRIGHT.
-//
-// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
-// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
-// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
-// option. This file may not be copied, modified, or distributed
-// except according to those terms.
-
-//! Module for private, abstraction-leaking select traits. Wrapped in std::select.
-
-use rt::kill::BlockedTask;
-use rt::sched::Scheduler;
-use option::Option;
-
-pub trait SelectInner {
-    // Returns true if data was available.
-    fn optimistic_check(&mut self) -> bool;
-    // Returns true if data was available. If so, shall also wake() the task.
-    fn block_on(&mut self, &mut Scheduler, BlockedTask) -> bool;
-    // Returns true if data was available.
-    fn unblock_from(&mut self) -> bool;
-}
-
-pub trait SelectPortInner<T> {
-    fn recv_ready(self) -> Option<T>;
-}
-
diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs
index 2adc32f33fb..62e012f9f41 100644
--- a/src/libstd/rt/task.rs
+++ b/src/libstd/rt/task.rs
@@ -20,20 +20,22 @@ use prelude::*;
 use borrow;
 use cast::transmute;
 use cleanup;
+use io::Writer;
 use libc::{c_void, uintptr_t, c_char, size_t};
 use local_data;
 use option::{Option, Some, None};
 use rt::borrowck::BorrowRecord;
 use rt::borrowck;
+use rt::context;
 use rt::context::Context;
 use rt::env;
-use io::Writer;
 use rt::kill::Death;
 use rt::local::Local;
 use rt::logging::StdErrLogger;
 use rt::sched::{Scheduler, SchedHandle};
 use rt::stack::{StackSegment, StackPool};
 use send_str::SendStr;
+use task::TaskResult;
 use unstable::finally::Finally;
 use unstable::mutex::Mutex;
 
@@ -90,46 +92,17 @@ pub enum SchedHome {
 pub struct GarbageCollector;
 pub struct LocalStorage(Option<local_data::Map>);
 
-/// Represents the reason for the current unwinding process
-pub enum UnwindResult {
-    /// The task is ending successfully
-    Success,
-
-    /// The Task is failing with reason `~Any`
-    Failure(~Any),
-}
-
-impl UnwindResult {
-    /// Returns `true` if this `UnwindResult` is a failure
-    #[inline]
-    pub fn is_failure(&self) -> bool {
-        match *self {
-            Success => false,
-            Failure(_) => true
-        }
-    }
-
-    /// Returns `true` if this `UnwindResult` is a success
-    #[inline]
-    pub fn is_success(&self) -> bool {
-        match *self {
-            Success => true,
-            Failure(_) => false
-        }
-    }
-}
-
 pub struct Unwinder {
     unwinding: bool,
     cause: Option<~Any>
 }
 
 impl Unwinder {
-    fn to_unwind_result(&mut self) -> UnwindResult {
+    fn result(&mut self) -> TaskResult {
         if self.unwinding {
-            Failure(self.cause.take().unwrap())
+            Err(self.cause.take().unwrap())
         } else {
-            Success
+            Ok(())
         }
     }
 }
@@ -326,7 +299,7 @@ impl Task {
         // Cleanup the dynamic borrowck debugging info
         borrowck::clear_task_borrow_list();
 
-        self.death.collect_failure(self.unwinder.to_unwind_result());
+        self.death.collect_failure(self.unwinder.result());
         self.destroyed = true;
     }
 
@@ -691,6 +664,7 @@ pub fn begin_unwind<M: Any + Send>(msg: M, file: &'static str, line: uint) -> !
 mod test {
     use super::*;
     use rt::test::*;
+    use prelude::*;
 
     #[test]
     fn local_heap() {
@@ -744,22 +718,9 @@ mod test {
     }
 
     #[test]
-    fn comm_oneshot() {
-        use comm::*;
-
-        do run_in_newsched_task {
-            let (port, chan) = oneshot();
-            chan.send(10);
-            assert!(port.recv() == 10);
-        }
-    }
-
-    #[test]
     fn comm_stream() {
-        use comm::*;
-
         do run_in_newsched_task() {
-            let (port, chan) = stream();
+            let (port, chan) = Chan::new();
             chan.send(10);
             assert!(port.recv() == 10);
         }
@@ -767,11 +728,8 @@ mod test {
 
     #[test]
     fn comm_shared_chan() {
-        use comm::*;
-
         do run_in_newsched_task() {
-            let (port, chan) = stream();
-            let chan = SharedChan::new(chan);
+            let (port, chan) = SharedChan::new();
             chan.send(10);
             assert!(port.recv() == 10);
         }
diff --git a/src/libstd/rt/test.rs b/src/libstd/rt/test.rs
index 96b80d11129..2b48b396c99 100644
--- a/src/libstd/rt/test.rs
+++ b/src/libstd/rt/test.rs
@@ -21,14 +21,14 @@ use rand::Rng;
 use rand;
 use result::{Result, Ok, Err};
 use rt::basic;
-use rt::comm::oneshot;
 use rt::deque::BufferPool;
+use comm::Chan;
 use rt::new_event_loop;
 use rt::sched::Scheduler;
 use rt::sleeper_list::SleeperList;
 use rt::task::Task;
-use rt::task::UnwindResult;
 use rt::thread::Thread;
+use task::TaskResult;
 use unstable::{run_in_bare_thread};
 use vec;
 use vec::{OwnedVector, MutableVector, ImmutableVector};
@@ -82,10 +82,10 @@ pub fn run_in_uv_task_core(f: proc()) {
     let mut sched = ~new_test_uv_sched();
     let exit_handle = sched.make_handle();
 
-    let on_exit: proc(UnwindResult) = proc(exit_status: UnwindResult) {
+    let on_exit: proc(TaskResult) = proc(exit_status: TaskResult) {
         let mut exit_handle = exit_handle;
         exit_handle.send(Shutdown);
-        rtassert!(exit_status.is_success());
+        rtassert!(exit_status.is_ok());
     };
     let mut task = ~Task::new_root(&mut sched.stack_pool, None, f);
     task.death.on_exit = Some(on_exit);
@@ -99,10 +99,10 @@ pub fn run_in_newsched_task_core(f: proc()) {
     let mut sched = ~new_test_sched();
     let exit_handle = sched.make_handle();
 
-    let on_exit: proc(UnwindResult) = proc(exit_status: UnwindResult) {
+    let on_exit: proc(TaskResult) = proc(exit_status: TaskResult) {
         let mut exit_handle = exit_handle;
         exit_handle.send(Shutdown);
-        rtassert!(exit_status.is_success());
+        rtassert!(exit_status.is_ok());
     };
     let mut task = ~Task::new_root(&mut sched.stack_pool, None, f);
     task.death.on_exit = Some(on_exit);
@@ -240,14 +240,14 @@ pub fn run_in_mt_newsched_task(f: proc()) {
         }
 
         let handles = handles;  // Work around not being able to capture mut
-        let on_exit: proc(UnwindResult) = proc(exit_status: UnwindResult) {
+        let on_exit: proc(TaskResult) = proc(exit_status: TaskResult) {
             // Tell schedulers to exit
             let mut handles = handles;
             for handle in handles.mut_iter() {
                 handle.send(Shutdown);
             }
 
-            rtassert!(exit_status.is_success());
+            rtassert!(exit_status.is_ok());
         };
         let mut main_task = ~Task::new_root(&mut scheds[0].stack_pool,
                                             None,
@@ -311,8 +311,8 @@ pub fn spawntask_random(f: proc()) {
 
 pub fn spawntask_try(f: proc()) -> Result<(),()> {
 
-    let (port, chan) = oneshot();
-    let on_exit: proc(UnwindResult) = proc(exit_status) {
+    let (port, chan) = Chan::new();
+    let on_exit: proc(TaskResult) = proc(exit_status) {
         chan.send(exit_status)
     };
 
@@ -322,7 +322,7 @@ pub fn spawntask_try(f: proc()) -> Result<(),()> {
     Scheduler::run_task(new_task);
 
     let exit_status = port.recv();
-    if exit_status.is_success() { Ok(()) } else { Err(()) }
+    if exit_status.is_ok() { Ok(()) } else { Err(()) }
 
 }
 
diff --git a/src/libstd/rt/thread.rs b/src/libstd/rt/thread.rs
index 9031147f8b1..da02988c75c 100644
--- a/src/libstd/rt/thread.rs
+++ b/src/libstd/rt/thread.rs
@@ -21,42 +21,32 @@ use kinds::Send;
 use libc;
 use ops::Drop;
 use option::{Option, Some, None};
-use ptr;
 use uint;
 
-#[cfg(windows)]
-use libc::types::os::arch::extra::{LPSECURITY_ATTRIBUTES, SIZE_T,
-                                   LPVOID, DWORD, LPDWORD, HANDLE};
-
-#[cfg(windows)] type rust_thread = HANDLE;
-#[cfg(unix)] type rust_thread = libc::pthread_t;
-#[cfg(windows)] type rust_thread_return = DWORD;
-#[cfg(unix)] type rust_thread_return = *libc::c_void;
-
-type StartFn = extern "C" fn(*libc::c_void) -> rust_thread_return;
+type StartFn = extern "C" fn(*libc::c_void) -> imp::rust_thread_return;
 
 /// This struct represents a native thread's state. This is used to join on an
 /// existing thread created in the join-able state.
 pub struct Thread<T> {
-    priv native: rust_thread,
+    priv native: imp::rust_thread,
     priv joined: bool,
     priv packet: ~Option<T>,
 }
 
-static DEFAULT_STACK_SIZE: libc::size_t = 1024*1024;
+static DEFAULT_STACK_SIZE: libc::size_t = 1024 * 1024;
 
 // This is the starting point of rust os threads. The first thing we do
 // is make sure that we don't trigger __morestack (also why this has a
 // no_split_stack annotation), and then we extract the main function
 // and invoke it.
 #[no_split_stack]
-extern fn thread_start(main: *libc::c_void) -> rust_thread_return {
+extern fn thread_start(main: *libc::c_void) -> imp::rust_thread_return {
     use rt::context;
     unsafe {
         context::record_stack_bounds(0, uint::max_value);
         let f: ~proc() = cast::transmute(main);
         (*f)();
-        cast::transmute(0 as rust_thread_return)
+        cast::transmute(0 as imp::rust_thread_return)
     }
 }
 
@@ -88,7 +78,7 @@ impl Thread<()> {
             *cast::transmute::<&~Option<T>, **mut Option<T>>(&packet)
         };
         let main: proc() = proc() unsafe { *packet2 = Some(main()); };
-        let native = unsafe { native_thread_create(~main) };
+        let native = unsafe { imp::create(~main) };
 
         Thread {
             native: native,
@@ -105,10 +95,16 @@ impl Thread<()> {
     /// there are detached thread still running around.
     pub fn spawn(main: proc()) {
         unsafe {
-            let handle = native_thread_create(~main);
-            native_thread_detach(handle);
+            let handle = imp::create(~main);
+            imp::detach(handle);
         }
     }
+
+    /// Relinquishes the CPU slot that this OS-thread is currently using,
+    /// allowing another thread to run for awhile.
+    pub fn yield_now() {
+        unsafe { imp::yield_now(); }
+    }
 }
 
 impl<T: Send> Thread<T> {
@@ -116,7 +112,7 @@ impl<T: Send> Thread<T> {
     /// calculation.
     pub fn join(mut self) -> T {
         assert!(!self.joined);
-        unsafe { native_thread_join(self.native) };
+        unsafe { imp::join(self.native) };
         self.joined = true;
         assert!(self.packet.is_some());
         self.packet.take_unwrap()
@@ -129,80 +125,115 @@ impl<T: Send> Drop for Thread<T> {
         // This is required for correctness. If this is not done then the thread
         // would fill in a return box which no longer exists.
         if !self.joined {
-            unsafe { native_thread_join(self.native) };
+            unsafe { imp::join(self.native) };
         }
     }
 }
 
 #[cfg(windows)]
-unsafe fn native_thread_create(p: ~proc()) -> rust_thread {
-    let arg: *mut libc::c_void = cast::transmute(p);
-    CreateThread(ptr::mut_null(), DEFAULT_STACK_SIZE, thread_start,
-                 arg, 0, ptr::mut_null())
-}
-
-#[cfg(windows)]
-unsafe fn native_thread_join(native: rust_thread) {
-    use libc::consts::os::extra::INFINITE;
-    WaitForSingleObject(native, INFINITE);
-}
+mod imp {
+    use libc::types::os::arch::extra::{LPSECURITY_ATTRIBUTES, SIZE_T, BOOL,
+                                       LPVOID, DWORD, LPDWORD, HANDLE};
+    use libc;
+    use cast;
+    use super::DEFAULT_STACK_SIZE;
+
+    pub type rust_thread = HANDLE;
+    pub type rust_thread_return = DWORD;
+
+    pub unsafe fn create(p: ~proc()) -> rust_thread {
+        let arg: *mut libc::c_void = cast::transmute(p);
+        CreateThread(ptr::mut_null(), DEFAULT_STACK_SIZE, super::thread_start,
+                     arg, 0, ptr::mut_null())
+    }
 
-#[cfg(windows)]
-unsafe fn native_thread_detach(native: rust_thread) {
-    assert!(libc::CloseHandle(native) != 0);
-}
+    pub unsafe fn join(native: rust_thread) {
+        use libc::consts::os::extra::INFINITE;
+        WaitForSingleObject(native, INFINITE);
+    }
 
-#[cfg(unix)]
-unsafe fn native_thread_create(p: ~proc()) -> rust_thread {
-    use unstable::intrinsics;
-    use libc::consts::os::posix01::PTHREAD_CREATE_JOINABLE;
+    pub unsafe fn detach(native: rust_thread) {
+        assert!(libc::CloseHandle(native) != 0);
+    }
 
-    let mut native: libc::pthread_t = intrinsics::uninit();
-    let mut attr: libc::pthread_attr_t = intrinsics::uninit();
-    assert_eq!(pthread_attr_init(&mut attr), 0);
-    assert_eq!(pthread_attr_setstacksize(&mut attr, DEFAULT_STACK_SIZE), 0);
-    assert_eq!(pthread_attr_setdetachstate(&mut attr, PTHREAD_CREATE_JOINABLE), 0);
+    pub unsafe fn yield_now() {
+        // This function will return 0 if there are no other threads to execute,
+        // but this also means that the yield was useless so this isn't really a
+        // case that needs to be worried about.
+        SwitchToThread();
+    }
 
-    let arg: *libc::c_void = cast::transmute(p);
-    assert_eq!(pthread_create(&mut native, &attr, thread_start, arg), 0);
-    native
+    extern "system" {
+        fn CreateThread(lpThreadAttributes: LPSECURITY_ATTRIBUTES,
+                        dwStackSize: SIZE_T,
+                        lpStartAddress: super::StartFn,
+                        lpParameter: LPVOID,
+                        dwCreationFlags: DWORD,
+                        lpThreadId: LPDWORD) -> HANDLE;
+        fn WaitForSingleObject(hHandle: HANDLE, dwMilliseconds: DWORD) -> DWORD;
+        fn SwitchToThread() -> BOOL;
+    }
 }
 
 #[cfg(unix)]
-unsafe fn native_thread_join(native: rust_thread) {
-    assert_eq!(pthread_join(native, ptr::null()), 0);
-}
+mod imp {
+    use cast;
+    use libc::consts::os::posix01::PTHREAD_CREATE_JOINABLE;
+    use libc;
+    use ptr;
+    use super::DEFAULT_STACK_SIZE;
+    use unstable::intrinsics;
 
-#[cfg(unix)]
-fn native_thread_detach(native: rust_thread) {
-    unsafe { assert_eq!(pthread_detach(native), 0) }
-}
+    pub type rust_thread = libc::pthread_t;
+    pub type rust_thread_return = *libc::c_void;
+
+    pub unsafe fn create(p: ~proc()) -> rust_thread {
+        let mut native: libc::pthread_t = intrinsics::uninit();
+        let mut attr: libc::pthread_attr_t = intrinsics::uninit();
+        assert_eq!(pthread_attr_init(&mut attr), 0);
+        assert_eq!(pthread_attr_setstacksize(&mut attr, DEFAULT_STACK_SIZE), 0);
+        assert_eq!(pthread_attr_setdetachstate(&mut attr,
+                                               PTHREAD_CREATE_JOINABLE), 0);
+
+        let arg: *libc::c_void = cast::transmute(p);
+        assert_eq!(pthread_create(&mut native, &attr,
+                                  super::thread_start, arg), 0);
+        native
+    }
 
-#[cfg(windows)]
-extern "system" {
-    fn CreateThread(lpThreadAttributes: LPSECURITY_ATTRIBUTES,
-                    dwStackSize: SIZE_T,
-                    lpStartAddress: StartFn,
-                    lpParameter: LPVOID,
-                    dwCreationFlags: DWORD,
-                    lpThreadId: LPDWORD) -> HANDLE;
-    fn WaitForSingleObject(hHandle: HANDLE, dwMilliseconds: DWORD) -> DWORD;
-}
+    pub unsafe fn join(native: rust_thread) {
+        assert_eq!(pthread_join(native, ptr::null()), 0);
+    }
 
-#[cfg(unix)]
-extern {
-    fn pthread_create(native: *mut libc::pthread_t,
-                      attr: *libc::pthread_attr_t,
-                      f: StartFn,
-                      value: *libc::c_void) -> libc::c_int;
-    fn pthread_join(native: libc::pthread_t,
-                    value: **libc::c_void) -> libc::c_int;
-    fn pthread_attr_init(attr: *mut libc::pthread_attr_t) -> libc::c_int;
-    fn pthread_attr_setstacksize(attr: *mut libc::pthread_attr_t,
-                                 stack_size: libc::size_t) -> libc::c_int;
-    fn pthread_attr_setdetachstate(attr: *mut libc::pthread_attr_t,
-                                   state: libc::c_int) -> libc::c_int;
-    fn pthread_detach(thread: libc::pthread_t) -> libc::c_int;
+    pub unsafe fn detach(native: rust_thread) {
+        assert_eq!(pthread_detach(native), 0);
+    }
+
+    #[cfg(target_os = "macos")]
+    pub unsafe fn yield_now() { assert_eq!(sched_yield(), 0); }
+
+    #[cfg(not(target_os = "macos"))]
+    pub unsafe fn yield_now() { assert_eq!(pthread_yield(), 0); }
+
+    extern {
+        fn pthread_create(native: *mut libc::pthread_t,
+                          attr: *libc::pthread_attr_t,
+                          f: super::StartFn,
+                          value: *libc::c_void) -> libc::c_int;
+        fn pthread_join(native: libc::pthread_t,
+                        value: **libc::c_void) -> libc::c_int;
+        fn pthread_attr_init(attr: *mut libc::pthread_attr_t) -> libc::c_int;
+        fn pthread_attr_setstacksize(attr: *mut libc::pthread_attr_t,
+                                     stack_size: libc::size_t) -> libc::c_int;
+        fn pthread_attr_setdetachstate(attr: *mut libc::pthread_attr_t,
+                                       state: libc::c_int) -> libc::c_int;
+        fn pthread_detach(thread: libc::pthread_t) -> libc::c_int;
+
+        #[cfg(target_os = "macos")]
+        fn sched_yield() -> libc::c_int;
+        #[cfg(not(target_os = "macos"))]
+        fn pthread_yield() -> libc::c_int;
+    }
 }
 
 #[cfg(test)]
diff --git a/src/libstd/run.rs b/src/libstd/run.rs
index 14d49df59a4..70ad752ea93 100644
--- a/src/libstd/run.rs
+++ b/src/libstd/run.rs
@@ -12,7 +12,7 @@
 
 #[allow(missing_doc)];
 
-use comm::{stream, SharedChan};
+use comm::SharedChan;
 use io::Reader;
 use io::process::ProcessExit;
 use io::process;
@@ -220,8 +220,7 @@ impl Process {
         // in parallel so we don't deadlock while blocking on one
         // or the other. FIXME (#2625): Surely there's a much more
         // clever way to do this.
-        let (p, ch) = stream();
-        let ch = SharedChan::new(ch);
+        let (p, ch) = SharedChan::new();
         let ch_clone = ch.clone();
 
         do spawn {
diff --git a/src/libstd/select.rs b/src/libstd/select.rs
deleted file mode 100644
index cca64244db5..00000000000
--- a/src/libstd/select.rs
+++ /dev/null
@@ -1,306 +0,0 @@
-// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
-// file at the top-level directory of this distribution and at
-// http://rust-lang.org/COPYRIGHT.
-//
-// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
-// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
-// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
-// option. This file may not be copied, modified, or distributed
-// except according to those terms.
-
-#[allow(missing_doc)];
-
-use comm;
-use container::Container;
-use iter::{Iterator, DoubleEndedIterator};
-use kinds::Send;
-use ops::Drop;
-use option::*;
-use rt::local::Local;
-use rt::rtio::EventLoop;
-use rt::sched::Scheduler;
-use rt::shouldnt_be_public::{SelectInner, SelectPortInner};
-use vec::{OwnedVector, MutableVector};
-
-/// Trait for message-passing primitives that can be select()ed on.
-pub trait Select : SelectInner { }
-
-/// Trait for message-passing primitives that can use the select2() convenience wrapper.
-// (This is separate from the above trait to enable heterogeneous lists of ports
-// that implement Select on different types to use select().)
-pub trait SelectPort<T> : SelectPortInner<T> { }
-
-/// A helper type that throws away a value on a port.
-struct PortGuard<T> {
-    port: Option<comm::PortOne<T>>,
-}
-
-#[unsafe_destructor]
-impl<T:Send> Drop for PortGuard<T> {
-    fn drop(&mut self) {
-        let _ = self.port.take_unwrap().recv();
-    }
-}
-
-/// Receive a message from any one of many ports at once. Returns the index of the
-/// port whose data is ready. (If multiple are ready, returns the lowest index.)
-pub fn select<A: Select>(ports: &mut [A]) -> uint {
-    if ports.is_empty() {
-        fail!("can't select on an empty list");
-    }
-
-    for (index, port) in ports.mut_iter().enumerate() {
-        if port.optimistic_check() {
-            return index;
-        }
-    }
-
-    // If one of the ports already contains data when we go to block on it, we
-    // don't bother enqueueing on the rest of them, so we shouldn't bother
-    // unblocking from it either. This is just for efficiency, not correctness.
-    // (If not, we need to unblock from all of them. Length is a placeholder.)
-    let mut ready_index = ports.len();
-
-    // XXX: We're using deschedule...and_then in an unsafe way here (see #8132),
-    // in that we need to continue mutating the ready_index in the environment
-    // after letting the task get woken up. The and_then closure needs to delay
-    // the task from resuming until all ports have become blocked_on.
-    let (p,c) = comm::oneshot();
-
-    {
-        let _guard = PortGuard {
-            port: Some(p),
-        };
-
-        let mut c = Some(c);
-        let sched: ~Scheduler = Local::take();
-        sched.deschedule_running_task_and_then(|sched, task| {
-            let task_handles = task.make_selectable(ports.len());
-
-            for (index, (port, task_handle)) in
-                    ports.mut_iter().zip(task_handles.move_iter()).enumerate() {
-                // If one of the ports has data by now, it will wake the handle.
-                if port.block_on(sched, task_handle) {
-                    ready_index = index;
-                    break;
-                }
-            }
-
-            let c = c.take_unwrap();
-            do sched.event_loop.callback {
-                c.send_deferred(())
-            }
-        })
-    }
-
-    // Task resumes. Now unblock ourselves from all the ports we blocked on.
-    // If the success index wasn't reset, 'take' will just take all of them.
-    // Iterate in reverse so the 'earliest' index that's ready gets returned.
-    for (index, port) in ports.mut_slice(0, ready_index).mut_iter().enumerate().invert() {
-        if port.unblock_from() {
-            ready_index = index;
-        }
-    }
-
-    assert!(ready_index < ports.len());
-    return ready_index;
-}
-
-/* FIXME(#5121, #7914) This all should be legal, but rust is not clever enough yet.
-
-impl <'a> Select for &'a mut Select {
-    fn optimistic_check(&mut self) -> bool { self.optimistic_check() }
-    fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool {
-        self.block_on(sched, task)
-    }
-    fn unblock_from(&mut self) -> bool { self.unblock_from() }
-}
-
-pub fn select2<TA, A: SelectPort<TA>, TB, B: SelectPort<TB>>(mut a: A, mut b: B)
-        -> Either<(Option<TA>, B), (A, Option<TB>)> {
-    let result = {
-        let mut ports = [&mut a as &mut Select, &mut b as &mut Select];
-        select(ports)
-    };
-    match result {
-        0 => Left ((a.recv_ready(), b)),
-        1 => Right((a, b.recv_ready())),
-        x => fail!("impossible case in select2: {:?}", x)
-    }
-}
-
-*/
-
-#[cfg(test)]
-mod test {
-    use super::*;
-    use clone::Clone;
-    use num::Times;
-    use option::*;
-    use rt::comm::*;
-    use rt::test::*;
-    use vec::*;
-    use comm::GenericChan;
-    use task;
-    use iter::{Iterator, range};
-
-    #[test] #[should_fail]
-    fn select_doesnt_get_trolled() {
-        select::<PortOne<()>>([]);
-    }
-
-    /* non-blocking select tests */
-
-    #[cfg(test)]
-    fn select_helper(num_ports: uint, send_on_chans: &[uint]) {
-        // Unfortunately this does not actually test the block_on early-break
-        // codepath in select -- racing between the sender and the receiver in
-        // separate tasks is necessary to get around the optimistic check.
-        let (ports, chans) = unzip(range(0, num_ports).map(|_| oneshot::<()>()));
-        let mut dead_chans = ~[];
-        let mut ports = ports;
-        for (i, chan) in chans.move_iter().enumerate() {
-            if send_on_chans.contains(&i) {
-                chan.send(());
-            } else {
-                dead_chans.push(chan);
-            }
-        }
-        let ready_index = select(ports);
-        assert!(send_on_chans.contains(&ready_index));
-        assert!(ports.swap_remove(ready_index).recv_ready().is_some());
-        let _ = dead_chans;
-
-        // Same thing with streams instead.
-        // FIXME(#7971): This should be in a macro but borrowck isn't smart enough.
-        let (ports, chans) = unzip(range(0, num_ports).map(|_| stream::<()>()));
-        let mut dead_chans = ~[];
-        let mut ports = ports;
-        for (i, chan) in chans.move_iter().enumerate() {
-            if send_on_chans.contains(&i) {
-                chan.send(());
-            } else {
-                dead_chans.push(chan);
-            }
-        }
-        let ready_index = select(ports);
-        assert!(send_on_chans.contains(&ready_index));
-        assert!(ports.swap_remove(ready_index).recv_ready().is_some());
-        let _ = dead_chans;
-    }
-
-    #[test]
-    fn select_one() {
-        do run_in_uv_task { select_helper(1, [0]) }
-    }
-
-    #[test]
-    fn select_two() {
-        // NB. I would like to have a test that tests the first one that is
-        // ready is the one that's returned, but that can't be reliably tested
-        // with the randomized behaviour of optimistic_check.
-        do run_in_uv_task { select_helper(2, [1]) }
-        do run_in_uv_task { select_helper(2, [0]) }
-        do run_in_uv_task { select_helper(2, [1,0]) }
-    }
-
-    #[test]
-    fn select_a_lot() {
-        do run_in_uv_task { select_helper(12, [7,8,9]) }
-    }
-
-    #[test]
-    fn select_stream() {
-        use util;
-        use comm::GenericChan;
-
-        // Sends 10 buffered packets, and uses select to retrieve them all.
-        // Puts the port in a different spot in the vector each time.
-        do run_in_uv_task {
-            let (ports, _) = unzip(range(0u, 10).map(|_| stream::<int>()));
-            let (port, chan) = stream();
-            10.times(|| { chan.send(31337); });
-            let mut ports = ports;
-            let mut port = Some(port);
-            let order = [5u,0,4,3,2,6,9,8,7,1];
-            for &index in order.iter() {
-                // put the port in the vector at any index
-                util::swap(port.get_mut_ref(), &mut ports[index]);
-                assert!(select(ports) == index);
-                // get it back out
-                util::swap(port.get_mut_ref(), &mut ports[index]);
-                // NB. Not recv(), because optimistic_check randomly fails.
-                assert!(port.get_ref().recv_ready().unwrap() == 31337);
-            }
-        }
-    }
-
-    #[test]
-    fn select_simple() {
-        do run_in_uv_task {
-            select_helper(2, [1])
-        }
-    }
-
-    /* blocking select tests */
-
-    #[test]
-    fn select_blocking() {
-        do run_in_uv_task {
-            let (p1,_c) = oneshot();
-            let (p2,c2) = oneshot();
-            let mut ports = [p1,p2];
-
-            let (p3,c3) = oneshot();
-            let (p4,c4) = oneshot();
-
-            do task::spawn {
-                p3.recv();   // handshake parent
-                c4.send(()); // normal receive
-                task::deschedule();
-                c2.send(()); // select receive
-            }
-
-            // Try to block before child sends on c2.
-            c3.send(());
-            p4.recv();
-            assert!(select(ports) == 1);
-        }
-    }
-
-    #[test]
-    fn select_racing_senders() {
-        static NUM_CHANS: uint = 10;
-
-        select_racing_senders_helper(~[0,1,2,3,4,5,6,7,8,9]);
-        select_racing_senders_helper(~[0,1,2]);
-        select_racing_senders_helper(~[3,4,5,6]);
-        select_racing_senders_helper(~[7,8,9]);
-
-        fn select_racing_senders_helper(send_on_chans: ~[uint]) {
-            use rt::test::spawntask_random;
-
-            do run_in_uv_task {
-                // A bit of stress, since ordinarily this is just smoke and mirrors.
-                4.times(|| {
-                    let send_on_chans = send_on_chans.clone();
-                    do task::spawn {
-                        let mut ports = ~[];
-                        for i in range(0u, NUM_CHANS) {
-                            let (p,c) = oneshot();
-                            ports.push(p);
-                            if send_on_chans.contains(&i) {
-                                do spawntask_random {
-                                    task::deschedule();
-                                    c.send(());
-                                }
-                            }
-                        }
-                        // nondeterministic result, but should succeed
-                        select(ports);
-                    }
-                })
-            }
-        }
-    }
-}
diff --git a/src/libstd/task/mod.rs b/src/libstd/task/mod.rs
index 24a24f24818..0e56f42f5b9 100644
--- a/src/libstd/task/mod.rs
+++ b/src/libstd/task/mod.rs
@@ -55,11 +55,10 @@
 
 use prelude::*;
 
-use comm::{stream, Chan, GenericChan, GenericPort, Port, Peekable};
+use comm::{Chan, Port};
 use result::{Result, Ok, Err};
 use rt::in_green_task_context;
 use rt::local::Local;
-use rt::task::{UnwindResult, Success, Failure};
 use send_str::{SendStr, IntoSendStr};
 use util;
 
@@ -81,33 +80,6 @@ pub mod spawn;
 /// children tasks complete, recommend using a result future.
 pub type TaskResult = Result<(), ~Any>;
 
-pub struct TaskResultPort {
-    priv port: Port<UnwindResult>
-}
-
-fn to_task_result(res: UnwindResult) -> TaskResult {
-    match res {
-        Success => Ok(()), Failure(a) => Err(a),
-    }
-}
-
-impl GenericPort<TaskResult> for TaskResultPort {
-    #[inline]
-    fn recv(&self) -> TaskResult {
-        to_task_result(self.port.recv())
-    }
-
-    #[inline]
-    fn try_recv(&self) -> Option<TaskResult> {
-        self.port.try_recv().map(to_task_result)
-    }
-}
-
-impl Peekable<TaskResult> for TaskResultPort {
-    #[inline]
-    fn peek(&self) -> bool { self.port.peek() }
-}
-
 /// Scheduler modes
 #[deriving(Eq)]
 pub enum SchedMode {
@@ -150,7 +122,7 @@ pub struct SchedOpts {
  */
 pub struct TaskOpts {
     priv watched: bool,
-    priv notify_chan: Option<Chan<UnwindResult>>,
+    priv notify_chan: Option<Chan<TaskResult>>,
     name: Option<SendStr>,
     sched: SchedOpts,
     stack_size: Option<uint>
@@ -232,7 +204,7 @@ impl TaskBuilder {
     ///
     /// # Failure
     /// Fails if a future_result was already set for this task.
-    pub fn future_result(&mut self) -> TaskResultPort {
+    pub fn future_result(&mut self) -> Port<TaskResult> {
         // FIXME (#3725): Once linked failure and notification are
         // handled in the library, I can imagine implementing this by just
         // registering an arbitrary number of task::on_exit handlers and
@@ -243,12 +215,12 @@ impl TaskBuilder {
         }
 
         // Construct the future and give it to the caller.
-        let (notify_pipe_po, notify_pipe_ch) = stream::<UnwindResult>();
+        let (notify_pipe_po, notify_pipe_ch) = Chan::new();
 
         // Reconfigure self to use a notify channel.
         self.opts.notify_chan = Some(notify_pipe_ch);
 
-        TaskResultPort { port: notify_pipe_po }
+        notify_pipe_po
     }
 
     /// Name the task-to-be. Currently the name is used for identification
@@ -341,7 +313,7 @@ impl TaskBuilder {
      * Fails if a future_result was already set for this task.
      */
     pub fn try<T:Send>(mut self, f: proc() -> T) -> Result<T, ~Any> {
-        let (po, ch) = stream::<T>();
+        let (po, ch) = Chan::new();
 
         let result = self.future_result();
 
@@ -466,7 +438,7 @@ pub fn failing() -> bool {
 // !!! instead of exiting cleanly. This might wedge the buildbots.       !!!
 
 #[cfg(test)]
-fn block_forever() { let (po, _ch) = stream::<()>(); po.recv(); }
+fn block_forever() { let (po, _ch) = Chan::<()>::new(); po.recv(); }
 
 #[test]
 fn test_unnamed_task() {
@@ -528,9 +500,8 @@ fn test_send_named_task() {
 
 #[test]
 fn test_run_basic() {
-    let (po, ch) = stream::<()>();
-    let builder = task();
-    do builder.spawn {
+    let (po, ch) = Chan::new();
+    do task().spawn {
         ch.send(());
     }
     po.recv();
@@ -543,7 +514,7 @@ struct Wrapper {
 
 #[test]
 fn test_add_wrapper() {
-    let (po, ch) = stream::<()>();
+    let (po, ch) = Chan::new();
     let mut b0 = task();
     do b0.add_wrapper |body| {
         let ch = ch;
@@ -608,8 +579,7 @@ fn get_sched_id() -> int {
 
 #[test]
 fn test_spawn_sched() {
-    let (po, ch) = stream::<()>();
-    let ch = SharedChan::new(ch);
+    let (po, ch) = SharedChan::new();
 
     fn f(i: int, ch: SharedChan<()>) {
         let parent_sched_id = get_sched_id();
@@ -632,14 +602,14 @@ fn test_spawn_sched() {
 
 #[test]
 fn test_spawn_sched_childs_on_default_sched() {
-    let (po, ch) = stream();
+    let (po, ch) = Chan::new();
 
     // Assuming tests run on the default scheduler
     let default_id = get_sched_id();
 
     do spawn_sched(SingleThreaded) {
-        let parent_sched_id = get_sched_id();
         let ch = ch;
+        let parent_sched_id = get_sched_id();
         do spawn {
             let child_sched_id = get_sched_id();
             assert!(parent_sched_id != child_sched_id);
@@ -660,8 +630,8 @@ fn test_spawn_sched_blocking() {
         // Testing that a task in one scheduler can block in foreign code
         // without affecting other schedulers
         20u.times(|| {
-            let (start_po, start_ch) = stream();
-            let (fin_po, fin_ch) = stream();
+            let (start_po, start_ch) = Chan::new();
+            let (fin_po, fin_ch) = Chan::new();
 
             let mut lock = Mutex::new();
             let lock2 = lock.clone();
@@ -686,14 +656,14 @@ fn test_spawn_sched_blocking() {
                 let mut val = 20;
                 while val > 0 {
                     val = po.recv();
-                    ch.send(val - 1);
+                    ch.try_send(val - 1);
                 }
             }
 
-            let (setup_po, setup_ch) = stream();
-            let (parent_po, parent_ch) = stream();
+            let (setup_po, setup_ch) = Chan::new();
+            let (parent_po, parent_ch) = Chan::new();
             do spawn {
-                let (child_po, child_ch) = stream();
+                let (child_po, child_ch) = Chan::new();
                 setup_ch.send(child_ch);
                 pingpong(&child_po, &parent_ch);
             };
@@ -712,12 +682,12 @@ fn test_spawn_sched_blocking() {
 
 #[cfg(test)]
 fn avoid_copying_the_body(spawnfn: |v: proc()|) {
-    let (p, ch) = stream::<uint>();
+    let (p, ch) = Chan::<uint>::new();
 
     let x = ~1;
     let x_in_parent = ptr::to_unsafe_ptr(&*x) as uint;
 
-    do spawnfn || {
+    do spawnfn {
         let x_in_child = ptr::to_unsafe_ptr(&*x) as uint;
         ch.send(x_in_child);
     }
diff --git a/src/libstd/task/spawn.rs b/src/libstd/task/spawn.rs
index 4ab7b74d300..eb3e19f4a5a 100644
--- a/src/libstd/task/spawn.rs
+++ b/src/libstd/task/spawn.rs
@@ -77,18 +77,15 @@
 
 use prelude::*;
 
-use comm::{GenericChan, oneshot};
+use comm::Chan;
 use rt::local::Local;
 use rt::sched::{Scheduler, Shutdown, TaskFromFriend};
 use rt::task::{Task, Sched};
-use rt::task::UnwindResult;
 use rt::thread::Thread;
 use rt::{in_green_task_context, new_event_loop};
-use task::SingleThreaded;
-use task::TaskOpts;
+use task::{SingleThreaded, TaskOpts, TaskResult};
 
 #[cfg(test)] use task::default_task_opts;
-#[cfg(test)] use comm;
 #[cfg(test)] use task;
 
 pub fn spawn_raw(mut opts: TaskOpts, f: proc()) {
@@ -132,7 +129,7 @@ pub fn spawn_raw(mut opts: TaskOpts, f: proc()) {
 
             // Create a task that will later be used to join with the new scheduler
             // thread when it is ready to terminate
-            let (thread_port, thread_chan) = oneshot();
+            let (thread_port, thread_chan) = Chan::new();
             let join_task = do Task::build_child(None) {
                 debug!("running join task");
                 let thread: Thread<()> = thread_port.recv();
@@ -173,7 +170,7 @@ pub fn spawn_raw(mut opts: TaskOpts, f: proc()) {
 
     if opts.notify_chan.is_some() {
         let notify_chan = opts.notify_chan.take_unwrap();
-        let on_exit: proc(UnwindResult) = proc(task_result) {
+        let on_exit: proc(TaskResult) = proc(task_result) {
             notify_chan.send(task_result)
         };
         task.death.on_exit = Some(on_exit);
@@ -187,7 +184,7 @@ pub fn spawn_raw(mut opts: TaskOpts, f: proc()) {
 
 #[test]
 fn test_spawn_raw_simple() {
-    let (po, ch) = stream();
+    let (po, ch) = Chan::new();
     do spawn_raw(default_task_opts()) {
         ch.send(());
     }
@@ -208,7 +205,7 @@ fn test_spawn_raw_unsupervise() {
 
 #[test]
 fn test_spawn_raw_notify_success() {
-    let (notify_po, notify_ch) = comm::stream();
+    let (notify_po, notify_ch) = Chan::new();
 
     let opts = task::TaskOpts {
         notify_chan: Some(notify_ch),
@@ -216,13 +213,13 @@ fn test_spawn_raw_notify_success() {
     };
     do spawn_raw(opts) {
     }
-    assert!(notify_po.recv().is_success());
+    assert!(notify_po.recv().is_ok());
 }
 
 #[test]
 fn test_spawn_raw_notify_failure() {
     // New bindings for these
-    let (notify_po, notify_ch) = comm::stream();
+    let (notify_po, notify_ch) = Chan::new();
 
     let opts = task::TaskOpts {
         watched: false,
@@ -232,5 +229,5 @@ fn test_spawn_raw_notify_failure() {
     do spawn_raw(opts) {
         fail!();
     }
-    assert!(notify_po.recv().is_failure());
+    assert!(notify_po.recv().is_err());
 }
diff --git a/src/libstd/unstable/mod.rs b/src/libstd/unstable/mod.rs
index f8e2ea54f44..043d99eb1b8 100644
--- a/src/libstd/unstable/mod.rs
+++ b/src/libstd/unstable/mod.rs
@@ -10,10 +10,7 @@
 
 #[doc(hidden)];
 
-use comm::{GenericChan, GenericPort};
-use comm;
 use prelude::*;
-use task;
 use libc::uintptr_t;
 
 pub mod dynamic_lib;
@@ -38,15 +35,7 @@ a normal large stack.
 */
 pub fn run_in_bare_thread(f: proc()) {
     use rt::thread::Thread;
-
-    let (port, chan) = comm::stream();
-    // FIXME #4525: Unfortunate that this creates an extra scheduler but it's
-    // necessary since rust_raw_thread_join is blocking
-    do task::spawn_sched(task::SingleThreaded) {
-        Thread::start(f).join();
-        chan.send(());
-    }
-    port.recv();
+    Thread::start(f).join()
 }
 
 #[test]
diff --git a/src/libstd/unstable/sync.rs b/src/libstd/unstable/sync.rs
index 2dd5515bdbc..50fae1e0239 100644
--- a/src/libstd/unstable/sync.rs
+++ b/src/libstd/unstable/sync.rs
@@ -9,7 +9,7 @@
 // except according to those terms.
 
 use cast;
-use comm;
+use comm::{Chan, Port};
 use ptr;
 use option::{Option,Some,None};
 use task;
@@ -56,7 +56,7 @@ struct ArcData<T> {
     // drops the last refcount on an arc. Unfortunately this can't be a proper
     // pipe protocol because the unwrapper has to access both stages at once.
     // FIXME(#7544): Maybe use AtomicPtr instead (to avoid xchg in take() later)?
-    unwrapper: AtomicOption<(comm::ChanOne<()>, comm::PortOne<bool>)>,
+    unwrapper: AtomicOption<(Chan<()>, Port<bool>)>,
     // FIXME(#3224) should be able to make this non-option to save memory
     data: Option<T>,
 }
@@ -70,7 +70,7 @@ unsafe fn new_inner<T: Send>(data: T, refcount: uint) -> *mut ArcData<T> {
 
 /// A helper object used by `UnsafeArc::unwrap`.
 struct ChannelAndDataGuard<T> {
-    channel: Option<comm::ChanOne<bool>>,
+    channel: Option<Chan<bool>>,
     data: Option<~ArcData<T>>,
 }
 
@@ -92,7 +92,7 @@ impl<T> Drop for ChannelAndDataGuard<T> {
 }
 
 impl<T> ChannelAndDataGuard<T> {
-    fn unwrap(mut self) -> (comm::ChanOne<bool>, ~ArcData<T>) {
+    fn unwrap(mut self) -> (Chan<bool>, ~ArcData<T>) {
         (self.channel.take_unwrap(), self.data.take_unwrap())
     }
 }
@@ -167,8 +167,8 @@ impl<T: Send> UnsafeArc<T> {
             // The ~ dtor needs to run if this code succeeds.
             let mut data: ~ArcData<T> = cast::transmute(this.data);
             // Set up the unwrap protocol.
-            let (p1,c1) = comm::oneshot(); // ()
-            let (p2,c2) = comm::oneshot(); // bool
+            let (p1,c1) = Chan::new(); // ()
+            let (p2,c2) = Chan::new(); // bool
             // Try to put our server end in the unwrapper slot.
             // This needs no barrier -- it's protected by the release barrier on
             // the xadd, and the acquire+release barrier in the destructor's xadd.
@@ -269,7 +269,7 @@ impl<T> Drop for UnsafeArc<T>{
                 // reference. In effect, being here means we're the only
                 // *awake* task with the data.
                 match data.unwrapper.take(Acquire) {
-                    Some(~(message,response)) => {
+                    Some(~(message, response)) => {
                         // Send 'ready' and wait for a response.
                         message.send(());
                         // Unkillable wait. Message guaranteed to come.
@@ -508,7 +508,6 @@ impl<T:Send> Exclusive<T> {
 
 #[cfg(test)]
 mod tests {
-    use comm;
     use option::*;
     use prelude::*;
     use super::{Exclusive, UnsafeArc, atomic};
@@ -541,10 +540,10 @@ mod tests {
 
             for _ in range(0u, num_tasks) {
                 let total = total.clone();
-                let (port, chan) = comm::stream();
+                let (port, chan) = Chan::new();
                 futures.push(port);
 
-                do task::spawn || {
+                do task::spawn {
                     for _ in range(0u, count) {
                         total.with(|count| **count += 1);
                     }
@@ -552,7 +551,7 @@ mod tests {
                 }
             };
 
-            for f in futures.iter() { f.recv() }
+            for f in futures.mut_iter() { f.recv() }
 
             total.with(|total| assert!(**total == num_tasks * count));
         }
@@ -625,7 +624,7 @@ mod tests {
         // When an unwrap and a try_unwrap race, the unwrapper should always win.
         let x = UnsafeArc::new(~~"hello");
         let x2 = x.clone();
-        let (p,c) = comm::stream();
+        let (p,c) = Chan::new();
         do task::spawn {
             c.send(());
             assert!(x2.unwrap() == ~~"hello");
diff --git a/src/test/auxiliary/cci_capture_clause.rs b/src/test/auxiliary/cci_capture_clause.rs
index a9c3e1d2b0f..ed896af69b4 100644
--- a/src/test/auxiliary/cci_capture_clause.rs
+++ b/src/test/auxiliary/cci_capture_clause.rs
@@ -11,7 +11,7 @@
 use std::task;
 
 pub fn foo<T:Send + Clone>(x: T) -> Port<T> {
-    let (p, c) = stream();
+    let (p, c) = Chan::new();
     do task::spawn() {
         c.send(x.clone());
     }
diff --git a/src/test/compile-fail/bind-by-move-no-guards.rs b/src/test/compile-fail/bind-by-move-no-guards.rs
index 348781d7497..015d31bf42c 100644
--- a/src/test/compile-fail/bind-by-move-no-guards.rs
+++ b/src/test/compile-fail/bind-by-move-no-guards.rs
@@ -8,10 +8,8 @@
 // option. This file may not be copied, modified, or distributed
 // except according to those terms.
 
-use std::comm;
-
 fn main() {
-    let (p,c) = comm::stream();
+    let (p,c) = Chan::new();
     let x = Some(p);
     c.send(false);
     match x {
diff --git a/src/test/compile-fail/builtin-superkinds-self-type.rs b/src/test/compile-fail/builtin-superkinds-self-type.rs
index d7ee3aae4d5..c82f752a454 100644
--- a/src/test/compile-fail/builtin-superkinds-self-type.rs
+++ b/src/test/compile-fail/builtin-superkinds-self-type.rs
@@ -11,10 +11,8 @@
 // Tests (negatively) the ability for the Self type in default methods
 // to use capabilities granted by builtin kinds as supertraits.
 
-use std::comm;
-
 trait Foo : Freeze {
-    fn foo(self, chan: comm::Chan<Self>) {
+    fn foo(self, mut chan: Chan<Self>) {
         chan.send(self); //~ ERROR does not fulfill `Send`
     }
 }
@@ -22,7 +20,7 @@ trait Foo : Freeze {
 impl <T: Freeze> Foo for T { }
 
 fn main() {
-    let (p,c) = comm::stream();
+    let (p,c) = Chan::new();
     1193182.foo(c);
     assert!(p.recv() == 1193182);
 }
diff --git a/src/test/compile-fail/unsendable-class.rs b/src/test/compile-fail/unsendable-class.rs
index 08dbaaac295..1eff31b2aa4 100644
--- a/src/test/compile-fail/unsendable-class.rs
+++ b/src/test/compile-fail/unsendable-class.rs
@@ -13,8 +13,6 @@
 // Test that a class with an unsendable field can't be
 // sent
 
-use std::comm;
-
 struct foo {
   i: int,
   j: @~str,
@@ -29,6 +27,6 @@ fn foo(i:int, j: @~str) -> foo {
 
 fn main() {
   let cat = ~"kitty";
-    let (_, ch) = comm::stream(); //~ ERROR does not fulfill `Send`
+  let (_, ch) = Chan::new(); //~ ERROR does not fulfill `Send`
   ch.send(foo(42, @(cat))); //~ ERROR does not fulfill `Send`
 }
diff --git a/src/test/run-pass/builtin-superkinds-capabilities-transitive.rs b/src/test/run-pass/builtin-superkinds-capabilities-transitive.rs
index ec5af692976..7dc12c70b9b 100644
--- a/src/test/run-pass/builtin-superkinds-capabilities-transitive.rs
+++ b/src/test/run-pass/builtin-superkinds-capabilities-transitive.rs
@@ -14,20 +14,18 @@
 // a Send. Basically this just makes sure rustc is using
 // each_bound_trait_and_supertraits in type_contents correctly.
 
-use std::comm;
-
 trait Bar : Send { }
 trait Foo : Bar { }
 
 impl <T: Send> Foo for T { }
 impl <T: Send> Bar for T { }
 
-fn foo<T: Foo>(val: T, chan: comm::Chan<T>) {
+fn foo<T: Foo>(val: T, chan: Chan<T>) {
     chan.send(val);
 }
 
 pub fn main() {
-    let (p,c) = comm::stream();
+    let (p,c) = Chan::new();
     foo(31337, c);
     assert!(p.recv() == 31337);
 }
diff --git a/src/test/run-pass/builtin-superkinds-capabilities-xc.rs b/src/test/run-pass/builtin-superkinds-capabilities-xc.rs
index ea61b91e3b9..f94d1af84f6 100644
--- a/src/test/run-pass/builtin-superkinds-capabilities-xc.rs
+++ b/src/test/run-pass/builtin-superkinds-capabilities-xc.rs
@@ -17,7 +17,6 @@
 
 extern mod trait_superkinds_in_metadata;
 use trait_superkinds_in_metadata::{RequiresRequiresFreezeAndSend, RequiresFreeze};
-use std::comm;
 
 #[deriving(Eq)]
 struct X<T>(T);
@@ -25,12 +24,12 @@ struct X<T>(T);
 impl <T: Freeze> RequiresFreeze for X<T> { }
 impl <T: Freeze+Send> RequiresRequiresFreezeAndSend for X<T> { }
 
-fn foo<T: RequiresRequiresFreezeAndSend>(val: T, chan: comm::Chan<T>) {
+fn foo<T: RequiresRequiresFreezeAndSend>(val: T, chan: Chan<T>) {
     chan.send(val);
 }
 
 fn main() {
-    let (p,c) = comm::stream();
+    let (p,c) = Chan::new();
     foo(X(31337), c);
     assert!(p.recv() == X(31337));
 }
diff --git a/src/test/run-pass/builtin-superkinds-capabilities.rs b/src/test/run-pass/builtin-superkinds-capabilities.rs
index 148fb5a340f..fa3903b41d1 100644
--- a/src/test/run-pass/builtin-superkinds-capabilities.rs
+++ b/src/test/run-pass/builtin-superkinds-capabilities.rs
@@ -12,18 +12,16 @@
 // builtin-kinds, e.g., if a trait requires Send to implement, then
 // at usage site of that trait, we know we have the Send capability.
 
-use std::comm;
-
 trait Foo : Send { }
 
 impl <T: Send> Foo for T { }
 
-fn foo<T: Foo>(val: T, chan: comm::Chan<T>) {
+fn foo<T: Foo>(val: T, chan: Chan<T>) {
     chan.send(val);
 }
 
 pub fn main() {
-    let (p,c) = comm::stream();
+    let (p,c) = Chan::new();
     foo(31337, c);
     assert!(p.recv() == 31337);
 }
diff --git a/src/test/run-pass/builtin-superkinds-self-type.rs b/src/test/run-pass/builtin-superkinds-self-type.rs
index a8f5f27b4d9..a71bedfefe0 100644
--- a/src/test/run-pass/builtin-superkinds-self-type.rs
+++ b/src/test/run-pass/builtin-superkinds-self-type.rs
@@ -11,10 +11,8 @@
 // Tests the ability for the Self type in default methods to use
 // capabilities granted by builtin kinds as supertraits.
 
-use std::comm;
-
 trait Foo : Send {
-    fn foo(self, chan: comm::Chan<Self>) {
+    fn foo(self, chan: Chan<Self>) {
         chan.send(self);
     }
 }
@@ -22,7 +20,7 @@ trait Foo : Send {
 impl <T: Send> Foo for T { }
 
 pub fn main() {
-    let (p,c) = comm::stream();
+    let (p,c) = Chan::new();
     1193182.foo(c);
     assert!(p.recv() == 1193182);
 }
diff --git a/src/test/run-pass/capture_nil.rs b/src/test/run-pass/capture_nil.rs
index edd1a9cce65..0d9fdea4a9d 100644
--- a/src/test/run-pass/capture_nil.rs
+++ b/src/test/run-pass/capture_nil.rs
@@ -27,7 +27,7 @@
 use std::task;
 
 fn foo(x: ()) -> Port<()> {
-    let (p, c) = stream::<()>();
+    let (p, c) = Chan::<()>::new();
     do task::spawn() {
         c.send(x);
     }
diff --git a/src/test/run-pass/closure-bounds-can-capture-chan.rs b/src/test/run-pass/closure-bounds-can-capture-chan.rs
index 16c7eaf1037..3a92f4ba3f4 100644
--- a/src/test/run-pass/closure-bounds-can-capture-chan.rs
+++ b/src/test/run-pass/closure-bounds-can-capture-chan.rs
@@ -15,7 +15,7 @@ fn foo(blk: proc()) {
 }
 
 pub fn main() {
-    let (p,c) = comm::stream();
+    let (p,c) = Chan::new();
     do foo {
         c.send(());
     }
diff --git a/src/test/run-pass/comm.rs b/src/test/run-pass/comm.rs
index 5eb3e247d67..25e31f0b548 100644
--- a/src/test/run-pass/comm.rs
+++ b/src/test/run-pass/comm.rs
@@ -11,8 +11,8 @@
 use std::task;
 
 pub fn main() {
-    let (p, ch) = stream();
-    let _t = task::spawn(proc() child(&ch));
+    let (p, ch) = Chan::new();
+    let _t = task::spawn(proc() { child(&ch) });
     let y = p.recv();
     error!("received");
     error!("{:?}", y);
diff --git a/src/test/run-pass/hashmap-memory.rs b/src/test/run-pass/hashmap-memory.rs
index 163a1253ef0..bacf8353a2e 100644
--- a/src/test/run-pass/hashmap-memory.rs
+++ b/src/test/run-pass/hashmap-memory.rs
@@ -21,7 +21,6 @@
 pub fn map(filename: ~str, emit: map_reduce::putter) { emit(filename, ~"1"); }
 
 mod map_reduce {
-    use std::comm::{stream, SharedChan};
     use std::hashmap::HashMap;
     use std::str;
     use std::task;
@@ -43,12 +42,13 @@ mod map_reduce {
     fn map_task(ctrl: SharedChan<ctrl_proto>, input: ~str) {
         let intermediates = @mut HashMap::new();
 
-        fn emit(im: &mut HashMap<~str, int>, ctrl: SharedChan<ctrl_proto>, key: ~str,
+        fn emit(im: &mut HashMap<~str, int>,
+                ctrl: SharedChan<ctrl_proto>, key: ~str,
                 _val: ~str) {
             if im.contains_key(&key) {
                 return;
             }
-            let (pp, cc) = stream();
+            let (pp, cc) = Chan::new();
             error!("sending find_reducer");
             ctrl.send(find_reducer(key.as_bytes().to_owned(), cc));
             error!("receiving");
@@ -63,8 +63,7 @@ mod map_reduce {
     }
 
     pub fn map_reduce(inputs: ~[~str]) {
-        let (ctrl_port, ctrl_chan) = stream();
-        let ctrl_chan = SharedChan::new(ctrl_chan);
+        let (ctrl_port, ctrl_chan) = SharedChan::new();
 
         // This task becomes the master control task. It spawns others
         // to do the rest.
@@ -81,10 +80,11 @@ mod map_reduce {
             match ctrl_port.recv() {
               mapper_done => { num_mappers -= 1; }
               find_reducer(k, cc) => {
-                let c = match reducers.find(&str::from_utf8_owned(k)) {
-                  Some(&_c) => _c,
-                  None => 0
-                };
+                let mut c;
+                match reducers.find(&str::from_utf8(k)) {
+                  Some(&_c) => { c = _c; }
+                  None => { c = 0; }
+                }
                 cc.send(c);
               }
             }
diff --git a/src/test/run-pass/issue-3609.rs b/src/test/run-pass/issue-3609.rs
index ab641e51960..4922ab18f8d 100644
--- a/src/test/run-pass/issue-3609.rs
+++ b/src/test/run-pass/issue-3609.rs
@@ -14,6 +14,7 @@ enum Msg
 fn foo(name: ~str, samples_chan: Chan<Msg>) {
     do task::spawn
     {
+        let mut samples_chan = samples_chan;
         let callback: SamplesFn = proc(buffer) {
             for i in range(0u, buffer.len()) {
                 error!("{}: {}", i, buffer[i])
diff --git a/src/test/run-pass/issue-4446.rs b/src/test/run-pass/issue-4446.rs
index ddcb544c64f..53b45ba99ca 100644
--- a/src/test/run-pass/issue-4446.rs
+++ b/src/test/run-pass/issue-4446.rs
@@ -9,7 +9,7 @@
 // except according to those terms.
 
 pub fn main() {
-    let (port, chan) = stream();
+    let (port, chan) = Chan::new();
 
     do spawn {
         println(port.recv());
diff --git a/src/test/run-pass/issue-4448.rs b/src/test/run-pass/issue-4448.rs
index 212406f9a20..c3f871c7912 100644
--- a/src/test/run-pass/issue-4448.rs
+++ b/src/test/run-pass/issue-4448.rs
@@ -8,11 +8,10 @@
 // option. This file may not be copied, modified, or distributed
 // except according to those terms.
 
-use std::comm;
 use std::task;
 
 pub fn main() {
-    let (port, chan) = comm::stream::<&'static str>();
+    let (port, chan) = Chan::<&'static str>::new();
 
     do task::spawn {
         assert_eq!(port.recv(), "hello, world");
diff --git a/src/test/run-pass/ivec-tag.rs b/src/test/run-pass/ivec-tag.rs
index c7d5a2f77da..aa009a91aec 100644
--- a/src/test/run-pass/ivec-tag.rs
+++ b/src/test/run-pass/ivec-tag.rs
@@ -7,8 +7,10 @@ fn producer(c: &Chan<~[u8]>) {
 }
 
 pub fn main() {
-    let (p, ch) = stream::<~[u8]>();
-    let _prod = task::spawn(proc() producer(&ch) );
+    let (p, ch) = Chan::<~[u8]>::new();
+    let _prod = task::spawn(proc() {
+        producer(&ch)
+    });
 
     let _data: ~[u8] = p.recv();
 }
diff --git a/src/test/run-pass/logging-only-prints-once.rs b/src/test/run-pass/logging-only-prints-once.rs
index 702eb5fffc7..6a88b804c22 100644
--- a/src/test/run-pass/logging-only-prints-once.rs
+++ b/src/test/run-pass/logging-only-prints-once.rs
@@ -25,7 +25,7 @@ impl fmt::Default for Foo {
 }
 
 pub fn main() {
-    let (p,c) = stream();
+    let (p,c) = Chan::new();
     do spawn {
         let f = Foo(@mut 0);
         debug!("{}", f);
diff --git a/src/test/run-pass/send-resource.rs b/src/test/run-pass/send-resource.rs
index 81e2d0a8ac8..7ede574a4d5 100644
--- a/src/test/run-pass/send-resource.rs
+++ b/src/test/run-pass/send-resource.rs
@@ -25,10 +25,10 @@ fn test(f: int) -> test {
 }
 
 pub fn main() {
-    let (p, c) = stream();
+    let (p, c) = Chan::new();
 
     do task::spawn() {
-        let (pp, cc) = stream();
+        let (pp, cc) = Chan::new();
         c.send(cc);
 
         let _r = pp.recv();
diff --git a/src/test/run-pass/send-type-inference.rs b/src/test/run-pass/send-type-inference.rs
index 79b05915b01..a8dc4a68e94 100644
--- a/src/test/run-pass/send-type-inference.rs
+++ b/src/test/run-pass/send-type-inference.rs
@@ -14,8 +14,8 @@ struct Command<K, V> {
     val: V
 }
 
-fn cache_server<K:Send,V:Send>(c: Chan<Chan<Command<K, V>>>) {
-    let (_ctrl_port, ctrl_chan) = stream();
+fn cache_server<K:Send,V:Send>(mut c: Chan<Chan<Command<K, V>>>) {
+    let (_ctrl_port, ctrl_chan) = Chan::new();
     c.send(ctrl_chan);
 }
 pub fn main() { }
diff --git a/src/test/run-pass/sendable-class.rs b/src/test/run-pass/sendable-class.rs
index 6b262966277..0c7091a4454 100644
--- a/src/test/run-pass/sendable-class.rs
+++ b/src/test/run-pass/sendable-class.rs
@@ -10,8 +10,6 @@
 
 // Test that a class with only sendable fields can be sent
 
-use std::comm;
-
 struct foo {
   i: int,
   j: char,
@@ -25,6 +23,6 @@ fn foo(i:int, j: char) -> foo {
 }
 
 pub fn main() {
-    let (_po, ch) = comm::stream();
+    let (_po, ch) = Chan::new();
     ch.send(foo(42, 'c'));
 }
diff --git a/src/test/run-pass/spawn-types.rs b/src/test/run-pass/spawn-types.rs
index 18459b7a3a3..67d1836b545 100644
--- a/src/test/run-pass/spawn-types.rs
+++ b/src/test/run-pass/spawn-types.rs
@@ -23,6 +23,6 @@ fn iotask(_cx: &ctx, ip: ~str) {
 }
 
 pub fn main() {
-    let (_p, ch) = stream::<int>();
+    let (_p, ch) = Chan::<int>::new();
     task::spawn(proc() iotask(&ch, ~"localhost") );
 }
diff --git a/src/test/run-pass/task-comm-0.rs b/src/test/run-pass/task-comm-0.rs
index bcdb56a45fd..671ef16c5eb 100644
--- a/src/test/run-pass/task-comm-0.rs
+++ b/src/test/run-pass/task-comm-0.rs
@@ -12,8 +12,6 @@
 
 extern mod extra;
 
-use std::comm::Chan;
-use std::comm;
 use std::task;
 
 pub fn main() { test05(); }
@@ -28,8 +26,8 @@ fn test05_start(ch : &Chan<int>) {
 }
 
 fn test05() {
-    let (po, ch) = comm::stream();
-    task::spawn(proc() test05_start(&ch) );
+    let (po, ch) = Chan::new();
+    task::spawn(proc() { test05_start(&ch) });
     let mut value: int = po.recv();
     error!("{}", value);
     value = po.recv();
diff --git a/src/test/run-pass/task-comm-10.rs b/src/test/run-pass/task-comm-10.rs
index 76948913288..fc8c6069152 100644
--- a/src/test/run-pass/task-comm-10.rs
+++ b/src/test/run-pass/task-comm-10.rs
@@ -12,11 +12,10 @@
 
 extern mod extra;
 
-use std::comm;
 use std::task;
 
-fn start(c: &comm::Chan<comm::Chan<~str>>) {
-    let (p, ch) = comm::stream();
+fn start(c: &Chan<Chan<~str>>) {
+    let (p, ch) = Chan::new();
     c.send(ch);
 
     let mut a;
@@ -30,10 +29,10 @@ fn start(c: &comm::Chan<comm::Chan<~str>>) {
 }
 
 pub fn main() {
-    let (p, ch) = comm::stream();
-    let _child = task::spawn(proc() start(&ch) );
+    let (p, ch) = Chan::new();
+    let _child = task::spawn(proc() { start(&ch) });
 
-    let c = p.recv();
+    let mut c = p.recv();
     c.send(~"A");
     c.send(~"B");
     task::deschedule();
diff --git a/src/test/run-pass/task-comm-11.rs b/src/test/run-pass/task-comm-11.rs
index e87809b2e98..6f5990e7d16 100644
--- a/src/test/run-pass/task-comm-11.rs
+++ b/src/test/run-pass/task-comm-11.rs
@@ -12,16 +12,17 @@
 
 extern mod extra;
 
-use std::comm;
 use std::task;
 
-fn start(c: &comm::Chan<comm::Chan<int>>) {
-    let (_p, ch) = comm::stream();
+fn start(c: &Chan<Chan<int>>) {
+    let (_p, ch) = Chan::new();
     c.send(ch);
 }
 
 pub fn main() {
-    let (p, ch) = comm::stream();
-    let _child = task::spawn(proc() start(&ch) );
+    let (mut p, ch) = Chan::new();
+    let _child = task::spawn(proc() {
+        start(&ch)
+    });
     let _c = p.recv();
 }
diff --git a/src/test/run-pass/task-comm-12.rs b/src/test/run-pass/task-comm-12.rs
index 0d221a6cb1d..ce30071aaf4 100644
--- a/src/test/run-pass/task-comm-12.rs
+++ b/src/test/run-pass/task-comm-12.rs
@@ -19,7 +19,7 @@ fn start(_task_number: int) { info!("Started / Finished task."); }
 fn test00() {
     let i: int = 0;
     let mut builder = task::task();
-    let result = builder.future_result();
+    let mut result = builder.future_result();
     do builder.spawn {
         start(i)
     }
diff --git a/src/test/run-pass/task-comm-13.rs b/src/test/run-pass/task-comm-13.rs
index c8234aefbe5..ce3cd59bfb1 100644
--- a/src/test/run-pass/task-comm-13.rs
+++ b/src/test/run-pass/task-comm-13.rs
@@ -12,17 +12,16 @@
 
 extern mod extra;
 
-use std::comm;
 use std::task;
 
-fn start(c: &comm::Chan<int>, start: int, number_of_messages: int) {
+fn start(c: &Chan<int>, start: int, number_of_messages: int) {
     let mut i: int = 0;
     while i < number_of_messages { c.send(start + i); i += 1; }
 }
 
 pub fn main() {
     info!("Check that we don't deadlock.");
-    let (_p, ch) = comm::stream();
-    task::try(proc() start(&ch, 0, 10) );
+    let (_p, ch) = Chan::new();
+    task::try(proc() { start(&ch, 0, 10) });
     info!("Joined task");
 }
diff --git a/src/test/run-pass/task-comm-14.rs b/src/test/run-pass/task-comm-14.rs
index 8ef7f85e768..435d68ada49 100644
--- a/src/test/run-pass/task-comm-14.rs
+++ b/src/test/run-pass/task-comm-14.rs
@@ -10,19 +10,17 @@
 
 // xfail-fast
 
-use std::comm;
 use std::task;
 
 pub fn main() {
-    let (po, ch) = comm::stream();
-    let ch = comm::SharedChan::new(ch);
+    let (po, ch) = SharedChan::new();
 
     // Spawn 10 tasks each sending us back one int.
     let mut i = 10;
     while (i > 0) {
         info!("{}", i);
         let ch = ch.clone();
-        task::spawn({let i = i; proc() child(i, &ch)});
+        task::spawn({let i = i; proc() { child(i, &ch) });
         i = i - 1;
     }
 
@@ -39,7 +37,7 @@ pub fn main() {
     info!("main thread exiting");
 }
 
-fn child(x: int, ch: &comm::SharedChan<int>) {
+fn child(x: int, ch: &SharedChan<int>) {
     info!("{}", x);
     ch.send(x);
 }
diff --git a/src/test/run-pass/task-comm-15.rs b/src/test/run-pass/task-comm-15.rs
index 66b7b4db5be..9f0d9908f7f 100644
--- a/src/test/run-pass/task-comm-15.rs
+++ b/src/test/run-pass/task-comm-15.rs
@@ -12,10 +12,9 @@
 
 extern mod extra;
 
-use std::comm;
 use std::task;
 
-fn start(c: &comm::Chan<int>, i0: int) {
+fn start(c: &Chan<int>, i0: int) {
     let mut i = i0;
     while i > 0 {
         c.send(0);
@@ -28,7 +27,10 @@ pub fn main() {
     // is likely to terminate before the child completes, so from
     // the child's point of view the receiver may die. We should
     // drop messages on the floor in this case, and not crash!
-    let (p, ch) = comm::stream();
-    task::spawn(proc() start(&ch, 10));
+    let (p, ch) = Chan::new();
+    task::spawn(proc() {
+        let mut ch = ch;
+        start(&ch, 10)
+    });
     p.recv();
 }
diff --git a/src/test/run-pass/task-comm-16.rs b/src/test/run-pass/task-comm-16.rs
index af4a8f68ac2..42d445dc24c 100644
--- a/src/test/run-pass/task-comm-16.rs
+++ b/src/test/run-pass/task-comm-16.rs
@@ -8,14 +8,13 @@
 // option. This file may not be copied, modified, or distributed
 // except according to those terms.
 
-use std::comm;
 use std::cmp;
 
 // Tests of ports and channels on various types
 fn test_rec() {
     struct R {val0: int, val1: u8, val2: char}
 
-    let (po, ch) = comm::stream();
+    let (po, ch) = Chan::new();
     let r0: R = R {val0: 0, val1: 1u8, val2: '2'};
     ch.send(r0);
     let mut r1: R;
@@ -26,7 +25,7 @@ fn test_rec() {
 }
 
 fn test_vec() {
-    let (po, ch) = comm::stream();
+    let (po, ch) = Chan::new();
     let v0: ~[int] = ~[0, 1, 2];
     ch.send(v0);
     let v1 = po.recv();
@@ -36,7 +35,7 @@ fn test_vec() {
 }
 
 fn test_str() {
-    let (po, ch) = comm::stream();
+    let (po, ch) = Chan::new();
     let s0 = ~"test";
     ch.send(s0);
     let s1 = po.recv();
@@ -80,7 +79,7 @@ impl cmp::Eq for t {
 }
 
 fn test_tag() {
-    let (po, ch) = comm::stream();
+    let (po, ch) = Chan::new();
     ch.send(tag1);
     ch.send(tag2(10));
     ch.send(tag3(10, 11u8, 'A'));
@@ -94,10 +93,10 @@ fn test_tag() {
 }
 
 fn test_chan() {
-    let (po, ch) = comm::stream();
-    let (po0, ch0) = comm::stream();
+    let (po, ch) = Chan::new();
+    let (po0, ch0) = Chan::new();
     ch.send(ch0);
-    let ch1 = po.recv();
+    let mut ch1 = po.recv();
     // Does the transmitted channel still work?
 
     ch1.send(10);
diff --git a/src/test/run-pass/task-comm-3.rs b/src/test/run-pass/task-comm-3.rs
index 030b70924f6..9679aebcc9c 100644
--- a/src/test/run-pass/task-comm-3.rs
+++ b/src/test/run-pass/task-comm-3.rs
@@ -12,8 +12,6 @@
 
 extern mod extra;
 
-use std::comm::SharedChan;
-use std::comm;
 use std::task;
 
 pub fn main() { info!("===== WITHOUT THREADS ====="); test00(); }
@@ -35,8 +33,7 @@ fn test00() {
 
     info!("Creating tasks");
 
-    let (po, ch) = comm::stream();
-    let ch = comm::SharedChan::new(ch);
+    let (po, ch) = SharedChan::new();
 
     let mut i: int = 0;
 
@@ -47,8 +44,11 @@ fn test00() {
         let mut builder = task::task();
         results.push(builder.future_result());
         builder.spawn({
+            let ch = ch;
             let i = i;
-            proc() test00_start(&ch, i, number_of_messages)
+            proc() {
+                test00_start(&ch, i, number_of_messages)
+            }
         });
         i = i + 1;
     }
diff --git a/src/test/run-pass/task-comm-4.rs b/src/test/run-pass/task-comm-4.rs
index 8e1704fe12f..3ac4c0e0087 100644
--- a/src/test/run-pass/task-comm-4.rs
+++ b/src/test/run-pass/task-comm-4.rs
@@ -10,14 +10,12 @@
 
 #[allow(dead_assignment)];
 
-use std::comm;
-
 pub fn main() { test00(); }
 
 fn test00() {
     let mut r: int = 0;
     let mut sum: int = 0;
-    let (p, c) = comm::stream();
+    let (p, c) = Chan::new();
     c.send(1);
     c.send(2);
     c.send(3);
diff --git a/src/test/run-pass/task-comm-5.rs b/src/test/run-pass/task-comm-5.rs
index 40cc7ef49e9..c63bf12db2b 100644
--- a/src/test/run-pass/task-comm-5.rs
+++ b/src/test/run-pass/task-comm-5.rs
@@ -10,14 +10,12 @@
 
 extern mod extra;
 
-use std::comm;
-
 pub fn main() { test00(); }
 
 fn test00() {
     let _r: int = 0;
     let mut sum: int = 0;
-    let (p, c) = comm::stream();
+    let (p, c) = Chan::new();
     let number_of_messages: int = 1000;
     let mut i: int = 0;
     while i < number_of_messages { c.send(i + 0); i += 1; }
diff --git a/src/test/run-pass/task-comm-6.rs b/src/test/run-pass/task-comm-6.rs
index b398ab41ed6..45994e78d94 100644
--- a/src/test/run-pass/task-comm-6.rs
+++ b/src/test/run-pass/task-comm-6.rs
@@ -10,20 +10,16 @@
 
 #[allow(dead_assignment)];
 
-use std::comm::SharedChan;
-use std::comm;
-
 pub fn main() { test00(); }
 
 fn test00() {
     let mut r: int = 0;
     let mut sum: int = 0;
-    let (p, ch) = comm::stream();
-    let ch = SharedChan::new(ch);
-    let c0 = ch.clone();
-    let c1 = ch.clone();
-    let c2 = ch.clone();
-    let c3 = ch.clone();
+    let (p, ch) = SharedChan::new();
+    let mut c0 = ch.clone();
+    let mut c1 = ch.clone();
+    let mut c2 = ch.clone();
+    let mut c3 = ch.clone();
     let number_of_messages: int = 1000;
     let mut i: int = 0;
     while i < number_of_messages {
diff --git a/src/test/run-pass/task-comm-7.rs b/src/test/run-pass/task-comm-7.rs
index c7c16af597c..43ac3957ae2 100644
--- a/src/test/run-pass/task-comm-7.rs
+++ b/src/test/run-pass/task-comm-7.rs
@@ -14,12 +14,12 @@
 
 extern mod extra;
 
-use std::comm;
 use std::task;
 
 pub fn main() { test00(); }
 
-fn test00_start(c: &comm::SharedChan<int>, start: int, number_of_messages: int) {
+fn test00_start(c: &SharedChan<int>, start: int,
+                number_of_messages: int) {
     let mut i: int = 0;
     while i < number_of_messages { c.send(start + i); i += 1; }
 }
@@ -27,8 +27,7 @@ fn test00_start(c: &comm::SharedChan<int>, start: int, number_of_messages: int)
 fn test00() {
     let mut r: int = 0;
     let mut sum: int = 0;
-    let (p, ch) = comm::stream();
-    let ch = comm::SharedChan::new(ch);
+    let (p, ch) = SharedChan::new();
     let number_of_messages: int = 10;
 
     let c = ch.clone();
diff --git a/src/test/run-pass/task-comm-9.rs b/src/test/run-pass/task-comm-9.rs
index 049810ff569..a2463ff7681 100644
--- a/src/test/run-pass/task-comm-9.rs
+++ b/src/test/run-pass/task-comm-9.rs
@@ -12,12 +12,11 @@
 
 extern mod extra;
 
-use std::comm;
 use std::task;
 
 pub fn main() { test00(); }
 
-fn test00_start(c: &comm::Chan<int>, number_of_messages: int) {
+fn test00_start(c: &Chan<int>, number_of_messages: int) {
     let mut i: int = 0;
     while i < number_of_messages { c.send(i + 0); i += 1; }
 }
@@ -25,13 +24,14 @@ fn test00_start(c: &comm::Chan<int>, number_of_messages: int) {
 fn test00() {
     let r: int = 0;
     let mut sum: int = 0;
-    let (p, ch) = comm::stream();
+    let (p, ch) = Chan::new();
     let number_of_messages: int = 10;
 
     let mut builder = task::task();
     let result = builder.future_result();
     do builder.spawn {
-        test00_start(&ch, number_of_messages);
+        let mut ch = ch;
+        test00_start(&mut ch, number_of_messages);
     }
 
     let mut i: int = 0;
diff --git a/src/test/run-pass/task-comm-chan-nil.rs b/src/test/run-pass/task-comm-chan-nil.rs
index cd53c633c4e..32774007049 100644
--- a/src/test/run-pass/task-comm-chan-nil.rs
+++ b/src/test/run-pass/task-comm-chan-nil.rs
@@ -11,13 +11,11 @@
 
 extern mod extra;
 
-use std::comm;
-
 // rustboot can't transmit nils across channels because they don't have
 // any size, but rustc currently can because they do have size. Whether
 // or not this is desirable I don't know, but here's a regression test.
 pub fn main() {
-    let (po, ch) = comm::stream();
+    let (po, ch) = Chan::new();
     ch.send(());
     let n: () = po.recv();
     assert_eq!(n, ());
diff --git a/src/test/run-pass/task-spawn-move-and-copy.rs b/src/test/run-pass/task-spawn-move-and-copy.rs
index aeca54c1fb5..1ed0c23fd7a 100644
--- a/src/test/run-pass/task-spawn-move-and-copy.rs
+++ b/src/test/run-pass/task-spawn-move-and-copy.rs
@@ -12,7 +12,7 @@ use std::ptr;
 use std::task;
 
 pub fn main() {
-    let (p, ch) = stream::<uint>();
+    let (p, ch) = Chan::<uint>::new();
 
     let x = ~1;
     let x_in_parent = ptr::to_unsafe_ptr(&(*x)) as uint;
diff --git a/src/test/run-pass/tempfile.rs b/src/test/run-pass/tempfile.rs
index 663eafe1700..d3bb8128a3b 100644
--- a/src/test/run-pass/tempfile.rs
+++ b/src/test/run-pass/tempfile.rs
@@ -38,7 +38,7 @@ fn test_tempdir() {
 }
 
 fn test_rm_tempdir() {
-    let (rd, wr) = stream();
+    let (rd, wr) = Chan::new();
     let f: proc() = proc() {
         let tmp = TempDir::new("test_rm_tempdir").unwrap();
         wr.send(tmp.path().clone());
diff --git a/src/test/run-pass/trait-bounds-in-arc.rs b/src/test/run-pass/trait-bounds-in-arc.rs
index abd9ea1733f..7e13d960947 100644
--- a/src/test/run-pass/trait-bounds-in-arc.rs
+++ b/src/test/run-pass/trait-bounds-in-arc.rs
@@ -18,7 +18,6 @@
 extern mod extra;
 
 use extra::arc;
-use std::comm;
 use std::task;
 
 trait Pet {
@@ -70,13 +69,13 @@ fn main() {
                          ~dogge1 as ~Pet:Freeze+Send,
                          ~fishe  as ~Pet:Freeze+Send,
                          ~dogge2 as ~Pet:Freeze+Send]);
-    let (p1,c1) = comm::stream();
+    let (p1,c1) = Chan::new();
     let arc1 = arc.clone();
     do task::spawn { check_legs(arc1); c1.send(()); }
-    let (p2,c2) = comm::stream();
+    let (p2,c2) = Chan::new();
     let arc2 = arc.clone();
     do task::spawn { check_names(arc2); c2.send(()); }
-    let (p3,c3) = comm::stream();
+    let (p3,c3) = Chan::new();
     let arc3 = arc.clone();
     do task::spawn { check_pedigree(arc3); c3.send(()); }
     p1.recv();
diff --git a/src/test/run-pass/trivial-message.rs b/src/test/run-pass/trivial-message.rs
index faea070c94d..e95f9184a63 100644
--- a/src/test/run-pass/trivial-message.rs
+++ b/src/test/run-pass/trivial-message.rs
@@ -13,10 +13,8 @@
   message.
  */
 
-use std::comm;
-
 pub fn main() {
-    let (po, ch) = comm::stream();
+    let (po, ch) = Chan::new();
     ch.send(42);
     let r = po.recv();
     error!("{:?}", r);
diff --git a/src/test/run-pass/unique-send-2.rs b/src/test/run-pass/unique-send-2.rs
index f44802a9b34..d1c45a336fa 100644
--- a/src/test/run-pass/unique-send-2.rs
+++ b/src/test/run-pass/unique-send-2.rs
@@ -8,7 +8,6 @@
 // option. This file may not be copied, modified, or distributed
 // except according to those terms.
 
-use std::comm::{SharedChan, stream};
 use std::task;
 
 fn child(c: &SharedChan<~uint>, i: uint) {
@@ -16,13 +15,14 @@ fn child(c: &SharedChan<~uint>, i: uint) {
 }
 
 pub fn main() {
-    let (p, ch) = stream();
-    let ch = SharedChan::new(ch);
+    let (p, ch) = SharedChan::new();
     let n = 100u;
     let mut expected = 0u;
     for i in range(0u, n) {
         let ch = ch.clone();
-        task::spawn(proc() child(&ch, i) );
+        task::spawn(proc() {
+            child(&ch, i)
+        });
         expected += i;
     }
 
diff --git a/src/test/run-pass/unique-send.rs b/src/test/run-pass/unique-send.rs
index a7d2f6a16a1..a1c0050e725 100644
--- a/src/test/run-pass/unique-send.rs
+++ b/src/test/run-pass/unique-send.rs
@@ -9,7 +9,7 @@
 // except according to those terms.
 
 pub fn main() {
-    let (p, c) = stream();
+    let (p, c) = Chan::new();
     c.send(~100);
     let v = p.recv();
     assert_eq!(v, ~100);
diff --git a/src/test/run-pass/unwind-resource.rs b/src/test/run-pass/unwind-resource.rs
index 2e01eef1c69..f46769fa28e 100644
--- a/src/test/run-pass/unwind-resource.rs
+++ b/src/test/run-pass/unwind-resource.rs
@@ -12,7 +12,6 @@
 
 extern mod extra;
 
-use std::comm::{stream, SharedChan};
 use std::task;
 
 struct complainer {
@@ -40,8 +39,7 @@ fn f(c: SharedChan<bool>) {
 }
 
 pub fn main() {
-    let (p, c) = stream();
-    let c = SharedChan::new(c);
+    let (p, c) = SharedChan::new();
     task::spawn(proc() f(c.clone()));
     error!("hiiiiiiiii");
     assert!(p.recv());
diff --git a/src/test/run-pass/yield.rs b/src/test/run-pass/yield.rs
index 47bf6323c32..4ec30d6e02a 100644
--- a/src/test/run-pass/yield.rs
+++ b/src/test/run-pass/yield.rs
@@ -12,7 +12,7 @@ use std::task;
 
 pub fn main() {
     let mut builder = task::task();
-    let result = builder.future_result();
+    let mut result = builder.future_result();
     builder.spawn(child);
     error!("1");
     task::deschedule();
diff --git a/src/test/run-pass/yield1.rs b/src/test/run-pass/yield1.rs
index 39d6bf7390f..c08c62b47a2 100644
--- a/src/test/run-pass/yield1.rs
+++ b/src/test/run-pass/yield1.rs
@@ -12,7 +12,7 @@ use std::task;
 
 pub fn main() {
     let mut builder = task::task();
-    let result = builder.future_result();
+    let mut result = builder.future_result();
     builder.spawn(child);
     error!("1");
     task::deschedule();