about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--src/libstd/macros.rs19
-rw-r--r--src/libstd/rt/comm.rs341
-rw-r--r--src/libstd/rt/io/extensions.rs5
-rw-r--r--src/libstd/rt/join_latch.rs645
-rw-r--r--src/libstd/rt/local.rs45
-rw-r--r--src/libstd/rt/message_queue.rs3
-rw-r--r--src/libstd/rt/metrics.rs98
-rw-r--r--src/libstd/rt/mod.rs39
-rw-r--r--src/libstd/rt/rtio.rs11
-rw-r--r--src/libstd/rt/sched.rs1058
-rw-r--r--src/libstd/rt/sleeper_list.rs59
-rw-r--r--src/libstd/rt/task.rs60
-rw-r--r--src/libstd/rt/test.rs359
-rw-r--r--src/libstd/rt/tube.rs36
-rw-r--r--src/libstd/rt/uv/async.rs105
-rw-r--r--src/libstd/rt/uv/idle.rs62
-rw-r--r--src/libstd/rt/uv/mod.rs63
-rw-r--r--src/libstd/rt/uv/uvio.rs120
-rw-r--r--src/libstd/rt/uvll.rs443
-rw-r--r--src/libstd/sys.rs6
-rw-r--r--src/libstd/task/mod.rs15
-rw-r--r--src/libstd/task/spawn.rs10
-rw-r--r--src/libstd/unstable/lang.rs4
-rw-r--r--src/libstd/unstable/sync.rs69
-rw-r--r--src/rt/rust_builtin.cpp7
-rw-r--r--src/rt/rust_env.cpp6
-rw-r--r--src/rt/rustrt.def.in1
27 files changed, 2892 insertions, 797 deletions
diff --git a/src/libstd/macros.rs b/src/libstd/macros.rs
index fda48b6ffb7..8d221fe6a1b 100644
--- a/src/libstd/macros.rs
+++ b/src/libstd/macros.rs
@@ -38,16 +38,19 @@ macro_rules! rtassert (
     } )
 )
 
+
+// The do_abort function was originally inside the abort macro, but
+// this was ICEing the compiler so it has been moved outside. Now this
+// seems to work?
+#[allow(missing_doc)]
+pub fn do_abort() -> ! {
+    unsafe { ::libc::abort(); }
+}
+
 macro_rules! abort(
     ($( $msg:expr),+) => ( {
         rtdebug!($($msg),+);
-
-        do_abort();
-
-        // NB: This is in a fn to avoid putting the `unsafe` block in a macro,
-        // which causes spurious 'unnecessary unsafe block' warnings.
-        fn do_abort() -> ! {
-            unsafe { ::libc::abort(); }
-        }
+        ::macros::do_abort();
     } )
 )
+
diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs
index 5d85e292861..82e6d44fe62 100644
--- a/src/libstd/rt/comm.rs
+++ b/src/libstd/rt/comm.rs
@@ -22,10 +22,12 @@ use ops::Drop;
 use kinds::Owned;
 use rt::sched::{Scheduler, Coroutine};
 use rt::local::Local;
-use unstable::intrinsics::{atomic_xchg, atomic_load};
+use unstable::atomics::{AtomicUint, AtomicOption, SeqCst};
+use unstable::sync::UnsafeAtomicRcBox;
 use util::Void;
 use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable};
 use cell::Cell;
+use clone::Clone;
 
 /// A combined refcount / ~Task pointer.
 ///
@@ -34,14 +36,14 @@ use cell::Cell;
 /// * 2 - both endpoints are alive
 /// * 1 - either the sender or the receiver is dead, determined by context
 /// * <ptr> - A pointer to a blocked Task that can be transmuted to ~Task
-type State = int;
+type State = uint;
 
 static STATE_BOTH: State = 2;
 static STATE_ONE: State = 1;
 
 /// The heap-allocated structure shared between two endpoints.
 struct Packet<T> {
-    state: State,
+    state: AtomicUint,
     payload: Option<T>,
 }
 
