about summary refs log tree commit diff
path: root/src/libextra
diff options
context:
space:
mode:
Diffstat (limited to 'src/libextra')
-rw-r--r--src/libextra/arc.rs27
-rw-r--r--src/libextra/comm.rs119
-rw-r--r--src/libextra/future.rs8
-rw-r--r--src/libextra/sync.rs101
-rw-r--r--src/libextra/task_pool.rs6
-rw-r--r--src/libextra/test.rs19
-rw-r--r--src/libextra/workcache.rs11
7 files changed, 121 insertions, 170 deletions
diff --git a/src/libextra/arc.rs b/src/libextra/arc.rs
index 5700a299945..ea8066b786f 100644
--- a/src/libextra/arc.rs
+++ b/src/libextra/arc.rs
@@ -597,7 +597,6 @@ mod tests {
 
     use arc::*;
 
-    use std::comm;
     use std::task;
 
     #[test]
@@ -605,7 +604,7 @@ mod tests {
         let v = ~[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
         let arc_v = Arc::new(v);
 
-        let (p, c) = comm::stream();
+        let (p, c) = Chan::new();
 
         do task::spawn {
             let arc_v: Arc<~[int]> = p.recv();
@@ -626,7 +625,7 @@ mod tests {
     fn test_mutex_arc_condvar() {
         let arc = ~MutexArc::new(false);
         let arc2 = ~arc.clone();
-        let (p,c) = comm::oneshot();
+        let (p,c) = Chan::new();
         do task::spawn {
             // wait until parent gets in
             p.recv();
@@ -638,7 +637,7 @@ mod tests {
 
         let mut c = Some(c);
         arc.access_cond(|state, cond| {
-            c.take_unwrap().send(());
+            c.take_unwrawp().send(());
             assert!(!*state);
             while !*state {
                 cond.wait();
@@ -650,7 +649,7 @@ mod tests {
     fn test_arc_condvar_poison() {
         let arc = ~MutexArc::new(1);
         let arc2 = ~arc.clone();
-        let (p, c) = comm::stream();
+        let (p, c) = Chan::new();
 
         do spawn {
             let _ = p.recv();
@@ -687,7 +686,7 @@ mod tests {
     pub fn test_mutex_arc_unwrap_poison() {
         let arc = MutexArc::new(1);
         let arc2 = ~(&arc).clone();
-        let (p, c) = comm::stream();
+        let (p, c) = Chan::new();
         do task::spawn {
             arc2.access(|one| {
                 c.send(());
@@ -804,7 +803,7 @@ mod tests {
     fn test_rw_arc() {
         let arc = RWArc::new(0);
         let arc2 = arc.clone();
-        let (p, c) = comm::stream();
+        let (p, c) = Chan::new();
 
         do task::spawn {
             arc2.write(|num| {
@@ -832,7 +831,7 @@ mod tests {
         });
 
         // Wait for children to pass their asserts
-        for r in children.iter() {
+        for r in children.mut_iter() {
             r.recv();
         }
 
@@ -855,7 +854,7 @@ mod tests {
         // Reader tasks
         let mut reader_convos = ~[];
         10.times(|| {
-            let ((rp1, rc1), (rp2, rc2)) = (comm::stream(), comm::stream());
+            let ((rp1, rc1), (rp2, rc2)) = (Chan::new(), Chan::new());
             reader_convos.push((rc1, rp2));
             let arcn = arc.clone();
             do task::spawn {
@@ -869,7 +868,7 @@ mod tests {
 
         // Writer task
         let arc2 = arc.clone();
-        let ((wp1, wc1), (wp2, wc2)) = (comm::stream(), comm::stream());
+        let ((wp1, wc1), (wp2, wc2)) = (Chan::new(), Chan::new());
         do task::spawn || {
             wp1.recv();
             arc2.write_cond(|state, cond| {
@@ -897,14 +896,14 @@ mod tests {
                 assert_eq!(*state, 42);
                 *state = 31337;
                 // send to other readers
-                for &(ref rc, _) in reader_convos.iter() {
+                for &(ref mut rc, _) in reader_convos.mut_iter() {
                     rc.send(())
                 }
             });
             let read_mode = arc.downgrade(write_mode);
             read_mode.read(|state| {
                 // complete handshake with other readers
-                for &(_, ref rp) in reader_convos.iter() {
+                for &(_, ref mut rp) in reader_convos.mut_iter() {
                     rp.recv()
                 }
                 wc1.send(()); // tell writer to try again
@@ -926,7 +925,7 @@ mod tests {
         //     "blk(&Condvar { order: opt_lock, ..*cond })"
         // with just "blk(cond)".
         let x = RWArc::new(true);
-        let (wp, wc) = comm::stream();
+        let (wp, wc) = Chan::new();
 
         // writer task
         let xw = x.clone();
@@ -951,7 +950,7 @@ mod tests {
             });
             // make a reader task to trigger the "reader cloud lock" handoff
             let xr = x.clone();
-            let (rp, rc) = comm::stream();
+            let (rp, rc) = Chan::new();
             do task::spawn {
                 rc.send(());
                 xr.read(|_state| { })
diff --git a/src/libextra/comm.rs b/src/libextra/comm.rs
index 42287736ffa..09dd85fe0de 100644
--- a/src/libextra/comm.rs
+++ b/src/libextra/comm.rs
@@ -16,11 +16,6 @@ Higher level communication abstractions.
 
 #[allow(missing_doc)];
 
-
-use std::comm::{GenericChan, GenericSmartChan, GenericPort};
-use std::comm::{Chan, Port, Peekable};
-use std::comm;
-
 /// An extension of `pipes::stream` that allows both sending and receiving.
 pub struct DuplexStream<T, U> {
     priv chan: Chan<T>,
@@ -29,108 +24,73 @@ pub struct DuplexStream<T, U> {
 
 // Allow these methods to be used without import:
 impl<T:Send,U:Send> DuplexStream<T, U> {
+    /// Creates a bidirectional stream.
+    pub fn new() -> (DuplexStream<T, U>, DuplexStream<U, T>) {
+        let (p1, c2) = Chan::new();
+        let (p2, c1) = Chan::new();
+        (DuplexStream { chan: c1, port: p1 },
+         DuplexStream { chan: c2, port: p2 })
+    }
     pub fn send(&self, x: T) {
         self.chan.send(x)
     }
     pub fn try_send(&self, x: T) -> bool {
         self.chan.try_send(x)
     }
-    pub fn recv(&self, ) -> U {
+    pub fn recv(&self) -> U {
         self.port.recv()
     }
     pub fn try_recv(&self) -> Option<U> {
         self.port.try_recv()
     }
-    pub fn peek(&self) -> bool {
-        self.port.peek()
-    }
-}
-
-impl<T:Send,U:Send> GenericChan<T> for DuplexStream<T, U> {
-    fn send(&self, x: T) {
-        self.chan.send(x)
-    }
-}
-
-impl<T:Send,U:Send> GenericSmartChan<T> for DuplexStream<T, U> {
-    fn try_send(&self, x: T) -> bool {
-        self.chan.try_send(x)
-    }
-}
-
-impl<T:Send,U:Send> GenericPort<U> for DuplexStream<T, U> {
-    fn recv(&self) -> U {
-        self.port.recv()
-    }
-
-    fn try_recv(&self) -> Option<U> {
-        self.port.try_recv()
-    }
-}
-
-impl<T:Send,U:Send> Peekable<U> for DuplexStream<T, U> {
-    fn peek(&self) -> bool {
-        self.port.peek()
+    pub fn recv_opt(&self) -> Option<U> {
+        self.port.recv_opt()
     }
 }
 
-/// Creates a bidirectional stream.
-pub fn DuplexStream<T:Send,U:Send>()
-    -> (DuplexStream<T, U>, DuplexStream<U, T>)
-{
-    let (p1, c2) = comm::stream();
-    let (p2, c1) = comm::stream();
-    (DuplexStream {
-        chan: c1,
-        port: p1
-    },
-     DuplexStream {
-         chan: c2,
-         port: p2
-     })
-}
-
 /// An extension of `pipes::stream` that provides synchronous message sending.
 pub struct SyncChan<T> { priv duplex_stream: DuplexStream<T, ()> }
 /// An extension of `pipes::stream` that acknowledges each message received.
 pub struct SyncPort<T> { priv duplex_stream: DuplexStream<(), T> }
 
-impl<T: Send> GenericChan<T> for SyncChan<T> {
-    fn send(&self, val: T) {
+impl<T: Send> SyncChan<T> {
+    pub fn send(&self, val: T) {
         assert!(self.try_send(val), "SyncChan.send: receiving port closed");
     }
-}
 
-impl<T: Send> GenericSmartChan<T> for SyncChan<T> {
-    /// Sends a message, or report if the receiver has closed the connection before receiving.
-    fn try_send(&self, val: T) -> bool {
-        self.duplex_stream.try_send(val) && self.duplex_stream.try_recv().is_some()
+    /// Sends a message, or report if the receiver has closed the connection
+    /// before receiving.
+    pub fn try_send(&self, val: T) -> bool {
+        self.duplex_stream.try_send(val) && self.duplex_stream.recv_opt().is_some()
     }
 }
 
-impl<T: Send> GenericPort<T> for SyncPort<T> {
-    fn recv(&self) -> T {
-        self.try_recv().expect("SyncPort.recv: sending channel closed")
+impl<T: Send> SyncPort<T> {
+    pub fn recv(&self) -> T {
+        self.recv_opt().expect("SyncPort.recv: sending channel closed")
     }
 
-    fn try_recv(&self) -> Option<T> {
-        self.duplex_stream.try_recv().map(|val| {
+    pub fn recv_opt(&self) -> Option<T> {
+        self.duplex_stream.recv_opt().map(|val| {
             self.duplex_stream.try_send(());
             val
         })
     }
-}
 
-impl<T: Send> Peekable<T> for SyncPort<T> {
-    fn peek(&self) -> bool {
-        self.duplex_stream.peek()
+    pub fn try_recv(&self) -> Option<T> {
+        self.duplex_stream.try_recv().map(|val| {
+            self.duplex_stream.try_send(());
+            val
+        })
     }
 }
 
-/// Creates a stream whose channel, upon sending a message, blocks until the message is received.
+/// Creates a stream whose channel, upon sending a message, blocks until the
+/// message is received.
 pub fn rendezvous<T: Send>() -> (SyncPort<T>, SyncChan<T>) {
-    let (chan_stream, port_stream) = DuplexStream();
-    (SyncPort { duplex_stream: port_stream }, SyncChan { duplex_stream: chan_stream })
+    let (chan_stream, port_stream) = DuplexStream::new();
+    (SyncPort { duplex_stream: port_stream },
+     SyncChan { duplex_stream: chan_stream })
 }
 
 #[cfg(test)]
@@ -141,7 +101,7 @@ mod test {
 
     #[test]
     pub fn DuplexStream1() {
-        let (left, right) = DuplexStream();
+        let (mut left, mut right) = DuplexStream::new();
 
         left.send(~"abc");
         right.send(123);
@@ -152,9 +112,10 @@ mod test {
 
     #[test]
     pub fn basic_rendezvous_test() {
-        let (port, chan) = rendezvous();
+        let (mut port, chan) = rendezvous();
 
         do spawn {
+            let mut chan = chan;
             chan.send("abc");
         }
 
@@ -165,8 +126,9 @@ mod test {
     fn recv_a_lot() {
         // Rendezvous streams should be able to handle any number of messages being sent
         do run_in_uv_task {
-            let (port, chan) = rendezvous();
+            let (mut port, chan) = rendezvous();
             do spawn {
+                let mut chan = chan;
                 1000000.times(|| { chan.send(()) })
             }
             1000000.times(|| { port.recv() })
@@ -175,8 +137,9 @@ mod test {
 
     #[test]
     fn send_and_fail_and_try_recv() {
-        let (port, chan) = rendezvous();
+        let (mut port, chan) = rendezvous();
         do spawn {
+            let mut chan = chan;
             chan.duplex_stream.send(()); // Can't access this field outside this module
             fail!()
         }
@@ -185,8 +148,9 @@ mod test {
 
     #[test]
     fn try_send_and_recv_then_fail_before_ack() {
-        let (port, chan) = rendezvous();
+        let (port, mut chan) = rendezvous();
         do spawn {
+            let mut port = port;
             port.duplex_stream.recv();
             fail!()
         }
@@ -196,8 +160,9 @@ mod test {
     #[test]
     #[should_fail]
     fn send_and_recv_then_fail_before_ack() {
-        let (port, chan) = rendezvous();
+        let (port, mut chan) = rendezvous();
         do spawn {
+            let mut port = port;
             port.duplex_stream.recv();
             fail!()
         }
diff --git a/src/libextra/future.rs b/src/libextra/future.rs
index 1a2ac398132..eb61b7781f1 100644
--- a/src/libextra/future.rs
+++ b/src/libextra/future.rs
@@ -25,7 +25,6 @@
 
 #[allow(missing_doc)];
 
-use std::comm::{PortOne, oneshot};
 use std::util::replace;
 
 /// A type encapsulating the result of a computation which may not be complete
@@ -104,7 +103,7 @@ impl<A> Future<A> {
 }
 
 impl<A:Send> Future<A> {
-    pub fn from_port(port: PortOne<A>) -> Future<A> {
+    pub fn from_port(port: Port<A>) -> Future<A> {
         /*!
          * Create a future from a port
          *
@@ -125,7 +124,7 @@ impl<A:Send> Future<A> {
          * value of the future.
          */
 
-        let (port, chan) = oneshot();
+        let (port, chan) = Chan::new();
 
         do spawn {
             chan.send(blk());
@@ -139,7 +138,6 @@ impl<A:Send> Future<A> {
 mod test {
     use future::Future;
 
-    use std::comm::oneshot;
     use std::task;
 
     #[test]
@@ -150,7 +148,7 @@ mod test {
 
     #[test]
     fn test_from_port() {
-        let (po, ch) = oneshot();
+        let (po, ch) = Chan::new();
         ch.send(~"whale");
         let mut f = Future::from_port(po);
         assert_eq!(f.get(), ~"whale");
diff --git a/src/libextra/sync.rs b/src/libextra/sync.rs
index f00c3d8db9a..1cc403c32f4 100644
--- a/src/libextra/sync.rs
+++ b/src/libextra/sync.rs
@@ -19,9 +19,6 @@
 
 
 use std::borrow;
-use std::comm;
-use std::comm::SendDeferred;
-use std::comm::{GenericPort, Peekable};
 use std::unstable::sync::{Exclusive, UnsafeArc};
 use std::unstable::atomics;
 use std::unstable::finally::Finally;
@@ -34,48 +31,53 @@ use std::util::NonCopyable;
 
 // Each waiting task receives on one of these.
 #[doc(hidden)]
-type WaitEnd = comm::PortOne<()>;
+type WaitEnd = Port<()>;
 #[doc(hidden)]
-type SignalEnd = comm::ChanOne<()>;
+type SignalEnd = Chan<()>;
 // A doubly-ended queue of waiting tasks.
 #[doc(hidden)]
-struct WaitQueue { head: comm::Port<SignalEnd>,
-                   tail: comm::Chan<SignalEnd> }
+struct WaitQueue { head: Port<SignalEnd>,
+                   tail: Chan<SignalEnd> }
 
 impl WaitQueue {
     fn new() -> WaitQueue {
-        let (block_head, block_tail) = comm::stream();
+        let (block_head, block_tail) = Chan::new();
         WaitQueue { head: block_head, tail: block_tail }
     }
 
     // Signals one live task from the queue.
     fn signal(&self) -> bool {
-        // The peek is mandatory to make sure recv doesn't block.
-        if self.head.peek() {
-            // Pop and send a wakeup signal. If the waiter was killed, its port
-            // will have closed. Keep trying until we get a live task.
-            if self.head.recv().try_send_deferred(()) {
-                true
-            } else {
-                self.signal()
+        match self.head.try_recv() {
+            Some(ch) => {
+                // Send a wakeup signal. If the waiter was killed, its port will
+                // have closed. Keep trying until we get a live task.
+                if ch.try_send_deferred(()) {
+                    true
+                } else {
+                    self.signal()
+                }
             }
-        } else {
-            false
+            None => false
         }
     }
 
     fn broadcast(&self) -> uint {
         let mut count = 0;
-        while self.head.peek() {
-            if self.head.recv().try_send_deferred(()) {
-                count += 1;
+        loop {
+            match self.head.try_recv() {
+                None => break,
+                Some(ch) => {
+                    if ch.try_send_deferred(()) {
+                        count += 1;
+                    }
+                }
             }
         }
         count
     }
 
     fn wait_end(&self) -> WaitEnd {
-        let (wait_end, signal_end) = comm::oneshot();
+        let (wait_end, signal_end) = Chan::new();
         self.tail.send_deferred(signal_end);
         wait_end
     }
@@ -282,8 +284,7 @@ impl<'a> Condvar<'a> {
                               condvar_id,
                               "cond.signal_on()",
                               || {
-                let queue = queue.take_unwrap();
-                queue.broadcast()
+                queue.take_unwrap().broadcast()
             })
         }
     }
@@ -676,7 +677,6 @@ mod tests {
     use sync::*;
 
     use std::cast;
-    use std::comm;
     use std::result;
     use std::task;
 
@@ -711,7 +711,7 @@ mod tests {
     #[test]
     fn test_sem_as_cvar() {
         /* Child waits and parent signals */
-        let (p, c) = comm::stream();
+        let (p, c) = Chan::new();
         let s = Semaphore::new(0);
         let s2 = s.clone();
         do task::spawn {
@@ -723,7 +723,7 @@ mod tests {
         let _ = p.recv();
 
         /* Parent waits and child signals */
-        let (p, c) = comm::stream();
+        let (p, c) = Chan::new();
         let s = Semaphore::new(0);
         let s2 = s.clone();
         do task::spawn {
@@ -740,8 +740,8 @@ mod tests {
         // time, and shake hands.
         let s = Semaphore::new(2);
         let s2 = s.clone();
-        let (p1,c1) = comm::stream();
-        let (p2,c2) = comm::stream();
+        let (p1,c1) = Chan::new();
+        let (p2,c2) = Chan::new();
         do task::spawn {
             s2.access(|| {
                 let _ = p2.recv();
@@ -760,7 +760,7 @@ mod tests {
         do task::spawn_sched(task::SingleThreaded) {
             let s = Semaphore::new(1);
             let s2 = s.clone();
-            let (p, c) = comm::stream();
+            let (p, c) = Chan::new();
             let mut child_data = Some((s2, c));
             s.access(|| {
                 let (s2, c) = child_data.take_unwrap();
@@ -782,7 +782,7 @@ mod tests {
     fn test_mutex_lock() {
         // Unsafely achieve shared state, and do the textbook
         // "load tmp = move ptr; inc tmp; store ptr <- tmp" dance.
-        let (p, c) = comm::stream();
+        let (p, c) = Chan::new();
         let m = Mutex::new();
         let m2 = m.clone();
         let mut sharedstate = ~0;
@@ -829,7 +829,7 @@ mod tests {
             cond.wait();
         });
         // Parent wakes up child
-        let (port,chan) = comm::stream();
+        let (port,chan) = Chan::new();
         let m3 = m.clone();
         do task::spawn {
             m3.lock_cond(|cond| {
@@ -852,7 +852,7 @@ mod tests {
 
         num_waiters.times(|| {
             let mi = m.clone();
-            let (port, chan) = comm::stream();
+            let (port, chan) = Chan::new();
             ports.push(port);
             do task::spawn {
                 mi.lock_cond(|cond| {
@@ -864,13 +864,13 @@ mod tests {
         });
 
         // wait until all children get in the mutex
-        for port in ports.iter() { let _ = port.recv(); }
+        for port in ports.mut_iter() { let _ = port.recv(); }
         m.lock_cond(|cond| {
             let num_woken = cond.broadcast();
             assert_eq!(num_woken, num_waiters);
         });
         // wait until all children wake up
-        for port in ports.iter() { let _ = port.recv(); }
+        for port in ports.mut_iter() { let _ = port.recv(); }
     }
     #[test]
     fn test_mutex_cond_broadcast() {
@@ -915,8 +915,8 @@ mod tests {
         let m2 = m.clone();
 
         let result: result::Result<(), ~Any> = do task::try {
-            let (p, c) = comm::stream();
-            do task::spawn || { // linked
+            let (p, c) = Chan::new();
+            do task::spawn { // linked
                 let _ = p.recv(); // wait for sibling to get in the mutex
                 task::deschedule();
                 fail!();
@@ -940,19 +940,18 @@ mod tests {
 
         let m = Mutex::new();
         let m2 = m.clone();
-        let (p, c) = comm::stream();
+        let (p, c) = Chan::new();
 
         let result: result::Result<(), ~Any> = do task::try {
             let mut sibling_convos = ~[];
             2.times(|| {
-                let (p, c) = comm::stream();
+                let (p, c) = Chan::new();
                 sibling_convos.push(p);
                 let mi = m2.clone();
                 // spawn sibling task
                 do task::spawn { // linked
                     let mut c = Some(c);
                     mi.lock_cond(|cond| {
-                        let c = c.take_unwrap();
                         c.send(()); // tell sibling to go ahead
                         (|| {
                             cond.wait(); // block forever
@@ -964,7 +963,7 @@ mod tests {
                     })
                 }
             });
-            for p in sibling_convos.iter() {
+            for p in sibling_convos.mut_iter() {
                 let _ = p.recv(); // wait for sibling to get in the mutex
             }
             m2.lock(|| { });
@@ -973,8 +972,8 @@ mod tests {
         };
         assert!(result.is_err());
         // child task must have finished by the time try returns
-        let r = p.recv();
-        for p in r.iter() { p.recv(); } // wait on all its siblings
+        let mut r = p.recv();
+        for p in r.mut_iter() { p.recv(); } // wait on all its siblings
         m.lock_cond(|cond| {
             let woken = cond.broadcast();
             assert_eq!(woken, 0);
@@ -999,7 +998,7 @@ mod tests {
         let result = do task::try {
             let m = Mutex::new_with_condvars(2);
             let m2 = m.clone();
-            let (p, c) = comm::stream();
+            let (p, c) = Chan::new();
             do task::spawn {
                 m2.lock_cond(|cond| {
                     c.send(());
@@ -1060,7 +1059,7 @@ mod tests {
                                  mode2: RWLockMode) {
         // Test mutual exclusion between readers and writers. Just like the
         // mutex mutual exclusion test, a ways above.
-        let (p, c) = comm::stream();
+        let (p, c) = Chan::new();
         let x2 = x.clone();
         let mut sharedstate = ~0;
         {
@@ -1111,8 +1110,8 @@ mod tests {
                                  make_mode2_go_first: bool) {
         // Much like sem_multi_resource.
         let x2 = x.clone();
-        let (p1, c1) = comm::stream();
-        let (p2, c2) = comm::stream();
+        let (p1, c1) = Chan::new();
+        let (p2, c2) = Chan::new();
         do task::spawn {
             if !make_mode2_go_first {
                 let _ = p2.recv(); // parent sends to us once it locks, or ...
@@ -1177,7 +1176,7 @@ mod tests {
             cond.wait();
         });
         // Parent wakes up child
-        let (port, chan) = comm::stream();
+        let (port, chan) = Chan::new();
         let x3 = x.clone();
         do task::spawn {
             x3.write_cond(|cond| {
@@ -1214,7 +1213,7 @@ mod tests {
 
         num_waiters.times(|| {
             let xi = x.clone();
-            let (port, chan) = comm::stream();
+            let (port, chan) = Chan::new();
             ports.push(port);
             do task::spawn {
                 lock_cond(&xi, dg1, |cond| {
@@ -1226,13 +1225,13 @@ mod tests {
         });
 
         // wait until all children get in the mutex
-        for port in ports.iter() { let _ = port.recv(); }
+        for port in ports.mut_iter() { let _ = port.recv(); }
         lock_cond(&x, dg2, |cond| {
             let num_woken = cond.broadcast();
             assert_eq!(num_woken, num_waiters);
         });
         // wait until all children wake up
-        for port in ports.iter() { let _ = port.recv(); }
+        for port in ports.mut_iter() { let _ = port.recv(); }
     }
     #[test]
     fn test_rwlock_cond_broadcast() {
diff --git a/src/libextra/task_pool.rs b/src/libextra/task_pool.rs
index bda6935643f..f0c9833adf8 100644
--- a/src/libextra/task_pool.rs
+++ b/src/libextra/task_pool.rs
@@ -14,8 +14,6 @@
 /// parallelism.
 
 
-use std::comm::{Chan, GenericChan, GenericPort};
-use std::comm;
 use std::task::SchedMode;
 use std::task;
 use std::vec;
@@ -35,7 +33,7 @@ pub struct TaskPool<T> {
 #[unsafe_destructor]
 impl<T> Drop for TaskPool<T> {
     fn drop(&mut self) {
-        for channel in self.channels.iter() {
+        for channel in self.channels.mut_iter() {
             channel.send(Quit);
         }
     }
@@ -54,7 +52,7 @@ impl<T> TaskPool<T> {
         assert!(n_tasks >= 1);
 
         let channels = vec::from_fn(n_tasks, |i| {
-            let (port, chan) = comm::stream::<Msg<T>>();
+            let (port, chan) = Chan::<Msg<T>>::new();
             let init_fn = init_fn_factory();
 
             let task_body: proc() = proc() {
diff --git a/src/libextra/test.rs b/src/libextra/test.rs
index c2b4ff05d5d..974d4dc1dc5 100644
--- a/src/libextra/test.rs
+++ b/src/libextra/test.rs
@@ -29,7 +29,6 @@ use time::precise_time_ns;
 use treemap::TreeMap;
 
 use std::clone::Clone;
-use std::comm::{stream, SharedChan, GenericPort, GenericChan};
 use std::io;
 use std::io::File;
 use std::io::Writer;
@@ -746,8 +745,7 @@ fn run_tests(opts: &TestOpts,
     remaining.reverse();
     let mut pending = 0;
 
-    let (p, ch) = stream();
-    let ch = SharedChan::new(ch);
+    let (p, ch) = SharedChan::new();
 
     while pending > 0 || !remaining.is_empty() {
         while pending < concurrency && !remaining.is_empty() {
@@ -872,7 +870,7 @@ pub fn run_test(force_ignore: bool,
     fn run_test_inner(desc: TestDesc,
                       monitor_ch: SharedChan<MonitorMsg>,
                       testfn: proc()) {
-        do task::spawn {
+        do spawn {
             let mut task = task::task();
             task.name(match desc.name {
                 DynTestName(ref name) => SendStrOwned(name.clone()),
@@ -1206,7 +1204,6 @@ mod tests {
                StaticTestName, DynTestName, DynTestFn};
     use test::{TestOpts, run_test};
 
-    use std::comm::{stream, SharedChan};
     use tempfile::TempDir;
 
     #[test]
@@ -1220,8 +1217,7 @@ mod tests {
             },
             testfn: DynTestFn(proc() f()),
         };
-        let (p, ch) = stream();
-        let ch = SharedChan::new(ch);
+        let (p, ch) = SharedChan::new();
         run_test(false, desc, ch);
         let (_, res) = p.recv();
         assert!(res != TrOk);
@@ -1238,8 +1234,7 @@ mod tests {
             },
             testfn: DynTestFn(proc() f()),
         };
-        let (p, ch) = stream();
-        let ch = SharedChan::new(ch);
+        let (p, ch) = SharedChan::new();
         run_test(false, desc, ch);
         let (_, res) = p.recv();
         assert_eq!(res, TrIgnored);
@@ -1256,8 +1251,7 @@ mod tests {
             },
             testfn: DynTestFn(proc() f()),
         };
-        let (p, ch) = stream();
-        let ch = SharedChan::new(ch);
+        let (p, ch) = SharedChan::new();
         run_test(false, desc, ch);
         let (_, res) = p.recv();
         assert_eq!(res, TrOk);
@@ -1274,8 +1268,7 @@ mod tests {
             },
             testfn: DynTestFn(proc() f()),
         };
-        let (p, ch) = stream();
-        let ch = SharedChan::new(ch);
+        let (p, ch) = SharedChan::new();
         run_test(false, desc, ch);
         let (_, res) = p.recv();
         assert_eq!(res, TrFailed);
diff --git a/src/libextra/workcache.rs b/src/libextra/workcache.rs
index 91f1f1a0d0b..8713dbde920 100644
--- a/src/libextra/workcache.rs
+++ b/src/libextra/workcache.rs
@@ -15,8 +15,7 @@ use json::ToJson;
 use serialize::{Encoder, Encodable, Decoder, Decodable};
 use arc::{Arc,RWArc};
 use treemap::TreeMap;
-use std::comm::{PortOne, oneshot};
-use std::{str, task};
+use std::str;
 use std::io;
 use std::io::{File, Decorator};
 use std::io::mem::MemWriter;
@@ -252,7 +251,7 @@ pub struct Exec {
 
 enum Work<'a, T> {
     WorkValue(T),
-    WorkFromTask(&'a Prep<'a>, PortOne<(Exec, T)>),
+    WorkFromTask(&'a Prep<'a>, Port<(Exec, T)>),
 }
 
 fn json_encode<'a, T:Encodable<json::Encoder<'a>>>(t: &T) -> ~str {
@@ -427,11 +426,11 @@ impl<'a> Prep<'a> {
 
             _ => {
                 debug!("Cache miss!");
-                let (port, chan) = oneshot();
+                let (port, chan) = Chan::new();
                 let blk = bo.take_unwrap();
 
                 // XXX: What happens if the task fails?
-                do task::spawn {
+                do spawn {
                     let mut exe = Exec {
                         discovered_inputs: WorkMap::new(),
                         discovered_outputs: WorkMap::new(),
@@ -453,7 +452,7 @@ impl<'a, T:Send +
     pub fn from_value(elt: T) -> Work<'a, T> {
         WorkValue(elt)
     }
-    pub fn from_task(prep: &'a Prep<'a>, port: PortOne<(Exec, T)>)
+    pub fn from_task(prep: &'a Prep<'a>, port: Port<(Exec, T)>)
         -> Work<'a, T> {
         WorkFromTask(prep, port)
     }