about summary refs log tree commit diff
diff options
context:
space:
mode:
authorAlex Crichton <alex@alexcrichton.com>2013-12-05 17:56:17 -0800
committerAlex Crichton <alex@alexcrichton.com>2013-12-16 17:47:11 -0800
commitbfa9064ba2687eb1d95708f72f41ddd9729a6ba1 (patch)
treeb10aeff181eff3a8654df495d2ad8826490f6533
parent000cda611f8224ac780fa37432f869f425cd2bb7 (diff)
downloadrust-bfa9064ba2687eb1d95708f72f41ddd9729a6ba1.tar.gz
rust-bfa9064ba2687eb1d95708f72f41ddd9729a6ba1.zip
Rewrite std::comm
* Streams are now ~3x faster than before (fewer allocations and more optimized)
    * Based on a single-producer single-consumer lock-free queue that doesn't
      always have to allocate on every send.
    * Blocking via mutexes/cond vars outside the runtime
* Streams work in/out of the runtime seamlessly
* Select now works in/out of the runtime seamlessly
* Streams will now fail!() on send() if the other end has hung up
    * try_send() will not fail
* PortOne/ChanOne removed
* SharedPort removed
* MegaPipe removed
* Generic select removed (only one kind of port now)
* API redesign
    * try_recv == never block
    * recv_opt == block, don't fail
    * iter() == Iterator<T> for Port<T>
    * removed peek
    * Type::new