@@ -70,7 +72,7 @@ pub struct PortOneHack<T> {
 
 pub fn oneshot<T: Owned>() -> (PortOne<T>, ChanOne<T>) {
     let packet: ~Packet<T> = ~Packet {
-        state: STATE_BOTH,
+        state: AtomicUint::new(STATE_BOTH),
         payload: None
     };
 
@@ -114,12 +116,20 @@ impl<T> ChanOne<T> {
             // reordering of the payload write. This also issues an
             // acquire barrier that keeps the subsequent access of the
             // ~Task pointer from being reordered.
-            let oldstate = atomic_xchg(&mut (*packet).state, STATE_ONE);
+            let oldstate = (*packet).state.swap(STATE_ONE, SeqCst);
             match oldstate {
                 STATE_BOTH => {
                     // Port is not waiting yet. Nothing to do
+                    do Local::borrow::<Scheduler, ()> |sched| {
+                        rtdebug!("non-rendezvous send");
+                        sched.metrics.non_rendezvous_sends += 1;
+                    }
                 }
                 STATE_ONE => {
+                    do Local::borrow::<Scheduler, ()> |sched| {
+                        rtdebug!("rendezvous send");
+                        sched.metrics.rendezvous_sends += 1;
+                    }
                     // Port has closed. Need to clean up.
                     let _packet: ~Packet<T> = cast::transmute(this.inner.void_packet);
                     recvr_active = false;
@@ -127,7 +137,9 @@ impl<T> ChanOne<T> {
                 task_as_state => {
                     // Port is blocked. Wake it up.
                     let recvr: ~Coroutine = cast::transmute(task_as_state);
-                    let sched = Local::take::<Scheduler>();
+                    let mut sched = Local::take::<Scheduler>();
+                    rtdebug!("rendezvous send");
+                    sched.metrics.rendezvous_sends += 1;
                     sched.schedule_task(recvr);
                 }
             }
@@ -158,23 +170,30 @@ impl<T> PortOne<T> {
 
         // Switch to the scheduler to put the ~Task into the Packet state.
         let sched = Local::take::<Scheduler>();
-        do sched.deschedule_running_task_and_then |task| {
+        do sched.deschedule_running_task_and_then |sched, task| {
             unsafe {
                 // Atomically swap the task pointer into the Packet state, issuing
                 // an acquire barrier to prevent reordering of the subsequent read
                 // of the payload. Also issues a release barrier to prevent reordering
                 // of any previous writes to the task structure.
                 let task_as_state: State = cast::transmute(task);
-                let oldstate = atomic_xchg(&mut (*packet).state, task_as_state);
+                let oldstate = (*packet).state.swap(task_as_state, SeqCst);
                 match oldstate {
                     STATE_BOTH => {
                         // Data has not been sent. Now we're blocked.
+                        rtdebug!("non-rendezvous recv");
+                        sched.metrics.non_rendezvous_recvs += 1;
                     }
                     STATE_ONE => {
+                        rtdebug!("rendezvous recv");
+                        sched.metrics.rendezvous_recvs += 1;
+
                         // Channel is closed. Switch back and check the data.
+                        // NB: We have to drop back into the scheduler event loop here
+                        // instead of switching immediately back or we could end up
+                        // triggering infinite recursion on the scheduler's stack.
                         let task: ~Coroutine = cast::transmute(task_as_state);
-                        let sched = Local::take::<Scheduler>();
-                        sched.resume_task_immediately(task);
+                        sched.enqueue_task(task);
                     }
                     _ => util::unreachable()
                 }
@@ -210,7 +229,7 @@ impl<T> Peekable<T> for PortOne<T> {
     fn peek(&self) -> bool {
         unsafe {
             let packet: *mut Packet<T> = self.inner.packet();
-            let oldstate = atomic_load(&mut (*packet).state);
+            let oldstate = (*packet).state.load(SeqCst);
             match oldstate {
                 STATE_BOTH => false,
                 STATE_ONE => (*packet).payload.is_some(),
@@ -227,7 +246,7 @@ impl<T> Drop for ChanOneHack<T> {
 
         unsafe {
             let this = cast::transmute_mut(self);
-            let oldstate = atomic_xchg(&mut (*this.packet()).state, STATE_ONE);
+            let oldstate = (*this.packet()).state.swap(STATE_ONE, SeqCst);
             match oldstate {
                 STATE_BOTH => {
                     // Port still active. It will destroy the Packet.
@@ -254,7 +273,7 @@ impl<T> Drop for PortOneHack<T> {
 
         unsafe {
             let this = cast::transmute_mut(self);
-            let oldstate = atomic_xchg(&mut (*this.packet()).state, STATE_ONE);
+            let oldstate = (*this.packet()).state.swap(STATE_ONE, SeqCst);
             match oldstate {
                 STATE_BOTH => {
                     // Chan still active. It will destroy the packet.
@@ -295,16 +314,19 @@ struct StreamPayload<T> {
     next: PortOne<StreamPayload<T>>
 }
 
+type StreamChanOne<T> = ChanOne<StreamPayload<T>>;
+type StreamPortOne<T> = PortOne<StreamPayload<T>>;
+
 /// A channel with unbounded size.
 pub struct Chan<T> {
     // FIXME #5372. Using Cell because we don't take &mut self
-    next: Cell<ChanOne<StreamPayload<T>>>
+    next: Cell<StreamChanOne<T>>
 }
 
 /// An port with unbounded size.
 pub struct Port<T> {
     // FIXME #5372. Using Cell because we don't take &mut self
-    next: Cell<PortOne<StreamPayload<T>>>
+    next: Cell<StreamPortOne<T>>
 }
 
 pub fn stream<T: Owned>() -> (Port<T>, Chan<T>) {
@@ -357,6 +379,148 @@ impl<T> Peekable<T> for Port<T> {
     }
 }
 
+pub struct SharedChan<T> {
+    // Just like Chan, but a shared AtomicOption instead of Cell
+    priv next: UnsafeAtomicRcBox<AtomicOption<StreamChanOne<T>>>
+}
+
+impl<T> SharedChan<T> {
+    pub fn new(chan: Chan<T>) -> SharedChan<T> {
+        let next = chan.next.take();
+        let next = AtomicOption::new(~next);
+        SharedChan { next: UnsafeAtomicRcBox::new(next) }
+    }
+}
+
+impl<T: Owned> GenericChan<T> for SharedChan<T> {
+    fn send(&self, val: T) {
+        self.try_send(val);
+    }
+}
+
+impl<T: Owned> GenericSmartChan<T> for SharedChan<T> {
+    #[cfg(stage0)] // odd type checking errors
+    fn try_send(&self, _val: T) -> bool {
+        fail!()
+    }
+
+    #[cfg(not(stage0))]
+    fn try_send(&self, val: T) -> bool {
+        unsafe {
+            let (next_pone, next_cone) = oneshot();
+            let cone = (*self.next.get()).swap(~next_cone, SeqCst);
+            cone.unwrap().try_send(StreamPayload { val: val, next: next_pone })
+        }
+    }
+}
+
+impl<T> Clone for SharedChan<T> {
+    fn clone(&self) -> SharedChan<T> {
+        SharedChan {
+            next: self.next.clone()
+        }
+    }
+}
+
+pub struct SharedPort<T> {
+    // The next port on which we will receive the next port on which we will receive T
+    priv next_link: UnsafeAtomicRcBox<AtomicOption<PortOne<StreamPortOne<T>>>>
+}
+
+impl<T> SharedPort<T> {
+    pub fn new(port: Port<T>) -> SharedPort<T> {
+        // Put the data port into a new link pipe
+        let next_data_port = port.next.take();
+        let (next_link_port, next_link_chan) = oneshot();
+        next_link_chan.send(next_data_port);
+        let next_link = AtomicOption::new(~next_link_port);
+        SharedPort { next_link: UnsafeAtomicRcBox::new(next_link) }
+    }
+}
+
+impl<T: Owned> GenericPort<T> for SharedPort<T> {
+    fn recv(&self) -> T {
+        match self.try_recv() {
+            Some(val) => val,
+            None => {
+                fail!("receiving on a closed channel");
+            }
+        }
+    }
+
+    #[cfg(stage0)] // odd type checking errors
+    fn try_recv(&self) -> Option<T> {
+        fail!()
+    }
+
+    #[cfg(not(stage0))]
+    fn try_recv(&self) -> Option<T> {
+        unsafe {
+            let (next_link_port, next_link_chan) = oneshot();
+            let link_port = (*self.next_link.get()).swap(~next_link_port, SeqCst);
+            let link_port = link_port.unwrap();
+            let data_port = link_port.recv();
+            let (next_data_port, res) = match data_port.try_recv() {
+                Some(StreamPayload { val, next }) => {
+                    (next, Some(val))
+                }
+                None => {
+                    let (next_data_port, _) = oneshot();
+                    (next_data_port, None)
+                }
+            };
+            next_link_chan.send(next_data_port);
+            return res;
+        }
+    }
+}
+
+impl<T> Clone for SharedPort<T> {
+    fn clone(&self) -> SharedPort<T> {
+        SharedPort {
+            next_link: self.next_link.clone()
+        }
+    }
+}
+
+// XXX: Need better name
+type MegaPipe<T> = (SharedPort<T>, SharedChan<T>);
+
+pub fn megapipe<T: Owned>() -> MegaPipe<T> {
+    let (port, chan) = stream();
+    (SharedPort::new(port), SharedChan::new(chan))
+}
+
+impl<T: Owned> GenericChan<T> for MegaPipe<T> {
+    fn send(&self, val: T) {
+        match *self {
+            (_, ref c) => c.send(val)
+        }
+    }
+}
+
+impl<T: Owned> GenericSmartChan<T> for MegaPipe<T> {
+    fn try_send(&self, val: T) -> bool {
+        match *self {
+            (_, ref c) => c.try_send(val)
+        }
+    }
+}
+
+impl<T: Owned> GenericPort<T> for MegaPipe<T> {
+    fn recv(&self) -> T {
+        match *self {
+            (ref p, _) => p.recv()
+        }
+    }
+
+    fn try_recv(&self) -> Option<T> {
+        match *self {
+            (ref p, _) => p.try_recv()
+        }
+    }
+}
+
 #[cfg(test)]
 mod test {
     use super::*;
@@ -584,7 +748,7 @@ mod test {
     #[test]
     fn stream_send_recv_stress() {
         for stress_factor().times {
-            do run_in_newsched_task {
+            do run_in_mt_newsched_task {
                 let (port, chan) = stream::<~int>();
 
                 send(chan, 0);
@@ -594,18 +758,18 @@ mod test {
                     if i == 10 { return }
 
                     let chan_cell = Cell::new(chan);
-                    let _thread = do spawntask_thread {
+                    do spawntask_random {
                         let chan = chan_cell.take();
                         chan.send(~i);
                         send(chan, i + 1);
-                    };
+                    }
                 }
 
                 fn recv(port: Port<~int>, i: int) {
                     if i == 10 { return }
 
                     let port_cell = Cell::new(port);
-                    let _thread = do spawntask_thread {
+                    do spawntask_random {
                         let port = port_cell.take();
                         assert!(port.recv() == ~i);
                         recv(port, i + 1);
@@ -614,4 +778,143 @@ mod test {
             }
         }
     }
+
+    #[test]
+    fn recv_a_lot() {
+        // Regression test that we don't run out of stack in scheduler context
+        do run_in_newsched_task {
+            let (port, chan) = stream();
+            for 10000.times { chan.send(()) }
+            for 10000.times { port.recv() }
+        }
+    }
+
+    #[test]
+    fn shared_chan_stress() {
+        do run_in_mt_newsched_task {
+            let (port, chan) = stream();
+            let chan = SharedChan::new(chan);
+            let total = stress_factor() + 100;
+            for total.times {
+                let chan_clone = chan.clone();
+                do spawntask_random {
+                    chan_clone.send(());
+                }
+            }
+
+            for total.times {
+                port.recv();
+            }
+        }
+    }
+
+    #[test]
+    fn shared_port_stress() {
+        do run_in_mt_newsched_task {
+            // XXX: Removing these type annotations causes an ICE
+            let (end_port, end_chan) = stream::<()>();
+            let (port, chan) = stream::<()>();
+            let end_chan = SharedChan::new(end_chan);
+            let port = SharedPort::new(port);
+            let total = stress_factor() + 100;
+            for total.times {
+                let end_chan_clone = end_chan.clone();
+                let port_clone = port.clone();
+                do spawntask_random {
+                    port_clone.recv();
+                    end_chan_clone.send(());
+                }
+            }
+
+            for total.times {
+                chan.send(());
+            }
+
+            for total.times {
+                end_port.recv();
+            }
+        }
+    }
+
+    #[test]
+    fn shared_port_close_simple() {
+        do run_in_mt_newsched_task {
+            let (port, chan) = stream::<()>();
+            let port = SharedPort::new(port);
+            { let _chan = chan; }
+            assert!(port.try_recv().is_none());
+        }
+    }
+
+    #[test]
+    fn shared_port_close() {
+        do run_in_mt_newsched_task {
+            let (end_port, end_chan) = stream::<bool>();
+            let (port, chan) = stream::<()>();
+            let end_chan = SharedChan::new(end_chan);
+            let port = SharedPort::new(port);
+            let chan = SharedChan::new(chan);
+            let send_total = 10;
+            let recv_total = 20;
+            do spawntask_random {
+                for send_total.times {
+                    let chan_clone = chan.clone();
+                    do spawntask_random {
+                        chan_clone.send(());
+                    }
+                }
+            }
+            let end_chan_clone = end_chan.clone();
+            do spawntask_random {
+                for recv_total.times {
+                    let port_clone = port.clone();
+                    let end_chan_clone = end_chan_clone.clone();
+                    do spawntask_random {
+                        let recvd = port_clone.try_recv().is_some();
+                        end_chan_clone.send(recvd);
+                    }
+                }
+            }
+
+            let mut recvd = 0;
+            for recv_total.times {
+                recvd += if end_port.recv() { 1 } else { 0 };
+            }
+
+            assert!(recvd == send_total);
+        }
+    }
+
+    #[test]
+    fn megapipe_stress() {
+        use rand;
+        use rand::RngUtil;
+
+        do run_in_mt_newsched_task {
+            let (end_port, end_chan) = stream::<()>();
+            let end_chan = SharedChan::new(end_chan);
+            let pipe = megapipe();
+            let total = stress_factor() + 10;
+            let mut rng = rand::rng();
+            for total.times {
+                let msgs = rng.gen_uint_range(0, 10);
+                let pipe_clone = pipe.clone();
+                let end_chan_clone = end_chan.clone();
+                do spawntask_random {
+                    for msgs.times {
+                        pipe_clone.send(());
+                    }
+                    for msgs.times {
+                        pipe_clone.recv();
+                    }
+                }
+
+                end_chan_clone.send(());
+            }
+
+            for total.times {
+                end_port.recv();
+            }
+        }
+    }
 }
diff --git a/src/libstd/rt/io/extensions.rs b/src/libstd/rt/io/extensions.rs
index 727ab13a4f6..c7c3eadbe21 100644
--- a/src/libstd/rt/io/extensions.rs
+++ b/src/libstd/rt/io/extensions.rs
@@ -749,8 +749,6 @@ mod test {
     #[should_fail]
     #[ignore(cfg(windows))]
     fn push_bytes_fail_reset_len() {
-        use unstable::finally::Finally;
-
         // push_bytes unsafely sets the vector length. This is testing that
         // upon failure the length is reset correctly.
         let mut reader = MockReader::new();
@@ -772,7 +770,8 @@ mod test {
             reader.push_bytes(&mut *buf, 4);
         }).finally {
             // NB: Using rtassert here to trigger abort on failure since this is a should_fail test
-            rtassert!(*buf == ~[8, 9, 10]);
+            // FIXME: #7049 This fails because buf is still borrowed
+            //rtassert!(*buf == ~[8, 9, 10]);
         }
     }
 
diff --git a/src/libstd/rt/join_latch.rs b/src/libstd/rt/join_latch.rs
new file mode 100644
index 00000000000..ad5cf2eb378
--- /dev/null
+++ b/src/libstd/rt/join_latch.rs
@@ -0,0 +1,645 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+//! The JoinLatch is a concurrent type that establishes the task
+//! tree and propagates failure.
+//!
+//! Each task gets a JoinLatch that is derived from the JoinLatch
+//! of its parent task. Every latch must be released by either calling
+//! the non-blocking `release` method or the task-blocking `wait` method.
+//! Releasing a latch does not complete until all of its child latches
+//! complete.
+//!
+//! Latches carry a `success` flag that is set to `false` during task
+//! failure and is propagated both from children to parents and parents
+//! to children. The status af this flag may be queried for the purposes
+//! of linked failure.
+//!
+//! In addition to failure propagation the task tree serves to keep the
+//! default task schedulers alive. The runtime only sends the shutdown
+//! message to schedulers once the root task exits.
+//!
+//! Under this scheme tasks that terminate before their children become
+//! 'zombies' since they may not exit until their children do. Zombie
+//! tasks are 'tombstoned' as `Tombstone(~JoinLatch)` and the tasks
+//! themselves allowed to terminate.
+//!
+//! XXX: Propagate flag from parents to children.
+//! XXX: Tombstoning actually doesn't work.
+//! XXX: This could probably be done in a way that doesn't leak tombstones
+//!      longer than the life of the child tasks.
+
+use comm::{GenericPort, Peekable, GenericSmartChan};
+use clone::Clone;
+use container::Container;
+use option::{Option, Some, None};
+use ops::Drop;
+use rt::comm::{SharedChan, Port, stream};
+use rt::local::Local;
+use rt::sched::Scheduler;
+use unstable::atomics::{AtomicUint, SeqCst};
+use util;
+use vec::OwnedVector;
+
+// FIXME #7026: Would prefer this to be an enum
+pub struct JoinLatch {
+    priv parent: Option<ParentLink>,
+    priv child: Option<ChildLink>,
+    closed: bool,
+}
+
+// Shared between parents and all their children.
+struct SharedState {
+    /// Reference count, held by a parent and all children.
+    count: AtomicUint,
+    success: bool
+}
+
+struct ParentLink {
+    shared: *mut SharedState,
+    // For communicating with the parent.
+    chan: SharedChan<Message>
+}
+
+struct ChildLink {
+    shared: ~SharedState,
+    // For receiving from children.
+    port: Port<Message>,
+    chan: SharedChan<Message>,
+    // Prevents dropping the child SharedState reference counts multiple times.
+    dropped_child: bool
+}
+
+// Messages from child latches to parent.
+enum Message {
+    Tombstone(~JoinLatch),
+    ChildrenTerminated
+}
+
+impl JoinLatch {
+    pub fn new_root() -> ~JoinLatch {
+        let this = ~JoinLatch {
+            parent: None,
+            child: None,
+            closed: false
+        };
+        rtdebug!("new root latch %x", this.id());
+        return this;
+    }
+
+    fn id(&self) -> uint {
+        unsafe { ::cast::transmute(&*self) }
+    }
+
+    pub fn new_child(&mut self) -> ~JoinLatch {
+        rtassert!(!self.closed);
+
+        if self.child.is_none() {
+            // This is the first time spawning a child
+            let shared = ~SharedState {
+                count: AtomicUint::new(1),
+                success: true
+            };
+            let (port, chan) = stream();
+            let chan = SharedChan::new(chan);
+            let child = ChildLink {
+                shared: shared,
+                port: port,
+                chan: chan,
+                dropped_child: false
+            };
+            self.child = Some(child);
+        }
+
+        let child_link: &mut ChildLink = self.child.get_mut_ref();
+        let shared_state: *mut SharedState = &mut *child_link.shared;
+
+        child_link.shared.count.fetch_add(1, SeqCst);
+
+        let child = ~JoinLatch {
+            parent: Some(ParentLink {
+                shared: shared_state,
+                chan: child_link.chan.clone()
+            }),
+            child: None,
+            closed: false
+        };
+        rtdebug!("NEW child latch %x", child.id());
+        return child;
+    }
+
+    pub fn release(~self, local_success: bool) {
+        // XXX: This should not block, but there's a bug in the below
+        // code that I can't figure out.
+        self.wait(local_success);
+    }
+
+    // XXX: Should not require ~self
+    fn release_broken(~self, local_success: bool) {
+        rtassert!(!self.closed);
+
+        rtdebug!("releasing %x", self.id());
+
+        let id = self.id();
+        let _ = id; // XXX: `id` is only used in debug statements so appears unused
+        let mut this = self;
+        let mut child_success = true;
+        let mut children_done = false;
+
+        if this.child.is_some() {
+            rtdebug!("releasing children");
+            let child_link: &mut ChildLink = this.child.get_mut_ref();
+            let shared: &mut SharedState = &mut *child_link.shared;
+
+            if !child_link.dropped_child {
+                let last_count = shared.count.fetch_sub(1, SeqCst);
+                rtdebug!("child count before sub %u %x", last_count, id);
+                if last_count == 1 {
+                    assert!(child_link.chan.try_send(ChildrenTerminated));
+                }
+                child_link.dropped_child = true;
+            }
+
+            // Wait for messages from children
+            let mut tombstones = ~[];
+            loop {
+                if child_link.port.peek() {
+                    match child_link.port.recv() {
+                        Tombstone(t) => {
+                            tombstones.push(t);
+                        },
+                        ChildrenTerminated => {
+                            children_done = true;
+                            break;
+                        }
+                    }
+                } else {
+                    break
+                }
+            }
+
+            rtdebug!("releasing %u tombstones %x", tombstones.len(), id);
+
+            // Try to release the tombstones. Those that still have
+            // outstanding will be re-enqueued.  When this task's
+            // parents release their latch we'll end up back here
+            // trying them again.
+            while !tombstones.is_empty() {
+                tombstones.pop().release(true);
+            }
+
+            if children_done {
+                let count = shared.count.load(SeqCst);
+                assert!(count == 0);
+                // self_count is the acquire-read barrier
+                child_success = shared.success;
+            }
+        } else {
+            children_done = true;
+        }
+
+        let total_success = local_success && child_success;
+
+        rtassert!(this.parent.is_some());
+
+        unsafe {
+            {
+                let parent_link: &mut ParentLink = this.parent.get_mut_ref();
+                let shared: *mut SharedState = parent_link.shared;
+
+                if !total_success {
+                    // parent_count is the write-wait barrier
+                    (*shared).success = false;
+                }
+            }
+
+            if children_done {
+                rtdebug!("children done");
+                do Local::borrow::<Scheduler, ()> |sched| {
+                    sched.metrics.release_tombstone += 1;
+                }
+                {
+                    rtdebug!("RELEASING parent %x", id);
+                    let parent_link: &mut ParentLink = this.parent.get_mut_ref();
+                    let shared: *mut SharedState = parent_link.shared;
+                    let last_count = (*shared).count.fetch_sub(1, SeqCst);
+                    rtdebug!("count before parent sub %u %x", last_count, id);
+                    if last_count == 1 {
+                        assert!(parent_link.chan.try_send(ChildrenTerminated));
+                    }
+                }
+                this.closed = true;
+                util::ignore(this);
+            } else {
+                rtdebug!("children not done");
+                rtdebug!("TOMBSTONING %x", id);
+                do Local::borrow::<Scheduler, ()> |sched| {
+                    sched.metrics.release_no_tombstone += 1;
+                }
+                let chan = {
+                    let parent_link: &mut ParentLink = this.parent.get_mut_ref();
+                    parent_link.chan.clone()
+                };
+                assert!(chan.try_send(Tombstone(this)));
+            }
+        }
+    }
+
+    // XXX: Should not require ~self
+    pub fn wait(~self, local_success: bool) -> bool {
+        rtassert!(!self.closed);
+
+        rtdebug!("WAITING %x", self.id());
+
+        let mut this = self;
+        let mut child_success = true;
+
+        if this.child.is_some() {
+            rtdebug!("waiting for children");
+            let child_link: &mut ChildLink = this.child.get_mut_ref();
+            let shared: &mut SharedState = &mut *child_link.shared;
+
+            if !child_link.dropped_child {
+                let last_count = shared.count.fetch_sub(1, SeqCst);
+                rtdebug!("child count before sub %u", last_count);
+                if last_count == 1 {
+                    assert!(child_link.chan.try_send(ChildrenTerminated));
+                }
+                child_link.dropped_child = true;
+            }
+
+            // Wait for messages from children
+            loop {
+                match child_link.port.recv() {
+                    Tombstone(t) => {
+                        t.wait(true);
+                    }
+                    ChildrenTerminated => break
+                }
+            }
+
+            let count = shared.count.load(SeqCst);
+            if count != 0 { ::io::println(fmt!("%u", count)); }
+            assert!(count == 0);
+            // self_count is the acquire-read barrier
+            child_success = shared.success;
+        }
+
+        let total_success = local_success && child_success;
+
+        if this.parent.is_some() {
+            rtdebug!("releasing parent");
+            unsafe {
+                let parent_link: &mut ParentLink = this.parent.get_mut_ref();
+                let shared: *mut SharedState = parent_link.shared;
+
+                if !total_success {
+                    // parent_count is the write-wait barrier
+                    (*shared).success = false;
+                }
+
+                let last_count = (*shared).count.fetch_sub(1, SeqCst);
+                rtdebug!("count before parent sub %u", last_count);
+                if last_count == 1 {
+                    assert!(parent_link.chan.try_send(ChildrenTerminated));
+                }
+            }
+        }
+
+        this.closed = true;
+        util::ignore(this);
+
+        return total_success;
+    }
+}
+
+impl Drop for JoinLatch {
+    fn finalize(&self) {
+        rtdebug!("DESTROYING %x", self.id());
+        rtassert!(self.closed);
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use cell::Cell;
+    use container::Container;
+    use iter::Times;
+    use old_iter::BaseIter;
+    use rt::test::*;
+    use rand;
+    use rand::RngUtil;
+    use vec::{CopyableVector, ImmutableVector};
+
+    #[test]
+    fn success_immediately() {
+        do run_in_newsched_task {
+            let mut latch = JoinLatch::new_root();
+
+            let child_latch = latch.new_child();
+            let child_latch = Cell::new(child_latch);
+            do spawntask_immediately {
+                let child_latch = child_latch.take();
+                assert!(child_latch.wait(true));
+            }
+
+            assert!(latch.wait(true));
+        }
+    }
+
+    #[test]
+    fn success_later() {
+        do run_in_newsched_task {
+            let mut latch = JoinLatch::new_root();
+
+            let child_latch = latch.new_child();
+            let child_latch = Cell::new(child_latch);
+            do spawntask_later {
+                let child_latch = child_latch.take();
+                assert!(child_latch.wait(true));
+            }
+
+            assert!(latch.wait(true));
+        }
+    }
+
+    #[test]
+    fn mt_success() {
+        do run_in_mt_newsched_task {
+            let mut latch = JoinLatch::new_root();
+
+            for 10.times {
+                let child_latch = latch.new_child();
+                let child_latch = Cell::new(child_latch);
+                do spawntask_random {
+                    let child_latch = child_latch.take();
+                    assert!(child_latch.wait(true));
+                }
+            }
+
+            assert!(latch.wait(true));
+        }
+    }
+
+    #[test]
+    fn mt_failure() {
+        do run_in_mt_newsched_task {
+            let mut latch = JoinLatch::new_root();
+
+            let spawn = |status| {
+                let child_latch = latch.new_child();
+                let child_latch = Cell::new(child_latch);
+                do spawntask_random {
+                    let child_latch = child_latch.take();
+                    child_latch.wait(status);
+                }
+            };
+
+            for 10.times { spawn(true) }
+            spawn(false);
+            for 10.times { spawn(true) }
+
+            assert!(!latch.wait(true));
+        }
+    }
+
+    #[test]
+    fn mt_multi_level_success() {
+        do run_in_mt_newsched_task {
+            let mut latch = JoinLatch::new_root();
+
+            fn child(latch: &mut JoinLatch, i: int) {
+                let child_latch = latch.new_child();
+                let child_latch = Cell::new(child_latch);
+                do spawntask_random {
+                    let mut child_latch = child_latch.take();
+                    if i != 0 {
+                        child(&mut *child_latch, i - 1);
+                        child_latch.wait(true);
+                    } else {
+                        child_latch.wait(true);
+                    }
+                }
+            }
+
+            child(&mut *latch, 10);
+
+            assert!(latch.wait(true));
+        }
+    }
+
+    #[test]
+    fn mt_multi_level_failure() {
+        do run_in_mt_newsched_task {
+            let mut latch = JoinLatch::new_root();
+
+            fn child(latch: &mut JoinLatch, i: int) {
+                let child_latch = latch.new_child();
+                let child_latch = Cell::new(child_latch);
+                do spawntask_random {
+                    let mut child_latch = child_latch.take();
+                    if i != 0 {
+                        child(&mut *child_latch, i - 1);
+                        child_latch.wait(false);
+                    } else {
+                        child_latch.wait(true);
+                    }
+                }
+            }
+
+            child(&mut *latch, 10);
+
+            assert!(!latch.wait(true));
+        }
+    }
+
+    #[test]
+    fn release_child() {
+        do run_in_newsched_task {
+            let mut latch = JoinLatch::new_root();
+            let child_latch = latch.new_child();
+            let child_latch = Cell::new(child_latch);
+
+            do spawntask_immediately {
+                let latch = child_latch.take();
+                latch.release(false);
+            }
+
+            assert!(!latch.wait(true));
+        }
+    }
+
+    #[test]
+    fn release_child_tombstone() {
+        do run_in_newsched_task {
+            let mut latch = JoinLatch::new_root();
+            let child_latch = latch.new_child();
+            let child_latch = Cell::new(child_latch);
+
+            do spawntask_immediately {
+                let mut latch = child_latch.take();
+                let child_latch = latch.new_child();
+                let child_latch = Cell::new(child_latch);
+                do spawntask_later {
+                    let latch = child_latch.take();
+                    latch.release(false);
+                }
+                latch.release(true);
+            }
+
+            assert!(!latch.wait(true));
+        }
+    }
+
+    #[test]
+    fn release_child_no_tombstone() {
+        do run_in_newsched_task {
+            let mut latch = JoinLatch::new_root();
+            let child_latch = latch.new_child();
+            let child_latch = Cell::new(child_latch);
+
+            do spawntask_later {
+                let mut latch = child_latch.take();
+                let child_latch = latch.new_child();
+                let child_latch = Cell::new(child_latch);
+                do spawntask_immediately {
+                    let latch = child_latch.take();
+                    latch.release(false);
+                }
+                latch.release(true);
+            }
+
+            assert!(!latch.wait(true));
+        }
+    }
+
+    #[test]
+    fn release_child_tombstone_stress() {
+        fn rand_orders() -> ~[bool] {
+            let mut v = ~[false,.. 5];
+            v[0] = true;
+            let mut rng = rand::rng();
+            return rng.shuffle(v);
+        }
+
+        fn split_orders(orders: &[bool]) -> (~[bool], ~[bool]) {
+            if orders.is_empty() {
+                return (~[], ~[]);
+            } else if orders.len() <= 2 {
+                return (orders.to_owned(), ~[]);
+            }
+            let mut rng = rand::rng();
+            let n = rng.gen_uint_range(1, orders.len());
+            let first = orders.slice(0, n).to_owned();
+            let last = orders.slice(n, orders.len()).to_owned();
+            assert!(first.len() + last.len() == orders.len());
+            return (first, last);
+        }
+
+        for stress_factor().times {
+            do run_in_newsched_task {
+                fn doit(latch: &mut JoinLatch, orders: ~[bool], depth: uint) {
+                    let (my_orders, remaining_orders) = split_orders(orders);
+                    rtdebug!("(my_orders, remaining): %?", (&my_orders, &remaining_orders));
+                    rtdebug!("depth: %u", depth);
+                    let mut remaining_orders = remaining_orders;
+                    let mut num = 0;
+                    for my_orders.each |&order| {
+                        let child_latch = latch.new_child();
+                        let child_latch = Cell::new(child_latch);
+                        let (child_orders, remaining) = split_orders(remaining_orders);
+                        rtdebug!("(child_orders, remaining): %?", (&child_orders, &remaining));
+                        remaining_orders = remaining;
+                        let child_orders = Cell::new(child_orders);
+                        let child_num = num;
+                        let _ = child_num; // XXX unused except in rtdebug!
+                        do spawntask_random {
+                            rtdebug!("depth %u num %u", depth, child_num);
+                            let mut child_latch = child_latch.take();
+                            let child_orders = child_orders.take();
+                            doit(&mut *child_latch, child_orders, depth + 1);
+                            child_latch.release(order);
+                        }
+
+                        num += 1;
+                    }
+                }
+
+                let mut latch = JoinLatch::new_root();
+                let orders = rand_orders();
+                rtdebug!("orders: %?", orders);
+
+                doit(&mut *latch, orders, 0);
+
+                assert!(!latch.wait(true));
+            }
+        }
+    }
+
+    #[test]
+    fn whateverman() {
+        struct Order {
+            immediate: bool,
+            succeed: bool,
+            orders: ~[Order]
+        }
+        fn next(latch: &mut JoinLatch, orders: ~[Order]) {
+            for orders.each |order| {
+                let suborders = copy order.orders;
+                let child_latch = Cell::new(latch.new_child());
+                let succeed = order.succeed;
+                if order.immediate {
+                    do spawntask_immediately {
+                        let mut child_latch = child_latch.take();
+                        next(&mut *child_latch, copy suborders);
+                        rtdebug!("immediate releasing");
+                        child_latch.release(succeed);
+                    }
+                } else {
+                    do spawntask_later {
+                        let mut child_latch = child_latch.take();
+                        next(&mut *child_latch, copy suborders);
+                        rtdebug!("later releasing");
+                        child_latch.release(succeed);
+                    }
+                }
+            }
+        }
+
+        do run_in_newsched_task {
+            let mut latch = JoinLatch::new_root();
+            let orders = ~[ Order { // 0 0
+                immediate: true,
+                succeed: true,
+                orders: ~[ Order { // 1 0
+                    immediate: true,
+                    succeed: false,
+                    orders: ~[ Order { // 2 0
+                        immediate: false,
+                        succeed: false,
+                        orders: ~[ Order { // 3 0
+                            immediate: true,
+                            succeed: false,
+                            orders: ~[]
+                        }, Order { // 3 1
+                            immediate: false,
+                            succeed: false,
+                            orders: ~[]
+                        }]
+                    }]
+                }]
+            }];
+
+            next(&mut *latch, orders);
+            assert!(!latch.wait(true));
+        }
+    }
+}
diff --git a/src/libstd/rt/local.rs b/src/libstd/rt/local.rs
index 313123c38b5..6e0fbda5ec9 100644
--- a/src/libstd/rt/local.rs
+++ b/src/libstd/rt/local.rs
@@ -18,7 +18,7 @@ pub trait Local {
     fn put(value: ~Self);
     fn take() -> ~Self;
     fn exists() -> bool;
-    fn borrow(f: &fn(&mut Self));
+    fn borrow<T>(f: &fn(&mut Self) -> T) -> T;
     unsafe fn unsafe_borrow() -> *mut Self;
     unsafe fn try_unsafe_borrow() -> Option<*mut Self>;
 }
@@ -27,7 +27,20 @@ impl Local for Scheduler {
     fn put(value: ~Scheduler) { unsafe { local_ptr::put(value) }}
     fn take() -> ~Scheduler { unsafe { local_ptr::take() } }
     fn exists() -> bool { local_ptr::exists() }
-    fn borrow(f: &fn(&mut Scheduler)) { unsafe { local_ptr::borrow(f) } }
+    fn borrow<T>(f: &fn(&mut Scheduler) -> T) -> T {
+        let mut res: Option<T> = None;
+        let res_ptr: *mut Option<T> = &mut res;
+        unsafe {
+            do local_ptr::borrow |sched| {
+                let result = f(sched);
+                *res_ptr = Some(result);
+            }
+        }
+        match res {
+            Some(r) => { r }
+            None => abort!("function failed!")
+        }
+    }
     unsafe fn unsafe_borrow() -> *mut Scheduler { local_ptr::unsafe_borrow() }
     unsafe fn try_unsafe_borrow() -> Option<*mut Scheduler> { abort!("unimpl") }
 }
@@ -36,8 +49,8 @@ impl Local for Task {
     fn put(_value: ~Task) { abort!("unimpl") }
     fn take() -> ~Task { abort!("unimpl") }
     fn exists() -> bool { abort!("unimpl") }
-    fn borrow(f: &fn(&mut Task)) {
-        do Local::borrow::<Scheduler> |sched| {
+    fn borrow<T>(f: &fn(&mut Task) -> T) -> T {
+        do Local::borrow::<Scheduler, T> |sched| {
             match sched.current_task {
                 Some(~ref mut task) => {
                     f(&mut *task.task)
@@ -74,7 +87,7 @@ impl Local for IoFactoryObject {
     fn put(_value: ~IoFactoryObject) { abort!("unimpl") }
     fn take() -> ~IoFactoryObject { abort!("unimpl") }
     fn exists() -> bool { abort!("unimpl") }
-    fn borrow(_f: &fn(&mut IoFactoryObject)) { abort!("unimpl") }
+    fn borrow<T>(_f: &fn(&mut IoFactoryObject) -> T) -> T { abort!("unimpl") }
     unsafe fn unsafe_borrow() -> *mut IoFactoryObject {
         let sched = Local::unsafe_borrow::<Scheduler>();
         let io: *mut IoFactoryObject = (*sched).event_loop.io().unwrap();
@@ -85,34 +98,46 @@ impl Local for IoFactoryObject {
 
 #[cfg(test)]
 mod test {
+    use rt::test::*;
     use rt::sched::Scheduler;
-    use rt::uv::uvio::UvEventLoop;
     use super::*;
 
     #[test]
     fn thread_local_scheduler_smoke_test() {
-        let scheduler = ~UvEventLoop::new_scheduler();
+        let scheduler = ~new_test_uv_sched();
         Local::put(scheduler);
         let _scheduler: ~Scheduler = Local::take();
     }
 
     #[test]
     fn thread_local_scheduler_two_instances() {
-        let scheduler = ~UvEventLoop::new_scheduler();
+        let scheduler = ~new_test_uv_sched();
         Local::put(scheduler);
         let _scheduler: ~Scheduler = Local::take();
-        let scheduler = ~UvEventLoop::new_scheduler();
+        let scheduler = ~new_test_uv_sched();
         Local::put(scheduler);
         let _scheduler: ~Scheduler = Local::take();
     }
 
     #[test]
     fn borrow_smoke_test() {
-        let scheduler = ~UvEventLoop::new_scheduler();
+        let scheduler = ~new_test_uv_sched();
         Local::put(scheduler);
         unsafe {
             let _scheduler: *mut Scheduler = Local::unsafe_borrow();
         }
         let _scheduler: ~Scheduler = Local::take();
     }
+
+    #[test]
+    fn borrow_with_return() {
+        let scheduler = ~new_test_uv_sched();
+        Local::put(scheduler);
+        let res = do Local::borrow::<Scheduler,bool> |_sched| {
+            true
+        };
+        assert!(res)
+        let _scheduler: ~Scheduler = Local::take();
+    }
+
 }
diff --git a/src/libstd/rt/message_queue.rs b/src/libstd/rt/message_queue.rs
index 5b60543344d..734be808797 100644
--- a/src/libstd/rt/message_queue.rs
+++ b/src/libstd/rt/message_queue.rs
@@ -8,6 +8,9 @@
 // option. This file may not be copied, modified, or distributed
 // except according to those terms.
 
+//! A concurrent queue that supports multiple producers and a
+//! single consumer.
+
 use container::Container;
 use kinds::Owned;
 use vec::OwnedVector;
diff --git a/src/libstd/rt/metrics.rs b/src/libstd/rt/metrics.rs
new file mode 100644
index 00000000000..b0c0fa5d708
--- /dev/null
+++ b/src/libstd/rt/metrics.rs
@@ -0,0 +1,98 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use to_str::ToStr;
+
+pub struct SchedMetrics {
+    // The number of times executing `run_sched_once`.
+    turns: uint,
+    // The number of turns that received a message.
+    messages_received: uint,
+    // The number of turns that ran a task from the queue.
+    tasks_resumed_from_queue: uint,
+    // The number of turns that found no work to perform.
+    wasted_turns: uint,
+    // The number of times the scheduler went to sleep.
+    sleepy_times: uint,
+    // Context switches from the scheduler into a task.
+    context_switches_sched_to_task: uint,
+    // Context switches from a task into the scheduler.
+    context_switches_task_to_sched: uint,
+    // Context switches from a task to a task.
+    context_switches_task_to_task: uint,
+    // Message sends that unblock the receiver
+    rendezvous_sends: uint,
+    // Message sends that do not unblock the receiver
+    non_rendezvous_sends: uint,
+    // Message receives that do not block the receiver
+    rendezvous_recvs: uint,
+    // Message receives that block the receiver
+    non_rendezvous_recvs: uint,
+    // JoinLatch releases that create tombstones
+    release_tombstone: uint,
+    // JoinLatch releases that do not create tombstones
+    release_no_tombstone: uint,
+}
+
+impl SchedMetrics {
+    pub fn new() -> SchedMetrics {
+        SchedMetrics {
+            turns: 0,
+            messages_received: 0,
+            tasks_resumed_from_queue: 0,
+            wasted_turns: 0,
+            sleepy_times: 0,
+            context_switches_sched_to_task: 0,
+            context_switches_task_to_sched: 0,
+            context_switches_task_to_task: 0,
+            rendezvous_sends: 0,
+            non_rendezvous_sends: 0,
+            rendezvous_recvs: 0,
+            non_rendezvous_recvs: 0,
+            release_tombstone: 0,
+            release_no_tombstone: 0
+        }
+    }
+}
+
+impl ToStr for SchedMetrics {
+    fn to_str(&self) -> ~str {
+        fmt!("turns: %u\n\
+              messages_received: %u\n\
+              tasks_resumed_from_queue: %u\n\
+              wasted_turns: %u\n\
+              sleepy_times: %u\n\
+              context_switches_sched_to_task: %u\n\
+              context_switches_task_to_sched: %u\n\
+              context_switches_task_to_task: %u\n\
+              rendezvous_sends: %u\n\
+              non_rendezvous_sends: %u\n\
+              rendezvous_recvs: %u\n\
+              non_rendezvous_recvs: %u\n\
+              release_tombstone: %u\n\
+              release_no_tombstone: %u\n\
+              ",
+             self.turns,
+             self.messages_received,
+             self.tasks_resumed_from_queue,
+             self.wasted_turns,
+             self.sleepy_times,
+             self.context_switches_sched_to_task,
+             self.context_switches_task_to_sched,
+             self.context_switches_task_to_task,
+             self.rendezvous_sends,
+             self.non_rendezvous_sends,
+             self.rendezvous_recvs,
+             self.non_rendezvous_recvs,
+             self.release_tombstone,
+             self.release_no_tombstone
+        )
+    }
+}
\ No newline at end of file
diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs
index f9f433b9416..1724361cabc 100644
--- a/src/libstd/rt/mod.rs
+++ b/src/libstd/rt/mod.rs
@@ -55,7 +55,11 @@ Several modules in `core` are clients of `rt`:
 */
 
 #[doc(hidden)];
+#[deny(unused_imports)];
+#[deny(unused_mut)];
+#[deny(unused_variable)];
 
+use cell::Cell;
 use ptr::RawPtr;
 
 /// The global (exchange) heap.
@@ -88,6 +92,9 @@ mod work_queue;
 /// A parallel queue.
 mod message_queue;
 
+/// A parallel data structure for tracking sleeping schedulers.
+mod sleeper_list;
+
 /// Stack segments and caching.
 mod stack;
 
@@ -127,6 +134,11 @@ pub mod local_ptr;
 /// Bindings to pthread/windows thread-local storage.
 pub mod thread_local_storage;
 
+/// A concurrent data structure with which parent tasks wait on child tasks.
+pub mod join_latch;
+
+pub mod metrics;
+
 
 /// Set up a default runtime configuration, given compiler-supplied arguments.
 ///
@@ -145,13 +157,18 @@ pub mod thread_local_storage;
 pub fn start(_argc: int, _argv: **u8, crate_map: *u8, main: ~fn()) -> int {
 
     use self::sched::{Scheduler, Coroutine};
+    use self::work_queue::WorkQueue;
     use self::uv::uvio::UvEventLoop;
+    use self::sleeper_list::SleeperList;
 
     init(crate_map);
 
     let loop_ = ~UvEventLoop::new();
-    let mut sched = ~Scheduler::new(loop_);
-    let main_task = ~Coroutine::new(&mut sched.stack_pool, main);
+    let work_queue = WorkQueue::new();
+    let sleepers = SleeperList::new();
+    let mut sched = ~Scheduler::new(loop_, work_queue, sleepers);
+    sched.no_sleep = true;
+    let main_task = ~Coroutine::new_root(&mut sched.stack_pool, main);
 
     sched.enqueue_task(main_task);
     sched.run();
@@ -194,8 +211,8 @@ pub fn context() -> RuntimeContext {
         return OldTaskContext;
     } else {
         if Local::exists::<Scheduler>() {
-            let context = ::cell::Cell::new_empty();
-            do Local::borrow::<Scheduler> |sched| {
+            let context = Cell::new_empty();
+            do Local::borrow::<Scheduler, ()> |sched| {
                 if sched.in_task_context() {
                     context.put_back(TaskContext);
                 } else {
@@ -218,23 +235,19 @@ pub fn context() -> RuntimeContext {
 fn test_context() {
     use unstable::run_in_bare_thread;
     use self::sched::{Scheduler, Coroutine};
-    use rt::uv::uvio::UvEventLoop;
-    use cell::Cell;
     use rt::local::Local;
+    use rt::test::new_test_uv_sched;
 
     assert_eq!(context(), OldTaskContext);
     do run_in_bare_thread {
         assert_eq!(context(), GlobalContext);
-        let mut sched = ~UvEventLoop::new_scheduler();
-        let task = ~do Coroutine::new(&mut sched.stack_pool) {
+        let mut sched = ~new_test_uv_sched();
+        let task = ~do Coroutine::new_root(&mut sched.stack_pool) {
             assert_eq!(context(), TaskContext);
             let sched = Local::take::<Scheduler>();
-            do sched.deschedule_running_task_and_then() |task| {
+            do sched.deschedule_running_task_and_then() |sched, task| {
                 assert_eq!(context(), SchedulerContext);
-                let task = Cell::new(task);
-                do Local::borrow::<Scheduler> |sched| {
-                    sched.enqueue_task(task.take());
-                }
+                sched.enqueue_task(task);
             }
         };
         sched.enqueue_task(task);
diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs
index 4b5eda22ff5..fa657555f3a 100644
--- a/src/libstd/rt/rtio.rs
+++ b/src/libstd/rt/rtio.rs
@@ -18,6 +18,7 @@ use rt::uv::uvio;
 // XXX: ~object doesn't work currently so these are some placeholder
 // types to use instead
 pub type EventLoopObject = uvio::UvEventLoop;
+pub type RemoteCallbackObject = uvio::UvRemoteCallback;
 pub type IoFactoryObject = uvio::UvIoFactory;
 pub type RtioTcpStreamObject = uvio::UvTcpStream;
 pub type RtioTcpListenerObject = uvio::UvTcpListener;
@@ -26,10 +27,20 @@ pub trait EventLoop {
     fn run(&mut self);
     fn callback(&mut self, ~fn());
     fn callback_ms(&mut self, ms: u64, ~fn());
+    fn remote_callback(&mut self, ~fn()) -> ~RemoteCallbackObject;
     /// The asynchronous I/O services. Not all event loops may provide one
     fn io<'a>(&'a mut self) -> Option<&'a mut IoFactoryObject>;
 }
 
+pub trait RemoteCallback {
+    /// Trigger the remote callback. Note that the number of times the callback
+    /// is run is not guaranteed. All that is guaranteed is that, after calling 'fire',
+    /// the callback will be called at least once, but multiple callbacks may be coalesced
+    /// and callbacks may be called more often requested. Destruction also triggers the
+    /// callback.
+    fn fire(&mut self);
+}
+
 pub trait IoFactory {
     fn tcp_connect(&mut self, addr: IpAddr) -> Result<~RtioTcpStreamObject, IoError>;
     fn tcp_bind(&mut self, addr: IpAddr) -> Result<~RtioTcpListenerObject, IoError>;
diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs
index 929b44f79b5..be57247d514 100644
--- a/src/libstd/rt/sched.rs
+++ b/src/libstd/rt/sched.rs
@@ -12,21 +12,53 @@ use option::*;
 use sys;
 use cast::transmute;
 use cell::Cell;
+use clone::Clone;
 
+use super::sleeper_list::SleeperList;
 use super::work_queue::WorkQueue;
 use super::stack::{StackPool, StackSegment};
-use super::rtio::{EventLoop, EventLoopObject};
+use super::rtio::{EventLoop, EventLoopObject, RemoteCallbackObject};
 use super::context::Context;
 use super::task::Task;
+use super::message_queue::MessageQueue;
 use rt::local_ptr;
 use rt::local::Local;
+use rt::rtio::RemoteCallback;
+use rt::metrics::SchedMetrics;
+
+//use to_str::ToStr;
+
+/// To allow for using pointers as scheduler ids
+use borrow::{to_uint};
 
 /// The Scheduler is responsible for coordinating execution of Coroutines
 /// on a single thread. When the scheduler is running it is owned by
 /// thread local storage and the running task is owned by the
 /// scheduler.
+///
+/// XXX: This creates too many callbacks to run_sched_once, resulting
+/// in too much allocation and too many events.
 pub struct Scheduler {
+    /// A queue of available work. Under a work-stealing policy there
+    /// is one per Scheduler.
     priv work_queue: WorkQueue<~Coroutine>,
+    /// The queue of incoming messages from other schedulers.
+    /// These are enqueued by SchedHandles after which a remote callback
+    /// is triggered to handle the message.
+    priv message_queue: MessageQueue<SchedMessage>,
+    /// A shared list of sleeping schedulers. We'll use this to wake
+    /// up schedulers when pushing work onto the work queue.
+    priv sleeper_list: SleeperList,
+    /// Indicates that we have previously pushed a handle onto the
+    /// SleeperList but have not yet received the Wake message.
+    /// Being `true` does not necessarily mean that the scheduler is
+    /// not active since there are multiple event sources that may
+    /// wake the scheduler. It just prevents the scheduler from pushing
+    /// multiple handles onto the sleeper list.
+    priv sleepy: bool,
+    /// A flag to indicate we've received the shutdown message and should
+    /// no longer try to go to sleep, but exit instead.
+    no_sleep: bool,
     stack_pool: StackPool,
     /// The event loop used to drive the scheduler and perform I/O
     event_loop: ~EventLoopObject,
@@ -37,19 +69,40 @@ pub struct Scheduler {
     current_task: Option<~Coroutine>,
     /// An action performed after a context switch on behalf of the
     /// code running before the context switch
-    priv cleanup_job: Option<CleanupJob>
+    priv cleanup_job: Option<CleanupJob>,
+    metrics: SchedMetrics,
+    /// Should this scheduler run any task, or only pinned tasks?
+    run_anything: bool
 }
 
-// XXX: Some hacks to put a &fn in Scheduler without borrowck
-// complaining
-type UnsafeTaskReceiver = sys::Closure;
-trait ClosureConverter {
-    fn from_fn(&fn(~Coroutine)) -> Self;
-    fn to_fn(self) -> &fn(~Coroutine);
+pub struct SchedHandle {
+    priv remote: ~RemoteCallbackObject,
+    priv queue: MessageQueue<SchedMessage>,
+    sched_id: uint
 }
-impl ClosureConverter for UnsafeTaskReceiver {
-    fn from_fn(f: &fn(~Coroutine)) -> UnsafeTaskReceiver { unsafe { transmute(f) } }
-    fn to_fn(self) -> &fn(~Coroutine) { unsafe { transmute(self) } }
+
+pub struct Coroutine {
+    /// The segment of stack on which the task is currently running or,
+    /// if the task is blocked, on which the task will resume execution
+    priv current_stack_segment: StackSegment,
+    /// These are always valid when the task is not running, unless
+    /// the task is dead
+    priv saved_context: Context,
+    /// The heap, GC, unwinding, local storage, logging
+    task: ~Task,
+}
+
+// A scheduler home is either a handle to the home scheduler, or an
+// explicit "AnySched".
+pub enum SchedHome {
+    AnySched,
+    Sched(SchedHandle)
+}
+
+pub enum SchedMessage {
+    Wake,
+    Shutdown,
+    PinnedTask(~Coroutine)
 }
 
 enum CleanupJob {
@@ -60,18 +113,39 @@ enum CleanupJob {
 impl Scheduler {
     pub fn in_task_context(&self) -> bool { self.current_task.is_some() }
 
-    pub fn new(event_loop: ~EventLoopObject) -> Scheduler {
+    pub fn sched_id(&self) -> uint { to_uint(self) }
+
+    pub fn new(event_loop: ~EventLoopObject,
+               work_queue: WorkQueue<~Coroutine>,
+               sleeper_list: SleeperList)
+        -> Scheduler {
+
+        Scheduler::new_special(event_loop, work_queue, sleeper_list, true)
+
+    }
+
+    pub fn new_special(event_loop: ~EventLoopObject,
+                       work_queue: WorkQueue<~Coroutine>,
+                       sleeper_list: SleeperList,
+                       run_anything: bool)
+        -> Scheduler {
 
         // Lazily initialize the runtime TLS key
         local_ptr::init_tls_key();
 
         Scheduler {
+            sleeper_list: sleeper_list,
+            message_queue: MessageQueue::new(),
+            sleepy: false,
+            no_sleep: false,
             event_loop: event_loop,
-            work_queue: WorkQueue::new(),
+            work_queue: work_queue,
             stack_pool: StackPool::new(),
             saved_context: Context::empty(),
             current_task: None,
-            cleanup_job: None
+            cleanup_job: None,
+            metrics: SchedMetrics::new(),
+            run_anything: run_anything
         }
     }
 
@@ -84,6 +158,11 @@ impl Scheduler {
 
         let mut self_sched = self;
 
+        // Always run through the scheduler loop at least once so that
+        // we enter the sleep state and can then be woken up by other
+        // schedulers.
+        self_sched.event_loop.callback(Scheduler::run_sched_once);
+
         unsafe {
             let event_loop: *mut ~EventLoopObject = {
                 let event_loop: *mut ~EventLoopObject = &mut self_sched.event_loop;
@@ -96,43 +175,266 @@ impl Scheduler {
             (*event_loop).run();
         }
 
+        rtdebug!("run taking sched");
         let sched = Local::take::<Scheduler>();
-        assert!(sched.work_queue.is_empty());
+        // XXX: Reenable this once we're using a per-task queue. With a shared
+        // queue this is not true
+        //assert!(sched.work_queue.is_empty());
+        rtdebug!("scheduler metrics: %s\n", {
+            use to_str::ToStr;
+            sched.metrics.to_str()
+        });
         return sched;
     }
 
+    fn run_sched_once() {
+
+        let mut sched = Local::take::<Scheduler>();
+        sched.metrics.turns += 1;
+
+        // First, check the message queue for instructions.
+        // XXX: perf. Check for messages without atomics.
+        // It's ok if we miss messages occasionally, as long as
+        // we sync and check again before sleeping.
+        if sched.interpret_message_queue() {
+            // We performed a scheduling action. There may be other work
+            // to do yet, so let's try again later.
+            rtdebug!("run_sched_once, interpret_message_queue taking sched");
+            let mut sched = Local::take::<Scheduler>();
+            sched.metrics.messages_received += 1;
+            sched.event_loop.callback(Scheduler::run_sched_once);
+            Local::put(sched);
+            return;
+        }
+
+        // Now, look in the work queue for tasks to run
+        rtdebug!("run_sched_once taking");
+        let sched = Local::take::<Scheduler>();
+        if sched.resume_task_from_queue() {
+            // We performed a scheduling action. There may be other work
+            // to do yet, so let's try again later.
+            let mut sched = Local::take::<Scheduler>();
+            sched.metrics.tasks_resumed_from_queue += 1;
+            sched.event_loop.callback(Scheduler::run_sched_once);
+            Local::put(sched);
+            return;
+        }
+
+        // If we got here then there was no work to do.
+        // Generate a SchedHandle and push it to the sleeper list so
+        // somebody can wake us up later.
+        rtdebug!("no work to do");
+        let mut sched = Local::take::<Scheduler>();
+        sched.metrics.wasted_turns += 1;
+        if !sched.sleepy && !sched.no_sleep {
+            rtdebug!("sleeping");
+            sched.metrics.sleepy_times += 1;
+            sched.sleepy = true;
+            let handle = sched.make_handle();
+            sched.sleeper_list.push(handle);
+        } else {
+            rtdebug!("not sleeping");
+        }
+        Local::put(sched);
+    }
+
+    pub fn make_handle(&mut self) -> SchedHandle {
+        let remote = self.event_loop.remote_callback(Scheduler::run_sched_once);
+
+        return SchedHandle {
+            remote: remote,
+            queue: self.message_queue.clone(),
+            sched_id: self.sched_id()
+        };
+    }
+
     /// Schedule a task to be executed later.
     ///
-    /// Pushes the task onto the work stealing queue and tells the event loop
-    /// to run it later. Always use this instead of pushing to the work queue
-    /// directly.
+    /// Pushes the task onto the work stealing queue and tells the
+    /// event loop to run it later. Always use this instead of pushing
+    /// to the work queue directly.
     pub fn enqueue_task(&mut self, task: ~Coroutine) {
-        self.work_queue.push(task);
-        self.event_loop.callback(resume_task_from_queue);
 
-        fn resume_task_from_queue() {
-            let scheduler = Local::take::<Scheduler>();
-            scheduler.resume_task_from_queue();
-        }
+        // We don't want to queue tasks that belong on other threads,
+        // so we send them home at enqueue time.
+
+        // The borrow checker doesn't like our disassembly of the
+        // Coroutine struct and partial use and mutation of the
+        // fields. So completely disassemble here and stop using?
+
+        // XXX perf: I think we might be able to shuffle this code to
+        // only destruct when we need to.
+
+        rtdebug!("a task was queued on: %u", self.sched_id());
+
+        let this = self;
+
+        // We push the task onto our local queue clone.
+        this.work_queue.push(task);
+        this.event_loop.callback(Scheduler::run_sched_once);
+
+        // We've made work available. Notify a
+        // sleeping scheduler.
+
+        // XXX: perf. Check for a sleeper without
+        // synchronizing memory.  It's not critical
+        // that we always find it.
+
+        // XXX: perf. If there's a sleeper then we
+        // might as well just send it the task
+        // directly instead of pushing it to the
+        // queue. That is essentially the intent here
+        // and it is less work.
+        match this.sleeper_list.pop() {
+            Some(handle) => {
+                let mut handle = handle;
+                handle.send(Wake)
+            }
+            None => { (/* pass */) }
+        };
     }
 
     // * Scheduler-context operations
 
-    pub fn resume_task_from_queue(~self) {
+    fn interpret_message_queue(~self) -> bool {
         assert!(!self.in_task_context());
 
-        rtdebug!("looking in work queue for task to schedule");
+        rtdebug!("looking for scheduler messages");
 
         let mut this = self;
-        match this.work_queue.pop() {
-            Some(task) => {
-                rtdebug!("resuming task from work queue");
+        match this.message_queue.pop() {
+            Some(PinnedTask(task)) => {
+                rtdebug!("recv BiasedTask message in sched: %u",
+                         this.sched_id());
+                let mut task = task;
+                task.task.home = Some(Sched(this.make_handle()));
                 this.resume_task_immediately(task);
+                return true;
+            }
+
+            Some(Wake) => {
+                rtdebug!("recv Wake message");
+                this.sleepy = false;
+                Local::put(this);
+                return true;
+            }
+            Some(Shutdown) => {
+                rtdebug!("recv Shutdown message");
+                if this.sleepy {
+                    // There may be an outstanding handle on the
+                    // sleeper list.  Pop them all to make sure that's
+                    // not the case.
+                    loop {
+                        match this.sleeper_list.pop() {
+                            Some(handle) => {
+                                let mut handle = handle;
+                                handle.send(Wake);
+                            }
+                            None => break
+                        }
+                    }
+                }
+                // No more sleeping. After there are no outstanding
+                // event loop references we will shut down.
+                this.no_sleep = true;
+                this.sleepy = false;
+                Local::put(this);
+                return true;
             }
             None => {
-                rtdebug!("no tasks in queue");
                 Local::put(this);
+                return false;
+            }
+        }
+    }
+
+    /// Given an input Coroutine sends it back to its home scheduler.
+    fn send_task_home(task: ~Coroutine) {
+        let mut task = task;
+        let mut home = task.task.home.swap_unwrap();
+        match home {
+            Sched(ref mut home_handle) => {
+                home_handle.send(PinnedTask(task));
+            }
+            AnySched => {
+                abort!("error: cannot send anysched task home");
+            }
+        }
+    }
+
+    // Resume a task from the queue - but also take into account that
+    // it might not belong here.
+    fn resume_task_from_queue(~self) -> bool {
+        assert!(!self.in_task_context());
+
+        rtdebug!("looking in work queue for task to schedule");
+        let mut this = self;
+
+        // The borrow checker imposes the possibly absurd requirement
+        // that we split this into two match expressions. This is due
+        // to the inspection of the internal bits of task, as that
+        // can't be in scope when we act on task.
+        match this.work_queue.pop() {
+            Some(task) => {
+                let action_id = {
+                    let home = &task.task.home;
+                    match home {
+                        &Some(Sched(ref home_handle))
+                        if home_handle.sched_id != this.sched_id() => {
+                            0
+                        }
+                        &Some(AnySched) if this.run_anything => {
+                            1
+                        }
+                        &Some(AnySched) => {
+                            2
+                        }
+                        &Some(Sched(_)) => {
+                            3
+                        }
+                        &None => {
+                            4
+                        }
+                    }
+                };
+
+                match action_id {
+                    0 => {
+                        rtdebug!("sending task home");
+                        Scheduler::send_task_home(task);
+                        Local::put(this);
+                        return false;
+                    }
+                    1 => {
+                        rtdebug!("resuming now");
+                        this.resume_task_immediately(task);
+                        return true;
+                    }
+                    2 => {
+                        rtdebug!("re-queueing")
+                        this.enqueue_task(task);
+                        Local::put(this);
+                        return false;
+                    }
+                    3 => {
+                        rtdebug!("resuming now");
+                        this.resume_task_immediately(task);
+                        return true;
+                    }
+                    4 => {
+                        abort!("task home was None!");
+                    }
+                    _ => {
+                        abort!("literally, you should not be here");
+                    }
+                }
             }
+
+            None => {
+               rtdebug!("no tasks in queue");
+               Local::put(this);
+               return false;
+           }
         }
     }
 
@@ -145,35 +447,40 @@ impl Scheduler {
 
         rtdebug!("ending running task");
 
-        do self.deschedule_running_task_and_then |dead_task| {
+        do self.deschedule_running_task_and_then |sched, dead_task| {
             let dead_task = Cell::new(dead_task);
-            do Local::borrow::<Scheduler> |sched| {
-                dead_task.take().recycle(&mut sched.stack_pool);
-            }
+            dead_task.take().recycle(&mut sched.stack_pool);
         }
 
         abort!("control reached end of task");
     }
 
-    pub fn schedule_new_task(~self, task: ~Coroutine) {
+    pub fn schedule_task(~self, task: ~Coroutine) {
         assert!(self.in_task_context());
 
-        do self.switch_running_tasks_and_then(task) |last_task| {
-            let last_task = Cell::new(last_task);
-            do Local::borrow::<Scheduler> |sched| {
-                sched.enqueue_task(last_task.take());
-            }
-        }
-    }
+        // is the task home?
+        let is_home = task.is_home_no_tls(&self);
 
-    pub fn schedule_task(~self, task: ~Coroutine) {
-        assert!(self.in_task_context());
+        // does the task have a home?
+        let homed = task.homed();
 
-        do self.switch_running_tasks_and_then(task) |last_task| {
-            let last_task = Cell::new(last_task);
-            do Local::borrow::<Scheduler> |sched| {
+        let mut this = self;
+
+        if is_home || (!homed && this.run_anything) {
+            // here we know we are home, execute now OR we know we
+            // aren't homed, and that this sched doesn't care
+            do this.switch_running_tasks_and_then(task) |sched, last_task| {
+                let last_task = Cell::new(last_task);
                 sched.enqueue_task(last_task.take());
             }
+        } else if !homed && !this.run_anything {
+            // the task isn't homed, but it can't be run here
+            this.enqueue_task(task);
+            Local::put(this);
+        } else {
+            // task isn't home, so don't run it here, send it home
+            Scheduler::send_task_home(task);
+            Local::put(this);
         }
     }
 
@@ -184,6 +491,7 @@ impl Scheduler {
         assert!(!this.in_task_context());
 
         rtdebug!("scheduling a task");
+        this.metrics.context_switches_sched_to_task += 1;
 
         // Store the task in the scheduler so it can be grabbed later
         this.current_task = Some(task);
@@ -217,15 +525,21 @@ impl Scheduler {
     /// The closure here is a *stack* closure that lives in the
     /// running task.  It gets transmuted to the scheduler's lifetime
     /// and called while the task is blocked.
-    pub fn deschedule_running_task_and_then(~self, f: &fn(~Coroutine)) {
+    ///
+    /// This passes a Scheduler pointer to the fn after the context switch
+    /// in order to prevent that fn from performing further scheduling operations.
+    /// Doing further scheduling could easily result in infinite recursion.
+    pub fn deschedule_running_task_and_then(~self, f: &fn(&mut Scheduler, ~Coroutine)) {
         let mut this = self;
         assert!(this.in_task_context());
 
         rtdebug!("blocking task");
+        this.metrics.context_switches_task_to_sched += 1;
 
         unsafe {
             let blocked_task = this.current_task.swap_unwrap();
-            let f_fake_region = transmute::<&fn(~Coroutine), &fn(~Coroutine)>(f);
+            let f_fake_region = transmute::<&fn(&mut Scheduler, ~Coroutine),
+                                            &fn(&mut Scheduler, ~Coroutine)>(f);
             let f_opaque = ClosureConverter::from_fn(f_fake_region);
             this.enqueue_cleanup_job(GiveTask(blocked_task, f_opaque));
         }
@@ -247,16 +561,19 @@ impl Scheduler {
     /// Switch directly to another task, without going through the scheduler.
     /// You would want to think hard about doing this, e.g. if there are
     /// pending I/O events it would be a bad idea.
-    pub fn switch_running_tasks_and_then(~self,
-                                         next_task: ~Coroutine,
-                                         f: &fn(~Coroutine)) {
+    pub fn switch_running_tasks_and_then(~self, next_task: ~Coroutine,
+                                         f: &fn(&mut Scheduler, ~Coroutine)) {
         let mut this = self;
         assert!(this.in_task_context());
 
         rtdebug!("switching tasks");
+        this.metrics.context_switches_task_to_task += 1;
 
         let old_running_task = this.current_task.swap_unwrap();
-        let f_fake_region = unsafe { transmute::<&fn(~Coroutine), &fn(~Coroutine)>(f) };
+        let f_fake_region = unsafe {
+            transmute::<&fn(&mut Scheduler, ~Coroutine),
+                        &fn(&mut Scheduler, ~Coroutine)>(f)
+        };
         let f_opaque = ClosureConverter::from_fn(f_fake_region);
         this.enqueue_cleanup_job(GiveTask(old_running_task, f_opaque));
         this.current_task = Some(next_task);
@@ -293,7 +610,7 @@ impl Scheduler {
         let cleanup_job = self.cleanup_job.swap_unwrap();
         match cleanup_job {
             DoNothing => { }
-            GiveTask(task, f) => (f.to_fn())(task)
+            GiveTask(task, f) => (f.to_fn())(self, task)
         }
     }
 
@@ -337,40 +654,119 @@ impl Scheduler {
     }
 }
 
-static MIN_STACK_SIZE: uint = 10000000; // XXX: Too much stack
-
-pub struct Coroutine {
-    /// The segment of stack on which the task is currently running or,
-    /// if the task is blocked, on which the task will resume execution
-    priv current_stack_segment: StackSegment,
-    /// These are always valid when the task is not running, unless
-    /// the task is dead
-    priv saved_context: Context,
-    /// The heap, GC, unwinding, local storage, logging
-    task: ~Task
+impl SchedHandle {
+    pub fn send(&mut self, msg: SchedMessage) {
+        self.queue.push(msg);
+        self.remote.fire();
+    }
 }
 
 impl Coroutine {
-    pub fn new(stack_pool: &mut StackPool, start: ~fn()) -> Coroutine {
-        Coroutine::with_task(stack_pool, ~Task::new(), start)
+
+    /// This function checks that a coroutine is running "home".
+    pub fn is_home(&self) -> bool {
+        rtdebug!("checking if coroutine is home");
+        do Local::borrow::<Scheduler,bool> |sched| {
+            match self.task.home {
+                Some(AnySched) => { false }
+                Some(Sched(SchedHandle { sched_id: ref id, _ })) => {
+                    *id == sched.sched_id()
+                }
+                None => { abort!("error: homeless task!"); }
+            }
+        }
     }
 
-    pub fn with_task(stack_pool: &mut StackPool,
-                     task: ~Task,
-                     start: ~fn()) -> Coroutine {
+    /// Without access to self, but with access to the "expected home
+    /// id", see if we are home.
+    fn is_home_using_id(id: uint) -> bool {
+        rtdebug!("checking if coroutine is home using id");
+        do Local::borrow::<Scheduler,bool> |sched| {
+            if sched.sched_id() == id {
+                true
+            } else {
+                false
+            }
+        }
+    }
+
+    /// Check if this coroutine has a home
+    fn homed(&self) -> bool {
+        rtdebug!("checking if this coroutine has a home");
+        match self.task.home {
+            Some(AnySched) => { false }
+            Some(Sched(_)) => { true }
+            None => { abort!("error: homeless task!");
+                    }
+        }
+    }
+
+    /// A version of is_home that does not need to use TLS, it instead
+    /// takes local scheduler as a parameter.
+    fn is_home_no_tls(&self, sched: &~Scheduler) -> bool {
+        rtdebug!("checking if coroutine is home without tls");
+        match self.task.home {
+            Some(AnySched) => { true }
+            Some(Sched(SchedHandle { sched_id: ref id, _})) => {
+                *id == sched.sched_id()
+            }
+            None => { abort!("error: homeless task!"); }
+        }
+    }
+
+    /// Check TLS for the scheduler to see if we are on a special
+    /// scheduler.
+    pub fn on_special() -> bool {
+        rtdebug!("checking if coroutine is executing on special sched");
+        do Local::borrow::<Scheduler,bool>() |sched| {
+            !sched.run_anything
+        }
+    }
+
+    // Created new variants of "new" that takes a home scheduler
+    // parameter. The original with_task now calls with_task_homed
+    // using the AnySched paramter.
+
+    pub fn new_homed(stack_pool: &mut StackPool, home: SchedHome, start: ~fn()) -> Coroutine {
+        Coroutine::with_task_homed(stack_pool, ~Task::new_root(), start, home)
+    }
+
+    pub fn new_root(stack_pool: &mut StackPool, start: ~fn()) -> Coroutine {
+        Coroutine::with_task(stack_pool, ~Task::new_root(), start)
+    }
+
+    pub fn with_task_homed(stack_pool: &mut StackPool,
+                           task: ~Task,
+                           start: ~fn(),
+                           home: SchedHome) -> Coroutine {
+
+        static MIN_STACK_SIZE: uint = 1000000; // XXX: Too much stack
+
         let start = Coroutine::build_start_wrapper(start);
         let mut stack = stack_pool.take_segment(MIN_STACK_SIZE);
         // NB: Context holds a pointer to that ~fn
         let initial_context = Context::new(start, &mut stack);
-        return Coroutine {
+        let mut crt = Coroutine {
             current_stack_segment: stack,
             saved_context: initial_context,
-            task: task
+            task: task,
         };
+        crt.task.home = Some(home);
+        return crt;
+    }
+
+    pub fn with_task(stack_pool: &mut StackPool,
+                 task: ~Task,
+                 start: ~fn()) -> Coroutine {
+        Coroutine::with_task_homed(stack_pool,
+                                   task,
+                                   start,
+                                   AnySched)
     }
 
     fn build_start_wrapper(start: ~fn()) -> ~fn() {
         // XXX: The old code didn't have this extra allocation
+        let start_cell = Cell::new(start);
         let wrapper: ~fn() = || {
             // This is the first code to execute after the initial
             // context switch to the task. The previous context may
@@ -381,8 +777,23 @@ impl Coroutine {
 
                 let sched = Local::unsafe_borrow::<Scheduler>();
                 let task = (*sched).current_task.get_mut_ref();
-                // FIXME #6141: shouldn't neet to put `start()` in another closure
-                task.task.run(||start());
+                // FIXME #6141: shouldn't neet to put `start()` in
+                // another closure
+                let start_cell = Cell::new(start_cell.take());
+                do task.task.run {
+                    // N.B. Removing `start` from the start wrapper
+                    // closure by emptying a cell is critical for
+                    // correctness. The ~Task pointer, and in turn the
+                    // closure used to initialize the first call
+                    // frame, is destroyed in scheduler context, not
+                    // task context.  So any captured closures must
+                    // not contain user-definable dtors that expect to
+                    // be in task context. By moving `start` out of
+                    // the closure, all the user code goes out of
+                    // scope while the task is still running.
+                    let start = start_cell.take();
+                    start();
+                };
             }
 
             let sched = Local::take::<Scheduler>();
@@ -401,16 +812,328 @@ impl Coroutine {
     }
 }
 
+// XXX: Some hacks to put a &fn in Scheduler without borrowck
+// complaining
+type UnsafeTaskReceiver = sys::Closure;
+trait ClosureConverter {
+    fn from_fn(&fn(&mut Scheduler, ~Coroutine)) -> Self;
+    fn to_fn(self) -> &fn(&mut Scheduler, ~Coroutine);
+}
+impl ClosureConverter for UnsafeTaskReceiver {
+    fn from_fn(f: &fn(&mut Scheduler, ~Coroutine)) -> UnsafeTaskReceiver { unsafe { transmute(f) } }
+    fn to_fn(self) -> &fn(&mut Scheduler, ~Coroutine) { unsafe { transmute(self) } }
+}
+
 #[cfg(test)]
 mod test {
     use int;
     use cell::Cell;
-    use rt::uv::uvio::UvEventLoop;
+    use iterator::IteratorUtil;
     use unstable::run_in_bare_thread;
     use task::spawn;
     use rt::local::Local;
     use rt::test::*;
     use super::*;
+    use rt::thread::Thread;
+    use ptr::to_uint;
+    use vec::MutableVector;
+
+    // Confirm that a sched_id actually is the uint form of the
+    // pointer to the scheduler struct.
+
+    #[test]
+    fn simple_sched_id_test() {
+        do run_in_bare_thread {
+            let sched = ~new_test_uv_sched();
+            assert!(to_uint(sched) == sched.sched_id());
+        }
+    }
+
+    // Compare two scheduler ids that are different, this should never
+    // fail but may catch a mistake someday.
+
+    #[test]
+    fn compare_sched_id_test() {
+        do run_in_bare_thread {
+            let sched_one = ~new_test_uv_sched();
+            let sched_two = ~new_test_uv_sched();
+            assert!(sched_one.sched_id() != sched_two.sched_id());
+        }
+    }
+
+    // A simple test to check if a homed task run on a single
+    // scheduler ends up executing while home.
+
+    #[test]
+    fn test_home_sched() {
+        do run_in_bare_thread {
+            let mut task_ran = false;
+            let task_ran_ptr: *mut bool = &mut task_ran;
+            let mut sched = ~new_test_uv_sched();
+
+            let sched_handle = sched.make_handle();
+            let sched_id = sched.sched_id();
+
+            let task = ~do Coroutine::new_homed(&mut sched.stack_pool,
+                                                Sched(sched_handle)) {
+                unsafe { *task_ran_ptr = true };
+                let sched = Local::take::<Scheduler>();
+                assert!(sched.sched_id() == sched_id);
+                Local::put::<Scheduler>(sched);
+            };
+            sched.enqueue_task(task);
+            sched.run();
+            assert!(task_ran);
+        }
+    }
+
+    // A test for each state of schedule_task
+
+    #[test]
+    fn test_schedule_home_states() {
+
+        use rt::uv::uvio::UvEventLoop;
+        use rt::sched::Shutdown;
+        use rt::sleeper_list::SleeperList;
+        use rt::work_queue::WorkQueue;
+
+        do run_in_bare_thread {
+//            let nthreads = 2;
+
+            let sleepers = SleeperList::new();
+            let work_queue = WorkQueue::new();
+
+            // our normal scheduler
+            let mut normal_sched = ~Scheduler::new(
+                ~UvEventLoop::new(),
+                work_queue.clone(),
+                sleepers.clone());
+
+            let normal_handle = Cell::new(normal_sched.make_handle());
+
+            // our special scheduler
+            let mut special_sched = ~Scheduler::new_special(
+                ~UvEventLoop::new(),
+                work_queue.clone(),
+                sleepers.clone(),
+                true);
+
+            let special_handle = Cell::new(special_sched.make_handle());
+            let special_handle2 = Cell::new(special_sched.make_handle());
+            let special_id = special_sched.sched_id();
+            let t1_handle = special_sched.make_handle();
+            let t4_handle = special_sched.make_handle();
+
+            let t1f = ~do Coroutine::new_homed(&mut special_sched.stack_pool,
+                                            Sched(t1_handle)) {
+                let is_home = Coroutine::is_home_using_id(special_id);
+                rtdebug!("t1 should be home: %b", is_home);
+                assert!(is_home);
+            };
+            let t1f = Cell::new(t1f);
+
+            let t2f = ~do Coroutine::new_root(&mut normal_sched.stack_pool) {
+                let on_special = Coroutine::on_special();
+                rtdebug!("t2 should not be on special: %b", on_special);
+                assert!(!on_special);
+            };
+            let t2f = Cell::new(t2f);
+
+            let t3f = ~do Coroutine::new_root(&mut normal_sched.stack_pool) {
+                // not on special
+                let on_special = Coroutine::on_special();
+                rtdebug!("t3 should not be on special: %b", on_special);
+                assert!(!on_special);
+            };
+            let t3f = Cell::new(t3f);
+
+            let t4f = ~do Coroutine::new_homed(&mut special_sched.stack_pool,
+                                            Sched(t4_handle)) {
+                // is home
+                let home = Coroutine::is_home_using_id(special_id);
+                rtdebug!("t4 should be home: %b", home);
+                assert!(home);
+            };
+            let t4f = Cell::new(t4f);
+
+            // we have four tests, make them as closures
+            let t1: ~fn() = || {
+                // task is home on special
+                let task = t1f.take();
+                let sched = Local::take::<Scheduler>();
+                sched.schedule_task(task);
+            };
+            let t2: ~fn() = || {
+                // not homed, task doesn't care
+                let task = t2f.take();
+                let sched = Local::take::<Scheduler>();
+                sched.schedule_task(task);
+            };
+            let t3: ~fn() = || {
+                // task not homed, must leave
+                let task = t3f.take();
+                let sched = Local::take::<Scheduler>();
+                sched.schedule_task(task);
+            };
+            let t4: ~fn() = || {
+                // task not home, send home
+                let task = t4f.take();
+                let sched = Local::take::<Scheduler>();
+                sched.schedule_task(task);
+            };
+
+            let t1 = Cell::new(t1);
+            let t2 = Cell::new(t2);
+            let t3 = Cell::new(t3);
+            let t4 = Cell::new(t4);
+
+            // build a main task that runs our four tests
+            let main_task = ~do Coroutine::new_root(&mut normal_sched.stack_pool) {
+                // the two tasks that require a normal start location
+                t2.take()();
+                t4.take()();
+                normal_handle.take().send(Shutdown);
+                special_handle.take().send(Shutdown);
+            };
+
+            // task to run the two "special start" tests
+            let special_task = ~do Coroutine::new_homed(
+                &mut special_sched.stack_pool,
+                Sched(special_handle2.take())) {
+                t1.take()();
+                t3.take()();
+            };
+
+            // enqueue the main tasks
+            normal_sched.enqueue_task(special_task);
+            normal_sched.enqueue_task(main_task);
+
+            let nsched_cell = Cell::new(normal_sched);
+            let normal_thread = do Thread::start {
+                let sched = nsched_cell.take();
+                sched.run();
+            };
+
+            let ssched_cell = Cell::new(special_sched);
+            let special_thread = do Thread::start {
+                let sched = ssched_cell.take();
+                sched.run();
+            };
+
+            // wait for the end
+            let _thread1 = normal_thread;
+            let _thread2 = special_thread;
+
+        }
+    }
+
+    // The following test is a bit of a mess, but it trys to do
+    // something tricky so I'm not sure how to get around this in the
+    // short term.
+
+    // A number of schedulers are created, and then a task is created
+    // and assigned a home scheduler. It is then "started" on a
+    // different scheduler. The scheduler it is started on should
+    // observe that the task is not home, and send it home.
+
+    // This test is light in that it does very little.
+
+    #[test]
+    fn test_transfer_task_home() {
+
+        use rt::uv::uvio::UvEventLoop;
+        use rt::sched::Shutdown;
+        use rt::sleeper_list::SleeperList;
+        use rt::work_queue::WorkQueue;
+        use uint;
+        use container::Container;
+        use vec::OwnedVector;
+
+        do run_in_bare_thread {
+
+            static N: uint = 8;
+
+            let sleepers = SleeperList::new();
+            let work_queue = WorkQueue::new();
+
+            let mut handles = ~[];
+            let mut scheds = ~[];
+
+            for uint::range(0, N) |_| {
+                let loop_ = ~UvEventLoop::new();
+                let mut sched = ~Scheduler::new(loop_,
+                                                work_queue.clone(),
+                                                sleepers.clone());
+                let handle = sched.make_handle();
+                rtdebug!("sched id: %u", handle.sched_id);
+                handles.push(handle);
+                scheds.push(sched);
+            };
+
+            let handles = Cell::new(handles);
+
+            let home_handle = scheds[6].make_handle();
+            let home_id = home_handle.sched_id;
+            let home = Sched(home_handle);
+
+            let main_task = ~do Coroutine::new_homed(&mut scheds[1].stack_pool, home) {
+
+                // Here we check if the task is running on its home.
+                let sched = Local::take::<Scheduler>();
+                rtdebug!("run location scheduler id: %u, home: %u",
+                         sched.sched_id(),
+                         home_id);
+                assert!(sched.sched_id() == home_id);
+                Local::put::<Scheduler>(sched);
+
+                let mut handles = handles.take();
+                for handles.mut_iter().advance |handle| {
+                    handle.send(Shutdown);
+                }
+            };
+
+            scheds[0].enqueue_task(main_task);
+
+            let mut threads = ~[];
+
+            while !scheds.is_empty() {
+                let sched = scheds.pop();
+                let sched_cell = Cell::new(sched);
+                let thread = do Thread::start {
+                    let sched = sched_cell.take();
+                    sched.run();
+                };
+                threads.push(thread);
+            }
+
+            let _threads = threads;
+        }
+    }
+
+    // Do it a lot
+
+    #[test]
+    fn test_stress_schedule_task_states() {
+        let n = stress_factor() * 120;
+        for int::range(0,n as int) |_| {
+            test_schedule_home_states();
+        }
+    }
+
+    // The goal is that this is the high-stress test for making sure
+    // homing is working. It allocates RUST_RT_STRESS tasks that
+    // do nothing but assert that they are home at execution
+    // time. These tasks are queued to random schedulers, so sometimes
+    // they are home and sometimes not. It also runs RUST_RT_STRESS
+    // times.
+
+    #[test]
+    fn test_stress_homed_tasks() {
+        let n = stress_factor();
+        for int::range(0,n as int) |_| {
+            run_in_mt_newsched_task_random_homed();
+        }
+    }
 
     #[test]
     fn test_simple_scheduling() {
@@ -418,8 +1141,8 @@ mod test {
             let mut task_ran = false;
             let task_ran_ptr: *mut bool = &mut task_ran;
 
-            let mut sched = ~UvEventLoop::new_scheduler();
-            let task = ~do Coroutine::new(&mut sched.stack_pool) {
+            let mut sched = ~new_test_uv_sched();
+            let task = ~do Coroutine::new_root(&mut sched.stack_pool) {
                 unsafe { *task_ran_ptr = true; }
             };
             sched.enqueue_task(task);
@@ -435,9 +1158,9 @@ mod test {
             let mut task_count = 0;
             let task_count_ptr: *mut int = &mut task_count;
 
-            let mut sched = ~UvEventLoop::new_scheduler();
+            let mut sched = ~new_test_uv_sched();
             for int::range(0, total) |_| {
-                let task = ~do Coroutine::new(&mut sched.stack_pool) {
+                let task = ~do Coroutine::new_root(&mut sched.stack_pool) {
                     unsafe { *task_count_ptr = *task_count_ptr + 1; }
                 };
                 sched.enqueue_task(task);
@@ -453,19 +1176,17 @@ mod test {
             let mut count = 0;
             let count_ptr: *mut int = &mut count;
 
-            let mut sched = ~UvEventLoop::new_scheduler();
-            let task1 = ~do Coroutine::new(&mut sched.stack_pool) {
+            let mut sched = ~new_test_uv_sched();
+            let task1 = ~do Coroutine::new_root(&mut sched.stack_pool) {
                 unsafe { *count_ptr = *count_ptr + 1; }
                 let mut sched = Local::take::<Scheduler>();
-                let task2 = ~do Coroutine::new(&mut sched.stack_pool) {
+                let task2 = ~do Coroutine::new_root(&mut sched.stack_pool) {
                     unsafe { *count_ptr = *count_ptr + 1; }
                 };
                 // Context switch directly to the new task
-                do sched.switch_running_tasks_and_then(task2) |task1| {
+                do sched.switch_running_tasks_and_then(task2) |sched, task1| {
                     let task1 = Cell::new(task1);
-                    do Local::borrow::<Scheduler> |sched| {
-                        sched.enqueue_task(task1.take());
-                    }
+                    sched.enqueue_task(task1.take());
                 }
                 unsafe { *count_ptr = *count_ptr + 1; }
             };
@@ -482,9 +1203,9 @@ mod test {
             let mut count = 0;
             let count_ptr: *mut int = &mut count;
 
-            let mut sched = ~UvEventLoop::new_scheduler();
+            let mut sched = ~new_test_uv_sched();
 
-            let start_task = ~do Coroutine::new(&mut sched.stack_pool) {
+            let start_task = ~do Coroutine::new_root(&mut sched.stack_pool) {
                 run_task(count_ptr);
             };
             sched.enqueue_task(start_task);
@@ -493,8 +1214,8 @@ mod test {
             assert_eq!(count, MAX);
 
             fn run_task(count_ptr: *mut int) {
-                do Local::borrow::<Scheduler> |sched| {
-                    let task = ~do Coroutine::new(&mut sched.stack_pool) {
+                do Local::borrow::<Scheduler, ()> |sched| {
+                    let task = ~do Coroutine::new_root(&mut sched.stack_pool) {
                         unsafe {
                             *count_ptr = *count_ptr + 1;
                             if *count_ptr != MAX {
@@ -511,16 +1232,14 @@ mod test {
     #[test]
     fn test_block_task() {
         do run_in_bare_thread {
-            let mut sched = ~UvEventLoop::new_scheduler();
-            let task = ~do Coroutine::new(&mut sched.stack_pool) {
+            let mut sched = ~new_test_uv_sched();
+            let task = ~do Coroutine::new_root(&mut sched.stack_pool) {
                 let sched = Local::take::<Scheduler>();
                 assert!(sched.in_task_context());
-                do sched.deschedule_running_task_and_then() |task| {
+                do sched.deschedule_running_task_and_then() |sched, task| {
                     let task = Cell::new(task);
-                    do Local::borrow::<Scheduler> |sched| {
-                        assert!(!sched.in_task_context());
-                        sched.enqueue_task(task.take());
-                    }
+                    assert!(!sched.in_task_context());
+                    sched.enqueue_task(task.take());
                 }
             };
             sched.enqueue_task(task);
@@ -537,8 +1256,7 @@ mod test {
         do run_in_newsched_task {
             do spawn {
                 let sched = Local::take::<Scheduler>();
-                do sched.deschedule_running_task_and_then |task| {
-                    let mut sched = Local::take::<Scheduler>();
+                do sched.deschedule_running_task_and_then |sched, task| {
                     let task = Cell::new(task);
                     do sched.event_loop.callback_ms(10) {
                         rtdebug!("in callback");
@@ -546,9 +1264,149 @@ mod test {
                         sched.enqueue_task(task.take());
                         Local::put(sched);
                     }
-                    Local::put(sched);
                 }
             }
         }
     }
+
+    #[test]
+    fn handle() {
+        use rt::comm::*;
+
+        do run_in_bare_thread {
+            let (port, chan) = oneshot::<()>();
+            let port_cell = Cell::new(port);
+            let chan_cell = Cell::new(chan);
+            let mut sched1 = ~new_test_uv_sched();
+            let handle1 = sched1.make_handle();
+            let handle1_cell = Cell::new(handle1);
+            let task1 = ~do Coroutine::new_root(&mut sched1.stack_pool) {
+                chan_cell.take().send(());
+            };
+            sched1.enqueue_task(task1);
+
+            let mut sched2 = ~new_test_uv_sched();
+            let task2 = ~do Coroutine::new_root(&mut sched2.stack_pool) {
+                port_cell.take().recv();
+                // Release the other scheduler's handle so it can exit
+                handle1_cell.take();
+            };
+            sched2.enqueue_task(task2);
+
+            let sched1_cell = Cell::new(sched1);
+            let _thread1 = do Thread::start {
+                let sched1 = sched1_cell.take();
+                sched1.run();
+            };
+
+            let sched2_cell = Cell::new(sched2);
+            let _thread2 = do Thread::start {
+                let sched2 = sched2_cell.take();
+                sched2.run();
+            };
+        }
+    }
+
+    #[test]
+    fn multithreading() {
+        use rt::comm::*;
+        use iter::Times;
+        use vec::OwnedVector;
+        use container::Container;
+
+        do run_in_mt_newsched_task {
+            let mut ports = ~[];
+            for 10.times {
+                let (port, chan) = oneshot();
+                let chan_cell = Cell::new(chan);
+                do spawntask_later {
+                    chan_cell.take().send(());
+                }
+                ports.push(port);
+            }
+
+            while !ports.is_empty() {
+                ports.pop().recv();
+            }
+        }
+    }
+
+    #[test]
+    fn thread_ring() {
+        use rt::comm::*;
+        use comm::{GenericPort, GenericChan};
+
+        do run_in_mt_newsched_task {
+            let (end_port, end_chan) = oneshot();
+
+            let n_tasks = 10;
+            let token = 2000;
+
+            let mut (p, ch1) = stream();
+            ch1.send((token, end_chan));
+            let mut i = 2;
+            while i <= n_tasks {
+                let (next_p, ch) = stream();
+                let imm_i = i;
+                let imm_p = p;
+                do spawntask_random {
+                    roundtrip(imm_i, n_tasks, &imm_p, &ch);
+                };
+                p = next_p;
+                i += 1;
+            }
+            let imm_p = p;
+            let imm_ch = ch1;
+            do spawntask_random {
+                roundtrip(1, n_tasks, &imm_p, &imm_ch);
+            }
+
+            end_port.recv();
+        }
+
+        fn roundtrip(id: int, n_tasks: int,
+                     p: &Port<(int, ChanOne<()>)>, ch: &Chan<(int, ChanOne<()>)>) {
+            while (true) {
+                match p.recv() {
+                    (1, end_chan) => {
+                        debug!("%d\n", id);
+                        end_chan.send(());
+                        return;
+                    }
+                    (token, end_chan) => {
+                        debug!("thread: %d   got token: %d", id, token);
+                        ch.send((token - 1, end_chan));
+                        if token <= n_tasks {
+                            return;
+                        }
+                    }
+                }
+            }
+        }
+
+    }
+
+    #[test]
+    fn start_closure_dtor() {
+        use ops::Drop;
+
+        // Regression test that the `start` task entrypoint can
+        // contain dtors that use task resources
+        do run_in_newsched_task {
+            struct S { field: () }
+
+            impl Drop for S {
+                fn finalize(&self) {
+                    let _foo = @0;
+                }
+            }
+
+            let s = S { field: () };
+
+            do spawntask {
+                let _ss = &s;
+            }
+        }
+    }
+
 }
diff --git a/src/libstd/rt/sleeper_list.rs b/src/libstd/rt/sleeper_list.rs
new file mode 100644
index 00000000000..3d6e9ef5635
--- /dev/null
+++ b/src/libstd/rt/sleeper_list.rs
@@ -0,0 +1,59 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+//! Maintains a shared list of sleeping schedulers. Schedulers
+//! use this to wake each other up.
+
+use container::Container;
+use vec::OwnedVector;
+use option::{Option, Some, None};
+use cell::Cell;
+use unstable::sync::{Exclusive, exclusive};
+use rt::sched::SchedHandle;
+use clone::Clone;
+
+pub struct SleeperList {
+    priv stack: ~Exclusive<~[SchedHandle]>
+}
+
+impl SleeperList {
+    pub fn new() -> SleeperList {
+        SleeperList {
+            stack: ~exclusive(~[])
+        }
+    }
+
+    pub fn push(&mut self, handle: SchedHandle) {
+        let handle = Cell::new(handle);
+        unsafe {
+            self.stack.with(|s| s.push(handle.take()));
+        }
+    }
+
+    pub fn pop(&mut self) -> Option<SchedHandle> {
+        unsafe {
+            do self.stack.with |s| {
+                if !s.is_empty() {
+                    Some(s.pop())
+                } else {
+                    None
+                }
+            }
+        }
+    }
+}
+
+impl Clone for SleeperList {
+    fn clone(&self) -> SleeperList {
+        SleeperList {
+            stack: self.stack.clone()
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs
index 41390aec80c..e7f87906fe5 100644
--- a/src/libstd/rt/task.rs
+++ b/src/libstd/rt/task.rs
@@ -18,16 +18,22 @@ use cast::transmute;
 use libc::{c_void, uintptr_t};
 use ptr;
 use prelude::*;
+use option::{Option, Some, None};
 use rt::local::Local;
 use rt::logging::StdErrLogger;
 use super::local_heap::LocalHeap;
+use rt::sched::{SchedHome, AnySched};
+use rt::join_latch::JoinLatch;
 
 pub struct Task {
     heap: LocalHeap,
     gc: GarbageCollector,
     storage: LocalStorage,
     logger: StdErrLogger,
-    unwinder: Option<Unwinder>,
+    unwinder: Unwinder,
+    home: Option<SchedHome>,
+    join_latch: Option<~JoinLatch>,
+    on_exit: Option<~fn(bool)>,
     destroyed: bool
 }
 
@@ -39,49 +45,63 @@ pub struct Unwinder {
 }
 
 impl Task {
-    pub fn new() -> Task {
+    pub fn new_root() -> Task {
         Task {
             heap: LocalHeap::new(),
             gc: GarbageCollector,
             storage: LocalStorage(ptr::null(), None),
             logger: StdErrLogger,
-            unwinder: Some(Unwinder { unwinding: false }),
+            unwinder: Unwinder { unwinding: false },
+            home: Some(AnySched),
+            join_latch: Some(JoinLatch::new_root()),
+            on_exit: None,
             destroyed: false
         }
     }
 
-    pub fn without_unwinding() -> Task {
+    pub fn new_child(&mut self) -> Task {
         Task {
             heap: LocalHeap::new(),
             gc: GarbageCollector,
             storage: LocalStorage(ptr::null(), None),
             logger: StdErrLogger,
-            unwinder: None,
+            home: Some(AnySched),
+            unwinder: Unwinder { unwinding: false },
+            join_latch: Some(self.join_latch.get_mut_ref().new_child()),
+            on_exit: None,
             destroyed: false
         }
     }
 
+    pub fn give_home(&mut self, new_home: SchedHome) {
+        self.home = Some(new_home);
+    }
+
     pub fn run(&mut self, f: &fn()) {
         // This is just an assertion that `run` was called unsafely
         // and this instance of Task is still accessible.
-        do Local::borrow::<Task> |task| {
+        do Local::borrow::<Task, ()> |task| {
             assert!(borrow::ref_eq(task, self));
         }
 
-        match self.unwinder {
-            Some(ref mut unwinder) => {
-                // If there's an unwinder then set up the catch block
-                unwinder.try(f);
+        self.unwinder.try(f);
+        self.destroy();
+
+        // Wait for children. Possibly report the exit status.
+        let local_success = !self.unwinder.unwinding;
+        let join_latch = self.join_latch.swap_unwrap();
+        match self.on_exit {
+            Some(ref on_exit) => {
+                let success = join_latch.wait(local_success);
+                (*on_exit)(success);
             }
             None => {
-                // Otherwise, just run the body
-                f()
+                join_latch.release(local_success);
             }
         }
-        self.destroy();
     }
 
-    /// Must be called manually before finalization to clean up
+    /// must be called manually before finalization to clean up
     /// thread-local resources. Some of the routines here expect
     /// Task to be available recursively so this must be
     /// called unsafely, without removing Task from
@@ -89,7 +109,7 @@ impl Task {
     fn destroy(&mut self) {
         // This is just an assertion that `destroy` was called unsafely
         // and this instance of Task is still accessible.
-        do Local::borrow::<Task> |task| {
+        do Local::borrow::<Task, ()> |task| {
             assert!(borrow::ref_eq(task, self));
         }
         match self.storage {
@@ -227,4 +247,14 @@ mod test {
             assert!(port.recv() == 10);
         }
     }
+
+    #[test]
+    fn linked_failure() {
+        do run_in_newsched_task() {
+            let res = do spawntask_try {
+                spawntask_random(|| fail!());
+            };
+            assert!(res.is_err());
+        }
+    }
 }
diff --git a/src/libstd/rt/test.rs b/src/libstd/rt/test.rs
index fe08d85c947..6e4fb9b1d94 100644
--- a/src/libstd/rt/test.rs
+++ b/src/libstd/rt/test.rs
@@ -9,13 +9,33 @@
 // except according to those terms.
 
 use uint;
-use option::*;
+use option::{Some, None};
 use cell::Cell;
+use clone::Clone;
+use container::Container;
+use iterator::IteratorUtil;
+use vec::{OwnedVector, MutableVector};
 use result::{Result, Ok, Err};
+use unstable::run_in_bare_thread;
 use super::io::net::ip::{IpAddr, Ipv4};
+use rt::comm::oneshot;
 use rt::task::Task;
 use rt::thread::Thread;
 use rt::local::Local;
+use rt::sched::{Scheduler, Coroutine};
+use rt::sleeper_list::SleeperList;
+use rt::work_queue::WorkQueue;
+
+pub fn new_test_uv_sched() -> Scheduler {
+    use rt::uv::uvio::UvEventLoop;
+    use rt::work_queue::WorkQueue;
+    use rt::sleeper_list::SleeperList;
+
+    let mut sched = Scheduler::new(~UvEventLoop::new(), WorkQueue::new(), SleeperList::new());
+    // Don't wait for the Shutdown message
+    sched.no_sleep = true;
+    return sched;
+}
 
 /// Creates a new scheduler in a new thread and runs a task in it,
 /// then waits for the scheduler to exit. Failure of the task
@@ -23,48 +43,240 @@ use rt::local::Local;
 pub fn run_in_newsched_task(f: ~fn()) {
     use super::sched::*;
     use unstable::run_in_bare_thread;
-    use rt::uv::uvio::UvEventLoop;
 
     let f = Cell::new(f);
 
     do run_in_bare_thread {
-        let mut sched = ~UvEventLoop::new_scheduler();
+        let mut sched = ~new_test_uv_sched();
+        let mut new_task = ~Task::new_root();
+        let on_exit: ~fn(bool) = |exit_status| rtassert!(exit_status);
+        new_task.on_exit = Some(on_exit);
         let task = ~Coroutine::with_task(&mut sched.stack_pool,
-                                         ~Task::without_unwinding(),
+                                         new_task,
                                          f.take());
         sched.enqueue_task(task);
         sched.run();
     }
 }
 
+/// Create more than one scheduler and run a function in a task
+/// in one of the schedulers. The schedulers will stay alive
+/// until the function `f` returns.
+pub fn run_in_mt_newsched_task(f: ~fn()) {
+    use libc;
+    use os;
+    use from_str::FromStr;
+    use rt::uv::uvio::UvEventLoop;
+    use rt::sched::Shutdown;
+
+    let f_cell = Cell::new(f);
+
+    do run_in_bare_thread {
+        let nthreads = match os::getenv("RUST_TEST_THREADS") {
+            Some(nstr) => FromStr::from_str(nstr).get(),
+            None => unsafe {
+                // Using more threads than cores in test code
+                // to force the OS to preempt them frequently.
+                // Assuming that this help stress test concurrent types.
+                rust_get_num_cpus() * 2
+            }
+        };
+
+        let sleepers = SleeperList::new();
+        let work_queue = WorkQueue::new();
+
+        let mut handles = ~[];
+        let mut scheds = ~[];
+
+        for uint::range(0, nthreads) |_| {
+            let loop_ = ~UvEventLoop::new();
+            let mut sched = ~Scheduler::new(loop_, work_queue.clone(), sleepers.clone());
+            let handle = sched.make_handle();
+
+            handles.push(handle);
+            scheds.push(sched);
+        }
+
+        let f_cell = Cell::new(f_cell.take());
+        let handles = Cell::new(handles);
+        let mut new_task = ~Task::new_root();
+        let on_exit: ~fn(bool) = |exit_status| {
+
+            let mut handles = handles.take();
+            // Tell schedulers to exit
+            for handles.mut_iter().advance |handle| {
+                handle.send(Shutdown);
+            }
+
+            rtassert!(exit_status);
+        };
+        new_task.on_exit = Some(on_exit);
+        let main_task = ~Coroutine::with_task(&mut scheds[0].stack_pool,
+                                              new_task, f_cell.take());
+        scheds[0].enqueue_task(main_task);
+
+        let mut threads = ~[];
+
+        while !scheds.is_empty() {
+            let sched = scheds.pop();
+            let sched_cell = Cell::new(sched);
+            let thread = do Thread::start {
+                let sched = sched_cell.take();
+                sched.run();
+            };
+
+            threads.push(thread);
+        }
+
+        // Wait for schedulers
+        let _threads = threads;
+    }
+
+    extern {
+        fn rust_get_num_cpus() -> libc::uintptr_t;
+    }
+}
+
+// THIS IS AWFUL. Copy-pasted the above initialization function but
+// with a number of hacks to make it spawn tasks on a variety of
+// schedulers with a variety of homes using the new spawn.
+
+pub fn run_in_mt_newsched_task_random_homed() {
+    use libc;
+    use os;
+    use from_str::FromStr;
+    use rt::uv::uvio::UvEventLoop;
+    use rt::sched::Shutdown;
+
+    do run_in_bare_thread {
+        let nthreads = match os::getenv("RUST_TEST_THREADS") {
+            Some(nstr) => FromStr::from_str(nstr).get(),
+            None => unsafe {
+                // Using more threads than cores in test code to force
+                // the OS to preempt them frequently.  Assuming that
+                // this help stress test concurrent types.
+                rust_get_num_cpus() * 2
+            }
+        };
+
+        let sleepers = SleeperList::new();
+        let work_queue = WorkQueue::new();
+
+        let mut handles = ~[];
+        let mut scheds = ~[];
+
+        // create a few special schedulers, those with even indicies
+        // will be pinned-only
+        for uint::range(0, nthreads) |i| {
+            let special = (i % 2) == 0;
+            let loop_ = ~UvEventLoop::new();
+            let mut sched = ~Scheduler::new_special(
+                loop_, work_queue.clone(), sleepers.clone(), special);
+            let handle = sched.make_handle();
+            handles.push(handle);
+            scheds.push(sched);
+        }
+
+        // Schedule a pile o tasks
+        let n = 5*stress_factor();
+        for uint::range(0,n) |_i| {
+                rtdebug!("creating task: %u", _i);
+                let hf: ~fn() = || { assert!(true) };
+                spawntask_homed(&mut scheds, hf);
+            }
+
+        // Now we want another pile o tasks that do not ever run on a
+        // special scheduler, because they are normal tasks. Because
+        // we can we put these in the "main" task.
+
+        let n = 5*stress_factor();
+
+        let f: ~fn() = || {
+            for uint::range(0,n) |_| {
+                let f: ~fn()  = || {
+                    // Borrow the scheduler we run on and check if it is
+                    // privileged.
+                    do Local::borrow::<Scheduler,()> |sched| {
+                        assert!(sched.run_anything);
+                    };
+                };
+                spawntask_random(f);
+            };
+        };
+
+        let f_cell = Cell::new(f);
+        let handles = Cell::new(handles);
+
+        rtdebug!("creating main task");
+
+        let main_task = ~do Coroutine::new_root(&mut scheds[0].stack_pool) {
+            f_cell.take()();
+            let mut handles = handles.take();
+            // Tell schedulers to exit
+            for handles.mut_iter().advance |handle| {
+                handle.send(Shutdown);
+            }
+        };
+
+        rtdebug!("queuing main task")
+
+        scheds[0].enqueue_task(main_task);
+
+        let mut threads = ~[];
+
+        while !scheds.is_empty() {
+            let sched = scheds.pop();
+            let sched_cell = Cell::new(sched);
+            let thread = do Thread::start {
+                let sched = sched_cell.take();
+                rtdebug!("running sched: %u", sched.sched_id());
+                sched.run();
+            };
+
+            threads.push(thread);
+        }
+
+        rtdebug!("waiting on scheduler threads");
+
+        // Wait for schedulers
+        let _threads = threads;
+    }
+
+    extern {
+        fn rust_get_num_cpus() -> libc::uintptr_t;
+    }
+}
+
+
 /// Test tasks will abort on failure instead of unwinding
 pub fn spawntask(f: ~fn()) {
     use super::sched::*;
 
+    rtdebug!("spawntask taking the scheduler from TLS")
+    let task = do Local::borrow::<Task, ~Task>() |running_task| {
+        ~running_task.new_child()
+    };
+
     let mut sched = Local::take::<Scheduler>();
     let task = ~Coroutine::with_task(&mut sched.stack_pool,
-                                     ~Task::without_unwinding(),
-                                     f);
-    do sched.switch_running_tasks_and_then(task) |task| {
-        let task = Cell::new(task);
-        let sched = Local::take::<Scheduler>();
-        sched.schedule_new_task(task.take());
-    }
+                                     task, f);
+    rtdebug!("spawntask scheduling the new task");
+    sched.schedule_task(task);
 }
 
 /// Create a new task and run it right now. Aborts on failure
 pub fn spawntask_immediately(f: ~fn()) {
     use super::sched::*;
 
+    let task = do Local::borrow::<Task, ~Task>() |running_task| {
+        ~running_task.new_child()
+    };
+
     let mut sched = Local::take::<Scheduler>();
     let task = ~Coroutine::with_task(&mut sched.stack_pool,
-                                     ~Task::without_unwinding(),
-                                     f);
-    do sched.switch_running_tasks_and_then(task) |task| {
-        let task = Cell::new(task);
-        do Local::borrow::<Scheduler> |sched| {
-            sched.enqueue_task(task.take());
-        }
+                                     task, f);
+    do sched.switch_running_tasks_and_then(task) |sched, task| {
+        sched.enqueue_task(task);
     }
 }
 
@@ -72,10 +284,13 @@ pub fn spawntask_immediately(f: ~fn()) {
 pub fn spawntask_later(f: ~fn()) {
     use super::sched::*;
 
+    let task = do Local::borrow::<Task, ~Task>() |running_task| {
+        ~running_task.new_child()
+    };
+
     let mut sched = Local::take::<Scheduler>();
     let task = ~Coroutine::with_task(&mut sched.stack_pool,
-                                     ~Task::without_unwinding(),
-                                     f);
+                                     task, f);
 
     sched.enqueue_task(task);
     Local::put(sched);
@@ -86,20 +301,20 @@ pub fn spawntask_random(f: ~fn()) {
     use super::sched::*;
     use rand::{Rand, rng};
 
-    let mut rng = rng();
-    let run_now: bool = Rand::rand(&mut rng);
+    let task = do Local::borrow::<Task, ~Task>() |running_task| {
+        ~running_task.new_child()
+    };
 
     let mut sched = Local::take::<Scheduler>();
     let task = ~Coroutine::with_task(&mut sched.stack_pool,
-                                     ~Task::without_unwinding(),
-                                     f);
+                                     task, f);
+
+    let mut rng = rng();
+    let run_now: bool = Rand::rand(&mut rng);
 
     if run_now {
-        do sched.switch_running_tasks_and_then(task) |task| {
-            let task = Cell::new(task);
-            do Local::borrow::<Scheduler> |sched| {
-                sched.enqueue_task(task.take());
-            }
+        do sched.switch_running_tasks_and_then(task) |sched, task| {
+            sched.enqueue_task(task);
         }
     } else {
         sched.enqueue_task(task);
@@ -107,57 +322,75 @@ pub fn spawntask_random(f: ~fn()) {
     }
 }
 
+/// Spawn a task, with the current scheduler as home, and queue it to
+/// run later.
+pub fn spawntask_homed(scheds: &mut ~[~Scheduler], f: ~fn()) {
+    use super::sched::*;
+    use rand::{rng, RngUtil};
+    let mut rng = rng();
+
+    let task = {
+        let sched = &mut scheds[rng.gen_int_range(0,scheds.len() as int)];
+        let handle = sched.make_handle();
+        let home_id = handle.sched_id;
+
+        // now that we know where this is going, build a new function
+        // that can assert it is in the right place
+        let af: ~fn() = || {
+            do Local::borrow::<Scheduler,()>() |sched| {
+                rtdebug!("home_id: %u, runtime loc: %u",
+                         home_id,
+                         sched.sched_id());
+                assert!(home_id == sched.sched_id());
+            };
+            f()
+        };
+
+        ~Coroutine::with_task_homed(&mut sched.stack_pool,
+                                    ~Task::new_root(),
+                                    af,
+                                    Sched(handle))
+    };
+    let dest_sched = &mut scheds[rng.gen_int_range(0,scheds.len() as int)];
+    // enqueue it for future execution
+    dest_sched.enqueue_task(task);
+}
 
 /// Spawn a task and wait for it to finish, returning whether it completed successfully or failed
 pub fn spawntask_try(f: ~fn()) -> Result<(), ()> {
     use cell::Cell;
     use super::sched::*;
-    use task;
-    use unstable::finally::Finally;
-
-    // Our status variables will be filled in from the scheduler context
-    let mut failed = false;
-    let failed_ptr: *mut bool = &mut failed;
-
-    // Switch to the scheduler
-    let f = Cell::new(Cell::new(f));
-    let sched = Local::take::<Scheduler>();
-    do sched.deschedule_running_task_and_then() |old_task| {
-        let old_task = Cell::new(old_task);
-        let f = f.take();
-        let mut sched = Local::take::<Scheduler>();
-        let new_task = ~do Coroutine::new(&mut sched.stack_pool) {
-            do (|| {
-                (f.take())()
-            }).finally {
-                // Check for failure then resume the parent task
-                unsafe { *failed_ptr = task::failing(); }
-                let sched = Local::take::<Scheduler>();
-                do sched.switch_running_tasks_and_then(old_task.take()) |new_task| {
-                    let new_task = Cell::new(new_task);
-                    do Local::borrow::<Scheduler> |sched| {
-                        sched.enqueue_task(new_task.take());
-                    }
-                }
-            }
-        };
 
-        sched.resume_task_immediately(new_task);
+    let (port, chan) = oneshot();
+    let chan = Cell::new(chan);
+    let mut new_task = ~Task::new_root();
+    let on_exit: ~fn(bool) = |exit_status| chan.take().send(exit_status);
+    new_task.on_exit = Some(on_exit);
+    let mut sched = Local::take::<Scheduler>();
+    let new_task = ~Coroutine::with_task(&mut sched.stack_pool,
+                                         new_task, f);
+    do sched.switch_running_tasks_and_then(new_task) |sched, old_task| {
+        sched.enqueue_task(old_task);
     }
 
-    if !failed { Ok(()) } else { Err(()) }
+    let exit_status = port.recv();
+    if exit_status { Ok(()) } else { Err(()) }
 }
 
 // Spawn a new task in a new scheduler and return a thread handle.
 pub fn spawntask_thread(f: ~fn()) -> Thread {
     use rt::sched::*;
-    use rt::uv::uvio::UvEventLoop;
 
+    let task = do Local::borrow::<Task, ~Task>() |running_task| {
+        ~running_task.new_child()
+    };
+
+    let task = Cell::new(task);
     let f = Cell::new(f);
     let thread = do Thread::start {
-        let mut sched = ~UvEventLoop::new_scheduler();
+        let mut sched = ~new_test_uv_sched();
         let task = ~Coroutine::with_task(&mut sched.stack_pool,
-                                         ~Task::without_unwinding(),
+                                         task.take(),
                                          f.take());
         sched.enqueue_task(task);
         sched.run();
diff --git a/src/libstd/rt/tube.rs b/src/libstd/rt/tube.rs
index 03e11dfad1d..89f3d10b5e4 100644
--- a/src/libstd/rt/tube.rs
+++ b/src/libstd/rt/tube.rs
@@ -72,7 +72,7 @@ impl<T> Tube<T> {
                 assert!(self.p.refcount() > 1); // There better be somebody to wake us up
                 assert!((*state).blocked_task.is_none());
                 let sched = Local::take::<Scheduler>();
-                do sched.deschedule_running_task_and_then |task| {
+                do sched.deschedule_running_task_and_then |_, task| {
                     (*state).blocked_task = Some(task);
                 }
                 rtdebug!("waking after tube recv");
@@ -107,11 +107,10 @@ mod test {
             let tube_clone = tube.clone();
             let tube_clone_cell = Cell::new(tube_clone);
             let sched = Local::take::<Scheduler>();
-            do sched.deschedule_running_task_and_then |task| {
+            do sched.deschedule_running_task_and_then |sched, task| {
                 let mut tube_clone = tube_clone_cell.take();
                 tube_clone.send(1);
-                let sched = Local::take::<Scheduler>();
-                sched.resume_task_immediately(task);
+                sched.enqueue_task(task);
             }
 
             assert!(tube.recv() == 1);
@@ -123,21 +122,17 @@ mod test {
         do run_in_newsched_task {
             let mut tube: Tube<int> = Tube::new();
             let tube_clone = tube.clone();
-            let tube_clone = Cell::new(Cell::new(Cell::new(tube_clone)));
+            let tube_clone = Cell::new(tube_clone);
             let sched = Local::take::<Scheduler>();
-            do sched.deschedule_running_task_and_then |task| {
-                let tube_clone = tube_clone.take();
-                do Local::borrow::<Scheduler> |sched| {
-                    let tube_clone = tube_clone.take();
-                    do sched.event_loop.callback {
-                        let mut tube_clone = tube_clone.take();
-                        // The task should be blocked on this now and
-                        // sending will wake it up.
-                        tube_clone.send(1);
-                    }
+            do sched.deschedule_running_task_and_then |sched, task| {
+                let tube_clone = Cell::new(tube_clone.take());
+                do sched.event_loop.callback {
+                    let mut tube_clone = tube_clone.take();
+                    // The task should be blocked on this now and
+                    // sending will wake it up.
+                    tube_clone.send(1);
                 }
-                let sched = Local::take::<Scheduler>();
-                sched.resume_task_immediately(task);
+                sched.enqueue_task(task);
             }
 
             assert!(tube.recv() == 1);
@@ -153,14 +148,14 @@ mod test {
             let tube_clone = tube.clone();
             let tube_clone = Cell::new(tube_clone);
             let sched = Local::take::<Scheduler>();
-            do sched.deschedule_running_task_and_then |task| {
+            do sched.deschedule_running_task_and_then |sched, task| {
                 callback_send(tube_clone.take(), 0);
 
                 fn callback_send(tube: Tube<int>, i: int) {
                     if i == 100 { return; }
 
                     let tube = Cell::new(Cell::new(tube));
-                    do Local::borrow::<Scheduler> |sched| {
+                    do Local::borrow::<Scheduler, ()> |sched| {
                         let tube = tube.take();
                         do sched.event_loop.callback {
                             let mut tube = tube.take();
@@ -172,8 +167,7 @@ mod test {
                     }
                 }
 
-                let sched = Local::take::<Scheduler>();
-                sched.resume_task_immediately(task);
+                sched.enqueue_task(task);
             }
 
             for int::range(0, MAX) |i| {
diff --git a/src/libstd/rt/uv/async.rs b/src/libstd/rt/uv/async.rs
new file mode 100644
index 00000000000..f3d1024024f
--- /dev/null
+++ b/src/libstd/rt/uv/async.rs
@@ -0,0 +1,105 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use libc::{c_int, c_void};
+use option::Some;
+use rt::uv::uvll;
+use rt::uv::uvll::UV_ASYNC;
+use rt::uv::{Watcher, Loop, NativeHandle, AsyncCallback, NullCallback};
+use rt::uv::WatcherInterop;
+use rt::uv::status_to_maybe_uv_error;
+
+pub struct AsyncWatcher(*uvll::uv_async_t);
+impl Watcher for AsyncWatcher { }
+
+impl AsyncWatcher {
+    pub fn new(loop_: &mut Loop, cb: AsyncCallback) -> AsyncWatcher {
+        unsafe {
+            let handle = uvll::malloc_handle(UV_ASYNC);
+            assert!(handle.is_not_null());
+            let mut watcher: AsyncWatcher = NativeHandle::from_native_handle(handle);
+            watcher.install_watcher_data();
+            let data = watcher.get_watcher_data();
+            data.async_cb = Some(cb);
+            assert_eq!(0, uvll::async_init(loop_.native_handle(), handle, async_cb));
+            return watcher;
+        }
+
+        extern fn async_cb(handle: *uvll::uv_async_t, status: c_int) {
+            let mut watcher: AsyncWatcher = NativeHandle::from_native_handle(handle);
+            let status = status_to_maybe_uv_error(watcher.native_handle(), status);
+            let data = watcher.get_watcher_data();
+            let cb = data.async_cb.get_ref();
+            (*cb)(watcher, status);
+        }
+    }
+
+    pub fn send(&mut self) {
+        unsafe {
+            let handle = self.native_handle();
+            uvll::async_send(handle);
+        }
+    }
+
+    pub fn close(self, cb: NullCallback) {
+        let mut this = self;
+        let data = this.get_watcher_data();
+        assert!(data.close_cb.is_none());
+        data.close_cb = Some(cb);
+
+        unsafe {
+            uvll::close(self.native_handle(), close_cb);
+        }
+
+        extern fn close_cb(handle: *uvll::uv_stream_t) {
+            let mut watcher: AsyncWatcher = NativeHandle::from_native_handle(handle);
+            {
+                let data = watcher.get_watcher_data();
+                data.close_cb.swap_unwrap()();
+            }
+            watcher.drop_watcher_data();
+            unsafe { uvll::free_handle(handle as *c_void); }
+        }
+    }
+}
+
+impl NativeHandle<*uvll::uv_async_t> for AsyncWatcher {
+    fn from_native_handle(handle: *uvll::uv_async_t) -> AsyncWatcher {
+        AsyncWatcher(handle)
+    }
+    fn native_handle(&self) -> *uvll::uv_async_t {
+        match self { &AsyncWatcher(ptr) => ptr }
+    }
+}
+
+#[cfg(test)]
+mod test {
+
+    use super::*;
+    use rt::uv::Loop;
+    use unstable::run_in_bare_thread;
+    use rt::thread::Thread;
+    use cell::Cell;
+
+    #[test]
+    fn smoke_test() {
+        do run_in_bare_thread {
+            let mut loop_ = Loop::new();
+            let watcher = AsyncWatcher::new(&mut loop_, |w, _| w.close(||()) );
+            let watcher_cell = Cell::new(watcher);
+            let _thread = do Thread::start {
+                let mut watcher = watcher_cell.take();
+                watcher.send();
+            };
+            loop_.run();
+            loop_.close();
+        }
+    }
+}
diff --git a/src/libstd/rt/uv/idle.rs b/src/libstd/rt/uv/idle.rs
index e1def9ffd50..a3630c9b9bf 100644
--- a/src/libstd/rt/uv/idle.rs
+++ b/src/libstd/rt/uv/idle.rs
@@ -90,3 +90,65 @@ impl NativeHandle<*uvll::uv_idle_t> for IdleWatcher {
         match self { &IdleWatcher(ptr) => ptr }
     }
 }
+
+#[cfg(test)]
+mod test {
+
+    use rt::uv::Loop;
+    use super::*;
+    use unstable::run_in_bare_thread;
+
+    #[test]
+    #[ignore(reason = "valgrind - loop destroyed before watcher?")]
+    fn idle_new_then_close() {
+        do run_in_bare_thread {
+            let mut loop_ = Loop::new();
+            let idle_watcher = { IdleWatcher::new(&mut loop_) };
+            idle_watcher.close(||());
+        }
+    }
+
+    #[test]
+    fn idle_smoke_test() {
+        do run_in_bare_thread {
+            let mut loop_ = Loop::new();
+            let mut idle_watcher = { IdleWatcher::new(&mut loop_) };
+            let mut count = 10;
+            let count_ptr: *mut int = &mut count;
+            do idle_watcher.start |idle_watcher, status| {
+                let mut idle_watcher = idle_watcher;
+                assert!(status.is_none());
+                if unsafe { *count_ptr == 10 } {
+                    idle_watcher.stop();
+                    idle_watcher.close(||());
+                } else {
+                    unsafe { *count_ptr = *count_ptr + 1; }
+                }
+            }
+            loop_.run();
+            loop_.close();
+            assert_eq!(count, 10);
+        }
+    }
+
+    #[test]
+    fn idle_start_stop_start() {
+        do run_in_bare_thread {
+            let mut loop_ = Loop::new();
+            let mut idle_watcher = { IdleWatcher::new(&mut loop_) };
+            do idle_watcher.start |idle_watcher, status| {
+                let mut idle_watcher = idle_watcher;
+                assert!(status.is_none());
+                idle_watcher.stop();
+                do idle_watcher.start |idle_watcher, status| {
+                    assert!(status.is_none());
+                    let mut idle_watcher = idle_watcher;
+                    idle_watcher.stop();
+                    idle_watcher.close(||());
+                }
+            }
+            loop_.run();
+            loop_.close();
+        }
+    }
+}
diff --git a/src/libstd/rt/uv/mod.rs b/src/libstd/rt/uv/mod.rs
index f50efc079a7..883eda0057f 100644
--- a/src/libstd/rt/uv/mod.rs
+++ b/src/libstd/rt/uv/mod.rs
@@ -55,6 +55,7 @@ pub use self::file::FsRequest;
 pub use self::net::{StreamWatcher, TcpWatcher};
 pub use self::idle::IdleWatcher;
 pub use self::timer::TimerWatcher;
+pub use self::async::AsyncWatcher;
 
 /// The implementation of `rtio` for libuv
 pub mod uvio;
@@ -66,6 +67,7 @@ pub mod file;
 pub mod net;
 pub mod idle;
 pub mod timer;
+pub mod async;
 
 /// XXX: Loop(*handle) is buggy with destructors. Normal structs
 /// with dtors may not be destructured, but tuple structs can,
@@ -123,6 +125,7 @@ pub type IdleCallback = ~fn(IdleWatcher, Option<UvError>);
 pub type ConnectionCallback = ~fn(StreamWatcher, Option<UvError>);
 pub type FsCallback = ~fn(FsRequest, Option<UvError>);
 pub type TimerCallback = ~fn(TimerWatcher, Option<UvError>);
+pub type AsyncCallback = ~fn(AsyncWatcher, Option<UvError>);
 
 
 /// Callbacks used by StreamWatchers, set as custom data on the foreign handle
@@ -133,7 +136,8 @@ struct WatcherData {
     close_cb: Option<NullCallback>,
     alloc_cb: Option<AllocCallback>,
     idle_cb: Option<IdleCallback>,
-    timer_cb: Option<TimerCallback>
+    timer_cb: Option<TimerCallback>,
+    async_cb: Option<AsyncCallback>
 }
 
 pub trait WatcherInterop {
@@ -162,7 +166,8 @@ impl<H, W: Watcher + NativeHandle<*H>> WatcherInterop for W {
                 close_cb: None,
                 alloc_cb: None,
                 idle_cb: None,
-                timer_cb: None
+                timer_cb: None,
+                async_cb: None
             };
             let data = transmute::<~WatcherData, *c_void>(data);
             uvll::set_data_for_uv_handle(self.native_handle(), data);
@@ -347,57 +352,3 @@ fn loop_smoke_test() {
         loop_.close();
     }
 }
-
-#[test]
-#[ignore(reason = "valgrind - loop destroyed before watcher?")]
-fn idle_new_then_close() {
-    do run_in_bare_thread {
-        let mut loop_ = Loop::new();
-        let idle_watcher = { IdleWatcher::new(&mut loop_) };
-        idle_watcher.close(||());
-    }
-}
-
-#[test]
-fn idle_smoke_test() {
-    do run_in_bare_thread {
-        let mut loop_ = Loop::new();
-        let mut idle_watcher = { IdleWatcher::new(&mut loop_) };
-        let mut count = 10;
-        let count_ptr: *mut int = &mut count;
-        do idle_watcher.start |idle_watcher, status| {
-            let mut idle_watcher = idle_watcher;
-            assert!(status.is_none());
-            if unsafe { *count_ptr == 10 } {
-                idle_watcher.stop();
-                idle_watcher.close(||());
-            } else {
-                unsafe { *count_ptr = *count_ptr + 1; }
-            }
-        }
-        loop_.run();
-        loop_.close();
-        assert_eq!(count, 10);
-    }
-}
-
-#[test]
-fn idle_start_stop_start() {
-    do run_in_bare_thread {
-        let mut loop_ = Loop::new();
-        let mut idle_watcher = { IdleWatcher::new(&mut loop_) };
-        do idle_watcher.start |idle_watcher, status| {
-            let mut idle_watcher = idle_watcher;
-            assert!(status.is_none());
-            idle_watcher.stop();
-            do idle_watcher.start |idle_watcher, status| {
-                assert!(status.is_none());
-                let mut idle_watcher = idle_watcher;
-                idle_watcher.stop();
-                idle_watcher.close(||());
-            }
-        }
-        loop_.run();
-        loop_.close();
-    }
-}
diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs
index 964ee460c1d..298277b3df0 100644
--- a/src/libstd/rt/uv/uvio.rs
+++ b/src/libstd/rt/uv/uvio.rs
@@ -12,6 +12,7 @@ use option::*;
 use result::*;
 use ops::Drop;
 use cell::Cell;
+use cast;
 use cast::transmute;
 use clone::Clone;
 use rt::io::IoError;
@@ -23,6 +24,7 @@ use rt::sched::Scheduler;
 use rt::io::{standard_error, OtherIoError};
 use rt::tube::Tube;
 use rt::local::Local;
+use unstable::sync::{Exclusive, exclusive};
 
 #[cfg(test)] use container::Container;
 #[cfg(test)] use uint;
@@ -39,11 +41,6 @@ impl UvEventLoop {
             uvio: UvIoFactory(Loop::new())
         }
     }
-
-    /// A convenience constructor
-    pub fn new_scheduler() -> Scheduler {
-        Scheduler::new(~UvEventLoop::new())
-    }
 }
 
 impl Drop for UvEventLoop {
@@ -81,6 +78,10 @@ impl EventLoop for UvEventLoop {
         }
     }
 
+    fn remote_callback(&mut self, f: ~fn()) -> ~RemoteCallbackObject {
+        ~UvRemoteCallback::new(self.uvio.uv_loop(), f)
+    }
+
     fn io<'a>(&'a mut self) -> Option<&'a mut IoFactoryObject> {
         Some(&mut self.uvio)
     }
@@ -100,6 +101,89 @@ fn test_callback_run_once() {
     }
 }
 
+pub struct UvRemoteCallback {
+    // The uv async handle for triggering the callback
+    async: AsyncWatcher,
+    // A flag to tell the callback to exit, set from the dtor. This is
+    // almost never contested - only in rare races with the dtor.
+    exit_flag: Exclusive<bool>
+}
+
+impl UvRemoteCallback {
+    pub fn new(loop_: &mut Loop, f: ~fn()) -> UvRemoteCallback {
+        let exit_flag = exclusive(false);
+        let exit_flag_clone = exit_flag.clone();
+        let async = do AsyncWatcher::new(loop_) |watcher, status| {
+            assert!(status.is_none());
+            f();
+            unsafe {
+                do exit_flag_clone.with_imm |&should_exit| {
+                    if should_exit {
+                        watcher.close(||());
+                    }
+                }
+            }
+        };
+        UvRemoteCallback {
+            async: async,
+            exit_flag: exit_flag
+        }
+    }
+}
+
+impl RemoteCallback for UvRemoteCallback {
+    fn fire(&mut self) { self.async.send() }
+}
+
+impl Drop for UvRemoteCallback {
+    fn finalize(&self) {
+        unsafe {
+            let this: &mut UvRemoteCallback = cast::transmute_mut(self);
+            do this.exit_flag.with |should_exit| {
+                // NB: These two things need to happen atomically. Otherwise
+                // the event handler could wake up due to a *previous*
+                // signal and see the exit flag, destroying the handle
+                // before the final send.
+                *should_exit = true;
+                this.async.send();
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod test_remote {
+    use cell::Cell;
+    use rt::test::*;
+    use rt::thread::Thread;
+    use rt::tube::Tube;
+    use rt::rtio::EventLoop;
+    use rt::local::Local;
+    use rt::sched::Scheduler;
+
+    #[test]
+    fn test_uv_remote() {
+        do run_in_newsched_task {
+            let mut tube = Tube::new();
+            let tube_clone = tube.clone();
+            let remote_cell = Cell::new_empty();
+            do Local::borrow::<Scheduler, ()>() |sched| {
+                let tube_clone = tube_clone.clone();
+                let tube_clone_cell = Cell::new(tube_clone);
+                let remote = do sched.event_loop.remote_callback {
+                    tube_clone_cell.take().send(1);
+                };
+                remote_cell.put_back(remote);
+            }
+            let _thread = do Thread::start {
+                remote_cell.take().fire();
+            };
+
+            assert!(tube.recv() == 1);
+        }
+    }
+}
+
 pub struct UvIoFactory(Loop);
 
 impl UvIoFactory {
@@ -122,12 +206,10 @@ impl IoFactory for UvIoFactory {
         assert!(scheduler.in_task_context());
 
         // Block this task and take ownership, switch to scheduler context
-        do scheduler.deschedule_running_task_and_then |task| {
+        do scheduler.deschedule_running_task_and_then |sched, task| {
 
             rtdebug!("connect: entered scheduler context");
-            do Local::borrow::<Scheduler> |scheduler| {
-                assert!(!scheduler.in_task_context());
-            }
+            assert!(!sched.in_task_context());
             let mut tcp_watcher = TcpWatcher::new(self.uv_loop());
             let task_cell = Cell::new(task);
 
@@ -167,7 +249,7 @@ impl IoFactory for UvIoFactory {
             Ok(_) => Ok(~UvTcpListener::new(watcher)),
             Err(uverr) => {
                 let scheduler = Local::take::<Scheduler>();
-                do scheduler.deschedule_running_task_and_then |task| {
+                do scheduler.deschedule_running_task_and_then |_, task| {
                     let task_cell = Cell::new(task);
                     do watcher.as_stream().close {
                         let scheduler = Local::take::<Scheduler>();
@@ -203,7 +285,7 @@ impl Drop for UvTcpListener {
     fn finalize(&self) {
         let watcher = self.watcher();
         let scheduler = Local::take::<Scheduler>();
-        do scheduler.deschedule_running_task_and_then |task| {
+        do scheduler.deschedule_running_task_and_then |_, task| {
             let task_cell = Cell::new(task);
             do watcher.as_stream().close {
                 let scheduler = Local::take::<Scheduler>();
@@ -265,7 +347,7 @@ impl Drop for UvTcpStream {
         rtdebug!("closing tcp stream");
         let watcher = self.watcher();
         let scheduler = Local::take::<Scheduler>();
-        do scheduler.deschedule_running_task_and_then |task| {
+        do scheduler.deschedule_running_task_and_then |_, task| {
             let task_cell = Cell::new(task);
             do watcher.close {
                 let scheduler = Local::take::<Scheduler>();
@@ -284,11 +366,9 @@ impl RtioTcpStream for UvTcpStream {
         assert!(scheduler.in_task_context());
         let watcher = self.watcher();
         let buf_ptr: *&mut [u8] = &buf;
-        do scheduler.deschedule_running_task_and_then |task| {
+        do scheduler.deschedule_running_task_and_then |sched, task| {
             rtdebug!("read: entered scheduler context");
-            do Local::borrow::<Scheduler> |scheduler| {
-                assert!(!scheduler.in_task_context());
-            }
+            assert!(!sched.in_task_context());
             let mut watcher = watcher;
             let task_cell = Cell::new(task);
             // XXX: We shouldn't reallocate these callbacks every
@@ -330,7 +410,7 @@ impl RtioTcpStream for UvTcpStream {
         assert!(scheduler.in_task_context());
         let watcher = self.watcher();
         let buf_ptr: *&[u8] = &buf;
-        do scheduler.deschedule_running_task_and_then |task| {
+        do scheduler.deschedule_running_task_and_then |_, task| {
             let mut watcher = watcher;
             let task_cell = Cell::new(task);
             let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
@@ -424,11 +504,9 @@ fn test_read_and_block() {
                 // Yield to the other task in hopes that it
                 // will trigger a read callback while we are
                 // not ready for it
-                do scheduler.deschedule_running_task_and_then |task| {
+                do scheduler.deschedule_running_task_and_then |sched, task| {
                     let task = Cell::new(task);
-                    do Local::borrow::<Scheduler> |scheduler| {
-                        scheduler.enqueue_task(task.take());
-                    }
+                    sched.enqueue_task(task.take());
                 }
             }
 
diff --git a/src/libstd/rt/uvll.rs b/src/libstd/rt/uvll.rs
deleted file mode 100644
index 0d298bde6b5..00000000000
--- a/src/libstd/rt/uvll.rs
+++ /dev/null
@@ -1,443 +0,0 @@
-// Copyright 2012 The Rust Project Developers. See the COPYRIGHT
-// file at the top-level directory of this distribution and at
-// http://rust-lang.org/COPYRIGHT.
-//
-// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
-// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
-// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
-// option. This file may not be copied, modified, or distributed
-// except according to those terms.
-
-/*!
- * Low-level bindings to the libuv library.
- *
- * This module contains a set of direct, 'bare-metal' wrappers around
- * the libuv C-API.
- *
- * We're not bothering yet to redefine uv's structs as Rust structs
- * because they are quite large and change often between versions.
- * The maintenance burden is just too high. Instead we use the uv's
- * `uv_handle_size` and `uv_req_size` to find the correct size of the
- * structs and allocate them on the heap. This can be revisited later.
- *
- * There are also a collection of helper functions to ease interacting
- * with the low-level API.
- *
- * As new functionality, existant in uv.h, is added to the rust stdlib,
- * the mappings should be added in this module.
- */
-
-#[allow(non_camel_case_types)]; // C types
-
-use libc::{size_t, c_int, c_uint, c_void, c_char, uintptr_t};
-use libc::{malloc, free};
-use prelude::*;
-
-pub struct uv_err_t {
-    code: c_int,
-    sys_errno_: c_int
-}
-
-pub struct uv_buf_t {
-    base: *u8,
-    len: libc::size_t,
-}
-
-pub type uv_handle_t = c_void;
-pub type uv_loop_t = c_void;
-pub type uv_idle_t = c_void;
-pub type uv_tcp_t = c_void;
-pub type uv_connect_t = c_void;
-pub type uv_write_t = c_void;
-pub type uv_async_t = c_void;
-pub type uv_timer_t = c_void;
-pub type uv_stream_t = c_void;
-pub type uv_fs_t = c_void;
-
-pub type uv_idle_cb = *u8;
-
-pub type sockaddr_in = c_void;
-pub type sockaddr_in6 = c_void;
-
-#[deriving(Eq)]
-pub enum uv_handle_type {
-    UV_UNKNOWN_HANDLE,
-    UV_ASYNC,
-    UV_CHECK,
-    UV_FS_EVENT,
-    UV_FS_POLL,
-    UV_HANDLE,
-    UV_IDLE,
-    UV_NAMED_PIPE,
-    UV_POLL,
-    UV_PREPARE,
-    UV_PROCESS,
-    UV_STREAM,
-    UV_TCP,
-    UV_TIMER,
-    UV_TTY,
-    UV_UDP,
-    UV_SIGNAL,
-    UV_FILE,
-    UV_HANDLE_TYPE_MAX
-}
-
-#[deriving(Eq)]
-pub enum uv_req_type {
-    UV_UNKNOWN_REQ,
-    UV_REQ,
-    UV_CONNECT,
-    UV_WRITE,
-    UV_SHUTDOWN,
-    UV_UDP_SEND,
-    UV_FS,
-    UV_WORK,
-    UV_GETADDRINFO,
-    UV_REQ_TYPE_MAX
-}
-
-pub unsafe fn malloc_handle(handle: uv_handle_type) -> *c_void {
-    assert!(handle != UV_UNKNOWN_HANDLE && handle != UV_HANDLE_TYPE_MAX);
-    let size = rust_uv_handle_size(handle as uint);
-    let p = malloc(size);
-    assert!(p.is_not_null());
-    return p;
-}
-
-pub unsafe fn free_handle(v: *c_void) {
-    free(v)
-}
-
-pub unsafe fn malloc_req(req: uv_req_type) -> *c_void {
-    assert!(req != UV_UNKNOWN_REQ && req != UV_REQ_TYPE_MAX);
-    let size = rust_uv_req_size(req as uint);
-    let p = malloc(size);
-    assert!(p.is_not_null());
-    return p;
-}
-
-pub unsafe fn free_req(v: *c_void) {
-    free(v)
-}
-
-#[test]
-fn handle_sanity_check() {
-    unsafe {
-        assert!(UV_HANDLE_TYPE_MAX as uint == rust_uv_handle_type_max());
-    }
-}
-
-#[test]
-fn request_sanity_check() {
-    unsafe {
-        assert!(UV_REQ_TYPE_MAX as uint == rust_uv_req_type_max());
-    }
-}
-
-pub unsafe fn loop_new() -> *c_void {
-    return rust_uv_loop_new();
-}
-
-pub unsafe fn loop_delete(loop_handle: *c_void) {
-    rust_uv_loop_delete(loop_handle);
-}
-
-pub unsafe fn run(loop_handle: *c_void) {
-    rust_uv_run(loop_handle);
-}
-
-pub unsafe fn close<T>(handle: *T, cb: *u8) {
-    rust_uv_close(handle as *c_void, cb);
-}
-
-pub unsafe fn walk(loop_handle: *c_void, cb: *u8, arg: *c_void) {
-    rust_uv_walk(loop_handle, cb, arg);
-}
-
-pub unsafe fn idle_new() -> *uv_idle_t {
-    rust_uv_idle_new()
-}
-
-pub unsafe fn idle_delete(handle: *uv_idle_t) {
-    rust_uv_idle_delete(handle)
-}
-
-pub unsafe fn idle_init(loop_handle: *uv_loop_t, handle: *uv_idle_t) -> c_int {
-    rust_uv_idle_init(loop_handle, handle)
-}
-
-pub unsafe fn idle_start(handle: *uv_idle_t, cb: uv_idle_cb) -> c_int {
-    rust_uv_idle_start(handle, cb)
-}
-
-pub unsafe fn idle_stop(handle: *uv_idle_t) -> c_int {
-    rust_uv_idle_stop(handle)
-}
-
-pub unsafe fn tcp_init(loop_handle: *c_void, handle: *uv_tcp_t) -> c_int {
-    return rust_uv_tcp_init(loop_handle, handle);
-}
-
-// FIXME ref #2064
-pub unsafe fn tcp_connect(connect_ptr: *uv_connect_t,
-                          tcp_handle_ptr: *uv_tcp_t,
-                          addr_ptr: *sockaddr_in,
-                          after_connect_cb: *u8) -> c_int {
-    return rust_uv_tcp_connect(connect_ptr, tcp_handle_ptr,
-                                       after_connect_cb, addr_ptr);
-}
-// FIXME ref #2064
-pub unsafe fn tcp_connect6(connect_ptr: *uv_connect_t,
-                           tcp_handle_ptr: *uv_tcp_t,
-                           addr_ptr: *sockaddr_in6,
-                           after_connect_cb: *u8) -> c_int {
-    return rust_uv_tcp_connect6(connect_ptr, tcp_handle_ptr,
-                                        after_connect_cb, addr_ptr);
-}
-// FIXME ref #2064
-pub unsafe fn tcp_bind(tcp_server_ptr: *uv_tcp_t, addr_ptr: *sockaddr_in) -> c_int {
-    return rust_uv_tcp_bind(tcp_server_ptr, addr_ptr);
-}
-// FIXME ref #2064
-pub unsafe fn tcp_bind6(tcp_server_ptr: *uv_tcp_t, addr_ptr: *sockaddr_in6) -> c_int {
-    return rust_uv_tcp_bind6(tcp_server_ptr, addr_ptr);
-}
-
-pub unsafe fn tcp_getpeername(tcp_handle_ptr: *uv_tcp_t, name: *sockaddr_in) -> c_int {
-    return rust_uv_tcp_getpeername(tcp_handle_ptr, name);
-}
-
-pub unsafe fn tcp_getpeername6(tcp_handle_ptr: *uv_tcp_t, name: *sockaddr_in6) ->c_int {
-    return rust_uv_tcp_getpeername6(tcp_handle_ptr, name);
-}
-
-pub unsafe fn listen<T>(stream: *T, backlog: c_int, cb: *u8) -> c_int {
-    return rust_uv_listen(stream as *c_void, backlog, cb);
-}
-
-pub unsafe fn accept(server: *c_void, client: *c_void) -> c_int {
-    return rust_uv_accept(server as *c_void, client as *c_void);
-}
-
-pub unsafe fn write<T>(req: *uv_write_t, stream: *T, buf_in: &[uv_buf_t], cb: *u8) -> c_int {
-    let buf_ptr = vec::raw::to_ptr(buf_in);
-    let buf_cnt = buf_in.len() as i32;
-    return rust_uv_write(req as *c_void, stream as *c_void, buf_ptr, buf_cnt, cb);
-}
-pub unsafe fn read_start(stream: *uv_stream_t, on_alloc: *u8, on_read: *u8) -> c_int {
-    return rust_uv_read_start(stream as *c_void, on_alloc, on_read);
-}
-
-pub unsafe fn read_stop(stream: *uv_stream_t) -> c_int {
-    return rust_uv_read_stop(stream as *c_void);
-}
-
-pub unsafe fn last_error(loop_handle: *c_void) -> uv_err_t {
-    return rust_uv_last_error(loop_handle);
-}
-
-pub unsafe fn strerror(err: *uv_err_t) -> *c_char {
-    return rust_uv_strerror(err);
-}
-pub unsafe fn err_name(err: *uv_err_t) -> *c_char {
-    return rust_uv_err_name(err);
-}
-
-pub unsafe fn async_init(loop_handle: *c_void, async_handle: *uv_async_t, cb: *u8) -> c_int {
-    return rust_uv_async_init(loop_handle, async_handle, cb);
-}
-
-pub unsafe fn async_send(async_handle: *uv_async_t) {
-    return rust_uv_async_send(async_handle);
-}
-pub unsafe fn buf_init(input: *u8, len: uint) -> uv_buf_t {
-    let out_buf = uv_buf_t { base: ptr::null(), len: 0 as size_t };
-    let out_buf_ptr = ptr::to_unsafe_ptr(&out_buf);
-    rust_uv_buf_init(out_buf_ptr, input, len as size_t);
-    return out_buf;
-}
-
-pub unsafe fn timer_init(loop_ptr: *c_void, timer_ptr: *uv_timer_t) -> c_int {
-    return rust_uv_timer_init(loop_ptr, timer_ptr);
-}
-pub unsafe fn timer_start(timer_ptr: *uv_timer_t, cb: *u8, timeout: uint,
-                          repeat: uint) -> c_int {
-    return rust_uv_timer_start(timer_ptr, cb, timeout as c_uint, repeat as c_uint);
-}
-pub unsafe fn timer_stop(timer_ptr: *uv_timer_t) -> c_int {
-    return rust_uv_timer_stop(timer_ptr);
-}
-
-pub unsafe fn malloc_ip4_addr(ip: &str, port: int) -> *sockaddr_in {
-    do str::as_c_str(ip) |ip_buf| {
-        rust_uv_ip4_addrp(ip_buf as *u8, port as libc::c_int)
-    }
-}
-pub unsafe fn malloc_ip6_addr(ip: &str, port: int) -> *sockaddr_in6 {
-    do str::as_c_str(ip) |ip_buf| {
-        rust_uv_ip6_addrp(ip_buf as *u8, port as libc::c_int)
-    }
-}
-
-pub unsafe fn free_ip4_addr(addr: *sockaddr_in) {
-    rust_uv_free_ip4_addr(addr);
-}
-
-pub unsafe fn free_ip6_addr(addr: *sockaddr_in6) {
-    rust_uv_free_ip6_addr(addr);
-}
-
-// data access helpers
-pub unsafe fn get_loop_for_uv_handle<T>(handle: *T) -> *c_void {
-    return rust_uv_get_loop_for_uv_handle(handle as *c_void);
-}
-pub unsafe fn get_stream_handle_from_connect_req(connect: *uv_connect_t) -> *uv_stream_t {
-    return rust_uv_get_stream_handle_from_connect_req(connect);
-}
-pub unsafe fn get_stream_handle_from_write_req(write_req: *uv_write_t) -> *uv_stream_t {
-    return rust_uv_get_stream_handle_from_write_req(write_req);
-}
-pub unsafe fn get_data_for_uv_loop(loop_ptr: *c_void) -> *c_void {
-    rust_uv_get_data_for_uv_loop(loop_ptr)
-}
-pub unsafe fn set_data_for_uv_loop(loop_ptr: *c_void, data: *c_void) {
-    rust_uv_set_data_for_uv_loop(loop_ptr, data);
-}
-pub unsafe fn get_data_for_uv_handle<T>(handle: *T) -> *c_void {
-    return rust_uv_get_data_for_uv_handle(handle as *c_void);
-}
-pub unsafe fn set_data_for_uv_handle<T, U>(handle: *T, data: *U) {
-    rust_uv_set_data_for_uv_handle(handle as *c_void, data as *c_void);
-}
-pub unsafe fn get_data_for_req<T>(req: *T) -> *c_void {
-    return rust_uv_get_data_for_req(req as *c_void);
-}
-pub unsafe fn set_data_for_req<T, U>(req: *T, data: *U) {
-    rust_uv_set_data_for_req(req as *c_void, data as *c_void);
-}
-pub unsafe fn get_base_from_buf(buf: uv_buf_t) -> *u8 {
-    return rust_uv_get_base_from_buf(buf);
-}
-pub unsafe fn get_len_from_buf(buf: uv_buf_t) -> size_t {
-    return rust_uv_get_len_from_buf(buf);
-}
-pub unsafe fn malloc_buf_base_of(suggested_size: size_t) -> *u8 {
-    return rust_uv_malloc_buf_base_of(suggested_size);
-}
-pub unsafe fn free_base_of_buf(buf: uv_buf_t) {
-    rust_uv_free_base_of_buf(buf);
-}
-
-pub unsafe fn get_last_err_info(uv_loop: *c_void) -> ~str {
-    let err = last_error(uv_loop);
-    let err_ptr = ptr::to_unsafe_ptr(&err);
-    let err_name = str::raw::from_c_str(err_name(err_ptr));
-    let err_msg = str::raw::from_c_str(strerror(err_ptr));
-    return fmt!("LIBUV ERROR: name: %s msg: %s",
-                    err_name, err_msg);
-}
-
-pub unsafe fn get_last_err_data(uv_loop: *c_void) -> uv_err_data {
-    let err = last_error(uv_loop);
-    let err_ptr = ptr::to_unsafe_ptr(&err);
-    let err_name = str::raw::from_c_str(err_name(err_ptr));
-    let err_msg = str::raw::from_c_str(strerror(err_ptr));
-    uv_err_data { err_name: err_name, err_msg: err_msg }
-}
-
-pub struct uv_err_data {
-    err_name: ~str,
-    err_msg: ~str,
-}
-
-extern {
-
-    fn rust_uv_handle_size(type_: uintptr_t) -> size_t;
-    fn rust_uv_req_size(type_: uintptr_t) -> size_t;
-    fn rust_uv_handle_type_max() -> uintptr_t;
-    fn rust_uv_req_type_max() -> uintptr_t;
-
-    // libuv public API
-    fn rust_uv_loop_new() -> *c_void;
-    fn rust_uv_loop_delete(lp: *c_void);
-    fn rust_uv_run(loop_handle: *c_void);
-    fn rust_uv_close(handle: *c_void, cb: *u8);
-    fn rust_uv_walk(loop_handle: *c_void, cb: *u8, arg: *c_void);
-
-    fn rust_uv_idle_new() -> *uv_idle_t;
-    fn rust_uv_idle_delete(handle: *uv_idle_t);
-    fn rust_uv_idle_init(loop_handle: *uv_loop_t, handle: *uv_idle_t) -> c_int;
-    fn rust_uv_idle_start(handle: *uv_idle_t, cb: uv_idle_cb) -> c_int;
-    fn rust_uv_idle_stop(handle: *uv_idle_t) -> c_int;
-
-    fn rust_uv_async_send(handle: *uv_async_t);
-    fn rust_uv_async_init(loop_handle: *c_void,
-                          async_handle: *uv_async_t,
-                          cb: *u8) -> c_int;
-    fn rust_uv_tcp_init(loop_handle: *c_void, handle_ptr: *uv_tcp_t) -> c_int;
-    // FIXME ref #2604 .. ?
-    fn rust_uv_buf_init(out_buf: *uv_buf_t, base: *u8, len: size_t);
-    fn rust_uv_last_error(loop_handle: *c_void) -> uv_err_t;
-    // FIXME ref #2064
-    fn rust_uv_strerror(err: *uv_err_t) -> *c_char;
-    // FIXME ref #2064
-    fn rust_uv_err_name(err: *uv_err_t) -> *c_char;
-    fn rust_uv_ip4_addrp(ip: *u8, port: c_int) -> *sockaddr_in;
-    fn rust_uv_ip6_addrp(ip: *u8, port: c_int) -> *sockaddr_in6;
-    fn rust_uv_free_ip4_addr(addr: *sockaddr_in);
-    fn rust_uv_free_ip6_addr(addr: *sockaddr_in6);
-    fn rust_uv_ip4_name(src: *sockaddr_in, dst: *u8, size: size_t) -> c_int;
-    fn rust_uv_ip6_name(src: *sockaddr_in6, dst: *u8, size: size_t) -> c_int;
-    fn rust_uv_ip4_port(src: *sockaddr_in) -> c_uint;
-    fn rust_uv_ip6_port(src: *sockaddr_in6) -> c_uint;
-    // FIXME ref #2064
-    fn rust_uv_tcp_connect(connect_ptr: *uv_connect_t,
-                           tcp_handle_ptr: *uv_tcp_t,
-                           after_cb: *u8,
-                           addr: *sockaddr_in) -> c_int;
-    // FIXME ref #2064
-    fn rust_uv_tcp_bind(tcp_server: *uv_tcp_t, addr: *sockaddr_in) -> c_int;
-    // FIXME ref #2064
-    fn rust_uv_tcp_connect6(connect_ptr: *uv_connect_t,
-                            tcp_handle_ptr: *uv_tcp_t,
-                            after_cb: *u8,
-                            addr: *sockaddr_in6) -> c_int;
-    // FIXME ref #2064
-    fn rust_uv_tcp_bind6(tcp_server: *uv_tcp_t, addr: *sockaddr_in6) -> c_int;
-    fn rust_uv_tcp_getpeername(tcp_handle_ptr: *uv_tcp_t,
-                               name: *sockaddr_in) -> c_int;
-    fn rust_uv_tcp_getpeername6(tcp_handle_ptr: *uv_tcp_t,
-                                name: *sockaddr_in6) ->c_int;
-    fn rust_uv_listen(stream: *c_void, backlog: c_int, cb: *u8) -> c_int;
-    fn rust_uv_accept(server: *c_void, client: *c_void) -> c_int;
-    fn rust_uv_write(req: *c_void,
-                     stream: *c_void,
-                     buf_in: *uv_buf_t,
-                     buf_cnt: c_int,
-                     cb: *u8) -> c_int;
-    fn rust_uv_read_start(stream: *c_void,
-                          on_alloc: *u8,
-                          on_read: *u8) -> c_int;
-    fn rust_uv_read_stop(stream: *c_void) -> c_int;
-    fn rust_uv_timer_init(loop_handle: *c_void,
-                          timer_handle: *uv_timer_t) -> c_int;
-    fn rust_uv_timer_start(timer_handle: *uv_timer_t,
-                           cb: *u8,
-                           timeout: c_uint,
-                           repeat: c_uint) -> c_int;
-    fn rust_uv_timer_stop(handle: *uv_timer_t) -> c_int;
-
-    fn rust_uv_malloc_buf_base_of(sug_size: size_t) -> *u8;
-    fn rust_uv_free_base_of_buf(buf: uv_buf_t);
-    fn rust_uv_get_stream_handle_from_connect_req(connect_req: *uv_connect_t) -> *uv_stream_t;
-    fn rust_uv_get_stream_handle_from_write_req(write_req: *uv_write_t) -> *uv_stream_t;
-    fn rust_uv_get_loop_for_uv_handle(handle: *c_void) -> *c_void;
-    fn rust_uv_get_data_for_uv_loop(loop_ptr: *c_void) -> *c_void;
-    fn rust_uv_set_data_for_uv_loop(loop_ptr: *c_void, data: *c_void);
-    fn rust_uv_get_data_for_uv_handle(handle: *c_void) -> *c_void;
-    fn rust_uv_set_data_for_uv_handle(handle: *c_void, data: *c_void);
-    fn rust_uv_get_data_for_req(req: *c_void) -> *c_void;
-    fn rust_uv_set_data_for_req(req: *c_void, data: *c_void);
-    fn rust_uv_get_base_from_buf(buf: uv_buf_t) -> *u8;
-    fn rust_uv_get_len_from_buf(buf: uv_buf_t) -> size_t;
-}
diff --git a/src/libstd/sys.rs b/src/libstd/sys.rs
index 87e13e494aa..e49ad348542 100644
--- a/src/libstd/sys.rs
+++ b/src/libstd/sys.rs
@@ -213,11 +213,7 @@ pub fn begin_unwind_(msg: *c_char, file: *c_char, line: size_t) -> ! {
                 gc::cleanup_stack_for_failure();
 
                 let task = Local::unsafe_borrow::<Task>();
-                let unwinder: &mut Option<Unwinder> = &mut (*task).unwinder;
-                match *unwinder {
-                    Some(ref mut unwinder) => unwinder.begin_unwind(),
-                    None => abort!("failure without unwinder. aborting process")
-                }
+                (*task).unwinder.begin_unwind();
             }
         }
     }
diff --git a/src/libstd/task/mod.rs b/src/libstd/task/mod.rs
index 223afbce091..99858feab22 100644
--- a/src/libstd/task/mod.rs
+++ b/src/libstd/task/mod.rs
@@ -520,20 +520,9 @@ pub fn failing() -> bool {
             }
         }
         _ => {
-            let mut unwinding = false;
-            do Local::borrow::<Task> |local| {
-                unwinding = match local.unwinder {
-                    Some(unwinder) => {
-                        unwinder.unwinding
-                    }
-                    None => {
-                        // Because there is no unwinder we can't be unwinding.
-                        // (The process will abort on failure)
-                        false
-                    }
-                }
+            do Local::borrow::<Task, bool> |local| {
+                local.unwinder.unwinding
             }
-            return unwinding;
         }
     }
 }
diff --git a/src/libstd/task/spawn.rs b/src/libstd/task/spawn.rs
index 30ad4ee2a89..9df93d19d21 100644
--- a/src/libstd/task/spawn.rs
+++ b/src/libstd/task/spawn.rs
@@ -93,6 +93,7 @@ use util;
 use unstable::sync::{Exclusive, exclusive};
 use rt::local::Local;
 use iterator::{IteratorUtil};
+use rt::task::Task;
 
 #[cfg(test)] use task::default_task_opts;
 #[cfg(test)] use comm;
@@ -585,9 +586,14 @@ pub fn spawn_raw(opts: TaskOpts, f: ~fn()) {
 fn spawn_raw_newsched(_opts: TaskOpts, f: ~fn()) {
     use rt::sched::*;
 
+    let task = do Local::borrow::<Task, ~Task>() |running_task| {
+        ~running_task.new_child()
+    };
+
     let mut sched = Local::take::<Scheduler>();
-    let task = ~Coroutine::new(&mut sched.stack_pool, f);
-    sched.schedule_new_task(task);
+    let task = ~Coroutine::with_task(&mut sched.stack_pool,
+                                     task, f);
+    sched.schedule_task(task);
 }
 
 fn spawn_raw_oldsched(mut opts: TaskOpts, f: ~fn()) {
diff --git a/src/libstd/unstable/lang.rs b/src/libstd/unstable/lang.rs
index e75cf2c01c6..3d61c1fe144 100644
--- a/src/libstd/unstable/lang.rs
+++ b/src/libstd/unstable/lang.rs
@@ -245,7 +245,7 @@ pub unsafe fn local_malloc(td: *c_char, size: uintptr_t) -> *c_char {
         }
         _ => {
             let mut alloc = ::ptr::null();
-            do Local::borrow::<Task> |task| {
+            do Local::borrow::<Task,()> |task| {
                 alloc = task.heap.alloc(td as *c_void, size as uint) as *c_char;
             }
             return alloc;
@@ -263,7 +263,7 @@ pub unsafe fn local_free(ptr: *c_char) {
             rustrt::rust_upcall_free_noswitch(ptr);
         }
         _ => {
-            do Local::borrow::<Task> |task| {
+            do Local::borrow::<Task,()> |task| {
                 task.heap.free(ptr as *c_void);
             }
         }
diff --git a/src/libstd/unstable/sync.rs b/src/libstd/unstable/sync.rs
index f0b178c6670..3cf46bec41a 100644
--- a/src/libstd/unstable/sync.rs
+++ b/src/libstd/unstable/sync.rs
@@ -205,8 +205,53 @@ extern {
     fn rust_unlock_little_lock(lock: rust_little_lock);
 }
 
+/* *********************************************************************/
+
+//FIXME: #5042 This should be replaced by proper atomic type
+pub struct AtomicUint {
+    priv inner: uint
+}
+
+impl AtomicUint {
+    pub fn new(val: uint) -> AtomicUint { AtomicUint { inner: val } }
+    pub fn load(&self) -> uint {
+        unsafe { intrinsics::atomic_load(cast::transmute(self)) as uint }
+    }
+    pub fn store(&mut self, val: uint) {
+        unsafe { intrinsics::atomic_store(cast::transmute(self), val as int); }
+    }
+    pub fn add(&mut self, val: int) -> uint {
+        unsafe { intrinsics::atomic_xadd(cast::transmute(self), val as int) as uint }
+    }
+    pub fn cas(&mut self, old:uint, new: uint) -> uint {
+        unsafe { intrinsics::atomic_cxchg(cast::transmute(self), old as int, new as int) as uint }
+    }
+}
+
+pub struct AtomicInt {
+    priv inner: int
+}
+
+impl AtomicInt {
+    pub fn new(val: int) -> AtomicInt { AtomicInt { inner: val } }
+    pub fn load(&self) -> int {
+        unsafe { intrinsics::atomic_load(&self.inner) }
+    }
+    pub fn store(&mut self, val: int) {
+        unsafe { intrinsics::atomic_store(&mut self.inner, val); }
+    }
+    pub fn add(&mut self, val: int) -> int {
+        unsafe { intrinsics::atomic_xadd(&mut self.inner, val) }
+    }
+    pub fn cas(&mut self, old: int, new: int) -> int {
+        unsafe { intrinsics::atomic_cxchg(&mut self.inner, old, new) }
+    }
+}
+
+
 #[cfg(test)]
 mod tests {
+    use super::*;
     use comm;
     use super::exclusive;
     use task;
@@ -262,4 +307,28 @@ mod tests {
             }
         }
     }
+
+    #[test]
+    fn atomic_int_smoke_test() {
+        let mut i = AtomicInt::new(0);
+        i.store(10);
+        assert!(i.load() == 10);
+        assert!(i.add(1) == 10);
+        assert!(i.load() == 11);
+        assert!(i.cas(11, 12) == 11);
+        assert!(i.cas(11, 13) == 12);
+        assert!(i.load() == 12);
+    }
+
+    #[test]
+    fn atomic_uint_smoke_test() {
+        let mut i = AtomicUint::new(0);
+        i.store(10);
+        assert!(i.load() == 10);
+        assert!(i.add(1) == 10);
+        assert!(i.load() == 11);
+        assert!(i.cas(11, 12) == 11);
+        assert!(i.cas(11, 13) == 12);
+        assert!(i.load() == 12);
+    }
 }
diff --git a/src/rt/rust_builtin.cpp b/src/rt/rust_builtin.cpp
index 5e7357c9b7b..fe4e75fb8d2 100644
--- a/src/rt/rust_builtin.cpp
+++ b/src/rt/rust_builtin.cpp
@@ -930,6 +930,13 @@ rust_begin_unwind(uintptr_t token) {
 #endif
 }
 
+extern int get_num_cpus();
+
+extern "C" CDECL uintptr_t
+rust_get_num_cpus() {
+    return get_num_cpus();
+}
+
 //
 // Local Variables:
 // mode: C++
diff --git a/src/rt/rust_env.cpp b/src/rt/rust_env.cpp
index eaccb3b0a44..ff03ea817b8 100644
--- a/src/rt/rust_env.cpp
+++ b/src/rt/rust_env.cpp
@@ -40,7 +40,7 @@ rust_drop_env_lock() {
 }
 
 #if defined(__WIN32__)
-static int
+int
 get_num_cpus() {
     SYSTEM_INFO sysinfo;
     GetSystemInfo(&sysinfo);
@@ -48,7 +48,7 @@ get_num_cpus() {
     return (int) sysinfo.dwNumberOfProcessors;
 }
 #elif defined(__BSD__)
-static int
+int
 get_num_cpus() {
     /* swiped from http://stackoverflow.com/questions/150355/
        programmatically-find-the-number-of-cores-on-a-machine */
@@ -75,7 +75,7 @@ get_num_cpus() {
     return numCPU;
 }
 #elif defined(__GNUC__)
-static int
+int
 get_num_cpus() {
     return sysconf(_SC_NPROCESSORS_ONLN);
 }
diff --git a/src/rt/rustrt.def.in b/src/rt/rustrt.def.in
index e3e522aa7ce..9b49583519e 100644
--- a/src/rt/rustrt.def.in
+++ b/src/rt/rustrt.def.in
@@ -239,3 +239,4 @@ rust_valgrind_stack_deregister
 rust_take_env_lock
 rust_drop_env_lock
 rust_update_log_settings
+rust_get_num_cpus
\ No newline at end of file