about summary refs log tree commit diff
path: root/src/libstd/rt
diff options
context:
space:
mode:
Diffstat (limited to 'src/libstd/rt')
-rw-r--r--src/libstd/rt/comm.rs1141
-rw-r--r--src/libstd/rt/kill.rs37
-rw-r--r--src/libstd/rt/local_ptr.rs3
-rw-r--r--src/libstd/rt/message_queue.rs55
-rw-r--r--src/libstd/rt/mod.rs21
-rw-r--r--src/libstd/rt/rtio.rs4
-rw-r--r--src/libstd/rt/sched.rs105
-rw-r--r--src/libstd/rt/select.rs29
-rw-r--r--src/libstd/rt/task.rs62
-rw-r--r--src/libstd/rt/test.rs22
-rw-r--r--src/libstd/rt/thread.rs187
11 files changed, 220 insertions, 1446 deletions
diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs
deleted file mode 100644
index 2fa34994292..00000000000
--- a/src/libstd/rt/comm.rs
+++ /dev/null
@@ -1,1141 +0,0 @@
-// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
-// file at the top-level directory of this distribution and at
-// http://rust-lang.org/COPYRIGHT.
-//
-// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
-// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
-// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
-// option. This file may not be copied, modified, or distributed
-// except according to those terms.
-
-//! Ports and channels.
-
-use option::*;
-use cast;
-use ops::Drop;
-use rt::kill::BlockedTask;
-use kinds::Send;
-use rt;
-use rt::sched::Scheduler;
-use rt::local::Local;
-use rt::select::{SelectInner, SelectPortInner};
-use select::{Select, SelectPort};
-use unstable::atomics::{AtomicUint, AtomicOption, Acquire, Relaxed, SeqCst};
-use unstable::sync::UnsafeArc;
-use util;
-use util::Void;
-use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable, SendDeferred};
-use cell::RefCell;
-use clone::Clone;
-use tuple::ImmutableTuple;
-
-/// A combined refcount / BlockedTask-as-uint pointer.
-///
-/// Can be equal to the following values:
-///
-/// * 2 - both endpoints are alive
-/// * 1 - either the sender or the receiver is dead, determined by context
-/// * <ptr> - A pointer to a blocked Task (see BlockedTask::cast_{to,from}_uint)
-type State = uint;
-
-static STATE_BOTH: State = 2;
-static STATE_ONE: State = 1;
-
-/// The heap-allocated structure shared between two endpoints.
-struct Packet<T> {
-    state: AtomicUint,
-    payload: Option<T>,
-}
-
-// A one-shot channel.
-pub struct ChanOne<T> {
-    priv void_packet: *mut Void,
-    priv suppress_finalize: bool
-}
-
-/// A one-shot port.
-pub struct PortOne<T> {
-    priv void_packet: *mut Void,
-    priv suppress_finalize: bool
-}
-
-pub fn oneshot<T: Send>() -> (PortOne<T>, ChanOne<T>) {
-    let packet: ~Packet<T> = ~Packet {
-        state: AtomicUint::new(STATE_BOTH),
-        payload: None
-    };
-
-    unsafe {
-        let packet: *mut Void = cast::transmute(packet);
-        let port = PortOne {
-            void_packet: packet,
-            suppress_finalize: false
-        };
-        let chan = ChanOne {
-            void_packet: packet,
-            suppress_finalize: false
-        };
-        return (port, chan);
-    }
-}
-
-impl<T: Send> ChanOne<T> {
-    #[inline]
-    fn packet(&self) -> *mut Packet<T> {
-        unsafe {
-            let p: *mut ~Packet<T> = cast::transmute(&self.void_packet);
-            let p: *mut Packet<T> = &mut **p;
-            return p;
-        }
-    }
-
-    /// Send a message on the one-shot channel. If a receiver task is blocked
-    /// waiting for the message, will wake it up and reschedule to it.
-    pub fn send(self, val: T) {
-        self.try_send(val);
-    }
-
-    /// As `send`, but also returns whether or not the receiver endpoint is still open.
-    pub fn try_send(self, val: T) -> bool {
-        self.try_send_inner(val, true)
-    }
-
-    /// Send a message without immediately rescheduling to a blocked receiver.
-    /// This can be useful in contexts where rescheduling is forbidden, or to
-    /// optimize for when the sender expects to still have useful work to do.
-    pub fn send_deferred(self, val: T) {
-        self.try_send_deferred(val);
-    }
-
-    /// As `send_deferred` and `try_send` together.
-    pub fn try_send_deferred(self, val: T) -> bool {
-        self.try_send_inner(val, false)
-    }
-
-    // 'do_resched' configures whether the scheduler immediately switches to
-    // the receiving task, or leaves the sending task still running.
-    fn try_send_inner(mut self, val: T, do_resched: bool) -> bool {
-        if do_resched {
-            rtassert!(!rt::in_sched_context());
-        }
-
-        // In order to prevent starvation of other tasks in situations
-        // where a task sends repeatedly without ever receiving, we
-        // occassionally yield instead of doing a send immediately.
-        // Only doing this if we're doing a rescheduling send,
-        // otherwise the caller is expecting not to context switch.
-        if do_resched {
-            // XXX: This TLS hit should be combined with other uses of the scheduler below
-            let sched: ~Scheduler = Local::take();
-            sched.maybe_yield();
-        }
-
-        let mut recvr_active = true;
-        let packet = self.packet();
-
-        unsafe {
-
-            // Install the payload
-            rtassert!((*packet).payload.is_none());
-            (*packet).payload = Some(val);
-
-            // Atomically swap out the old state to figure out what
-            // the port's up to, issuing a release barrier to prevent
-            // reordering of the payload write. This also issues an
-            // acquire barrier that keeps the subsequent access of the
-            // ~Task pointer from being reordered.
-            let oldstate = (*packet).state.swap(STATE_ONE, SeqCst);
-
-            // Suppress the synchronizing actions in the finalizer. We're
-            // done with the packet. NB: In case of do_resched, this *must*
-            // happen before waking up a blocked task (or be unkillable),
-            // because we might get a kill signal during the reschedule.
-            self.suppress_finalize = true;
-
-            match oldstate {
-                STATE_BOTH => {
-                    // Port is not waiting yet. Nothing to do
-                }
-                STATE_ONE => {
-                    // Port has closed. Need to clean up.
-                    let _packet: ~Packet<T> = cast::transmute(self.void_packet);
-                    recvr_active = false;
-                }
-                task_as_state => {
-                    // Port is blocked. Wake it up.
-                    let recvr = BlockedTask::cast_from_uint(task_as_state);
-                    if do_resched {
-                        recvr.wake().map(|woken_task| {
-                            Scheduler::run_task(woken_task);
-                        });
-                    } else {
-                        let mut sched = Local::borrow(None::<Scheduler>);
-                        sched.get().enqueue_blocked_task(recvr);
-                    }
-                }
-            }
-        }
-
-        return recvr_active;
-    }
-}
-
-impl<T: Send> PortOne<T> {
-    fn packet(&self) -> *mut Packet<T> {
-        unsafe {
-            let p: *mut ~Packet<T> = cast::transmute(&self.void_packet);
-            let p: *mut Packet<T> = &mut **p;
-            return p;
-        }
-    }
-
-    /// Wait for a message on the one-shot port. Fails if the send end is closed.
-    pub fn recv(self) -> T {
-        match self.try_recv() {
-            Some(val) => val,
-            None => {
-                fail!("receiving on closed channel");
-            }
-        }
-    }
-
-    /// As `recv`, but returns `None` if the send end is closed rather than failing.
-    pub fn try_recv(mut self) -> Option<T> {
-        // Optimistic check. If data was sent already, we don't even need to block.
-        // No release barrier needed here; we're not handing off our task pointer yet.
-        if !self.optimistic_check() {
-            // No data available yet.
-            // Switch to the scheduler to put the ~Task into the Packet state.
-            let sched: ~Scheduler = Local::take();
-            sched.deschedule_running_task_and_then(|sched, task| {
-                self.block_on(sched, task);
-            })
-        }
-
-        // Task resumes.
-        self.recv_ready()
-    }
-}
-
-impl<T: Send> SelectInner for PortOne<T> {
-    #[inline] #[cfg(not(test))]
-    fn optimistic_check(&mut self) -> bool {
-        unsafe { (*self.packet()).state.load(Acquire) == STATE_ONE }
-    }
-
-    #[inline] #[cfg(test)]
-    fn optimistic_check(&mut self) -> bool {
-        // The optimistic check is never necessary for correctness. For testing
-        // purposes, making it randomly return false simulates a racing sender.
-        use rand::{Rand};
-        let mut sched = Local::borrow(None::<Scheduler>);
-        let actually_check = Rand::rand(&mut sched.get().rng);
-        if actually_check {
-            unsafe { (*self.packet()).state.load(Acquire) == STATE_ONE }
-        } else {
-            false
-        }
-    }
-
-    fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool {
-        unsafe {
-            // Atomically swap the task pointer into the Packet state, issuing
-            // an acquire barrier to prevent reordering of the subsequent read
-            // of the payload. Also issues a release barrier to prevent
-            // reordering of any previous writes to the task structure.
-            let task_as_state = task.cast_to_uint();
-            let oldstate = (*self.packet()).state.swap(task_as_state, SeqCst);
-            match oldstate {
-                STATE_BOTH => {
-                    // Data has not been sent. Now we're blocked.
-                    rtdebug!("non-rendezvous recv");
-                    false
-                }
-                STATE_ONE => {
-                    // Re-record that we are the only owner of the packet.
-                    // No barrier needed, even if the task gets reawoken
-                    // on a different core -- this is analogous to writing a
-                    // payload; a barrier in enqueueing the task protects it.
-                    // NB(#8132). This *must* occur before the enqueue below.
-                    // FIXME(#6842, #8130) This is usually only needed for the
-                    // assertion in recv_ready, except in the case of select().
-                    // This won't actually ever have cacheline contention, but
-                    // maybe should be optimized out with a cfg(test) anyway?
-                    (*self.packet()).state.store(STATE_ONE, Relaxed);
-
-                    rtdebug!("rendezvous recv");
-
-                    // Channel is closed. Switch back and check the data.
-                    // NB: We have to drop back into the scheduler event loop here
-                    // instead of switching immediately back or we could end up
-                    // triggering infinite recursion on the scheduler's stack.
-                    let recvr = BlockedTask::cast_from_uint(task_as_state);
-                    sched.enqueue_blocked_task(recvr);
-                    true
-                }
-                _ => rtabort!("can't block_on; a task is already blocked")
-            }
-        }
-    }
-
-    // This is the only select trait function that's not also used in recv.
-    fn unblock_from(&mut self) -> bool {
-        let packet = self.packet();
-        unsafe {
-            // In case the data is available, the acquire barrier here matches
-            // the release barrier the sender used to release the payload.
-            match (*packet).state.load(Acquire) {
-                // Impossible. We removed STATE_BOTH when blocking on it, and
-                // no self-respecting sender would put it back.
-                STATE_BOTH    => rtabort!("refcount already 2 in unblock_from"),
-                // Here, a sender already tried to wake us up. Perhaps they
-                // even succeeded! Data is available.
-                STATE_ONE     => true,
-                // Still registered as blocked. Need to "unblock" the pointer.
-                task_as_state => {
-                    // In the window between the load and the CAS, a sender
-                    // might take the pointer and set the refcount to ONE. If
-                    // that happens, we shouldn't clobber that with BOTH!
-                    // Acquire barrier again for the same reason as above.
-                    match (*packet).state.compare_and_swap(task_as_state, STATE_BOTH,
-                                                           Acquire) {
-                        STATE_BOTH => rtabort!("refcount became 2 in unblock_from"),
-                        STATE_ONE  => true, // Lost the race. Data available.
-                        same_ptr   => {
-                            // We successfully unblocked our task pointer.
-                            rtassert!(task_as_state == same_ptr);
-                            let handle = BlockedTask::cast_from_uint(task_as_state);
-                            // Because we are already awake, the handle we
-                            // gave to this port shall already be empty.
-                            handle.assert_already_awake();
-                            false
-                        }
-                    }
-                }
-            }
-        }
-    }
-}
-
-impl<T: Send> Select for PortOne<T> { }
-
-impl<T: Send> SelectPortInner<T> for PortOne<T> {
-    fn recv_ready(mut self) -> Option<T> {
-        let packet = self.packet();
-
-        // No further memory barrier is needed here to access the
-        // payload. Some scenarios:
-        //
-        // 1) We encountered STATE_ONE above - the atomic_xchg was the acq barrier. We're fine.
-        // 2) We encountered STATE_BOTH above and blocked. The sending task then ran us
-        //    and ran on its thread. The sending task issued a read barrier when taking the
-        //    pointer to the receiving task.
-        // 3) We encountered STATE_BOTH above and blocked, but the receiving task (this task)
-        //    is pinned to some other scheduler, so the sending task had to give us to
-        //    a different scheduler for resuming. That send synchronized memory.
-        unsafe {
-            // See corresponding store() above in block_on for rationale.
-            // FIXME(#8130) This can happen only in test builds.
-            // This load is not required for correctness and may be compiled out.
-            rtassert!((*packet).state.load(Relaxed) == STATE_ONE);
-
-            let payload = (*packet).payload.take();
-
-            // The sender has closed up shop. Drop the packet.
-            let _packet: ~Packet<T> = cast::transmute(self.void_packet);
-            // Suppress the synchronizing actions in the finalizer. We're done with the packet.
-            self.suppress_finalize = true;
-            return payload;
-        }
-    }
-}
-
-impl<T: Send> SelectPort<T> for PortOne<T> { }
-
-impl<T: Send> Peekable<T> for PortOne<T> {
-    fn peek(&self) -> bool {
-        unsafe {
-            let packet: *mut Packet<T> = self.packet();
-            let oldstate = (*packet).state.load(SeqCst);
-            match oldstate {
-                STATE_BOTH => false,
-                STATE_ONE => (*packet).payload.is_some(),
-                _ => rtabort!("peeked on a blocked task")
-            }
-        }
-    }
-}
-
-#[unsafe_destructor]
-impl<T: Send> Drop for ChanOne<T> {
-    fn drop(&mut self) {
-        if self.suppress_finalize { return }
-
-        unsafe {
-            let oldstate = (*self.packet()).state.swap(STATE_ONE, SeqCst);
-            match oldstate {
-                STATE_BOTH => {
-                    // Port still active. It will destroy the Packet.
-                },
-                STATE_ONE => {
-                    let _packet: ~Packet<T> = cast::transmute(self.void_packet);
-                },
-                task_as_state => {
-                    // The port is blocked waiting for a message we will never send. Wake it.
-                    rtassert!((*self.packet()).payload.is_none());
-                    let recvr = BlockedTask::cast_from_uint(task_as_state);
-                    recvr.wake().map(|woken_task| {
-                        Scheduler::run_task(woken_task);
-                    });
-                }
-            }
-        }
-    }
-}
-
-#[unsafe_destructor]
-impl<T: Send> Drop for PortOne<T> {
-    fn drop(&mut self) {
-        if self.suppress_finalize { return }
-
-        unsafe {
-            let oldstate = (*self.packet()).state.swap(STATE_ONE, SeqCst);
-            match oldstate {
-                STATE_BOTH => {
-                    // Chan still active. It will destroy the packet.
-                },
-                STATE_ONE => {
-                    let _packet: ~Packet<T> = cast::transmute(self.void_packet);
-                }
-                task_as_state => {
-                    // This case occurs during unwinding, when the blocked
-                    // receiver was killed awake. The task can't still be
-                    // blocked (we are it), but we need to free the handle.
-                    let recvr = BlockedTask::cast_from_uint(task_as_state);
-                    recvr.assert_already_awake();
-                }
-            }
-        }
-    }
-}
-
-struct StreamPayload<T> {
-    val: T,
-    next: PortOne<StreamPayload<T>>
-}
-
-type StreamChanOne<T> = ChanOne<StreamPayload<T>>;
-type StreamPortOne<T> = PortOne<StreamPayload<T>>;
-
-/// A channel with unbounded size.
-pub struct Chan<T> {
-    // FIXME #5372. Using RefCell because we don't take &mut self
-    next: RefCell<StreamChanOne<T>>
-}
-
-/// An port with unbounded size.
-pub struct Port<T> {
-    // FIXME #5372. Using RefCell because we don't take &mut self
-    next: RefCell<Option<StreamPortOne<T>>>
-}
-
-pub fn stream<T: Send>() -> (Port<T>, Chan<T>) {
-    let (pone, cone) = oneshot();
-    let port = Port { next: RefCell::new(Some(pone)) };
-    let chan = Chan { next: RefCell::new(cone) };
-    return (port, chan);
-}
-
-impl<T: Send> Chan<T> {
-    fn try_send_inner(&self, val: T, do_resched: bool) -> bool {
-        let (next_pone, mut cone) = oneshot();
-        let mut b = self.next.borrow_mut();
-        util::swap(&mut cone, b.get());
-        cone.try_send_inner(StreamPayload { val: val, next: next_pone }, do_resched)
-    }
-}
-
-impl<T: Send> GenericChan<T> for Chan<T> {
-    fn send(&self, val: T) {
-        self.try_send(val);
-    }
-}
-
-impl<T: Send> GenericSmartChan<T> for Chan<T> {
-    fn try_send(&self, val: T) -> bool {
-        self.try_send_inner(val, true)
-    }
-}
-
-impl<T: Send> SendDeferred<T> for Chan<T> {
-    fn send_deferred(&self, val: T) {
-        self.try_send_deferred(val);
-    }
-    fn try_send_deferred(&self, val: T) -> bool {
-        self.try_send_inner(val, false)
-    }
-}
-
-impl<T: Send> GenericPort<T> for Port<T> {
-    fn recv(&self) -> T {
-        match self.try_recv() {
-            Some(val) => val,
-            None => {
-                fail!("receiving on closed channel");
-            }
-        }
-    }
-
-    fn try_recv(&self) -> Option<T> {
-        let mut b = self.next.borrow_mut();
-        b.get().take().map_default(None, |pone| {
-            match pone.try_recv() {
-                Some(StreamPayload { val, next }) => {
-                    *b.get() = Some(next);
-                    Some(val)
-                }
-                None => None
-            }
-        })
-    }
-}
-
-impl<T: Send> Peekable<T> for Port<T> {
-    fn peek(&self) -> bool {
-        self.next.with_mut(|p| p.get_mut_ref().peek())
-    }
-}
-
-// XXX: Kind of gross. A Port<T> should be selectable so you can make an array
-// of them, but a &Port<T> should also be selectable so you can select2 on it
-// alongside a PortOne<U> without passing the port by value in recv_ready.
-
-impl<'a, T: Send> SelectInner for &'a Port<T> {
-    #[inline]
-    fn optimistic_check(&mut self) -> bool {
-        self.next.with_mut(|pone| { pone.get_mut_ref().optimistic_check() })
-    }
-
-    #[inline]
-    fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool {
-        let mut b = self.next.borrow_mut();
-        b.get().get_mut_ref().block_on(sched, task)
-    }
-
-    #[inline]
-    fn unblock_from(&mut self) -> bool {
-        self.next.with_mut(|pone| { pone.get_mut_ref().unblock_from() })
-    }
-}
-
-impl<'a, T: Send> Select for &'a Port<T> { }
-
-impl<T: Send> SelectInner for Port<T> {
-    #[inline]
-    fn optimistic_check(&mut self) -> bool {
-        (&*self).optimistic_check()
-    }
-
-    #[inline]
-    fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool {
-        (&*self).block_on(sched, task)
-    }
-
-    #[inline]
-    fn unblock_from(&mut self) -> bool {
-        (&*self).unblock_from()
-    }
-}
-
-impl<T: Send> Select for Port<T> { }
-
-impl<'a, T: Send> SelectPortInner<T> for &'a Port<T> {
-    fn recv_ready(self) -> Option<T> {
-        let mut b = self.next.borrow_mut();
-        match b.get().take_unwrap().recv_ready() {
-            Some(StreamPayload { val, next }) => {
-                *b.get() = Some(next);
-                Some(val)
-            }
-            None => None
-        }
-    }
-}
-
-impl<'a, T: Send> SelectPort<T> for &'a Port<T> { }
-
-pub struct SharedChan<T> {
-    // Just like Chan, but a shared AtomicOption
-    priv next: UnsafeArc<AtomicOption<StreamChanOne<T>>>
-}
-
-impl<T: Send> SharedChan<T> {
-    pub fn new(chan: Chan<T>) -> SharedChan<T> {
-        let next = chan.next.unwrap();
-        let next = AtomicOption::new(~next);
-        SharedChan { next: UnsafeArc::new(next) }
-    }
-}
-
-impl<T: Send> SharedChan<T> {
-    fn try_send_inner(&self, val: T, do_resched: bool) -> bool {
-        unsafe {
-            let (next_pone, next_cone) = oneshot();
-            let cone = (*self.next.get()).swap(~next_cone, SeqCst);
-            cone.unwrap().try_send_inner(StreamPayload { val: val, next: next_pone },
-                                         do_resched)
-        }
-    }
-}
-
-impl<T: Send> GenericChan<T> for SharedChan<T> {
-    fn send(&self, val: T) {
-        self.try_send(val);
-    }
-}
-
-impl<T: Send> GenericSmartChan<T> for SharedChan<T> {
-    fn try_send(&self, val: T) -> bool {
-        self.try_send_inner(val, true)
-    }
-}
-
-impl<T: Send> SendDeferred<T> for SharedChan<T> {
-    fn send_deferred(&self, val: T) {
-        self.try_send_deferred(val);
-    }
-    fn try_send_deferred(&self, val: T) -> bool {
-        self.try_send_inner(val, false)
-    }
-}
-
-impl<T: Send> Clone for SharedChan<T> {
-    fn clone(&self) -> SharedChan<T> {
-        SharedChan {
-            next: self.next.clone()
-        }
-    }
-}
-
-pub struct SharedPort<T> {
-    // The next port on which we will receive the next port on which we will receive T
-    priv next_link: UnsafeArc<AtomicOption<PortOne<StreamPortOne<T>>>>
-}
-
-impl<T: Send> SharedPort<T> {
-    pub fn new(port: Port<T>) -> SharedPort<T> {
-        // Put the data port into a new link pipe
-        let next_data_port = port.next.unwrap().unwrap();
-        let (next_link_port, next_link_chan) = oneshot();
-        next_link_chan.send(next_data_port);
-        let next_link = AtomicOption::new(~next_link_port);
-        SharedPort { next_link: UnsafeArc::new(next_link) }
-    }
-}
-
-impl<T: Send> GenericPort<T> for SharedPort<T> {
-    fn recv(&self) -> T {
-        match self.try_recv() {
-            Some(val) => val,
-            None => {
-                fail!("receiving on a closed channel");
-            }
-        }
-    }
-
-    fn try_recv(&self) -> Option<T> {
-        unsafe {
-            let (next_link_port, next_link_chan) = oneshot();
-            let link_port = (*self.next_link.get()).swap(~next_link_port, SeqCst);
-            let link_port = link_port.unwrap();
-            let data_port = link_port.recv();
-            let (next_data_port, res) = match data_port.try_recv() {
-                Some(StreamPayload { val, next }) => {
-                    (next, Some(val))
-                }
-                None => {
-                    let (next_data_port, _) = oneshot();
-                    (next_data_port, None)
-                }
-            };
-            next_link_chan.send(next_data_port);
-            return res;
-        }
-    }
-}
-
-impl<T: Send> Clone for SharedPort<T> {
-    fn clone(&self) -> SharedPort<T> {
-        SharedPort {
-            next_link: self.next_link.clone()
-        }
-    }
-}
-
-// FIXME #7760: Need better name
-type MegaPipe<T> = (SharedPort<T>, SharedChan<T>);
-
-pub fn megapipe<T: Send>() -> MegaPipe<T> {
-    let (port, chan) = stream();
-    (SharedPort::new(port), SharedChan::new(chan))
-}
-
-impl<T: Send> GenericChan<T> for MegaPipe<T> {
-    fn send(&self, val: T) {
-        self.second_ref().send(val)
-    }
-}
-
-impl<T: Send> GenericSmartChan<T> for MegaPipe<T> {
-    fn try_send(&self, val: T) -> bool {
-        self.second_ref().try_send(val)
-    }
-}
-
-impl<T: Send> GenericPort<T> for MegaPipe<T> {
-    fn recv(&self) -> T {
-        self.first_ref().recv()
-    }
-
-    fn try_recv(&self) -> Option<T> {
-        self.first_ref().try_recv()
-    }
-}
-
-impl<T: Send> SendDeferred<T> for MegaPipe<T> {
-    fn send_deferred(&self, val: T) {
-        self.second_ref().send_deferred(val)
-    }
-    fn try_send_deferred(&self, val: T) -> bool {
-        self.second_ref().try_send_deferred(val)
-    }
-}
-
-#[cfg(test)]
-mod test {
-    use super::*;
-    use option::*;
-    use rt::test::*;
-    use num::Times;
-    use rt::util;
-
-    #[test]
-    fn oneshot_single_thread_close_port_first() {
-        // Simple test of closing without sending
-        do run_in_newsched_task {
-            let (port, _chan) = oneshot::<int>();
-            { let _p = port; }
-        }
-    }
-
-    #[test]
-    fn oneshot_single_thread_close_chan_first() {
-        // Simple test of closing without sending
-        do run_in_newsched_task {
-            let (_port, chan) = oneshot::<int>();
-            { let _c = chan; }
-        }
-    }
-
-    #[test]
-    fn oneshot_single_thread_send_port_close() {
-        // Testing that the sender cleans up the payload if receiver is closed
-        do run_in_newsched_task {
-            let (port, chan) = oneshot::<~int>();
-            { let _p = port; }
-            chan.send(~0);
-        }
-    }
-
-    #[test]
-    fn oneshot_single_thread_recv_chan_close() {
-        // Receiving on a closed chan will fail
-        do run_in_newsched_task {
-            let res = do spawntask_try {
-                let (port, chan) = oneshot::<~int>();
-                { let _c = chan; }
-                port.recv();
-            };
-            // What is our res?
-            rtdebug!("res is: {:?}", res.is_err());
-            assert!(res.is_err());
-        }
-    }
-
-    #[test]
-    fn oneshot_single_thread_send_then_recv() {
-        do run_in_newsched_task {
-            let (port, chan) = oneshot::<~int>();
-            chan.send(~10);
-            assert!(port.recv() == ~10);
-        }
-    }
-
-    #[test]
-    fn oneshot_single_thread_try_send_open() {
-        do run_in_newsched_task {
-            let (port, chan) = oneshot::<int>();
-            assert!(chan.try_send(10));
-            assert!(port.recv() == 10);
-        }
-    }
-
-    #[test]
-    fn oneshot_single_thread_try_send_closed() {
-        do run_in_newsched_task {
-            let (port, chan) = oneshot::<int>();
-            { let _p = port; }
-            assert!(!chan.try_send(10));
-        }
-    }
-
-    #[test]
-    fn oneshot_single_thread_try_recv_open() {
-        do run_in_newsched_task {
-            let (port, chan) = oneshot::<int>();
-            chan.send(10);
-            assert!(port.try_recv() == Some(10));
-        }
-    }
-
-    #[test]
-    fn oneshot_single_thread_try_recv_closed() {
-        do run_in_newsched_task {
-            let (port, chan) = oneshot::<int>();
-            { let _c = chan; }
-            assert!(port.try_recv() == None);
-        }
-    }
-
-    #[test]
-    fn oneshot_single_thread_peek_data() {
-        do run_in_newsched_task {
-            let (port, chan) = oneshot::<int>();
-            assert!(!port.peek());
-            chan.send(10);
-            assert!(port.peek());
-        }
-    }
-
-    #[test]
-    fn oneshot_single_thread_peek_close() {
-        do run_in_newsched_task {
-            let (port, chan) = oneshot::<int>();
-            { let _c = chan; }
-            assert!(!port.peek());
-            assert!(!port.peek());
-        }
-    }
-
-    #[test]
-    fn oneshot_single_thread_peek_open() {
-        do run_in_newsched_task {
-            let (port, _) = oneshot::<int>();
-            assert!(!port.peek());
-        }
-    }
-
-    #[test]
-    fn oneshot_multi_task_recv_then_send() {
-        do run_in_newsched_task {
-            let (port, chan) = oneshot::<~int>();
-            do spawntask {
-                assert!(port.recv() == ~10);
-            }
-
-            chan.send(~10);
-        }
-    }
-
-    #[test]
-    fn oneshot_multi_task_recv_then_close() {
-        do run_in_newsched_task {
-            let (port, chan) = oneshot::<~int>();
-            do spawntask_later {
-                let _ = chan;
-            }
-            let res = do spawntask_try {
-                assert!(port.recv() == ~10);
-            };
-            assert!(res.is_err());
-        }
-    }
-
-    #[test]
-    fn oneshot_multi_thread_close_stress() {
-        if util::limit_thread_creation_due_to_osx_and_valgrind() { return; }
-        stress_factor().times(|| {
-            do run_in_newsched_task {
-                let (port, chan) = oneshot::<int>();
-                let thread = do spawntask_thread {
-                    let _ = port;
-                };
-                let _chan = chan;
-                thread.join();
-            }
-        })
-    }
-
-    #[test]
-    fn oneshot_multi_thread_send_close_stress() {
-        if util::limit_thread_creation_due_to_osx_and_valgrind() { return; }
-        stress_factor().times(|| {
-            do run_in_newsched_task {
-                let (port, chan) = oneshot::<int>();
-                let thread1 = do spawntask_thread {
-                    let _ = port;
-                };
-                let thread2 = do spawntask_thread {
-                    chan.send(1);
-                };
-                thread1.join();
-                thread2.join();
-            }
-        })
-    }
-
-    #[test]
-    fn oneshot_multi_thread_recv_close_stress() {
-        if util::limit_thread_creation_due_to_osx_and_valgrind() { return; }
-        stress_factor().times(|| {
-            do run_in_newsched_task {
-                let (port, chan) = oneshot::<int>();
-                let thread1 = do spawntask_thread {
-                    let port = port;
-                    let res = do spawntask_try {
-                        port.recv();
-                    };
-                    assert!(res.is_err());
-                };
-                let thread2 = do spawntask_thread {
-                    let chan = chan;
-                    do spawntask {
-                        let _ = chan;
-                    }
-                };
-                thread1.join();
-                thread2.join();
-            }
-        })
-    }
-
-    #[test]
-    fn oneshot_multi_thread_send_recv_stress() {
-        if util::limit_thread_creation_due_to_osx_and_valgrind() { return; }
-        stress_factor().times(|| {
-            do run_in_newsched_task {
-                let (port, chan) = oneshot::<~int>();
-                let thread1 = do spawntask_thread {
-                    chan.send(~10);
-                };
-                let thread2 = do spawntask_thread {
-                    assert!(port.recv() == ~10);
-                };
-                thread1.join();
-                thread2.join();
-            }
-        })
-    }
-
-    #[test]
-    fn stream_send_recv_stress() {
-        if util::limit_thread_creation_due_to_osx_and_valgrind() { return; }
-        stress_factor().times(|| {
-            do run_in_mt_newsched_task {
-                let (port, chan) = stream::<~int>();
-
-                send(chan, 0);
-                recv(port, 0);
-
-                fn send(chan: Chan<~int>, i: int) {
-                    if i == 10 { return }
-
-                    do spawntask_random {
-                        chan.send(~i);
-                        send(chan, i + 1);
-                    }
-                }
-
-                fn recv(port: Port<~int>, i: int) {
-                    if i == 10 { return }
-
-                    do spawntask_random {
-                        assert!(port.recv() == ~i);
-                        recv(port, i + 1);
-                    };
-                }
-            }
-        })
-    }
-
-    #[test]
-    fn recv_a_lot() {
-        // Regression test that we don't run out of stack in scheduler context
-        do run_in_newsched_task {
-            let (port, chan) = stream();
-            10000.times(|| { chan.send(()) });
-            10000.times(|| { port.recv() });
-        }
-    }
-
-    #[test]
-    fn shared_chan_stress() {
-        if util::limit_thread_creation_due_to_osx_and_valgrind() { return; }
-        do run_in_mt_newsched_task {
-            let (port, chan) = stream();
-            let chan = SharedChan::new(chan);
-            let total = stress_factor() + 100;
-            total.times(|| {
-                let chan_clone = chan.clone();
-                do spawntask_random {
-                    chan_clone.send(());
-                }
-            });
-
-            total.times(|| {
-                port.recv();
-            });
-        }
-    }
-
-    #[test]
-    fn shared_port_stress() {
-        if util::limit_thread_creation_due_to_osx_and_valgrind() { return; }
-        do run_in_mt_newsched_task {
-            let (end_port, end_chan) = stream();
-            let (port, chan) = stream();
-            let end_chan = SharedChan::new(end_chan);
-            let port = SharedPort::new(port);
-            let total = stress_factor() + 100;
-            total.times(|| {
-                let end_chan_clone = end_chan.clone();
-                let port_clone = port.clone();
-                do spawntask_random {
-                    port_clone.recv();
-                    end_chan_clone.send(());
-                }
-            });
-
-            total.times(|| {
-                chan.send(());
-            });
-
-            total.times(|| {
-                end_port.recv();
-            });
-        }
-    }
-
-    #[test]
-    fn shared_port_close_simple() {
-        do run_in_mt_newsched_task {
-            let (port, chan) = stream::<()>();
-            let port = SharedPort::new(port);
-            { let _chan = chan; }
-            assert!(port.try_recv().is_none());
-        }
-    }
-
-    #[test]
-    fn shared_port_close() {
-        do run_in_mt_newsched_task {
-            let (end_port, end_chan) = stream::<bool>();
-            let (port, chan) = stream::<()>();
-            let end_chan = SharedChan::new(end_chan);
-            let port = SharedPort::new(port);
-            let chan = SharedChan::new(chan);
-            let send_total = 10;
-            let recv_total = 20;
-            do spawntask_random {
-                send_total.times(|| {
-                    let chan_clone = chan.clone();
-                    do spawntask_random {
-                        chan_clone.send(());
-                    }
-                });
-            }
-            let end_chan_clone = end_chan.clone();
-            do spawntask_random {
-                recv_total.times(|| {
-                    let port_clone = port.clone();
-                    let end_chan_clone = end_chan_clone.clone();
-                    do spawntask_random {
-                        let recvd = port_clone.try_recv().is_some();
-                        end_chan_clone.send(recvd);
-                    }
-                });
-            }
-
-            let mut recvd = 0;
-            recv_total.times(|| {
-                recvd += if end_port.recv() { 1 } else { 0 };
-            });
-
-            assert!(recvd == send_total);
-        }
-    }
-
-    #[test]
-    fn megapipe_stress() {
-        use rand;
-        use rand::Rng;
-
-        if util::limit_thread_creation_due_to_osx_and_valgrind() { return; }
-
-        do run_in_mt_newsched_task {
-            let (end_port, end_chan) = stream::<()>();
-            let end_chan = SharedChan::new(end_chan);
-            let pipe = megapipe();
-            let total = stress_factor() + 10;
-            let mut rng = rand::rng();
-            total.times(|| {
-                let msgs = rng.gen_range(0u, 10);
-                let pipe_clone = pipe.clone();
-                let end_chan_clone = end_chan.clone();
-                do spawntask_random {
-                    msgs.times(|| {
-                        pipe_clone.send(());
-                    });
-                    msgs.times(|| {
-                        pipe_clone.recv();
-                    });
-                }
-
-                end_chan_clone.send(());
-            });
-
-            total.times(|| {
-                end_port.recv();
-            });
-        }
-    }
-
-    #[test]
-    fn send_deferred() {
-        use unstable::sync::atomic;
-
-        // Tests no-rescheduling of send_deferred on all types of channels.
-        do run_in_newsched_task {
-            let (pone, cone) = oneshot();
-            let (pstream, cstream) = stream();
-            let (pshared, cshared) = stream();
-            let cshared = SharedChan::new(cshared);
-            let mp = megapipe();
-
-            do spawntask { pone.recv(); }
-            do spawntask { pstream.recv(); }
-            do spawntask { pshared.recv(); }
-            let p_mp = mp.clone();
-            do spawntask { p_mp.recv(); }
-
-            unsafe {
-                let _guard = atomic();
-                cone.send_deferred(());
-                cstream.send_deferred(());
-                cshared.send_deferred(());
-                mp.send_deferred(());
-            }
-        }
-    }
-
-}
diff --git a/src/libstd/rt/kill.rs b/src/libstd/rt/kill.rs
index e3f9cd09632..f4f128cf5aa 100644
--- a/src/libstd/rt/kill.rs
+++ b/src/libstd/rt/kill.rs
@@ -153,8 +153,9 @@ There are two known issues with the current scheme for exit code propagation.
 use cast;
 use option::{Option, Some, None};
 use prelude::*;