* Removed rt::comm
-rw-r--r--src/libstd/comm.rs311
-rw-r--r--src/libstd/comm/imp.rs337
-rw-r--r--src/libstd/comm/mod.rs1371
-rw-r--r--src/libstd/comm/select.rs498
-rw-r--r--src/libstd/lib.rs3
-rw-r--r--src/libstd/rt/mpmc_bounded_queue.rs18
-rw-r--r--src/libstd/rt/mpsc_queue.rs230
-rw-r--r--src/libstd/rt/spsc_queue.rs296
-rw-r--r--src/libstd/rt/task.rs1
9 files changed, 2631 insertions, 434 deletions
diff --git a/src/libstd/comm.rs b/src/libstd/comm.rs
deleted file mode 100644
index c5ed464de23..00000000000
--- a/src/libstd/comm.rs
+++ /dev/null
@@ -1,311 +0,0 @@
-// Copyright 2012 The Rust Project Developers. See the COPYRIGHT
-// file at the top-level directory of this distribution and at
-// http://rust-lang.org/COPYRIGHT.
-//
-// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
-// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
-// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
-// option. This file may not be copied, modified, or distributed
-// except according to those terms.
-
-/*!
-Message passing
-*/
-
-#[allow(missing_doc)];
-
-use clone::Clone;
-use iter::Iterator;
-use kinds::Send;
-use option::Option;
-use rtcomm = rt::comm;
-
-/// A trait for things that can send multiple messages.
-pub trait GenericChan<T> {
-    /// Sends a message.
-    fn send(&self, x: T);
-}
-
-/// Things that can send multiple messages and can detect when the receiver
-/// is closed
-pub trait GenericSmartChan<T> {
-    /// Sends a message, or report if the receiver has closed the connection.
-    fn try_send(&self, x: T) -> bool;
-}
-
-/// Trait for non-rescheduling send operations, similar to `send_deferred` on ChanOne.
-pub trait SendDeferred<T> {
-    fn send_deferred(&self, val: T);
-    fn try_send_deferred(&self, val: T) -> bool;
-}
-
-/// A trait for things that can receive multiple messages.
-pub trait GenericPort<T> {
-    /// Receives a message, or fails if the connection closes.
-    fn recv(&self) -> T;
-
-    /// Receives a message, or returns `none` if
-    /// the connection is closed or closes.
-    fn try_recv(&self) -> Option<T>;
-
-    /// Returns an iterator that breaks once the connection closes.
-    ///
-    /// # Example
-    ///
-    /// ~~~rust
-    /// do spawn {
-    ///     for x in port.recv_iter() {
-    ///         if pred(x) { break; }
-    ///         println!("{}", x);
-    ///     }
-    /// }
-    /// ~~~
-    fn recv_iter<'a>(&'a self) -> RecvIterator<'a, Self> {
-        RecvIterator { port: self }
-    }
-}
-
-pub struct RecvIterator<'a, P> {
-    priv port: &'a P,
-}
-
-impl<'a, T, P: GenericPort<T>> Iterator<T> for RecvIterator<'a, P> {
-    fn next(&mut self) -> Option<T> {
-        self.port.try_recv()
-    }
-}
-
-/// Ports that can `peek`
-pub trait Peekable<T> {
-    /// Returns true if a message is available
-    fn peek(&self) -> bool;
-}
-
-/* priv is disabled to allow users to get at traits like Select. */
-pub struct PortOne<T> { /* priv */ x: rtcomm::PortOne<T> }
-pub struct ChanOne<T> { /* priv */ x: rtcomm::ChanOne<T> }
-
-pub fn oneshot<T: Send>() -> (PortOne<T>, ChanOne<T>) {
-    let (p, c) = rtcomm::oneshot();
-    (PortOne { x: p }, ChanOne { x: c })
-}
-
-pub struct Port<T> { /* priv */ x: rtcomm::Port<T> }
-pub struct Chan<T> { /* priv */ x: rtcomm::Chan<T> }
-
-pub fn stream<T: Send>() -> (Port<T>, Chan<T>) {
-    let (p, c) = rtcomm::stream();
-    (Port { x: p }, Chan { x: c })
-}
-
-impl<T: Send> ChanOne<T> {
-    pub fn send(self, val: T) {
-        let ChanOne { x: c } = self;
-        c.send(val)
-    }
-
-    pub fn try_send(self, val: T) -> bool {
-        let ChanOne { x: c } = self;
-        c.try_send(val)
-    }
-
-    pub fn send_deferred(self, val: T) {
-        let ChanOne { x: c } = self;
-        c.send_deferred(val)
-    }
-
-    pub fn try_send_deferred(self, val: T) -> bool {
-        let ChanOne{ x: c } = self;
-        c.try_send_deferred(val)
-    }
-}
-
-impl<T: Send> PortOne<T> {
-    pub fn recv(self) -> T {
-        let PortOne { x: p } = self;
-        p.recv()
-    }
-
-    pub fn try_recv(self) -> Option<T> {
-        let PortOne { x: p } = self;
-        p.try_recv()
-    }
-}
-
-impl<T: Send> Peekable<T>  for PortOne<T> {
-    fn peek(&self) -> bool {
-        let &PortOne { x: ref p } = self;
-        p.peek()
-    }
-}
-
-impl<T: Send> GenericChan<T> for Chan<T> {
-    fn send(&self, val: T) {
-        let &Chan { x: ref c } = self;
-        c.send(val)
-    }
-}
-
-impl<T: Send> GenericSmartChan<T> for Chan<T> {
-    fn try_send(&self, val: T) -> bool {
-        let &Chan { x: ref c } = self;
-        c.try_send(val)
-    }
-}
-
-impl<T: Send> SendDeferred<T> for Chan<T> {
-    fn send_deferred(&self, val: T) {
-        let &Chan { x: ref c } = self;
-        c.send_deferred(val)
-    }
-
-    fn try_send_deferred(&self, val: T) -> bool {
-        let &Chan { x: ref c } = self;
-        c.try_send_deferred(val)
-    }
-}
-
-impl<T: Send> GenericPort<T> for Port<T> {
-    fn recv(&self) -> T {
-        let &Port { x: ref p } = self;
-        p.recv()
-    }
-
-    fn try_recv(&self) -> Option<T> {
-        let &Port { x: ref p } = self;
-        p.try_recv()
-    }
-}
-
-impl<T: Send> Peekable<T> for Port<T> {
-    fn peek(&self) -> bool {
-        let &Port { x: ref p } = self;
-        p.peek()
-    }
-}
-
-
-pub struct SharedChan<T> { /* priv */ x: rtcomm::SharedChan<T> }
-
-impl<T: Send> SharedChan<T> {
-    pub fn new(c: Chan<T>) -> SharedChan<T> {
-        let Chan { x: c } = c;
-        SharedChan { x: rtcomm::SharedChan::new(c) }
-    }
-}
-
-impl<T: Send> GenericChan<T> for SharedChan<T> {
-    fn send(&self, val: T) {
-        let &SharedChan { x: ref c } = self;
-        c.send(val)
-    }
-}
-
-impl<T: Send> GenericSmartChan<T> for SharedChan<T> {
-    fn try_send(&self, val: T) -> bool {
-        let &SharedChan { x: ref c } = self;
-        c.try_send(val)
-    }
-}
-
-impl<T: Send> SendDeferred<T> for SharedChan<T> {
-    fn send_deferred(&self, val: T) {
-        let &SharedChan { x: ref c } = self;
-        c.send_deferred(val)
-    }
-
-    fn try_send_deferred(&self, val: T) -> bool {
-        let &SharedChan { x: ref c } = self;
-        c.try_send_deferred(val)
-    }
-}
-
-impl<T: Send> Clone for SharedChan<T> {
-    fn clone(&self) -> SharedChan<T> {
-        let &SharedChan { x: ref c } = self;
-        SharedChan { x: c.clone() }
-    }
-}
-
-pub struct SharedPort<T> { /* priv */ x: rtcomm::SharedPort<T> }
-
-impl<T: Send> SharedPort<T> {
-    pub fn new(p: Port<T>) -> SharedPort<T> {
-        let Port { x: p } = p;
-        SharedPort { x: rtcomm::SharedPort::new(p) }
-    }
-}
-
-impl<T: Send> GenericPort<T> for SharedPort<T> {
-    fn recv(&self) -> T {
-        let &SharedPort { x: ref p } = self;
-        p.recv()
-    }
-
-    fn try_recv(&self) -> Option<T> {
-        let &SharedPort { x: ref p } = self;
-        p.try_recv()
-    }
-}
-
-impl<T: Send> Clone for SharedPort<T> {
-    fn clone(&self) -> SharedPort<T> {
-        let &SharedPort { x: ref p } = self;
-        SharedPort { x: p.clone() }
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use comm::*;
-    use prelude::*;
-
-    #[test]
-    fn test_nested_recv_iter() {
-        let (port, chan) = stream::<int>();
-        let (total_port, total_chan) = oneshot::<int>();
-
-        do spawn {
-            let mut acc = 0;
-            for x in port.recv_iter() {
-                acc += x;
-                for x in port.recv_iter() {
-                    acc += x;
-                    for x in port.try_recv().move_iter() {
-                        acc += x;
-                        total_chan.send(acc);
-                    }
-                }
-            }
-        }
-
-        chan.send(3);
-        chan.send(1);
-        chan.send(2);
-        assert_eq!(total_port.recv(), 6);
-    }
-
-    #[test]
-    fn test_recv_iter_break() {
-        let (port, chan) = stream::<int>();
-        let (count_port, count_chan) = oneshot::<int>();
-
-        do spawn {
-            let mut count = 0;
-            for x in port.recv_iter() {
-                if count >= 3 {
-                    count_chan.send(count);
-                    break;
-                } else {
-                    count += x;
-                }
-            }
-        }
-
-        chan.send(2);
-        chan.send(2);
-        chan.send(2);
-        chan.send(2);
-        assert_eq!(count_port.recv(), 4);
-    }
-}
diff --git a/src/libstd/comm/imp.rs b/src/libstd/comm/imp.rs
new file mode 100644
index 00000000000..bd1d6fed901
--- /dev/null
+++ b/src/libstd/comm/imp.rs
@@ -0,0 +1,337 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+//! One of the major goals behind this channel implementation is to work
+//! seamlessly on and off the runtime. This also means that the code isn't
+//! littered with "if is_green() { ... } else { ... }". Right now, the rest of
+//! the runtime isn't quite ready for this abstraction to be done very
+//! nicely, so the conditional "if green" blocks are all contained in this inner
+//! module.
+//!
+//! The goal of this module is to mirror what the runtime "should be", not the
+//! state that it is currently in today. You'll notice that there is no mention
+//! of schedulers or is_green inside any of the channel code, it is currently
+//! entirely contained in this one module.
+//!
+//! In the ideal world, nothing in this module exists and it is all implemented
+//! elsewhere in the runtime (in the proper location). All of this code is
+//! structured in order to easily refactor this to the correct location whenever
+//! we have the trait objects in place to serve as the boundary of the
+//! abstraction.
+
+use iter::{range, Iterator};
+use ops::Drop;
+use option::{Some, None, Option};
+use rt::local::Local;
+use rt::sched::{SchedHandle, Scheduler, TaskFromFriend};
+use rt::thread::Thread;
+use rt;
+use unstable::mutex::Mutex;
+use unstable::sync::UnsafeArc;
+
+// A task handle is a method of waking up a blocked task. The handle itself
+// is completely opaque and only has a wake() method defined on it. This
+// method will wake the task regardless of the context of the thread which
+// is currently calling wake().
+//
+// This abstraction should be able to be created when putting a task to
+// sleep. This should basically be a method on whatever the local Task is,
+// consuming the local Task.
+
+pub struct TaskHandle {
+    priv inner: TaskRepr
+}
+enum TaskRepr {
+    Green(rt::BlockedTask, *mut SchedHandle),
+    Native(NativeWakeupStyle),
+}
+enum NativeWakeupStyle {
+    ArcWakeup(UnsafeArc<Mutex>),    // shared mutex to synchronize on
+    LocalWakeup(*mut Mutex),        // synchronize on the task-local mutex
+}
+
+impl TaskHandle {
+    // Signal that this handle should be woken up. The `can_resched`
+    // argument indicates whether the current task could possibly be
+    // rescheduled or not. This does not have a lot of meaning for the
+    // native case, but for an M:N case it indicates whether a context
+    // switch can happen or not.
+    pub fn wake(self, can_resched: bool) {
+        match self.inner {
+            Green(task, handle) => {
+                // If we have a local scheduler, then use that to run the
+                // blocked task, otherwise we can use the handle to send the
+                // task back to its home.
+                if rt::in_green_task_context() {
+                    if can_resched {
+                        task.wake().map(Scheduler::run_task);
+                    } else {
+                        let mut s: ~Scheduler = Local::take();
+                        s.enqueue_blocked_task(task);
+                        Local::put(s);
+                    }
+                } else {
+                    let task = match task.wake() {
+                        Some(task) => task, None => return
+                    };
+                    // XXX: this is not an easy section of code to refactor.
+                    //      If this handle is owned by the Task (which it
+                    //      should be), then this would be a use-after-free
+                    //      because once the task is pushed onto the message
+                    //      queue, the handle is gone.
+                    //
+                    //      Currently the handle is instead owned by the
+                    //      Port/Chan pair, which means that because a
+                    //      channel is invoking this method the handle will
+                    //      continue to stay alive for the entire duration
+                    //      of this method. This will require thought when
+                    //      moving the handle into the task.
+                    unsafe { (*handle).send(TaskFromFriend(task)) }
+                }
+            }
+
+            // Note that there are no use-after-free races in this code. In
+            // the arc-case, we own the lock, and in the local case, we're
+// using a lock so it's guaranteed that they aren't running while
+            // we hold the lock.
+            Native(ArcWakeup(lock)) => {
+                unsafe {
+                    let lock = lock.get();
+                    (*lock).lock();
+                    (*lock).signal();
+                    (*lock).unlock();
+                }
+            }
+            Native(LocalWakeup(lock)) => {
+                unsafe {
+                    (*lock).lock();
+                    (*lock).signal();
+                    (*lock).unlock();
+                }
+            }
+        }
+    }
+
+    // Trashes handle to this task. This ensures that necessary memory is
+    // deallocated, and there may be some extra assertions as well.
+    pub fn trash(self) {
+        match self.inner {
+            Green(task, _) => task.assert_already_awake(),
+            Native(..) => {}
+        }
+    }
+}
+
+// This structure is an abstraction of what should be stored in the local
+// task itself. This data is currently stored inside of each channel, but
+// this should rather be stored in each task (and channels will still
+// continue to lazily initialize this data).
+
+pub struct TaskData {
+    priv handle: Option<SchedHandle>,
+    priv lock: Mutex,
+}
+
+impl TaskData {
+    pub fn new() -> TaskData {
+        TaskData {
+            handle: None,
+            lock: unsafe { Mutex::empty() },
+        }
+    }
+}
+
+impl Drop for TaskData {
+    fn drop(&mut self) {
+        unsafe { self.lock.destroy() }
+    }
+}
+
+// Now this is the really fun part. This is where all the M:N/1:1-agnostic
+// along with recv/select-agnostic blocking information goes. A "blocking
+// context" is really just a stack-allocated structure (which is probably
+// fine to be a stack-trait-object).
+//
+// This has some particularly strange interfaces, but the reason for all
+// this is to support selection/recv/1:1/M:N all in one bundle.
+
+pub struct BlockingContext<'a> {
+    priv inner: BlockingRepr<'a>
+}
+
+enum BlockingRepr<'a> {
+    GreenBlock(rt::BlockedTask, &'a mut Scheduler),
+    NativeBlock(Option<UnsafeArc<Mutex>>),
+}
+
+impl<'a> BlockingContext<'a> {
+    // Creates one blocking context. The data provided should in theory be
+    // acquired from the local task, but it is instead acquired from the
+    // channel currently.
+    //
+    // This function will call `f` with a blocking context, plus the data
+    // that it is given. This function will then return whether this task
+    // should actually go to sleep or not. If `true` is returned, then this
+    // function does not return until someone calls `wake()` on the task.
+    // If `false` is returned, then this function immediately returns.
+    //
+    // # Safety note
+    //
+    // Note that this stack closure may not be run on the same stack as when
+    // this function was called. This means that the environment of this
+    // stack closure could be unsafely aliased. This is currently prevented
+    // through the guarantee that this function will never return before `f`
+    // finishes executing.
+    pub fn one(data: &mut TaskData,
+               f: |BlockingContext, &mut TaskData| -> bool) {
+        if rt::in_green_task_context() {
+            let sched: ~Scheduler = Local::take();
+            sched.deschedule_running_task_and_then(|sched, task| {
+                let ctx = BlockingContext { inner: GreenBlock(task, sched) };
+                // no need to do something on success/failure other than
+                // returning because the `block` function for a BlockingContext
+                // takes care of reawakening itself if the blocking procedure
+                // fails. If this function is successful, then we're already
+                // blocked, and if it fails, the task will already be
+                // rescheduled.
+                f(ctx, data);
+            });
+        } else {
+            unsafe { data.lock.lock(); }
+            let ctx = BlockingContext { inner: NativeBlock(None) };
+            if f(ctx, data) {
+                unsafe { data.lock.wait(); }
+            }
+            unsafe { data.lock.unlock(); }
+        }
+    }
+
+    // Creates many blocking contexts. The intended use case for this
+    // function is selection over a number of ports. This will create `amt`
+    // blocking contexts, yielding them to `f` in turn. If `f` returns
+    // false, then this function aborts and returns immediately. If `f`
+    // repeatedly returns `true` `amt` times, then this function will block.
+    pub fn many(amt: uint, f: |BlockingContext| -> bool) {
+        if rt::in_green_task_context() {
+            let sched: ~Scheduler = Local::take();
+            sched.deschedule_running_task_and_then(|sched, task| {
+                for handle in task.make_selectable(amt) {
+                    let ctx = BlockingContext {
+                        inner: GreenBlock(handle, sched)
+                    };
+                    // see comment above in `one` for why no further action is
+                    // necessary here
+                    if !f(ctx) { break }
+                }
+            });
+        } else {
+            // In the native case, our decision to block must be shared
+            // amongst all of the channels. It may be possible to
+            // stack-allocate this mutex (instead of putting it in an
+            // UnsafeArc box), but for now in order to prevent
+            // use-after-free trivially we place this into a box and then
+            // pass that around.
+            unsafe {
+                let mtx = UnsafeArc::new(Mutex::new());
+                (*mtx.get()).lock();
+                let success = range(0, amt).all(|_| {
+                    f(BlockingContext {
+                        inner: NativeBlock(Some(mtx.clone()))
+                    })
+                });
+                if success {
+                    (*mtx.get()).wait();
+                }
+                (*mtx.get()).unlock();
+            }
+        }
+    }
+
+    // This function will consume this BlockingContext, and optionally block
+    // if according to the atomic `decision` function. The semantics of this
+    // functions are:
+    //
+    //  * `slot` is required to be a `None`-slot (which is owned by the
+    //    channel)
+    //  * The `slot` will be filled in with a blocked version of the current
+    //    task (with `wake`-ability if this function is successful).
+    //  * If the `decision` function returns true, then this function
+    //    immediately returns having relinquished ownership of the task.
+    //  * If the `decision` function returns false, then the `slot` is reset
+    //    to `None` and the task is re-scheduled if necessary (remember that
+    //    the task will not resume executing before the outer `one` or
+    //    `many` function has returned. This function is expected to have a
+    //    release memory fence in order for the modifications of `to_wake` to be
+    //    visible to other tasks. Code which attempts to read `to_wake` should
+    //    have an acquiring memory fence to guarantee that this write is
+    //    visible.
+    //
+    // This function will return whether the blocking occurred or not.
+    pub fn block(self,
+                 data: &mut TaskData,
+                 slot: &mut Option<TaskHandle>,
+                 decision: || -> bool) -> bool {
+        assert!(slot.is_none());
+        match self.inner {
+            GreenBlock(task, sched) => {
+                if data.handle.is_none() {
+                    data.handle = Some(sched.make_handle());
+                }
+                let handle = data.handle.get_mut_ref() as *mut SchedHandle;
+                *slot = Some(TaskHandle { inner: Green(task, handle) });
+
+                if !decision() {
+                    match slot.take_unwrap().inner {
+                        Green(task, _) => sched.enqueue_blocked_task(task),
+                        Native(..) => unreachable!()
+                    }
+                    false
+                } else {
+                    true
+                }
+            }
+            NativeBlock(shared) => {
+                *slot = Some(TaskHandle {
+                    inner: Native(match shared {
+                        Some(arc) => ArcWakeup(arc),
+                        None => LocalWakeup(&mut data.lock as *mut Mutex),
+                    })
+                });
+
+                if !decision() {
+                    *slot = None;
+                    false
+                } else {
+                    true
+                }
+            }
+        }
+    }
+}
+
+// Agnostic method of forcing a yield of the current task
+pub fn yield_now() {
+    if rt::in_green_task_context() {
+        let sched: ~Scheduler = Local::take();
+        sched.yield_now();
+    } else {
+        Thread::yield_now();
+    }
+}
+
+// Agnostic method of "maybe yielding" in order to provide fairness
+pub fn maybe_yield() {
+    if rt::in_green_task_context() {
+        let sched: ~Scheduler = Local::take();
+        sched.maybe_yield();
+    } else {
+        // the OS decides fairness, nothing for us to do.
+    }
+}
diff --git a/src/libstd/comm/mod.rs b/src/libstd/comm/mod.rs
new file mode 100644
index 00000000000..9a65e9973cb
--- /dev/null
+++ b/src/libstd/comm/mod.rs
@@ -0,0 +1,1371 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+//! Rust Communication Primitives
+//!
+//! Rust makes it very difficult to share data among tasks to prevent race
+//! conditions and to improve parallelism, but there is often a need for
+//! communication between concurrent tasks. The primitives defined in this
+//! module are the building blocks for synchronization in rust.
+//!
+//! This module currently provides three main types:
+//!
+//! * `Chan`
+//! * `Port`
+//! * `SharedChan`
+//!
+//! The `Chan` and `SharedChan` types are used to send data to a `Port`. A
+//! `SharedChan` is clone-able such that many tasks can send simultaneously to
+//! one receiving port. These communication primitives are *task blocking*, not
+//! *thread blocking*. This means that if one task is blocked on a channel,
+//! other tasks can continue to make progress.
+//!
+//! Rust channels can be used as if they have an infinite internal buffer. What
+//! this means is that the `send` operation will never block. `Port`s, on the
+//! other hand, will block the task if there is no data to be received.
+//!
+//! ## Failure Propagation
+//!
+//! In addition to being a core primitive for communicating in rust, channels
+//! and ports are the points at which failure is propagated among tasks.
+//! Whenever one half of a channel is closed, the other half will have its
+//! next operation `fail!`. The purpose of this is to allow propagation of
+//! failure among tasks that are linked to one another via channels.
+//!
+//! There are methods on all of `Chan`, `SharedChan`, and `Port` to perform
+//! their respective operations without failing, however.
+//!
+//! ## Outside the Runtime
+//!
+//! All channels and ports work seamlessly inside and outside of the rust
+//! runtime. This means that code may use channels to communicate information
+//! inside and outside of the runtime. For example, if rust were embedded as an
+//! FFI module in another application, the rust runtime would probably be
+//! running in its own external thread pool. Channels created can communicate
+//! from the native application threads to the rust threads through the use of
+//! native mutexes and condition variables.
+//!
+//! What this means is that if a native thread is using a channel, execution
+//! will be blocked accordingly by blocking the OS thread.
+//!
+//! # Example
+//!
+//! ```rust
+//! // Create a simple streaming channel
+//! let (port, chan) = Chan::new();
+//! do spawn {
+//!     chan.send(10);
+//! }
+//! assert_eq!(port.recv(), 10);
+//!
+//! // Create a shared channel which can be sent along from many tasks
+//! let (port, chan) = SharedChan::new();
+//! for i in range(0, 10) {
+//!     let chan = chan.clone();
+//!     do spawn {
+//!         chan.send(i);
+//!     }
+//! }
+//!
+//! for _ in range(0, 10) {
+//!     let j = port.recv();
+//!     assert!(0 <= j && j < 10);
+//! }
+//!
+//! // The call to recv() will fail!() because the channel has already hung
+//! // up (or been deallocated)
+//! let (port, chan) = Chan::new();
+//! drop(chan);
+//! port.recv();
+//! ```
+
+// A description of how Rust's channel implementation works
+//
+// Channels are supposed to be the basic building block for all other
+// concurrent primitives that are used in Rust. As a result, the channel type
+// needs to be highly optimized, flexible, and broad enough for use everywhere.
+//
+// The choice of implementation of all channels is to be built on lock-free data
+// structures. The channels themselves are then consequently also lock-free data
+// structures. As always with lock-free code, this is a very "here be dragons"
+// territory, especially because I'm unaware of any academic papers which have
+// gone into great length about channels of these flavors.
+//
+// ## Flavors of channels
+//
+// Rust channels come in two flavors: streams and shared channels. A stream has
+// one sender and one receiver while a shared channel could have multiple
+// senders. This choice heavily influences the design of the protocol set
+// forth for both senders/receivers.
+//
+// ## Concurrent queues
+//
+// The basic idea of Rust's Chan/Port types is that send() never blocks, but
+// recv() obviously blocks. This means that under the hood there must be some
+// shared and concurrent queue holding all of the actual data.
+//
+// With two flavors of channels, two flavors of queues are also used. We have
+// chosen to use queues from a well-known author which are abbreviated as SPSC
+// and MPSC (single producer, single consumer and multiple producer, single
+// consumer). SPSC queues are used for streams while MPSC queues are used for
+// shared channels.
+//
+// ### SPSC optimizations
+//
+// The SPSC queue found online is essentially a linked list of nodes where one
+// half of the nodes are the "queue of data" and the other half of nodes are a
+// cache of unused nodes. The unused nodes are used such that an allocation is
+// not required on every push() and a free doesn't need to happen on every
+// pop().
+//
+// As found online, however, the cache of nodes is of an infinite size. This
+// means that if a channel at one point in its life had 50k items in the queue,
+// then the queue will always have the capacity for 50k items. I believed that
+// this was an unnecessary limitation of the implementation, so I have altered
+// the queue to optionally have a bound on the cache size.
+//
+// By default, streams will have an unbounded SPSC queue with a small-ish cache
+// size. The hope is that the cache is still large enough to have very fast
+// send() operations while not too large such that millions of channels can
+// coexist at once.
+//
+// ### MPSC optimizations
+//
+// Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
+// a linked list under the hood to earn its unboundedness, but I have not put
+// forth much effort into having a cache of nodes similar to the SPSC queue.
+//
+// For now, I believe that this is "ok" because shared channels are not the most
+// common type, but soon we may wish to revisit this queue choice and determine
+// another candidate for backend storage of shared channels.
+//
+// ## Overview of the Implementation
+//
+// Now that there's a little background on the concurrent queues used, it's
+// worth going into much more detail about the channels themselves. The basic
+// pseudocode for a send/recv are:
+//
+//
+//      send(t)                             recv()
+//        queue.push(t)                       return if queue.pop()
+//        if increment() == -1                deschedule {
+//          wakeup()                            if decrement() > 0
+//                                                cancel_deschedule()
+//                                            }
+//                                            queue.pop()
+//
+// As mentioned before, there are no locks in this implementation, only atomic
+// instructions are used.
+//
+// ### The internal atomic counter
+//
+// Every channel/port/shared channel has a shared counter with its
+// counterpart to keep track of the size of the queue. This counter is used to
+// abort descheduling by the receiver and to know when to wake up on the sending
+// side.
+//
+// As seen in the pseudocode, senders will increment this count and receivers
+// will decrement the count. The theory behind this is that if a sender sees a
+// -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
+// then it doesn't need to block.
+//
+// The recv() method has a beginning call to pop(), and if successful, it needs
+// to decrement the count. It is a crucial implementation detail that this
+// decrement does *not* happen to the shared counter. If this were the case,
+// then it would be possible for the counter to be very negative when there were
+// no receivers waiting, in which case the senders would have to determine when
+// it was actually appropriate to wake up a receiver.
+//
+// Instead, the "steal count" is kept track of separately (not atomically
+// because it's only used by ports), and then the decrement() call when
+// descheduling will lump in all of the recent steals into one large decrement.
+//
+// The implication of this is that if a sender sees a -1 count, then there's
+// guaranteed to be a waiter waiting!
+//
+// ## Native Implementation
+//
+// A major goal of these channels is to work seamlessly on and off the runtime.
+// All of the previous race conditions have been worded in terms of
+// scheduler-isms (which is obviously not available without the runtime).
+//
+// For now, native usage of channels (off the runtime) will fall back onto
+// mutexes/cond vars for descheduling/atomic decisions. The no-contention path
+// is still entirely lock-free, the "deschedule" blocks above are surrounded by
+// a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
+// condition variable.
+//
+// ## Select
+//
+// Being able to support selection over channels has greatly influenced this
+// design, and not only does selection need to work inside the runtime, but also
+// outside the runtime.
+//
+// The implementation is fairly straightforward. The goal of select() is not to
+// return some data, but only to return which channel can receive data without
+// blocking. The implementation is essentially the entire blocking procedure
+// followed by an increment as soon as it's woken up. The cancellation procedure
+// involves an increment and swapping out the to_wake field to acquire ownership
+// of the task to unblock.
+//
+// Sadly this current implementation requires multiple allocations, so I have
+// seen the throughput of select() be much worse than it should be. I do not
+// believe that there is anything fundamental which needs to change about these
+// channels, however, in order to support a more efficient select().
+//
+// # Conclusion
+//
+// And now that you've seen all the races that I found and attempted to fix,
+// here's the code for you to find some more!
+
+use cast;
+use clone::Clone;
+use container::Container;
+use int;
+use iter::Iterator;
+use kinds::Send;
+use ops::Drop;
+use option::{Option, Some, None};
+use rt::thread::Thread;
+use unstable::atomics::{AtomicInt, AtomicBool, SeqCst, Relaxed};
+use vec::{ImmutableVector, OwnedVector};
+
+use spsc = rt::spsc_queue;
+use mpsc = rt::mpsc_queue;
+
+use self::imp::{TaskHandle, TaskData, BlockingContext};
+pub use self::select::Select;
+
+// Expands one test body into two #[test] functions: `uv` runs the body
+// directly under the test runner, while `native` runs it via
+// `run_in_bare_thread`, i.e. on a raw OS thread outside the runtime. This
+// exercises both halves of the runtime-agnostic channel implementation.
+macro_rules! test (
+    { fn $name:ident() $b:block $($a:attr)*} => (
+        mod $name {
+            #[allow(unused_imports)];
+
+            use util;
+            use super::super::*;
+            use prelude::*;
+
+            fn f() $b
+
+            $($a)* #[test] fn uv() { f() }
+            $($a)* #[test] fn native() {
+                use unstable::run_in_bare_thread;
+                run_in_bare_thread(f);
+            }
+        }
+    )
+)
+
+mod imp;
+mod select;
+
+///////////////////////////////////////////////////////////////////////////////
+// Helper type to abstract ports for channels and shared channels
+///////////////////////////////////////////////////////////////////////////////
+
+// Wraps the consumer end of either queue flavor: SPSC backs a stream
+// (`Chan`) while MPSC backs a shared channel (`SharedChan`).
+enum Consumer<T> {
+    SPSC(spsc::Consumer<T, Packet>),
+    MPSC(mpsc::Consumer<T, Packet>),
+}
+
+impl<T: Send> Consumer<T>{
+    // Returns the raw `Packet` shared with the sending half, regardless of
+    // which queue flavor backs this consumer.
+    unsafe fn packet(&self) -> *mut Packet {
+        match *self {
+            SPSC(ref c) => c.packet(),
+            MPSC(ref c) => c.packet(),
+        }
+    }
+}
+
+///////////////////////////////////////////////////////////////////////////////
+// Public structs
+///////////////////////////////////////////////////////////////////////////////
+
+/// The receiving-half of Rust's channel type. This half can only be owned by
+/// one task
+pub struct Port<T> {
+    // Consumer end of the underlying lock-free queue (SPSC or MPSC).
+    priv queue: Consumer<T>,
+}
+
+/// An iterator over messages received on a port, this iterator will block
+/// whenever `next` is called, waiting for a new message, and `None` will be
+/// returned when the corresponding channel has hung up.
+pub struct PortIterator<'a, T> {
+    // Borrowed port that each call to `next` receives from.
+    priv port: &'a Port<T>
+}
+
+/// The sending-half of Rust's channel type. This half can only be owned by one
+/// task
+pub struct Chan<T> {
+    // Producer end of the SPSC queue; the paired `Port` holds the consumer.
+    priv queue: spsc::Producer<T, Packet>,
+}
+
+/// The sending-half of Rust's channel type. This half can be shared among many
+/// tasks by creating copies of itself through the `clone` method.
+pub struct SharedChan<T> {
+    // Producer end of the MPSC queue; many producers may push concurrently.
+    priv queue: mpsc::Producer<T, Packet>,
+}
+
+///////////////////////////////////////////////////////////////////////////////
+// Internal struct definitions
+///////////////////////////////////////////////////////////////////////////////
+
+struct Packet {
+    cnt: AtomicInt, // How many items are on this channel
+    steals: int,    // How many times has a port received without blocking?
+    to_wake: Option<TaskHandle>, // Task to wake up
+
+    // Blocking state (scheduler handle / native mutex) used by the `imp`
+    // module when a task deschedules on this channel.
+    data: TaskData,
+
+    // This lock is used to wake up native threads blocked in select. The
+    // `lock` field is not used because the thread blocking in select must
+    // block on only one mutex.
+    //selection_lock: Option<UnsafeArc<Mutex>>,
+
+    // The number of channels which are currently using this packet. This is
+    // used to reference count shared channels.
+    channels: AtomicInt,
+
+    // select() bookkeeping: whether a selection is currently in progress on
+    // this packet, its id within the select set, and next/prev pointers
+    // linking the packets participating in that select (see select.rs).
+    selecting: AtomicBool,
+    selection_id: uint,
+    select_next: *mut Packet,
+    select_prev: *mut Packet,
+}
+
+///////////////////////////////////////////////////////////////////////////////
+// All implementations -- the fun part
+///////////////////////////////////////////////////////////////////////////////
+
+static DISCONNECTED: int = int::min_value; // sentinel `cnt` value: other end hung up
+static RESCHED_FREQ: int = 200; // number of sends between voluntary yields
+
+impl Packet {
+    fn new() -> Packet {
+        Packet {
+            cnt: AtomicInt::new(0),
+            steals: 0,
+            to_wake: None,
+            data: TaskData::new(),
+            channels: AtomicInt::new(1),
+
+            selecting: AtomicBool::new(false),
+            selection_id: 0,
+            select_next: 0 as *mut Packet,
+            select_prev: 0 as *mut Packet,
+        }
+    }
+
+    // Increments the channel size count, preserving the disconnected state if
+    // the other end has disconnected.
+    fn increment(&mut self) -> int {
+        match self.cnt.fetch_add(1, SeqCst) {
+            DISCONNECTED => {
+                // see the comment in 'try' for a shared channel for why this
+                // window of "not disconnected" is "ok".
+                self.cnt.store(DISCONNECTED, SeqCst);
+                DISCONNECTED
+            }
+            n => n
+        }
+    }
+
+    // Decrements the item count of the channel, returning whether the task
+    // should block or not. This assumes that the task is ready to sleep in that
+    // the `to_wake` field has already been filled in. Once this decrement
+    // happens, the task could wake up on the other end.
+    //
+    // From an implementation perspective, this is also when our "steal count"
+    // gets merged into the "channel count". Our steal count is reset to 0 after
+    // this function completes.
+    //
+    // As with increment(), this preserves the disconnected state if the
+    // channel is disconnected.
+    fn decrement(&mut self) -> bool {
+        let steals = self.steals;
+        self.steals = 0;
+        match self.cnt.fetch_sub(1 + steals, SeqCst) {
+            DISCONNECTED => {
+                self.cnt.store(DISCONNECTED, SeqCst);
+                false
+            }
+            n => {
+                assert!(n >= 0);
+                // block only if, after folding in our steals, no data remains
+                n - steals <= 0
+            }
+        }
+    }
+
+    // Helper function for select, tests whether this port can receive without
+    // blocking (obviously not an atomic decision).
+    fn can_recv(&self) -> bool {
+        let cnt = self.cnt.load(SeqCst);
+        cnt == DISCONNECTED || cnt - self.steals > 0
+    }
+
+    // This function must have had at least an acquire fence before it to be
+    // properly called.
+    fn wakeup(&mut self, can_resched: bool) {
+        // Consume the stored task handle and wake it up; also clear the
+        // selection flag now that the (possibly selecting) task is awake.
+        self.to_wake.take_unwrap().wake(can_resched);
+        self.selecting.store(false, Relaxed);
+    }
+
+    // Aborts the selection process for a port. This happens as part of select()
+    // once the task has reawoken. This will place the channel back into a
+    // consistent state which is ready to be received from again.
+    //
+    // The method of doing this is a little subtle. These channels have the
+    // invariant that if -1 is seen, then to_wake is always Some(..) and should
+    // be woken up. This aborting process at least needs to add 1 to the
+    // reference count, but that is not guaranteed to make the count positive
+    // (our steal count subtraction could mean that after the addition the
+    // channel count is still negative).
+    //
+    // In order to get around this, we force our channel count to go above 0 by
+    // adding a large number >= 1 to it. This way no sender will see -1 unless
+    // we are indeed blocking. This "extra lump" we took out of the channel
+    // becomes our steal count (which will get re-factored into the count on the
+    // next blocking recv)
+    //
+    // The return value of this method is whether there is data on this channel
+    // to receive or not.
+    fn abort_selection(&mut self, take_to_wake: bool) -> bool {
+        // make sure steals + 1 makes the count go non-negative
+        let steals = {
+            let cnt = self.cnt.load(SeqCst);
+            if cnt < 0 && cnt != DISCONNECTED {-cnt} else {0}
+        };
+        let prev = self.cnt.fetch_add(steals + 1, SeqCst);
+
+        // If we were previously disconnected, then we know for sure that there
+        // is no task in to_wake, so just keep going
+        if prev == DISCONNECTED {
+            assert!(self.to_wake.is_none());
+            self.cnt.store(DISCONNECTED, SeqCst);
+            self.selecting.store(false, SeqCst);
+            true // there is data, that data is that we're disconnected
+        } else {
+            let cur = prev + steals + 1;
+            assert!(cur >= 0);
+
+            // If the previous count was negative, then we just made things go
+            // positive, hence we passed the -1 boundary and we're responsible
+            // for removing the to_wake() field and trashing it.
+            if prev < 0 {
+                if take_to_wake {
+                    self.to_wake.take_unwrap().trash();
+                } else {
+                    assert!(self.to_wake.is_none());
+                }
+
+                // We woke ourselves up, we're responsible for cancelling
+                assert!(self.selecting.load(Relaxed));
+                self.selecting.store(false, Relaxed);
+            }
+            assert_eq!(self.steals, 0);
+            self.steals = steals;
+
+            // if we were previously positive, then there's surely data to
+            // receive
+            prev >= 0
+        }
+    }
+
+    // Decrement the reference count on a channel. This is called whenever a
+    // Chan is dropped and may end up waking up a receiver. It's the receiver's
+    // responsibility on the other end to figure out that we've disconnected.
+    unsafe fn drop_chan(&mut self) {
+        match self.channels.fetch_sub(1, SeqCst) {
+            // We were the last sender: flag disconnection, and if a receiver
+            // was blocked (cnt == -1) wake it so it observes the hangup.
+            1 => {
+                match self.cnt.swap(DISCONNECTED, SeqCst) {
+                    -1 => { self.wakeup(false); }
+                    DISCONNECTED => {}
+                    n => { assert!(n >= 0); }
+                }
+            }
+            n if n > 1 => {},
+            n => fail!("bad number of channels left {}", n),
+        }
+    }
+}
+
+impl Drop for Packet {
+    fn drop(&mut self) {
+        // Sanity checks: a packet may only be destroyed once both halves have
+        // hung up and no task is still blocked on it.
+        unsafe {
+            // Note that this load is not only an assert for correctness about
+            // disconnection, but also a proper fence before the read of
+            // `to_wake`, so this assert cannot be removed without also removing
+            // the `to_wake` assert.
+            assert_eq!(self.cnt.load(SeqCst), DISCONNECTED);
+            assert!(self.to_wake.is_none());
+            assert_eq!(self.channels.load(SeqCst), 0);
+        }
+    }
+}
+
+impl<T: Send> Chan<T> {
+    /// Creates a new port/channel pair. All data sent on the channel returned
+    /// will become available on the port as well. See the documentation of
+    /// `Port` and `Chan` to see what's possible with them.
+    pub fn new() -> (Port<T>, Chan<T>) {
+        // arbitrary 128 size cache -- this is just a max cache size, not a
+        // maximum buffer size
+        let (c, p) = spsc::queue(128, Packet::new());
+        let c = SPSC(c);
+        (Port { queue: c }, Chan { queue: p })
+    }
+
+    /// Sends a value along this channel to be received by the corresponding
+    /// port.
+    ///
+    /// Rust channels are infinitely buffered so this method will never block.
+    /// This method may trigger a rescheduling, however, in order to wake up a
+    /// blocked receiver (if one is present). If no scheduling is desired, then
+    /// the `send_deferred` guarantees that there will be no reschedulings.
+    ///
+    /// # Failure
+    ///
+    /// This function will fail if the other end of the channel has hung up.
+    /// This means that if the corresponding port has fallen out of scope, this
+    /// function will trigger a fail message saying that a message is being sent
+    /// on a closed channel.
+    ///
+    /// Note that if this function does *not* fail, it does not mean that the
+    /// data will be successfully received. All sends are placed into a queue,
+    /// so it is possible for a send to succeed (the other end is alive), but
+    /// then the other end could immediately disconnect.
+    ///
+    /// The purpose of this functionality is to propagate failure among tasks.
+    /// If failure is not desired, then consider using the `try_send` method
+    pub fn send(&self, t: T) {
+        if !self.try_send(t) {
+            fail!("sending on a closed channel");
+        }
+    }
+
+    /// This function is equivalent in the semantics of `send`, but it
+    /// guarantees that a rescheduling will never occur when this method is
+    /// called.
+    pub fn send_deferred(&self, t: T) {
+        if !self.try_send_deferred(t) {
+            fail!("sending on a closed channel");
+        }
+    }
+
+    /// Attempts to send a value on this channel, returning whether it was
+    /// successfully sent.
+    ///
+    /// A successful send occurs when it is determined that the other end of the
+    /// channel has not hung up already. An unsuccessful send would be one where
+    /// the corresponding port has already been deallocated. Note that a return
+    /// value of `false` means that the data will never be received, but a
+    /// return value of `true` does *not* mean that the data will be received.
+    /// It is possible for the corresponding port to hang up immediately after
+    /// this function returns `true`.
+    ///
+    /// Like `send`, this method will never block. If the failure of send cannot
+    /// be tolerated, then this method should be used instead.
+    pub fn try_send(&self, t: T) -> bool { self.try(t, true) }
+
+    /// This function is equivalent in the semantics of `try_send`, but it
+    /// guarantees that a rescheduling will never occur when this method is
+    /// called.
+    pub fn try_send_deferred(&self, t: T) -> bool { self.try(t, false) }
+
+    fn try(&self, t: T, can_resched: bool) -> bool {
+        unsafe {
+            let this = cast::transmute_mut(self);
+            this.queue.push(t);
+            let packet = this.queue.packet();
+            match (*packet).increment() {
+                // As described above, -1 == wakeup
+                -1 => { (*packet).wakeup(can_resched); true }
+                // Also as above, SPSC queues must be >= -2
+                -2 => true,
+                // We succeeded if we sent data
+                DISCONNECTED => this.queue.is_empty(),
+                // In order to prevent starvation of other tasks in situations
+                // where a task sends repeatedly without ever receiving, we
+                // occasionally yield instead of doing a send immediately.
+                // Only doing this if we're doing a rescheduling send, otherwise
+                // the caller is expecting not to context switch.
+                //
+                // Note that we don't unconditionally attempt to yield because
+                // the TLS overhead can be a bit much.
+                n => {
+                    assert!(n >= 0);
+                    if can_resched && n > 0 && n % RESCHED_FREQ == 0 {
+                        imp::maybe_yield();
+                    }
+                    true
+                }
+            }
+        }
+    }
+}
+
+#[unsafe_destructor]
+impl<T: Send> Drop for Chan<T> {
+    fn drop(&mut self) {
+        // Dropping the channel may need to wake a blocked receiver so it can
+        // observe the disconnection (see `Packet::drop_chan`).
+        unsafe { (*self.queue.packet()).drop_chan(); }
+    }
+}
+
+impl<T: Send> SharedChan<T> {
+    /// Creates a new shared channel and port pair. The purpose of a shared
+    /// channel is to be cloneable such that many tasks can send data at the
+    /// same time. All data sent on any channel will become available on the
+    /// provided port as well.
+    pub fn new() -> (Port<T>, SharedChan<T>) {
+        let (c, p) = mpsc::queue(Packet::new());
+        let c = MPSC(c);
+        (Port { queue: c }, SharedChan { queue: p })
+    }
+
+    /// Equivalent method to `send` on the `Chan` type (using the same
+    /// semantics)
+    pub fn send(&self, t: T) {
+        if !self.try_send(t) {
+            fail!("sending on a closed channel");
+        }
+    }
+
+    /// This function is equivalent in the semantics of `send`, but it
+    /// guarantees that a rescheduling will never occur when this method is
+    /// called.
+    pub fn send_deferred(&self, t: T) {
+        if !self.try_send_deferred(t) {
+            fail!("sending on a closed channel");
+        }
+    }
+
+    /// Equivalent method to `try_send` on the `Chan` type (using the same
+    /// semantics)
+    pub fn try_send(&self, t: T) -> bool { self.try(t, true) }
+
+    /// This function is equivalent in the semantics of `try_send`, but it
+    /// guarantees that a rescheduling will never occur when this method is
+    /// called.
+    pub fn try_send_deferred(&self, t: T) -> bool { self.try(t, false) }
+
+    fn try(&self, t: T, can_resched: bool) -> bool {
+        unsafe {
+            // Note that the multiple sender case is a little trickier
+            // semantically than the single sender case. The logic for
+            // incrementing is "add and if disconnected store disconnected".
+            // This could end up leading some senders to believe that there
+            // wasn't a disconnect if in fact there was a disconnect. This means
+            // that while one thread is attempting to re-store the disconnected
+            // states, other threads could walk through merrily incrementing
+            // this very-negative disconnected count. To prevent senders from
+            // spuriously attempting to send when the channel is actually
+            // disconnected, the count has a ranged check here.
+            //
+            // This is also done for another reason. Remember that the return
+            // value of this function is:
+            //
+            //  `true` == the data *may* be received, this essentially has no
+            //            meaning
+            //  `false` == the data will *never* be received, this has a lot of
+            //             meaning
+            //
+            // In the SPSC case, we have a check of 'queue.is_empty()' to see
+            // whether the data was actually received, but this same condition
+            // means nothing in a multi-producer context. As a result, this
+            // preflight check serves as the definitive "this will never be
+            // received". Once we get beyond this check, we have permanently
+            // entered the realm of "this may be received"
+            let packet = self.queue.packet();
+            if (*packet).cnt.load(Relaxed) < DISCONNECTED + 1024 {
+                return false
+            }
+
+            let this = cast::transmute_mut(self);
+            this.queue.push(t);
+
+            match (*packet).increment() {
+                DISCONNECTED => {} // oh well, we tried
+                -1 => { (*packet).wakeup(can_resched); }
+                n => {
+                    if can_resched && n > 0 && n % RESCHED_FREQ == 0 {
+                        imp::maybe_yield();
+                    }
+                }
+            }
+            true
+        }
+    }
+}
+
+impl<T: Send> Clone for SharedChan<T> {
+    fn clone(&self) -> SharedChan<T> {
+        // Bump the packet's channel refcount so `drop_chan` knows how many
+        // senders remain.
+        unsafe { (*self.queue.packet()).channels.fetch_add(1, SeqCst); }
+        SharedChan { queue: self.queue.clone() }
+    }
+}
+
+#[unsafe_destructor]
+impl<T: Send> Drop for SharedChan<T> {
+    fn drop(&mut self) {
+        // Dropping one clone decrements the channel refcount; the last clone
+        // flags disconnection and may wake a blocked receiver.
+        unsafe { (*self.queue.packet()).drop_chan(); }
+    }
+}
+
+impl<T: Send> Port<T> {
+    /// Blocks waiting for a value on this port
+    ///
+    /// This function will block if necessary to wait for a corresponding send
+    /// on the channel from its paired `Chan` structure. This port will be woken
+    /// up when data is ready, and the data will be returned.
+    ///
+    /// # Failure
+    ///
+    /// Similar to channels, this method will trigger a task failure if the
+    /// other end of the channel has hung up (been deallocated). The purpose of
+    /// this is to propagate failure among tasks.
+    ///
+    /// If failure is not desired, then there are two options:
+    ///
+    /// * If blocking is still desired, the `recv_opt` method will return `None`
+    ///   when the other end hangs up
+    ///
+    /// * If blocking is not desired, then the `try_recv` method will attempt to
+    ///   peek at a value on this port.
+    pub fn recv(&self) -> T {
+        match self.recv_opt() {
+            Some(t) => t,
+            None => fail!("receiving on a closed channel"),
+        }
+    }
+
+    /// Attempts to return a pending value on this port without blocking
+    ///
+    /// This method will never block the caller in order to wait for data to
+    /// become available. Instead, this will always return immediately with a
+    /// possible option of pending data on the channel.
+    ///
+    /// This is useful for a flavor of "optimistic check" before deciding to
+    /// block on a port.
+    ///
+    /// This function cannot fail.
+    pub fn try_recv(&self) -> Option<T> {
+        self.try_recv_inc(true)
+    }
+
+    fn try_recv_inc(&self, increment: bool) -> Option<T> {
+        // This is a "best effort" situation, so if a queue is inconsistent just
+        // don't worry about it.
+        let this = unsafe { cast::transmute_mut(self) };
+        let ret = match this.queue {
+            SPSC(ref mut queue) => queue.pop(),
+            MPSC(ref mut queue) => match queue.pop() {
+                mpsc::Data(t) => Some(t),
+                mpsc::Empty => None,
+
+                // This is a bit of an interesting case. The channel is
+                // reported as having data available, but our pop() has
+                // failed due to the queue being in an inconsistent state.
+                // This means that there is some pusher somewhere which has
+                // yet to complete, but we are guaranteed that a pop will
+                // eventually succeed. In this case, we spin in a yield loop
+                // because the remote sender should finish their enqueue
+                // operation "very quickly".
+                //
+                // Note that this yield loop does *not* attempt to do a green
+                // yield (regardless of the context), but *always* performs an
+                // OS-thread yield. The reasoning for this is that the pusher in
+                // question which is causing the inconsistent state is
+                // guaranteed to *not* be a blocked task (green tasks can't get
+                // pre-empted), so it must be on a different OS thread. Also,
+                // `try_recv` is normally a "guaranteed no rescheduling" context
+                // in a green-thread situation. By yielding control of the
+                // thread, we will hopefully allow time for the remote task on
+                // the other OS thread to make progress.
+                //
+                // Avoiding this yield loop would require a different queue
+                // abstraction which provides the guarantee that after M
+                // pushes have succeeded, at least M pops will succeed. The
+                // current queues guarantee that if there are N active
+                // pushes, you can pop N times once all N have finished.
+                mpsc::Inconsistent => {
+                    let data;
+                    loop {
+                        Thread::yield_now();
+                        match queue.pop() {
+                            mpsc::Data(t) => { data = t; break }
+                            mpsc::Empty => fail!("inconsistent => empty"),
+                            mpsc::Inconsistent => {}
+                        }
+                    }
+                    Some(data)
+                }
+            }
+        };
+        if increment && ret.is_some() {
+            unsafe { (*this.queue.packet()).steals += 1; }
+        }
+        return ret;
+    }
+
+    /// Attempt to wait for a value on this port, but does not fail if the
+    /// corresponding channel has hung up.
+    ///
+    /// This method will always block if there is
+    /// no data available on the port, but it will not fail in the case that
+    /// the channel has been deallocated.
+    ///
+    /// In other words, this function has the same semantics as the `recv`
+    /// method except for the failure aspect.
+    ///
+    /// If the channel has hung up, then `None` is returned. Otherwise `Some` of
+    /// the value found on the port is returned.
+    pub fn recv_opt(&self) -> Option<T> {
+        // optimistic preflight check (scheduling is expensive)
+        match self.try_recv() { None => {}, data => return data }
+
+        let packet;
+        let this;
+        unsafe {
+            this = cast::transmute_mut(self);
+            packet = this.queue.packet();
+            BlockingContext::one(&mut (*packet).data, |ctx, data| {
+                ctx.block(data, &mut (*packet).to_wake, || (*packet).decrement())
+            });
+        }
+
+        let data = self.try_recv_inc(false);
+        if data.is_none() &&
+           unsafe { (*packet).cnt.load(SeqCst) } != DISCONNECTED {
+            fail!("bug: woke up too soon");
+        }
+        return data;
+    }
+
+    /// Returns an iterator which will block waiting for messages, but never
+    /// `fail!`. It will return `None` when the channel has hung up.
+    pub fn iter<'a>(&'a self) -> PortIterator<'a, T> {
+        PortIterator { port: self }
+    }
+}
+
+impl<'a, T: Send> Iterator<T> for PortIterator<'a, T> {
+    fn next(&mut self) -> Option<T> { self.port.recv_opt() }
+}
+
+#[unsafe_destructor]
+impl<T: Send> Drop for Port<T> {
+    fn drop(&mut self) {
+        // All we need to do is store that we're disconnected. If the channel
+        // half has already disconnected, then we'll just deallocate everything
+        // when the shared packet is deallocated.
+        unsafe {
+            (*self.queue.packet()).cnt.store(DISCONNECTED, SeqCst);
+        }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use prelude::*;
+
+    use task;
+    use rt::thread::Thread;
+    use super::*;
+    use rt::test::*;
+
+    test!(fn smoke() {
+        let (p, c) = Chan::new();
+        c.send(1);
+        assert_eq!(p.recv(), 1);
+    })
+
+    test!(fn drop_full() {
+        let (_p, c) = Chan::new();
+        c.send(~1);
+    })
+
+    test!(fn drop_full_shared() {
+        let (_p, c) = SharedChan::new();
+        c.send(~1);
+    })
+
+    test!(fn smoke_shared() {
+        let (p, c) = SharedChan::new();
+        c.send(1);
+        assert_eq!(p.recv(), 1);
+        let c = c.clone();
+        c.send(1);
+        assert_eq!(p.recv(), 1);
+    })
+
+    #[test]
+    fn smoke_threads() {
+        let (p, c) = Chan::new();
+        do task::spawn_sched(task::SingleThreaded) {
+            c.send(1);
+        }
+        assert_eq!(p.recv(), 1);
+    }
+
+    #[test] #[should_fail]
+    fn smoke_port_gone() {
+        let (p, c) = Chan::new();
+        drop(p);
+        c.send(1);
+    }
+
+    #[test] #[should_fail]
+    fn smoke_shared_port_gone() {
+        let (p, c) = SharedChan::new();
+        drop(p);
+        c.send(1);
+    }
+
+    #[test] #[should_fail]
+    fn smoke_shared_port_gone2() {
+        let (p, c) = SharedChan::new();
+        drop(p);
+        let c2 = c.clone();
+        drop(c);
+        c2.send(1);
+    }
+
+    #[test] #[should_fail]
+    fn port_gone_concurrent() {
+        let (p, c) = Chan::new();
+        do task::spawn_sched(task::SingleThreaded) {
+            p.recv();
+        }
+        loop { c.send(1) }
+    }
+
+    #[test] #[should_fail]
+    fn port_gone_concurrent_shared() {
+        let (p, c) = SharedChan::new();
+        let c1 = c.clone();
+        do task::spawn_sched(task::SingleThreaded) {
+            p.recv();
+        }
+        loop {
+            c.send(1);
+            c1.send(1);
+        }
+    }
+
+    #[test] #[should_fail]
+    fn smoke_chan_gone() {
+        let (p, c) = Chan::<int>::new();
+        drop(c);
+        p.recv();
+    }
+
+    #[test] #[should_fail]
+    fn smoke_chan_gone_shared() {
+        let (p, c) = SharedChan::<()>::new();
+        let c2 = c.clone();
+        drop(c);
+        drop(c2);
+        p.recv();
+    }
+
+    #[test] #[should_fail]
+    fn chan_gone_concurrent() {
+        let (p, c) = Chan::new();
+        do task::spawn_sched(task::SingleThreaded) {
+            c.send(1);
+            c.send(1);
+        }
+        loop { p.recv(); }
+    }
+
+    #[test]
+    fn stress() {
+        let (p, c) = Chan::new();
+        do task::spawn_sched(task::SingleThreaded) {
+            for _ in range(0, 10000) { c.send(1); }
+        }
+        for _ in range(0, 10000) {
+            assert_eq!(p.recv(), 1);
+        }
+    }
+
+    #[test]
+    fn stress_shared() {
+        static AMT: uint = 10000;
+        static NTHREADS: uint = 8;
+        let (p, c) = SharedChan::<int>::new();
+        let (p1, c1) = Chan::new();
+
+        do spawn {
+            for _ in range(0, AMT * NTHREADS) {
+                assert_eq!(p.recv(), 1);
+            }
+            assert_eq!(p.try_recv(), None);
+            c1.send(());
+        }
+
+        for _ in range(0, NTHREADS) {
+            let c = c.clone();
+            do task::spawn_sched(task::SingleThreaded) {
+                for _ in range(0, AMT) { c.send(1); }
+            }
+        }
+        p1.recv();
+
+    }
+
+    #[test]
+    fn send_from_outside_runtime() {
+        let (p, c) = Chan::<int>::new();
+        let (p1, c1) = Chan::new();
+        do spawn {
+            c1.send(());
+            for _ in range(0, 40) {
+                assert_eq!(p.recv(), 1);
+            }
+        }
+        p1.recv();
+        let t = do Thread::start {
+            for _ in range(0, 40) {
+                c.send(1);
+            }
+        };
+        t.join();
+    }
+
+    #[test]
+    fn recv_from_outside_runtime() {
+        let (p, c) = Chan::<int>::new();
+        let t = do Thread::start {
+            for _ in range(0, 40) {
+                assert_eq!(p.recv(), 1);
+            }
+        };
+        for _ in range(0, 40) {
+            c.send(1);
+        }
+        t.join();
+    }
+
+    #[test]
+    fn no_runtime() {
+        let (p1, c1) = Chan::<int>::new();
+        let (p2, c2) = Chan::<int>::new();
+        let t1 = do Thread::start {
+            assert_eq!(p1.recv(), 1);
+            c2.send(2);
+        };
+        let t2 = do Thread::start {
+            c1.send(1);
+            assert_eq!(p2.recv(), 2);
+        };
+        t1.join();
+        t2.join();
+    }
+
+    #[test]
+    fn oneshot_single_thread_close_port_first() {
+        // Simple test of closing without sending
+        do run_in_newsched_task {
+            let (port, _chan) = Chan::<int>::new();
+            { let _p = port; }
+        }
+    }
+
+    #[test]
+    fn oneshot_single_thread_close_chan_first() {
+        // Simple test of closing without sending
+        do run_in_newsched_task {
+            let (_port, chan) = Chan::<int>::new();
+            { let _c = chan; }
+        }
+    }
+
+    #[test] #[should_fail]
+    fn oneshot_single_thread_send_port_close() {
+        // Testing that the sender cleans up the payload if receiver is closed
+        let (port, chan) = Chan::<~int>::new();
+        { let _p = port; }
+        chan.send(~0);
+    }
+
+    #[test]
+    fn oneshot_single_thread_recv_chan_close() {
+        // Receiving on a closed chan will fail
+        do run_in_newsched_task {
+            let res = do spawntask_try {
+                let (port, chan) = Chan::<~int>::new();
+                { let _c = chan; }
+                port.recv();
+            };
+            // What is our res?
+            assert!(res.is_err());
+        }
+    }
+
+    #[test]
+    fn oneshot_single_thread_send_then_recv() {
+        do run_in_newsched_task {
+            let (port, chan) = Chan::<~int>::new();
+            chan.send(~10);
+            assert!(port.recv() == ~10);
+        }
+    }
+
+    #[test]
+    fn oneshot_single_thread_try_send_open() {
+        do run_in_newsched_task {
+            let (port, chan) = Chan::<int>::new();
+            assert!(chan.try_send(10));
+            assert!(port.recv() == 10);
+        }
+    }
+
+    #[test]
+    fn oneshot_single_thread_try_send_closed() {
+        do run_in_newsched_task {
+            let (port, chan) = Chan::<int>::new();
+            { let _p = port; }
+            assert!(!chan.try_send(10));
+        }
+    }
+
+    #[test]
+    fn oneshot_single_thread_try_recv_open() {
+        do run_in_newsched_task {
+            let (port, chan) = Chan::<int>::new();
+            chan.send(10);
+            assert!(port.try_recv() == Some(10));
+        }
+    }
+
+    #[test]
+    fn oneshot_single_thread_try_recv_closed() {
+        do run_in_newsched_task {
+            let (port, chan) = Chan::<int>::new();
+            { let _c = chan; }
+            assert!(port.recv_opt() == None);
+        }
+    }
+
+    #[test]
+    fn oneshot_single_thread_peek_data() {
+        do run_in_newsched_task {
+            let (port, chan) = Chan::<int>::new();
+            assert!(port.try_recv().is_none());
+            chan.send(10);
+            assert!(port.try_recv().is_some());
+        }
+    }
+
+    #[test]
+    fn oneshot_single_thread_peek_close() {
+        do run_in_newsched_task {
+            let (port, chan) = Chan::<int>::new();
+            { let _c = chan; }
+            assert!(port.try_recv().is_none());
+            assert!(port.try_recv().is_none());
+        }
+    }
+
+    #[test]
+    fn oneshot_single_thread_peek_open() {
+        do run_in_newsched_task {
+            let (port, _) = Chan::<int>::new();
+            assert!(port.try_recv().is_none());
+        }
+    }
+
+    #[test]
+    fn oneshot_multi_task_recv_then_send() {
+        do run_in_newsched_task {
+            let (port, chan) = Chan::<~int>::new();
+            do spawntask {
+                assert!(port.recv() == ~10);
+            }
+
+            chan.send(~10);
+        }
+    }
+
+    #[test]
+    fn oneshot_multi_task_recv_then_close() {
+        do run_in_newsched_task {
+            let (port, chan) = Chan::<~int>::new();
+            do spawntask_later {
+                let _chan = chan;
+            }
+            let res = do spawntask_try {
+                assert!(port.recv() == ~10);
+            };
+            assert!(res.is_err());
+        }
+    }
+
+    #[test]
+    fn oneshot_multi_thread_close_stress() {
+        stress_factor().times(|| {
+            do run_in_newsched_task {
+                let (port, chan) = Chan::<int>::new();
+                let thread = do spawntask_thread {
+                    let _p = port;
+                };
+                let _chan = chan;
+                thread.join();
+            }
+        })
+    }
+
+    #[test]
+    fn oneshot_multi_thread_send_close_stress() {
+        stress_factor().times(|| {
+            let (port, chan) = Chan::<int>::new();
+            do spawn {
+                let _p = port;
+            }
+            do task::try {
+                chan.send(1);
+            };
+        })
+    }
+
+    #[test]
+    fn oneshot_multi_thread_recv_close_stress() {
+        stress_factor().times(|| {
+            let (port, chan) = Chan::<int>::new();
+            do spawn {
+                let port = port;
+                let res = do task::try {
+                    port.recv();
+                };
+                assert!(res.is_err());
+            };
+            do spawn {
+                let chan = chan;
+                do spawn {
+                    let _chan = chan;
+                }
+            };
+        })
+    }
+
+    #[test]
+    fn oneshot_multi_thread_send_recv_stress() {
+        stress_factor().times(|| {
+            let (port, chan) = Chan::<~int>::new();
+            do spawn {
+                chan.send(~10);
+            }
+            do spawn {
+                assert!(port.recv() == ~10);
+            }
+        })
+    }
+
+    #[test]
+    fn stream_send_recv_stress() {
+        stress_factor().times(|| {
+            let (port, chan) = Chan::<~int>::new();
+
+            send(chan, 0);
+            recv(port, 0);
+
+            fn send(chan: Chan<~int>, i: int) {
+                if i == 10 { return }
+
+                do spawntask_random {
+                    chan.send(~i);
+                    send(chan, i + 1);
+                }
+            }
+
+            fn recv(port: Port<~int>, i: int) {
+                if i == 10 { return }
+
+                do spawntask_random {
+                    assert!(port.recv() == ~i);
+                    recv(port, i + 1);
+                };
+            }
+        })
+    }
+
+    #[test]
+    fn recv_a_lot() {
+        // Regression test that we don't run out of stack in scheduler context
+        do run_in_newsched_task {
+            let (port, chan) = Chan::new();
+            10000.times(|| { chan.send(()) });
+            10000.times(|| { port.recv() });
+        }
+    }
+
+    #[test]
+    fn shared_chan_stress() {
+        do run_in_mt_newsched_task {
+            let (port, chan) = SharedChan::new();
+            let total = stress_factor() + 100;
+            total.times(|| {
+                let chan_clone = chan.clone();
+                do spawntask_random {
+                    chan_clone.send(());
+                }
+            });
+
+            total.times(|| {
+                port.recv();
+            });
+        }
+    }
+
+    #[test]
+    fn test_nested_recv_iter() {
+        let (port, chan) = Chan::<int>::new();
+        let (total_port, total_chan) = Chan::<int>::new();
+
+        do spawn {
+            let mut acc = 0;
+            for x in port.iter() {
+                acc += x;
+            }
+            total_chan.send(acc);
+        }
+
+        chan.send(3);
+        chan.send(1);
+        chan.send(2);
+        drop(chan);
+        assert_eq!(total_port.recv(), 6);
+    }
+
+    #[test]
+    fn test_recv_iter_break() {
+        let (port, chan) = Chan::<int>::new();
+        let (count_port, count_chan) = Chan::<int>::new();
+
+        do spawn {
+            let mut count = 0;
+            for x in port.iter() {
+                if count >= 3 {
+                    break;
+                } else {
+                    count += x;
+                }
+            }
+            count_chan.send(count);
+        }
+
+        chan.send(2);
+        chan.send(2);
+        chan.send(2);
+        chan.try_send(2);
+        drop(chan);
+        assert_eq!(count_port.recv(), 4);
+    }
+}
diff --git a/src/libstd/comm/select.rs b/src/libstd/comm/select.rs
new file mode 100644
index 00000000000..81a77000bad
--- /dev/null
+++ b/src/libstd/comm/select.rs
@@ -0,0 +1,498 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+//! Selection over an array of ports
+//!
+//! This module contains the implementation machinery necessary for selecting
+//! over a number of ports. One large goal of this module is to provide an
+//! efficient interface to selecting over any port of any type.
+//!
+//! This is achieved through an architecture of a "port set" in which ports are
+//! added to a set and then the entire set is waited on at once. The set can be
+//! waited on multiple times to prevent re-adding each port to the set.
+//!
+//! Usage of this module is currently encouraged to go through the use of the
+//! `select!` macro. This macro allows natural binding of variables to the
+//! received values of ports in a much more natural syntax than usage of the
+//! `Select` structure directly.
+//!
+//! # Example
+//!
+//! ```rust
+//! let (mut p1, c1) = Chan::new();
+//! let (mut p2, c2) = Chan::new();
+//!
+//! c1.send(1);
+//! c2.send(2);
+//!
+//! select! (
+//!     val = p1.recv() => {
+//!         assert_eq!(val, 1);
+//!     }
+//!     val = p2.recv() => {
+//!         assert_eq!(val, 2);
+//!     }
+//! )
+
+use cast;
+use iter::Iterator;
+use kinds::Send;
+use ops::Drop;
+use option::{Some, None, Option};
+use ptr::RawPtr;
+use super::imp::BlockingContext;
+use super::{Packet, Port, imp};
+use uint;
+use unstable::atomics::{Relaxed, SeqCst};
+
+macro_rules! select {
+    (
+        $name1:pat = $port1:ident.$meth1:ident() => $code1:expr,
+        $($name:pat = $port:ident.$meth:ident() => $code:expr),*
+    ) => ({
+        use std::comm::Select;
+        let sel = Select::new();
+        let mut $port1 = sel.add(&mut $port1);
+        $( let mut $port = sel.add(&mut $port); )*
+        let ret = sel.wait();
+        if ret == $port1.id { let $name1 = $port1.$meth1(); $code1 }
+        $( else if ret == $port.id { let $name = $port.$meth(); $code } )*
+        else { unreachable!() }
+    })
+}
+
+/// The "port set" of the select interface. This structure is used to manage a
+/// set of ports which are being selected over.
+#[no_freeze]
+#[no_send]
+pub struct Select {
+    priv head: *mut Packet,
+    priv tail: *mut Packet,
+    priv next_id: uint,
+}
+
+/// A handle to a port which is currently a member of a `Select` set of ports.
+/// This handle is used to keep the port in the set as well as interact with the
+/// underlying port.
+pub struct Handle<'self, T> {
+    id: uint,
+    priv selector: &'self Select,
+    priv port: &'self mut Port<T>,
+}
+
+struct PacketIterator { priv cur: *mut Packet }
+
+impl Select {
+    /// Creates a new selection structure. This set is initially empty and
+    /// `wait` will fail!() if called.
+    ///
+    /// Usage of this struct directly can sometimes be burdensome, and usage is
+    /// much easier through the `select!` macro.
+    pub fn new() -> Select {
+        Select {
+            head: 0 as *mut Packet,
+            tail: 0 as *mut Packet,
+            next_id: 1,
+        }
+    }
+
+    /// Adds a new port to this set, returning a handle which is then used to
+    /// receive on the port.
+    ///
+    /// Note that this port parameter takes `&mut Port` instead of `&Port`. None
+    /// of the methods of receiving on a port require `&mut self`, but `&mut` is
+    /// used here in order to have the compiler guarantee that the same port is
+    /// not added to this set more than once.
+    ///
+    /// When the returned handle falls out of scope, the port will be removed
+    /// from this set. While the handle is in this set, usage of the port can be
+    /// done through the `Handle`'s receiving methods.
+    pub fn add<'a, T: Send>(&'a self, port: &'a mut Port<T>) -> Handle<'a, T> {
+        let this = unsafe { cast::transmute_mut(self) };
+        let id = this.next_id;
+        this.next_id += 1;
+        unsafe {
+            let packet = port.queue.packet();
+            assert!(!(*packet).selecting.load(Relaxed));
+            assert_eq!((*packet).selection_id, 0);
+            (*packet).selection_id = id;
+            if this.head.is_null() {
+                this.head = packet;
+                this.tail = packet;
+            } else {
+                (*packet).select_prev = this.tail;
+                assert!((*packet).select_next.is_null());
+                (*this.tail).select_next = packet;
+                this.tail = packet;
+            }
+        }
+        Handle { id: id, selector: this, port: port }
+    }
+
+    /// Waits for an event on this port set. The returned value is *not* an
+    /// index, but rather an id. This id can be queried against any active
+    /// `Handle` structures (each one has a public `id` field). The handle with
+    /// the matching `id` will have some sort of event available on it. The
+    /// event could either be that data is available or the corresponding
+    /// channel has been closed.
+    pub fn wait(&self) -> uint {
+        // Note that this is currently an inefficient implementation. We in
+        // theory have knowledge about all ports in the set ahead of time, so
+        // this method shouldn't really have to iterate over all of them yet
+        // again. The idea with this "port set" interface is to get the
+        // interface right this time around, and later this implementation can
+        // be optimized.
+        //
+        // This implementation can be summarized by:
+        //
+        //      fn select(ports) {
+        //          if any port ready { return ready index }
+        //          deschedule {
+        //              block on all ports
+        //          }
+        //          unblock on all ports
+        //          return ready index
+        //      }
+        //
+        // Most notably, the iterations over all of the ports shouldn't be
+        // necessary.
+        unsafe {
+            let mut amt = 0;
+            for p in self.iter() {
+                assert!(!(*p).selecting.load(Relaxed));
+                amt += 1;
+                if (*p).can_recv() {
+                    return (*p).selection_id;
+                }
+            }
+            assert!(amt > 0);
+
+            let mut ready_index = amt;
+            let mut ready_id = uint::max_value;
+            let mut iter = self.iter().enumerate();
+
+            // Acquire a number of blocking contexts, and block on each one
+            // sequentially until one fails. If one fails, then abort
+            // immediately so we can go unblock on all the other ports.
+            BlockingContext::many(amt, |ctx| {
+                let (i, packet) = iter.next().unwrap();
+                (*packet).selecting.store(true, SeqCst);
+                if !ctx.block(&mut (*packet).data,
+                              &mut (*packet).to_wake,
+                              || (*packet).decrement()) {
+                    (*packet).abort_selection(false);
+                    (*packet).selecting.store(false, SeqCst);
+                    ready_index = i;
+                    ready_id = (*packet).selection_id;
+                    false
+                } else {
+                    true
+                }
+            });
+
+            // Abort the selection process on each port. If the abort process
+            // returns `true`, then that means that the port is ready to receive
+            // some data. Note that this also means that the port may have yet
+            // to have fully read the `to_wake` field and woken us up (although
+            // the wakeup is guaranteed to fail).
+            //
+            // This situation happens in the window of where a sender invokes
+            // increment(), sees -1, and then decides to wake up the task. After
+            // all this is done, the sending thread will set `selecting` to
+            // `false`. Until this is done, we cannot return. If we were to
+            // return, then a sender could wake up a port which has gone back to
+            // sleep after this call to `select`.
+            //
+            // Note that it is a "fairly small window" in which an increment()
+            // views that it should wake a thread up until the `selecting` bit
+            // is set to false. For now, the implementation currently just spins
+            // in a yield loop. This is very distasteful, but this
+            // implementation is already nowhere near what it should ideally be.
+            // A rewrite should focus on avoiding a yield loop, and for now this
+            // implementation is tying us over to a more efficient "don't
+            // iterate over everything every time" implementation.
+            for packet in self.iter().take(ready_index) {
+                if (*packet).abort_selection(true) {
+                    ready_id = (*packet).selection_id;
+                    while (*packet).selecting.load(Relaxed) {
+                        imp::yield_now();
+                    }
+                }
+            }
+
+            // Sanity check for now to make sure that everyone is turned off.
+            for packet in self.iter() {
+                assert!(!(*packet).selecting.load(Relaxed));
+            }
+
+            return ready_id;
+        }
+    }
+
+    unsafe fn remove(&self, packet: *mut Packet) {
+        let this = cast::transmute_mut(self);
+        assert!(!(*packet).selecting.load(Relaxed));
+        if (*packet).select_prev.is_null() {
+            assert_eq!(packet, this.head);
+            this.head = (*packet).select_next;
+        } else {
+            (*(*packet).select_prev).select_next = (*packet).select_next;
+        }
+        if (*packet).select_next.is_null() {
+            assert_eq!(packet, this.tail);
+            this.tail = (*packet).select_prev;
+        } else {
+            (*(*packet).select_next).select_prev = (*packet).select_prev;
+        }
+        (*packet).select_next = 0 as *mut Packet;
+        (*packet).select_prev = 0 as *mut Packet;
+        (*packet).selection_id = 0;
+    }
+
+    fn iter(&self) -> PacketIterator { PacketIterator { cur: self.head } }
+}
+
+impl<'self, T: Send> Handle<'self, T> {
+    /// Receive a value on the underlying port. Has the same semantics as
+    /// `Port.recv`
+    pub fn recv(&mut self) -> T { self.port.recv() }
+    /// Block to receive a value on the underlying port, returning `Some` on
+    /// success or `None` if the channel disconnects. This function has the same
+    /// semantics as `Port.recv_opt`
+    pub fn recv_opt(&mut self) -> Option<T> { self.port.recv_opt() }
+    /// Immediately attempt to receive a value on a port, this function will
+    /// never block. Has the same semantics as `Port.try_recv`.
+    pub fn try_recv(&mut self) -> Option<T> { self.port.try_recv() }
+}
+
+#[unsafe_destructor]
+impl Drop for Select {
+    fn drop(&mut self) {
+        assert!(self.head.is_null());
+        assert!(self.tail.is_null());
+    }
+}
+
+#[unsafe_destructor]
+impl<'self, T: Send> Drop for Handle<'self, T> {
+    fn drop(&mut self) {
+        unsafe { self.selector.remove(self.port.queue.packet()) }
+    }
+}
+
+impl Iterator<*mut Packet> for PacketIterator {
+    fn next(&mut self) -> Option<*mut Packet> {
+        if self.cur.is_null() {
+            None
+        } else {
+            let ret = Some(self.cur);
+            unsafe { self.cur = (*self.cur).select_next; }
+            ret
+        }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::super::*;
+    use prelude::*;
+
+    test!(fn smoke() {
+        let (mut p1, c1) = Chan::<int>::new();
+        let (mut p2, c2) = Chan::<int>::new();
+        c1.send(1);
+        select! (
+            foo = p1.recv() => { assert_eq!(foo, 1); },
+            _bar = p2.recv() => { fail!() }
+        )
+        c2.send(2);
+        select! (
+            _foo = p1.recv() => { fail!() },
+            bar = p2.recv() => { assert_eq!(bar, 2) }
+        )
+        drop(c1);
+        select! (
+            foo = p1.recv_opt() => { assert_eq!(foo, None); },
+            _bar = p2.recv() => { fail!() }
+        )
+        drop(c2);
+        select! (
+            bar = p2.recv_opt() => { assert_eq!(bar, None); },
+        )
+    })
+
+    test!(fn smoke2() {
+        let (mut p1, _c1) = Chan::<int>::new();
+        let (mut p2, _c2) = Chan::<int>::new();
+        let (mut p3, _c3) = Chan::<int>::new();
+        let (mut p4, _c4) = Chan::<int>::new();
+        let (mut p5, c5) = Chan::<int>::new();
+        c5.send(4);
+        select! (
+            _foo = p1.recv() => { fail!("1") },
+            _foo = p2.recv() => { fail!("2") },
+            _foo = p3.recv() => { fail!("3") },
+            _foo = p4.recv() => { fail!("4") },
+            foo = p5.recv() => { assert_eq!(foo, 4); }
+        )
+    })
+
+    test!(fn closed() {
+        let (mut p1, _c1) = Chan::<int>::new();
+        let (mut p2, c2) = Chan::<int>::new();
+        drop(c2);
+
+        select! (
+            _a1 = p1.recv_opt() => { fail!() },
+            a2 = p2.recv_opt() => { assert_eq!(a2, None); }
+        )
+    })
+
+    #[test]
+    fn unblocks() {
+        use std::io::timer;
+
+        let (mut p1, c1) = Chan::<int>::new();
+        let (mut p2, _c2) = Chan::<int>::new();
+        let (p3, c3) = Chan::<int>::new();
+
+        do spawn {
+            timer::sleep(3);
+            c1.send(1);
+            p3.recv();
+            timer::sleep(3);
+        }
+
+        select! (
+            a = p1.recv() => { assert_eq!(a, 1); },
+            _b = p2.recv() => { fail!() }
+        )
+        c3.send(1);
+        select! (
+            a = p1.recv_opt() => { assert_eq!(a, None); },
+            _b = p2.recv() => { fail!() }
+        )
+    }
+
+    #[test]
+    fn both_ready() {
+        use std::io::timer;
+
+        let (mut p1, c1) = Chan::<int>::new();
+        let (mut p2, c2) = Chan::<int>::new();
+        let (p3, c3) = Chan::<()>::new();
+
+        do spawn {
+            timer::sleep(3);
+            c1.send(1);
+            c2.send(2);
+            p3.recv();
+        }
+
+        select! (
+            a = p1.recv() => { assert_eq!(a, 1); },
+            a = p2.recv() => { assert_eq!(a, 2); }
+        )
+        select! (
+            a = p1.recv() => { assert_eq!(a, 1); },
+            a = p2.recv() => { assert_eq!(a, 2); }
+        )
+        c3.send(());
+    }
+
+    #[test]
+    fn stress() {
+        static AMT: int = 10000;
+        let (mut p1, c1) = Chan::<int>::new();
+        let (mut p2, c2) = Chan::<int>::new();
+        let (p3, c3) = Chan::<()>::new();
+
+        do spawn {
+            for i in range(0, AMT) {
+                if i % 2 == 0 {
+                    c1.send(i);
+                } else {
+                    c2.send(i);
+                }
+                p3.recv();
+            }
+        }
+
+        for i in range(0, AMT) {
+            select! (
+                i1 = p1.recv() => { assert!(i % 2 == 0 && i == i1); },
+                i2 = p2.recv() => { assert!(i % 2 == 1 && i == i2); }
+            )
+            c3.send(());
+        }
+    }
+
+    #[test]
+    fn stress_native() {
+        use std::rt::thread::Thread;
+        use std::unstable::run_in_bare_thread;
+        static AMT: int = 10000;
+
+        do run_in_bare_thread {
+            let (mut p1, c1) = Chan::<int>::new();
+            let (mut p2, c2) = Chan::<int>::new();
+            let (p3, c3) = Chan::<()>::new();
+
+            let t = do Thread::start {
+                for i in range(0, AMT) {
+                    if i % 2 == 0 {
+                        c1.send(i);
+                    } else {
+                        c2.send(i);
+                    }
+                    p3.recv();
+                }
+            };
+
+            for i in range(0, AMT) {
+                select! (
+                    i1 = p1.recv() => { assert!(i % 2 == 0 && i == i1); },
+                    i2 = p2.recv() => { assert!(i % 2 == 1 && i == i2); }
+                )
+                c3.send(());
+            }
+            t.join();
+        }
+    }
+
+    #[test]
+    fn native_both_ready() {
+        use std::rt::thread::Thread;
+        use std::unstable::run_in_bare_thread;
+
+        do run_in_bare_thread {
+            let (mut p1, c1) = Chan::<int>::new();
+            let (mut p2, c2) = Chan::<int>::new();
+            let (p3, c3) = Chan::<()>::new();
+
+            let t = do Thread::start {
+                c1.send(1);
+                c2.send(2);
+                p3.recv();
+            };
+
+            select! (
+                a = p1.recv() => { assert_eq!(a, 1); },
+                b = p2.recv() => { assert_eq!(b, 2); }
+            )
+            select! (
+                a = p1.recv() => { assert_eq!(a, 1); },
+                b = p2.recv() => { assert_eq!(b, 2); }
+            )
+            c3.send(());
+            t.join();
+        }
+    }
+}
diff --git a/src/libstd/lib.rs b/src/libstd/lib.rs
index 3ac91a67d7e..6948eb60b1f 100644
--- a/src/libstd/lib.rs
+++ b/src/libstd/lib.rs
@@ -203,15 +203,16 @@ pub mod rt;
 mod std {
     pub use clone;
     pub use cmp;
+    pub use comm;
     pub use condition;
     pub use fmt;
+    pub use io;
     pub use kinds;
     pub use local_data;
     pub use logging;
     pub use logging;
     pub use option;
     pub use os;
-    pub use io;
     pub use rt;
     pub use str;
     pub use to_bytes;
diff --git a/src/libstd/rt/mpmc_bounded_queue.rs b/src/libstd/rt/mpmc_bounded_queue.rs
index 7f607fcf12a..1e04e5eb78d 100644
--- a/src/libstd/rt/mpmc_bounded_queue.rs
+++ b/src/libstd/rt/mpmc_bounded_queue.rs
@@ -1,5 +1,4 @@
-/* Multi-producer/multi-consumer bounded queue
- * Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
+/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
  * Redistribution and use in source and binary forms, with or without
  * modification, are permitted provided that the following conditions are met:
  *
@@ -163,7 +162,6 @@ mod tests {
     use prelude::*;
     use option::*;
     use task;
-    use comm;
     use super::Queue;
 
     #[test]
@@ -174,10 +172,9 @@ mod tests {
         assert_eq!(None, q.pop());
 
         for _ in range(0, nthreads) {
-            let (port, chan)  = comm::stream();
-            chan.send(q.clone());
+            let q = q.clone();
             do task::spawn_sched(task::SingleThreaded) {
-                let mut q = port.recv();
+                let mut q = q;
                 for i in range(0, nmsgs) {
                     assert!(q.push(i));
                 }
@@ -186,12 +183,11 @@ mod tests {
 
         let mut completion_ports = ~[];
         for _ in range(0, nthreads) {
-            let (completion_port, completion_chan) = comm::stream();
+            let (completion_port, completion_chan) = Chan::new();
             completion_ports.push(completion_port);
-            let (port, chan)  = comm::stream();
-            chan.send(q.clone());
+            let q = q.clone();
             do task::spawn_sched(task::SingleThreaded) {
-                let mut q = port.recv();
+                let mut q = q;
                 let mut i = 0u;
                 loop {
                     match q.pop() {
@@ -206,7 +202,7 @@ mod tests {
             }
         }
 
-        for completion_port in completion_ports.iter() {
+        for completion_port in completion_ports.mut_iter() {
             assert_eq!(nmsgs, completion_port.recv());
         }
     }
diff --git a/src/libstd/rt/mpsc_queue.rs b/src/libstd/rt/mpsc_queue.rs
index 4f39a1df4fa..d575028af70 100644
--- a/src/libstd/rt/mpsc_queue.rs
+++ b/src/libstd/rt/mpsc_queue.rs
@@ -1,5 +1,4 @@
-/* Multi-producer/single-consumer queue
- * Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
+/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
  * Redistribution and use in source and binary forms, with or without
  * modification, are permitted provided that the following conditions are met:
  *
@@ -27,163 +26,177 @@
  */
 
 //! A mostly lock-free multi-producer, single consumer queue.
-// http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
 
-use unstable::sync::UnsafeArc;
-use unstable::atomics::{AtomicPtr,Relaxed,Release,Acquire};
-use ptr::{mut_null, to_mut_unsafe_ptr};
+// http://www.1024cores.net/home/lock-free-algorithms
+//                         /queues/non-intrusive-mpsc-node-based-queue
+
 use cast;
-use option::*;
 use clone::Clone;
 use kinds::Send;
+use ops::Drop;
+use option::{Option, None, Some};
+use unstable::atomics::{AtomicPtr, Release, Acquire, AcqRel, Relaxed};
+use unstable::sync::UnsafeArc;
+
+pub enum PopResult<T> {
+    /// Some data has been popped
+    Data(T),
+    /// The queue is empty
+    Empty,
+    /// The queue is in an inconsistent state. Popping data should succeed, but
+    /// some pushers have yet to make enough progress in order to allow a pop to
+    /// succeed. It is recommended that a pop() occur "in the near future" in
+    /// order to see if the sender has made progress or not
+    Inconsistent,
+}
 
 struct Node<T> {
     next: AtomicPtr<Node<T>>,
     value: Option<T>,
 }
 
-impl<T> Node<T> {
-    fn empty() -> Node<T> {
-        Node{next: AtomicPtr::new(mut_null()), value: None}
-    }
-
-    fn with_value(value: T) -> Node<T> {
-        Node{next: AtomicPtr::new(mut_null()), value: Some(value)}
-    }
-}
-
-struct State<T> {
-    pad0: [u8, ..64],
+struct State<T, P> {
     head: AtomicPtr<Node<T>>,
-    pad1: [u8, ..64],
-    stub: Node<T>,
-    pad2: [u8, ..64],
     tail: *mut Node<T>,
-    pad3: [u8, ..64],
+    packet: P,
 }
 
-struct Queue<T> {
-    priv state: UnsafeArc<State<T>>,
+pub struct Consumer<T, P> {
+    priv state: UnsafeArc<State<T, P>>,
 }
 
-impl<T: Send> Clone for Queue<T> {
-    fn clone(&self) -> Queue<T> {
-        Queue {
-            state: self.state.clone()
-        }
-    }
+pub struct Producer<T, P> {
+    priv state: UnsafeArc<State<T, P>>,
 }
 
-impl<T: Send> State<T> {
-    pub fn new() -> State<T> {
-        State{
-            pad0: [0, ..64],
-            head: AtomicPtr::new(mut_null()),
-            pad1: [0, ..64],
-            stub: Node::<T>::empty(),
-            pad2: [0, ..64],
-            tail: mut_null(),
-            pad3: [0, ..64],
-        }
+impl<T: Send, P: Send> Clone for Producer<T, P> {
+    fn clone(&self) -> Producer<T, P> {
+        Producer { state: self.state.clone() }
     }
+}
 
-    fn init(&mut self) {
-        let stub = self.get_stub_unsafe();
-        self.head.store(stub, Relaxed);
-        self.tail = stub;
+pub fn queue<T: Send, P: Send>(p: P) -> (Consumer<T, P>, Producer<T, P>) {
+    unsafe {
+        let (a, b) = UnsafeArc::new2(State::new(p));
+        (Consumer { state: a }, Producer { state: b })
     }
+}
 
-    fn get_stub_unsafe(&mut self) -> *mut Node<T> {
-        to_mut_unsafe_ptr(&mut self.stub)
+impl<T> Node<T> {
+    unsafe fn new(v: Option<T>) -> *mut Node<T> {
+        cast::transmute(~Node {
+            next: AtomicPtr::new(0 as *mut Node<T>),
+            value: v,
+        })
     }
+}
 
-    fn push(&mut self, value: T) {
-        unsafe {
-            let node = cast::transmute(~Node::with_value(value));
-            self.push_node(node);
+impl<T: Send, P: Send> State<T, P> {
+    pub unsafe fn new(p: P) -> State<T, P> {
+        let stub = Node::new(None);
+        State {
+            head: AtomicPtr::new(stub),
+            tail: stub,
+            packet: p,
         }
     }
 
-    fn push_node(&mut self, node: *mut Node<T>) {
-        unsafe {
-            (*node).next.store(mut_null(), Release);
-            let prev = self.head.swap(node, Relaxed);
-            (*prev).next.store(node, Release);
-        }
+    unsafe fn push(&mut self, t: T) {
+        let n = Node::new(Some(t));
+        let prev = self.head.swap(n, AcqRel);
+        (*prev).next.store(n, Release);
     }
 
-    fn pop(&mut self) -> Option<T> {
-        unsafe {
-            let mut tail = self.tail;
-            let mut next = (*tail).next.load(Acquire);
-            let stub = self.get_stub_unsafe();
-            if tail == stub {
-                if mut_null() == next {
-                    return None
-                }
-                self.tail = next;
-                tail = next;
-                next = (*next).next.load(Acquire);
-            }
-            if next != mut_null() {
-                let tail: ~Node<T> = cast::transmute(tail);
-                self.tail = next;
-                return tail.value
-            }
-            let head = self.head.load(Relaxed);
-            if tail != head {
-                return None
-            }
-            self.push_node(stub);
-            next = (*tail).next.load(Acquire);
-            if next != mut_null() {
-                let tail: ~Node<T> = cast::transmute(tail);
-                self.tail = next;
-                return tail.value
-            }
+    unsafe fn pop(&mut self) -> PopResult<T> {
+        let tail = self.tail;
+        let next = (*tail).next.load(Acquire);
+
+        if !next.is_null() {
+            self.tail = next;
+            assert!((*tail).value.is_none());
+            assert!((*next).value.is_some());
+            let ret = (*next).value.take_unwrap();
+            let _: ~Node<T> = cast::transmute(tail);
+            return Data(ret);
         }
-        None
+
+        if self.head.load(Acquire) == tail {Empty} else {Inconsistent}
+    }
+
+    unsafe fn is_empty(&mut self) -> bool {
+        return (*self.tail).next.load(Acquire).is_null();
     }
 }
 
-impl<T: Send> Queue<T> {
-    pub fn new() -> Queue<T> {
+#[unsafe_destructor]
+impl<T: Send, P: Send> Drop for State<T, P> {
+    fn drop(&mut self) {
         unsafe {
-            let q = Queue{state: UnsafeArc::new(State::new())};
-            (*q.state.get()).init();
-            q
+            let mut cur = self.tail;
+            while !cur.is_null() {
+                let next = (*cur).next.load(Relaxed);
+                let _: ~Node<T> = cast::transmute(cur);
+                cur = next;
+            }
         }
     }
+}
 
+impl<T: Send, P: Send> Producer<T, P> {
     pub fn push(&mut self, value: T) {
         unsafe { (*self.state.get()).push(value) }
     }
+    pub fn is_empty(&self) -> bool {
+        unsafe{ (*self.state.get()).is_empty() }
+    }
+    pub unsafe fn packet(&self) -> *mut P {
+        &mut (*self.state.get()).packet as *mut P
+    }
+}
 
-    pub fn pop(&mut self) -> Option<T> {
-        unsafe{ (*self.state.get()).pop() }
+impl<T: Send, P: Send> Consumer<T, P> {
+    pub fn pop(&mut self) -> PopResult<T> {
+        unsafe { (*self.state.get()).pop() }
+    }
+    pub fn casual_pop(&mut self) -> Option<T> {
+        match self.pop() {
+            Data(t) => Some(t),
+            Empty | Inconsistent => None,
+        }
+    }
+    pub unsafe fn packet(&self) -> *mut P {
+        &mut (*self.state.get()).packet as *mut P
     }
 }
 
 #[cfg(test)]
 mod tests {
     use prelude::*;
-    use option::*;
+
     use task;
-    use comm;
-    use super::Queue;
+    use super::{queue, Data, Empty, Inconsistent};
+
+    #[test]
+    fn test_full() {
+        let (_, mut p) = queue(());
+        p.push(~1);
+        p.push(~2);
+    }
 
     #[test]
     fn test() {
         let nthreads = 8u;
         let nmsgs = 1000u;
-        let mut q = Queue::new();
-        assert_eq!(None, q.pop());
+        let (mut c, p) = queue(());
+        match c.pop() {
+            Empty => {}
+            Inconsistent | Data(..) => fail!()
+        }
 
         for _ in range(0, nthreads) {
-            let (port, chan)  = comm::stream();
-            chan.send(q.clone());
+            let q = p.clone();
             do task::spawn_sched(task::SingleThreaded) {
-                let mut q = port.recv();
+                let mut q = q;
                 for i in range(0, nmsgs) {
                     q.push(i);
                 }
@@ -191,13 +204,10 @@ mod tests {
         }
 
         let mut i = 0u;
-        loop {
-            match q.pop() {
-                None => {},
-                Some(_) => {
-                    i += 1;
-                    if i == nthreads*nmsgs { break }
-                }
+        while i < nthreads * nmsgs {
+            match c.pop() {
+                Empty | Inconsistent => {},
+                Data(_) => { i += 1 }
             }
         }
     }
diff --git a/src/libstd/rt/spsc_queue.rs b/src/libstd/rt/spsc_queue.rs
new file mode 100644
index 00000000000..f14533d726a
--- /dev/null
+++ b/src/libstd/rt/spsc_queue.rs
@@ -0,0 +1,296 @@
+/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ *    1. Redistributions of source code must retain the above copyright notice,
+ *       this list of conditions and the following disclaimer.
+ *
+ *    2. Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in the
+ *       documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED
+ * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
+ * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
+ * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+ * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and documentation are
+ * those of the authors and should not be interpreted as representing official
+ * policies, either expressed or implied, of Dmitry Vyukov.
+ */
+
+// http://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue
+use cast;
+use kinds::Send;
+use ops::Drop;
+use option::{Some, None, Option};
+use unstable::atomics::{AtomicPtr, Relaxed, AtomicUint, Acquire, Release};
+use unstable::sync::UnsafeArc;
+
+// Node within the linked list queue of messages to send
+struct Node<T> {
+    // XXX: this could be an uninitialized T if we're careful enough, and
+    //      that would reduce memory usage (and be a bit faster).
+    //      is it worth it?
+    value: Option<T>,           // nullable for re-use of nodes
+    next: AtomicPtr<Node<T>>,   // next node in the queue
+}
+
+// The producer/consumer halves both need access to the `tail` field, and if
+// they both have access to that we may as well just give them both access
+// to this whole structure.
+struct State<T, P> {
+    // consumer fields
+    tail: *mut Node<T>, // where to pop from
+    tail_prev: AtomicPtr<Node<T>>, // node last retired by pop(); producer reclaims from here (see alloc)
+
+    // producer fields
+    head: *mut Node<T>,      // where to push to
+    first: *mut Node<T>,     // where to get new nodes from
+    tail_copy: *mut Node<T>, // between first/tail
+
+    // Cache maintenance fields. Additions and subtractions are stored
+    // separately in order to allow them to use nonatomic addition/subtraction.
+    cache_bound: uint,
+    cache_additions: AtomicUint,
+    cache_subtractions: AtomicUint,
+
+    packet: P,
+}
+
+pub struct Producer<T, P> {
+    priv state: UnsafeArc<State<T, P>>,
+}
+
+pub struct Consumer<T, P> {
+    priv state: UnsafeArc<State<T, P>>,
+}
+
+pub fn queue<T: Send, P: Send>(bound: uint,
+                               p: P) -> (Consumer<T, P>, Producer<T, P>)
+{
+    let n1 = Node::new();
+    let n2 = Node::new();
+    unsafe { (*n1).next.store(n2, Relaxed) }
+    let state = State {
+        tail: n2,
+        tail_prev: AtomicPtr::new(n1),
+        head: n2,
+        first: n1,
+        tail_copy: n1,
+        cache_bound: bound,
+        cache_additions: AtomicUint::new(0),
+        cache_subtractions: AtomicUint::new(0),
+        packet: p,
+    };
+    let (arc1, arc2) = UnsafeArc::new2(state);
+    (Consumer { state: arc1 }, Producer { state: arc2 })
+}
+
+impl<T: Send> Node<T> {
+    fn new() -> *mut Node<T> {
+        unsafe {
+            cast::transmute(~Node {
+                value: None,
+                next: AtomicPtr::new(0 as *mut Node<T>),
+            })
+        }
+    }
+}
+
+impl<T: Send, P: Send> Producer<T, P> {
+    pub fn push(&mut self, t: T) {
+        unsafe { (*self.state.get()).push(t) }
+    }
+    pub fn is_empty(&self) -> bool {
+        unsafe { (*self.state.get()).is_empty() }
+    }
+    pub unsafe fn packet(&self) -> *mut P {
+        &mut (*self.state.get()).packet as *mut P
+    }
+}
+
+impl<T: Send, P: Send> Consumer<T, P> {
+    pub fn pop(&mut self) -> Option<T> {
+        unsafe { (*self.state.get()).pop() }
+    }
+    pub unsafe fn packet(&self) -> *mut P {
+        &mut (*self.state.get()).packet as *mut P
+    }
+}
+
+impl<T: Send, P: Send> State<T, P> {
+    // remember that there is only one thread executing `push` (and only one
+    // thread executing `pop`)
+    unsafe fn push(&mut self, t: T) {
+        // Acquire a node (which either uses a cached one or allocates a new
+        // one), and then append this to the 'head' node.
+        let n = self.alloc();
+        assert!((*n).value.is_none());
+        (*n).value = Some(t);
+        (*n).next.store(0 as *mut Node<T>, Relaxed);
+        (*self.head).next.store(n, Release);
+        self.head = n;
+    }
+
+    unsafe fn alloc(&mut self) -> *mut Node<T> {
+        // First try to see if we can consume the 'first' node for our uses.
+        // We try to avoid as many atomic instructions as possible here, so
+        // the addition to cache_subtractions is not atomic (plus we're the
+        // only one subtracting from the cache).
+        if self.first != self.tail_copy {
+            if self.cache_bound > 0 {
+                let b = self.cache_subtractions.load(Relaxed);
+                self.cache_subtractions.store(b + 1, Relaxed);
+            }
+            let ret = self.first;
+            self.first = (*ret).next.load(Relaxed);
+            return ret;
+        }
+        // If the above fails, then update our copy of the tail and try
+        // again.
+        self.tail_copy = self.tail_prev.load(Acquire);
+        if self.first != self.tail_copy {
+            if self.cache_bound > 0 {
+                let b = self.cache_subtractions.load(Relaxed);
+                self.cache_subtractions.store(b + 1, Relaxed);
+            }
+            let ret = self.first;
+            self.first = (*ret).next.load(Relaxed);
+            return ret;
+        }
+        // If all of that fails, then we have to allocate a new node
+        // (there's nothing in the node cache).
+        Node::new()
+    }
+
+    // remember that there is only one thread executing `pop` (and only one
+    // thread executing `push`)
+    unsafe fn pop(&mut self) -> Option<T> {
+        // The `tail` node is not actually a used node, but rather a
+        // sentinel from where we should start popping from. Hence, look at
+        // tail's next field and see if we can use it. If we do a pop, then
+        // the current tail node is a candidate for going into the cache.
+        let tail = self.tail;
+        let next = (*tail).next.load(Acquire);
+        if next.is_null() { return None }
+        assert!((*next).value.is_some());
+        let ret = (*next).value.take();
+
+        self.tail = next;
+        if self.cache_bound == 0 {
+            self.tail_prev.store(tail, Release);
+        } else {
+            // XXX: this is dubious with overflow.
+            let additions = self.cache_additions.load(Relaxed);
+            let subtractions = self.cache_subtractions.load(Relaxed);
+            let size = additions - subtractions;
+
+            if size < self.cache_bound {
+                self.tail_prev.store(tail, Release);
+                self.cache_additions.store(additions + 1, Relaxed);
+            } else {
+                (*self.tail_prev.load(Relaxed)).next.store(next, Relaxed);
+                // We have successfully erased all references to 'tail', so
+                // now we can safely drop it.
+                let _: ~Node<T> = cast::transmute(tail);
+            }
+        }
+        return ret;
+    }
+
+    unsafe fn is_empty(&self) -> bool {
+        let tail = self.tail;
+        let next = (*tail).next.load(Acquire);
+        return next.is_null();
+    }
+}
+
+#[unsafe_destructor]
+impl<T: Send, P: Send> Drop for State<T, P> {
+    fn drop(&mut self) {
+        unsafe {
+            let mut cur = self.first;
+            while !cur.is_null() {
+                let next = (*cur).next.load(Relaxed);
+                let _n: ~Node<T> = cast::transmute(cur);
+                cur = next;
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use prelude::*;
+    use super::queue;
+    use task;
+
+    #[test]
+    fn smoke() {
+        let (mut c, mut p) = queue(0, ());
+        p.push(1);
+        p.push(2);
+        assert_eq!(c.pop(), Some(1));
+        assert_eq!(c.pop(), Some(2));
+        assert_eq!(c.pop(), None);
+        p.push(3);
+        p.push(4);
+        assert_eq!(c.pop(), Some(3));
+        assert_eq!(c.pop(), Some(4));
+        assert_eq!(c.pop(), None);
+    }
+
+    #[test]
+    fn drop_full() {
+        let (_, mut p) = queue(0, ());
+        p.push(~1);
+        p.push(~2);
+    }
+
+    #[test]
+    fn smoke_bound() {
+        let (mut c, mut p) = queue(1, ());
+        p.push(1);
+        p.push(2);
+        assert_eq!(c.pop(), Some(1));
+        assert_eq!(c.pop(), Some(2));
+        assert_eq!(c.pop(), None);
+        p.push(3);
+        p.push(4);
+        assert_eq!(c.pop(), Some(3));
+        assert_eq!(c.pop(), Some(4));
+        assert_eq!(c.pop(), None);
+    }
+
+    #[test]
+    fn stress() {
+        stress_bound(0);
+        stress_bound(1);
+
+        fn stress_bound(bound: uint) {
+            let (c, mut p) = queue(bound, ());
+            do task::spawn_sched(task::SingleThreaded) {
+                let mut c = c;
+                for _ in range(0, 100000) {
+                    loop {
+                        match c.pop() {
+                            Some(1) => break,
+                            Some(_) => fail!(),
+                            None => {}
+                        }
+                    }
+                }
+            }
+            for _ in range(0, 100000) {
+                p.push(1);
+            }
+        }
+    }
+}
diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs
index 86cc895eb27..2adc32f33fb 100644
--- a/src/libstd/rt/task.rs
+++ b/src/libstd/rt/task.rs
@@ -26,7 +26,6 @@ use option::{Option, Some, None};
 use rt::borrowck::BorrowRecord;
 use rt::borrowck;
 use rt::context::Context;
-use rt::context;
 use rt::env;
 use io::Writer;
 use rt::kill::Death;