about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--src/doc/guide-tasks.md2
-rw-r--r--src/libextra/test.rs14
-rw-r--r--src/libgreen/lib.rs4
-rw-r--r--src/libnative/io/mod.rs3
-rw-r--r--src/libnative/io/timer_helper.rs10
-rw-r--r--src/librustuv/signal.rs7
-rw-r--r--src/librustuv/uvio.rs3
-rw-r--r--src/libstd/comm/mod.rs805
-rw-r--r--src/libstd/comm/oneshot.rs382
-rw-r--r--src/libstd/comm/select.rs312
-rw-r--r--src/libstd/comm/shared.rs483
-rw-r--r--src/libstd/comm/stream.rs460
-rw-r--r--src/libstd/io/signal.rs6
-rw-r--r--src/libstd/prelude.rs2
-rw-r--r--src/libstd/rt/mod.rs1
-rw-r--r--src/libstd/rt/rtio.rs4
-rw-r--r--src/libstd/rt/task.rs2
-rw-r--r--src/libstd/rt/unwind.rs1
-rw-r--r--src/libstd/run.rs4
-rw-r--r--src/libstd/sync/mpmc_bounded_queue.rs2
-rw-r--r--src/libstd/sync/mpsc_queue.rs11
-rw-r--r--src/libstd/sync/spsc_queue.rs16
-rw-r--r--src/libstd/task.rs5
-rw-r--r--src/libsync/sync/mod.rs4
-rw-r--r--src/libsync/sync/mutex.rs2
-rw-r--r--src/libsync/sync/one.rs2
-rw-r--r--src/test/bench/msgsend-pipes-shared.rs2
-rw-r--r--src/test/bench/msgsend-pipes.rs2
-rw-r--r--src/test/bench/shootout-chameneos-redux.rs8
-rw-r--r--src/test/bench/shootout-pfib.rs6
-rw-r--r--src/test/bench/task-perf-linked-failure.rs4
-rw-r--r--src/test/compile-fail/comm-not-freeze.rs2
-rw-r--r--src/test/run-pass/hashmap-memory.rs8
-rw-r--r--src/test/run-pass/task-comm-14.rs4
-rw-r--r--src/test/run-pass/task-comm-3.rs4
-rw-r--r--src/test/run-pass/task-comm-6.rs2
-rw-r--r--src/test/run-pass/task-comm-7.rs4
-rw-r--r--src/test/run-pass/unique-send-2.rs4
-rw-r--r--src/test/run-pass/unwind-resource.rs8
39 files changed, 1907 insertions, 698 deletions
diff --git a/src/doc/guide-tasks.md b/src/doc/guide-tasks.md
index 387f481025d..9ff712df021 100644
--- a/src/doc/guide-tasks.md
+++ b/src/doc/guide-tasks.md
@@ -232,7 +232,7 @@ Instead we can use a `SharedChan`, a type that allows a single
 ~~~
 # use std::task::spawn;
 