+use iter;
+use task::TaskResult;
 use rt::task::Task;
-use rt::task::UnwindResult;
 use unstable::atomics::{AtomicUint, SeqCst};
 use unstable::sync::UnsafeArc;
 
@@ -169,11 +170,21 @@ pub enum BlockedTask {
 pub struct Death {
     // Action to be done with the exit code. If set, also makes the task wait
     // until all its watched children exit before collecting the status.
-    on_exit:         Option<proc(UnwindResult)>,
+    on_exit:         Option<proc(TaskResult)>,
     // nesting level counter for unstable::atomically calls (0 == can deschedule).
     priv wont_sleep:      int,
 }
 
+pub struct BlockedTaskIterator {
+    priv inner: UnsafeArc<AtomicUint>,
+}
+
+impl Iterator<BlockedTask> for BlockedTaskIterator {
+    fn next(&mut self) -> Option<BlockedTask> {
+        Some(Shared(self.inner.clone()))
+    }
+}
+
 impl BlockedTask {
     /// Returns Some if the task was successfully woken; None if already killed.
     pub fn wake(self) -> Option<~Task> {
@@ -194,19 +205,17 @@ impl BlockedTask {
     }
 
     /// Converts one blocked task handle to a list of many handles to the same.
-    pub fn make_selectable(self, num_handles: uint) -> ~[BlockedTask] {
-        let handles = match self {
+    pub fn make_selectable(self, num_handles: uint)
+        -> iter::Take<BlockedTaskIterator>
+    {
+        let arc = match self {
             Owned(task) => {
-                let flag = unsafe {
-                    AtomicUint::new(cast::transmute(task))
-                };
-                UnsafeArc::newN(flag, num_handles)
+                let flag = unsafe { AtomicUint::new(cast::transmute(task)) };
+                UnsafeArc::new(flag)
             }
-            Shared(arc) => arc.cloneN(num_handles),
+            Shared(arc) => arc.clone(),
         };
-        // Even if the task was unkillable before, we use 'Killable' because
-        // multiple pipes will have handles. It does not really mean killable.
-        handles.move_iter().map(|x| Shared(x)).collect()
+        BlockedTaskIterator{ inner: arc }.take(num_handles)
     }
 
     // This assertion has two flavours because the wake involves an atomic op.
@@ -254,10 +263,10 @@ impl Death {
     }
 
     /// Collect failure exit codes from children and propagate them to a parent.
-    pub fn collect_failure(&mut self, result: UnwindResult) {
+    pub fn collect_failure(&mut self, result: TaskResult) {
         match self.on_exit.take() {
+            Some(f) => f(result),
             None => {}
-            Some(on_exit) => on_exit(result),
         }
     }
 
diff --git a/src/libstd/rt/local_ptr.rs b/src/libstd/rt/local_ptr.rs
index 66fe9742121..925aa802ad5 100644
--- a/src/libstd/rt/local_ptr.rs
+++ b/src/libstd/rt/local_ptr.rs
@@ -77,10 +77,9 @@ pub unsafe fn borrow<T>() -> Borrowed<T> {
 /// it wherever possible.
 #[cfg(not(windows), not(target_os = "android"))]
 pub mod compiled {
-    #[cfg(not(test))]
-    use libc::c_void;
     use cast;
     use option::{Option, Some, None};
+    #[cfg(not(test))] use libc::c_void;
 
     #[cfg(test)]
     pub use realstd::rt::shouldnt_be_public::RT_TLS_PTR;
diff --git a/src/libstd/rt/message_queue.rs b/src/libstd/rt/message_queue.rs
deleted file mode 100644
index 10e457368f0..00000000000
--- a/src/libstd/rt/message_queue.rs
+++ /dev/null
@@ -1,55 +0,0 @@
-// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
-// file at the top-level directory of this distribution and at
-// http://rust-lang.org/COPYRIGHT.
-//
-// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
-// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
-// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
-// option. This file may not be copied, modified, or distributed
-// except according to those terms.
-
-//! A concurrent queue that supports multiple producers and a
-//! single consumer.
-
-use kinds::Send;
-use vec::OwnedVector;
-use option::Option;
-use clone::Clone;
-use rt::mpsc_queue::Queue;
-
-pub struct MessageQueue<T> {
-    priv queue: Queue<T>
-}
-
-impl<T: Send> MessageQueue<T> {
-    pub fn new() -> MessageQueue<T> {
-        MessageQueue {
-            queue: Queue::new()
-        }
-    }
-
-    #[inline]
-    pub fn push(&mut self, value: T) {
-        self.queue.push(value)
-    }
-
-    #[inline]
-    pub fn pop(&mut self) -> Option<T> {
-        self.queue.pop()
-    }
-
-    /// A pop that may sometimes miss enqueued elements, but is much faster
-    /// to give up without doing any synchronization
-    #[inline]
-    pub fn casual_pop(&mut self) -> Option<T> {
-        self.queue.pop()
-    }
-}
-
-impl<T: Send> Clone for MessageQueue<T> {
-    fn clone(&self) -> MessageQueue<T> {
-        MessageQueue {
-            queue: self.queue.clone()
-        }
-    }
-}
diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs
index ce8d1ab1983..5d2179e8b96 100644
--- a/src/libstd/rt/mod.rs
+++ b/src/libstd/rt/mod.rs
@@ -65,7 +65,7 @@ use ptr::RawPtr;
 use rt::local::Local;
 use rt::sched::{Scheduler, Shutdown};
 use rt::sleeper_list::SleeperList;
-use rt::task::UnwindResult;
+use task::TaskResult;
 use rt::task::{Task, SchedTask, GreenTask, Sched};
 use send_str::SendStrStatic;
 use unstable::atomics::{AtomicInt, AtomicBool, SeqCst};
@@ -91,8 +91,6 @@ pub use self::kill::BlockedTask;
 // XXX: these probably shouldn't be public...
 #[doc(hidden)]
 pub mod shouldnt_be_public {
-    pub use super::select::SelectInner;
-    pub use super::select::{SelectInner, SelectPortInner};
     pub use super::local_ptr::native::maybe_tls_key;
     #[cfg(not(windows), not(target_os = "android"))]
     pub use super::local_ptr::compiled::RT_TLS_PTR;
@@ -123,11 +121,11 @@ pub mod rtio;
 /// or task-local storage.
 pub mod local;
 
-/// A parallel queue.
-pub mod message_queue;
-
 /// A mostly lock-free multi-producer, single consumer queue.
-mod mpsc_queue;
+pub mod mpsc_queue;
+
+/// A lock-free single-producer, single consumer queue.
+pub mod spsc_queue;
 
 /// A lock-free multi-producer, multi-consumer bounded queue.
 mod mpmc_bounded_queue;
@@ -169,11 +167,6 @@ pub mod rc;
 /// scheduler and task context
 pub mod tube;
 
-/// Simple reimplementation of std::comm
-pub mod comm;
-
-mod select;
-
 /// The runtime needs to be able to put a pointer into thread-local storage.
 mod local_ptr;
 
@@ -349,7 +342,7 @@ fn run_(main: proc(), use_main_sched: bool) -> int {
     // When the main task exits, after all the tasks in the main
     // task tree, shut down the schedulers and set the exit code.
     let handles = handles;
-    let on_exit: proc(UnwindResult) = proc(exit_success) {
+    let on_exit: proc(TaskResult) = proc(exit_success) {
         unsafe {
             assert!(!(*exited_already.get()).swap(true, SeqCst),
                     "the runtime already exited");
@@ -361,7 +354,7 @@ fn run_(main: proc(), use_main_sched: bool) -> int {
         }
 
         unsafe {
-            let exit_code = if exit_success.is_success() {
+            let exit_code = if exit_success.is_ok() {
                 use rt::util;
 
                 // If we're exiting successfully, then return the global
diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs
index 557d9c998ca..b54231421e3 100644
--- a/src/libstd/rt/rtio.rs
+++ b/src/libstd/rt/rtio.rs
@@ -10,7 +10,7 @@
 
 use c_str::CString;
 use cast;
-use comm::{SharedChan, PortOne, Port};
+use comm::{SharedChan, Port};
 use libc::c_int;
 use libc;
 use ops::Drop;
@@ -222,7 +222,7 @@ pub trait RtioUdpSocket : RtioSocket {
 
 pub trait RtioTimer {
     fn sleep(&mut self, msecs: u64);
-    fn oneshot(&mut self, msecs: u64) -> PortOne<()>;
+    fn oneshot(&mut self, msecs: u64) -> Port<()>;
     fn period(&mut self, msecs: u64) -> Port<()>;
 }
 
diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs
index fa17efc833b..ac3aeb5a4bb 100644
--- a/src/libstd/rt/sched.rs
+++ b/src/libstd/rt/sched.rs
@@ -17,7 +17,6 @@ use super::stack::{StackPool};
 use super::rtio::EventLoop;
 use super::context::Context;
 use super::task::{Task, AnySched, Sched};
-use super::message_queue::MessageQueue;
 use rt::kill::BlockedTask;
 use rt::deque;
 use rt::local_ptr;
@@ -29,6 +28,7 @@ use iter::range;
 use unstable::mutex::Mutex;
 use vec::{OwnedVector};
 
+use mpsc = super::mpsc_queue;
 
 /// A scheduler is responsible for coordinating the execution of Tasks
 /// on a single thread. The scheduler runs inside a slightly modified
@@ -47,7 +47,9 @@ pub struct Scheduler {
     /// The queue of incoming messages from other schedulers.
     /// These are enqueued by SchedHandles after which a remote callback
     /// is triggered to handle the message.
-    message_queue: MessageQueue<SchedMessage>,
+    message_queue: mpsc::Consumer<SchedMessage, ()>,
+    /// Producer used to clone sched handles from
+    message_producer: mpsc::Producer<SchedMessage, ()>,
     /// A shared list of sleeping schedulers. We'll use this to wake
     /// up schedulers when pushing work onto the work queue.
     sleeper_list: SleeperList,
@@ -104,7 +106,7 @@ enum EffortLevel {
     GiveItYourBest
 }
 
-static MAX_YIELD_CHECKS: uint = 200;
+static MAX_YIELD_CHECKS: uint = 20000;
 
 fn reset_yield_check(rng: &mut XorShiftRng) -> uint {
     let r: uint = Rand::rand(rng);
@@ -135,9 +137,11 @@ impl Scheduler {
                        friend: Option<SchedHandle>)
         -> Scheduler {
 
+        let (consumer, producer) = mpsc::queue(());
         let mut sched = Scheduler {
             sleeper_list: sleeper_list,
-            message_queue: MessageQueue::new(),
+            message_queue: consumer,
+            message_producer: producer,
             sleepy: false,
             no_sleep: false,
             event_loop: event_loop,
@@ -218,7 +222,7 @@ impl Scheduler {
 
         // Should not have any messages
         let message = stask.sched.get_mut_ref().message_queue.pop();
-        rtassert!(message.is_none());
+        rtassert!(match message { mpsc::Empty => true, _ => false });
 
         stask.destroyed = true;
     }
@@ -315,10 +319,27 @@ impl Scheduler {
     fn interpret_message_queue(mut ~self, effort: EffortLevel) -> Option<~Scheduler> {
 
         let msg = if effort == DontTryTooHard {
-            // Do a cheap check that may miss messages
             self.message_queue.casual_pop()
         } else {
-            self.message_queue.pop()
+            // When popping our message queue, we could see an "inconsistent"
+            // state which means that we *should* be able to pop data, but we
+            // are unable to at this time. Our options are:
+            //
+            //  1. Spin waiting for data
+            //  2. Ignore this and pretend we didn't find a message
+            //
+            // If we choose route 1, then if the pusher in question is currently
+            // pre-empted, we're going to take up our entire time slice just
+            // spinning on this queue. If we choose route 2, then the pusher in
+            // question is still guaranteed to make a send() on its async
+            // handle, so we will guaranteed wake up and see its message at some
+            // point.
+            //
+            // I have chosen to take route #2.
+            match self.message_queue.pop() {
+                mpsc::Data(t) => Some(t),
+                mpsc::Empty | mpsc::Inconsistent => None
+            }
         };
 
         match msg {
@@ -793,7 +814,7 @@ impl Scheduler {
 
         return SchedHandle {
             remote: remote,
-            queue: self.message_queue.clone(),
+            queue: self.message_producer.clone(),
             sched_id: self.sched_id()
         };
     }
@@ -813,7 +834,7 @@ pub enum SchedMessage {
 
 pub struct SchedHandle {
     priv remote: ~RemoteCallback,
-    priv queue: MessageQueue<SchedMessage>,
+    priv queue: mpsc::Producer<SchedMessage, ()>,
     sched_id: uint
 }
 
@@ -915,17 +936,17 @@ fn new_sched_rng() -> XorShiftRng {
 #[cfg(test)]
 mod test {
     use prelude::*;
-    use rt::test::*;
-    use unstable::run_in_bare_thread;
+
     use borrow::to_uint;
-    use rt::sched::{Scheduler};
     use rt::deque::BufferPool;
-    use rt::thread::Thread;
-    use rt::task::{Task, Sched};
     use rt::basic;
+    use rt::sched::{Scheduler};
+    use rt::task::{Task, Sched};
+    use rt::test::*;
+    use rt::thread::Thread;
     use rt::util;
-    use option::{Some};
-    use rt::task::UnwindResult;
+    use task::TaskResult;
+    use unstable::run_in_bare_thread;
 
     #[test]
     fn trivial_run_in_newsched_task_test() {
@@ -1010,8 +1031,8 @@ mod test {
                 assert!(Task::on_appropriate_sched());
             };
 
-            let on_exit: proc(UnwindResult) = proc(exit_status) {
-                rtassert!(exit_status.is_success())
+            let on_exit: proc(TaskResult) = proc(exit_status) {
+                rtassert!(exit_status.is_ok())
             };
             task.death.on_exit = Some(on_exit);
 
@@ -1027,7 +1048,6 @@ mod test {
         use rt::sleeper_list::SleeperList;
         use rt::sched::Shutdown;
         use borrow;
-        use rt::comm::*;
 
         do run_in_bare_thread {
 
@@ -1089,7 +1109,7 @@ mod test {
             rtdebug!("task4 id: **{}**", borrow::to_uint(task4));
 
             // Signal from the special task that we are done.
-            let (port, chan) = oneshot::<()>();
+            let (port, chan) = Chan::<()>::new();
 
             let normal_task = ~do Task::new_root(&mut normal_sched.stack_pool, None) {
                 rtdebug!("*about to submit task2*");
@@ -1160,10 +1180,8 @@ mod test {
 
     #[test]
     fn handle() {
-        use rt::comm::*;
-
         do run_in_bare_thread {
-            let (port, chan) = oneshot::<()>();
+            let (port, chan) = Chan::new();
 
             let thread_one = do Thread::start {
                 let chan = chan;
@@ -1230,7 +1248,6 @@ mod test {
 
     #[test]
     fn multithreading() {
-        use rt::comm::*;
         use num::Times;
         use vec::OwnedVector;
         use container::Container;
@@ -1238,7 +1255,7 @@ mod test {
         do run_in_mt_newsched_task {
             let mut ports = ~[];
             10.times(|| {
-                let (port, chan) = oneshot();
+                let (port, chan) = Chan::new();
                 do spawntask_later {
                     chan.send(());
                 }
@@ -1253,21 +1270,17 @@ mod test {
 
      #[test]
     fn thread_ring() {
-        use rt::comm::*;
-        use comm::{GenericPort, GenericChan};
-
         do run_in_mt_newsched_task {
-            let (end_port, end_chan) = oneshot();
+            let (end_port, end_chan) = Chan::new();
 
             let n_tasks = 10;
             let token = 2000;
 
-            let (p, ch1) = stream();
-            let mut p = p;
+            let (mut p, ch1) = Chan::new();
             ch1.send((token, end_chan));
             let mut i = 2;
             while i <= n_tasks {
-                let (next_p, ch) = stream();
+                let (next_p, ch) = Chan::new();
                 let imm_i = i;
                 let imm_p = p;
                 do spawntask_random {
@@ -1276,23 +1289,23 @@ mod test {
                 p = next_p;
                 i += 1;
             }
-            let imm_p = p;
-            let imm_ch = ch1;
+            let p = p;
             do spawntask_random {
-                roundtrip(1, n_tasks, &imm_p, &imm_ch);
+                roundtrip(1, n_tasks, &p, &ch1);
             }
 
             end_port.recv();
         }
 
         fn roundtrip(id: int, n_tasks: int,
-                     p: &Port<(int, ChanOne<()>)>, ch: &Chan<(int, ChanOne<()>)>) {
+                     p: &Port<(int, Chan<()>)>,
+                     ch: &Chan<(int, Chan<()>)>) {
             while (true) {
                 match p.recv() {
                     (1, end_chan) => {
-                                debug!("{}\n", id);
-                                end_chan.send(());
-                                return;
+                        debug!("{}\n", id);
+                        end_chan.send(());
+                        return;
                     }
                     (token, end_chan) => {
                         debug!("thread: {}   got token: {}", id, token);
@@ -1331,16 +1344,14 @@ mod test {
 
     // FIXME: #9407: xfail-test
     fn dont_starve_1() {
-        use rt::comm::oneshot;
-
         stress_factor().times(|| {
             do run_in_mt_newsched_task {
-                let (port, chan) = oneshot();
+                let (port, chan) = Chan::new();
 
                 // This task should not be able to starve the sender;
                 // The sender should get stolen to another thread.
                 do spawntask {
-                    while !port.peek() { }
+                    while port.try_recv().is_none() { }
                 }
 
                 chan.send(());
@@ -1350,17 +1361,15 @@ mod test {
 
     #[test]
     fn dont_starve_2() {
-        use rt::comm::oneshot;
-
         stress_factor().times(|| {
             do run_in_newsched_task {
-                let (port, chan) = oneshot();
-                let (_port2, chan2) = stream();
+                let (port, chan) = Chan::new();
+                let (_port2, chan2) = Chan::new();
 
                 // This task should not be able to starve the other task.
                 // The sends should eventually yield.
                 do spawntask {
-                    while !port.peek() {
+                    while port.try_recv().is_none() {
                         chan2.send(());
                     }
                 }
diff --git a/src/libstd/rt/select.rs b/src/libstd/rt/select.rs
deleted file mode 100644
index 6cde0a1f216..00000000000
--- a/src/libstd/rt/select.rs
+++ /dev/null
@@ -1,29 +0,0 @@
-// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
-// file at the top-level directory of this distribution and at
-// http://rust-lang.org/COPYRIGHT.
-//
-// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
-// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
-// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
-// option. This file may not be copied, modified, or distributed
-// except according to those terms.
-
-//! Module for private, abstraction-leaking select traits. Wrapped in std::select.
-
-use rt::kill::BlockedTask;
-use rt::sched::Scheduler;
-use option::Option;
-
-pub trait SelectInner {
-    // Returns true if data was available.
-    fn optimistic_check(&mut self) -> bool;
-    // Returns true if data was available. If so, shall also wake() the task.
-    fn block_on(&mut self, &mut Scheduler, BlockedTask) -> bool;
-    // Returns true if data was available.
-    fn unblock_from(&mut self) -> bool;
-}
-
-pub trait SelectPortInner<T> {
-    fn recv_ready(self) -> Option<T>;
-}
-
diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs
index 2adc32f33fb..62e012f9f41 100644
--- a/src/libstd/rt/task.rs
+++ b/src/libstd/rt/task.rs
@@ -20,20 +20,22 @@ use prelude::*;
 use borrow;
 use cast::transmute;
 use cleanup;
+use io::Writer;
 use libc::{c_void, uintptr_t, c_char, size_t};
 use local_data;
 use option::{Option, Some, None};
 use rt::borrowck::BorrowRecord;
 use rt::borrowck;
+use rt::context;
 use rt::context::Context;
 use rt::env;
-use io::Writer;
 use rt::kill::Death;
 use rt::local::Local;
 use rt::logging::StdErrLogger;
 use rt::sched::{Scheduler, SchedHandle};
 use rt::stack::{StackSegment, StackPool};
 use send_str::SendStr;
+use task::TaskResult;
 use unstable::finally::Finally;
 use unstable::mutex::Mutex;
 
@@ -90,46 +92,17 @@ pub enum SchedHome {
 pub struct GarbageCollector;
 pub struct LocalStorage(Option<local_data::Map>);
 
-/// Represents the reason for the current unwinding process
-pub enum UnwindResult {
-    /// The task is ending successfully
-    Success,
-
-    /// The Task is failing with reason `~Any`
-    Failure(~Any),
-}
-
-impl UnwindResult {
-    /// Returns `true` if this `UnwindResult` is a failure
-    #[inline]
-    pub fn is_failure(&self) -> bool {
-        match *self {
-            Success => false,
-            Failure(_) => true
-        }
-    }
-
-    /// Returns `true` if this `UnwindResult` is a success
-    #[inline]
-    pub fn is_success(&self) -> bool {
-        match *self {
-            Success => true,
-            Failure(_) => false
-        }
-    }
-}
-
 pub struct Unwinder {
     unwinding: bool,
     cause: Option<~Any>
 }
 
 impl Unwinder {
-    fn to_unwind_result(&mut self) -> UnwindResult {
+    fn result(&mut self) -> TaskResult {
         if self.unwinding {
-            Failure(self.cause.take().unwrap())
+            Err(self.cause.take().unwrap())
         } else {
-            Success
+            Ok(())
         }
     }
 }
@@ -326,7 +299,7 @@ impl Task {
         // Cleanup the dynamic borrowck debugging info
         borrowck::clear_task_borrow_list();
 
-        self.death.collect_failure(self.unwinder.to_unwind_result());
+        self.death.collect_failure(self.unwinder.result());
         self.destroyed = true;
     }
 
@@ -691,6 +664,7 @@ pub fn begin_unwind<M: Any + Send>(msg: M, file: &'static str, line: uint) -> !
 mod test {
     use super::*;
     use rt::test::*;
+    use prelude::*;
 
     #[test]
     fn local_heap() {
@@ -744,22 +718,9 @@ mod test {
     }
 
     #[test]
-    fn comm_oneshot() {
-        use comm::*;
-
-        do run_in_newsched_task {
-            let (port, chan) = oneshot();
-            chan.send(10);
-            assert!(port.recv() == 10);
-        }
-    }
-
-    #[test]
     fn comm_stream() {
-        use comm::*;
-
         do run_in_newsched_task() {
-            let (port, chan) = stream();
+            let (port, chan) = Chan::new();
             chan.send(10);
             assert!(port.recv() == 10);
         }
@@ -767,11 +728,8 @@ mod test {
 
     #[test]
     fn comm_shared_chan() {
-        use comm::*;
-
         do run_in_newsched_task() {
-            let (port, chan) = stream();
-            let chan = SharedChan::new(chan);
+            let (port, chan) = SharedChan::new();
             chan.send(10);
             assert!(port.recv() == 10);
         }
diff --git a/src/libstd/rt/test.rs b/src/libstd/rt/test.rs
index 96b80d11129..2b48b396c99 100644
--- a/src/libstd/rt/test.rs
+++ b/src/libstd/rt/test.rs
@@ -21,14 +21,14 @@ use rand::Rng;
 use rand;
 use result::{Result, Ok, Err};
 use rt::basic;
-use rt::comm::oneshot;
 use rt::deque::BufferPool;
+use comm::Chan;
 use rt::new_event_loop;
 use rt::sched::Scheduler;
 use rt::sleeper_list::SleeperList;
 use rt::task::Task;
-use rt::task::UnwindResult;
 use rt::thread::Thread;
+use task::TaskResult;
 use unstable::{run_in_bare_thread};
 use vec;
 use vec::{OwnedVector, MutableVector, ImmutableVector};
@@ -82,10 +82,10 @@ pub fn run_in_uv_task_core(f: proc()) {
     let mut sched = ~new_test_uv_sched();
     let exit_handle = sched.make_handle();
 
-    let on_exit: proc(UnwindResult) = proc(exit_status: UnwindResult) {
+    let on_exit: proc(TaskResult) = proc(exit_status: TaskResult) {
         let mut exit_handle = exit_handle;
         exit_handle.send(Shutdown);
-        rtassert!(exit_status.is_success());
+        rtassert!(exit_status.is_ok());
     };
     let mut task = ~Task::new_root(&mut sched.stack_pool, None, f);
     task.death.on_exit = Some(on_exit);
@@ -99,10 +99,10 @@ pub fn run_in_newsched_task_core(f: proc()) {
     let mut sched = ~new_test_sched();
     let exit_handle = sched.make_handle();
 
-    let on_exit: proc(UnwindResult) = proc(exit_status: UnwindResult) {
+    let on_exit: proc(TaskResult) = proc(exit_status: TaskResult) {
         let mut exit_handle = exit_handle;
         exit_handle.send(Shutdown);
-        rtassert!(exit_status.is_success());
+        rtassert!(exit_status.is_ok());
     };
     let mut task = ~Task::new_root(&mut sched.stack_pool, None, f);
     task.death.on_exit = Some(on_exit);
@@ -240,14 +240,14 @@ pub fn run_in_mt_newsched_task(f: proc()) {
         }
 
         let handles = handles;  // Work around not being able to capture mut
-        let on_exit: proc(UnwindResult) = proc(exit_status: UnwindResult) {
+        let on_exit: proc(TaskResult) = proc(exit_status: TaskResult) {
             // Tell schedulers to exit
             let mut handles = handles;
             for handle in handles.mut_iter() {
                 handle.send(Shutdown);
             }
 
-            rtassert!(exit_status.is_success());
+            rtassert!(exit_status.is_ok());
         };
         let mut main_task = ~Task::new_root(&mut scheds[0].stack_pool,
                                             None,
@@ -311,8 +311,8 @@ pub fn spawntask_random(f: proc()) {
 
 pub fn spawntask_try(f: proc()) -> Result<(),()> {
 
-    let (port, chan) = oneshot();
-    let on_exit: proc(UnwindResult) = proc(exit_status) {
+    let (port, chan) = Chan::new();
+    let on_exit: proc(TaskResult) = proc(exit_status) {
         chan.send(exit_status)
     };
 
@@ -322,7 +322,7 @@ pub fn spawntask_try(f: proc()) -> Result<(),()> {
     Scheduler::run_task(new_task);
 
     let exit_status = port.recv();
-    if exit_status.is_success() { Ok(()) } else { Err(()) }
+    if exit_status.is_ok() { Ok(()) } else { Err(()) }
 
 }
 
diff --git a/src/libstd/rt/thread.rs b/src/libstd/rt/thread.rs
index 9031147f8b1..da02988c75c 100644
--- a/src/libstd/rt/thread.rs
+++ b/src/libstd/rt/thread.rs
@@ -21,42 +21,32 @@ use kinds::Send;
 use libc;
 use ops::Drop;
 use option::{Option, Some, None};
-use ptr;
 use uint;
 
-#[cfg(windows)]
-use libc::types::os::arch::extra::{LPSECURITY_ATTRIBUTES, SIZE_T,
-                                   LPVOID, DWORD, LPDWORD, HANDLE};
-
-#[cfg(windows)] type rust_thread = HANDLE;
-#[cfg(unix)] type rust_thread = libc::pthread_t;
-#[cfg(windows)] type rust_thread_return = DWORD;
-#[cfg(unix)] type rust_thread_return = *libc::c_void;
-
-type StartFn = extern "C" fn(*libc::c_void) -> rust_thread_return;
+type StartFn = extern "C" fn(*libc::c_void) -> imp::rust_thread_return;
 
 /// This struct represents a native thread's state. This is used to join on an
 /// existing thread created in the join-able state.
 pub struct Thread<T> {
-    priv native: rust_thread,
+    priv native: imp::rust_thread,
     priv joined: bool,
     priv packet: ~Option<T>,
 }
 
-static DEFAULT_STACK_SIZE: libc::size_t = 1024*1024;
+static DEFAULT_STACK_SIZE: libc::size_t = 1024 * 1024;
 
 // This is the starting point of rust os threads. The first thing we do
 // is make sure that we don't trigger __morestack (also why this has a
 // no_split_stack annotation), and then we extract the main function
 // and invoke it.
 #[no_split_stack]
-extern fn thread_start(main: *libc::c_void) -> rust_thread_return {
+extern fn thread_start(main: *libc::c_void) -> imp::rust_thread_return {
     use rt::context;
     unsafe {
         context::record_stack_bounds(0, uint::max_value);
         let f: ~proc() = cast::transmute(main);
         (*f)();
-        cast::transmute(0 as rust_thread_return)
+        cast::transmute(0 as imp::rust_thread_return)
     }
 }
 
@@ -88,7 +78,7 @@ impl Thread<()> {
             *cast::transmute::<&~Option<T>, **mut Option<T>>(&packet)
         };
         let main: proc() = proc() unsafe { *packet2 = Some(main()); };
-        let native = unsafe { native_thread_create(~main) };
+        let native = unsafe { imp::create(~main) };
 
         Thread {
             native: native,
@@ -105,10 +95,16 @@ impl Thread<()> {
     /// there are detached thread still running around.
     pub fn spawn(main: proc()) {
         unsafe {
-            let handle = native_thread_create(~main);
-            native_thread_detach(handle);
+            let handle = imp::create(~main);
+            imp::detach(handle);
         }
     }
+
+    /// Relinquishes the CPU slot that this OS-thread is currently using,
+    /// allowing another thread to run for awhile.
+    pub fn yield_now() {
+        unsafe { imp::yield_now(); }
+    }
 }
 
 impl<T: Send> Thread<T> {
@@ -116,7 +112,7 @@ impl<T: Send> Thread<T> {
     /// calculation.
     pub fn join(mut self) -> T {
         assert!(!self.joined);
-        unsafe { native_thread_join(self.native) };
+        unsafe { imp::join(self.native) };
         self.joined = true;
         assert!(self.packet.is_some());
         self.packet.take_unwrap()
@@ -129,80 +125,115 @@ impl<T: Send> Drop for Thread<T> {
         // This is required for correctness. If this is not done then the thread
         // would fill in a return box which no longer exists.
         if !self.joined {
-            unsafe { native_thread_join(self.native) };
+            unsafe { imp::join(self.native) };
         }
     }
 }
 
 #[cfg(windows)]
-unsafe fn native_thread_create(p: ~proc()) -> rust_thread {
-    let arg: *mut libc::c_void = cast::transmute(p);
-    CreateThread(ptr::mut_null(), DEFAULT_STACK_SIZE, thread_start,
-                 arg, 0, ptr::mut_null())
-}
-
-#[cfg(windows)]
-unsafe fn native_thread_join(native: rust_thread) {
-    use libc::consts::os::extra::INFINITE;
-    WaitForSingleObject(native, INFINITE);
-}
+mod imp {
+    use libc::types::os::arch::extra::{LPSECURITY_ATTRIBUTES, SIZE_T, BOOL,
+                                       LPVOID, DWORD, LPDWORD, HANDLE};
+    use libc;
+    use cast;
+    use super::DEFAULT_STACK_SIZE;
+
+    pub type rust_thread = HANDLE;
+    pub type rust_thread_return = DWORD;
+
+    pub unsafe fn create(p: ~proc()) -> rust_thread {
+        let arg: *mut libc::c_void = cast::transmute(p);
+        CreateThread(ptr::mut_null(), DEFAULT_STACK_SIZE, super::thread_start,
+                     arg, 0, ptr::mut_null())
+    }
 
-#[cfg(windows)]
-unsafe fn native_thread_detach(native: rust_thread) {
-    assert!(libc::CloseHandle(native) != 0);
-}
+    pub unsafe fn join(native: rust_thread) {
+        use libc::consts::os::extra::INFINITE;
+        WaitForSingleObject(native, INFINITE);
+    }
 
-#[cfg(unix)]
-unsafe fn native_thread_create(p: ~proc()) -> rust_thread {
-    use unstable::intrinsics;
-    use libc::consts::os::posix01::PTHREAD_CREATE_JOINABLE;
+    pub unsafe fn detach(native: rust_thread) {
+        assert!(libc::CloseHandle(native) != 0);
+    }
 
-    let mut native: libc::pthread_t = intrinsics::uninit();
-    let mut attr: libc::pthread_attr_t = intrinsics::uninit();
-    assert_eq!(pthread_attr_init(&mut attr), 0);
-    assert_eq!(pthread_attr_setstacksize(&mut attr, DEFAULT_STACK_SIZE), 0);
-    assert_eq!(pthread_attr_setdetachstate(&mut attr, PTHREAD_CREATE_JOINABLE), 0);
+    pub unsafe fn yield_now() {
+        // This function will return 0 if there are no other threads to execute,
+        // but this also means that the yield was useless so this isn't really a
+        // case that needs to be worried about.
+        SwitchToThread();
+    }
 
-    let arg: *libc::c_void = cast::transmute(p);
-    assert_eq!(pthread_create(&mut native, &attr, thread_start, arg), 0);
-    native
+    extern "system" {
+        fn CreateThread(lpThreadAttributes: LPSECURITY_ATTRIBUTES,
+                        dwStackSize: SIZE_T,
+                        lpStartAddress: super::StartFn,
+                        lpParameter: LPVOID,
+                        dwCreationFlags: DWORD,
+                        lpThreadId: LPDWORD) -> HANDLE;
+        fn WaitForSingleObject(hHandle: HANDLE, dwMilliseconds: DWORD) -> DWORD;
+        fn SwitchToThread() -> BOOL;
+    }
 }
 
 #[cfg(unix)]
-unsafe fn native_thread_join(native: rust_thread) {
-    assert_eq!(pthread_join(native, ptr::null()), 0);
-}
+mod imp {
+    use cast;
+    use libc::consts::os::posix01::PTHREAD_CREATE_JOINABLE;
+    use libc;
+    use ptr;
+    use super::DEFAULT_STACK_SIZE;
+    use unstable::intrinsics;
 
-#[cfg(unix)]
-fn native_thread_detach(native: rust_thread) {
-    unsafe { assert_eq!(pthread_detach(native), 0) }
-}
+    pub type rust_thread = libc::pthread_t;
+    pub type rust_thread_return = *libc::c_void;
+
+    pub unsafe fn create(p: ~proc()) -> rust_thread {
+        let mut native: libc::pthread_t = intrinsics::uninit();
+        let mut attr: libc::pthread_attr_t = intrinsics::uninit();
+        assert_eq!(pthread_attr_init(&mut attr), 0);
+        assert_eq!(pthread_attr_setstacksize(&mut attr, DEFAULT_STACK_SIZE), 0);
+        assert_eq!(pthread_attr_setdetachstate(&mut attr,
+                                               PTHREAD_CREATE_JOINABLE), 0);
+
+        let arg: *libc::c_void = cast::transmute(p);
+        assert_eq!(pthread_create(&mut native, &attr,
+                                  super::thread_start, arg), 0);
+        native
+    }
 
-#[cfg(windows)]
-extern "system" {
-    fn CreateThread(lpThreadAttributes: LPSECURITY_ATTRIBUTES,
-                    dwStackSize: SIZE_T,
-                    lpStartAddress: StartFn,
-                    lpParameter: LPVOID,
-                    dwCreationFlags: DWORD,
-                    lpThreadId: LPDWORD) -> HANDLE;
-    fn WaitForSingleObject(hHandle: HANDLE, dwMilliseconds: DWORD) -> DWORD;
-}
+    pub unsafe fn join(native: rust_thread) {
+        assert_eq!(pthread_join(native, ptr::null()), 0);
+    }
 
-#[cfg(unix)]
-extern {
-    fn pthread_create(native: *mut libc::pthread_t,
-                      attr: *libc::pthread_attr_t,
-                      f: StartFn,
-                      value: *libc::c_void) -> libc::c_int;
-    fn pthread_join(native: libc::pthread_t,
-                    value: **libc::c_void) -> libc::c_int;
-    fn pthread_attr_init(attr: *mut libc::pthread_attr_t) -> libc::c_int;
-    fn pthread_attr_setstacksize(attr: *mut libc::pthread_attr_t,
-                                 stack_size: libc::size_t) -> libc::c_int;
-    fn pthread_attr_setdetachstate(attr: *mut libc::pthread_attr_t,
-                                   state: libc::c_int) -> libc::c_int;
-    fn pthread_detach(thread: libc::pthread_t) -> libc::c_int;
+    pub unsafe fn detach(native: rust_thread) {
+        assert_eq!(pthread_detach(native), 0);
+    }
+
+    #[cfg(target_os = "macos")]
+    pub unsafe fn yield_now() { assert_eq!(sched_yield(), 0); }
+
+    #[cfg(not(target_os = "macos"))]
+    pub unsafe fn yield_now() { assert_eq!(pthread_yield(), 0); }
+
+    extern {
+        fn pthread_create(native: *mut libc::pthread_t,
+                          attr: *libc::pthread_attr_t,
+                          f: super::StartFn,
+                          value: *libc::c_void) -> libc::c_int;
+        fn pthread_join(native: libc::pthread_t,
+                        value: **libc::c_void) -> libc::c_int;
+        fn pthread_attr_init(attr: *mut libc::pthread_attr_t) -> libc::c_int;
+        fn pthread_attr_setstacksize(attr: *mut libc::pthread_attr_t,
+                                     stack_size: libc::size_t) -> libc::c_int;
+        fn pthread_attr_setdetachstate(attr: *mut libc::pthread_attr_t,
+                                       state: libc::c_int) -> libc::c_int;
+        fn pthread_detach(thread: libc::pthread_t) -> libc::c_int;
+
+        #[cfg(target_os = "macos")]
+        fn sched_yield() -> libc::c_int;
+        #[cfg(not(target_os = "macos"))]
+        fn pthread_yield() -> libc::c_int;
+    }
 }
 
 #[cfg(test)]