about summary refs log tree commit diff
path: root/src/libstd/comm
diff options
context:
space:
mode:
Diffstat (limited to 'src/libstd/comm')
-rw-r--r--src/libstd/comm/mod.rs2085
-rw-r--r--src/libstd/comm/oneshot.rs377
-rw-r--r--src/libstd/comm/select.rs711
-rw-r--r--src/libstd/comm/shared.rs493
-rw-r--r--src/libstd/comm/stream.rs486
-rw-r--r--src/libstd/comm/sync.rs490
6 files changed, 4642 insertions, 0 deletions
diff --git a/src/libstd/comm/mod.rs b/src/libstd/comm/mod.rs
new file mode 100644
index 00000000000..2b66e91c00d
--- /dev/null
+++ b/src/libstd/comm/mod.rs
@@ -0,0 +1,2085 @@
+// Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+//! Communication primitives for concurrent tasks
+//!
+//! Rust makes it very difficult to share data among tasks to prevent race
+//! conditions and to improve parallelism, but there is often a need for
+//! communication between concurrent tasks. The primitives defined in this
+//! module are the building blocks for synchronization in rust.
+//!
+//! This module provides message-based communication over channels, concretely
+//! defined among three types:
+//!
+//! * `Sender`
+//! * `SyncSender`
+//! * `Receiver`
+//!
+//! A `Sender` or `SyncSender` is used to send data to a `Receiver`. Both
+//! senders are clone-able such that many tasks can send simultaneously to one
+//! receiver.  These channels are *task blocking*, not *thread blocking*. This
+//! means that if one task is blocked on a channel, other tasks can continue to
+//! make progress.
+//!
+//! Rust channels come in one of two flavors:
+//!
+//! 1. An asynchronous, infinitely buffered channel. The `channel()` function
+//!    will return a `(Sender, Receiver)` tuple where all sends will be
+//!    **asynchronous** (they never block). The channel conceptually has an
+//!    infinite buffer.
+//!
+//! 2. A synchronous, bounded channel. The `sync_channel()` function will return
+//!    a `(SyncSender, Receiver)` tuple where the storage for pending messages
+//!    is a pre-allocated buffer of a fixed size. All sends will be
+//!    **synchronous** by blocking until there is buffer space available. Note
+//!    that a bound of 0 is allowed, causing the channel to become a
+//!    "rendezvous" channel where each sender atomically hands off a message to
+//!    a receiver.
+//!
+//! ## Panic Propagation
+//!
+//! In addition to being a core primitive for communicating in rust, channels
+//! are the points at which panics are propagated among tasks.  Whenever the one
+//! half of channel is closed, the other half will have its next operation
+//! `panic!`. The purpose of this is to allow propagation of panics among tasks
+//! that are linked to one another via channels.
+//!
+//! There are methods on both of senders and receivers to perform their
+//! respective operations without panicking, however.
+//!
+//! ## Runtime Requirements
+//!
+//! The channel types defined in this module generally have very few runtime
+//! requirements in order to operate. The major requirement they have is for a
+//! local rust `Task` to be available if any *blocking* operation is performed.
+//!
+//! If a local `Task` is not available (for example an FFI callback), then the
+//! `send` operation is safe on a `Sender` (as well as a `send_opt`) as well as
+//! the `try_send` method on a `SyncSender`, but no other operations are
+//! guaranteed to be safe.
+//!
+//! # Example
+//!
+//! Simple usage:
+//!
+//! ```
+//! // Create a simple streaming channel
+//! let (tx, rx) = channel();
+//! spawn(proc() {
+//!     tx.send(10i);
+//! });
+//! assert_eq!(rx.recv(), 10i);
+//! ```
+//!
+//! Shared usage:
+//!
+//! ```
+//! // Create a shared channel which can be sent along from many tasks
+//! // where tx is the sending half (tx for transmission), and rx is the receiving
+//! // half (rx for receiving).
+//! let (tx, rx) = channel();
+//! for i in range(0i, 10i) {
+//!     let tx = tx.clone();
+//!     spawn(proc() {
+//!         tx.send(i);
+//!     })
+//! }
+//!
+//! for _ in range(0i, 10i) {
+//!     let j = rx.recv();
+//!     assert!(0 <= j && j < 10);
+//! }
+//! ```
+//!
+//! Propagating panics:
+//!
+//! ```should_fail
+//! // The call to recv() will panic!() because the channel has already hung
+//! // up (or been deallocated)
+//! let (tx, rx) = channel::<int>();
+//! drop(tx);
+//! rx.recv();
+//! ```
+//!
+//! Synchronous channels:
+//!
+//! ```
+//! let (tx, rx) = sync_channel::<int>(0);
+//! spawn(proc() {
+//!     // This will wait for the parent task to start receiving
+//!     tx.send(53);
+//! });
+//! rx.recv();
+//! ```
+//!
+//! Reading from a channel with a timeout requires to use a Timer together
+//! with the channel. You can use the select! macro to select either and
+//! handle the timeout case. This first example will break out of the loop
+//! after 10 seconds no matter what:
+//!
+//! ```no_run
+//! use std::io::timer::Timer;
+//! use std::time::Duration;
+//!
+//! let (tx, rx) = channel::<int>();
+//! let mut timer = Timer::new().unwrap();
+//! let timeout = timer.oneshot(Duration::seconds(10));
+//!
+//! loop {
+//!     select! {
+//!         val = rx.recv() => println!("Received {}", val),
+//!         () = timeout.recv() => {
+//!             println!("timed out, total time was more than 10 seconds")
+//!             break;
+//!         }
+//!     }
+//! }
+//! ```
+//!
+//! This second example is more costly since it allocates a new timer every
+//! time a message is received, but it allows you to timeout after the channel
+//! has been inactive for 5 seconds:
+//!
+//! ```no_run
+//! use std::io::timer::Timer;
+//! use std::time::Duration;
+//!
+//! let (tx, rx) = channel::<int>();
+//! let mut timer = Timer::new().unwrap();
+//!
+//! loop {
+//!     let timeout = timer.oneshot(Duration::seconds(5));
+//!
+//!     select! {
+//!         val = rx.recv() => println!("Received {}", val),
+//!         () = timeout.recv() => {
+//!             println!("timed out, no message received in 5 seconds")
+//!             break;
+//!         }
+//!     }
+//! }
+//! ```
+
+// A description of how Rust's channel implementation works
+//
+// Channels are supposed to be the basic building block for all other
+// concurrent primitives that are used in Rust. As a result, the channel type
+// needs to be highly optimized, flexible, and broad enough for use everywhere.
+//
+// The choice of implementation of all channels is to be built on lock-free data
+// structures. The channels themselves are then consequently also lock-free data
+// structures. As always with lock-free code, this is a very "here be dragons"
+// territory, especially because I'm unaware of any academic papers which have
+// gone into great length about channels of these flavors.
+//
+// ## Flavors of channels
+//
+// From the perspective of a consumer of this library, there is only one flavor
+// of channel. This channel can be used as a stream and cloned to allow multiple
+// senders. Under the hood, however, there are actually three flavors of
+// channels in play.
+//
+// * Oneshots - these channels are highly optimized for the one-send use case.
+//              They contain as few atomics as possible and involve one and
+//              exactly one allocation.
+// * Streams - these channels are optimized for the non-shared use case. They
+//             use a different concurrent queue which is more tailored for this
+//             use case. The initial allocation of this flavor of channel is not
+//             optimized.
+// * Shared - this is the most general form of channel that this module offers,
+//            a channel with multiple senders. This type is as optimized as it
+//            can be, but the previous two types mentioned are much faster for
+//            their use-cases.
+//
+// ## Concurrent queues
+//
+// The basic idea of Rust's Sender/Receiver types is that send() never blocks, but
+// recv() obviously blocks. This means that under the hood there must be some
+// shared and concurrent queue holding all of the actual data.
+//
+// With two flavors of channels, two flavors of queues are also used. We have
+// chosen to use queues from a well-known author which are abbreviated as SPSC
+// and MPSC (single producer, single consumer and multiple producer, single
+// consumer). SPSC queues are used for streams while MPSC queues are used for
+// shared channels.
+//
+// ### SPSC optimizations
+//
+// The SPSC queue found online is essentially a linked list of nodes where one
+// half of the nodes are the "queue of data" and the other half of nodes are a
+// cache of unused nodes. The unused nodes are used such that an allocation is
+// not required on every push() and a free doesn't need to happen on every
+// pop().
+//
+// As found online, however, the cache of nodes is of an infinite size. This
+// means that if a channel at one point in its life had 50k items in the queue,
+// then the queue will always have the capacity for 50k items. I believed that
+// this was an unnecessary limitation of the implementation, so I have altered
+// the queue to optionally have a bound on the cache size.
+//
+// By default, streams will have an unbounded SPSC queue with a small-ish cache
+// size. The hope is that the cache is still large enough to have very fast
+// send() operations while not too large such that millions of channels can
+// coexist at once.
+//
+// ### MPSC optimizations
+//
+// Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
+// a linked list under the hood to earn its unboundedness, but I have not put
+// forth much effort into having a cache of nodes similar to the SPSC queue.
+//
+// For now, I believe that this is "ok" because shared channels are not the most
+// common type, but soon we may wish to revisit this queue choice and determine
+// another candidate for backend storage of shared channels.
+//
+// ## Overview of the Implementation
+//
+// Now that there's a little background on the concurrent queues used, it's
+// worth going into much more detail about the channels themselves. The basic
+// pseudocode for a send/recv are:
+//
+//
+//      send(t)                             recv()
+//        queue.push(t)                       return if queue.pop()
+//        if increment() == -1                deschedule {
+//          wakeup()                            if decrement() > 0
+//                                                cancel_deschedule()
+//                                            }
+//                                            queue.pop()
+//
+// As mentioned before, there are no locks in this implementation, only atomic
+// instructions are used.
+//
+// ### The internal atomic counter
+//
+// Every channel has a shared counter with each half to keep track of the size
+// of the queue. This counter is used to abort descheduling by the receiver and
+// to know when to wake up on the sending side.
+//
+// As seen in the pseudocode, senders will increment this count and receivers
+// will decrement the count. The theory behind this is that if a sender sees a
+// -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
+// then it doesn't need to block.
+//
+// The recv() method has a beginning call to pop(), and if successful, it needs
+// to decrement the count. It is a crucial implementation detail that this
+// decrement does *not* happen to the shared counter. If this were the case,
+// then it would be possible for the counter to be very negative when there were
+// no receivers waiting, in which case the senders would have to determine when
+// it was actually appropriate to wake up a receiver.
+//
+// Instead, the "steal count" is kept track of separately (not atomically
+// because it's only used by receivers), and then the decrement() call when
+// descheduling will lump in all of the recent steals into one large decrement.
+//
+// The implication of this is that if a sender sees a -1 count, then there's
+// guaranteed to be a waiter waiting!
+//
+// ## Native Implementation
+//
+// A major goal of these channels is to work seamlessly on and off the runtime.
+// All of the previous race conditions have been worded in terms of
+// scheduler-isms (which is obviously not available without the runtime).
+//
+// For now, native usage of channels (off the runtime) will fall back onto
+// mutexes/cond vars for descheduling/atomic decisions. The no-contention path
+// is still entirely lock-free, the "deschedule" blocks above are surrounded by
+// a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
+// condition variable.
+//
+// ## Select
+//
+// Being able to support selection over channels has greatly influenced this
+// design, and not only does selection need to work inside the runtime, but also
+// outside the runtime.
+//
+// The implementation is fairly straightforward. The goal of select() is not to
+// return some data, but only to return which channel can receive data without
+// blocking. The implementation is essentially the entire blocking procedure
+// followed by an increment as soon as its woken up. The cancellation procedure
+// involves an increment and swapping out of to_wake to acquire ownership of the
+// task to unblock.
+//
+// Sadly this current implementation requires multiple allocations, so I have
+// seen the throughput of select() be much worse than it should be. I do not
+// believe that there is anything fundamental which needs to change about these
+// channels, however, in order to support a more efficient select().
+//
+// # Conclusion
+//
+// And now that you've seen all the races that I found and attempted to fix,
+// here's the code for you to find some more!
+
+use core::prelude::*;
+
+pub use self::TryRecvError::*;
+pub use self::TrySendError::*;
+use self::Flavor::*;
+
+use alloc::arc::Arc;
+use core::kinds::marker;
+use core::mem;
+use core::cell::UnsafeCell;
+use rustrt::task::BlockedTask;
+
+pub use comm::select::{Select, Handle};
+
+macro_rules! test (
+    { fn $name:ident() $b:block $(#[$a:meta])*} => (
+        mod $name {
+            #![allow(unused_imports)]
+
+            extern crate rustrt;
+
+            use prelude::*;
+
+            use comm::*;
+            use super::*;
+            use task;
+
+            $(#[$a])* #[test] fn f() { $b }
+        }
+    )
+)
+
+mod oneshot;
+mod select;
+mod shared;
+mod stream;
+mod sync;
+
+/// The receiving-half of Rust's channel type. This half can only be owned by
+/// one task
+#[unstable]
+pub struct Receiver<T> {
+    inner: UnsafeCell<Flavor<T>>,
+    // can't share in an arc
+    _marker: marker::NoSync,
+}
+
+/// An iterator over messages on a receiver, this iterator will block
+/// whenever `next` is called, waiting for a new message, and `None` will be
+/// returned when the corresponding channel has hung up.
+#[unstable]
+pub struct Messages<'a, T:'a> {
+    rx: &'a Receiver<T>
+}
+
+/// The sending-half of Rust's asynchronous channel type. This half can only be
+/// owned by one task, but it can be cloned to send to other tasks.
+#[unstable]
+pub struct Sender<T> {
+    inner: UnsafeCell<Flavor<T>>,
+    // can't share in an arc
+    _marker: marker::NoSync,
+}
+
+/// The sending-half of Rust's synchronous channel type. This half can only be
+/// owned by one task, but it can be cloned to send to other tasks.
+#[unstable = "this type may be renamed, but it will always exist"]
+pub struct SyncSender<T> {
+    inner: Arc<UnsafeCell<sync::Packet<T>>>,
+    // can't share in an arc
+    _marker: marker::NoSync,
+}
+
+/// This enumeration is the list of the possible reasons that try_recv could not
+/// return data when called.
+#[deriving(PartialEq, Clone, Show)]
+#[experimental = "this is likely to be removed in changing try_recv()"]
+pub enum TryRecvError {
+    /// This channel is currently empty, but the sender(s) have not yet
+    /// disconnected, so data may yet become available.
+    Empty,
+    /// This channel's sending half has become disconnected, and there will
+    /// never be any more data received on this channel
+    Disconnected,
+}
+
+/// This enumeration is the list of the possible error outcomes for the
+/// `SyncSender::try_send` method.
+#[deriving(PartialEq, Clone, Show)]
+#[experimental = "this is likely to be removed in changing try_send()"]
+pub enum TrySendError<T> {
+    /// The data could not be sent on the channel because it would require that
+    /// the callee block to send the data.
+    ///
+    /// If this is a buffered channel, then the buffer is full at this time. If
+    /// this is not a buffered channel, then there is no receiver available to
+    /// acquire the data.
+    Full(T),
+    /// This channel's receiving half has disconnected, so the data could not be
+    /// sent. The data is returned back to the callee in this case.
+    RecvDisconnected(T),
+}
+
+enum Flavor<T> {
+    Oneshot(Arc<UnsafeCell<oneshot::Packet<T>>>),
+    Stream(Arc<UnsafeCell<stream::Packet<T>>>),
+    Shared(Arc<UnsafeCell<shared::Packet<T>>>),
+    Sync(Arc<UnsafeCell<sync::Packet<T>>>),
+}
+
+#[doc(hidden)]
+trait UnsafeFlavor<T> {
+    fn inner_unsafe<'a>(&'a self) -> &'a UnsafeCell<Flavor<T>>;
+    unsafe fn inner_mut<'a>(&'a self) -> &'a mut Flavor<T> {
+        &mut *self.inner_unsafe().get()
+    }
+    unsafe fn inner<'a>(&'a self) -> &'a Flavor<T> {
+        &*self.inner_unsafe().get()
+    }
+}
+impl<T> UnsafeFlavor<T> for Sender<T> {
+    fn inner_unsafe<'a>(&'a self) -> &'a UnsafeCell<Flavor<T>> {
+        &self.inner
+    }
+}
+impl<T> UnsafeFlavor<T> for Receiver<T> {
+    fn inner_unsafe<'a>(&'a self) -> &'a UnsafeCell<Flavor<T>> {
+        &self.inner
+    }
+}
+
+/// Creates a new asynchronous channel, returning the sender/receiver halves.
+///
+/// All data sent on the sender will become available on the receiver, and no
+/// send will block the calling task (this channel has an "infinite buffer").
+///
+/// # Example
+///
+/// ```
+/// // tx is is the sending half (tx for transmission), and rx is the receiving
+/// // half (rx for receiving).
+/// let (tx, rx) = channel();
+///
+/// // Spawn off an expensive computation
+/// spawn(proc() {
+/// #   fn expensive_computation() {}
+///     tx.send(expensive_computation());
+/// });
+///
+/// // Do some useful work for awhile
+///
+/// // Let's see what that answer was
+/// println!("{}", rx.recv());
+/// ```
+#[unstable]
+pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) {
+    let a = Arc::new(UnsafeCell::new(oneshot::Packet::new()));
+    (Sender::new(Oneshot(a.clone())), Receiver::new(Oneshot(a)))
+}
+
+/// Creates a new synchronous, bounded channel.
+///
+/// Like asynchronous channels, the `Receiver` will block until a message
+/// becomes available. These channels differ greatly in the semantics of the
+/// sender from asynchronous channels, however.
+///
+/// This channel has an internal buffer on which messages will be queued. When
+/// the internal buffer becomes full, future sends will *block* waiting for the
+/// buffer to open up. Note that a buffer size of 0 is valid, in which case this
+/// becomes  "rendezvous channel" where each send will not return until a recv
+/// is paired with it.
+///
+/// As with asynchronous channels, all senders will panic in `send` if the
+/// `Receiver` has been destroyed.
+///
+/// # Example
+///
+/// ```
+/// let (tx, rx) = sync_channel(1);
+///
+/// // this returns immediately
+/// tx.send(1i);
+///
+/// spawn(proc() {
+///     // this will block until the previous message has been received
+///     tx.send(2i);
+/// });
+///
+/// assert_eq!(rx.recv(), 1i);
+/// assert_eq!(rx.recv(), 2i);
+/// ```
+#[unstable = "this function may be renamed to more accurately reflect the type \
+              of channel that is is creating"]
+pub fn sync_channel<T: Send>(bound: uint) -> (SyncSender<T>, Receiver<T>) {
+    let a = Arc::new(UnsafeCell::new(sync::Packet::new(bound)));
+    (SyncSender::new(a.clone()), Receiver::new(Sync(a)))
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Sender
+////////////////////////////////////////////////////////////////////////////////
+
+impl<T: Send> Sender<T> {
+    fn new(inner: Flavor<T>) -> Sender<T> {
+        Sender {
+            inner: UnsafeCell::new(inner),
+            _marker: marker::NoSync,
+        }
+    }
+
+    /// Sends a value along this channel to be received by the corresponding
+    /// receiver.
+    ///
+    /// Rust channels are infinitely buffered so this method will never block.
+    ///
+    /// # Panics
+    ///
+    /// This function will panic if the other end of the channel has hung up.
+    /// This means that if the corresponding receiver has fallen out of scope,
+    /// this function will trigger a panic message saying that a message is
+    /// being sent on a closed channel.
+    ///
+    /// Note that if this function does *not* panic, it does not mean that the
+    /// data will be successfully received. All sends are placed into a queue,
+    /// so it is possible for a send to succeed (the other end is alive), but
+    /// then the other end could immediately disconnect.
+    ///
+    /// The purpose of this functionality is to propagate panicks among tasks.
+    /// If a panic is not desired, then consider using the `send_opt` method
+    #[experimental = "this function is being considered candidate for removal \
+                      to adhere to the general guidelines of rust"]
+    pub fn send(&self, t: T) {
+        if self.send_opt(t).is_err() {
+            panic!("sending on a closed channel");
+        }
+    }
+
+    /// Attempts to send a value on this channel, returning it back if it could
+    /// not be sent.
+    ///
+    /// A successful send occurs when it is determined that the other end of
+    /// the channel has not hung up already. An unsuccessful send would be one
+    /// where the corresponding receiver has already been deallocated. Note
+    /// that a return value of `Err` means that the data will never be
+    /// received, but a return value of `Ok` does *not* mean that the data
+    /// will be received.  It is possible for the corresponding receiver to
+    /// hang up immediately after this function returns `Ok`.
+    ///
+    /// Like `send`, this method will never block.
+    ///
+    /// # Panics
+    ///
+    /// This method will never panic, it will return the message back to the
+    /// caller if the other end is disconnected
+    ///
+    /// # Example
+    ///
+    /// ```
+    /// let (tx, rx) = channel();
+    ///
+    /// // This send is always successful
+    /// assert_eq!(tx.send_opt(1i), Ok(()));
+    ///
+    /// // This send will fail because the receiver is gone
+    /// drop(rx);
+    /// assert_eq!(tx.send_opt(1i), Err(1));
+    /// ```
+    #[unstable = "this function may be renamed to send() in the future"]
+    pub fn send_opt(&self, t: T) -> Result<(), T> {
+        let (new_inner, ret) = match *unsafe { self.inner() } {
+            Oneshot(ref p) => {
+                unsafe {
+                    let p = p.get();
+                    if !(*p).sent() {
+                        return (*p).send(t);
+                    } else {
+                        let a = Arc::new(UnsafeCell::new(stream::Packet::new()));
+                        match (*p).upgrade(Receiver::new(Stream(a.clone()))) {
+                            oneshot::UpSuccess => {
+                                let ret = (*a.get()).send(t);
+                                (a, ret)
+                            }
+                            oneshot::UpDisconnected => (a, Err(t)),
+                            oneshot::UpWoke(task) => {
+                                // This send cannot panic because the task is
+                                // asleep (we're looking at it), so the receiver
+                                // can't go away.
+                                (*a.get()).send(t).ok().unwrap();
+                                task.wake().map(|t| t.reawaken());
+                                (a, Ok(()))
+                            }
+                        }
+                    }
+                }
+            }
+            Stream(ref p) => return unsafe { (*p.get()).send(t) },
+            Shared(ref p) => return unsafe { (*p.get()).send(t) },
+            Sync(..) => unreachable!(),
+        };
+
+        unsafe {
+            let tmp = Sender::new(Stream(new_inner));
+            mem::swap(self.inner_mut(), tmp.inner_mut());
+        }
+        return ret;
+    }
+}
+
+#[unstable]
+impl<T: Send> Clone for Sender<T> {
+    fn clone(&self) -> Sender<T> {
+        let (packet, sleeper) = match *unsafe { self.inner() } {
+            Oneshot(ref p) => {
+                let a = Arc::new(UnsafeCell::new(shared::Packet::new()));
+                unsafe {
+                    (*a.get()).postinit_lock();
+                    match (*p.get()).upgrade(Receiver::new(Shared(a.clone()))) {
+                        oneshot::UpSuccess | oneshot::UpDisconnected => (a, None),
+                        oneshot::UpWoke(task) => (a, Some(task))
+                    }
+                }
+            }
+            Stream(ref p) => {
+                let a = Arc::new(UnsafeCell::new(shared::Packet::new()));
+                unsafe {
+                    (*a.get()).postinit_lock();
+                    match (*p.get()).upgrade(Receiver::new(Shared(a.clone()))) {
+                        stream::UpSuccess | stream::UpDisconnected => (a, None),
+                        stream::UpWoke(task) => (a, Some(task)),
+                    }
+                }
+            }
+            Shared(ref p) => {
+                unsafe { (*p.get()).clone_chan(); }
+                return Sender::new(Shared(p.clone()));
+            }
+            Sync(..) => unreachable!(),
+        };
+
+        unsafe {
+            (*packet.get()).inherit_blocker(sleeper);
+
+            let tmp = Sender::new(Shared(packet.clone()));
+            mem::swap(self.inner_mut(), tmp.inner_mut());
+        }
+        Sender::new(Shared(packet))
+    }
+}
+
+#[unsafe_destructor]
+impl<T: Send> Drop for Sender<T> {
+    fn drop(&mut self) {
+        match *unsafe { self.inner_mut() } {
+            Oneshot(ref mut p) => unsafe { (*p.get()).drop_chan(); },
+            Stream(ref mut p) => unsafe { (*p.get()).drop_chan(); },
+            Shared(ref mut p) => unsafe { (*p.get()).drop_chan(); },
+            Sync(..) => unreachable!(),
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// SyncSender
+////////////////////////////////////////////////////////////////////////////////
+
+impl<T: Send> SyncSender<T> {
+    fn new(inner: Arc<UnsafeCell<sync::Packet<T>>>) -> SyncSender<T> {
+        SyncSender { inner: inner, _marker: marker::NoSync }
+    }
+
+    /// Sends a value on this synchronous channel.
+    ///
+    /// This function will *block* until space in the internal buffer becomes
+    /// available or a receiver is available to hand off the message to.
+    ///
+    /// Note that a successful send does *not* guarantee that the receiver will
+    /// ever see the data if there is a buffer on this channel. Messages may be
+    /// enqueued in the internal buffer for the receiver to receive at a later
+    /// time. If the buffer size is 0, however, it can be guaranteed that the
+    /// receiver has indeed received the data if this function returns success.
+    ///
+    /// # Panics
+    ///
+    /// Similarly to `Sender::send`, this function will panic if the
+    /// corresponding `Receiver` for this channel has disconnected. This
+    /// behavior is used to propagate panics among tasks.
+    ///
+    /// If a panic is not desired, you can achieve the same semantics with the
+    /// `SyncSender::send_opt` method which will not panic if the receiver
+    /// disconnects.
+    #[experimental = "this function is being considered candidate for removal \
+                      to adhere to the general guidelines of rust"]
+    pub fn send(&self, t: T) {
+        if self.send_opt(t).is_err() {
+            panic!("sending on a closed channel");
+        }
+    }
+
+    /// Send a value on a channel, returning it back if the receiver
+    /// disconnected
+    ///
+    /// This method will *block* to send the value `t` on the channel, but if
+    /// the value could not be sent due to the receiver disconnecting, the value
+    /// is returned back to the callee. This function is similar to `try_send`,
+    /// except that it will block if the channel is currently full.
+    ///
+    /// # Panics
+    ///
+    /// This function cannot panic.
+    #[unstable = "this function may be renamed to send() in the future"]
+    pub fn send_opt(&self, t: T) -> Result<(), T> {
+        unsafe { (*self.inner.get()).send(t) }
+    }
+
+    /// Attempts to send a value on this channel without blocking.
+    ///
+    /// This method differs from `send_opt` by returning immediately if the
+    /// channel's buffer is full or no receiver is waiting to acquire some
+    /// data. Compared with `send_opt`, this function has two failure cases
+    /// instead of one (one for disconnection, one for a full buffer).
+    ///
+    /// See `SyncSender::send` for notes about guarantees of whether the
+    /// receiver has received the data or not if this function is successful.
+    ///
+    /// # Panics
+    ///
+    /// This function cannot panic
+    #[unstable = "the return type of this function is candidate for \
+                  modification"]
+    pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
+        unsafe { (*self.inner.get()).try_send(t) }
+    }
+}
+
+#[unstable]
+impl<T: Send> Clone for SyncSender<T> {
+    fn clone(&self) -> SyncSender<T> {
+        unsafe { (*self.inner.get()).clone_chan(); }
+        return SyncSender::new(self.inner.clone());
+    }
+}
+
+#[unsafe_destructor]
+impl<T: Send> Drop for SyncSender<T> {
+    fn drop(&mut self) {
+        unsafe { (*self.inner.get()).drop_chan(); }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Receiver
+////////////////////////////////////////////////////////////////////////////////
+
+impl<T: Send> Receiver<T> {
+    fn new(inner: Flavor<T>) -> Receiver<T> {
+        Receiver { inner: UnsafeCell::new(inner), _marker: marker::NoSync }
+    }
+
+    /// Blocks waiting for a value on this receiver
+    ///
+    /// This function will block if necessary to wait for a corresponding send
+    /// on the channel from its paired `Sender` structure. This receiver will
+    /// be woken up when data is ready, and the data will be returned.
+    ///
+    /// # Panics
+    ///
+    /// Similar to channels, this method will trigger a task panic if the
+    /// other end of the channel has hung up (been deallocated). The purpose of
+    /// this is to propagate panicks among tasks.
+    ///
+    /// If a panic is not desired, then there are two options:
+    ///
+    /// * If blocking is still desired, the `recv_opt` method will return `None`
+    ///   when the other end hangs up
+    ///
+    /// * If blocking is not desired, then the `try_recv` method will attempt to
+    ///   peek at a value on this receiver.
+    #[experimental = "this function is being considered candidate for removal \
+                      to adhere to the general guidelines of rust"]
+    pub fn recv(&self) -> T {
+        match self.recv_opt() {
+            Ok(t) => t,
+            Err(()) => panic!("receiving on a closed channel"),
+        }
+    }
+
+    /// Attempts to return a pending value on this receiver without blocking
+    ///
+    /// This method will never block the caller in order to wait for data to
+    /// become available. Instead, this will always return immediately with a
+    /// possible option of pending data on the channel.
+    ///
+    /// This is useful for a flavor of "optimistic check" before deciding to
+    /// block on a receiver.
+    ///
+    /// # Panics
+    ///
+    /// This function cannot panic.
+    #[unstable = "the return type of this function may be altered"]
+    pub fn try_recv(&self) -> Result<T, TryRecvError> {
+        loop {
+            let new_port = match *unsafe { self.inner() } {
+                Oneshot(ref p) => {
+                    match unsafe { (*p.get()).try_recv() } {
+                        Ok(t) => return Ok(t),
+                        Err(oneshot::Empty) => return Err(Empty),
+                        Err(oneshot::Disconnected) => return Err(Disconnected),
+                        Err(oneshot::Upgraded(rx)) => rx,
+                    }
+                }
+                Stream(ref p) => {
+                    match unsafe { (*p.get()).try_recv() } {
+                        Ok(t) => return Ok(t),
+                        Err(stream::Empty) => return Err(Empty),
+                        Err(stream::Disconnected) => return Err(Disconnected),
+                        Err(stream::Upgraded(rx)) => rx,
+                    }
+                }
+                Shared(ref p) => {
+                    match unsafe { (*p.get()).try_recv() } {
+                        Ok(t) => return Ok(t),
+                        Err(shared::Empty) => return Err(Empty),
+                        Err(shared::Disconnected) => return Err(Disconnected),
+                    }
+                }
+                Sync(ref p) => {
+                    match unsafe { (*p.get()).try_recv() } {
+                        Ok(t) => return Ok(t),
+                        Err(sync::Empty) => return Err(Empty),
+                        Err(sync::Disconnected) => return Err(Disconnected),
+                    }
+                }
+            };
+            unsafe {
+                mem::swap(self.inner_mut(),
+                          new_port.inner_mut());
+            }
+        }
+    }
+
+    /// Attempt to wait for a value on this receiver, but does not panic if the
+    /// corresponding channel has hung up.
+    ///
+    /// This implementation of iterators for ports will always block if there is
+    /// not data available on the receiver, but it will not panic in the case
+    /// that the channel has been deallocated.
+    ///
+    /// In other words, this function has the same semantics as the `recv`
+    /// method except for the panic aspect.
+    ///
+    /// If the channel has hung up, then `Err` is returned. Otherwise `Ok` of
+    /// the value found on the receiver is returned.
+    #[unstable = "this function may be renamed to recv()"]
+    pub fn recv_opt(&self) -> Result<T, ()> {
+        loop {
+            let new_port = match *unsafe { self.inner() } {
+                Oneshot(ref p) => {
+                    match unsafe { (*p.get()).recv() } {
+                        Ok(t) => return Ok(t),
+                        Err(oneshot::Empty) => return unreachable!(),
+                        Err(oneshot::Disconnected) => return Err(()),
+                        Err(oneshot::Upgraded(rx)) => rx,
+                    }
+                }
+                Stream(ref p) => {
+                    match unsafe { (*p.get()).recv() } {
+                        Ok(t) => return Ok(t),
+                        Err(stream::Empty) => return unreachable!(),
+                        Err(stream::Disconnected) => return Err(()),
+                        Err(stream::Upgraded(rx)) => rx,
+                    }
+                }
+                Shared(ref p) => {
+                    match unsafe { (*p.get()).recv() } {
+                        Ok(t) => return Ok(t),
+                        Err(shared::Empty) => return unreachable!(),
+                        Err(shared::Disconnected) => return Err(()),
+                    }
+                }
+                Sync(ref p) => return unsafe { (*p.get()).recv() }
+            };
+            unsafe {
+                mem::swap(self.inner_mut(), new_port.inner_mut());
+            }
+        }
+    }
+
+    /// Returns an iterator which will block waiting for messages, but never
+    /// `panic!`. It will return `None` when the channel has hung up.
+    #[unstable]
+    pub fn iter<'a>(&'a self) -> Messages<'a, T> {
+        Messages { rx: self }
+    }
+}
+
+impl<T: Send> select::Packet for Receiver<T> {
+    fn can_recv(&self) -> bool {
+        loop {
+            let new_port = match *unsafe { self.inner() } {
+                Oneshot(ref p) => {
+                    match unsafe { (*p.get()).can_recv() } {
+                        Ok(ret) => return ret,
+                        Err(upgrade) => upgrade,
+                    }
+                }
+                Stream(ref p) => {
+                    match unsafe { (*p.get()).can_recv() } {
+                        Ok(ret) => return ret,
+                        Err(upgrade) => upgrade,
+                    }
+                }
+                Shared(ref p) => {
+                    return unsafe { (*p.get()).can_recv() };
+                }
+                Sync(ref p) => {
+                    return unsafe { (*p.get()).can_recv() };
+                }
+            };
+            unsafe {
+                mem::swap(self.inner_mut(),
+                          new_port.inner_mut());
+            }
+        }
+    }
+
+    fn start_selection(&self, mut task: BlockedTask) -> Result<(), BlockedTask>{
+        loop {
+            let (t, new_port) = match *unsafe { self.inner() } {
+                Oneshot(ref p) => {
+                    match unsafe { (*p.get()).start_selection(task) } {
+                        oneshot::SelSuccess => return Ok(()),
+                        oneshot::SelCanceled(task) => return Err(task),
+                        oneshot::SelUpgraded(t, rx) => (t, rx),
+                    }
+                }
+                Stream(ref p) => {
+                    match unsafe { (*p.get()).start_selection(task) } {
+                        stream::SelSuccess => return Ok(()),
+                        stream::SelCanceled(task) => return Err(task),
+                        stream::SelUpgraded(t, rx) => (t, rx),
+                    }
+                }
+                Shared(ref p) => {
+                    return unsafe { (*p.get()).start_selection(task) };
+                }
+                Sync(ref p) => {
+                    return unsafe { (*p.get()).start_selection(task) };
+                }
+            };
+            task = t;
+            unsafe {
+                mem::swap(self.inner_mut(),
+                          new_port.inner_mut());
+            }
+        }
+    }
+
+    fn abort_selection(&self) -> bool {
+        let mut was_upgrade = false;
+        loop {
+            let result = match *unsafe { self.inner() } {
+                Oneshot(ref p) => unsafe { (*p.get()).abort_selection() },
+                Stream(ref p) => unsafe {
+                    (*p.get()).abort_selection(was_upgrade)
+                },
+                Shared(ref p) => return unsafe {
+                    (*p.get()).abort_selection(was_upgrade)
+                },
+                Sync(ref p) => return unsafe {
+                    (*p.get()).abort_selection()
+                },
+            };
+            let new_port = match result { Ok(b) => return b, Err(p) => p };
+            was_upgrade = true;
+            unsafe {
+                mem::swap(self.inner_mut(),
+                          new_port.inner_mut());
+            }
+        }
+    }
+}
+
+#[unstable]
+impl<'a, T: Send> Iterator<T> for Messages<'a, T> {
+    fn next(&mut self) -> Option<T> { self.rx.recv_opt().ok() }
+}
+
+#[unsafe_destructor]
+impl<T: Send> Drop for Receiver<T> {
+    fn drop(&mut self) {
+        match *unsafe { self.inner_mut() } {
+            Oneshot(ref mut p) => unsafe { (*p.get()).drop_port(); },
+            Stream(ref mut p) => unsafe { (*p.get()).drop_port(); },
+            Shared(ref mut p) => unsafe { (*p.get()).drop_port(); },
+            Sync(ref mut p) => unsafe { (*p.get()).drop_port(); },
+        }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use prelude::*;
+
+    use os;
+    use super::*;
+
+    pub fn stress_factor() -> uint {
+        match os::getenv("RUST_TEST_STRESS") {
+            Some(val) => from_str::<uint>(val.as_slice()).unwrap(),
+            None => 1,
+        }
+    }
+
+    test!(fn smoke() {
+        let (tx, rx) = channel::<int>();
+        tx.send(1);
+        assert_eq!(rx.recv(), 1);
+    })
+
+    test!(fn drop_full() {
+        let (tx, _rx) = channel();
+        tx.send(box 1i);
+    })
+
+    test!(fn drop_full_shared() {
+        let (tx, _rx) = channel();
+        drop(tx.clone());
+        drop(tx.clone());
+        tx.send(box 1i);
+    })
+
+    test!(fn smoke_shared() {
+        let (tx, rx) = channel::<int>();
+        tx.send(1);
+        assert_eq!(rx.recv(), 1);
+        let tx = tx.clone();
+        tx.send(1);
+        assert_eq!(rx.recv(), 1);
+    })
+
+    test!(fn smoke_threads() {
+        let (tx, rx) = channel::<int>();
+        spawn(proc() {
+            tx.send(1);
+        });
+        assert_eq!(rx.recv(), 1);
+    })
+
+    test!(fn smoke_port_gone() {
+        let (tx, rx) = channel::<int>();
+        drop(rx);
+        tx.send(1);
+    } #[should_fail])
+
+    test!(fn smoke_shared_port_gone() {
+        let (tx, rx) = channel::<int>();
+        drop(rx);
+        tx.send(1);
+    } #[should_fail])
+
+    test!(fn smoke_shared_port_gone2() {
+        let (tx, rx) = channel::<int>();
+        drop(rx);
+        let tx2 = tx.clone();
+        drop(tx);
+        tx2.send(1);
+    } #[should_fail])
+
+    test!(fn port_gone_concurrent() {
+        let (tx, rx) = channel::<int>();
+        spawn(proc() {
+            rx.recv();
+        });
+        loop { tx.send(1) }
+    } #[should_fail])
+
+    test!(fn port_gone_concurrent_shared() {
+        let (tx, rx) = channel::<int>();
+        let tx2 = tx.clone();
+        spawn(proc() {
+            rx.recv();
+        });
+        loop {
+            tx.send(1);
+            tx2.send(1);
+        }
+    } #[should_fail])
+
+    test!(fn smoke_chan_gone() {
+        let (tx, rx) = channel::<int>();
+        drop(tx);
+        rx.recv();
+    } #[should_fail])
+
+    test!(fn smoke_chan_gone_shared() {
+        let (tx, rx) = channel::<()>();
+        let tx2 = tx.clone();
+        drop(tx);
+        drop(tx2);
+        rx.recv();
+    } #[should_fail])
+
+    test!(fn chan_gone_concurrent() {
+        let (tx, rx) = channel::<int>();
+        spawn(proc() {
+            tx.send(1);
+            tx.send(1);
+        });
+        loop { rx.recv(); }
+    } #[should_fail])
+
+    test!(fn stress() {
+        let (tx, rx) = channel::<int>();
+        spawn(proc() {
+            for _ in range(0u, 10000) { tx.send(1i); }
+        });
+        for _ in range(0u, 10000) {
+            assert_eq!(rx.recv(), 1);
+        }
+    })
+
+    test!(fn stress_shared() {
+        static AMT: uint = 10000;
+        static NTHREADS: uint = 8;
+        let (tx, rx) = channel::<int>();
+        let (dtx, drx) = channel::<()>();
+
+        spawn(proc() {
+            for _ in range(0, AMT * NTHREADS) {
+                assert_eq!(rx.recv(), 1);
+            }
+            match rx.try_recv() {
+                Ok(..) => panic!(),
+                _ => {}
+            }
+            dtx.send(());
+        });
+
+        for _ in range(0, NTHREADS) {
+            let tx = tx.clone();
+            spawn(proc() {
+                for _ in range(0, AMT) { tx.send(1); }
+            });
+        }
+        drop(tx);
+        drx.recv();
+    })
+
+    #[test]
+    fn send_from_outside_runtime() {
+        let (tx1, rx1) = channel::<()>();
+        let (tx2, rx2) = channel::<int>();
+        let (tx3, rx3) = channel::<()>();
+        let tx4 = tx3.clone();
+        spawn(proc() {
+            tx1.send(());
+            for _ in range(0i, 40) {
+                assert_eq!(rx2.recv(), 1);
+            }
+            tx3.send(());
+        });
+        rx1.recv();
+        spawn(proc() {
+            for _ in range(0i, 40) {
+                tx2.send(1);
+            }
+            tx4.send(());
+        });
+        rx3.recv();
+        rx3.recv();
+    }
+
+    #[test]
+    fn recv_from_outside_runtime() {
+        let (tx, rx) = channel::<int>();
+        let (dtx, drx) = channel();
+        spawn(proc() {
+            for _ in range(0i, 40) {
+                assert_eq!(rx.recv(), 1);
+            }
+            dtx.send(());
+        });
+        for _ in range(0u, 40) {
+            tx.send(1);
+        }
+        drx.recv();
+    }
+
+    #[test]
+    fn no_runtime() {
+        let (tx1, rx1) = channel::<int>();
+        let (tx2, rx2) = channel::<int>();
+        let (tx3, rx3) = channel::<()>();
+        let tx4 = tx3.clone();
+        spawn(proc() {
+            assert_eq!(rx1.recv(), 1);
+            tx2.send(2);
+            tx4.send(());
+        });
+        spawn(proc() {
+            tx1.send(1);
+            assert_eq!(rx2.recv(), 2);
+            tx3.send(());
+        });
+        rx3.recv();
+        rx3.recv();
+    }
+
+    test!(fn oneshot_single_thread_close_port_first() {
+        // Simple test of closing without sending
+        let (_tx, rx) = channel::<int>();
+        drop(rx);
+    })
+
+    test!(fn oneshot_single_thread_close_chan_first() {
+        // Simple test of closing without sending
+        let (tx, _rx) = channel::<int>();
+        drop(tx);
+    })
+
+    test!(fn oneshot_single_thread_send_port_close() {
+        // Testing that the sender cleans up the payload if receiver is closed
+        let (tx, rx) = channel::<Box<int>>();
+        drop(rx);
+        tx.send(box 0);
+    } #[should_fail])
+
+    test!(fn oneshot_single_thread_recv_chan_close() {
+        // Receiving on a closed chan will panic
+        let res = task::try(proc() {
+            let (tx, rx) = channel::<int>();
+            drop(tx);
+            rx.recv();
+        });
+        // What is our res?
+        assert!(res.is_err());
+    })
+
+    test!(fn oneshot_single_thread_send_then_recv() {
+        let (tx, rx) = channel::<Box<int>>();
+        tx.send(box 10);
+        assert!(rx.recv() == box 10);
+    })
+
+    test!(fn oneshot_single_thread_try_send_open() {
+        let (tx, rx) = channel::<int>();
+        assert!(tx.send_opt(10).is_ok());
+        assert!(rx.recv() == 10);
+    })
+
+    test!(fn oneshot_single_thread_try_send_closed() {
+        let (tx, rx) = channel::<int>();
+        drop(rx);
+        assert!(tx.send_opt(10).is_err());
+    })
+
+    test!(fn oneshot_single_thread_try_recv_open() {
+        let (tx, rx) = channel::<int>();
+        tx.send(10);
+        assert!(rx.recv_opt() == Ok(10));
+    })
+
+    test!(fn oneshot_single_thread_try_recv_closed() {
+        let (tx, rx) = channel::<int>();
+        drop(tx);
+        assert!(rx.recv_opt() == Err(()));
+    })
+
+    test!(fn oneshot_single_thread_peek_data() {
+        let (tx, rx) = channel::<int>();
+        assert_eq!(rx.try_recv(), Err(Empty))
+        tx.send(10);
+        assert_eq!(rx.try_recv(), Ok(10));
+    })
+
+    test!(fn oneshot_single_thread_peek_close() {
+        let (tx, rx) = channel::<int>();
+        drop(tx);
+        assert_eq!(rx.try_recv(), Err(Disconnected));
+        assert_eq!(rx.try_recv(), Err(Disconnected));
+    })
+
+    test!(fn oneshot_single_thread_peek_open() {
+        let (_tx, rx) = channel::<int>();
+        assert_eq!(rx.try_recv(), Err(Empty));
+    })
+
+    test!(fn oneshot_multi_task_recv_then_send() {
+        let (tx, rx) = channel::<Box<int>>();
+        spawn(proc() {
+            assert!(rx.recv() == box 10);
+        });
+
+        tx.send(box 10);
+    })
+
+    test!(fn oneshot_multi_task_recv_then_close() {
+        let (tx, rx) = channel::<Box<int>>();
+        spawn(proc() {
+            drop(tx);
+        });
+        let res = task::try(proc() {
+            assert!(rx.recv() == box 10);
+        });
+        assert!(res.is_err());
+    })
+
+    test!(fn oneshot_multi_thread_close_stress() {
+        for _ in range(0, stress_factor()) {
+            let (tx, rx) = channel::<int>();
+            spawn(proc() {
+                drop(rx);
+            });
+            drop(tx);
+        }
+    })
+
+    test!(fn oneshot_multi_thread_send_close_stress() {
+        for _ in range(0, stress_factor()) {
+            let (tx, rx) = channel::<int>();
+            spawn(proc() {
+                drop(rx);
+            });
+            let _ = task::try(proc() {
+                tx.send(1);
+            });
+        }
+    })
+
+    test!(fn oneshot_multi_thread_recv_close_stress() {
+        for _ in range(0, stress_factor()) {
+            let (tx, rx) = channel::<int>();
+            spawn(proc() {
+                let res = task::try(proc() {
+                    rx.recv();
+                });
+                assert!(res.is_err());
+            });
+            spawn(proc() {
+                spawn(proc() {
+                    drop(tx);
+                });
+            });
+        }
+    })
+
+    test!(fn oneshot_multi_thread_send_recv_stress() {
+        for _ in range(0, stress_factor()) {
+            let (tx, rx) = channel();
+            spawn(proc() {
+                tx.send(box 10i);
+            });
+            spawn(proc() {
+                assert!(rx.recv() == box 10i);
+            });
+        }
+    })
+
+    test!(fn stream_send_recv_stress() {
+        for _ in range(0, stress_factor()) {
+            let (tx, rx) = channel();
+
+            send(tx, 0);
+            recv(rx, 0);
+
+            fn send(tx: Sender<Box<int>>, i: int) {
+                if i == 10 { return }
+
+                spawn(proc() {
+                    tx.send(box i);
+                    send(tx, i + 1);
+                });
+            }
+
+            fn recv(rx: Receiver<Box<int>>, i: int) {
+                if i == 10 { return }
+
+                spawn(proc() {
+                    assert!(rx.recv() == box i);
+                    recv(rx, i + 1);
+                });
+            }
+        }
+    })
+
+    test!(fn recv_a_lot() {
+        // Regression test that we don't run out of stack in scheduler context
+        let (tx, rx) = channel();
+        for _ in range(0i, 10000) { tx.send(()); }
+        for _ in range(0i, 10000) { rx.recv(); }
+    })
+
+    test!(fn shared_chan_stress() {
+        let (tx, rx) = channel();
+        let total = stress_factor() + 100;
+        for _ in range(0, total) {
+            let tx = tx.clone();
+            spawn(proc() {
+                tx.send(());
+            });
+        }
+
+        for _ in range(0, total) {
+            rx.recv();
+        }
+    })
+
+    test!(fn test_nested_recv_iter() {
+        let (tx, rx) = channel::<int>();
+        let (total_tx, total_rx) = channel::<int>();
+
+        spawn(proc() {
+            let mut acc = 0;
+            for x in rx.iter() {
+                acc += x;
+            }
+            total_tx.send(acc);
+        });
+
+        tx.send(3);
+        tx.send(1);
+        tx.send(2);
+        drop(tx);
+        assert_eq!(total_rx.recv(), 6);
+    })
+
+    test!(fn test_recv_iter_break() {
+        let (tx, rx) = channel::<int>();
+        let (count_tx, count_rx) = channel();
+
+        spawn(proc() {
+            let mut count = 0;
+            for x in rx.iter() {
+                if count >= 3 {
+                    break;
+                } else {
+                    count += x;
+                }
+            }
+            count_tx.send(count);
+        });
+
+        tx.send(2);
+        tx.send(2);
+        tx.send(2);
+        let _ = tx.send_opt(2);
+        drop(tx);
+        assert_eq!(count_rx.recv(), 4);
+    })
+
+    test!(fn try_recv_states() {
+        let (tx1, rx1) = channel::<int>();
+        let (tx2, rx2) = channel::<()>();
+        let (tx3, rx3) = channel::<()>();
+        spawn(proc() {
+            rx2.recv();
+            tx1.send(1);
+            tx3.send(());
+            rx2.recv();
+            drop(tx1);
+            tx3.send(());
+        });
+
+        assert_eq!(rx1.try_recv(), Err(Empty));
+        tx2.send(());
+        rx3.recv();
+        assert_eq!(rx1.try_recv(), Ok(1));
+        assert_eq!(rx1.try_recv(), Err(Empty));
+        tx2.send(());
+        rx3.recv();
+        assert_eq!(rx1.try_recv(), Err(Disconnected));
+    })
+
+    // This bug used to end up in a livelock inside of the Receiver destructor
+    // because the internal state of the Shared packet was corrupted
+    test!(fn destroy_upgraded_shared_port_when_sender_still_active() {
+        let (tx, rx) = channel();
+        let (tx2, rx2) = channel();
+        spawn(proc() {
+            rx.recv(); // wait on a oneshot
+            drop(rx);  // destroy a shared
+            tx2.send(());
+        });
+        // make sure the other task has gone to sleep
+        for _ in range(0u, 5000) { task::deschedule(); }
+
+        // upgrade to a shared chan and send a message
+        let t = tx.clone();
+        drop(tx);
+        t.send(());
+
+        // wait for the child task to exit before we exit
+        rx2.recv();
+    })
+
+    test!(fn sends_off_the_runtime() {
+        use rustrt::thread::Thread;
+
+        let (tx, rx) = channel();
+        let t = Thread::start(proc() {
+            for _ in range(0u, 1000) {
+                tx.send(());
+            }
+        });
+        for _ in range(0u, 1000) {
+            rx.recv();
+        }
+        t.join();
+    })
+
+    test!(fn try_recvs_off_the_runtime() {
+        use rustrt::thread::Thread;
+
+        let (tx, rx) = channel();
+        let (cdone, pdone) = channel();
+        let t = Thread::start(proc() {
+            let mut hits = 0u;
+            while hits < 10 {
+                match rx.try_recv() {
+                    Ok(()) => { hits += 1; }
+                    Err(Empty) => { Thread::yield_now(); }
+                    Err(Disconnected) => return,
+                }
+            }
+            cdone.send(());
+        });
+        for _ in range(0u, 10) {
+            tx.send(());
+        }
+        t.join();
+        pdone.recv();
+    })
+}
+
+#[cfg(test)]
+mod sync_tests {
+    use prelude::*;
+    use os;
+
+    pub fn stress_factor() -> uint {
+        match os::getenv("RUST_TEST_STRESS") {
+            Some(val) => from_str::<uint>(val.as_slice()).unwrap(),
+            None => 1,
+        }
+    }
+
+    test!(fn smoke() {
+        let (tx, rx) = sync_channel::<int>(1);
+        tx.send(1);
+        assert_eq!(rx.recv(), 1);
+    })
+
+    test!(fn drop_full() {
+        let (tx, _rx) = sync_channel(1);
+        tx.send(box 1i);
+    })
+
+    test!(fn smoke_shared() {
+        let (tx, rx) = sync_channel::<int>(1);
+        tx.send(1);
+        assert_eq!(rx.recv(), 1);
+        let tx = tx.clone();
+        tx.send(1);
+        assert_eq!(rx.recv(), 1);
+    })
+
+    test!(fn smoke_threads() {
+        let (tx, rx) = sync_channel::<int>(0);
+        spawn(proc() {
+            tx.send(1);
+        });
+        assert_eq!(rx.recv(), 1);
+    })
+
+    test!(fn smoke_port_gone() {
+        let (tx, rx) = sync_channel::<int>(0);
+        drop(rx);
+        tx.send(1);
+    } #[should_fail])
+
+    test!(fn smoke_shared_port_gone2() {
+        let (tx, rx) = sync_channel::<int>(0);
+        drop(rx);
+        let tx2 = tx.clone();
+        drop(tx);
+        tx2.send(1);
+    } #[should_fail])
+
+    test!(fn port_gone_concurrent() {
+        let (tx, rx) = sync_channel::<int>(0);
+        spawn(proc() {
+            rx.recv();
+        });
+        loop { tx.send(1) }
+    } #[should_fail])
+
+    test!(fn port_gone_concurrent_shared() {
+        let (tx, rx) = sync_channel::<int>(0);
+        let tx2 = tx.clone();
+        spawn(proc() {
+            rx.recv();
+        });
+        loop {
+            tx.send(1);
+            tx2.send(1);
+        }
+    } #[should_fail])
+
+    test!(fn smoke_chan_gone() {
+        let (tx, rx) = sync_channel::<int>(0);
+        drop(tx);
+        rx.recv();
+    } #[should_fail])
+
+    test!(fn smoke_chan_gone_shared() {
+        let (tx, rx) = sync_channel::<()>(0);
+        let tx2 = tx.clone();
+        drop(tx);
+        drop(tx2);
+        rx.recv();
+    } #[should_fail])
+
+    test!(fn chan_gone_concurrent() {
+        let (tx, rx) = sync_channel::<int>(0);
+        spawn(proc() {
+            tx.send(1);
+            tx.send(1);
+        });
+        loop { rx.recv(); }
+    } #[should_fail])
+
+    test!(fn stress() {
+        let (tx, rx) = sync_channel::<int>(0);
+        spawn(proc() {
+            for _ in range(0u, 10000) { tx.send(1); }
+        });
+        for _ in range(0u, 10000) {
+            assert_eq!(rx.recv(), 1);
+        }
+    })
+
+    test!(fn stress_shared() {
+        static AMT: uint = 1000;
+        static NTHREADS: uint = 8;
+        let (tx, rx) = sync_channel::<int>(0);
+        let (dtx, drx) = sync_channel::<()>(0);
+
+        spawn(proc() {
+            for _ in range(0, AMT * NTHREADS) {
+                assert_eq!(rx.recv(), 1);
+            }
+            match rx.try_recv() {
+                Ok(..) => panic!(),
+                _ => {}
+            }
+            dtx.send(());
+        });
+
+        for _ in range(0, NTHREADS) {
+            let tx = tx.clone();
+            spawn(proc() {
+                for _ in range(0, AMT) { tx.send(1); }
+            });
+        }
+        drop(tx);
+        drx.recv();
+    })
+
+    test!(fn oneshot_single_thread_close_port_first() {
+        // Simple test of closing without sending
+        let (_tx, rx) = sync_channel::<int>(0);
+        drop(rx);
+    })
+
+    test!(fn oneshot_single_thread_close_chan_first() {
+        // Simple test of closing without sending
+        let (tx, _rx) = sync_channel::<int>(0);
+        drop(tx);
+    })
+
+    test!(fn oneshot_single_thread_send_port_close() {
+        // Testing that the sender cleans up the payload if receiver is closed
+        let (tx, rx) = sync_channel::<Box<int>>(0);
+        drop(rx);
+        tx.send(box 0);
+    } #[should_fail])
+
+    test!(fn oneshot_single_thread_recv_chan_close() {
+        // Receiving on a closed chan will panic
+        let res = task::try(proc() {
+            let (tx, rx) = sync_channel::<int>(0);
+            drop(tx);
+            rx.recv();
+        });
+        // What is our res?
+        assert!(res.is_err());
+    })
+
+    test!(fn oneshot_single_thread_send_then_recv() {
+        let (tx, rx) = sync_channel::<Box<int>>(1);
+        tx.send(box 10);
+        assert!(rx.recv() == box 10);
+    })
+
+    test!(fn oneshot_single_thread_try_send_open() {
+        let (tx, rx) = sync_channel::<int>(1);
+        assert_eq!(tx.try_send(10), Ok(()));
+        assert!(rx.recv() == 10);
+    })
+
+    test!(fn oneshot_single_thread_try_send_closed() {
+        let (tx, rx) = sync_channel::<int>(0);
+        drop(rx);
+        assert_eq!(tx.try_send(10), Err(RecvDisconnected(10)));
+    })
+
+    test!(fn oneshot_single_thread_try_send_closed2() {
+        let (tx, _rx) = sync_channel::<int>(0);
+        assert_eq!(tx.try_send(10), Err(Full(10)));
+    })
+
+    test!(fn oneshot_single_thread_try_recv_open() {
+        let (tx, rx) = sync_channel::<int>(1);
+        tx.send(10);
+        assert!(rx.recv_opt() == Ok(10));
+    })
+
+    test!(fn oneshot_single_thread_try_recv_closed() {
+        let (tx, rx) = sync_channel::<int>(0);
+        drop(tx);
+        assert!(rx.recv_opt() == Err(()));
+    })
+
+    test!(fn oneshot_single_thread_peek_data() {
+        let (tx, rx) = sync_channel::<int>(1);
+        assert_eq!(rx.try_recv(), Err(Empty))
+        tx.send(10);
+        assert_eq!(rx.try_recv(), Ok(10));
+    })
+
+    test!(fn oneshot_single_thread_peek_close() {
+        let (tx, rx) = sync_channel::<int>(0);
+        drop(tx);
+        assert_eq!(rx.try_recv(), Err(Disconnected));
+        assert_eq!(rx.try_recv(), Err(Disconnected));
+    })
+
+    test!(fn oneshot_single_thread_peek_open() {
+        let (_tx, rx) = sync_channel::<int>(0);
+        assert_eq!(rx.try_recv(), Err(Empty));
+    })
+
+    test!(fn oneshot_multi_task_recv_then_send() {
+        let (tx, rx) = sync_channel::<Box<int>>(0);
+        spawn(proc() {
+            assert!(rx.recv() == box 10);
+        });
+
+        tx.send(box 10);
+    })
+
+    test!(fn oneshot_multi_task_recv_then_close() {
+        let (tx, rx) = sync_channel::<Box<int>>(0);
+        spawn(proc() {
+            drop(tx);
+        });
+        let res = task::try(proc() {
+            assert!(rx.recv() == box 10);
+        });
+        assert!(res.is_err());
+    })
+
+    test!(fn oneshot_multi_thread_close_stress() {
+        for _ in range(0, stress_factor()) {
+            let (tx, rx) = sync_channel::<int>(0);
+            spawn(proc() {
+                drop(rx);
+            });
+            drop(tx);
+        }
+    })
+
+    test!(fn oneshot_multi_thread_send_close_stress() {
+        for _ in range(0, stress_factor()) {
+            let (tx, rx) = sync_channel::<int>(0);
+            spawn(proc() {
+                drop(rx);
+            });
+            let _ = task::try(proc() {
+                tx.send(1);
+            });
+        }
+    })
+
+    test!(fn oneshot_multi_thread_recv_close_stress() {
+        for _ in range(0, stress_factor()) {
+            let (tx, rx) = sync_channel::<int>(0);
+            spawn(proc() {
+                let res = task::try(proc() {
+                    rx.recv();
+                });
+                assert!(res.is_err());
+            });
+            spawn(proc() {
+                spawn(proc() {
+                    drop(tx);
+                });
+            });
+        }
+    })
+
+    test!(fn oneshot_multi_thread_send_recv_stress() {
+        for _ in range(0, stress_factor()) {
+            let (tx, rx) = sync_channel::<Box<int>>(0);
+            spawn(proc() {
+                tx.send(box 10i);
+            });
+            spawn(proc() {
+                assert!(rx.recv() == box 10i);
+            });
+        }
+    })
+
+    test!(fn stream_send_recv_stress() {
+        for _ in range(0, stress_factor()) {
+            let (tx, rx) = sync_channel::<Box<int>>(0);
+
+            send(tx, 0);
+            recv(rx, 0);
+
+            fn send(tx: SyncSender<Box<int>>, i: int) {
+                if i == 10 { return }
+
+                spawn(proc() {
+                    tx.send(box i);
+                    send(tx, i + 1);
+                });
+            }
+
+            fn recv(rx: Receiver<Box<int>>, i: int) {
+                if i == 10 { return }
+
+                spawn(proc() {
+                    assert!(rx.recv() == box i);
+                    recv(rx, i + 1);
+                });
+            }
+        }
+    })
+
+    test!(fn recv_a_lot() {
+        // Regression test that we don't run out of stack in scheduler context
+        let (tx, rx) = sync_channel(10000);
+        for _ in range(0u, 10000) { tx.send(()); }
+        for _ in range(0u, 10000) { rx.recv(); }
+    })
+
+    test!(fn shared_chan_stress() {
+        let (tx, rx) = sync_channel(0);
+        let total = stress_factor() + 100;
+        for _ in range(0, total) {
+            let tx = tx.clone();
+            spawn(proc() {
+                tx.send(());
+            });
+        }
+
+        for _ in range(0, total) {
+            rx.recv();
+        }
+    })
+
+    test!(fn test_nested_recv_iter() {
+        let (tx, rx) = sync_channel::<int>(0);
+        let (total_tx, total_rx) = sync_channel::<int>(0);
+
+        spawn(proc() {
+            let mut acc = 0;
+            for x in rx.iter() {
+                acc += x;
+            }
+            total_tx.send(acc);
+        });
+
+        tx.send(3);
+        tx.send(1);
+        tx.send(2);
+        drop(tx);
+        assert_eq!(total_rx.recv(), 6);
+    })
+
+    test!(fn test_recv_iter_break() {
+        let (tx, rx) = sync_channel::<int>(0);
+        let (count_tx, count_rx) = sync_channel(0);
+
+        spawn(proc() {
+            let mut count = 0;
+            for x in rx.iter() {
+                if count >= 3 {
+                    break;
+                } else {
+                    count += x;
+                }
+            }
+            count_tx.send(count);
+        });
+
+        tx.send(2);
+        tx.send(2);
+        tx.send(2);
+        let _ = tx.try_send(2);
+        drop(tx);
+        assert_eq!(count_rx.recv(), 4);
+    })
+
+    test!(fn try_recv_states() {
+        let (tx1, rx1) = sync_channel::<int>(1);
+        let (tx2, rx2) = sync_channel::<()>(1);
+        let (tx3, rx3) = sync_channel::<()>(1);
+        spawn(proc() {
+            rx2.recv();
+            tx1.send(1);
+            tx3.send(());
+            rx2.recv();
+            drop(tx1);
+            tx3.send(());
+        });
+
+        assert_eq!(rx1.try_recv(), Err(Empty));
+        tx2.send(());
+        rx3.recv();
+        assert_eq!(rx1.try_recv(), Ok(1));
+        assert_eq!(rx1.try_recv(), Err(Empty));
+        tx2.send(());
+        rx3.recv();
+        assert_eq!(rx1.try_recv(), Err(Disconnected));
+    })
+
+    // This bug used to end up in a livelock inside of the Receiver destructor
+    // because the internal state of the Shared packet was corrupted
+    test!(fn destroy_upgraded_shared_port_when_sender_still_active() {
+        let (tx, rx) = sync_channel::<()>(0);
+        let (tx2, rx2) = sync_channel::<()>(0);
+        spawn(proc() {
+            rx.recv(); // wait on a oneshot
+            drop(rx);  // destroy a shared
+            tx2.send(());
+        });
+        // make sure the other task has gone to sleep
+        for _ in range(0u, 5000) { task::deschedule(); }
+
+        // upgrade to a shared chan and send a message
+        let t = tx.clone();
+        drop(tx);
+        t.send(());
+
+        // wait for the child task to exit before we exit
+        rx2.recv();
+    })
+
+    test!(fn try_recvs_off_the_runtime() {
+        use rustrt::thread::Thread;
+
+        let (tx, rx) = sync_channel::<()>(0);
+        let (cdone, pdone) = channel();
+        let t = Thread::start(proc() {
+            let mut hits = 0u;
+            while hits < 10 {
+                match rx.try_recv() {
+                    Ok(()) => { hits += 1; }
+                    Err(Empty) => { Thread::yield_now(); }
+                    Err(Disconnected) => return,
+                }
+            }
+            cdone.send(());
+        });
+        for _ in range(0u, 10) {
+            tx.send(());
+        }
+        t.join();
+        pdone.recv();
+    })
+
+    test!(fn send_opt1() {
+        let (tx, rx) = sync_channel::<int>(0);
+        spawn(proc() { rx.recv(); });
+        assert_eq!(tx.send_opt(1), Ok(()));
+    })
+
+    test!(fn send_opt2() {
+        let (tx, rx) = sync_channel::<int>(0);
+        spawn(proc() { drop(rx); });
+        assert_eq!(tx.send_opt(1), Err(1));
+    })
+
+    test!(fn send_opt3() {
+        let (tx, rx) = sync_channel::<int>(1);
+        assert_eq!(tx.send_opt(1), Ok(()));
+        spawn(proc() { drop(rx); });
+        assert_eq!(tx.send_opt(1), Err(1));
+    })
+
+    test!(fn send_opt4() {
+        let (tx, rx) = sync_channel::<int>(0);
+        let tx2 = tx.clone();
+        let (done, donerx) = channel();
+        let done2 = done.clone();
+        spawn(proc() {
+            assert_eq!(tx.send_opt(1), Err(1));
+            done.send(());
+        });
+        spawn(proc() {
+            assert_eq!(tx2.send_opt(2), Err(2));
+            done2.send(());
+        });
+        drop(rx);
+        donerx.recv();
+        donerx.recv();
+    })
+
+    test!(fn try_send1() {
+        let (tx, _rx) = sync_channel::<int>(0);
+        assert_eq!(tx.try_send(1), Err(Full(1)));
+    })
+
+    test!(fn try_send2() {
+        let (tx, _rx) = sync_channel::<int>(1);
+        assert_eq!(tx.try_send(1), Ok(()));
+        assert_eq!(tx.try_send(1), Err(Full(1)));
+    })
+
+    test!(fn try_send3() {
+        let (tx, rx) = sync_channel::<int>(1);
+        assert_eq!(tx.try_send(1), Ok(()));
+        drop(rx);
+        assert_eq!(tx.try_send(1), Err(RecvDisconnected(1)));
+    })
+
+    test!(fn try_send4() {
+        let (tx, rx) = sync_channel::<int>(0);
+        spawn(proc() {
+            for _ in range(0u, 1000) { task::deschedule(); }
+            assert_eq!(tx.try_send(1), Ok(()));
+        });
+        assert_eq!(rx.recv(), 1);
+    } #[ignore(reason = "flaky on libnative")])
+
+    test!(fn issue_15761() {
+        fn repro() {
+            let (tx1, rx1) = sync_channel::<()>(3);
+            let (tx2, rx2) = sync_channel::<()>(3);
+
+            spawn(proc() {
+                rx1.recv();
+                tx2.try_send(()).unwrap();
+            });
+
+            tx1.try_send(()).unwrap();
+            rx2.recv();
+        }
+
+        for _ in range(0u, 100) {
+            repro()
+        }
+    })
+}
diff --git a/src/libstd/comm/oneshot.rs b/src/libstd/comm/oneshot.rs
new file mode 100644
index 00000000000..bc34c3e8c52
--- /dev/null
+++ b/src/libstd/comm/oneshot.rs
@@ -0,0 +1,377 @@
+// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+/// Oneshot channels/ports
+///
+/// This is the initial flavor of channels/ports used for comm module. This is
+/// an optimization for the one-use case of a channel. The major optimization of
+/// this type is to have one and exactly one allocation when the chan/port pair
+/// is created.
+///
+/// Another possible optimization would be to not use an Arc box because
+/// in theory we know when the shared packet can be deallocated (no real need
+/// for the atomic reference counting), but I was having trouble how to destroy
+/// the data early in a drop of a Port.
+///
+/// # Implementation
+///
+/// Oneshots are implemented around one atomic uint variable. This variable
+/// indicates both the state of the port/chan but also contains any tasks
+/// blocked on the port. All atomic operations happen on this one word.
+///
+/// In order to upgrade a oneshot channel, an upgrade is considered a disconnect
+/// on behalf of the channel side of things (it can be mentally thought of as
+/// consuming the port). This upgrade is then also stored in the shared packet.
+/// The one caveat to consider is that when a port sees a disconnected channel
+/// it must check for data because there is no "data plus upgrade" state.
+
+pub use self::Failure::*;
+pub use self::UpgradeResult::*;
+pub use self::SelectionResult::*;
+use self::MyUpgrade::*;
+
+use core::prelude::*;
+
+use alloc::boxed::Box;
+use core::mem;
+use rustrt::local::Local;
+use rustrt::task::{Task, BlockedTask};
+
+use sync::atomic;
+use comm::Receiver;
+
+// Various states you can find a port in.
+const EMPTY: uint = 0;
+const DATA: uint = 1;
+const DISCONNECTED: uint = 2;
+
+pub struct Packet<T> {
+    // Internal state of the chan/port pair (stores the blocked task as well)
+    state: atomic::AtomicUint,
+    // One-shot data slot location
+    data: Option<T>,
+    // when used for the second time, a oneshot channel must be upgraded, and
+    // this contains the slot for the upgrade
+    upgrade: MyUpgrade<T>,
+}
+
+pub enum Failure<T> {
+    Empty,
+    Disconnected,
+    Upgraded(Receiver<T>),
+}
+
+pub enum UpgradeResult {
+    UpSuccess,
+    UpDisconnected,
+    UpWoke(BlockedTask),
+}
+
+pub enum SelectionResult<T> {
+    SelCanceled(BlockedTask),
+    SelUpgraded(BlockedTask, Receiver<T>),
+    SelSuccess,
+}
+
+enum MyUpgrade<T> {
+    NothingSent,
+    SendUsed,
+    GoUp(Receiver<T>),
+}
+
+impl<T: Send> Packet<T> {
+    pub fn new() -> Packet<T> {
+        Packet {
+            data: None,
+            upgrade: NothingSent,
+            state: atomic::AtomicUint::new(EMPTY),
+        }
+    }
+
+    pub fn send(&mut self, t: T) -> Result<(), T> {
+        // Sanity check
+        match self.upgrade {
+            NothingSent => {}
+            _ => panic!("sending on a oneshot that's already sent on "),
+        }
+        assert!(self.data.is_none());
+        self.data = Some(t);
+        self.upgrade = SendUsed;
+
+        match self.state.swap(DATA, atomic::SeqCst) {
+            // Sent the data, no one was waiting
+            EMPTY => Ok(()),
+
+            // Couldn't send the data, the port hung up first. Return the data
+            // back up the stack.
+            DISCONNECTED => {
+                Err(self.data.take().unwrap())
+            }
+
+            // Not possible, these are one-use channels
+            DATA => unreachable!(),
+
+            // Anything else means that there was a task waiting on the other
+            // end. We leave the 'DATA' state inside so it'll pick it up on the
+            // other end.
+            n => unsafe {
+                let t = BlockedTask::cast_from_uint(n);
+                t.wake().map(|t| t.reawaken());
+                Ok(())
+            }
+        }
+    }
+
+    // Just tests whether this channel has been sent on or not, this is only
+    // safe to use from the sender.
+    pub fn sent(&self) -> bool {
+        match self.upgrade {
+            NothingSent => false,
+            _ => true,
+        }
+    }
+
+    pub fn recv(&mut self) -> Result<T, Failure<T>> {
+        // Attempt to not block the task (it's a little expensive). If it looks
+        // like we're not empty, then immediately go through to `try_recv`.
+        if self.state.load(atomic::SeqCst) == EMPTY {
+            let t: Box<Task> = Local::take();
+            t.deschedule(1, |task| {
+                let n = unsafe { task.cast_to_uint() };
+                match self.state.compare_and_swap(EMPTY, n, atomic::SeqCst) {
+                    // Nothing on the channel, we legitimately block
+                    EMPTY => Ok(()),
+
+                    // If there's data or it's a disconnected channel, then we
+                    // failed the cmpxchg, so we just wake ourselves back up
+                    DATA | DISCONNECTED => {
+                        unsafe { Err(BlockedTask::cast_from_uint(n)) }
+                    }
+
+                    // Only one thread is allowed to sleep on this port
+                    _ => unreachable!()
+                }
+            });
+        }
+
+        self.try_recv()
+    }
+
+    pub fn try_recv(&mut self) -> Result<T, Failure<T>> {
+        match self.state.load(atomic::SeqCst) {
+            EMPTY => Err(Empty),
+
+            // We saw some data on the channel, but the channel can be used
+            // again to send us an upgrade. As a result, we need to re-insert
+            // into the channel that there's no data available (otherwise we'll
+            // just see DATA next time). This is done as a cmpxchg because if
+            // the state changes under our feet we'd rather just see that state
+            // change.
+            DATA => {
+                self.state.compare_and_swap(DATA, EMPTY, atomic::SeqCst);
+                match self.data.take() {
+                    Some(data) => Ok(data),
+                    None => unreachable!(),
+                }
+            }
+
+            // There's no guarantee that we receive before an upgrade happens,
+            // and an upgrade flags the channel as disconnected, so when we see
+            // this we first need to check if there's data available and *then*
+            // we go through and process the upgrade.
+            DISCONNECTED => {
+                match self.data.take() {
+                    Some(data) => Ok(data),
+                    None => {
+                        match mem::replace(&mut self.upgrade, SendUsed) {
+                            SendUsed | NothingSent => Err(Disconnected),
+                            GoUp(upgrade) => Err(Upgraded(upgrade))
+                        }
+                    }
+                }
+            }
+            _ => unreachable!()
+        }
+    }
+
+    // Returns whether the upgrade was completed. If the upgrade wasn't
+    // completed, then the port couldn't get sent to the other half (it will
+    // never receive it).
+    pub fn upgrade(&mut self, up: Receiver<T>) -> UpgradeResult {
+        let prev = match self.upgrade {
+            NothingSent => NothingSent,
+            SendUsed => SendUsed,
+            _ => panic!("upgrading again"),
+        };
+        self.upgrade = GoUp(up);
+
+        match self.state.swap(DISCONNECTED, atomic::SeqCst) {
+            // If the channel is empty or has data on it, then we're good to go.
+            // Senders will check the data before the upgrade (in case we
+            // plastered over the DATA state).
+            DATA | EMPTY => UpSuccess,
+
+            // If the other end is already disconnected, then we failed the
+            // upgrade. Be sure to trash the port we were given.
+            DISCONNECTED => { self.upgrade = prev; UpDisconnected }
+
+            // If someone's waiting, we gotta wake them up
+            n => UpWoke(unsafe { BlockedTask::cast_from_uint(n) })
+        }
+    }
+
+    pub fn drop_chan(&mut self) {
+        match self.state.swap(DISCONNECTED, atomic::SeqCst) {
+            DATA | DISCONNECTED | EMPTY => {}
+
+            // If someone's waiting, we gotta wake them up
+            n => unsafe {
+                let t = BlockedTask::cast_from_uint(n);
+                t.wake().map(|t| t.reawaken());
+            }
+        }
+    }
+
+    pub fn drop_port(&mut self) {
+        match self.state.swap(DISCONNECTED, atomic::SeqCst) {
+            // An empty channel has nothing to do, and a remotely disconnected
+            // channel also has nothing to do b/c we're about to run the drop
+            // glue
+            DISCONNECTED | EMPTY => {}
+
+            // There's data on the channel, so make sure we destroy it promptly.
+            // This is why not using an arc is a little difficult (need the box
+            // to stay valid while we take the data).
+            DATA => { self.data.take().unwrap(); }
+
+            // We're the only ones that can block on this port
+            _ => unreachable!()
+        }
+    }
+
+    ////////////////////////////////////////////////////////////////////////////
+    // select implementation
+    ////////////////////////////////////////////////////////////////////////////
+
+    // If Ok, the value is whether this port has data, if Err, then the upgraded
+    // port needs to be checked instead of this one.
+    pub fn can_recv(&mut self) -> Result<bool, Receiver<T>> {
+        match self.state.load(atomic::SeqCst) {
+            EMPTY => Ok(false), // Welp, we tried
+            DATA => Ok(true),   // we have some un-acquired data
+            DISCONNECTED if self.data.is_some() => Ok(true), // we have data
+            DISCONNECTED => {
+                match mem::replace(&mut self.upgrade, SendUsed) {
+                    // The other end sent us an upgrade, so we need to
+                    // propagate upwards whether the upgrade can receive
+                    // data
+                    GoUp(upgrade) => Err(upgrade),
+
+                    // If the other end disconnected without sending an
+                    // upgrade, then we have data to receive (the channel is
+                    // disconnected).
+                    up => { self.upgrade = up; Ok(true) }
+                }
+            }
+            _ => unreachable!(), // we're the "one blocker"
+        }
+    }
+
+    // Attempts to start selection on this port. This can either succeed, fail
+    // because there is data, or fail because there is an upgrade pending.
+    pub fn start_selection(&mut self, task: BlockedTask) -> SelectionResult<T> {
+        let n = unsafe { task.cast_to_uint() };
+        match self.state.compare_and_swap(EMPTY, n, atomic::SeqCst) {
+            EMPTY => SelSuccess,
+            DATA => SelCanceled(unsafe { BlockedTask::cast_from_uint(n) }),
+            DISCONNECTED if self.data.is_some() => {
+                SelCanceled(unsafe { BlockedTask::cast_from_uint(n) })
+            }
+            DISCONNECTED => {
+                match mem::replace(&mut self.upgrade, SendUsed) {
+                    // The other end sent us an upgrade, so we need to
+                    // propagate upwards whether the upgrade can receive
+                    // data
+                    GoUp(upgrade) => {
+                        SelUpgraded(unsafe { BlockedTask::cast_from_uint(n) },
+                                    upgrade)
+                    }
+
+                    // If the other end disconnected without sending an
+                    // upgrade, then we have data to receive (the channel is
+                    // disconnected).
+                    up => {
+                        self.upgrade = up;
+                        SelCanceled(unsafe { BlockedTask::cast_from_uint(n) })
+                    }
+                }
+            }
+            _ => unreachable!(), // we're the "one blocker"
+        }
+    }
+
+    // Remove a previous selecting task from this port. This ensures that the
+    // blocked task will no longer be visible to any other threads.
+    //
+    // The return value indicates whether there's data on this port.
+    pub fn abort_selection(&mut self) -> Result<bool, Receiver<T>> {
+        let state = match self.state.load(atomic::SeqCst) {
+            // Each of these states means that no further activity will happen
+            // with regard to abortion selection
+            s @ EMPTY |
+            s @ DATA |
+            s @ DISCONNECTED => s,
+
+            // If we've got a blocked task, then use an atomic to gain ownership
+            // of it (may fail)
+            n => self.state.compare_and_swap(n, EMPTY, atomic::SeqCst)
+        };
+
+        // Now that we've got ownership of our state, figure out what to do
+        // about it.
+        match state {
+            EMPTY => unreachable!(),
+            // our task used for select was stolen
+            DATA => Ok(true),
+
+            // If the other end has hung up, then we have complete ownership
+            // of the port. First, check if there was data waiting for us. This
+            // is possible if the other end sent something and then hung up.
+            //
+            // We then need to check to see if there was an upgrade requested,
+            // and if so, the upgraded port needs to have its selection aborted.
+            DISCONNECTED => {
+                if self.data.is_some() {
+                    Ok(true)
+                } else {
+                    match mem::replace(&mut self.upgrade, SendUsed) {
+                        GoUp(port) => Err(port),
+                        _ => Ok(true),
+                    }
+                }
+            }
+
+            // We woke ourselves up from select. Assert that the task should be
+            // trashed and returned that we don't have any data.
+            n => {
+                let t = unsafe { BlockedTask::cast_from_uint(n) };
+                t.trash();
+                Ok(false)
+            }
+        }
+    }
+}
+
+#[unsafe_destructor]
+impl<T: Send> Drop for Packet<T> {
+    fn drop(&mut self) {
+        assert_eq!(self.state.load(atomic::SeqCst), DISCONNECTED);
+    }
+}
diff --git a/src/libstd/comm/select.rs b/src/libstd/comm/select.rs
new file mode 100644
index 00000000000..621556f75ce
--- /dev/null
+++ b/src/libstd/comm/select.rs
@@ -0,0 +1,711 @@
+// Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+//! Selection over an array of receivers
+//!
+//! This module contains the implementation machinery necessary for selecting
+//! over a number of receivers. One large goal of this module is to provide an
+//! efficient interface to selecting over any receiver of any type.
+//!
+//! This is achieved through an architecture of a "receiver set" in which
+//! receivers are added to a set and then the entire set is waited on at once.
+//! The set can be waited on multiple times to prevent re-adding each receiver
+//! to the set.
+//!
+//! Usage of this module is currently encouraged to go through the use of the
+//! `select!` macro. This macro allows naturally binding of variables to the
+//! received values of receivers in a much more natural syntax then usage of the
+//! `Select` structure directly.
+//!
+//! # Example
+//!
+//! ```rust
+//! let (tx1, rx1) = channel();
+//! let (tx2, rx2) = channel();
+//!
+//! tx1.send(1i);
+//! tx2.send(2i);
+//!
+//! select! {
+//!     val = rx1.recv() => {
+//!         assert_eq!(val, 1i);
+//!     },
+//!     val = rx2.recv() => {
+//!         assert_eq!(val, 2i);
+//!     }
+//! }
+//! ```
+
+#![allow(dead_code)]
+#![experimental = "This implementation, while likely sufficient, is unsafe and \
+                   likely to be error prone. At some point in the future this \
+                   module will likely be replaced, and it is currently \
+                   unknown how much API breakage that will cause. The ability \
+                   to select over a number of channels will remain forever, \
+                   but no guarantees beyond this are being made"]
+
+
+use core::prelude::*;
+
+use alloc::boxed::Box;
+use core::cell::Cell;
+use core::kinds::marker;
+use core::mem;
+use core::uint;
+use rustrt::local::Local;
+use rustrt::task::{Task, BlockedTask};
+
+use comm::Receiver;
+
+/// The "receiver set" of the select interface. This structure is used to manage
+/// a set of receivers which are being selected over.
+pub struct Select {
+    head: *mut Handle<'static, ()>,
+    tail: *mut Handle<'static, ()>,
+    next_id: Cell<uint>,
+    marker1: marker::NoSend,
+}
+
+/// A handle to a receiver which is currently a member of a `Select` set of
+/// receivers.  This handle is used to keep the receiver in the set as well as
+/// interact with the underlying receiver.
+pub struct Handle<'rx, T:'rx> {
+    /// The ID of this handle, used to compare against the return value of
+    /// `Select::wait()`
+    id: uint,
+    selector: &'rx Select,
+    next: *mut Handle<'static, ()>,
+    prev: *mut Handle<'static, ()>,
+    added: bool,
+    packet: &'rx Packet+'rx,
+
+    // due to our fun transmutes, we be sure to place this at the end. (nothing
+    // previous relies on T)
+    rx: &'rx Receiver<T>,
+}
+
+struct Packets { cur: *mut Handle<'static, ()> }
+
+#[doc(hidden)]
+pub trait Packet {
+    fn can_recv(&self) -> bool;
+    fn start_selection(&self, task: BlockedTask) -> Result<(), BlockedTask>;
+    fn abort_selection(&self) -> bool;
+}
+
+impl Select {
+    /// Creates a new selection structure. This set is initially empty and
+    /// `wait` will panic!() if called.
+    ///
+    /// Usage of this struct directly can sometimes be burdensome, and usage is
+    /// rather much easier through the `select!` macro.
+    pub fn new() -> Select {
+        Select {
+            marker1: marker::NoSend,
+            head: 0 as *mut Handle<'static, ()>,
+            tail: 0 as *mut Handle<'static, ()>,
+            next_id: Cell::new(1),
+        }
+    }
+
+    /// Creates a new handle into this receiver set for a new receiver. Note
+    /// that this does *not* add the receiver to the receiver set, for that you
+    /// must call the `add` method on the handle itself.
+    pub fn handle<'a, T: Send>(&'a self, rx: &'a Receiver<T>) -> Handle<'a, T> {
+        let id = self.next_id.get();
+        self.next_id.set(id + 1);
+        Handle {
+            id: id,
+            selector: self,
+            next: 0 as *mut Handle<'static, ()>,
+            prev: 0 as *mut Handle<'static, ()>,
+            added: false,
+            rx: rx,
+            packet: rx,
+        }
+    }
+
+    /// Waits for an event on this receiver set. The returned value is *not* an
+    /// index, but rather an id. This id can be queried against any active
+    /// `Handle` structures (each one has an `id` method). The handle with
+    /// the matching `id` will have some sort of event available on it. The
+    /// event could either be that data is available or the corresponding
+    /// channel has been closed.
+    pub fn wait(&self) -> uint {
+        self.wait2(true)
+    }
+
+    /// Helper method for skipping the preflight checks during testing
+    fn wait2(&self, do_preflight_checks: bool) -> uint {
+        // Note that this is currently an inefficient implementation. We in
+        // theory have knowledge about all receivers in the set ahead of time,
+        // so this method shouldn't really have to iterate over all of them yet
+        // again. The idea with this "receiver set" interface is to get the
+        // interface right this time around, and later this implementation can
+        // be optimized.
+        //
+        // This implementation can be summarized by:
+        //
+        //      fn select(receivers) {
+        //          if any receiver ready { return ready index }
+        //          deschedule {
+        //              block on all receivers
+        //          }
+        //          unblock on all receivers
+        //          return ready index
+        //      }
+        //
+        // Most notably, the iterations over all of the receivers shouldn't be
+        // necessary.
+        unsafe {
+            let mut amt = 0;
+            for p in self.iter() {
+                amt += 1;
+                if do_preflight_checks && (*p).packet.can_recv() {
+                    return (*p).id;
+                }
+            }
+            assert!(amt > 0);
+
+            let mut ready_index = amt;
+            let mut ready_id = uint::MAX;
+            let mut iter = self.iter().enumerate();
+
+            // Acquire a number of blocking contexts, and block on each one
+            // sequentially until one fails. If one fails, then abort
+            // immediately so we can go unblock on all the other receivers.
+            let task: Box<Task> = Local::take();
+            task.deschedule(amt, |task| {
+                // Prepare for the block
+                let (i, handle) = iter.next().unwrap();
+                match (*handle).packet.start_selection(task) {
+                    Ok(()) => Ok(()),
+                    Err(task) => {
+                        ready_index = i;
+                        ready_id = (*handle).id;
+                        Err(task)
+                    }
+                }
+            });
+
+            // Abort the selection process on each receiver. If the abort
+            // process returns `true`, then that means that the receiver is
+            // ready to receive some data. Note that this also means that the
+            // receiver may have yet to have fully read the `to_wake` field and
+            // woken us up (although the wakeup is guaranteed to fail).
+            //
+            // This situation happens in the window of where a sender invokes
+            // increment(), sees -1, and then decides to wake up the task. After
+            // all this is done, the sending thread will set `selecting` to
+            // `false`. Until this is done, we cannot return. If we were to
+            // return, then a sender could wake up a receiver which has gone
+            // back to sleep after this call to `select`.
+            //
+            // Note that it is a "fairly small window" in which an increment()
+            // views that it should wake a thread up until the `selecting` bit
+            // is set to false. For now, the implementation currently just spins
+            // in a yield loop. This is very distasteful, but this
+            // implementation is already nowhere near what it should ideally be.
+            // A rewrite should focus on avoiding a yield loop, and for now this
+            // implementation is tying us over to a more efficient "don't
+            // iterate over everything every time" implementation.
+            for handle in self.iter().take(ready_index) {
+                if (*handle).packet.abort_selection() {
+                    ready_id = (*handle).id;
+                }
+            }
+
+            assert!(ready_id != uint::MAX);
+            return ready_id;
+        }
+    }
+
+    fn iter(&self) -> Packets { Packets { cur: self.head } }
+}
+
+impl<'rx, T: Send> Handle<'rx, T> {
+    /// Retrieve the id of this handle.
+    #[inline]
+    pub fn id(&self) -> uint { self.id }
+
+    /// Receive a value on the underlying receiver. Has the same semantics as
+    /// `Receiver.recv`
+    pub fn recv(&mut self) -> T { self.rx.recv() }
+    /// Block to receive a value on the underlying receiver, returning `Some` on
+    /// success or `None` if the channel disconnects. This function has the same
+    /// semantics as `Receiver.recv_opt`
+    pub fn recv_opt(&mut self) -> Result<T, ()> { self.rx.recv_opt() }
+
+    /// Adds this handle to the receiver set that the handle was created from. This
+    /// method can be called multiple times, but it has no effect if `add` was
+    /// called previously.
+    ///
+    /// This method is unsafe because it requires that the `Handle` is not moved
+    /// while it is added to the `Select` set.
+    pub unsafe fn add(&mut self) {
+        if self.added { return }
+        let selector: &mut Select = mem::transmute(&*self.selector);
+        let me: *mut Handle<'static, ()> = mem::transmute(&*self);
+
+        if selector.head.is_null() {
+            selector.head = me;
+            selector.tail = me;
+        } else {
+            (*me).prev = selector.tail;
+            assert!((*me).next.is_null());
+            (*selector.tail).next = me;
+            selector.tail = me;
+        }
+        self.added = true;
+    }
+
+    /// Removes this handle from the `Select` set. This method is unsafe because
+    /// it has no guarantee that the `Handle` was not moved since `add` was
+    /// called.
+    pub unsafe fn remove(&mut self) {
+        if !self.added { return }
+
+        let selector: &mut Select = mem::transmute(&*self.selector);
+        let me: *mut Handle<'static, ()> = mem::transmute(&*self);
+
+        if self.prev.is_null() {
+            assert_eq!(selector.head, me);
+            selector.head = self.next;
+        } else {
+            (*self.prev).next = self.next;
+        }
+        if self.next.is_null() {
+            assert_eq!(selector.tail, me);
+            selector.tail = self.prev;
+        } else {
+            (*self.next).prev = self.prev;
+        }
+
+        self.next = 0 as *mut Handle<'static, ()>;
+        self.prev = 0 as *mut Handle<'static, ()>;
+
+        self.added = false;
+    }
+}
+
+#[unsafe_destructor]
+impl Drop for Select {
+    fn drop(&mut self) {
+        assert!(self.head.is_null());
+        assert!(self.tail.is_null());
+    }
+}
+
+#[unsafe_destructor]
+impl<'rx, T: Send> Drop for Handle<'rx, T> {
+    fn drop(&mut self) {
+        unsafe { self.remove() }
+    }
+}
+
+impl Iterator<*mut Handle<'static, ()>> for Packets {
+    fn next(&mut self) -> Option<*mut Handle<'static, ()>> {
+        if self.cur.is_null() {
+            None
+        } else {
+            let ret = Some(self.cur);
+            unsafe { self.cur = (*self.cur).next; }
+            ret
+        }
+    }
+}
+
+#[cfg(test)]
+#[allow(unused_imports)]
+mod test {
+    use prelude::*;
+
+    use super::*;
+
+    // Don't use the libstd version so we can pull in the right Select structure
+    // (std::comm points at the wrong one)
+    macro_rules! select {
+        (
+            $($name:pat = $rx:ident.$meth:ident() => $code:expr),+
+        ) => ({
+            use comm::Select;
+            let sel = Select::new();
+            $( let mut $rx = sel.handle(&$rx); )+
+            unsafe {
+                $( $rx.add(); )+
+            }
+            let ret = sel.wait();
+            $( if ret == $rx.id() { let $name = $rx.$meth(); $code } else )+
+            { unreachable!() }
+        })
+    }
+
+    test!(fn smoke() {
+        let (tx1, rx1) = channel::<int>();
+        let (tx2, rx2) = channel::<int>();
+        tx1.send(1);
+        select! (
+            foo = rx1.recv() => { assert_eq!(foo, 1); },
+            _bar = rx2.recv() => { panic!() }
+        )
+        tx2.send(2);
+        select! (
+            _foo = rx1.recv() => { panic!() },
+            bar = rx2.recv() => { assert_eq!(bar, 2) }
+        )
+        drop(tx1);
+        select! (
+            foo = rx1.recv_opt() => { assert_eq!(foo, Err(())); },
+            _bar = rx2.recv() => { panic!() }
+        )
+        drop(tx2);
+        select! (
+            bar = rx2.recv_opt() => { assert_eq!(bar, Err(())); }
+        )
+    })
+
+    test!(fn smoke2() {
+        let (_tx1, rx1) = channel::<int>();
+        let (_tx2, rx2) = channel::<int>();
+        let (_tx3, rx3) = channel::<int>();
+        let (_tx4, rx4) = channel::<int>();
+        let (tx5, rx5) = channel::<int>();
+        tx5.send(4);
+        select! (
+            _foo = rx1.recv() => { panic!("1") },
+            _foo = rx2.recv() => { panic!("2") },
+            _foo = rx3.recv() => { panic!("3") },
+            _foo = rx4.recv() => { panic!("4") },
+            foo = rx5.recv() => { assert_eq!(foo, 4); }
+        )
+    })
+
+    test!(fn closed() {
+        let (_tx1, rx1) = channel::<int>();
+        let (tx2, rx2) = channel::<int>();
+        drop(tx2);
+
+        select! (
+            _a1 = rx1.recv_opt() => { panic!() },
+            a2 = rx2.recv_opt() => { assert_eq!(a2, Err(())); }
+        )
+    })
+
+    test!(fn unblocks() {
+        let (tx1, rx1) = channel::<int>();
+        let (_tx2, rx2) = channel::<int>();
+        let (tx3, rx3) = channel::<int>();
+
+        spawn(proc() {
+            for _ in range(0u, 20) { task::deschedule(); }
+            tx1.send(1);
+            rx3.recv();
+            for _ in range(0u, 20) { task::deschedule(); }
+        });
+
+        select! (
+            a = rx1.recv() => { assert_eq!(a, 1); },
+            _b = rx2.recv() => { panic!() }
+        )
+        tx3.send(1);
+        select! (
+            a = rx1.recv_opt() => { assert_eq!(a, Err(())); },
+            _b = rx2.recv() => { panic!() }
+        )
+    })
+
+    test!(fn both_ready() {
+        let (tx1, rx1) = channel::<int>();
+        let (tx2, rx2) = channel::<int>();
+        let (tx3, rx3) = channel::<()>();
+
+        spawn(proc() {
+            for _ in range(0u, 20) { task::deschedule(); }
+            tx1.send(1);
+            tx2.send(2);
+            rx3.recv();
+        });
+
+        select! (
+            a = rx1.recv() => { assert_eq!(a, 1); },
+            a = rx2.recv() => { assert_eq!(a, 2); }
+        )
+        select! (
+            a = rx1.recv() => { assert_eq!(a, 1); },
+            a = rx2.recv() => { assert_eq!(a, 2); }
+        )
+        assert_eq!(rx1.try_recv(), Err(Empty));
+        assert_eq!(rx2.try_recv(), Err(Empty));
+        tx3.send(());
+    })
+
+    test!(fn stress() {
+        static AMT: int = 10000;
+        let (tx1, rx1) = channel::<int>();
+        let (tx2, rx2) = channel::<int>();
+        let (tx3, rx3) = channel::<()>();
+
+        spawn(proc() {
+            for i in range(0, AMT) {
+                if i % 2 == 0 {
+                    tx1.send(i);
+                } else {
+                    tx2.send(i);
+                }
+                rx3.recv();
+            }
+        });
+
+        for i in range(0, AMT) {
+            select! (
+                i1 = rx1.recv() => { assert!(i % 2 == 0 && i == i1); },
+                i2 = rx2.recv() => { assert!(i % 2 == 1 && i == i2); }
+            )
+            tx3.send(());
+        }
+    })
+
+    test!(fn cloning() {
+        let (tx1, rx1) = channel::<int>();
+        let (_tx2, rx2) = channel::<int>();
+        let (tx3, rx3) = channel::<()>();
+
+        spawn(proc() {
+            rx3.recv();
+            tx1.clone();
+            assert_eq!(rx3.try_recv(), Err(Empty));
+            tx1.send(2);
+            rx3.recv();
+        });
+
+        tx3.send(());
+        select!(
+            _i1 = rx1.recv() => {},
+            _i2 = rx2.recv() => panic!()
+        )
+        tx3.send(());
+    })
+
+    test!(fn cloning2() {
+        let (tx1, rx1) = channel::<int>();
+        let (_tx2, rx2) = channel::<int>();
+        let (tx3, rx3) = channel::<()>();
+
+        spawn(proc() {
+            rx3.recv();
+            tx1.clone();
+            assert_eq!(rx3.try_recv(), Err(Empty));
+            tx1.send(2);
+            rx3.recv();
+        });
+
+        tx3.send(());
+        select!(
+            _i1 = rx1.recv() => {},
+            _i2 = rx2.recv() => panic!()
+        )
+        tx3.send(());
+    })
+
+    test!(fn cloning3() {
+        let (tx1, rx1) = channel::<()>();
+        let (tx2, rx2) = channel::<()>();
+        let (tx3, rx3) = channel::<()>();
+        spawn(proc() {
+            let s = Select::new();
+            let mut h1 = s.handle(&rx1);
+            let mut h2 = s.handle(&rx2);
+            unsafe { h2.add(); }
+            unsafe { h1.add(); }
+            assert_eq!(s.wait(), h2.id);
+            tx3.send(());
+        });
+
+        for _ in range(0u, 1000) { task::deschedule(); }
+        drop(tx1.clone());
+        tx2.send(());
+        rx3.recv();
+    })
+
+    test!(fn preflight1() {
+        let (tx, rx) = channel();
+        tx.send(());
+        select!(
+            () = rx.recv() => {}
+        )
+    })
+
+    test!(fn preflight2() {
+        let (tx, rx) = channel();
+        tx.send(());
+        tx.send(());
+        select!(
+            () = rx.recv() => {}
+        )
+    })
+
+    test!(fn preflight3() {
+        let (tx, rx) = channel();
+        drop(tx.clone());
+        tx.send(());
+        select!(
+            () = rx.recv() => {}
+        )
+    })
+
+    test!(fn preflight4() {
+        let (tx, rx) = channel();
+        tx.send(());
+        let s = Select::new();
+        let mut h = s.handle(&rx);
+        unsafe { h.add(); }
+        assert_eq!(s.wait2(false), h.id);
+    })
+
+    test!(fn preflight5() {
+        let (tx, rx) = channel();
+        tx.send(());
+        tx.send(());
+        let s = Select::new();
+        let mut h = s.handle(&rx);
+        unsafe { h.add(); }
+        assert_eq!(s.wait2(false), h.id);
+    })
+
+    test!(fn preflight6() {
+        let (tx, rx) = channel();
+        drop(tx.clone());
+        tx.send(());
+        let s = Select::new();
+        let mut h = s.handle(&rx);
+        unsafe { h.add(); }
+        assert_eq!(s.wait2(false), h.id);
+    })
+
+    test!(fn preflight7() {
+        let (tx, rx) = channel::<()>();
+        drop(tx);
+        let s = Select::new();
+        let mut h = s.handle(&rx);
+        unsafe { h.add(); }
+        assert_eq!(s.wait2(false), h.id);
+    })
+
+    test!(fn preflight8() {
+        let (tx, rx) = channel();
+        tx.send(());
+        drop(tx);
+        rx.recv();
+        let s = Select::new();
+        let mut h = s.handle(&rx);
+        unsafe { h.add(); }
+        assert_eq!(s.wait2(false), h.id);
+    })
+
+    test!(fn preflight9() {
+        let (tx, rx) = channel();
+        drop(tx.clone());
+        tx.send(());
+        drop(tx);
+        rx.recv();
+        let s = Select::new();
+        let mut h = s.handle(&rx);
+        unsafe { h.add(); }
+        assert_eq!(s.wait2(false), h.id);
+    })
+
+    test!(fn oneshot_data_waiting() {
+        let (tx1, rx1) = channel();
+        let (tx2, rx2) = channel();
+        spawn(proc() {
+            select! {
+                () = rx1.recv() => {}
+            }
+            tx2.send(());
+        });
+
+        for _ in range(0u, 100) { task::deschedule() }
+        tx1.send(());
+        rx2.recv();
+    })
+
+    test!(fn stream_data_waiting() {
+        let (tx1, rx1) = channel();
+        let (tx2, rx2) = channel();
+        tx1.send(());
+        tx1.send(());
+        rx1.recv();
+        rx1.recv();
+        spawn(proc() {
+            select! {
+                () = rx1.recv() => {}
+            }
+            tx2.send(());
+        });
+
+        for _ in range(0u, 100) { task::deschedule() }
+        tx1.send(());
+        rx2.recv();
+    })
+
+    test!(fn shared_data_waiting() {
+        let (tx1, rx1) = channel();
+        let (tx2, rx2) = channel();
+        drop(tx1.clone());
+        tx1.send(());
+        rx1.recv();
+        spawn(proc() {
+            select! {
+                () = rx1.recv() => {}
+            }
+            tx2.send(());
+        });
+
+        for _ in range(0u, 100) { task::deschedule() }
+        tx1.send(());
+        rx2.recv();
+    })
+
+    test!(fn sync1() {
+        let (tx, rx) = sync_channel::<int>(1);
+        tx.send(1);
+        select! {
+            n = rx.recv() => { assert_eq!(n, 1); }
+        }
+    })
+
+    test!(fn sync2() {
+        let (tx, rx) = sync_channel::<int>(0);
+        spawn(proc() {
+            for _ in range(0u, 100) { task::deschedule() }
+            tx.send(1);
+        });
+        select! {
+            n = rx.recv() => { assert_eq!(n, 1); }
+        }
+    })
+
+    test!(fn sync3() {
+        let (tx1, rx1) = sync_channel::<int>(0);
+        let (tx2, rx2): (Sender<int>, Receiver<int>) = channel();
+        spawn(proc() { tx1.send(1); });
+        spawn(proc() { tx2.send(2); });
+        select! {
+            n = rx1.recv() => {
+                assert_eq!(n, 1);
+                assert_eq!(rx2.recv(), 2);
+            },
+            n = rx2.recv() => {
+                assert_eq!(n, 2);
+                assert_eq!(rx1.recv(), 1);
+            }
+        }
+    })
+}
diff --git a/src/libstd/comm/shared.rs b/src/libstd/comm/shared.rs
new file mode 100644
index 00000000000..6396edbdbd1
--- /dev/null
+++ b/src/libstd/comm/shared.rs
@@ -0,0 +1,493 @@
+// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+/// Shared channels
+///
+/// This is the flavor of channels which are not necessarily optimized for any
+/// particular use case, but are the most general in how they are used. Shared
+/// channels are cloneable allowing for multiple senders.
+///
+/// High level implementation details can be found in the comment of the parent
+/// module. You'll also note that the implementation of the shared and stream
+/// channels are quite similar, and this is no coincidence!
+
+pub use self::Failure::*;
+
+use core::prelude::*;
+
+use alloc::boxed::Box;
+use core::cmp;
+use core::int;
+use rustrt::local::Local;
+use rustrt::mutex::NativeMutex;
+use rustrt::task::{Task, BlockedTask};
+use rustrt::thread::Thread;
+
+use sync::atomic;
+use sync::mpsc_queue as mpsc;
+
+const DISCONNECTED: int = int::MIN;
+const FUDGE: int = 1024;
+#[cfg(test)]
+const MAX_STEALS: int = 5;
+#[cfg(not(test))]
+const MAX_STEALS: int = 1 << 20;
+
+pub struct Packet<T> {
+    queue: mpsc::Queue<T>,
+    cnt: atomic::AtomicInt, // How many items are on this channel
+    steals: int, // How many times has a port received without blocking?
+    to_wake: atomic::AtomicUint, // Task to wake up
+
+    // The number of channels which are currently using this packet.
+    channels: atomic::AtomicInt,
+
+    // See the discussion in Port::drop and the channel send methods for what
+    // these are used for
+    port_dropped: atomic::AtomicBool,
+    sender_drain: atomic::AtomicInt,
+
+    // this lock protects various portions of this implementation during
+    // select()
+    select_lock: NativeMutex,
+}
+
+pub enum Failure {
+    Empty,
+    Disconnected,
+}
+
+impl<T: Send> Packet<T> {
+    // Creation of a packet *must* be followed by a call to postinit_lock
+    // and later by inherit_blocker
+    pub fn new() -> Packet<T> {
+        let p = Packet {
+            queue: mpsc::Queue::new(),
+            cnt: atomic::AtomicInt::new(0),
+            steals: 0,
+            to_wake: atomic::AtomicUint::new(0),
+            channels: atomic::AtomicInt::new(2),
+            port_dropped: atomic::AtomicBool::new(false),
+            sender_drain: atomic::AtomicInt::new(0),
+            select_lock: unsafe { NativeMutex::new() },
+        };
+        return p;
+    }
+
+    // This function should be used after newly created Packet
+    // was wrapped with an Arc
+    // In other case mutex data will be duplicated while cloning
+    // and that could cause problems on platforms where it is
+    // represented by opaque data structure
+    pub fn postinit_lock(&mut self) {
+        unsafe { self.select_lock.lock_noguard() }
+    }
+
+    // This function is used at the creation of a shared packet to inherit a
+    // previously blocked task. This is done to prevent spurious wakeups of
+    // tasks in select().
+    //
+    // This can only be called at channel-creation time
+    pub fn inherit_blocker(&mut self, task: Option<BlockedTask>) {
+        match task {
+            Some(task) => {
+                assert_eq!(self.cnt.load(atomic::SeqCst), 0);
+                assert_eq!(self.to_wake.load(atomic::SeqCst), 0);
+                self.to_wake.store(unsafe { task.cast_to_uint() },
+                                   atomic::SeqCst);
+                self.cnt.store(-1, atomic::SeqCst);
+
+                // This store is a little sketchy. What's happening here is
+                // that we're transferring a blocker from a oneshot or stream
+                // channel to this shared channel. In doing so, we never
+                // spuriously wake them up and rather only wake them up at the
+                // appropriate time. This implementation of shared channels
+                // assumes that any blocking recv() will undo the increment of
+                // steals performed in try_recv() once the recv is complete.
+                // This thread that we're inheriting, however, is not in the
+                // middle of recv. Hence, the first time we wake them up,
+                // they're going to wake up from their old port, move on to the
+                // upgraded port, and then call the block recv() function.
+                //
+                // When calling this function, they'll find there's data
+                // immediately available, counting it as a steal. This in fact
+                // wasn't a steal because we appropriately blocked them waiting
+                // for data.
+                //
+                // To offset this bad increment, we initially set the steal
+                // count to -1. You'll find some special code in
+                // abort_selection() as well to ensure that this -1 steal count
+                // doesn't escape too far.
+                self.steals = -1;
+            }
+            None => {}
+        }
+
+        // When the shared packet is constructed, we grabbed this lock. The
+        // purpose of this lock is to ensure that abort_selection() doesn't
+        // interfere with this method. After we unlock this lock, we're
+        // signifying that we're done modifying self.cnt and self.to_wake and
+        // the port is ready for the world to continue using it.
+        unsafe { self.select_lock.unlock_noguard() }
+    }
+
+    pub fn send(&mut self, t: T) -> Result<(), T> {
+        // See Port::drop for what's going on
+        if self.port_dropped.load(atomic::SeqCst) { return Err(t) }
+
+        // Note that the multiple sender case is a little trickier
+        // semantically than the single sender case. The logic for
+        // incrementing is "add and if disconnected store disconnected".
+        // This could end up leading some senders to believe that there
+        // wasn't a disconnect if in fact there was a disconnect. This means
+        // that while one thread is attempting to re-store the disconnected
+        // states, other threads could walk through merrily incrementing
+        // this very-negative disconnected count. To prevent senders from
+        // spuriously attempting to send when the channels is actually
+        // disconnected, the count has a ranged check here.
+        //
+        // This is also done for another reason. Remember that the return
+        // value of this function is:
+        //
+        //  `true` == the data *may* be received, this essentially has no
+        //            meaning
+        //  `false` == the data will *never* be received, this has a lot of
+        //             meaning
+        //
+        // In the SPSC case, we have a check of 'queue.is_empty()' to see
+        // whether the data was actually received, but this same condition
+        // means nothing in a multi-producer context. As a result, this
+        // preflight check serves as the definitive "this will never be
+        // received". Once we get beyond this check, we have permanently
+        // entered the realm of "this may be received"
+        if self.cnt.load(atomic::SeqCst) < DISCONNECTED + FUDGE {
+            return Err(t)
+        }
+
+        self.queue.push(t);
+        match self.cnt.fetch_add(1, atomic::SeqCst) {
+            -1 => {
+                self.take_to_wake().wake().map(|t| t.reawaken());
+            }
+
+            // In this case, we have possibly failed to send our data, and
+            // we need to consider re-popping the data in order to fully
+            // destroy it. We must arbitrate among the multiple senders,
+            // however, because the queues that we're using are
+            // single-consumer queues. In order to do this, all exiting
+            // pushers will use an atomic count in order to count those
+            // flowing through. Pushers who see 0 are required to drain as
+            // much as possible, and then can only exit when they are the
+            // only pusher (otherwise they must try again).
+            n if n < DISCONNECTED + FUDGE => {
+                // see the comment in 'try' for a shared channel for why this
+                // window of "not disconnected" is ok.
+                self.cnt.store(DISCONNECTED, atomic::SeqCst);
+
+                if self.sender_drain.fetch_add(1, atomic::SeqCst) == 0 {
+                    loop {
+                        // drain the queue, for info on the thread yield see the
+                        // discussion in try_recv
+                        loop {
+                            match self.queue.pop() {
+                                mpsc::Data(..) => {}
+                                mpsc::Empty => break,
+                                mpsc::Inconsistent => Thread::yield_now(),
+                            }
+                        }
+                        // maybe we're done, if we're not the last ones
+                        // here, then we need to go try again.
+                        if self.sender_drain.fetch_sub(1, atomic::SeqCst) == 1 {
+                            break
+                        }
+                    }
+
+                    // At this point, there may still be data on the queue,
+                    // but only if the count hasn't been incremented and
+                    // some other sender hasn't finished pushing data just
+                    // yet. That sender in question will drain its own data.
+                }
+            }
+
+            // Can't make any assumptions about this case like in the SPSC case.
+            _ => {}
+        }
+
+        Ok(())
+    }
+
+    pub fn recv(&mut self) -> Result<T, Failure> {
+        // This code is essentially the exact same as that found in the stream
+        // case (see stream.rs)
+        match self.try_recv() {
+            Err(Empty) => {}
+            data => return data,
+        }
+
+        let task: Box<Task> = Local::take();
+        task.deschedule(1, |task| {
+            self.decrement(task)
+        });
+
+        match self.try_recv() {
+            data @ Ok(..) => { self.steals -= 1; data }
+            data => data,
+        }
+    }
+
+    // Essentially the exact same thing as the stream decrement function.
+    fn decrement(&mut self, task: BlockedTask) -> Result<(), BlockedTask> {
+        assert_eq!(self.to_wake.load(atomic::SeqCst), 0);
+        let n = unsafe { task.cast_to_uint() };
+        self.to_wake.store(n, atomic::SeqCst);
+
+        let steals = self.steals;
+        self.steals = 0;
+
+        match self.cnt.fetch_sub(1 + steals, atomic::SeqCst) {
+            DISCONNECTED => { self.cnt.store(DISCONNECTED, atomic::SeqCst); }
+            // If we factor in our steals and notice that the channel has no
+            // data, we successfully sleep
+            n => {
+                assert!(n >= 0);
+                if n - steals <= 0 { return Ok(()) }
+            }
+        }
+
+        self.to_wake.store(0, atomic::SeqCst);
+        Err(unsafe { BlockedTask::cast_from_uint(n) })
+    }
+
+    pub fn try_recv(&mut self) -> Result<T, Failure> {
+        let ret = match self.queue.pop() {
+            mpsc::Data(t) => Some(t),
+            mpsc::Empty => None,
+
+            // This is a bit of an interesting case. The channel is
+            // reported as having data available, but our pop() has
+            // failed due to the queue being in an inconsistent state.
+            // This means that there is some pusher somewhere which has
+            // yet to complete, but we are guaranteed that a pop will
+            // eventually succeed. In this case, we spin in a yield loop
+            // because the remote sender should finish their enqueue
+            // operation "very quickly".
+            //
+            // Avoiding this yield loop would require a different queue
+            // abstraction which provides the guarantee that after M
+            // pushes have succeeded, at least M pops will succeed. The
+            // current queues guarantee that if there are N active
+            // pushes, you can pop N times once all N have finished.
+            mpsc::Inconsistent => {
+                let data;
+                loop {
+                    Thread::yield_now();
+                    match self.queue.pop() {
+                        mpsc::Data(t) => { data = t; break }
+                        mpsc::Empty => panic!("inconsistent => empty"),
+                        mpsc::Inconsistent => {}
+                    }
+                }
+                Some(data)
+            }
+        };
+        match ret {
+            // See the discussion in the stream implementation for why we
+            // might decrement steals.
+            Some(data) => {
+                if self.steals > MAX_STEALS {
+                    match self.cnt.swap(0, atomic::SeqCst) {
+                        DISCONNECTED => {
+                            self.cnt.store(DISCONNECTED, atomic::SeqCst);
+                        }
+                        n => {
+                            let m = cmp::min(n, self.steals);
+                            self.steals -= m;
+                            self.bump(n - m);
+                        }
+                    }
+                    assert!(self.steals >= 0);
+                }
+                self.steals += 1;
+                Ok(data)
+            }
+
+            // See the discussion in the stream implementation for why we try
+            // again.
+            None => {
+                match self.cnt.load(atomic::SeqCst) {
+                    n if n != DISCONNECTED => Err(Empty),
+                    _ => {
+                        match self.queue.pop() {
+                            mpsc::Data(t) => Ok(t),
+                            mpsc::Empty => Err(Disconnected),
+                            // with no senders, an inconsistency is impossible.
+                            mpsc::Inconsistent => unreachable!(),
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    // Prepares this shared packet for a channel clone, essentially just bumping
+    // a refcount.
+    pub fn clone_chan(&mut self) {
+        self.channels.fetch_add(1, atomic::SeqCst);
+    }
+
+    // Decrement the reference count on a channel. This is called whenever a
+    // Chan is dropped and may end up waking up a receiver. It's the receiver's
+    // responsibility on the other end to figure out that we've disconnected.
+    pub fn drop_chan(&mut self) {
+        match self.channels.fetch_sub(1, atomic::SeqCst) {
+            1 => {}
+            n if n > 1 => return,
+            n => panic!("bad number of channels left {}", n),
+        }
+
+        match self.cnt.swap(DISCONNECTED, atomic::SeqCst) {
+            -1 => { self.take_to_wake().wake().map(|t| t.reawaken()); }
+            DISCONNECTED => {}
+            n => { assert!(n >= 0); }
+        }
+    }
+
+    // See the long discussion inside of stream.rs for why the queue is drained,
+    // and why it is done in this fashion.
+    pub fn drop_port(&mut self) {
+        self.port_dropped.store(true, atomic::SeqCst);
+        let mut steals = self.steals;
+        while {
+            let cnt = self.cnt.compare_and_swap(
+                            steals, DISCONNECTED, atomic::SeqCst);
+            cnt != DISCONNECTED && cnt != steals
+        } {
+            // See the discussion in 'try_recv' for why we yield
+            // control of this thread.
+            loop {
+                match self.queue.pop() {
+                    mpsc::Data(..) => { steals += 1; }
+                    mpsc::Empty | mpsc::Inconsistent => break,
+                }
+            }
+        }
+    }
+
+    // Consumes ownership of the 'to_wake' field.
+    fn take_to_wake(&mut self) -> BlockedTask {
+        let task = self.to_wake.load(atomic::SeqCst);
+        self.to_wake.store(0, atomic::SeqCst);
+        assert!(task != 0);
+        unsafe { BlockedTask::cast_from_uint(task) }
+    }
+
+    ////////////////////////////////////////////////////////////////////////////
+    // select implementation
+    ////////////////////////////////////////////////////////////////////////////
+
+    // Helper function for select, tests whether this port can receive without
+    // blocking (obviously not an atomic decision).
+    //
+    // This is different than the stream version because there's no need to peek
+    // at the queue, we can just look at the local count.
+    pub fn can_recv(&mut self) -> bool {
+        let cnt = self.cnt.load(atomic::SeqCst);
+        cnt == DISCONNECTED || cnt - self.steals > 0
+    }
+
+    // increment the count on the channel (used for selection)
+    fn bump(&mut self, amt: int) -> int {
+        match self.cnt.fetch_add(amt, atomic::SeqCst) {
+            DISCONNECTED => {
+                self.cnt.store(DISCONNECTED, atomic::SeqCst);
+                DISCONNECTED
+            }
+            n => n
+        }
+    }
+
+    // Inserts the blocked task for selection on this port, returning it back if
+    // the port already has data on it.
+    //
+    // The code here is the same as in stream.rs, except that it doesn't need to
+    // peek at the channel to see if an upgrade is pending.
+    pub fn start_selection(&mut self,
+                           task: BlockedTask) -> Result<(), BlockedTask> {
+        match self.decrement(task) {
+            Ok(()) => Ok(()),
+            Err(task) => {
+                let prev = self.bump(1);
+                assert!(prev == DISCONNECTED || prev >= 0);
+                return Err(task);
+            }
+        }
+    }
+
+    // Cancels a previous task waiting on this port, returning whether there's
+    // data on the port.
+    //
+    // This is similar to the stream implementation (hence fewer comments), but
+    // uses a different value for the "steals" variable.
+    pub fn abort_selection(&mut self, _was_upgrade: bool) -> bool {
+        // Before we do anything else, we bounce on this lock. The reason for
+        // doing this is to ensure that any upgrade-in-progress is gone and
+        // done with. Without this bounce, we can race with inherit_blocker
+        // about looking at and dealing with to_wake. Once we have acquired the
+        // lock, we are guaranteed that inherit_blocker is done.
+        unsafe {
+            let _guard = self.select_lock.lock();
+        }
+
+        // Like the stream implementation, we want to make sure that the count
+        // on the channel goes non-negative. We don't know how negative the
+        // stream currently is, so instead of using a steal value of 1, we load
+        // the channel count and figure out what we should do to make it
+        // positive.
+        let steals = {
+            let cnt = self.cnt.load(atomic::SeqCst);
+            if cnt < 0 && cnt != DISCONNECTED {-cnt} else {0}
+        };
+        let prev = self.bump(steals + 1);
+
+        if prev == DISCONNECTED {
+            assert_eq!(self.to_wake.load(atomic::SeqCst), 0);
+            true
+        } else {
+            let cur = prev + steals + 1;
+            assert!(cur >= 0);
+            if prev < 0 {
+                self.take_to_wake().trash();
+            } else {
+                while self.to_wake.load(atomic::SeqCst) != 0 {
+                    Thread::yield_now();
+                }
+            }
+            // if the number of steals is -1, it was the pre-emptive -1 steal
+            // count from when we inherited a blocker. This is fine because
+            // we're just going to overwrite it with a real value.
+            assert!(self.steals == 0 || self.steals == -1);
+            self.steals = steals;
+            prev >= 0
+        }
+    }
+}
+
+#[unsafe_destructor]
+impl<T: Send> Drop for Packet<T> {
+    fn drop(&mut self) {
+        // Note that this load is not only an assert for correctness about
+        // disconnection, but also a proper fence before the read of
+        // `to_wake`, so this assert cannot be removed with also removing
+        // the `to_wake` assert.
+        assert_eq!(self.cnt.load(atomic::SeqCst), DISCONNECTED);
+        assert_eq!(self.to_wake.load(atomic::SeqCst), 0);
+        assert_eq!(self.channels.load(atomic::SeqCst), 0);
+    }
+}
diff --git a/src/libstd/comm/stream.rs b/src/libstd/comm/stream.rs
new file mode 100644
index 00000000000..23d042960b1
--- /dev/null
+++ b/src/libstd/comm/stream.rs
@@ -0,0 +1,486 @@
+// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+/// Stream channels
+///
+/// This is the flavor of channels which are optimized for one sender and one
+/// receiver. The sender will be upgraded to a shared channel if the channel is
+/// cloned.
+///
+/// High level implementation details can be found in the comment of the parent
+/// module.
+
+pub use self::Failure::*;
+pub use self::UpgradeResult::*;
+pub use self::SelectionResult::*;
+use self::Message::*;
+
+use core::prelude::*;
+
+use alloc::boxed::Box;
+use core::cmp;
+use core::int;
+use rustrt::local::Local;
+use rustrt::task::{Task, BlockedTask};
+use rustrt::thread::Thread;
+
+use sync::atomic;
+use sync::spsc_queue as spsc;
+use comm::Receiver;
+
+const DISCONNECTED: int = int::MIN;
+#[cfg(test)]
+const MAX_STEALS: int = 5;
+#[cfg(not(test))]
+const MAX_STEALS: int = 1 << 20;
+
+pub struct Packet<T> {
+    queue: spsc::Queue<Message<T>>, // internal queue for all message
+
+    cnt: atomic::AtomicInt, // How many items are on this channel
+    steals: int, // How many times has a port received without blocking?
+    to_wake: atomic::AtomicUint, // Task to wake up
+
+    port_dropped: atomic::AtomicBool, // flag if the channel has been destroyed.
+}
+
+pub enum Failure<T> {
+    Empty,
+    Disconnected,
+    Upgraded(Receiver<T>),
+}
+
+pub enum UpgradeResult {
+    UpSuccess,
+    UpDisconnected,
+    UpWoke(BlockedTask),
+}
+
+pub enum SelectionResult<T> {
+    SelSuccess,
+    SelCanceled(BlockedTask),
+    SelUpgraded(BlockedTask, Receiver<T>),
+}
+
+// Any message could contain an "upgrade request" to a new shared port, so the
+// internal queue it's a queue of T, but rather Message<T>
+enum Message<T> {
+    Data(T),
+    GoUp(Receiver<T>),
+}
+
+impl<T: Send> Packet<T> {
+    pub fn new() -> Packet<T> {
+        Packet {
+            queue: unsafe { spsc::Queue::new(128) },
+
+            cnt: atomic::AtomicInt::new(0),
+            steals: 0,
+            to_wake: atomic::AtomicUint::new(0),
+
+            port_dropped: atomic::AtomicBool::new(false),
+        }
+    }
+
+
+    pub fn send(&mut self, t: T) -> Result<(), T> {
+        // If the other port has deterministically gone away, then definitely
+        // must return the data back up the stack. Otherwise, the data is
+        // considered as being sent.
+        if self.port_dropped.load(atomic::SeqCst) { return Err(t) }
+
+        match self.do_send(Data(t)) {
+            UpSuccess | UpDisconnected => {},
+            UpWoke(task) => { task.wake().map(|t| t.reawaken()); }
+        }
+        Ok(())
+    }
+    pub fn upgrade(&mut self, up: Receiver<T>) -> UpgradeResult {
+        // If the port has gone away, then there's no need to proceed any
+        // further.
+        if self.port_dropped.load(atomic::SeqCst) { return UpDisconnected }
+
+        self.do_send(GoUp(up))
+    }
+
+    fn do_send(&mut self, t: Message<T>) -> UpgradeResult {
+        self.queue.push(t);
+        match self.cnt.fetch_add(1, atomic::SeqCst) {
+            // As described in the mod's doc comment, -1 == wakeup
+            -1 => UpWoke(self.take_to_wake()),
+            // As as described before, SPSC queues must be >= -2
+            -2 => UpSuccess,
+
+            // Be sure to preserve the disconnected state, and the return value
+            // in this case is going to be whether our data was received or not.
+            // This manifests itself on whether we have an empty queue or not.
+            //
+            // Primarily, are required to drain the queue here because the port
+            // will never remove this data. We can only have at most one item to
+            // drain (the port drains the rest).
+            DISCONNECTED => {
+                self.cnt.store(DISCONNECTED, atomic::SeqCst);
+                let first = self.queue.pop();
+                let second = self.queue.pop();
+                assert!(second.is_none());
+
+                match first {
+                    Some(..) => UpSuccess,  // we failed to send the data
+                    None => UpDisconnected, // we successfully sent data
+                }
+            }
+
+            // Otherwise we just sent some data on a non-waiting queue, so just
+            // make sure the world is sane and carry on!
+            n => { assert!(n >= 0); UpSuccess }
+        }
+    }
+
+    // Consumes ownership of the 'to_wake' field.
+    fn take_to_wake(&mut self) -> BlockedTask {
+        let task = self.to_wake.load(atomic::SeqCst);
+        self.to_wake.store(0, atomic::SeqCst);
+        assert!(task != 0);
+        unsafe { BlockedTask::cast_from_uint(task) }
+    }
+
+    // Decrements the count on the channel for a sleeper, returning the sleeper
+    // back if it shouldn't sleep. Note that this is the location where we take
+    // steals into account.
+    fn decrement(&mut self, task: BlockedTask) -> Result<(), BlockedTask> {
+        assert_eq!(self.to_wake.load(atomic::SeqCst), 0);
+        let n = unsafe { task.cast_to_uint() };
+        self.to_wake.store(n, atomic::SeqCst);
+
+        let steals = self.steals;
+        self.steals = 0;
+
+        match self.cnt.fetch_sub(1 + steals, atomic::SeqCst) {
+            DISCONNECTED => { self.cnt.store(DISCONNECTED, atomic::SeqCst); }
+            // If we factor in our steals and notice that the channel has no
+            // data, we successfully sleep
+            n => {
+                assert!(n >= 0);
+                if n - steals <= 0 { return Ok(()) }
+            }
+        }
+
+        self.to_wake.store(0, atomic::SeqCst);
+        Err(unsafe { BlockedTask::cast_from_uint(n) })
+    }
+
+    pub fn recv(&mut self) -> Result<T, Failure<T>> {
+        // Optimistic preflight check (scheduling is expensive).
+        match self.try_recv() {
+            Err(Empty) => {}
+            data => return data,
+        }
+
+        // Welp, our channel has no data. Deschedule the current task and
+        // initiate the blocking protocol.
+        let task: Box<Task> = Local::take();
+        task.deschedule(1, |task| {
+            self.decrement(task)
+        });
+
+        match self.try_recv() {
+            // Messages which actually popped from the queue shouldn't count as
+            // a steal, so offset the decrement here (we already have our
+            // "steal" factored into the channel count above).
+            data @ Ok(..) |
+            data @ Err(Upgraded(..)) => {
+                self.steals -= 1;
+                data
+            }
+
+            data => data,
+        }
+    }
+
+    pub fn try_recv(&mut self) -> Result<T, Failure<T>> {
+        match self.queue.pop() {
+            // If we stole some data, record to that effect (this will be
+            // factored into cnt later on).
+            //
+            // Note that we don't allow steals to grow without bound in order to
+            // prevent eventual overflow of either steals or cnt as an overflow
+            // would have catastrophic results. Sometimes, steals > cnt, but
+            // other times cnt > steals, so we don't know the relation between
+            // steals and cnt. This code path is executed only rarely, so we do
+            // a pretty slow operation, of swapping 0 into cnt, taking steals
+            // down as much as possible (without going negative), and then
+            // adding back in whatever we couldn't factor into steals.
+            Some(data) => {
+                if self.steals > MAX_STEALS {
+                    match self.cnt.swap(0, atomic::SeqCst) {
+                        DISCONNECTED => {
+                            self.cnt.store(DISCONNECTED, atomic::SeqCst);
+                        }
+                        n => {
+                            let m = cmp::min(n, self.steals);
+                            self.steals -= m;
+                            self.bump(n - m);
+                        }
+                    }
+                    assert!(self.steals >= 0);
+                }
+                self.steals += 1;
+                match data {
+                    Data(t) => Ok(t),
+                    GoUp(up) => Err(Upgraded(up)),
+                }
+            }
+
+            None => {
+                match self.cnt.load(atomic::SeqCst) {
+                    n if n != DISCONNECTED => Err(Empty),
+
+                    // This is a little bit of a tricky case. We failed to pop
+                    // data above, and then we have viewed that the channel is
+                    // disconnected. In this window more data could have been
+                    // sent on the channel. It doesn't really make sense to
+                    // return that the channel is disconnected when there's
+                    // actually data on it, so be extra sure there's no data by
+                    // popping one more time.
+                    //
+                    // We can ignore steals because the other end is
+                    // disconnected and we'll never need to really factor in our
+                    // steals again.
+                    _ => {
+                        match self.queue.pop() {
+                            Some(Data(t)) => Ok(t),
+                            Some(GoUp(up)) => Err(Upgraded(up)),
+                            None => Err(Disconnected),
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    pub fn drop_chan(&mut self) {
+        // Dropping a channel is pretty simple, we just flag it as disconnected
+        // and then wakeup a blocker if there is one.
+        match self.cnt.swap(DISCONNECTED, atomic::SeqCst) {
+            -1 => { self.take_to_wake().wake().map(|t| t.reawaken()); }
+            DISCONNECTED => {}
+            n => { assert!(n >= 0); }
+        }
+    }
+
+    pub fn drop_port(&mut self) {
+        // Dropping a port seems like a fairly trivial thing. In theory all we
+        // need to do is flag that we're disconnected and then everything else
+        // can take over (we don't have anyone to wake up).
+        //
+        // The catch for Ports is that we want to drop the entire contents of
+        // the queue. There are multiple reasons for having this property, the
+        // largest of which is that if another chan is waiting in this channel
+        // (but not received yet), then waiting on that port will cause a
+        // deadlock.
+        //
+        // So if we accept that we must now destroy the entire contents of the
+        // queue, this code may make a bit more sense. The tricky part is that
+        // we can't let any in-flight sends go un-dropped, we have to make sure
+        // *everything* is dropped and nothing new will come onto the channel.
+
+        // The first thing we do is set a flag saying that we're done for. All
+        // sends are gated on this flag, so we're immediately guaranteed that
+        // there are a bounded number of active sends that we'll have to deal
+        // with.
+        self.port_dropped.store(true, atomic::SeqCst);
+
+        // Now that we're guaranteed to deal with a bounded number of senders,
+        // we need to drain the queue. This draining process happens atomically
+        // with respect to the "count" of the channel. If the count is nonzero
+        // (with steals taken into account), then there must be data on the
+        // channel. In this case we drain everything and then try again. We will
+        // continue to fail while active senders send data while we're dropping
+        // data, but eventually we're guaranteed to break out of this loop
+        // (because there is a bounded number of senders).
+        let mut steals = self.steals;
+        while {
+            let cnt = self.cnt.compare_and_swap(
+                            steals, DISCONNECTED, atomic::SeqCst);
+            cnt != DISCONNECTED && cnt != steals
+        } {
+            loop {
+                match self.queue.pop() {
+                    Some(..) => { steals += 1; }
+                    None => break
+                }
+            }
+        }
+
+        // At this point in time, we have gated all future senders from sending,
+        // and we have flagged the channel as being disconnected. The senders
+        // still have some responsibility, however, because some sends may not
+        // complete until after we flag the disconnection. There are more
+        // details in the sending methods that see DISCONNECTED
+    }
+
+    ////////////////////////////////////////////////////////////////////////////
+    // select implementation
+    ////////////////////////////////////////////////////////////////////////////
+
+    // Tests to see whether this port can receive without blocking. If Ok is
+    // returned, then that's the answer. If Err is returned, then the returned
+    // port needs to be queried instead (an upgrade happened)
+    pub fn can_recv(&mut self) -> Result<bool, Receiver<T>> {
+        // We peek at the queue to see if there's anything on it, and we use
+        // this return value to determine if we should pop from the queue and
+        // upgrade this channel immediately. If it looks like we've got an
+        // upgrade pending, then go through the whole recv rigamarole to update
+        // the internal state.
+        match self.queue.peek() {
+            Some(&GoUp(..)) => {
+                match self.recv() {
+                    Err(Upgraded(port)) => Err(port),
+                    _ => unreachable!(),
+                }
+            }
+            Some(..) => Ok(true),
+            None => Ok(false)
+        }
+    }
+
+    // increment the count on the channel (used for selection)
+    fn bump(&mut self, amt: int) -> int {
+        match self.cnt.fetch_add(amt, atomic::SeqCst) {
+            DISCONNECTED => {
+                self.cnt.store(DISCONNECTED, atomic::SeqCst);
+                DISCONNECTED
+            }
+            n => n
+        }
+    }
+
+    // Attempts to start selecting on this port. Like a oneshot, this can fail
+    // immediately because of an upgrade.
+    pub fn start_selection(&mut self, task: BlockedTask) -> SelectionResult<T> {
+        match self.decrement(task) {
+            Ok(()) => SelSuccess,
+            Err(task) => {
+                let ret = match self.queue.peek() {
+                    Some(&GoUp(..)) => {
+                        match self.queue.pop() {
+                            Some(GoUp(port)) => SelUpgraded(task, port),
+                            _ => unreachable!(),
+                        }
+                    }
+                    Some(..) => SelCanceled(task),
+                    None => SelCanceled(task),
+                };
+                // Undo our decrement above, and we should be guaranteed that the
+                // previous value is positive because we're not going to sleep
+                let prev = self.bump(1);
+                assert!(prev == DISCONNECTED || prev >= 0);
+                return ret;
+            }
+        }
+    }
+
+    // Removes a previous task from being blocked in this port
+    pub fn abort_selection(&mut self,
+                           was_upgrade: bool) -> Result<bool, Receiver<T>> {
+        // If we're aborting selection after upgrading from a oneshot, then
+        // we're guarantee that no one is waiting. The only way that we could
+        // have seen the upgrade is if data was actually sent on the channel
+        // half again. For us, this means that there is guaranteed to be data on
+        // this channel. Furthermore, we're guaranteed that there was no
+        // start_selection previously, so there's no need to modify `self.cnt`
+        // at all.
+        //
+        // Hence, because of these invariants, we immediately return `Ok(true)`.
+        // Note that the data may not actually be sent on the channel just yet.
+        // The other end could have flagged the upgrade but not sent data to
+        // this end. This is fine because we know it's a small bounded windows
+        // of time until the data is actually sent.
+        if was_upgrade {
+            assert_eq!(self.steals, 0);
+            assert_eq!(self.to_wake.load(atomic::SeqCst), 0);
+            return Ok(true)
+        }
+
+        // We want to make sure that the count on the channel goes non-negative,
+        // and in the stream case we can have at most one steal, so just assume
+        // that we had one steal.
+        let steals = 1;
+        let prev = self.bump(steals + 1);
+
+        // If we were previously disconnected, then we know for sure that there
+        // is no task in to_wake, so just keep going
+        let has_data = if prev == DISCONNECTED {
+            assert_eq!(self.to_wake.load(atomic::SeqCst), 0);
+            true // there is data, that data is that we're disconnected
+        } else {
+            let cur = prev + steals + 1;
+            assert!(cur >= 0);
+
+            // If the previous count was negative, then we just made things go
+            // positive, hence we passed the -1 boundary and we're responsible
+            // for removing the to_wake() field and trashing it.
+            //
+            // If the previous count was positive then we're in a tougher
+            // situation. A possible race is that a sender just incremented
+            // through -1 (meaning it's going to try to wake a task up), but it
+            // hasn't yet read the to_wake. In order to prevent a future recv()
+            // from waking up too early (this sender picking up the plastered
+            // over to_wake), we spin loop here waiting for to_wake to be 0.
+            // Note that this entire select() implementation needs an overhaul,
+            // and this is *not* the worst part of it, so this is not done as a
+            // final solution but rather out of necessity for now to get
+            // something working.
+            if prev < 0 {
+                self.take_to_wake().trash();
+            } else {
+                while self.to_wake.load(atomic::SeqCst) != 0 {
+                    Thread::yield_now();
+                }
+            }
+            assert_eq!(self.steals, 0);
+            self.steals = steals;
+
+            // if we were previously positive, then there's surely data to
+            // receive
+            prev >= 0
+        };
+
+        // Now that we've determined that this queue "has data", we peek at the
+        // queue to see if the data is an upgrade or not. If it's an upgrade,
+        // then we need to destroy this port and abort selection on the
+        // upgraded port.
+        if has_data {
+            match self.queue.peek() {
+                Some(&GoUp(..)) => {
+                    match self.queue.pop() {
+                        Some(GoUp(port)) => Err(port),
+                        _ => unreachable!(),
+                    }
+                }
+                _ => Ok(true),
+            }
+        } else {
+            Ok(false)
+        }
+    }
+}
+
+#[unsafe_destructor]
+impl<T: Send> Drop for Packet<T> {
+    fn drop(&mut self) {
+        // Note that this load is not only an assert for correctness about
+        // disconnection, but also a proper fence before the read of
+        // `to_wake`, so this assert cannot be removed with also removing
+        // the `to_wake` assert.
+        assert_eq!(self.cnt.load(atomic::SeqCst), DISCONNECTED);
+        assert_eq!(self.to_wake.load(atomic::SeqCst), 0);
+    }
+}
diff --git a/src/libstd/comm/sync.rs b/src/libstd/comm/sync.rs
new file mode 100644
index 00000000000..a2e839e134c
--- /dev/null
+++ b/src/libstd/comm/sync.rs
@@ -0,0 +1,490 @@
+// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+/// Synchronous channels/ports
+///
+/// This channel implementation differs significantly from the asynchronous
+/// implementations found next to it (oneshot/stream/share). This is an
+/// implementation of a synchronous, bounded buffer channel.
+///
+/// Each channel is created with some amount of backing buffer, and sends will
+/// *block* until buffer space becomes available. A buffer size of 0 is valid,
+/// which means that every successful send is paired with a successful recv.
+///
+/// This flavor of channels defines a new `send_opt` method for channels which
+/// is the method by which a message is sent but the task does not panic if it
+/// cannot be delivered.
+///
+/// Another major difference is that send() will *always* return back the data
+/// if it couldn't be sent. This is because it is deterministically known when
+/// the data is received and when it is not received.
+///
+/// Implementation-wise, it can all be summed up with "use a mutex plus some
+/// logic". The mutex used here is an OS native mutex, meaning that no user code
+/// is run inside of the mutex (to prevent context switching). This
+/// implementation shares almost all code for the buffered and unbuffered cases
+/// of a synchronous channel. There are a few branches for the unbuffered case,
+/// but they're mostly just relevant to blocking senders.
+
+use core::prelude::*;
+
+pub use self::Failure::*;
+use self::Blocker::*;
+
+use alloc::boxed::Box;
+use vec::Vec;
+use core::mem;
+use core::cell::UnsafeCell;
+use rustrt::local::Local;
+use rustrt::mutex::{NativeMutex, LockGuard};
+use rustrt::task::{Task, BlockedTask};
+
+use sync::atomic;
+
+pub struct Packet<T> {
+    /// Only field outside of the mutex. Just done for kicks, but mainly because
+    /// the other shared channel already had the code implemented
+    channels: atomic::AtomicUint,
+
+    /// The state field is protected by this mutex
+    lock: NativeMutex,
+    state: UnsafeCell<State<T>>,
+}
+
+struct State<T> {
+    disconnected: bool, // Is the channel disconnected yet?
+    queue: Queue,       // queue of senders waiting to send data
+    blocker: Blocker,   // currently blocked task on this channel
+    buf: Buffer<T>,     // storage for buffered messages
+    cap: uint,          // capacity of this channel
+
+    /// A curious flag used to indicate whether a sender failed or succeeded in
+    /// blocking. This is used to transmit information back to the task that it
+    /// must dequeue its message from the buffer because it was not received.
+    /// This is only relevant in the 0-buffer case. This obviously cannot be
+    /// safely constructed, but it's guaranteed to always have a valid pointer
+    /// value.
+    canceled: Option<&'static mut bool>,
+}
+
+/// Possible flavors of tasks who can be blocked on this channel.
+enum Blocker {
+    BlockedSender(BlockedTask),
+    BlockedReceiver(BlockedTask),
+    NoneBlocked
+}
+
+/// Simple queue for threading tasks together. Nodes are stack-allocated, so
+/// this structure is not safe at all
+struct Queue {
+    head: *mut Node,
+    tail: *mut Node,
+}
+
+struct Node {
+    task: Option<BlockedTask>,
+    next: *mut Node,
+}
+
+/// A simple ring-buffer
+struct Buffer<T> {
+    buf: Vec<Option<T>>,
+    start: uint,
+    size: uint,
+}
+
+#[deriving(Show)]
+pub enum Failure {
+    Empty,
+    Disconnected,
+}
+
+/// Atomically blocks the current task, placing it into `slot`, unlocking `lock`
+/// in the meantime. This re-locks the mutex upon returning.
+fn wait(slot: &mut Blocker, f: fn(BlockedTask) -> Blocker,
+        lock: &NativeMutex) {
+    let me: Box<Task> = Local::take();
+    me.deschedule(1, |task| {
+        match mem::replace(slot, f(task)) {
+            NoneBlocked => {}
+            _ => unreachable!(),
+        }
+        unsafe { lock.unlock_noguard(); }
+        Ok(())
+    });
+    unsafe { lock.lock_noguard(); }
+}
+
+/// Wakes up a task, dropping the lock at the correct time
+fn wakeup(task: BlockedTask, guard: LockGuard) {
+    // We need to be careful to wake up the waiting task *outside* of the mutex
+    // in case it incurs a context switch.
+    mem::drop(guard);
+    task.wake().map(|t| t.reawaken());
+}
+
+impl<T: Send> Packet<T> {
+    pub fn new(cap: uint) -> Packet<T> {
+        Packet {
+            channels: atomic::AtomicUint::new(1),
+            lock: unsafe { NativeMutex::new() },
+            state: UnsafeCell::new(State {
+                disconnected: false,
+                blocker: NoneBlocked,
+                cap: cap,
+                canceled: None,
+                queue: Queue {
+                    head: 0 as *mut Node,
+                    tail: 0 as *mut Node,
+                },
+                buf: Buffer {
+                    buf: Vec::from_fn(cap + if cap == 0 {1} else {0}, |_| None),
+                    start: 0,
+                    size: 0,
+                },
+            }),
+        }
+    }
+
+    // Locks this channel, returning a guard for the state and the mutable state
+    // itself. Care should be taken to ensure that the state does not escape the
+    // guard!
+    //
+    // Note that we're ok promoting an & reference to an &mut reference because
+    // the lock ensures that we're the only ones in the world with a pointer to
+    // the state.
+    fn lock<'a>(&'a self) -> (LockGuard<'a>, &'a mut State<T>) {
+        unsafe {
+            let guard = self.lock.lock();
+            (guard, &mut *self.state.get())
+        }
+    }
+
+    pub fn send(&self, t: T) -> Result<(), T> {
+        let (guard, state) = self.lock();
+
+        // wait for a slot to become available, and enqueue the data
+        while !state.disconnected && state.buf.size() == state.buf.cap() {
+            state.queue.enqueue(&self.lock);
+        }
+        if state.disconnected { return Err(t) }
+        state.buf.enqueue(t);
+
+        match mem::replace(&mut state.blocker, NoneBlocked) {
+            // if our capacity is 0, then we need to wait for a receiver to be
+            // available to take our data. After waiting, we check again to make
+            // sure the port didn't go away in the meantime. If it did, we need
+            // to hand back our data.
+            NoneBlocked if state.cap == 0 => {
+                let mut canceled = false;
+                assert!(state.canceled.is_none());
+                state.canceled = Some(unsafe { mem::transmute(&mut canceled) });
+                wait(&mut state.blocker, BlockedSender, &self.lock);
+                if canceled {Err(state.buf.dequeue())} else {Ok(())}
+            }
+
+            // success, we buffered some data
+            NoneBlocked => Ok(()),
+
+            // success, someone's about to receive our buffered data.
+            BlockedReceiver(task) => { wakeup(task, guard); Ok(()) }
+
+            BlockedSender(..) => panic!("lolwut"),
+        }
+    }
+
+    pub fn try_send(&self, t: T) -> Result<(), super::TrySendError<T>> {
+        let (guard, state) = self.lock();
+        if state.disconnected {
+            Err(super::RecvDisconnected(t))
+        } else if state.buf.size() == state.buf.cap() {
+            Err(super::Full(t))
+        } else if state.cap == 0 {
+            // With capacity 0, even though we have buffer space we can't
+            // transfer the data unless there's a receiver waiting.
+            match mem::replace(&mut state.blocker, NoneBlocked) {
+                NoneBlocked => Err(super::Full(t)),
+                BlockedSender(..) => unreachable!(),
+                BlockedReceiver(task) => {
+                    state.buf.enqueue(t);
+                    wakeup(task, guard);
+                    Ok(())
+                }
+            }
+        } else {
+            // If the buffer has some space and the capacity isn't 0, then we
+            // just enqueue the data for later retrieval, ensuring to wake up
+            // any blocked receiver if there is one.
+            assert!(state.buf.size() < state.buf.cap());
+            state.buf.enqueue(t);
+            match mem::replace(&mut state.blocker, NoneBlocked) {
+                BlockedReceiver(task) => wakeup(task, guard),
+                NoneBlocked => {}
+                BlockedSender(..) => unreachable!(),
+            }
+            Ok(())
+        }
+    }
+
+    // Receives a message from this channel
+    //
+    // When reading this, remember that there can only ever be one receiver at
+    // time.
+    pub fn recv(&self) -> Result<T, ()> {
+        let (guard, state) = self.lock();
+
+        // Wait for the buffer to have something in it. No need for a while loop
+        // because we're the only receiver.
+        let mut waited = false;
+        if !state.disconnected && state.buf.size() == 0 {
+            wait(&mut state.blocker, BlockedReceiver, &self.lock);
+            waited = true;
+        }
+        if state.disconnected && state.buf.size() == 0 { return Err(()) }
+
+        // Pick up the data, wake up our neighbors, and carry on
+        assert!(state.buf.size() > 0);
+        let ret = state.buf.dequeue();
+        self.wakeup_senders(waited, guard, state);
+        return Ok(ret);
+    }
+
+    pub fn try_recv(&self) -> Result<T, Failure> {
+        let (guard, state) = self.lock();
+
+        // Easy cases first
+        if state.disconnected { return Err(Disconnected) }
+        if state.buf.size() == 0 { return Err(Empty) }
+
+        // Be sure to wake up neighbors
+        let ret = Ok(state.buf.dequeue());
+        self.wakeup_senders(false, guard, state);
+
+        return ret;
+    }
+
+    // Wake up pending senders after some data has been received
+    //
+    // * `waited` - flag if the receiver blocked to receive some data, or if it
+    //              just picked up some data on the way out
+    // * `guard` - the lock guard that is held over this channel's lock
+    fn wakeup_senders(&self, waited: bool,
+                      guard: LockGuard,
+                      state: &mut State<T>) {
+        let pending_sender1: Option<BlockedTask> = state.queue.dequeue();
+
+        // If this is a no-buffer channel (cap == 0), then if we didn't wait we
+        // need to ACK the sender. If we waited, then the sender waking us up
+        // was already the ACK.
+        let pending_sender2 = if state.cap == 0 && !waited {
+            match mem::replace(&mut state.blocker, NoneBlocked) {
+                NoneBlocked => None,
+                BlockedReceiver(..) => unreachable!(),
+                BlockedSender(task) => {
+                    state.canceled.take();
+                    Some(task)
+                }
+            }
+        } else {
+            None
+        };
+        mem::drop((state, guard));
+
+        // only outside of the lock do we wake up the pending tasks
+        pending_sender1.map(|t| t.wake().map(|t| t.reawaken()));
+        pending_sender2.map(|t| t.wake().map(|t| t.reawaken()));
+    }
+
+    // Prepares this shared packet for a channel clone, essentially just bumping
+    // a refcount.
+    pub fn clone_chan(&self) {
+        self.channels.fetch_add(1, atomic::SeqCst);
+    }
+
+    pub fn drop_chan(&self) {
+        // Only flag the channel as disconnected if we're the last channel
+        match self.channels.fetch_sub(1, atomic::SeqCst) {
+            1 => {}
+            _ => return
+        }
+
+        // Not much to do other than wake up a receiver if one's there
+        let (guard, state) = self.lock();
+        if state.disconnected { return }
+        state.disconnected = true;
+        match mem::replace(&mut state.blocker, NoneBlocked) {
+            NoneBlocked => {}
+            BlockedSender(..) => unreachable!(),
+            BlockedReceiver(task) => wakeup(task, guard),
+        }
+    }
+
+    pub fn drop_port(&self) {
+        let (guard, state) = self.lock();
+
+        if state.disconnected { return }
+        state.disconnected = true;
+
+        // If the capacity is 0, then the sender may want its data back after
+        // we're disconnected. Otherwise it's now our responsibility to destroy
+        // the buffered data. As with many other portions of this code, this
+        // needs to be careful to destroy the data *outside* of the lock to
+        // prevent deadlock.
+        let _data = if state.cap != 0 {
+            mem::replace(&mut state.buf.buf, Vec::new())
+        } else {
+            Vec::new()
+        };
+        let mut queue = mem::replace(&mut state.queue, Queue {
+            head: 0 as *mut Node,
+            tail: 0 as *mut Node,
+        });
+
+        let waiter = match mem::replace(&mut state.blocker, NoneBlocked) {
+            NoneBlocked => None,
+            BlockedSender(task) => {
+                *state.canceled.take().unwrap() = true;
+                Some(task)
+            }
+            BlockedReceiver(..) => unreachable!(),
+        };
+        mem::drop((state, guard));
+
+        loop {
+            match queue.dequeue() {
+                Some(task) => { task.wake().map(|t| t.reawaken()); }
+                None => break,
+            }
+        }
+        waiter.map(|t| t.wake().map(|t| t.reawaken()));
+    }
+
+    ////////////////////////////////////////////////////////////////////////////
+    // select implementation
+    ////////////////////////////////////////////////////////////////////////////
+
+    // If Ok, the value is whether this port has data, if Err, then the upgraded
+    // port needs to be checked instead of this one.
+    pub fn can_recv(&self) -> bool {
+        let (_g, state) = self.lock();
+        state.disconnected || state.buf.size() > 0
+    }
+
+    // Attempts to start selection on this port. This can either succeed or fail
+    // because there is data waiting.
+    pub fn start_selection(&self, task: BlockedTask) -> Result<(), BlockedTask>{
+        let (_g, state) = self.lock();
+        if state.disconnected || state.buf.size() > 0 {
+            Err(task)
+        } else {
+            match mem::replace(&mut state.blocker, BlockedReceiver(task)) {
+                NoneBlocked => {}
+                BlockedSender(..) => unreachable!(),
+                BlockedReceiver(..) => unreachable!(),
+            }
+            Ok(())
+        }
+    }
+
+    // Remove a previous selecting task from this port. This ensures that the
+    // blocked task will no longer be visible to any other threads.
+    //
+    // The return value indicates whether there's data on this port.
+    pub fn abort_selection(&self) -> bool {
+        let (_g, state) = self.lock();
+        match mem::replace(&mut state.blocker, NoneBlocked) {
+            NoneBlocked => true,
+            BlockedSender(task) => {
+                state.blocker = BlockedSender(task);
+                true
+            }
+            BlockedReceiver(task) => { task.trash(); false }
+        }
+    }
+}
+
+#[unsafe_destructor]
+impl<T: Send> Drop for Packet<T> {
+    fn drop(&mut self) {
+        assert_eq!(self.channels.load(atomic::SeqCst), 0);
+        let (_g, state) = self.lock();
+        assert!(state.queue.dequeue().is_none());
+        assert!(state.canceled.is_none());
+    }
+}
+
+
+////////////////////////////////////////////////////////////////////////////////
+// Buffer, a simple ring buffer backed by Vec<T>
+////////////////////////////////////////////////////////////////////////////////
+
+impl<T> Buffer<T> {
+    fn enqueue(&mut self, t: T) {
+        let pos = (self.start + self.size) % self.buf.len();
+        self.size += 1;
+        let prev = mem::replace(&mut self.buf[pos], Some(t));
+        assert!(prev.is_none());
+    }
+
+    fn dequeue(&mut self) -> T {
+        let start = self.start;
+        self.size -= 1;
+        self.start = (self.start + 1) % self.buf.len();
+        self.buf[start].take().unwrap()
+    }
+
+    fn size(&self) -> uint { self.size }
+    fn cap(&self) -> uint { self.buf.len() }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Queue, a simple queue to enqueue tasks with (stack-allocated nodes)
+////////////////////////////////////////////////////////////////////////////////
+
+impl Queue {
+    fn enqueue(&mut self, lock: &NativeMutex) {
+        let task: Box<Task> = Local::take();
+        let mut node = Node {
+            task: None,
+            next: 0 as *mut Node,
+        };
+        task.deschedule(1, |task| {
+            node.task = Some(task);
+            if self.tail.is_null() {
+                self.head = &mut node as *mut Node;
+                self.tail = &mut node as *mut Node;
+            } else {
+                unsafe {
+                    (*self.tail).next = &mut node as *mut Node;
+                    self.tail = &mut node as *mut Node;
+                }
+            }
+            unsafe { lock.unlock_noguard(); }
+            Ok(())
+        });
+        unsafe { lock.lock_noguard(); }
+        assert!(node.next.is_null());
+    }
+
+    fn dequeue(&mut self) -> Option<BlockedTask> {
+        if self.head.is_null() {
+            return None
+        }
+        let node = self.head;
+        self.head = unsafe { (*node).next };
+        if self.head.is_null() {
+            self.tail = 0 as *mut Node;
+        }
+        unsafe {
+            (*node).next = 0 as *mut Node;
+            Some((*node).task.take().unwrap())
+        }
+    }
+}