-let (port, chan) = SharedChan::new();
+let (port, chan) = Chan::new();
 
 for init_val in range(0u, 3) {
     // Create a new channel handle to distribute to the child task
diff --git a/src/libextra/test.rs b/src/libextra/test.rs
index d207bd2298b..9ebd91bdfb6 100644
--- a/src/libextra/test.rs
+++ b/src/libextra/test.rs
@@ -767,7 +767,7 @@ fn run_tests(opts: &TestOpts,
     remaining.reverse();
     let mut pending = 0;
 
-    let (p, ch) = SharedChan::new();
+    let (p, ch) = Chan::new();
 
     while pending > 0 || !remaining.is_empty() {
         while pending < concurrency && !remaining.is_empty() {
@@ -878,7 +878,7 @@ pub fn filter_tests(
 
 pub fn run_test(force_ignore: bool,
                 test: TestDescAndFn,
-                monitor_ch: SharedChan<MonitorMsg>) {
+                monitor_ch: Chan<MonitorMsg>) {
 
     let TestDescAndFn {desc, testfn} = test;
 
@@ -888,7 +888,7 @@ pub fn run_test(force_ignore: bool,
     }
 
     fn run_test_inner(desc: TestDesc,
-                      monitor_ch: SharedChan<MonitorMsg>,
+                      monitor_ch: Chan<MonitorMsg>,
                       testfn: proc()) {
         spawn(proc() {
             let mut task = task::task();
@@ -1260,7 +1260,7 @@ mod tests {
             },
             testfn: DynTestFn(proc() f()),
         };
-        let (p, ch) = SharedChan::new();
+        let (p, ch) = Chan::new();
         run_test(false, desc, ch);
         let (_, res) = p.recv();
         assert!(res != TrOk);
@@ -1277,7 +1277,7 @@ mod tests {
             },
             testfn: DynTestFn(proc() f()),
         };
-        let (p, ch) = SharedChan::new();
+        let (p, ch) = Chan::new();
         run_test(false, desc, ch);
         let (_, res) = p.recv();
         assert_eq!(res, TrIgnored);
@@ -1294,7 +1294,7 @@ mod tests {
             },
             testfn: DynTestFn(proc() f()),
         };
-        let (p, ch) = SharedChan::new();
+        let (p, ch) = Chan::new();
         run_test(false, desc, ch);
         let (_, res) = p.recv();
         assert_eq!(res, TrOk);
@@ -1311,7 +1311,7 @@ mod tests {
             },
             testfn: DynTestFn(proc() f()),
         };
-        let (p, ch) = SharedChan::new();
+        let (p, ch) = Chan::new();
         run_test(false, desc, ch);
         let (_, res) = p.recv();
         assert_eq!(res, TrFailed);
diff --git a/src/libgreen/lib.rs b/src/libgreen/lib.rs
index 495f6ead1ca..834bf7951ef 100644
--- a/src/libgreen/lib.rs
+++ b/src/libgreen/lib.rs
@@ -315,7 +315,7 @@ pub struct SchedPool {
 #[deriving(Clone)]
 struct TaskState {
     cnt: UnsafeArc<AtomicUint>,
-    done: SharedChan<()>,
+    done: Chan<()>,
 }
 
 impl SchedPool {
@@ -469,7 +469,7 @@ impl SchedPool {
 
 impl TaskState {
     fn new() -> (Port<()>, TaskState) {
-        let (p, c) = SharedChan::new();
+        let (p, c) = Chan::new();
         (p, TaskState {
             cnt: UnsafeArc::new(AtomicUint::new(0)),
             done: c,
diff --git a/src/libnative/io/mod.rs b/src/libnative/io/mod.rs
index c94554f510e..69ef10ac11b 100644
--- a/src/libnative/io/mod.rs
+++ b/src/libnative/io/mod.rs
@@ -22,7 +22,6 @@
 //! that you would find on the respective platform.
 
 use std::c_str::CString;
-use std::comm::SharedChan;
 use std::io;
 use std::io::IoError;
 use std::io::net::ip::SocketAddr;
@@ -289,7 +288,7 @@ impl rtio::IoFactory for IoFactory {
             })
         }
     }
-    fn signal(&mut self, _signal: Signum, _channel: SharedChan<Signum>)
+    fn signal(&mut self, _signal: Signum, _channel: Chan<Signum>)
         -> IoResult<~RtioSignal> {
         Err(unimpl())
     }
diff --git a/src/libnative/io/timer_helper.rs b/src/libnative/io/timer_helper.rs
index c00b0efadb5..2c976e67d25 100644
--- a/src/libnative/io/timer_helper.rs
+++ b/src/libnative/io/timer_helper.rs
@@ -33,7 +33,7 @@ use task;
 // only torn down after everything else has exited. This means that these
 // variables are read-only during use (after initialization) and both of which
 // are safe to use concurrently.
-static mut HELPER_CHAN: *mut SharedChan<Req> = 0 as *mut SharedChan<Req>;
+static mut HELPER_CHAN: *mut Chan<Req> = 0 as *mut Chan<Req>;
 static mut HELPER_SIGNAL: imp::signal = 0 as imp::signal;
 
 pub fn boot(helper: fn(imp::signal, Port<Req>)) {
@@ -43,7 +43,9 @@ pub fn boot(helper: fn(imp::signal, Port<Req>)) {
     unsafe {
         LOCK.lock();
         if !INITIALIZED {
-            let (msgp, msgc) = SharedChan::new();
+            let (msgp, msgc) = Chan::new();
+            // promote this to a shared channel
+            drop(msgc.clone());
             HELPER_CHAN = cast::transmute(~msgc);
             let (receive, send) = imp::new();
             HELPER_SIGNAL = send;
@@ -84,8 +86,8 @@ fn shutdown() {
     // Clean up after ther helper thread
     unsafe {
         imp::close(HELPER_SIGNAL);
-        let _chan: ~SharedChan<Req> = cast::transmute(HELPER_CHAN);
-        HELPER_CHAN = 0 as *mut SharedChan<Req>;
+        let _chan: ~Chan<Req> = cast::transmute(HELPER_CHAN);
+        HELPER_CHAN = 0 as *mut Chan<Req>;
         HELPER_SIGNAL = 0 as imp::signal;
     }
 }
diff --git a/src/librustuv/signal.rs b/src/librustuv/signal.rs
index 2fcc61be79b..0a66c3445ee 100644
--- a/src/librustuv/signal.rs
+++ b/src/librustuv/signal.rs
@@ -10,7 +10,6 @@
 
 use std::libc::c_int;
 use std::io::signal::Signum;
-use std::comm::SharedChan;
 use std::rt::rtio::RtioSignal;
 
 use homing::{HomingIO, HomeHandle};
@@ -22,13 +21,13 @@ pub struct SignalWatcher {
     handle: *uvll::uv_signal_t,
     home: HomeHandle,
 
-    channel: SharedChan<Signum>,
+    channel: Chan<Signum>,
     signal: Signum,
 }
 
 impl SignalWatcher {
     pub fn new(io: &mut UvIoFactory, signum: Signum,
-               channel: SharedChan<Signum>) -> Result<~SignalWatcher, UvError> {
+               channel: Chan<Signum>) -> Result<~SignalWatcher, UvError> {
         let s = ~SignalWatcher {
             handle: UvHandle::alloc(None::<SignalWatcher>, uvll::UV_SIGNAL),
             home: io.make_handle(),
@@ -81,7 +80,7 @@ mod test {
     #[test]
     fn closing_channel_during_drop_doesnt_kill_everything() {
         // see issue #10375, relates to timers as well.
-        let (port, chan) = SharedChan::new();
+        let (port, chan) = Chan::new();
         let _signal = SignalWatcher::new(local_loop(), signal::Interrupt,
                                          chan);
 
diff --git a/src/librustuv/uvio.rs b/src/librustuv/uvio.rs
index 8a8ef4a41ec..54db4b4d3d1 100644
--- a/src/librustuv/uvio.rs
+++ b/src/librustuv/uvio.rs
@@ -10,7 +10,6 @@
 
 use std::c_str::CString;
 use std::cast;
-use std::comm::SharedChan;
 use std::io::IoError;
 use std::io::net::ip::SocketAddr;
 use std::io::process::ProcessConfig;
@@ -304,7 +303,7 @@ impl IoFactory for UvIoFactory {
         }
     }
 
-    fn signal(&mut self, signum: Signum, channel: SharedChan<Signum>)
+    fn signal(&mut self, signum: Signum, channel: Chan<Signum>)
         -> Result<~rtio::RtioSignal, IoError> {
         match SignalWatcher::new(self, signum, channel) {
             Ok(s) => Ok(s as ~rtio::RtioSignal),
diff --git a/src/libstd/comm/mod.rs b/src/libstd/comm/mod.rs
index 63d69100671..9b320dfe62b 100644
--- a/src/libstd/comm/mod.rs
+++ b/src/libstd/comm/mod.rs
@@ -15,17 +15,16 @@
 //! communication between concurrent tasks. The primitives defined in this
 //! module are the building blocks for synchronization in rust.
 //!
-//! This module currently provides three main types:
+//! This module currently provides two types:
 //!
 //! * `Chan`
 //! * `Port`
-//! * `SharedChan`
 //!
-//! The `Chan` and `SharedChan` types are used to send data to a `Port`. A
-//! `SharedChan` is clone-able such that many tasks can send simultaneously to
-//! one receiving port. These communication primitives are *task blocking*, not
-//! *thread blocking*. This means that if one task is blocked on a channel,
-//! other tasks can continue to make progress.
+//! `Chan` is used to send data to a `Port`. A `Chan` is clone-able such that
+//! many tasks can send simultaneously to one receiving port. These
+//! communication primitives are *task blocking*, not *thread blocking*. This
+//! means that if one task is blocked on a channel, other tasks can continue to
+//! make progress.
 //!
 //! Rust channels can be used as if they have an infinite internal buffer. What
 //! this means is that the `send` operation will never block. `Port`s, on the
@@ -39,8 +38,8 @@
 //! next operation `fail!`. The purpose of this is to allow propagation of
 //! failure among tasks that are linked to one another via channels.
 //!
-//! There are methods on all of `Chan`, `SharedChan`, and `Port` to perform
-//! their respective operations without failing, however.
+//! There are methods on both of `Chan` and `Port` to perform their respective
+//! operations without failing, however.
 //!
 //! ## Outside the Runtime
 //!
@@ -66,7 +65,7 @@
 //! assert_eq!(port.recv(), 10);
 //!
 //! // Create a shared channel which can be sent along from many tasks
-//! let (port, chan) = SharedChan::new();
+//! let (port, chan) = Chan::new();
 //! for i in range(0, 10) {
 //!     let chan = chan.clone();
 //!     spawn(proc() {
@@ -100,10 +99,22 @@
 //
 // ## Flavors of channels
 //
-// Rust channels come in two flavors: streams and shared channels. A stream has
-// one sender and one receiver while a shared channel could have multiple
-// senders. This choice heavily influences the design of the protocol set
-// forth for both senders/receivers.
+// From the perspective of a consumer of this library, there is only one flavor
+// of channel. This channel can be used as a stream and cloned to allow multiple
+// senders. Under the hood, however, there are actually three flavors of
+// channels in play.
+//
+// * Oneshots - these channels are highly optimized for the one-send use case.
+//              They contain as few atomics as possible and involve one and
+//              exactly one allocation.
+// * Streams - these channels are optimized for the non-shared use case. They
+//             use a different concurrent queue which is more tailored for this
+//             use case. The initial allocation of this flavor of channel is not
+//             optimized.
+// * Shared - this is the most general form of channel that this module offers,
+//            a channel with multiple senders. This type is as optimized as it
+//            can be, but the previous two types mentioned are much faster for
+//            their use-cases.
 //
 // ## Concurrent queues
 //
@@ -226,26 +237,20 @@
 // here's the code for you to find some more!
 
 use cast;
+use cell::Cell;
 use clone::Clone;
-use container::Container;
-use int;
 use iter::Iterator;
-use kinds::marker;
 use kinds::Send;
+use kinds::marker;
 use ops::Drop;
-use option::{Option, Some, None};
-use result::{Ok, Err};
+use option::{Some, None, Option};
+use result::{Ok, Err, Result};
 use rt::local::Local;
 use rt::task::{Task, BlockedTask};
-use rt::thread::Thread;
 use sync::arc::UnsafeArc;
-use sync::atomics::{AtomicInt, AtomicBool, SeqCst, Relaxed};
-use vec::OwnedVector;
-
-use spsc = sync::spsc_queue;
-use mpsc = sync::mpsc_queue;
+use util;
 
-pub use self::select::{Select, Handle};
+pub use comm::select::{Select, Handle};
 
 macro_rules! test (
     { fn $name:ident() $b:block $($a:attr)*} => (
@@ -273,15 +278,19 @@ macro_rules! test (
 )
 
 mod select;
+mod oneshot;
+mod stream;
+mod shared;
 
-///////////////////////////////////////////////////////////////////////////////
-// Public structs
-///////////////////////////////////////////////////////////////////////////////
+// Use a power of 2 to allow LLVM to optimize to something that's not a
+// division, this is hit pretty regularly.
+static RESCHED_FREQ: int = 256;
 
 /// The receiving-half of Rust's channel type. This half can only be owned by
 /// one task
 pub struct Port<T> {
-    priv inner: PortInner<T>,
+    priv inner: Flavor<T>,
+    priv receives: Cell<uint>,
     // can't share in an arc
     priv marker: marker::NoFreeze,
 }
@@ -296,20 +305,12 @@ pub struct Messages<'a, T> {
 /// The sending-half of Rust's channel type. This half can only be owned by one
 /// task
 pub struct Chan<T> {
-    priv inner: UnsafeArc<SingleInner<T>>,
+    priv inner: Flavor<T>,
+    priv sends: Cell<uint>,
     // can't share in an arc
     priv marker: marker::NoFreeze,
 }
 
-/// The sending-half of Rust's channel type. This half can be shared among many
-/// tasks by creating copies of itself through the `clone` method.
-pub struct SharedChan<T> {
-    // can't share in an arc -- technically this implementation is
-    // shareable, but it shouldn't be required to be shareable in an
-    // arc
-    priv marker: marker::NoFreeze,
-}
-
 /// This enumeration is the list of the possible reasons that try_recv could not
 /// return data when called.
 #[deriving(Eq, Clone)]
@@ -324,221 +325,10 @@ pub enum TryRecvResult<T> {
     Data(T),
 }
 
-///////////////////////////////////////////////////////////////////////////////
-// Internal struct definitions
-///////////////////////////////////////////////////////////////////////////////
-
-enum PortInner<T> {
-    Single(UnsafeArc<SingleInner<T>>),
-    Shared(UnsafeArc<SharedInner<T>>),
-}
-
-struct SingleInner<T> {
-    queue: spsc::Queue<T>,
-    packet: Packet,
-}
-
-struct SharedInner<T> {
-    queue: mpsc::Queue<T>,
-    packet: Packet,
-}
-
-struct Packet {
-    cnt: AtomicInt, // How many items are on this channel
-    steals: int,    // How many times has a port received without blocking?
-    to_wake: Option<BlockedTask>, // Task to wake up
-
-    // The number of channels which are currently using this packet. This is
-    // used to reference count shared channels.
-    channels: AtomicInt,
-
-    selecting: AtomicBool,
-    selection_id: uint,
-    select_next: *mut Packet,
-    select_prev: *mut Packet,
-    recv_cnt: int,
-}
-
-///////////////////////////////////////////////////////////////////////////////
-// All implementations -- the fun part
-///////////////////////////////////////////////////////////////////////////////
-
-static DISCONNECTED: int = int::MIN;
-static RESCHED_FREQ: int = 200;
-
-impl<T: Send> PortInner<T> {
-    fn packet<'a>(&'a mut self) -> &'a mut Packet {
-        match *self {
-            Single(ref arc) => unsafe { &mut (*arc.get()).packet },
-            Shared(ref arc) => unsafe { &mut (*arc.get()).packet },
-        }
-    }
-}
-
-impl Packet {
-    fn new() -> Packet {
-        Packet {
-            cnt: AtomicInt::new(0),
-            steals: 0,
-            to_wake: None,
-            channels: AtomicInt::new(1),
-
-            selecting: AtomicBool::new(false),
-            selection_id: 0,
-            select_next: 0 as *mut Packet,
-            select_prev: 0 as *mut Packet,
-            recv_cnt: 0,
-        }
-    }
-
-    // Increments the channel size count, preserving the disconnected state if
-    // the other end has disconnected.
-    fn increment(&mut self) -> int {
-        match self.cnt.fetch_add(1, SeqCst) {
-            DISCONNECTED => {
-                // see the comment in 'try' for a shared channel for why this
-                // window of "not disconnected" is "ok".
-                self.cnt.store(DISCONNECTED, SeqCst);
-                DISCONNECTED
-            }
-            n => n
-        }
-    }
-
-    // Decrements the reference count of the channel, returning whether the task
-    // should block or not. This assumes that the task is ready to sleep in that
-    // the `to_wake` field has already been filled in. Once this decrement
-    // happens, the task could wake up on the other end.
-    //
-    // From an implementation perspective, this is also when our "steal count"
-    // gets merged into the "channel count". Our steal count is reset to 0 after
-    // this function completes.
-    //
-    // As with increment(), this preserves the disconnected state if the
-    // channel is disconnected.
-    fn decrement(&mut self) -> bool {
-        let steals = self.steals;
-        self.steals = 0;
-        match self.cnt.fetch_sub(1 + steals, SeqCst) {
-            DISCONNECTED => {
-                self.cnt.store(DISCONNECTED, SeqCst);
-                false
-            }
-            n => {
-                assert!(n >= 0);
-                n - steals <= 0
-            }
-        }
-    }
-
-    // Helper function for select, tests whether this port can receive without
-    // blocking (obviously not an atomic decision).
-    fn can_recv(&self) -> bool {
-        let cnt = self.cnt.load(SeqCst);
-        cnt == DISCONNECTED || cnt - self.steals > 0
-    }
-
-    // This function must have had at least an acquire fence before it to be
-    // properly called.
-    fn wakeup(&mut self) {
-        match self.to_wake.take_unwrap().wake() {
-            Some(task) => task.reawaken(),
-            None => {}
-        }
-        self.selecting.store(false, Relaxed);
-    }
-
-    // Aborts the selection process for a port. This happens as part of select()
-    // once the task has reawoken. This will place the channel back into a
-    // consistent state which is ready to be received from again.
-    //
-    // The method of doing this is a little subtle. These channels have the
-    // invariant that if -1 is seen, then to_wake is always Some(..) and should
-    // be woken up. This aborting process at least needs to add 1 to the
-    // reference count, but that is not guaranteed to make the count positive
-    // (our steal count subtraction could mean that after the addition the
-    // channel count is still negative).
-    //
-    // In order to get around this, we force our channel count to go above 0 by
-    // adding a large number >= 1 to it. This way no sender will see -1 unless
-    // we are indeed blocking. This "extra lump" we took out of the channel
-    // becomes our steal count (which will get re-factored into the count on the
-    // next blocking recv)
-    //
-    // The return value of this method is whether there is data on this channel
-    // to receive or not.
-    fn abort_selection(&mut self, take_to_wake: bool) -> bool {
-        // make sure steals + 1 makes the count go non-negative
-        let steals = {
-            let cnt = self.cnt.load(SeqCst);
-            if cnt < 0 && cnt != DISCONNECTED {-cnt} else {0}
-        };
-        let prev = self.cnt.fetch_add(steals + 1, SeqCst);
-
-        // If we were previously disconnected, then we know for sure that there
-        // is no task in to_wake, so just keep going
-        if prev == DISCONNECTED {
-            assert!(self.to_wake.is_none());
-            self.cnt.store(DISCONNECTED, SeqCst);
-            self.selecting.store(false, SeqCst);
-            true // there is data, that data is that we're disconnected
-        } else {
-            let cur = prev + steals + 1;
-            assert!(cur >= 0);
-
-            // If the previous count was negative, then we just made things go
-            // positive, hence we passed the -1 boundary and we're responsible
-            // for removing the to_wake() field and trashing it.
-            if prev < 0 {
-                if take_to_wake {
-                    self.to_wake.take_unwrap().trash();
-                } else {
-                    assert!(self.to_wake.is_none());
-                }
-
-                // We woke ourselves up, we're responsible for cancelling
-                assert!(self.selecting.load(Relaxed));
-                self.selecting.store(false, Relaxed);
-            }
-            assert_eq!(self.steals, 0);
-            self.steals = steals;
-
-            // if we were previously positive, then there's surely data to
-            // receive
-            prev >= 0
-        }
-    }
-
-    // Decrement the reference count on a channel. This is called whenever a
-    // Chan is dropped and may end up waking up a receiver. It's the receiver's
-    // responsibility on the other end to figure out that we've disconnected.
-    unsafe fn drop_chan(&mut self) {
-        match self.channels.fetch_sub(1, SeqCst) {
-            1 => {
-                match self.cnt.swap(DISCONNECTED, SeqCst) {
-                    -1 => { self.wakeup(); }
-                    DISCONNECTED => {}
-                    n => { assert!(n >= 0); }
-                }
-            }
-            n if n > 1 => {},
-            n => fail!("bad number of channels left {}", n),
-        }
-    }
-}
-
-impl Drop for Packet {
-    fn drop(&mut self) {
-        unsafe {
-            // Note that this load is not only an assert for correctness about
-            // disconnection, but also a proper fence before the read of
-            // `to_wake`, so this assert cannot be removed with also removing
-            // the `to_wake` assert.
-            assert_eq!(self.cnt.load(SeqCst), DISCONNECTED);
-            assert!(self.to_wake.is_none());
-            assert_eq!(self.channels.load(SeqCst), 0);
-        }
-    }
+enum Flavor<T> {
+    Oneshot(UnsafeArc<oneshot::Packet<T>>),
+    Stream(UnsafeArc<stream::Packet<T>>),
+    Shared(UnsafeArc<shared::Packet<T>>),
 }
 
 impl<T: Send> Chan<T> {
@@ -546,14 +336,12 @@ impl<T: Send> Chan<T> {
     /// will become available on the port as well. See the documentation of
     /// `Port` and `Chan` to see what's possible with them.
     pub fn new() -> (Port<T>, Chan<T>) {
-        // arbitrary 128 size cache -- this is just a max cache size, not a
-        // maximum buffer size
-        let (a, b) = UnsafeArc::new2(SingleInner {
-            queue: spsc::Queue::new(128),
-            packet: Packet::new(),
-        });
-        (Port { inner: Single(a), marker: marker::NoFreeze },
-         Chan { inner: b, marker: marker::NoFreeze })
+        let (a, b) = UnsafeArc::new2(oneshot::Packet::new());
+        (Port::my_new(Oneshot(a)), Chan::my_new(Oneshot(b)))
+    }
+
+    fn my_new(inner: Flavor<T>) -> Chan<T> {
+        Chan { inner: inner, sends: Cell::new(0), marker: marker::NoFreeze }
     }
 
     /// Sends a value along this channel to be received by the corresponding
@@ -595,132 +383,105 @@ impl<T: Send> Chan<T> {
     /// Like `send`, this method will never block. If the failure of send cannot
     /// be tolerated, then this method should be used instead.
     pub fn try_send(&self, t: T) -> bool {
-        unsafe {
-            let inner = self.inner.get();
-            (*inner).queue.push(t);
-            match (*inner).packet.increment() {
-                // As described above, -1 == wakeup
-                -1 => { (*inner).packet.wakeup(); true }
-                // Also as above, SPSC queues must be >= -2
-                -2 => true,
-                // We succeeded if we sent data
-                DISCONNECTED => (*inner).queue.is_empty(),
-                // In order to prevent starvation of other tasks in situations
-                // where a task sends repeatedly without ever receiving, we
-                // occassionally yield instead of doing a send immediately.
-                // Only doing this if we're doing a rescheduling send, otherwise
-                // the caller is expecting not to context switch.
-                //
-                // Note that we don't unconditionally attempt to yield because
-                // the TLS overhead can be a bit much.
-                n => {
-                    assert!(n >= 0);
-                    if n > 0 && n % RESCHED_FREQ == 0 {
-                        let task: ~Task = Local::take();
-                        task.maybe_yield();
+        // In order to prevent starvation of other tasks in situations where
+        // a task sends repeatedly without ever receiving, we occassionally
+        // yield instead of doing a send immediately.  Only doing this if
+        // we're doing a rescheduling send, otherwise the caller is
+        // expecting not to context switch.
+        //
+        // Note that we don't unconditionally attempt to yield because the
+        // TLS overhead can be a bit much.
+        let cnt = self.sends.get() + 1;
+        self.sends.set(cnt);
+        if cnt % (RESCHED_FREQ as uint) == 0 {
+            let task: ~Task = Local::take();
+            task.maybe_yield();
+        }
+
+        let (new_inner, ret) = match self.inner {
+            Oneshot(ref p) => {
+                let p = p.get();
+                unsafe {
+                    if !(*p).sent() {
+                        return (*p).send(t);
+                    } else {
+                        let (a, b) = UnsafeArc::new2(stream::Packet::new());
+                        match (*p).upgrade(Port::my_new(Stream(b))) {
+                            oneshot::UpSuccess => {
+                                (*a.get()).send(t);
+                                (a, true)
+                            }
+                            oneshot::UpDisconnected => (a, false),
+                            oneshot::UpWoke(task) => {
+                                (*a.get()).send(t);
+                                task.wake().map(|t| t.reawaken());
+                                (a, true)
+                            }
+                        }
                     }
-                    true
                 }
             }
-        }
-    }
-}
-
-#[unsafe_destructor]
-impl<T: Send> Drop for Chan<T> {
-    fn drop(&mut self) {
-        unsafe { (*self.inner.get()).packet.drop_chan(); }
-    }
-}
-
-impl<T: Send> SharedChan<T> {
-    /// Creates a new shared channel and port pair. The purpose of a shared
-    /// channel is to be cloneable such that many tasks can send data at the
-    /// same time. All data sent on any channel will become available on the
-    /// provided port as well.
-    pub fn new() -> (Port<T>, SharedChan<T>) {
-        let (a, b) = UnsafeArc::new2(SharedInner {
-            queue: mpsc::Queue::new(),
-            packet: Packet::new(),
-        });
-        (Port { inner: Shared(a), marker: marker::NoFreeze },
-         SharedChan { inner: b, marker: marker::NoFreeze })
-        (Port { inner: Shared(a) }, SharedChan { inner: b })
-    }
+            Stream(ref p) => return unsafe { (*p.get()).send(t) },
+            Shared(ref p) => return unsafe { (*p.get()).send(t) },
+        };
 
-    /// Equivalent method to `send` on the `Chan` type (using the same
-    /// semantics)
-    pub fn send(&self, t: T) {
-        if !self.try_send(t) {
-            fail!("sending on a closed channel");
+        unsafe {
+            let mut tmp = Chan::my_new(Stream(new_inner));
+            util::swap(&mut cast::transmute_mut(self).inner, &mut tmp.inner);
         }
+        return ret;
     }
+}
 
-    /// Equivalent method to `try_send` on the `Chan` type (using the same
-    /// semantics)
-    pub fn try_send(&self, t: T) -> bool {
-        unsafe {
-            // Note that the multiple sender case is a little tricker
-            // semantically than the single sender case. The logic for
-            // incrementing is "add and if disconnected store disconnected".
-            // This could end up leading some senders to believe that there
-            // wasn't a disconnect if in fact there was a disconnect. This means
-            // that while one thread is attempting to re-store the disconnected
-            // states, other threads could walk through merrily incrementing
-            // this very-negative disconnected count. To prevent senders from
-            // spuriously attempting to send when the channels is actually
-            // disconnected, the count has a ranged check here.
-            //
-            // This is also done for another reason. Remember that the return
-            // value of this function is:
-            //
-            //  `true` == the data *may* be received, this essentially has no
-            //            meaning
-            //  `false` == the data will *never* be received, this has a lot of
-            //             meaning
-            //
-            // In the SPSC case, we have a check of 'queue.is_empty()' to see
-            // whether the data was actually received, but this same condition
-            // means nothing in a multi-producer context. As a result, this
-            // preflight check serves as the definitive "this will never be
-            // received". Once we get beyond this check, we have permanently
-            // entered the realm of "this may be received"
-            let inner = self.inner.get();
-            if (*inner).packet.cnt.load(Relaxed) < DISCONNECTED + 1024 {
-                return false
+impl<T: Send> Clone for Chan<T> {
+    fn clone(&self) -> Chan<T> {
+        let (packet, sleeper) = match self.inner {
+            Oneshot(ref p) => {
+                let (a, b) = UnsafeArc::new2(shared::Packet::new());
+                match unsafe { (*p.get()).upgrade(Port::my_new(Shared(a))) } {
+                    oneshot::UpSuccess | oneshot::UpDisconnected => (b, None),
+                    oneshot::UpWoke(task) => (b, Some(task))
+                }
             }
-
-            (*inner).queue.push(t);
-            match (*inner).packet.increment() {
-                DISCONNECTED => {} // oh well, we tried
-                -1 => { (*inner).packet.wakeup(); }
-                n => {
-                    if n > 0 && n % RESCHED_FREQ == 0 {
-                        let task: ~Task = Local::take();
-                        task.maybe_yield();
-                    }
+            Stream(ref p) => {
+                let (a, b) = UnsafeArc::new2(shared::Packet::new());
+                match unsafe { (*p.get()).upgrade(Port::my_new(Shared(a))) } {
+                    stream::UpSuccess | stream::UpDisconnected => (b, None),
+                    stream::UpWoke(task) => (b, Some(task)),
                 }
             }
-            true
-        }
-    }
-}
+            Shared(ref p) => {
+                unsafe { (*p.get()).clone_chan(); }
+                return Chan::my_new(Shared(p.clone()));
+            }
+        };
+
+        unsafe {
+            (*packet.get()).inherit_blocker(sleeper);
 
-impl<T: Send> Clone for SharedChan<T> {
-    fn clone(&self) -> SharedChan<T> {
-        unsafe { (*self.inner.get()).packet.channels.fetch_add(1, SeqCst); }
-        SharedChan { inner: self.inner.clone(), marker: marker::NoFreeze }
+            let mut tmp = Chan::my_new(Shared(packet.clone()));
+            util::swap(&mut cast::transmute_mut(self).inner, &mut tmp.inner);
+        }
+        Chan::my_new(Shared(packet))
     }
 }
 
 #[unsafe_destructor]
-impl<T: Send> Drop for SharedChan<T> {
+impl<T: Send> Drop for Chan<T> {
     fn drop(&mut self) {
-        unsafe { (*self.inner.get()).packet.drop_chan(); }
+        match self.inner {
+            Oneshot(ref mut p) => unsafe { (*p.get()).drop_chan(); },
+            Stream(ref mut p) => unsafe { (*p.get()).drop_chan(); },
+            Shared(ref mut p) => unsafe { (*p.get()).drop_chan(); },
+        }
     }
 }
 
 impl<T: Send> Port<T> {
+    fn my_new(inner: Flavor<T>) -> Port<T> {
+        Port { inner: inner, receives: Cell::new(0), marker: marker::NoFreeze }
+    }
+
     /// Blocks waiting for a value on this port
     ///
     /// This function will block if necessary to wait for a corresponding send
@@ -758,100 +519,45 @@ impl<T: Send> Port<T> {
     ///
     /// This function cannot fail.
     pub fn try_recv(&self) -> TryRecvResult<T> {
-        self.try_recv_inc(true)
-    }
-
-    fn try_recv_inc(&self, increment: bool) -> TryRecvResult<T> {
-        // This is a "best effort" situation, so if a queue is inconsistent just
-        // don't worry about it.
-        let this = unsafe { cast::transmute_mut(self) };
-
-        // See the comment about yielding on sends, but the same applies here.
-        // If a thread is spinning in try_recv we should try
-        {
-            let packet = this.inner.packet();
-            packet.recv_cnt += 1;
-            if packet.recv_cnt % RESCHED_FREQ == 0 {
-                let task: ~Task = Local::take();
-                task.maybe_yield();
-            }
+        // If a thread is spinning in try_recv, we should take the opportunity
+        // to reschedule things occasionally. See notes above in scheduling on
+        // sends for why this doesn't always hit TLS.
+        let cnt = self.receives.get() + 1;
+        self.receives.set(cnt);
+        if cnt % (RESCHED_FREQ as uint) == 0 {
+            let task: ~Task = Local::take();
+            task.maybe_yield();
         }
 
-        let ret = match this.inner {
-            Single(ref mut arc) => unsafe { (*arc.get()).queue.pop() },
-            Shared(ref mut arc) => match unsafe { (*arc.get()).queue.pop() } {
-                mpsc::Data(t) => Some(t),
-                mpsc::Empty => None,
-
-                // This is a bit of an interesting case. The channel is
-                // reported as having data available, but our pop() has
-                // failed due to the queue being in an inconsistent state.
-                // This means that there is some pusher somewhere which has
-                // yet to complete, but we are guaranteed that a pop will
-                // eventually succeed. In this case, we spin in a yield loop
-                // because the remote sender should finish their enqueue
-                // operation "very quickly".
-                //
-                // Note that this yield loop does *not* attempt to do a green
-                // yield (regardless of the context), but *always* performs an
-                // OS-thread yield. The reasoning for this is that the pusher in
-                // question which is causing the inconsistent state is
-                // guaranteed to *not* be a blocked task (green tasks can't get
-                // pre-empted), so it must be on a different OS thread. Also,
-                // `try_recv` is normally a "guaranteed no rescheduling" context
-                // in a green-thread situation. By yielding control of the
-                // thread, we will hopefully allow time for the remote task on
-                // the other OS thread to make progress.
-                //
-                // Avoiding this yield loop would require a different queue
-                // abstraction which provides the guarantee that after M
-                // pushes have succeeded, at least M pops will succeed. The
-                // current queues guarantee that if there are N active
-                // pushes, you can pop N times once all N have finished.
-                mpsc::Inconsistent => {
-                    let data;
-                    loop {
-                        Thread::yield_now();
-                        match unsafe { (*arc.get()).queue.pop() } {
-                            mpsc::Data(t) => { data = t; break }
-                            mpsc::Empty => fail!("inconsistent => empty"),
-                            mpsc::Inconsistent => {}
-                        }
+        loop {
+            let mut new_port = match self.inner {
+                Oneshot(ref p) => {
+                    match unsafe { (*p.get()).try_recv() } {
+                        Ok(t) => return Data(t),
+                        Err(oneshot::Empty) => return Empty,
+                        Err(oneshot::Disconnected) => return Disconnected,
+                        Err(oneshot::Upgraded(port)) => port,
                     }
-                    Some(data)
                 }
-            }
-        };
-        if increment && ret.is_some() {
-            this.inner.packet().steals += 1;
-        }
-        match ret {
-            Some(t) => Data(t),
-            None => {
-                // It's possible that between the time that we saw the queue was
-                // empty and here the other side disconnected. It's also
-                // possible for us to see the disconnection here while there is
-                // data in the queue. It's pretty backwards-thinking to return
-                // Disconnected when there's actually data on the queue, so if
-                // we see a disconnected state be sure to check again to be 100%
-                // sure that there's no data in the queue.
-                let cnt = unsafe { (*this.queue.packet()).cnt.load(Relaxed) };
-                if cnt != DISCONNECTED { return Empty }
-
-                let ret = match this.queue {
-                    SPSC(ref mut queue) => queue.pop(),
-                    MPSC(ref mut queue) => match queue.pop() {
-                        mpsc::Data(t) => Some(t),
-                        mpsc::Empty => None,
-                        mpsc::Inconsistent => {
-                            fail!("inconsistent with no senders?!");
-                        }
+                Stream(ref p) => {
+                    match unsafe { (*p.get()).try_recv() } {
+                        Ok(t) => return Data(t),
+                        Err(stream::Empty) => return Empty,
+                        Err(stream::Disconnected) => return Disconnected,
+                        Err(stream::Upgraded(port)) => port,
+                    }
+                }
+                Shared(ref p) => {
+                    match unsafe { (*p.get()).try_recv() } {
+                        Ok(t) => return Data(t),
+                        Err(shared::Empty) => return Empty,
+                        Err(shared::Disconnected) => return Disconnected,
                     }
-                };
-                match ret {
-                    Some(data) => Data(data),
-                    None => Disconnected,
                 }
+            };
+            unsafe {
+                util::swap(&mut cast::transmute_mut(self).inner,
+                           &mut new_port.inner);
             }
         }
     }
@@ -869,34 +575,36 @@ impl<T: Send> Port<T> {
     /// If the channel has hung up, then `None` is returned. Otherwise `Some` of
     /// the value found on the port is returned.
     pub fn recv_opt(&self) -> Option<T> {
-        // optimistic preflight check (scheduling is expensive)
-        match self.try_recv() {
-            Empty => {},
-            Disconnected => return None,
-            Data(t) => return Some(t),
-        }
-
-        let packet;
-        let this;
-        unsafe {
-            this = cast::transmute_mut(self);
-            packet = this.inner.packet();
-            let task: ~Task = Local::take();
-            task.deschedule(1, |task| {
-                assert!((*packet).to_wake.is_none());
-                (*packet).to_wake = Some(task);
-                if (*packet).decrement() {
-                    Ok(())
-                } else {
-                    Err((*packet).to_wake.take_unwrap())
+        loop {
+            let mut new_port = match self.inner {
+                Oneshot(ref p) => {
+                    match unsafe { (*p.get()).recv() } {
+                        Ok(t) => return Some(t),
+                        Err(oneshot::Empty) => return unreachable!(),
+                        Err(oneshot::Disconnected) => return None,
+                        Err(oneshot::Upgraded(port)) => port,
+                    }
                 }
-            });
-        }
-
-        match self.try_recv_inc(false) {
-            Data(t) => Some(t),
-            Empty => fail!("bug: woke up too soon"),
-            Disconnected => None,
+                Stream(ref p) => {
+                    match unsafe { (*p.get()).recv() } {
+                        Ok(t) => return Some(t),
+                        Err(stream::Empty) => return unreachable!(),
+                        Err(stream::Disconnected) => return None,
+                        Err(stream::Upgraded(port)) => port,
+                    }
+                }
+                Shared(ref p) => {
+                    match unsafe { (*p.get()).recv() } {
+                        Ok(t) => return Some(t),
+                        Err(shared::Empty) => return unreachable!(),
+                        Err(shared::Disconnected) => return None,
+                    }
+                }
+            };
+            unsafe {
+                util::swap(&mut cast::transmute_mut(self).inner,
+                           &mut new_port.inner);
+            }
         }
     }
 
@@ -907,6 +615,84 @@ impl<T: Send> Port<T> {
     }
 }
 
+impl<T: Send> select::Packet for Port<T> {
+    fn can_recv(&self) -> bool {
+        loop {
+            let mut new_port = match self.inner {
+                Oneshot(ref p) => {
+                    match unsafe { (*p.get()).can_recv() } {
+                        Ok(ret) => return ret,
+                        Err(upgrade) => upgrade,
+                    }
+                }
+                Stream(ref p) => {
+                    match unsafe { (*p.get()).can_recv() } {
+                        Ok(ret) => return ret,
+                        Err(upgrade) => upgrade,
+                    }
+                }
+                Shared(ref p) => {
+                    return unsafe { (*p.get()).can_recv() };
+                }
+            };
+            unsafe {
+                util::swap(&mut cast::transmute_mut(self).inner,
+                           &mut new_port.inner);
+            }
+        }
+    }
+
+    fn start_selection(&self, mut task: BlockedTask) -> Result<(), BlockedTask>{
+        loop {
+            let (t, mut new_port) = match self.inner {
+                Oneshot(ref p) => {
+                    match unsafe { (*p.get()).start_selection(task) } {
+                        oneshot::SelSuccess => return Ok(()),
+                        oneshot::SelCanceled(task) => return Err(task),
+                        oneshot::SelUpgraded(t, port) => (t, port),
+                    }
+                }
+                Stream(ref p) => {
+                    match unsafe { (*p.get()).start_selection(task) } {
+                        stream::SelSuccess => return Ok(()),
+                        stream::SelCanceled(task) => return Err(task),
+                        stream::SelUpgraded(t, port) => (t, port),
+                    }
+                }
+                Shared(ref p) => {
+                    return unsafe { (*p.get()).start_selection(task) };
+                }
+            };
+            task = t;
+            unsafe {
+                util::swap(&mut cast::transmute_mut(self).inner,
+                           &mut new_port.inner);
+            }
+        }
+    }
+
+    fn abort_selection(&self) -> bool {
+        let mut was_upgrade = false;
+        loop {
+            let result = match self.inner {
+                Oneshot(ref p) => unsafe { (*p.get()).abort_selection() },
+                Stream(ref p) => unsafe {
+                    (*p.get()).abort_selection(was_upgrade)
+                },
+                Shared(ref p) => return unsafe {
+                    (*p.get()).abort_selection(was_upgrade)
+                },
+            };
+            let mut new_port = match result { Ok(b) => return b, Err(p) => p };
+            was_upgrade = true;
+            unsafe {
+                util::swap(&mut cast::transmute_mut(self).inner,
+                           &mut new_port.inner);
+            }
+        }
+    }
+}
+
 impl<'a, T: Send> Iterator<T> for Messages<'a, T> {
     fn next(&mut self) -> Option<T> { self.port.recv_opt() }
 }
@@ -914,10 +700,11 @@ impl<'a, T: Send> Iterator<T> for Messages<'a, T> {
 #[unsafe_destructor]
 impl<T: Send> Drop for Port<T> {
     fn drop(&mut self) {
-        // All we need to do is store that we're disconnected. If the channel
-        // half has already disconnected, then we'll just deallocate everything
-        // when the shared packet is deallocated.
-        self.inner.packet().cnt.store(DISCONNECTED, SeqCst);
+        match self.inner {
+            Oneshot(ref mut p) => unsafe { (*p.get()).drop_port(); },
+            Stream(ref mut p) => unsafe { (*p.get()).drop_port(); },
+            Shared(ref mut p) => unsafe { (*p.get()).drop_port(); },
+        }
     }
 }
 
@@ -948,12 +735,12 @@ mod test {
     })
 
     test!(fn drop_full_shared() {
-        let (_p, c) = SharedChan::new();
+        let (_p, c) = Chan::new();
         c.send(~1);
     })
 
     test!(fn smoke_shared() {
-        let (p, c) = SharedChan::new();
+        let (p, c) = Chan::new();
         c.send(1);
         assert_eq!(p.recv(), 1);
         let c = c.clone();
@@ -976,13 +763,13 @@ mod test {
     } #[should_fail])
 
     test!(fn smoke_shared_port_gone() {
-        let (p, c) = SharedChan::new();
+        let (p, c) = Chan::new();
         drop(p);
         c.send(1);
     } #[should_fail])
 
     test!(fn smoke_shared_port_gone2() {
-        let (p, c) = SharedChan::new();
+        let (p, c) = Chan::new();
         drop(p);
         let c2 = c.clone();
         drop(c);
@@ -998,7 +785,7 @@ mod test {
     } #[should_fail])
 
     test!(fn port_gone_concurrent_shared() {
-        let (p, c) = SharedChan::new();
+        let (p, c) = Chan::new();
         let c1 = c.clone();
         spawn(proc() {
             p.recv();
@@ -1016,7 +803,7 @@ mod test {
     } #[should_fail])
 
     test!(fn smoke_chan_gone_shared() {
-        let (p, c) = SharedChan::<()>::new();
+        let (p, c) = Chan::<()>::new();
         let c2 = c.clone();
         drop(c);
         drop(c2);
@@ -1045,7 +832,7 @@ mod test {
     test!(fn stress_shared() {
         static AMT: uint = 10000;
         static NTHREADS: uint = 8;
-        let (p, c) = SharedChan::<int>::new();
+        let (p, c) = Chan::<int>::new();
         let (p1, c1) = Chan::new();
 
         spawn(proc() {
@@ -1072,7 +859,7 @@ mod test {
     fn send_from_outside_runtime() {
         let (p, c) = Chan::<int>::new();
         let (p1, c1) = Chan::new();
-        let (port, chan) = SharedChan::new();
+        let (port, chan) = Chan::new();
         let chan2 = chan.clone();
         spawn(proc() {
             c1.send(());
@@ -1112,7 +899,7 @@ mod test {
     fn no_runtime() {
         let (p1, c1) = Chan::<int>::new();
         let (p2, c2) = Chan::<int>::new();
-        let (port, chan) = SharedChan::new();
+        let (port, chan) = Chan::new();
         let chan2 = chan.clone();
         native::task::spawn(proc() {
             assert_eq!(p1.recv(), 1);
@@ -1315,7 +1102,7 @@ mod test {
     })
 
     test!(fn shared_chan_stress() {
-        let (port, chan) = SharedChan::new();
+        let (port, chan) = Chan::new();
         let total = stress_factor() + 100;
         for _ in range(0, total) {
             let chan_clone = chan.clone();
@@ -1394,4 +1181,26 @@ mod test {
         p2.recv();
         assert_eq!(p.try_recv(), Disconnected);
     })
+
+    // This bug used to end up in a livelock inside of the Port destructor
+    // because the internal state of the Shared port was corrupted
+    test!(fn destroy_upgraded_shared_port_when_sender_still_active() {
+        let (p, c) = Chan::new();
+        let (p1, c2) = Chan::new();
+        spawn(proc() {
+            p.recv(); // wait on a oneshot port
+            drop(p);  // destroy a shared port
+            c2.send(());
+        });
+        // make sure the other task has gone to sleep
+        for _ in range(0, 5000) { task::deschedule(); }
+
+        // upgrade to a shared chan and send a message
+        let t = c.clone();
+        drop(c);
+        t.send(());
+
+        // wait for the child task to exit before we exit
+        p1.recv();
+    })
 }
diff --git a/src/libstd/comm/oneshot.rs b/src/libstd/comm/oneshot.rs
new file mode 100644
index 00000000000..e58405ebe2d
--- /dev/null
+++ b/src/libstd/comm/oneshot.rs
@@ -0,0 +1,382 @@
+// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+/// Oneshot channels/ports
+///
+/// This is the initial flavor of channels/ports used for comm module. This is
+/// an optimization for the one-use case of a channel. The major optimization of
+/// this type is to have one and exactly one allocation when the chan/port pair
+/// is created.
+///
+/// Another possible optimization would be to not use an UnsafeArc box because
+/// in theory we know when the shared packet can be deallocated (no real need
+/// for the atomic reference counting), but I was having trouble how to destroy
+/// the data early in a drop of a Port.
+///
+/// # Implementation
+///
+/// Oneshots are implemented around one atomic uint variable. This variable
+/// indicates both the state of the port/chan but also contains any tasks
+/// blocked on the port. All atomic operations happen on this one word.
+///
+/// In order to upgrade a oneshot channel, an upgrade is considered a disconnect
+/// on behalf of the channel side of things (it can be mentally thought of as
+/// consuming the port). This upgrade is then also stored in the shared packet.
+/// The one caveat to consider is that when a port sees a disconnected channel
+/// it must check for data because there is no "data plus upgrade" state.
+
+use comm::Port;
+use kinds::Send;
+use ops::Drop;
+use option::{Some, None, Option};
+use result::{Result, Ok, Err};
+use rt::local::Local;
+use rt::task::{Task, BlockedTask};
+use sync::atomics;
+use util;
+
+// Various states you can find a port in.
+static EMPTY: uint = 0;
+static DATA: uint = 1;
+static DISCONNECTED: uint = 2;
+
+pub struct Packet<T> {
+    // Internal state of the chan/port pair (stores the blocked task as well)
+    state: atomics::AtomicUint,
+    // One-shot data slot location
+    data: Option<T>,
+    // when used for the second time, a oneshot channel must be upgraded, and
+    // this contains the slot for the upgrade
+    upgrade: MyUpgrade<T>,
+}
+
+pub enum Failure<T> {
+    Empty,
+    Disconnected,
+    Upgraded(Port<T>),
+}
+
+pub enum UpgradeResult {
+    UpSuccess,
+    UpDisconnected,
+    UpWoke(BlockedTask),
+}
+
+pub enum SelectionResult<T> {
+    SelCanceled(BlockedTask),
+    SelUpgraded(BlockedTask, Port<T>),
+    SelSuccess,
+}
+
+enum MyUpgrade<T> {
+    NothingSent,
+    SendUsed,
+    GoUp(Port<T>),
+}
+
+impl<T: Send> Packet<T> {
+    pub fn new() -> Packet<T> {
+        Packet {
+            data: None,
+            upgrade: NothingSent,
+            state: atomics::AtomicUint::new(EMPTY),
+        }
+    }
+
+    pub fn send(&mut self, t: T) -> bool {
+        // Sanity check
+        match self.upgrade {
+            NothingSent => {}
+            _ => fail!("sending on a oneshot that's already sent on "),
+        }
+        assert!(self.data.is_none());
+        self.data = Some(t);
+        self.upgrade = SendUsed;
+
+        // This atomic swap uses a "Release" memory ordering to ensure that all
+        // our previous memory writes are visible to the other thread (notably
+        // the write of data/upgrade)
+        match self.state.swap(DATA, atomics::Release) {
+            // Sent the data, no one was waiting
+            EMPTY => true,
+
+            // Couldn't send the data, the port hung up first. We need to be
+            // sure to deallocate the sent data (to not leave it stuck in the
+            // queue)
+            DISCONNECTED => {
+                self.data.take_unwrap();
+                false
+            }
+
+            // Not possible, these are one-use channels
+            DATA => unreachable!(),
+
+            // Anything else means that there was a task waiting on the other
+            // end. We leave the 'DATA' state inside so it'll pick it up on the
+            // other end.
+            n => unsafe {
+                let t = BlockedTask::cast_from_uint(n);
+                t.wake().map(|t| t.reawaken());
+                true
+            }
+        }
+    }
+
+    // Just tests whether this channel has been sent on or not, this is only
+    // safe to use from the sender.
+    pub fn sent(&self) -> bool {
+        match self.upgrade {
+            NothingSent => false,
+            _ => true,
+        }
+    }
+
+    pub fn recv(&mut self) -> Result<T, Failure<T>> {
+        // Attempt to not block the task (it's a little expensive). If it looks
+        // like we're not empty, then immediately go through to `try_recv`.
+        //
+        // These atomics use an Acquire memory ordering in order to have all the
+        // previous writes of the releasing thread visible to us.
+        if self.state.load(atomics::Acquire) == EMPTY {
+            let t: ~Task = Local::take();
+            t.deschedule(1, |task| {
+                let n = unsafe { task.cast_to_uint() };
+                match self.state.compare_and_swap(EMPTY, n, atomics::Acquire) {
+                    // Nothing on the channel, we legitimately block
+                    EMPTY => Ok(()),
+
+                    // If there's data or it's a disconnected channel, then we
+                    // failed the cmpxchg, so we just wake ourselves back up
+                    DATA | DISCONNECTED => {
+                        unsafe { Err(BlockedTask::cast_from_uint(n)) }
+                    }
+
+                    // Only one thread is allowed to sleep on this port
+                    _ => unreachable!()
+                }
+            });
+        }
+
+        self.try_recv()
+    }
+
+    pub fn try_recv(&mut self) -> Result<T, Failure<T>> {
+        // see above for why Acquire is used.
+        match self.state.load(atomics::Acquire) {
+            EMPTY => Err(Empty),
+
+            // We saw some data on the channel, but the channel can be used
+            // again to send us an upgrade. As a result, we need to re-insert
+            // into the channel that there's no data available (otherwise we'll
+            // just see DATA next time). This is done as a cmpxchg because if
+            // the state changes under our feet we'd rather just see that state
+            // change.
+            DATA => {
+                self.state.compare_and_swap(DATA, EMPTY, atomics::Acquire);
+                match self.data.take() {
+                    Some(data) => Ok(data),
+                    None => unreachable!(),
+                }
+            }
+
+            // There's no guarantee that we receive before an upgrade happens,
+            // and an upgrade flags the channel as disconnected, so when we see
+            // this we first need to check if there's data available and *then*
+            // we go through and process the upgrade.
+            DISCONNECTED => {
+                match self.data.take() {
+                    Some(data) => Ok(data),
+                    None => {
+                        match util::replace(&mut self.upgrade, SendUsed) {
+                            SendUsed | NothingSent => Err(Disconnected),
+                            GoUp(upgrade) => Err(Upgraded(upgrade))
+                        }
+                    }
+                }
+            }
+            _ => unreachable!()
+        }
+    }
+
+    // Returns whether the upgrade was completed. If the upgrade wasn't
+    // completed, then the port couldn't get sent to the other half (it will
+    // never receive it).
+    pub fn upgrade(&mut self, up: Port<T>) -> UpgradeResult {
+        let prev = match self.upgrade {
+            NothingSent => NothingSent,
+            SendUsed => SendUsed,
+            _ => fail!("upgrading again"),
+        };
+        self.upgrade = GoUp(up);
+
+        // Use a Release memory ordering in order to make sure that our write to
+        // `upgrade` is visible to the other thread.
+        match self.state.swap(DISCONNECTED, atomics::Release) {
+            // If the channel is empty or has data on it, then we're good to go.
+            // Senders will check the data before the upgrade (in case we
+            // plastered over the DATA state).
+            DATA | EMPTY => UpSuccess,
+
+            // If the other end is already disconnected, then we failed the
+            // upgrade. Be sure to trash the port we were given.
+            DISCONNECTED => { self.upgrade = prev; UpDisconnected }
+
+            // If someone's waiting, we gotta wake them up
+            n => UpWoke(unsafe { BlockedTask::cast_from_uint(n) })
+        }
+    }
+
+    pub fn drop_chan(&mut self) {
+        match self.state.swap(DISCONNECTED, atomics::SeqCst) {
+            DATA | DISCONNECTED | EMPTY => {}
+
+            // If someone's waiting, we gotta wake them up
+            n => unsafe {
+                let t = BlockedTask::cast_from_uint(n);
+                t.wake().map(|t| t.reawaken());
+            }
+        }
+    }
+
+    pub fn drop_port(&mut self) {
+        // Use an Acquire memory ordering in order to see the data that the
+        // senders are sending.
+        match self.state.swap(DISCONNECTED, atomics::Acquire) {
+            // An empty channel has nothing to do, and a remotely disconnected
+            // channel also has nothing to do b/c we're about to run the drop
+            // glue
+            DISCONNECTED | EMPTY => {}
+
+            // There's data on the channel, so make sure we destroy it promptly.
+            // This is why not using an arc is a little difficult (need the box
+            // to stay valid while we take the data).
+            DATA => { self.data.take_unwrap(); }
+
+            // We're the only ones that can block on this port
+            _ => unreachable!()
+        }
+    }
+
+    ////////////////////////////////////////////////////////////////////////////
+    // select implementation
+    ////////////////////////////////////////////////////////////////////////////
+
+    // If Ok, the value is whether this port has data, if Err, then the upgraded
+    // port needs to be checked instead of this one.
+    pub fn can_recv(&mut self) -> Result<bool, Port<T>> {
+        // Use Acquire so we can see all previous memory writes
+        match self.state.load(atomics::Acquire) {
+            EMPTY => Ok(false), // Welp, we tried
+            DATA => Ok(true),   // we have some un-acquired data
+            DISCONNECTED if self.data.is_some() => Ok(true), // we have data
+            DISCONNECTED => {
+                match util::replace(&mut self.upgrade, SendUsed) {
+                    // The other end sent us an upgrade, so we need to
+                    // propagate upwards whether the upgrade can receive
+                    // data
+                    GoUp(upgrade) => Err(upgrade),
+
+                    // If the other end disconnected without sending an
+                    // upgrade, then we have data to receive (the channel is
+                    // disconnected).
+                    up => { self.upgrade = up; Ok(true) }
+                }
+            }
+            _ => unreachable!(), // we're the "one blocker"
+        }
+    }
+
+    // Attempts to start selection on this port. This can either succeed, fail
+    // because there is data, or fail because there is an upgrade pending.
+    pub fn start_selection(&mut self, task: BlockedTask) -> SelectionResult<T> {
+        let n = unsafe { task.cast_to_uint() };
+        match self.state.compare_and_swap(EMPTY, n, atomics::SeqCst) {
+            EMPTY => SelSuccess,
+            DATA => SelCanceled(unsafe { BlockedTask::cast_from_uint(n) }),
+            DISCONNECTED if self.data.is_some() => {
+                SelCanceled(unsafe { BlockedTask::cast_from_uint(n) })
+            }
+            DISCONNECTED => {
+                match util::replace(&mut self.upgrade, SendUsed) {
+                    // The other end sent us an upgrade, so we need to
+                    // propagate upwards whether the upgrade can receive
+                    // data
+                    GoUp(upgrade) => {
+                        SelUpgraded(unsafe { BlockedTask::cast_from_uint(n) },
+                                    upgrade)
+                    }
+
+                    // If the other end disconnected without sending an
+                    // upgrade, then we have data to receive (the channel is
+                    // disconnected).
+                    up => {
+                        self.upgrade = up;
+                        SelCanceled(unsafe { BlockedTask::cast_from_uint(n) })
+                    }
+                }
+            }
+            _ => unreachable!(), // we're the "one blocker"
+        }
+    }
+
+    // Remove a previous selecting task from this port. This ensures that the
+    // blocked task will no longer be visible to any other threads.
+    //
+    // The return value indicates whether there's data on this port.
+    pub fn abort_selection(&mut self) -> Result<bool, Port<T>> {
+        // use Acquire to make sure we see all previous memory writes
+        let state = match self.state.load(atomics::Acquire) {
+            // Each of these states means that no further activity will happen
+            // with regard to abortion selection
+            s @ EMPTY |
+            s @ DATA |
+            s @ DISCONNECTED => s,
+
+            // If we've got a blocked task, then use an atomic to gain ownership
+            // of it (may fail)
+            n => self.state.compare_and_swap(n, EMPTY, atomics::SeqCst)
+        };
+
+        // Now that we've got ownership of our state, figure out what to do
+        // about it.
+        match state {
+            EMPTY => unreachable!(),
+            // our task used for select was stolen
+            DATA => Ok(true),
+
+            // If the other end has hung up, then we have complete ownership
+            // of the port. We need to check to see if there was an upgrade
+            // requested, and if so, the other end needs to have its selection
+            // aborted.
+            DISCONNECTED => {
+                assert!(self.data.is_none());
+                match util::replace(&mut self.upgrade, SendUsed) {
+                    GoUp(port) => Err(port),
+                    _ => Ok(true),
+                }
+            }
+
+            // We woke ourselves up from select. Assert that the task should be
+            // trashed and returne that we don't have any data.
+            n => {
+                let t = unsafe { BlockedTask::cast_from_uint(n) };
+                t.trash();
+                Ok(false)
+            }
+        }
+    }
+}
+
+#[unsafe_destructor]
+impl<T: Send> Drop for Packet<T> {
+    fn drop(&mut self) {
+        assert_eq!(self.state.load(atomics::SeqCst), DISCONNECTED);
+    }
+}
diff --git a/src/libstd/comm/select.rs b/src/libstd/comm/select.rs
index cf8df863817..b6b35ccc357 100644
--- a/src/libstd/comm/select.rs
+++ b/src/libstd/comm/select.rs
@@ -45,19 +45,17 @@
 #[allow(dead_code)];
 
 use cast;
-use comm;
+use cell::Cell;
 use iter::Iterator;
 use kinds::marker;
 use kinds::Send;
 use ops::Drop;
 use option::{Some, None, Option};
 use ptr::RawPtr;
-use result::{Ok, Err};
+use result::{Ok, Err, Result};
 use rt::local::Local;
-use rt::task::Task;
-use super::{Packet, Port};
-use sync::atomics::{Relaxed, SeqCst};
-use task;
+use rt::task::{Task, BlockedTask};
+use super::Port;
 use uint;
 
 macro_rules! select {
@@ -67,8 +65,12 @@ macro_rules! select {
     ) => ({
         use std::comm::Select;
         let sel = Select::new();
-        let mut $port1 = sel.add(&mut $port1);
-        $( let mut $port = sel.add(&mut $port); )*
+        let mut $port1 = sel.handle(&$port1);
+        $( let mut $port = sel.handle(&$port); )*
+        unsafe {
+            $port1.add();
+            $( $port.add(); )*
+        }
         let ret = sel.wait();
         if ret == $port1.id { let $name1 = $port1.$meth1(); $code1 }
         $( else if ret == $port.id { let $name = $port.$meth(); $code } )*
@@ -79,9 +81,9 @@ macro_rules! select {
 /// The "port set" of the select interface. This structure is used to manage a
 /// set of ports which are being selected over.
 pub struct Select {
-    priv head: *mut Packet,
-    priv tail: *mut Packet,
-    priv next_id: uint,
+    priv head: *mut Handle<'static, ()>,
+    priv tail: *mut Handle<'static, ()>,
+    priv next_id: Cell<uint>,
     priv marker1: marker::NoSend,
     priv marker2: marker::NoFreeze,
 }
@@ -90,13 +92,28 @@ pub struct Select {
 /// This handle is used to keep the port in the set as well as interact with the
 /// underlying port.
 pub struct Handle<'port, T> {
-    /// A unique ID for this Handle.
+    /// The ID of this handle, used to compare against the return value of
+    /// `Select::wait()`
     id: uint,
     priv selector: &'port Select,
-    priv port: &'port mut Port<T>,
+    priv next: *mut Handle<'static, ()>,
+    priv prev: *mut Handle<'static, ()>,
+    priv added: bool,
+    priv packet: &'port Packet,
+
+    // due to our fun transmutes, we be sure to place this at the end. (nothing
+    // previous relies on T)
+    priv port: &'port Port<T>,
 }
 
-struct Packets { cur: *mut Packet }
+struct Packets { cur: *mut Handle<'static, ()> }
+
+#[doc(hidden)]
+pub trait Packet {
+    fn can_recv(&self) -> bool;
+    fn start_selection(&self, task: BlockedTask) -> Result<(), BlockedTask>;
+    fn abort_selection(&self) -> bool;
+}
 
 impl Select {
     /// Creates a new selection structure. This set is initially empty and
@@ -106,45 +123,29 @@ impl Select {
     /// rather much easier through the `select!` macro.
     pub fn new() -> Select {
         Select {
-            head: 0 as *mut Packet,
-            tail: 0 as *mut Packet,
-            next_id: 1,
             marker1: marker::NoSend,
             marker2: marker::NoFreeze,
+            head: 0 as *mut Handle<'static, ()>,
+            tail: 0 as *mut Handle<'static, ()>,
+            next_id: Cell::new(1),
         }
     }
 
-    /// Adds a new port to this set, returning a handle which is then used to
-    /// receive on the port.
-    ///
-    /// Note that this port parameter takes `&mut Port` instead of `&Port`. None
-    /// of the methods of receiving on a port require `&mut self`, but `&mut` is
-    /// used here in order to have the compiler guarantee that the same port is
-    /// not added to this set more than once.
-    ///
-    /// When the returned handle falls out of scope, the port will be removed
-    /// from this set. While the handle is in this set, usage of the port can be
-    /// done through the `Handle`'s receiving methods.
-    pub fn add<'a, T: Send>(&'a self, port: &'a mut Port<T>) -> Handle<'a, T> {
-        let this = unsafe { cast::transmute_mut(self) };
-        let id = this.next_id;
-        this.next_id += 1;
-        unsafe {
-            let packet = port.inner.packet();
-            assert!(!(*packet).selecting.load(Relaxed));
-            assert_eq!((*packet).selection_id, 0);
-            (*packet).selection_id = id;
-            if this.head.is_null() {
-                this.head = packet as *mut Packet;
-                this.tail = packet as *mut Packet;
-            } else {
-                (*packet).select_prev = this.tail;
-                assert!((*packet).select_next.is_null());
-                (*this.tail).select_next = packet as *mut Packet;
-                this.tail = packet as *mut Packet;
-            }
+    /// Creates a new handle into this port set for a new port. Note that this
+    /// does *not* add the port to the port set, for that you must call the
+    /// `add` method on the handle itself.
+    pub fn handle<'a, T: Send>(&'a self, port: &'a Port<T>) -> Handle<'a, T> {
+        let id = self.next_id.get();
+        self.next_id.set(id + 1);
+        Handle {
+            id: id,
+            selector: self,
+            next: 0 as *mut Handle<'static, ()>,
+            prev: 0 as *mut Handle<'static, ()>,
+            added: false,
+            port: port,
+            packet: port,
         }
-        Handle { id: id, selector: this, port: port }
     }
 
     /// Waits for an event on this port set. The returned valus is *not* and
@@ -177,10 +178,9 @@ impl Select {
         unsafe {
             let mut amt = 0;
             for p in self.iter() {
-                assert!(!(*p).selecting.load(Relaxed));
                 amt += 1;
-                if (*p).can_recv() {
-                    return (*p).selection_id;
+                if (*p).packet.can_recv() {
+                    return (*p).id;
                 }
             }
             assert!(amt > 0);
@@ -195,22 +195,14 @@ impl Select {
             let task: ~Task = Local::take();
             task.deschedule(amt, |task| {
                 // Prepare for the block
-                let (i, packet) = iter.next().unwrap();
-                assert!((*packet).to_wake.is_none());
-                (*packet).to_wake = Some(task);
-                (*packet).selecting.store(true, SeqCst);
-
-                if (*packet).decrement() {
-                    Ok(())
-                } else {
-                    // Empty to_wake first to avoid tripping an assertion in
-                    // abort_selection in the disconnected case.
-                    let task = (*packet).to_wake.take_unwrap();
-                    (*packet).abort_selection(false);
-                    (*packet).selecting.store(false, SeqCst);
-                    ready_index = i;
-                    ready_id = (*packet).selection_id;
-                    Err(task)
+                let (i, handle) = iter.next().unwrap();
+                match (*handle).packet.start_selection(task) {
+                    Ok(()) => Ok(()),
+                    Err(task) => {
+                        ready_index = i;
+                        ready_id = (*handle).id;
+                        Err(task)
+                    }
                 }
             });
 
@@ -235,45 +227,17 @@ impl Select {
             // A rewrite should focus on avoiding a yield loop, and for now this
             // implementation is tying us over to a more efficient "don't
             // iterate over everything every time" implementation.
-            for packet in self.iter().take(ready_index) {
-                if (*packet).abort_selection(true) {
-                    ready_id = (*packet).selection_id;
-                    while (*packet).selecting.load(Relaxed) {
-                        task::deschedule();
-                    }
+            for handle in self.iter().take(ready_index) {
+                if (*handle).packet.abort_selection() {
+                    ready_id = (*handle).id;
                 }
             }
 
-            // Sanity check for now to make sure that everyone is turned off.
-            for packet in self.iter() {
-                assert!(!(*packet).selecting.load(Relaxed));
-            }
-
             assert!(ready_id != uint::MAX);
             return ready_id;
         }
     }
 
-    unsafe fn remove(&self, packet: *mut Packet) {
-        let this = cast::transmute_mut(self);
-        assert!(!(*packet).selecting.load(Relaxed));
-        if (*packet).select_prev.is_null() {
-            assert_eq!(packet, this.head);
-            this.head = (*packet).select_next;
-        } else {
-            (*(*packet).select_prev).select_next = (*packet).select_next;
-        }
-        if (*packet).select_next.is_null() {
-            assert_eq!(packet, this.tail);
-            this.tail = (*packet).select_prev;
-        } else {
-            (*(*packet).select_next).select_prev = (*packet).select_prev;
-        }
-        (*packet).select_next = 0 as *mut Packet;
-        (*packet).select_prev = 0 as *mut Packet;
-        (*packet).selection_id = 0;
-    }
-
     fn iter(&self) -> Packets { Packets { cur: self.head } }
 }
 
@@ -285,10 +249,56 @@ impl<'port, T: Send> Handle<'port, T> {
     /// success or `None` if the channel disconnects. This function has the same
     /// semantics as `Port.recv_opt`
     pub fn recv_opt(&mut self) -> Option<T> { self.port.recv_opt() }
-    /// Immediately attempt to receive a value on a port, this function will
-    /// never block. Has the same semantics as `Port.try_recv`.
-    pub fn try_recv(&mut self) -> comm::TryRecvResult<T> {
-        self.port.try_recv()
+
+    /// Adds this handle to the port set that the handle was created from. This
+    /// method can be called multiple times, but it has no effect if `add` was
+    /// called previously.
+    ///
+    /// This method is unsafe because it requires that the `Handle` is not moved
+    /// while it is added to the `Select` set.
+    pub unsafe fn add(&mut self) {
+        if self.added { return }
+        let selector: &mut Select = cast::transmute(&*self.selector);
+        let me: *mut Handle<'static, ()> = cast::transmute(&*self);
+
+        if selector.head.is_null() {
+            selector.head = me;
+            selector.tail = me;
+        } else {
+            (*me).prev = selector.tail;
+            assert!((*me).next.is_null());
+            (*selector.tail).next = me;
+            selector.tail = me;
+        }
+        self.added = true;
+    }
+
+    /// Removes this handle from the `Select` set. This method is unsafe because
+    /// it has no guarantee that the `Handle` was not moved since `add` was
+    /// called.
+    pub unsafe fn remove(&mut self) {
+        if !self.added { return }
+
+        let selector: &mut Select = cast::transmute(&*self.selector);
+        let me: *mut Handle<'static, ()> = cast::transmute(&*self);
+
+        if self.prev.is_null() {
+            assert_eq!(selector.head, me);
+            selector.head = self.next;
+        } else {
+            (*self.prev).next = self.next;
+        }
+        if self.next.is_null() {
+            assert_eq!(selector.tail, me);
+            selector.tail = self.prev;
+        } else {
+            (*self.next).prev = self.prev;
+        }
+
+        self.next = 0 as *mut Handle<'static, ()>;
+        self.prev = 0 as *mut Handle<'static, ()>;
+
+        self.added = false;
     }
 }
 
@@ -303,17 +313,17 @@ impl Drop for Select {
 #[unsafe_destructor]
 impl<'port, T: Send> Drop for Handle<'port, T> {
     fn drop(&mut self) {
-        unsafe { self.selector.remove(self.port.inner.packet()) }
+        unsafe { self.remove() }
     }
 }
 
-impl Iterator<*mut Packet> for Packets {
-    fn next(&mut self) -> Option<*mut Packet> {
+impl Iterator<*mut Handle<'static, ()>> for Packets {
+    fn next(&mut self) -> Option<*mut Handle<'static, ()>> {
         if self.cur.is_null() {
             None
         } else {
             let ret = Some(self.cur);
-            unsafe { self.cur = (*self.cur).select_next; }
+            unsafe { self.cur = (*self.cur).next; }
             ret
         }
     }
@@ -326,8 +336,8 @@ mod test {
     use prelude::*;
 
     test!(fn smoke() {
-        let (mut p1, c1) = Chan::<int>::new();
-        let (mut p2, c2) = Chan::<int>::new();
+        let (p1, c1) = Chan::<int>::new();
+        let (p2, c2) = Chan::<int>::new();
         c1.send(1);
         select! (
             foo = p1.recv() => { assert_eq!(foo, 1); },
@@ -350,11 +360,11 @@ mod test {
     })
 
     test!(fn smoke2() {
-        let (mut p1, _c1) = Chan::<int>::new();
-        let (mut p2, _c2) = Chan::<int>::new();
-        let (mut p3, _c3) = Chan::<int>::new();
-        let (mut p4, _c4) = Chan::<int>::new();
-        let (mut p5, c5) = Chan::<int>::new();
+        let (p1, _c1) = Chan::<int>::new();
+        let (p2, _c2) = Chan::<int>::new();
+        let (p3, _c3) = Chan::<int>::new();
+        let (p4, _c4) = Chan::<int>::new();
+        let (p5, c5) = Chan::<int>::new();
         c5.send(4);
         select! (
             _foo = p1.recv() => { fail!("1") },
@@ -366,8 +376,8 @@ mod test {
     })
 
     test!(fn closed() {
-        let (mut p1, _c1) = Chan::<int>::new();
-        let (mut p2, c2) = Chan::<int>::new();
+        let (p1, _c1) = Chan::<int>::new();
+        let (p2, c2) = Chan::<int>::new();
         drop(c2);
 
         select! (
@@ -377,8 +387,8 @@ mod test {
     })
 
     test!(fn unblocks() {
-        let (mut p1, c1) = Chan::<int>::new();
-        let (mut p2, _c2) = Chan::<int>::new();
+        let (p1, c1) = Chan::<int>::new();
+        let (p2, _c2) = Chan::<int>::new();
         let (p3, c3) = Chan::<int>::new();
 
         spawn(proc() {
@@ -400,8 +410,8 @@ mod test {
     })
 
     test!(fn both_ready() {
-        let (mut p1, c1) = Chan::<int>::new();
-        let (mut p2, c2) = Chan::<int>::new();
+        let (p1, c1) = Chan::<int>::new();
+        let (p2, c2) = Chan::<int>::new();
         let (p3, c3) = Chan::<()>::new();
 
         spawn(proc() {
@@ -426,8 +436,8 @@ mod test {
 
     test!(fn stress() {
         static AMT: int = 10000;
-        let (mut p1, c1) = Chan::<int>::new();
-        let (mut p2, c2) = Chan::<int>::new();
+        let (p1, c1) = Chan::<int>::new();
+        let (p2, c2) = Chan::<int>::new();
         let (p3, c3) = Chan::<()>::new();
 
         spawn(proc() {
@@ -449,4 +459,66 @@ mod test {
             c3.send(());
         }
     })
+
+    test!(fn cloning() {
+        let (p1, c1) = Chan::<int>::new();
+        let (p2, _c2) = Chan::<int>::new();
+        let (p3, c3) = Chan::<()>::new();
+
+        spawn(proc() {
+            p3.recv();
+            c1.clone();
+            assert_eq!(p3.try_recv(), Empty);
+            c1.send(2);
+            p3.recv();
+        });
+
+        c3.send(());
+        select!(
+            _i1 = p1.recv() => {},
+            _i2 = p2.recv() => fail!()
+        )
+        c3.send(());
+    })
+
+    test!(fn cloning2() {
+        let (p1, c1) = Chan::<int>::new();
+        let (p2, _c2) = Chan::<int>::new();
+        let (p3, c3) = Chan::<()>::new();
+
+        spawn(proc() {
+            p3.recv();
+            c1.clone();
+            assert_eq!(p3.try_recv(), Empty);
+            c1.send(2);
+            p3.recv();
+        });
+
+        c3.send(());
+        select!(
+            _i1 = p1.recv() => {},
+            _i2 = p2.recv() => fail!()
+        )
+        c3.send(());
+    })
+
+    test!(fn cloning3() {
+        let (p1, c1) = Chan::<()>::new();
+        let (p2, c2) = Chan::<()>::new();
+        let (p, c) = Chan::new();
+        spawn(proc() {
+            let mut s = Select::new();
+            let mut h1 = s.handle(&p1);
+            let mut h2 = s.handle(&p2);
+            unsafe { h2.add(); }
+            unsafe { h1.add(); }
+            assert_eq!(s.wait(), h2.id);
+            c.send(());
+        });
+
+        for _ in range(0, 1000) { task::deschedule(); }
+        drop(c1.clone());
+        c2.send(());
+        p.recv();
+    })
 }
diff --git a/src/libstd/comm/shared.rs b/src/libstd/comm/shared.rs
new file mode 100644
index 00000000000..30e061bb7b9
--- /dev/null
+++ b/src/libstd/comm/shared.rs
@@ -0,0 +1,483 @@
+// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+/// Shared channels
+///
+/// This is the flavor of channels which are not necessarily optimized for any
+/// particular use case, but are the most general in how they are used. Shared
+/// channels are cloneable allowing for multiple senders.
+///
+/// High level implementation details can be found in the comment of the parent
+/// module. You'll also note that the implementation of the shared and stream
+/// channels are quite similar, and this is no coincidence!
+
+use int;
+use iter::Iterator;
+use kinds::Send;
+use ops::Drop;
+use option::{Some, None, Option};
+use result::{Ok, Err, Result};
+use rt::local::Local;
+use rt::task::{Task, BlockedTask};
+use rt::thread::Thread;
+use sync::atomics;
+use unstable::mutex::Mutex;
+use vec::OwnedVector;
+
+use mpsc = sync::mpsc_queue;
+
+static DISCONNECTED: int = int::MIN;
+static FUDGE: int = 1024;
+static MAX_STEALS: int = 1 << 20;
+
+pub struct Packet<T> {
+    queue: mpsc::Queue<T>,
+    cnt: atomics::AtomicInt, // How many items are on this channel
+    steals: int, // How many times has a port received without blocking?
+    to_wake: atomics::AtomicUint, // Task to wake up
+
+    // The number of channels which are currently using this packet.
+    channels: atomics::AtomicInt,
+
+    // See the discussion in Port::drop and the channel send methods for what
+    // these are used for
+    port_dropped: atomics::AtomicBool,
+    sender_drain: atomics::AtomicInt,
+
+    // this lock protects various portions of this implementation during
+    // select()
+    select_lock: Mutex,
+}
+
+pub enum Failure {
+    Empty,
+    Disconnected,
+}
+
+impl<T: Send> Packet<T> {
+    // Creation of a packet *must* be followed by a call to inherit_blocker
+    pub fn new() -> Packet<T> {
+        let mut p = Packet {
+            queue: mpsc::Queue::new(),
+            cnt: atomics::AtomicInt::new(0),
+            steals: 0,
+            to_wake: atomics::AtomicUint::new(0),
+            channels: atomics::AtomicInt::new(2),
+            port_dropped: atomics::AtomicBool::new(false),
+            sender_drain: atomics::AtomicInt::new(0),
+            select_lock: unsafe { Mutex::new() },
+        };
+        // see comments in inherit_blocker about why we grab this lock
+        unsafe { p.select_lock.lock() }
+        return p;
+    }
+
+    // This function is used at the creation of a shared packet to inherit a
+    // previously blocked task. This is done to prevent spurious wakeups of
+    // tasks in select().
+    //
+    // This can only be called at channel-creation time
+    pub fn inherit_blocker(&mut self, task: Option<BlockedTask>) {
+        match task {
+            Some(task) => {
+                assert_eq!(self.cnt.load(atomics::SeqCst), 0);
+                assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
+                self.to_wake.store(unsafe { task.cast_to_uint() },
+                                   atomics::SeqCst);
+                self.cnt.store(-1, atomics::SeqCst);
+
+                // This store is a little sketchy. What's happening here is
+                // that we're transferring a blocker from a oneshot or stream
+                // channel to this shared channel. In doing so, we never
+                // spuriously wake them up and rather only wake them up at the
+                // appropriate time. This implementation of shared channels
+                // assumes that any blocking recv() will undo the increment of
+                // steals performed in try_recv() once the recv is complete.
+                // This thread that we're inheriting, however, is not in the
+                // middle of recv. Hence, the first time we wake them up,
+                // they're going to wake up from their old port, move on to the
+                // upgraded port, and then call the block recv() function.
+                //
+                // When calling this function, they'll find there's data
+                // immediately available, counting it as a steal. This in fact
+                // wasn't a steal because we appropriately blocked them waiting
+                // for data.
+                //
+                // To offset this bad increment, we initially set the steal
+                // count to -1. You'll find some special code in
+                // abort_selection() as well to ensure that this -1 steal count
+                // doesn't escape too far.
+                self.steals = -1;
+            }
+            None => {}
+        }
+
+        // When the shared packet is constructed, we grabbed this lock. The
+        // purpose of this lock is to ensure that abort_selection() doesn't
+        // interfere with this method. After we unlock this lock, we're
+        // signifying that we're done modifying self.cnt and self.to_wake and
+        // the port is ready for the world to continue using it.
+        unsafe { self.select_lock.unlock() }
+    }
+
+    pub fn send(&mut self, t: T) -> bool {
+        // See Port::drop for what's going on
+        if self.port_dropped.load(atomics::SeqCst) { return false }
+
+        // Note that the multiple sender case is a little tricker
+        // semantically than the single sender case. The logic for
+        // incrementing is "add and if disconnected store disconnected".
+        // This could end up leading some senders to believe that there
+        // wasn't a disconnect if in fact there was a disconnect. This means
+        // that while one thread is attempting to re-store the disconnected
+        // states, other threads could walk through merrily incrementing
+        // this very-negative disconnected count. To prevent senders from
+        // spuriously attempting to send when the channels is actually
+        // disconnected, the count has a ranged check here.
+        //
+        // This is also done for another reason. Remember that the return
+        // value of this function is:
+        //
+        //  `true` == the data *may* be received, this essentially has no
+        //            meaning
+        //  `false` == the data will *never* be received, this has a lot of
+        //             meaning
+        //
+        // In the SPSC case, we have a check of 'queue.is_empty()' to see
+        // whether the data was actually received, but this same condition
+        // means nothing in a multi-producer context. As a result, this
+        // preflight check serves as the definitive "this will never be
+        // received". Once we get beyond this check, we have permanently
+        // entered the realm of "this may be received"
+        if self.cnt.load(atomics::SeqCst) < DISCONNECTED + FUDGE {
+            return false
+        }
+
+        self.queue.push(t);
+        match self.cnt.fetch_add(1, atomics::SeqCst) {
+            -1 => {
+                self.take_to_wake().wake().map(|t| t.reawaken());
+            }
+
+            // In this case, we have possibly failed to send our data, and
+            // we need to consider re-popping the data in order to fully
+            // destroy it. We must arbitrate among the multiple senders,
+            // however, because the queues that we're using are
+            // single-consumer queues. In order to do this, all exiting
+            // pushers will use an atomic count in order to count those
+            // flowing through. Pushers who see 0 are required to drain as
+            // much as possible, and then can only exit when they are the
+            // only pusher (otherwise they must try again).
+            n if n < DISCONNECTED + FUDGE => {
+                // see the comment in 'try' for a shared channel for why this
+                // window of "not disconnected" is ok.
+                self.cnt.store(DISCONNECTED, atomics::SeqCst);
+
+                if self.sender_drain.fetch_add(1, atomics::SeqCst) == 0 {
+                    loop {
+                        // drain the queue, for info on the thread yield see the
+                        // discussion in try_recv
+                        loop {
+                            match self.queue.pop() {
+                                mpsc::Data(..) => {}
+                                mpsc::Empty => break,
+                                mpsc::Inconsistent => Thread::yield_now(),
+                            }
+                        }
+                        // maybe we're done, if we're not the last ones
+                        // here, then we need to go try again.
+                        if self.sender_drain.fetch_sub(1, atomics::SeqCst) == 1 {
+                            break
+                        }
+                    }
+
+                    // At this point, there may still be data on the queue,
+                    // but only if the count hasn't been incremented and
+                    // some other sender hasn't finished pushing data just
+                    // yet. That sender in question will drain its own data.
+                }
+            }
+
+            // Can't make any assumptions about this case like in the SPSC case.
+            _ => {}
+        }
+
+        true
+    }
+
+    pub fn recv(&mut self) -> Result<T, Failure> {
+        // This code is essentially the exact same as that found in the stream
+        // case (see stream.rs)
+        match self.try_recv() {
+            Err(Empty) => {}
+            data => return data,
+        }
+
+        let task: ~Task = Local::take();
+        task.deschedule(1, |task| {
+            self.decrement(task)
+        });
+
+        match self.try_recv() {
+            data @ Ok(..) => { self.steals -= 1; data }
+            data => data,
+        }
+    }
+
+    // Essentially the exact same thing as the stream decrement function.
+    fn decrement(&mut self, task: BlockedTask) -> Result<(), BlockedTask> {
+        assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
+        let n = unsafe { task.cast_to_uint() };
+        self.to_wake.store(n, atomics::SeqCst);
+
+        let steals = self.steals;
+        self.steals = 0;
+
+        match self.cnt.fetch_sub(1 + steals, atomics::SeqCst) {
+            DISCONNECTED => { self.cnt.store(DISCONNECTED, atomics::SeqCst); }
+            // If we factor in our steals and notice that the channel has no
+            // data, we successfully sleep
+            n => {
+                assert!(n >= 0);
+                if n - steals <= 0 { return Ok(()) }
+            }
+        }
+
+        self.to_wake.store(0, atomics::SeqCst);
+        Err(unsafe { BlockedTask::cast_from_uint(n) })
+    }
+
+    pub fn try_recv(&mut self) -> Result<T, Failure> {
+        let ret = match self.queue.pop() {
+            mpsc::Data(t) => Some(t),
+            mpsc::Empty => None,
+
+            // This is a bit of an interesting case. The channel is
+            // reported as having data available, but our pop() has
+            // failed due to the queue being in an inconsistent state.
+            // This means that there is some pusher somewhere which has
+            // yet to complete, but we are guaranteed that a pop will
+            // eventually succeed. In this case, we spin in a yield loop
+            // because the remote sender should finish their enqueue
+            // operation "very quickly".
+            //
+            // Note that this yield loop does *not* attempt to do a green
+            // yield (regardless of the context), but *always* performs an
+            // OS-thread yield. The reasoning for this is that the pusher in
+            // question which is causing the inconsistent state is
+            // guaranteed to *not* be a blocked task (green tasks can't get
+            // pre-empted), so it must be on a different OS thread. Also,
+            // `try_recv` is normally a "guaranteed no rescheduling" context
+            // in a green-thread situation. By yielding control of the
+            // thread, we will hopefully allow time for the remote task on
+            // the other OS thread to make progress.
+            //
+            // Avoiding this yield loop would require a different queue
+            // abstraction which provides the guarantee that after M
+            // pushes have succeeded, at least M pops will succeed. The
+            // current queues guarantee that if there are N active
+            // pushes, you can pop N times once all N have finished.
+            mpsc::Inconsistent => {
+                let data;
+                loop {
+                    Thread::yield_now();
+                    match self.queue.pop() {
+                        mpsc::Data(t) => { data = t; break }
+                        mpsc::Empty => fail!("inconsistent => empty"),
+                        mpsc::Inconsistent => {}
+                    }
+                }
+                Some(data)
+            }
+        };
+        match ret {
+            // See the discussion in the stream implementation for why we we
+            // might decrement steals.
+            Some(data) => {
+                self.steals += 1;
+                if self.steals > MAX_STEALS {
+                    match self.cnt.swap(0, atomics::SeqCst) {
+                        DISCONNECTED => {
+                            self.cnt.store(DISCONNECTED, atomics::SeqCst);
+                        }
+                        n => { self.steals -= n; }
+                    }
+                    assert!(self.steals >= 0);
+                }
+                Ok(data)
+            }
+
+            // See the discussion in the stream implementation for why we try
+            // again.
+            None => {
+                match self.cnt.load(atomics::SeqCst) {
+                    n if n != DISCONNECTED => Err(Empty),
+                    _ => {
+                        match self.queue.pop() {
+                            mpsc::Data(t) => Ok(t),
+                            mpsc::Empty => Err(Disconnected),
+                            // with no senders, an inconsistency is impossible.
+                            mpsc::Inconsistent => unreachable!(),
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    // Prepares this shared packet for a channel clone, essentially just bumping
+    // a refcount.
+    pub fn clone_chan(&mut self) {
+        self.channels.fetch_add(1, atomics::SeqCst);
+    }
+
+    // Decrement the reference count on a channel. This is called whenever a
+    // Chan is dropped and may end up waking up a receiver. It's the receiver's
+    // responsibility on the other end to figure out that we've disconnected.
+    pub fn drop_chan(&mut self) {
+        match self.channels.fetch_sub(1, atomics::SeqCst) {
+            1 => {}
+            n if n > 1 => return,
+            n => fail!("bad number of channels left {}", n),
+        }
+
+        match self.cnt.swap(DISCONNECTED, atomics::SeqCst) {
+            -1 => { self.take_to_wake().wake().map(|t| t.reawaken()); }
+            DISCONNECTED => {}
+            n => { assert!(n >= 0); }
+        }
+    }
+
+    // See the long discussion inside of stream.rs for why the queue is drained,
+    // and why it is done in this fashion.
+    pub fn drop_port(&mut self) {
+        self.port_dropped.store(true, atomics::SeqCst);
+        let mut steals = self.steals;
+        while {
+            let cnt = self.cnt.compare_and_swap(
+                            steals, DISCONNECTED, atomics::SeqCst);
+            cnt != DISCONNECTED && cnt != steals
+        } {
+            // See the discussion in 'try_recv' for why we yield
+            // control of this thread.
+            loop {
+                match self.queue.pop() {
+                    mpsc::Data(..) => { steals += 1; }
+                    mpsc::Empty | mpsc::Inconsistent => break,
+                }
+            }
+        }
+    }
+
+    // Consumes ownership of the 'to_wake' field.
+    fn take_to_wake(&mut self) -> BlockedTask {
+        let task = self.to_wake.load(atomics::SeqCst);
+        self.to_wake.store(0, atomics::SeqCst);
+        assert!(task != 0);
+        unsafe { BlockedTask::cast_from_uint(task) }
+    }
+
+    ////////////////////////////////////////////////////////////////////////////
+    // select implementation
+    ////////////////////////////////////////////////////////////////////////////
+
+    // Helper function for select, tests whether this port can receive without
+    // blocking (obviously not an atomic decision).
+    //
+    // This is different than the stream version because there's no need to peek
+    // at the queue, we can just look at the local count.
+    pub fn can_recv(&mut self) -> bool {
+        let cnt = self.cnt.load(atomics::SeqCst);
+        cnt == DISCONNECTED || cnt - self.steals > 0
+    }
+
+    // Inserts the blocked task for selection on this port, returning it back if
+    // the port already has data on it.
+    //
+    // The code here is the same as in stream.rs, except that it doesn't need to
+    // peek at the channel to see if an upgrade is pending.
+    pub fn start_selection(&mut self,
+                           task: BlockedTask) -> Result<(), BlockedTask> {
+        match self.decrement(task) {
+            Ok(()) => Ok(()),
+            Err(task) => {
+                let prev = self.cnt.fetch_add(1, atomics::SeqCst);
+                assert!(prev >= 0);
+                return Err(task);
+            }
+        }
+    }
+
+    // Cancels a previous task waiting on this port, returning whether there's
+    // data on the port.
+    //
+    // This is similar to the stream implementation (hence fewer comments), but
+    // uses a different value for the "steals" variable.
+    pub fn abort_selection(&mut self, _was_upgrade: bool) -> bool {
+        // Before we do anything else, we bounce on this lock. The reason for
+        // doing this is to ensure that any upgrade-in-progress is gone and
+        // done with. Without this bounce, we can race with inherit_blocker
+        // about looking at and dealing with to_wake. Once we have acquired the
+        // lock, we are guaranteed that inherit_blocker is done.
+        unsafe {
+            self.select_lock.lock();
+            self.select_lock.unlock();
+        }
+
+        // Like the stream implementation, we want to make sure that the count
+        // on the channel goes non-negative. We don't know how negative the
+        // stream currently is, so instead of using a steal value of 1, we load
+        // the channel count and figure out what we should do to make it
+        // positive.
+        let steals = {
+            let cnt = self.cnt.load(atomics::SeqCst);
+            if cnt < 0 && cnt != DISCONNECTED {-cnt} else {0}
+        };
+        let prev = self.cnt.fetch_add(steals + 1, atomics::SeqCst);
+
+        if prev == DISCONNECTED {
+            assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
+            self.cnt.store(DISCONNECTED, atomics::SeqCst);
+            true
+        } else {
+            let cur = prev + steals + 1;
+            assert!(cur >= 0);
+            if prev < 0 {
+                self.take_to_wake().trash();
+            } else {
+                while self.to_wake.load(atomics::SeqCst) != 0 {
+                    Thread::yield_now();
+                }
+            }
+            // if the number of steals is -1, it was the pre-emptive -1 steal
+            // count from when we inherited a blocker. This is fine because
+            // we're just going to overwrite it with a real value.
+            assert!(self.steals == 0 || self.steals == -1);
+            self.steals = steals;
+            prev >= 0
+        }
+    }
+}
+
+#[unsafe_destructor]
+impl<T: Send> Drop for Packet<T> {
+    fn drop(&mut self) {
+        unsafe {
+            // Note that this load is not only an assert for correctness about
+            // disconnection, but also a proper fence before the read of
+            // `to_wake`, so this assert cannot be removed with also removing
+            // the `to_wake` assert.
+            assert_eq!(self.cnt.load(atomics::SeqCst), DISCONNECTED);
+            assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
+            assert_eq!(self.channels.load(atomics::SeqCst), 0);
+            self.select_lock.destroy();
+        }
+    }
+}
diff --git a/src/libstd/comm/stream.rs b/src/libstd/comm/stream.rs
new file mode 100644
index 00000000000..0e249a55f87
--- /dev/null
+++ b/src/libstd/comm/stream.rs
@@ -0,0 +1,460 @@
+// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+/// Stream channels
+///
+/// This is the flavor of channels which are optimized for one sender and one
+/// receiver. The sender will be upgraded to a shared channel if the channel is
+/// cloned.
+///
+/// High level implementation details can be found in the comment of the parent
+/// module.
+
+use comm::Port;
+use int;
+use iter::Iterator;
+use kinds::Send;
+use ops::Drop;
+use option::{Some, None};
+use result::{Ok, Err, Result};
+use rt::local::Local;
+use rt::task::{Task, BlockedTask};
+use rt::thread::Thread;
+use spsc = sync::spsc_queue;
+use sync::atomics;
+use vec::OwnedVector;
+
+static DISCONNECTED: int = int::MIN;
+static MAX_STEALS: int = 1 << 20;
+
+pub struct Packet<T> {
+    queue: spsc::Queue<Message<T>>, // internal queue for all message
+
+    cnt: atomics::AtomicInt, // How many items are on this channel
+    steals: int, // How many times has a port received without blocking?
+    to_wake: atomics::AtomicUint, // Task to wake up
+
+    port_dropped: atomics::AtomicBool, // flag if the channel has been destroyed.
+}
+
+pub enum Failure<T> {
+    Empty,
+    Disconnected,
+    Upgraded(Port<T>),
+}
+
+pub enum UpgradeResult {
+    UpSuccess,
+    UpDisconnected,
+    UpWoke(BlockedTask),
+}
+
+pub enum SelectionResult<T> {
+    SelSuccess,
+    SelCanceled(BlockedTask),
+    SelUpgraded(BlockedTask, Port<T>),
+}
+
+// Any message could contain an "upgrade request" to a new shared port, so the
+// internal queue it's a queue of T, but rather Message<T>
+enum Message<T> {
+    Data(T),
+    GoUp(Port<T>),
+}
+
+impl<T: Send> Packet<T> {
+    pub fn new() -> Packet<T> {
+        Packet {
+            queue: spsc::Queue::new(128),
+
+            cnt: atomics::AtomicInt::new(0),
+            steals: 0,
+            to_wake: atomics::AtomicUint::new(0),
+
+            port_dropped: atomics::AtomicBool::new(false),
+        }
+    }
+
+
+    pub fn send(&mut self, t: T) -> bool {
+        match self.do_send(Data(t)) {
+            UpSuccess => true,
+            UpDisconnected => false,
+            UpWoke(task) => {
+                task.wake().map(|t| t.reawaken());
+                true
+            }
+        }
+    }
+    pub fn upgrade(&mut self, up: Port<T>) -> UpgradeResult {
+        self.do_send(GoUp(up))
+    }
+
+    fn do_send(&mut self, t: Message<T>) -> UpgradeResult {
+        // Use an acquire/release ordering to maintain the same position with
+        // respect to the atomic loads below
+        if self.port_dropped.load(atomics::SeqCst) { return UpDisconnected }
+
+        self.queue.push(t);
+        match self.cnt.fetch_add(1, atomics::SeqCst) {
+            // As described in the mod's doc comment, -1 == wakeup
+            -1 => UpWoke(self.take_to_wake()),
+            // As as described before, SPSC queues must be >= -2
+            -2 => UpSuccess,
+
+            // Be sure to preserve the disconnected state, and the return value
+            // in this case is going to be whether our data was received or not.
+            // This manifests itself on whether we have an empty queue or not.
+            //
+            // Primarily, are required to drain the queue here because the port
+            // will never remove this data. We can only have at most one item to
+            // drain (the port drains the rest).
+            DISCONNECTED => {
+                self.cnt.store(DISCONNECTED, atomics::SeqCst);
+                let first = self.queue.pop();
+                let second = self.queue.pop();
+                assert!(second.is_none());
+
+                match first {
+                    Some(..) => UpSuccess,  // we failed to send the data
+                    None => UpDisconnected, // we successfully sent data
+                }
+            }
+
+            // Otherwise we just sent some data on a non-waiting queue, so just
+            // make sure the world is sane and carry on!
+            n => { assert!(n >= 0); UpSuccess }
+        }
+    }
+
+    // Consumes ownership of the 'to_wake' field.
+    fn take_to_wake(&mut self) -> BlockedTask {
+        let task = self.to_wake.load(atomics::SeqCst);
+        self.to_wake.store(0, atomics::SeqCst);
+        assert!(task != 0);
+        unsafe { BlockedTask::cast_from_uint(task) }
+    }
+
+    // Decrements the count on the channel for a sleeper, returning the sleeper
+    // back if it shouldn't sleep. Note that this is the location where we take
+    // steals into account.
+    fn decrement(&mut self, task: BlockedTask) -> Result<(), BlockedTask> {
+        assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
+        let n = unsafe { task.cast_to_uint() };
+        self.to_wake.store(n, atomics::SeqCst);
+
+        let steals = self.steals;
+        self.steals = 0;
+
+        match self.cnt.fetch_sub(1 + steals, atomics::SeqCst) {
+            DISCONNECTED => { self.cnt.store(DISCONNECTED, atomics::SeqCst); }
+            // If we factor in our steals and notice that the channel has no
+            // data, we successfully sleep
+            n => {
+                assert!(n >= 0);
+                if n - steals <= 0 { return Ok(()) }
+            }
+        }
+
+        self.to_wake.store(0, atomics::SeqCst);
+        Err(unsafe { BlockedTask::cast_from_uint(n) })
+    }
+
+    pub fn recv(&mut self) -> Result<T, Failure<T>> {
+        // Optimistic preflight check (scheduling is expensive).
+        match self.try_recv() {
+            Err(Empty) => {}
+            data => return data,
+        }
+
+        // Welp, our channel has no data. Deschedule the current task and
+        // initiate the blocking protocol.
+        let task: ~Task = Local::take();
+        task.deschedule(1, |task| {
+            self.decrement(task)
+        });
+
+        match self.try_recv() {
+            // Messages which actually popped from the queue shouldn't count as
+            // a steal, so offset the decrement here (we already have our
+            // "steal" factored into the channel count above).
+            data @ Ok(..) |
+            data @ Err(Upgraded(..)) => {
+                self.steals -= 1;
+                data
+            }
+
+            data => data,
+        }
+    }
+
+    pub fn try_recv(&mut self) -> Result<T, Failure<T>> {
+        match self.queue.pop() {
+            // If we stole some data, record to that effect (this will be
+            // factored into cnt later on). Note that we don't allow steals to
+            // grow without bound in order to prevent eventual overflow of
+            // either steals or cnt as an overflow would have catastrophic
+            // results. Also note that we don't unconditionally set steals to 0
+            // because it can be true that steals > cnt.
+            Some(data) => {
+                self.steals += 1;
+                if self.steals > MAX_STEALS {
+                    match self.cnt.swap(0, atomics::SeqCst) {
+                        DISCONNECTED => {
+                            self.cnt.store(DISCONNECTED, atomics::SeqCst);
+                        }
+                        n => { self.steals -= n; }
+                    }
+                    assert!(self.steals >= 0);
+                }
+                match data {
+                    Data(t) => Ok(t),
+                    GoUp(up) => Err(Upgraded(up)),
+                }
+            }
+
+            None => {
+                match self.cnt.load(atomics::SeqCst) {
+                    n if n != DISCONNECTED => Err(Empty),
+
+                    // This is a little bit of a tricky case. We failed to pop
+                    // data above, and then we have viewed that the channel is
+                    // disconnected. In this window more data could have been
+                    // sent on the channel. It doesn't really make sense to
+                    // return that the channel is disconnected when there's
+                    // actually data on it, so be extra sure there's no data by
+                    // popping one more time.
+                    //
+                    // We can ignore steals because the other end is
+                    // disconnected and we'll never need to really factor in our
+                    // steals again.
+                    _ => {
+                        match self.queue.pop() {
+                            Some(Data(t)) => Ok(t),
+                            Some(GoUp(up)) => Err(Upgraded(up)),
+                            None => Err(Disconnected),
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    pub fn drop_chan(&mut self) {
+        // Dropping a channel is pretty simple, we just flag it as disconnected
+        // and then wakeup a blocker if there is one.
+        match self.cnt.swap(DISCONNECTED, atomics::SeqCst) {
+            -1 => { self.take_to_wake().wake().map(|t| t.reawaken()); }
+            DISCONNECTED => {}
+            n => { assert!(n >= 0); }
+        }
+    }
+
+    pub fn drop_port(&mut self) {
+        // Dropping a port seems like a fairly trivial thing. In theory all we
+        // need to do is flag that we're disconnected and then everything else
+        // can take over (we don't have anyone to wake up).
+        //
+        // The catch for Ports is that we want to drop the entire contents of
+        // the queue. There are multiple reasons for having this property, the
+        // largest of which is that if another chan is waiting in this channel
+        // (but not received yet), then waiting on that port will cause a
+        // deadlock.
+        //
+        // So if we accept that we must now destroy the entire contents of the
+        // queue, this code may make a bit more sense. The tricky part is that
+        // we can't let any in-flight sends go un-dropped, we have to make sure
+        // *everything* is dropped and nothing new will come onto the channel.
+
+        // The first thing we do is set a flag saying that we're done for. All
+        // sends are gated on this flag, so we're immediately guaranteed that
+        // there are a bounded number of active sends that we'll have to deal
+        // with.
+        self.port_dropped.store(true, atomics::SeqCst);
+
+        // Now that we're guaranteed to deal with a bounded number of senders,
+        // we need to drain the queue. This draining process happens atomically
+        // with respect to the "count" of the channel. If the count is nonzero
+        // (with steals taken into account), then there must be data on the
+        // channel. In this case we drain everything and then try again. We will
+        // continue to fail while active senders send data while we're dropping
+        // data, but eventually we're guaranteed to break out of this loop
+        // (because there is a bounded number of senders).
+        let mut steals = self.steals;
+        while {
+            let cnt = self.cnt.compare_and_swap(
+                            steals, DISCONNECTED, atomics::SeqCst);
+            cnt != DISCONNECTED && cnt != steals
+        } {
+            loop {
+                match self.queue.pop() {
+                    Some(..) => { steals += 1; }
+                    None => break
+                }
+            }
+        }
+
+        // At this point in time, we have gated all future senders from sending,
+        // and we have flagged the channel as being disconnected. The senders
+        // still have some responsibility, however, because some sends may not
+        // complete until after we flag the disconnection. There are more
+        // details in the sending methods that see DISCONNECTED
+    }
+
+    ////////////////////////////////////////////////////////////////////////////
+    // select implementation
+    ////////////////////////////////////////////////////////////////////////////
+
+    // Tests to see whether this port can receive without blocking. If Ok is
+    // returned, then that's the answer. If Err is returned, then the returned
+    // port needs to be queried instead (an upgrade happened)
+    pub fn can_recv(&mut self) -> Result<bool, Port<T>> {
+        // We peek at the queue to see if there's anything on it, and we use
+        // this return value to determine if we should pop from the queue and
+        // upgrade this channel immediately. If it looks like we've got an
+        // upgrade pending, then go through the whole recv rigamarole to update
+        // the internal state.
+        match self.queue.peek() {
+            Some(&GoUp(..)) => {
+                match self.recv() {
+                    Err(Upgraded(port)) => Err(port),
+                    _ => unreachable!(),
+                }
+            }
+            Some(..) => Ok(true),
+            None => Ok(false)
+        }
+    }
+
+    // Attempts to start selecting on this port. Like a oneshot, this can fail
+    // immediately because of an upgrade.
+    pub fn start_selection(&mut self, task: BlockedTask) -> SelectionResult<T> {
+        match self.decrement(task) {
+            Ok(()) => SelSuccess,
+            Err(task) => {
+                let ret = match self.queue.peek() {
+                    Some(&GoUp(..)) => {
+                        match self.queue.pop() {
+                            Some(GoUp(port)) => SelUpgraded(task, port),
+                            _ => unreachable!(),
+                        }
+                    }
+                    Some(..) => SelCanceled(task),
+                    None => SelCanceled(task),
+                };
+                // Undo our decrement above, and we should be guaranteed that the
+                // previous value is positive because we're not going to sleep
+                let prev = self.cnt.fetch_add(1, atomics::SeqCst);
+                assert!(prev >= 0);
+                return ret;
+            }
+        }
+    }
+
+    // Removes a previous task from being blocked in this port
+    pub fn abort_selection(&mut self,
+                           was_upgrade: bool) -> Result<bool, Port<T>> {
+        // If we're aborting selection after upgrading from a oneshot, then
+        // we're guarantee that no one is waiting. The only way that we could
+        // have seen the upgrade is if data was actually sent on the channel
+        // half again. For us, this means that there is guaranteed to be data on
+        // this channel. Furthermore, we're guaranteed that there was no
+        // start_selection previously, so there's no need to modify `self.cnt`
+        // at all.
+        //
+        // Hence, because of these invariants, we immediately return `Ok(true)`.
+        // Note that the data may not actually be sent on the channel just yet.
+        // The other end could have flagged the upgrade but not sent data to
+        // this end. This is fine because we know it's a small bounded windows
+        // of time until the data is actually sent.
+        if was_upgrade {
+            assert_eq!(self.steals, 0);
+            assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
+            return Ok(true)
+        }
+
+        // We want to make sure that the count on the channel goes non-negative,
+        // and in the stream case we can have at most one steal, so just assume
+        // that we had one steal.
+        let steals = 1;
+        let prev = self.cnt.fetch_add(steals + 1, atomics::SeqCst);
+
+        // If we were previously disconnected, then we know for sure that there
+        // is no task in to_wake, so just keep going
+        let has_data = if prev == DISCONNECTED {
+            assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
+            self.cnt.store(DISCONNECTED, atomics::SeqCst);
+            true // there is data, that data is that we're disconnected
+        } else {
+            let cur = prev + steals + 1;
+            assert!(cur >= 0);
+
+            // If the previous count was negative, then we just made things go
+            // positive, hence we passed the -1 boundary and we're responsible
+            // for removing the to_wake() field and trashing it.
+            //
+            // If the previous count was positive then we're in a tougher
+            // situation. A possible race is that a sender just incremented
+            // through -1 (meaning it's going to try to wake a task up), but it
+            // hasn't yet read the to_wake. In order to prevent a future recv()
+            // from waking up too early (this sender picking up the plastered
+            // over to_wake), we spin loop here waiting for to_wake to be 0.
+            // Note that this entire select() implementation needs an overhaul,
+            // and this is *not* the worst part of it, so this is not done as a
+            // final solution but rather out of necessity for now to get
+            // something working.
+            if prev < 0 {
+                self.take_to_wake().trash();
+            } else {
+                while self.to_wake.load(atomics::SeqCst) != 0 {
+                    Thread::yield_now();
+                }
+            }
+            assert_eq!(self.steals, 0);
+            self.steals = steals;
+
+            // if we were previously positive, then there's surely data to
+            // receive
+            prev >= 0
+        };
+
+        // Now that we've determined that this queue "has data", we peek at the
+        // queue to see if the data is an upgrade or not. If it's an upgrade,
+        // then we need to destroy this port and abort selection on the
+        // upgraded port.
+        if has_data {
+            match self.queue.peek() {
+                Some(&GoUp(..)) => {
+                    match self.queue.pop() {
+                        Some(GoUp(port)) => Err(port),
+                        _ => unreachable!(),
+                    }
+                }
+                _ => Ok(true),
+            }
+        } else {
+            Ok(false)
+        }
+    }
+}
+
+#[unsafe_destructor]
+impl<T: Send> Drop for Packet<T> {
+    fn drop(&mut self) {
+        unsafe {
+            // Note that this load is not only an assert for correctness about
+            // disconnection, but also a proper fence before the read of
+            // `to_wake`, so this assert cannot be removed with also removing
+            // the `to_wake` assert.
+            assert_eq!(self.cnt.load(atomics::SeqCst), DISCONNECTED);
+            assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
+        }
+    }
+}
diff --git a/src/libstd/io/signal.rs b/src/libstd/io/signal.rs
index 75804c40c58..46c106234db 100644
--- a/src/libstd/io/signal.rs
+++ b/src/libstd/io/signal.rs
@@ -21,7 +21,7 @@ definitions for a number of signals.
 
 use clone::Clone;
 use result::{Ok, Err};
-use comm::{Port, SharedChan};
+use comm::{Port, Chan};
 use container::{Map, MutableMap};
 use hashmap;
 use io;
@@ -81,7 +81,7 @@ pub struct Listener {
     priv handles: hashmap::HashMap<Signum, ~RtioSignal>,
     /// chan is where all the handles send signums, which are received by
     /// the clients from port.
-    priv chan: SharedChan<Signum>,
+    priv chan: Chan<Signum>,
 
     /// Clients of Listener can `recv()` from this port. This is exposed to
     /// allow selection over this port as well as manipulation of the port
@@ -93,7 +93,7 @@ impl Listener {
     /// Creates a new listener for signals. Once created, signals are bound via
     /// the `register` method (otherwise nothing will ever be received)
     pub fn new() -> Listener {
-        let (port, chan) = SharedChan::new();
+        let (port, chan) = Chan::new();
         Listener {
             chan: chan,
             port: port,
diff --git a/src/libstd/prelude.rs b/src/libstd/prelude.rs
index 471ec050192..4849b83037f 100644
--- a/src/libstd/prelude.rs
+++ b/src/libstd/prelude.rs
@@ -80,7 +80,7 @@ pub use vec::{MutableVector, MutableTotalOrdVector};
 pub use vec::{Vector, VectorVector, CloneableVector, ImmutableVector};
 
 // Reexported runtime types
-pub use comm::{Port, Chan, SharedChan};
+pub use comm::{Port, Chan};
 pub use task::spawn;
 
 // Reexported statics
diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs
index 376d685c8ac..b751c57c0fa 100644
--- a/src/libstd/rt/mod.rs
+++ b/src/libstd/rt/mod.rs
@@ -73,6 +73,7 @@ pub use self::unwind::{begin_unwind, begin_unwind_raw, begin_unwind_fmt};
 // FIXME: these probably shouldn't be public...
 #[doc(hidden)]
 pub mod shouldnt_be_public {
+    #[cfg(not(test))]
     pub use super::local_ptr::native::maybe_tls_key;
     #[cfg(not(windows), not(target_os = "android"))]
     pub use super::local_ptr::compiled::RT_TLS_PTR;
diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs
index 8d02048d55c..39623e329ea 100644
--- a/src/libstd/rt/rtio.rs
+++ b/src/libstd/rt/rtio.rs
@@ -10,7 +10,7 @@
 
 use c_str::CString;
 use cast;
-use comm::{SharedChan, Port};
+use comm::{Chan, Port};
 use libc::c_int;
 use libc;
 use ops::Drop;
@@ -181,7 +181,7 @@ pub trait IoFactory {
     fn pipe_open(&mut self, fd: c_int) -> Result<~RtioPipe, IoError>;
     fn tty_open(&mut self, fd: c_int, readable: bool)
             -> Result<~RtioTTY, IoError>;
-    fn signal(&mut self, signal: Signum, channel: SharedChan<Signum>)
+    fn signal(&mut self, signal: Signum, channel: Chan<Signum>)
         -> Result<~RtioSignal, IoError>;
 }
 
diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs
index a7648dd2d19..e2b94e655e8 100644
--- a/src/libstd/rt/task.rs
+++ b/src/libstd/rt/task.rs
@@ -449,7 +449,7 @@ mod test {
 
     #[test]
     fn comm_shared_chan() {
-        let (port, chan) = SharedChan::new();
+        let (port, chan) = Chan::new();
         chan.send(10);
         assert!(port.recv() == 10);
     }
diff --git a/src/libstd/rt/unwind.rs b/src/libstd/rt/unwind.rs
index 2f4e705735e..9f89becaef9 100644
--- a/src/libstd/rt/unwind.rs
+++ b/src/libstd/rt/unwind.rs
@@ -73,6 +73,7 @@ use unstable::intrinsics;
 
 use uw = self::libunwind;
 
+#[allow(dead_code)]
 mod libunwind {
     //! Unwind library interface
 
diff --git a/src/libstd/run.rs b/src/libstd/run.rs
index fdd26c6c383..6f684f23d47 100644
--- a/src/libstd/run.rs
+++ b/src/libstd/run.rs
@@ -13,7 +13,7 @@
 #[allow(missing_doc)];
 #[deny(unused_must_use)];
 
-use comm::SharedChan;
+use comm::Chan;
 use io::Reader;
 use io::process::ProcessExit;
 use io::process;
@@ -225,7 +225,7 @@ impl Process {
         // in parallel so we don't deadlock while blocking on one
         // or the other. FIXME (#2625): Surely there's a much more
         // clever way to do this.
-        let (p, ch) = SharedChan::new();
+        let (p, ch) = Chan::new();
         let ch_clone = ch.clone();
 
         spawn(proc() {
diff --git a/src/libstd/sync/mpmc_bounded_queue.rs b/src/libstd/sync/mpmc_bounded_queue.rs
index 74f3a6f6918..44825a1ef94 100644
--- a/src/libstd/sync/mpmc_bounded_queue.rs
+++ b/src/libstd/sync/mpmc_bounded_queue.rs
@@ -172,7 +172,7 @@ mod tests {
         let nmsgs = 1000u;
         let mut q = Queue::with_capacity(nthreads*nmsgs);
         assert_eq!(None, q.pop());
-        let (port, chan) = SharedChan::new();
+        let (port, chan) = Chan::new();
 
         for _ in range(0, nthreads) {
             let q = q.clone();
diff --git a/src/libstd/sync/mpsc_queue.rs b/src/libstd/sync/mpsc_queue.rs
index 258162069d9..b5a55f3f8c9 100644
--- a/src/libstd/sync/mpsc_queue.rs
+++ b/src/libstd/sync/mpsc_queue.rs
@@ -156,14 +156,15 @@ impl<T: Send> Drop for Queue<T> {
 mod tests {
     use prelude::*;
 
-    use super::{Queue, Data, Empty, Inconsistent};
     use native;
+    use super::{Queue, Data, Empty, Inconsistent};
+    use sync::arc::UnsafeArc;
 
     #[test]
     fn test_full() {
         let mut q = Queue::new();
-        p.push(~1);
-        p.push(~2);
+        q.push(~1);
+        q.push(~2);
     }
 
     #[test]
@@ -171,11 +172,11 @@ mod tests {
         let nthreads = 8u;
         let nmsgs = 1000u;
         let mut q = Queue::new();
-        match c.pop() {
+        match q.pop() {
             Empty => {}
             Inconsistent | Data(..) => fail!()
         }
-        let (port, chan) = SharedChan::new();
+        let (port, chan) = Chan::new();
         let q = UnsafeArc::new(q);
 
         for _ in range(0, nthreads) {
diff --git a/src/libstd/sync/spsc_queue.rs b/src/libstd/sync/spsc_queue.rs
index d1fde759cc1..a2c61a2b135 100644
--- a/src/libstd/sync/spsc_queue.rs
+++ b/src/libstd/sync/spsc_queue.rs
@@ -194,14 +194,16 @@ impl<T: Send> Queue<T> {
         }
     }
 
-    /// Tests whether this queue is empty or not. Remember that there can only
-    /// be one tester/popper, and also keep in mind that the answer returned
-    /// from this is likely to change if it is `false`.
-    pub fn is_empty(&self) -> bool {
+    /// Attempts to peek at the head of the queue, returning `None` if the queue
+    /// has no data currently
+    pub fn peek<'a>(&'a mut self) -> Option<&'a mut T> {
+        // This is essentially the same as above with all the popping bits
+        // stripped out.
         unsafe {
             let tail = self.tail;
             let next = (*tail).next.load(Acquire);
-            return next.is_null();
+            if next.is_null() { return None }
+            return (*next).value.as_mut();
         }
     }
 }
@@ -223,8 +225,9 @@ impl<T: Send> Drop for Queue<T> {
 #[cfg(test)]
 mod test {
     use prelude::*;
-    use super::Queue;
     use native;
+    use super::Queue;
+    use sync::arc::UnsafeArc;
 
     #[test]
     fn smoke() {
@@ -272,7 +275,6 @@ mod test {
             let (a, b) = UnsafeArc::new2(Queue::new(bound));
             let (port, chan) = Chan::new();
             native::task::spawn(proc() {
-                let mut c = c;
                 for _ in range(0, 100000) {
                     loop {
                         match unsafe { (*b.get()).pop() } {
diff --git a/src/libstd/task.rs b/src/libstd/task.rs
index 921d0feaa8b..5d8c4a87b39 100644
--- a/src/libstd/task.rs
+++ b/src/libstd/task.rs
@@ -65,7 +65,6 @@ use rt::task::Task;
 use str::{Str, SendStr, IntoMaybeOwned};
 
 #[cfg(test)] use any::{AnyOwnExt, AnyRefExt};
-#[cfg(test)] use comm::SharedChan;
 #[cfg(test)] use ptr;
 #[cfg(test)] use result;
 
@@ -474,9 +473,9 @@ fn test_try_fail() {
 fn test_spawn_sched() {
     use clone::Clone;
 
-    let (po, ch) = SharedChan::new();
+    let (po, ch) = Chan::new();
 
-    fn f(i: int, ch: SharedChan<()>) {
+    fn f(i: int, ch: Chan<()>) {
         let ch = ch.clone();
         spawn(proc() {
             if i == 0 {
diff --git a/src/libsync/sync/mod.rs b/src/libsync/sync/mod.rs
index 0ac385ea1d1..7078f01945e 100644
--- a/src/libsync/sync/mod.rs
+++ b/src/libsync/sync/mod.rs
@@ -764,7 +764,7 @@ mod tests {
     use std::cast;
     use std::result;
     use std::task;
-    use std::comm::{SharedChan, Empty};
+    use std::comm::Empty;
 
     /************************************************************************
      * Semaphore tests
@@ -1393,7 +1393,7 @@ mod tests {
     #[test]
     fn test_barrier() {
         let barrier = Barrier::new(10);
-        let (port, chan) = SharedChan::new();
+        let (port, chan) = Chan::new();
 
         for _ in range(0, 9) {
             let c = barrier.clone();
diff --git a/src/libsync/sync/mutex.rs b/src/libsync/sync/mutex.rs
index f1a81d65c1d..3726528a5e9 100644
--- a/src/libsync/sync/mutex.rs
+++ b/src/libsync/sync/mutex.rs
@@ -531,7 +531,7 @@ mod test {
             }
         }
 
-        let (p, c) = SharedChan::new();
+        let (p, c) = Chan::new();
         for _ in range(0, N) {
             let c2 = c.clone();
             native::task::spawn(proc() { inc(); c2.send(()); });
diff --git a/src/libsync/sync/one.rs b/src/libsync/sync/one.rs
index 93d818b704d..a651f3b9d4c 100644
--- a/src/libsync/sync/one.rs
+++ b/src/libsync/sync/one.rs
@@ -137,7 +137,7 @@ mod test {
         static mut o: Once = ONCE_INIT;
         static mut run: bool = false;
 
-        let (p, c) = SharedChan::new();
+        let (p, c) = Chan::new();
         for _ in range(0, 10) {
             let c = c.clone();
             spawn(proc() {
diff --git a/src/test/bench/msgsend-pipes-shared.rs b/src/test/bench/msgsend-pipes-shared.rs
index aa4e0f1ae58..b766be88d23 100644
--- a/src/test/bench/msgsend-pipes-shared.rs
+++ b/src/test/bench/msgsend-pipes-shared.rs
@@ -53,7 +53,7 @@ fn server(requests: &Port<request>, responses: &Chan<uint>) {
 
 fn run(args: &[~str]) {
     let (from_child, to_parent) = Chan::new();
-    let (from_parent, to_child) = SharedChan::new();
+    let (from_parent, to_child) = Chan::new();
 
     let size = from_str::<uint>(args[1]).unwrap();
     let workers = from_str::<uint>(args[2]).unwrap();
diff --git a/src/test/bench/msgsend-pipes.rs b/src/test/bench/msgsend-pipes.rs
index 6ce0f9de8d0..89e0bcf3326 100644
--- a/src/test/bench/msgsend-pipes.rs
+++ b/src/test/bench/msgsend-pipes.rs
@@ -67,7 +67,7 @@ fn run(args: &[~str]) {
         });
         from_parent
     } else {
-        let (from_parent, to_child) = SharedChan::new();
+        let (from_parent, to_child) = Chan::new();
         for _ in range(0u, workers) {
             let to_child = to_child.clone();
             let mut builder = task::task();
diff --git a/src/test/bench/shootout-chameneos-redux.rs b/src/test/bench/shootout-chameneos-redux.rs
index 7281667e676..5c237b306fb 100644
--- a/src/test/bench/shootout-chameneos-redux.rs
+++ b/src/test/bench/shootout-chameneos-redux.rs
@@ -100,8 +100,8 @@ fn creature(
     name: uint,
     color: color,
     from_rendezvous: Port<Option<CreatureInfo>>,
-    to_rendezvous: SharedChan<CreatureInfo>,
-    to_rendezvous_log: SharedChan<~str>
+    to_rendezvous: Chan<CreatureInfo>,
+    to_rendezvous_log: Chan<~str>
 ) {
     let mut color = color;
     let mut creatures_met = 0;
@@ -137,8 +137,8 @@ fn creature(
 fn rendezvous(nn: uint, set: ~[color]) {
 
     // these ports will allow us to hear from the creatures
-    let (from_creatures, to_rendezvous) = SharedChan::<CreatureInfo>::new();
-    let (from_creatures_log, to_rendezvous_log) = SharedChan::<~str>::new();
+    let (from_creatures, to_rendezvous) = Chan::<CreatureInfo>::new();
+    let (from_creatures_log, to_rendezvous_log) = Chan::<~str>::new();
 
     // these channels will be passed to the creatures so they can talk to us
 
diff --git a/src/test/bench/shootout-pfib.rs b/src/test/bench/shootout-pfib.rs
index 86a2043527e..7f4fd3cf94c 100644
--- a/src/test/bench/shootout-pfib.rs
+++ b/src/test/bench/shootout-pfib.rs
@@ -28,13 +28,13 @@ use std::task;
 use std::uint;
 
 fn fib(n: int) -> int {
-    fn pfib(c: &SharedChan<int>, n: int) {
+    fn pfib(c: &Chan<int>, n: int) {
         if n == 0 {
             c.send(0);
         } else if n <= 2 {
             c.send(1);
         } else {
-            let (pp, cc) = SharedChan::new();
+            let (pp, cc) = Chan::new();
             let ch = cc.clone();
             task::spawn(proc() pfib(&ch, n - 1));
             let ch = cc.clone();
@@ -43,7 +43,7 @@ fn fib(n: int) -> int {
         }
     }
 
-    let (p, ch) = SharedChan::new();
+    let (p, ch) = Chan::new();
     let _t = task::spawn(proc() pfib(&ch, n) );
     p.recv()
 }
diff --git a/src/test/bench/task-perf-linked-failure.rs b/src/test/bench/task-perf-linked-failure.rs
index 2a012ef19fa..189a3ac7448 100644
--- a/src/test/bench/task-perf-linked-failure.rs
+++ b/src/test/bench/task-perf-linked-failure.rs
@@ -33,7 +33,7 @@
 // Creates in the background 'num_tasks' tasks, all blocked forever.
 // Doesn't return until all such tasks are ready, but doesn't block forever itself.
 
-use std::comm::{stream, SharedChan};
+use std::comm::{stream, Chan};
 use std::os;
 use std::result;
 use std::task;
@@ -41,7 +41,7 @@ use std::uint;
 
 fn grandchild_group(num_tasks: uint) {
     let (po, ch) = stream();
-    let ch = SharedChan::new(ch);
+    let ch = Chan::new(ch);
 
     for _ in range(0, num_tasks) {
         let ch = ch.clone();
diff --git a/src/test/compile-fail/comm-not-freeze.rs b/src/test/compile-fail/comm-not-freeze.rs
index 2b85068d470..ef5bd21f913 100644
--- a/src/test/compile-fail/comm-not-freeze.rs
+++ b/src/test/compile-fail/comm-not-freeze.rs
@@ -13,5 +13,5 @@ fn test<T: Freeze>() {}
 fn main() {
     test::<Chan<int>>();        //~ ERROR: does not fulfill `Freeze`
     test::<Port<int>>();        //~ ERROR: does not fulfill `Freeze`
-    test::<SharedChan<int>>();  //~ ERROR: does not fulfill `Freeze`
+    test::<Chan<int>>();  //~ ERROR: does not fulfill `Freeze`
 }
diff --git a/src/test/run-pass/hashmap-memory.rs b/src/test/run-pass/hashmap-memory.rs
index b15c3dca855..9c05dae46bd 100644
--- a/src/test/run-pass/hashmap-memory.rs
+++ b/src/test/run-pass/hashmap-memory.rs
@@ -31,7 +31,7 @@ mod map_reduce {
 
     enum ctrl_proto { find_reducer(~[u8], Chan<int>), mapper_done, }
 
-    fn start_mappers(ctrl: SharedChan<ctrl_proto>, inputs: ~[~str]) {
+    fn start_mappers(ctrl: Chan<ctrl_proto>, inputs: ~[~str]) {
         for i in inputs.iter() {
             let ctrl = ctrl.clone();
             let i = i.clone();
@@ -39,11 +39,11 @@ mod map_reduce {
         }
     }
 
-    fn map_task(ctrl: SharedChan<ctrl_proto>, input: ~str) {
+    fn map_task(ctrl: Chan<ctrl_proto>, input: ~str) {
         let mut intermediates = HashMap::new();
 
         fn emit(im: &mut HashMap<~str, int>,
-                ctrl: SharedChan<ctrl_proto>, key: ~str,
+                ctrl: Chan<ctrl_proto>, key: ~str,
                 _val: ~str) {
             if im.contains_key(&key) {
                 return;
@@ -63,7 +63,7 @@ mod map_reduce {
     }
 
     pub fn map_reduce(inputs: ~[~str]) {
-        let (ctrl_port, ctrl_chan) = SharedChan::new();
+        let (ctrl_port, ctrl_chan) = Chan::new();
 
         // This task becomes the master control task. It spawns others
         // to do the rest.
diff --git a/src/test/run-pass/task-comm-14.rs b/src/test/run-pass/task-comm-14.rs
index b51f626f3c2..0403284e55f 100644
--- a/src/test/run-pass/task-comm-14.rs
+++ b/src/test/run-pass/task-comm-14.rs
@@ -13,7 +13,7 @@
 use std::task;
 
 pub fn main() {
-    let (po, ch) = SharedChan::new();
+    let (po, ch) = Chan::new();
 
     // Spawn 10 tasks each sending us back one int.
     let mut i = 10;
@@ -37,7 +37,7 @@ pub fn main() {
     info!("main thread exiting");
 }
 
-fn child(x: int, ch: &SharedChan<int>) {
+fn child(x: int, ch: &Chan<int>) {
     info!("{}", x);
     ch.send(x);
 }
diff --git a/src/test/run-pass/task-comm-3.rs b/src/test/run-pass/task-comm-3.rs
index 049f2d71946..f5374e7df05 100644
--- a/src/test/run-pass/task-comm-3.rs
+++ b/src/test/run-pass/task-comm-3.rs
@@ -16,7 +16,7 @@ use std::task;
 
 pub fn main() { info!("===== WITHOUT THREADS ====="); test00(); }
 
-fn test00_start(ch: &SharedChan<int>, message: int, count: int) {
+fn test00_start(ch: &Chan<int>, message: int, count: int) {
     info!("Starting test00_start");
     let mut i: int = 0;
     while i < count {
@@ -33,7 +33,7 @@ fn test00() {
 
     info!("Creating tasks");
 
-    let (po, ch) = SharedChan::new();
+    let (po, ch) = Chan::new();
 
     let mut i: int = 0;
 
diff --git a/src/test/run-pass/task-comm-6.rs b/src/test/run-pass/task-comm-6.rs
index 45994e78d94..c63bf8bc856 100644
--- a/src/test/run-pass/task-comm-6.rs
+++ b/src/test/run-pass/task-comm-6.rs
@@ -15,7 +15,7 @@ pub fn main() { test00(); }
 fn test00() {
     let mut r: int = 0;
     let mut sum: int = 0;
-    let (p, ch) = SharedChan::new();
+    let (p, ch) = Chan::new();
     let mut c0 = ch.clone();
     let mut c1 = ch.clone();
     let mut c2 = ch.clone();
diff --git a/src/test/run-pass/task-comm-7.rs b/src/test/run-pass/task-comm-7.rs
index 159962e1857..ff43a80adac 100644
--- a/src/test/run-pass/task-comm-7.rs
+++ b/src/test/run-pass/task-comm-7.rs
@@ -18,7 +18,7 @@ use std::task;
 
 pub fn main() { test00(); }
 
-fn test00_start(c: &SharedChan<int>, start: int,
+fn test00_start(c: &Chan<int>, start: int,
                 number_of_messages: int) {
     let mut i: int = 0;
     while i < number_of_messages { c.send(start + i); i += 1; }
@@ -27,7 +27,7 @@ fn test00_start(c: &SharedChan<int>, start: int,
 fn test00() {
     let mut r: int = 0;
     let mut sum: int = 0;
-    let (p, ch) = SharedChan::new();
+    let (p, ch) = Chan::new();
     let number_of_messages: int = 10;
 
     let c = ch.clone();
diff --git a/src/test/run-pass/unique-send-2.rs b/src/test/run-pass/unique-send-2.rs
index d1c45a336fa..299fed735ab 100644
--- a/src/test/run-pass/unique-send-2.rs
+++ b/src/test/run-pass/unique-send-2.rs
@@ -10,12 +10,12 @@
 
 use std::task;
 
-fn child(c: &SharedChan<~uint>, i: uint) {
+fn child(c: &Chan<~uint>, i: uint) {
     c.send(~i);
 }
 
 pub fn main() {
-    let (p, ch) = SharedChan::new();
+    let (p, ch) = Chan::new();
     let n = 100u;
     let mut expected = 0u;
     for i in range(0u, n) {
diff --git a/src/test/run-pass/unwind-resource.rs b/src/test/run-pass/unwind-resource.rs
index 4679f65c43c..e643a20436e 100644
--- a/src/test/run-pass/unwind-resource.rs
+++ b/src/test/run-pass/unwind-resource.rs
@@ -15,7 +15,7 @@ extern mod extra;
 use std::task;
 
 struct complainer {
-  c: SharedChan<bool>,
+  c: Chan<bool>,
 }
 
 impl Drop for complainer {
@@ -26,20 +26,20 @@ impl Drop for complainer {
     }
 }
 
-fn complainer(c: SharedChan<bool>) -> complainer {
+fn complainer(c: Chan<bool>) -> complainer {
     error!("Hello!");
     complainer {
         c: c
     }
 }
 
-fn f(c: SharedChan<bool>) {
+fn f(c: Chan<bool>) {
     let _c = complainer(c);
     fail!();
 }
 
 pub fn main() {
-    let (p, c) = SharedChan::new();
+    let (p, c) = Chan::new();
     task::spawn(proc() f(c.clone()));
     error!("hiiiiiiiii");
     assert!(p.recv());