about summary refs log tree commit diff
path: root/src/libstd/rt
diff options
context:
space:
mode:
authorBrian Anderson <banderson@mozilla.com>2013-05-29 18:22:28 -0700
committerBrian Anderson <banderson@mozilla.com>2013-05-29 18:22:28 -0700
commitf4ed554ddbd2dacfaa5dcc1dda99a3121f8cf2a4 (patch)
tree45f24486e6f2d6d39928462e40d2c4b5f3de2154 /src/libstd/rt
parentbd30285c8467b33b6fea16be79198f7492107af3 (diff)
parent134bb0f3eeed69bbf6dc672bbbfbc802f1a018a9 (diff)
downloadrust-f4ed554ddbd2dacfaa5dcc1dda99a3121f8cf2a4.tar.gz
rust-f4ed554ddbd2dacfaa5dcc1dda99a3121f8cf2a4.zip
Merge remote-tracking branch 'brson/io' into incoming
Conflicts:
	src/libstd/rt/sched.rs
Diffstat (limited to 'src/libstd/rt')
-rw-r--r--src/libstd/rt/comm.rs23
-rw-r--r--src/libstd/rt/local.rs9
-rw-r--r--src/libstd/rt/message_queue.rs3
-rw-r--r--src/libstd/rt/mod.rs20
-rw-r--r--src/libstd/rt/rtio.rs11
-rw-r--r--src/libstd/rt/sched.rs329
-rw-r--r--src/libstd/rt/sleeper_list.rs55
-rw-r--r--src/libstd/rt/test.rs115
-rw-r--r--src/libstd/rt/tube.rs34
-rw-r--r--src/libstd/rt/uv/async.rs105
-rw-r--r--src/libstd/rt/uv/idle.rs62
-rw-r--r--src/libstd/rt/uv/mod.rs63
-rw-r--r--src/libstd/rt/uv/uvio.rs118
13 files changed, 747 insertions, 200 deletions
diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs
index 54db03b6069..ebfa9e263ef 100644
--- a/src/libstd/rt/comm.rs
+++ b/src/libstd/rt/comm.rs
@@ -22,6 +22,7 @@ use ops::Drop;
 use kinds::Owned;
 use rt::sched::{Scheduler, Coroutine};
 use rt::local::Local;
+use rt::rtio::EventLoop;
 use unstable::intrinsics::{atomic_xchg, atomic_load};
 use util::Void;
 use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable};
@@ -158,7 +159,7 @@ impl<T> PortOne<T> {
 
         // Switch to the scheduler to put the ~Task into the Packet state.
         let sched = Local::take::<Scheduler>();
-        do sched.deschedule_running_task_and_then |task| {
+        do sched.deschedule_running_task_and_then |sched, task| {
             unsafe {
                 // Atomically swap the task pointer into the Packet state, issuing
                 // an acquire barrier to prevent reordering of the subsequent read
@@ -172,9 +173,15 @@ impl<T> PortOne<T> {
                     }
                     STATE_ONE => {
                         // Channel is closed. Switch back and check the data.
+                        // NB: We have to drop back into the scheduler event loop here
+                        // instead of switching immediately back or we could end up
+                        // triggering infinite recursion on the scheduler's stack.
                         let task: ~Coroutine = cast::transmute(task_as_state);
-                        let sched = Local::take::<Scheduler>();
-                        sched.resume_task_immediately(task);
+                        let task = Cell(task);
+                        do sched.event_loop.callback {
+                            let sched = Local::take::<Scheduler>();
+                            sched.resume_task_immediately(task.take());
+                        }
                     }
                     _ => util::unreachable()
                 }
@@ -614,5 +621,15 @@ mod test {
             }
         }
     }
+
+    #[test]
+    fn recv_a_lot() {
+        // Regression test that we don't run out of stack in scheduler context
+        do run_in_newsched_task {
+            let (port, chan) = stream();
+            for 10000.times { chan.send(()) }
+            for 10000.times { port.recv() }
+        }
+    }
 }
 
diff --git a/src/libstd/rt/local.rs b/src/libstd/rt/local.rs
index 313123c38b5..ffff54f00bb 100644
--- a/src/libstd/rt/local.rs
+++ b/src/libstd/rt/local.rs
@@ -85,30 +85,31 @@ impl Local for IoFactoryObject {
 
 #[cfg(test)]
 mod test {
+    use rt::test::*;
     use rt::sched::Scheduler;
     use rt::uv::uvio::UvEventLoop;
     use super::*;
 
     #[test]
     fn thread_local_scheduler_smoke_test() {
-        let scheduler = ~UvEventLoop::new_scheduler();
+        let scheduler = ~new_test_uv_sched();
         Local::put(scheduler);
         let _scheduler: ~Scheduler = Local::take();
     }
 
     #[test]
     fn thread_local_scheduler_two_instances() {
-        let scheduler = ~UvEventLoop::new_scheduler();
+        let scheduler = ~new_test_uv_sched();
         Local::put(scheduler);
         let _scheduler: ~Scheduler = Local::take();
-        let scheduler = ~UvEventLoop::new_scheduler();
+        let scheduler = ~new_test_uv_sched();
         Local::put(scheduler);
         let _scheduler: ~Scheduler = Local::take();
     }
 
     #[test]
     fn borrow_smoke_test() {
-        let scheduler = ~UvEventLoop::new_scheduler();
+        let scheduler = ~new_test_uv_sched();
         Local::put(scheduler);
         unsafe {
             let _scheduler: *mut Scheduler = Local::unsafe_borrow();
diff --git a/src/libstd/rt/message_queue.rs b/src/libstd/rt/message_queue.rs
index eaab9288ac8..21711bbe84c 100644
--- a/src/libstd/rt/message_queue.rs
+++ b/src/libstd/rt/message_queue.rs
@@ -8,6 +8,9 @@
 // option. This file may not be copied, modified, or distributed
 // except according to those terms.
 
+//! A concurrent queue that supports multiple producers and a
+//! single consumer.
+
 use container::Container;
 use kinds::Owned;
 use vec::OwnedVector;
diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs
index 2fac1df01a4..1113d7abe7d 100644
--- a/src/libstd/rt/mod.rs
+++ b/src/libstd/rt/mod.rs
@@ -88,6 +88,9 @@ mod work_queue;
 /// A parallel queue.
 mod message_queue;
 
+/// A parallel data structure for tracking sleeping schedulers.
+mod sleeper_list;
+
 /// Stack segments and caching.
 mod stack;
 
@@ -145,12 +148,17 @@ pub mod thread_local_storage;
 pub fn start(_argc: int, _argv: **u8, crate_map: *u8, main: ~fn()) -> int {
 
     use self::sched::{Scheduler, Coroutine};
+    use self::work_queue::WorkQueue;
     use self::uv::uvio::UvEventLoop;
+    use self::sleeper_list::SleeperList;
 
     init(crate_map);
 
     let loop_ = ~UvEventLoop::new();
-    let mut sched = ~Scheduler::new(loop_);
+    let work_queue = WorkQueue::new();
+    let sleepers = SleeperList::new();
+    let mut sched = ~Scheduler::new(loop_, work_queue, sleepers);
+    sched.no_sleep = true;
     let main_task = ~Coroutine::new(&mut sched.stack_pool, main);
 
     sched.enqueue_task(main_task);
@@ -221,20 +229,18 @@ fn test_context() {
     use rt::uv::uvio::UvEventLoop;
     use cell::Cell;
     use rt::local::Local;
+    use rt::test::new_test_uv_sched;
 
     assert_eq!(context(), OldTaskContext);
     do run_in_bare_thread {
         assert_eq!(context(), GlobalContext);
-        let mut sched = ~UvEventLoop::new_scheduler();
+        let mut sched = ~new_test_uv_sched();
         let task = ~do Coroutine::new(&mut sched.stack_pool) {
             assert_eq!(context(), TaskContext);
             let sched = Local::take::<Scheduler>();
-            do sched.deschedule_running_task_and_then() |task| {
+            do sched.deschedule_running_task_and_then() |sched, task| {
                 assert_eq!(context(), SchedulerContext);
-                let task = Cell(task);
-                do Local::borrow::<Scheduler> |sched| {
-                    sched.enqueue_task(task.take());
-                }
+                sched.enqueue_task(task);
             }
         };
         sched.enqueue_task(task);
diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs
index 4b5eda22ff5..fa657555f3a 100644
--- a/src/libstd/rt/rtio.rs
+++ b/src/libstd/rt/rtio.rs
@@ -18,6 +18,7 @@ use rt::uv::uvio;
 // XXX: ~object doesn't work currently so these are some placeholder
 // types to use instead
 pub type EventLoopObject = uvio::UvEventLoop;
+pub type RemoteCallbackObject = uvio::UvRemoteCallback;
 pub type IoFactoryObject = uvio::UvIoFactory;
 pub type RtioTcpStreamObject = uvio::UvTcpStream;
 pub type RtioTcpListenerObject = uvio::UvTcpListener;
@@ -26,10 +27,20 @@ pub trait EventLoop {
     fn run(&mut self);
     fn callback(&mut self, ~fn());
     fn callback_ms(&mut self, ms: u64, ~fn());
+    fn remote_callback(&mut self, ~fn()) -> ~RemoteCallbackObject;
     /// The asynchronous I/O services. Not all event loops may provide one
     fn io<'a>(&'a mut self) -> Option<&'a mut IoFactoryObject>;
 }
 
+pub trait RemoteCallback {
+    /// Trigger the remote callback. Note that the number of times the callback
+    /// is run is not guaranteed. All that is guaranteed is that, after calling 'fire',
+    /// the callback will be called at least once, but multiple callbacks may be coalesced
+    /// and callbacks may be called more often requested. Destruction also triggers the
+    /// callback.
+    fn fire(&mut self);
+}
+
 pub trait IoFactory {
     fn tcp_connect(&mut self, addr: IpAddr) -> Result<~RtioTcpStreamObject, IoError>;
     fn tcp_bind(&mut self, addr: IpAddr) -> Result<~RtioTcpListenerObject, IoError>;
diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs
index 2d9cdaddc84..089c95cd7cd 100644
--- a/src/libstd/rt/sched.rs
+++ b/src/libstd/rt/sched.rs
@@ -12,21 +12,44 @@ use option::*;
 use sys;
 use cast::transmute;
 use cell::Cell;
+use clone::Clone;
 
+use super::sleeper_list::SleeperList;
 use super::work_queue::WorkQueue;
 use super::stack::{StackPool, StackSegment};
-use super::rtio::{EventLoop, EventLoopObject};
+use super::rtio::{EventLoop, EventLoopObject, RemoteCallbackObject};
 use super::context::Context;
 use super::task::Task;
+use super::message_queue::MessageQueue;
 use rt::local_ptr;
 use rt::local::Local;
+use rt::rtio::{IoFactoryObject, RemoteCallback};
 
 /// The Scheduler is responsible for coordinating execution of Coroutines
 /// on a single thread. When the scheduler is running it is owned by
 /// thread local storage and the running task is owned by the
 /// scheduler.
 pub struct Scheduler {
+    /// A queue of available work. Under a work-stealing policy there
+    /// is one per Scheduler.
     priv work_queue: WorkQueue<~Coroutine>,
+    /// The queue of incoming messages from other schedulers.
+    /// These are enqueued by SchedHandles after which a remote callback
+    /// is triggered to handle the message.
+    priv message_queue: MessageQueue<SchedMessage>,
+    /// A shared list of sleeping schedulers. We'll use this to wake
+    /// up schedulers when pushing work onto the work queue.
+    priv sleeper_list: SleeperList,
+    /// Indicates that we have previously pushed a handle onto the
+    /// SleeperList but have not yet received the Wake message.
+    /// Being `true` does not necessarily mean that the scheduler is
+    /// not active since there are multiple event sources that may
+    /// wake the scheduler. It just prevents the scheduler from pushing
+    /// multiple handles onto the sleeper list.
+    priv sleepy: bool,
+    /// A flag to indicate we've received the shutdown message and should
+    /// no longer try to go to sleep, but exit instead.
+    no_sleep: bool,
     stack_pool: StackPool,
     /// The event loop used to drive the scheduler and perform I/O
     event_loop: ~EventLoopObject,
@@ -40,16 +63,25 @@ pub struct Scheduler {
     priv cleanup_job: Option<CleanupJob>
 }
 
-// XXX: Some hacks to put a &fn in Scheduler without borrowck
-// complaining
-type UnsafeTaskReceiver = sys::Closure;
-trait ClosureConverter {
-    fn from_fn(&fn(~Coroutine)) -> Self;
-    fn to_fn(self) -> &fn(~Coroutine);
+pub struct SchedHandle {
+    priv remote: ~RemoteCallbackObject,
+    priv queue: MessageQueue<SchedMessage>
 }
-impl ClosureConverter for UnsafeTaskReceiver {
-    fn from_fn(f: &fn(~Coroutine)) -> UnsafeTaskReceiver { unsafe { transmute(f) } }
-    fn to_fn(self) -> &fn(~Coroutine) { unsafe { transmute(self) } }
+
+pub struct Coroutine {
+    /// The segment of stack on which the task is currently running or,
+    /// if the task is blocked, on which the task will resume execution
+    priv current_stack_segment: StackSegment,
+    /// These are always valid when the task is not running, unless
+    /// the task is dead
+    priv saved_context: Context,
+    /// The heap, GC, unwinding, local storage, logging
+    task: ~Task
+}
+
+pub enum SchedMessage {
+    Wake,
+    Shutdown
 }
 
 enum CleanupJob {
@@ -61,18 +93,25 @@ pub impl Scheduler {
 
     fn in_task_context(&self) -> bool { self.current_task.is_some() }
 
-    fn new(event_loop: ~EventLoopObject) -> Scheduler {
+    fn new(event_loop: ~EventLoopObject,
+           work_queue: WorkQueue<~Coroutine>,
+           sleeper_list: SleeperList)
+        -> Scheduler {
 
         // Lazily initialize the runtime TLS key
         local_ptr::init_tls_key();
 
         Scheduler {
+            sleeper_list: sleeper_list,
+            message_queue: MessageQueue::new(),
+            sleepy: false,
+            no_sleep: false,
             event_loop: event_loop,
-            work_queue: WorkQueue::new(),
+            work_queue: work_queue,
             stack_pool: StackPool::new(),
             saved_context: Context::empty(),
             current_task: None,
-            cleanup_job: None
+            cleanup_job: None,
         }
     }
 
@@ -102,6 +141,53 @@ pub impl Scheduler {
         return sched;
     }
 
+    fn run_sched_once() {
+
+        let sched = Local::take::<Scheduler>();
+        if sched.interpret_message_queue() {
+            // We performed a scheduling action. There may be other work
+            // to do yet, so let's try again later.
+            let mut sched = Local::take::<Scheduler>();
+            sched.event_loop.callback(Scheduler::run_sched_once);
+            Local::put(sched);
+            return;
+        }
+
+        let sched = Local::take::<Scheduler>();
+        if sched.resume_task_from_queue() {
+            // We performed a scheduling action. There may be other work
+            // to do yet, so let's try again later.
+            let mut sched = Local::take::<Scheduler>();
+            sched.event_loop.callback(Scheduler::run_sched_once);
+            Local::put(sched);
+            return;
+        }
+
+        // If we got here then there was no work to do.
+        // Generate a SchedHandle and push it to the sleeper list so
+        // somebody can wake us up later.
+        rtdebug!("no work to do");
+        let mut sched = Local::take::<Scheduler>();
+        if !sched.sleepy && !sched.no_sleep {
+            rtdebug!("sleeping");
+            sched.sleepy = true;
+            let handle = sched.make_handle();
+            sched.sleeper_list.push(handle);
+        } else {
+            rtdebug!("not sleeping");
+        }
+        Local::put(sched);
+    }
+
+    fn make_handle(&mut self) -> SchedHandle {
+        let remote = self.event_loop.remote_callback(Scheduler::run_sched_once);
+
+        return SchedHandle {
+            remote: remote,
+            queue: self.message_queue.clone()
+        };
+    }
+
     /// Schedule a task to be executed later.
     ///
     /// Pushes the task onto the work stealing queue and tells the event loop
@@ -109,17 +195,63 @@ pub impl Scheduler {
     /// directly.
     fn enqueue_task(&mut self, task: ~Coroutine) {
         self.work_queue.push(task);
-        self.event_loop.callback(resume_task_from_queue);
+        self.event_loop.callback(Scheduler::run_sched_once);
 
-        fn resume_task_from_queue() {
-            let scheduler = Local::take::<Scheduler>();
-            scheduler.resume_task_from_queue();
+        // We've made work available. Notify a sleeping scheduler.
+        match self.sleeper_list.pop() {
+            Some(handle) => {
+                let mut handle = handle;
+                handle.send(Wake)
+            }
+            None => (/* pass */)
         }
     }
 
     // * Scheduler-context operations
 
-    fn resume_task_from_queue(~self) {
+    fn interpret_message_queue(~self) -> bool {
+        assert!(!self.in_task_context());
+
+        rtdebug!("looking for scheduler messages");
+
+        let mut this = self;
+        match this.message_queue.pop() {
+            Some(Wake) => {
+                rtdebug!("recv Wake message");
+                this.sleepy = false;
+                Local::put(this);
+                return true;
+            }
+            Some(Shutdown) => {
+                rtdebug!("recv Shutdown message");
+                if this.sleepy {
+                    // There may be an outstanding handle on the sleeper list.
+                    // Pop them all to make sure that's not the case.
+                    loop {
+                        match this.sleeper_list.pop() {
+                            Some(handle) => {
+                                let mut handle = handle;
+                                handle.send(Wake);
+                            }
+                            None => (/* pass */)
+                        }
+                    }
+                }
+                // No more sleeping. After there are no outstanding event loop
+                // references we will shut down.
+                this.no_sleep = true;
+                this.sleepy = false;
+                Local::put(this);
+                return true;
+            }
+            None => {
+                Local::put(this);
+                return false;
+            }
+        }
+    }
+
+    fn resume_task_from_queue(~self) -> bool {
         assert!(!self.in_task_context());
 
         rtdebug!("looking in work queue for task to schedule");
@@ -129,10 +261,12 @@ pub impl Scheduler {
             Some(task) => {
                 rtdebug!("resuming task from work queue");
                 this.resume_task_immediately(task);
+                return true;
             }
             None => {
                 rtdebug!("no tasks in queue");
                 Local::put(this);
+                return false;
             }
         }
     }
@@ -146,11 +280,9 @@ pub impl Scheduler {
 
         rtdebug!("ending running task");
 
-        do self.deschedule_running_task_and_then |dead_task| {
+        do self.deschedule_running_task_and_then |sched, dead_task| {
             let dead_task = Cell(dead_task);
-            do Local::borrow::<Scheduler> |sched| {
-                dead_task.take().recycle(&mut sched.stack_pool);
-            }
+            dead_task.take().recycle(&mut sched.stack_pool);
         }
 
         abort!("control reached end of task");
@@ -159,22 +291,18 @@ pub impl Scheduler {
     fn schedule_new_task(~self, task: ~Coroutine) {
         assert!(self.in_task_context());
 
-        do self.switch_running_tasks_and_then(task) |last_task| {
+        do self.switch_running_tasks_and_then(task) |sched, last_task| {
             let last_task = Cell(last_task);
-            do Local::borrow::<Scheduler> |sched| {
-                sched.enqueue_task(last_task.take());
-            }
+            sched.enqueue_task(last_task.take());
         }
     }
 
     fn schedule_task(~self, task: ~Coroutine) {
         assert!(self.in_task_context());
 
-        do self.switch_running_tasks_and_then(task) |last_task| {
+        do self.switch_running_tasks_and_then(task) |sched, last_task| {
             let last_task = Cell(last_task);
-            do Local::borrow::<Scheduler> |sched| {
-                sched.enqueue_task(last_task.take());
-            }
+            sched.enqueue_task(last_task.take());
         }
     }
 
@@ -218,7 +346,11 @@ pub impl Scheduler {
     /// The closure here is a *stack* closure that lives in the
     /// running task.  It gets transmuted to the scheduler's lifetime
     /// and called while the task is blocked.
-    fn deschedule_running_task_and_then(~self, f: &fn(~Coroutine)) {
+    ///
+    /// This passes a Scheduler pointer to the fn after the context switch
+    /// in order to prevent that fn from performing further scheduling operations.
+    /// Doing further scheduling could easily result in infinite recursion.
+    fn deschedule_running_task_and_then(~self, f: &fn(&mut Scheduler, ~Coroutine)) {
         let mut this = self;
         assert!(this.in_task_context());
 
@@ -226,7 +358,8 @@ pub impl Scheduler {
 
         unsafe {
             let blocked_task = this.current_task.swap_unwrap();
-            let f_fake_region = transmute::<&fn(~Coroutine), &fn(~Coroutine)>(f);
+            let f_fake_region = transmute::<&fn(&mut Scheduler, ~Coroutine),
+                                            &fn(&mut Scheduler, ~Coroutine)>(f);
             let f_opaque = ClosureConverter::from_fn(f_fake_region);
             this.enqueue_cleanup_job(GiveTask(blocked_task, f_opaque));
         }
@@ -248,14 +381,18 @@ pub impl Scheduler {
     /// Switch directly to another task, without going through the scheduler.
     /// You would want to think hard about doing this, e.g. if there are
     /// pending I/O events it would be a bad idea.
-    fn switch_running_tasks_and_then(~self, next_task: ~Coroutine, f: &fn(~Coroutine)) {
+    fn switch_running_tasks_and_then(~self, next_task: ~Coroutine,
+                                     f: &fn(&mut Scheduler, ~Coroutine)) {
         let mut this = self;
         assert!(this.in_task_context());
 
         rtdebug!("switching tasks");
 
         let old_running_task = this.current_task.swap_unwrap();
-        let f_fake_region = unsafe { transmute::<&fn(~Coroutine), &fn(~Coroutine)>(f) };
+        let f_fake_region = unsafe {
+            transmute::<&fn(&mut Scheduler, ~Coroutine),
+                        &fn(&mut Scheduler, ~Coroutine)>(f)
+        };
         let f_opaque = ClosureConverter::from_fn(f_fake_region);
         this.enqueue_cleanup_job(GiveTask(old_running_task, f_opaque));
         this.current_task = Some(next_task);
@@ -292,7 +429,7 @@ pub impl Scheduler {
         let cleanup_job = self.cleanup_job.swap_unwrap();
         match cleanup_job {
             DoNothing => { }
-            GiveTask(task, f) => (f.to_fn())(task)
+            GiveTask(task, f) => (f.to_fn())(self, task)
         }
     }
 
@@ -336,17 +473,11 @@ pub impl Scheduler {
     }
 }
 
-static MIN_STACK_SIZE: uint = 10000000; // XXX: Too much stack
-
-pub struct Coroutine {
-    /// The segment of stack on which the task is currently running or,
-    /// if the task is blocked, on which the task will resume execution
-    priv current_stack_segment: StackSegment,
-    /// These are always valid when the task is not running, unless
-    /// the task is dead
-    priv saved_context: Context,
-    /// The heap, GC, unwinding, local storage, logging
-    task: ~Task
+impl SchedHandle {
+    pub fn send(&mut self, msg: SchedMessage) {
+        self.queue.push(msg);
+        self.remote.fire();
+    }
 }
 
 pub impl Coroutine {
@@ -357,6 +488,9 @@ pub impl Coroutine {
     fn with_task(stack_pool: &mut StackPool,
                   task: ~Task,
                   start: ~fn()) -> Coroutine {
+
+        static MIN_STACK_SIZE: uint = 10000000; // XXX: Too much stack
+
         let start = Coroutine::build_start_wrapper(start);
         let mut stack = stack_pool.take_segment(MIN_STACK_SIZE);
         // NB: Context holds a pointer to that ~fn
@@ -400,6 +534,18 @@ pub impl Coroutine {
     }
 }
 
+// XXX: Some hacks to put a &fn in Scheduler without borrowck
+// complaining
+type UnsafeTaskReceiver = sys::Closure;
+trait ClosureConverter {
+    fn from_fn(&fn(&mut Scheduler, ~Coroutine)) -> Self;
+    fn to_fn(self) -> &fn(&mut Scheduler, ~Coroutine);
+}
+impl ClosureConverter for UnsafeTaskReceiver {
+    fn from_fn(f: &fn(&mut Scheduler, ~Coroutine)) -> UnsafeTaskReceiver { unsafe { transmute(f) } }
+    fn to_fn(self) -> &fn(&mut Scheduler, ~Coroutine) { unsafe { transmute(self) } }
+}
+
 #[cfg(test)]
 mod test {
     use int;
@@ -410,6 +556,7 @@ mod test {
     use rt::local::Local;
     use rt::test::*;
     use super::*;
+    use rt::thread::Thread;
 
     #[test]
     fn test_simple_scheduling() {
@@ -417,7 +564,7 @@ mod test {
             let mut task_ran = false;
             let task_ran_ptr: *mut bool = &mut task_ran;
 
-            let mut sched = ~UvEventLoop::new_scheduler();
+            let mut sched = ~new_test_uv_sched();
             let task = ~do Coroutine::new(&mut sched.stack_pool) {
                 unsafe { *task_ran_ptr = true; }
             };
@@ -434,7 +581,7 @@ mod test {
             let mut task_count = 0;
             let task_count_ptr: *mut int = &mut task_count;
 
-            let mut sched = ~UvEventLoop::new_scheduler();
+            let mut sched = ~new_test_uv_sched();
             for int::range(0, total) |_| {
                 let task = ~do Coroutine::new(&mut sched.stack_pool) {
                     unsafe { *task_count_ptr = *task_count_ptr + 1; }
@@ -452,7 +599,7 @@ mod test {
             let mut count = 0;
             let count_ptr: *mut int = &mut count;
 
-            let mut sched = ~UvEventLoop::new_scheduler();
+            let mut sched = ~new_test_uv_sched();
             let task1 = ~do Coroutine::new(&mut sched.stack_pool) {
                 unsafe { *count_ptr = *count_ptr + 1; }
                 let mut sched = Local::take::<Scheduler>();
@@ -460,11 +607,9 @@ mod test {
                     unsafe { *count_ptr = *count_ptr + 1; }
                 };
                 // Context switch directly to the new task
-                do sched.switch_running_tasks_and_then(task2) |task1| {
+                do sched.switch_running_tasks_and_then(task2) |sched, task1| {
                     let task1 = Cell(task1);
-                    do Local::borrow::<Scheduler> |sched| {
-                        sched.enqueue_task(task1.take());
-                    }
+                    sched.enqueue_task(task1.take());
                 }
                 unsafe { *count_ptr = *count_ptr + 1; }
             };
@@ -481,7 +626,7 @@ mod test {
             let mut count = 0;
             let count_ptr: *mut int = &mut count;
 
-            let mut sched = ~UvEventLoop::new_scheduler();
+            let mut sched = ~new_test_uv_sched();
 
             let start_task = ~do Coroutine::new(&mut sched.stack_pool) {
                 run_task(count_ptr);
@@ -510,16 +655,14 @@ mod test {
     #[test]
     fn test_block_task() {
         do run_in_bare_thread {
-            let mut sched = ~UvEventLoop::new_scheduler();
+            let mut sched = ~new_test_uv_sched();
             let task = ~do Coroutine::new(&mut sched.stack_pool) {
                 let sched = Local::take::<Scheduler>();
                 assert!(sched.in_task_context());
-                do sched.deschedule_running_task_and_then() |task| {
+                do sched.deschedule_running_task_and_then() |sched, task| {
                     let task = Cell(task);
-                    do Local::borrow::<Scheduler> |sched| {
-                        assert!(!sched.in_task_context());
-                        sched.enqueue_task(task.take());
-                    }
+                    assert!(!sched.in_task_context());
+                    sched.enqueue_task(task.take());
                 }
             };
             sched.enqueue_task(task);
@@ -536,8 +679,7 @@ mod test {
         do run_in_newsched_task {
             do spawn {
                 let sched = Local::take::<Scheduler>();
-                do sched.deschedule_running_task_and_then |task| {
-                    let mut sched = Local::take::<Scheduler>();
+                do sched.deschedule_running_task_and_then |sched, task| {
                     let task = Cell(task);
                     do sched.event_loop.callback_ms(10) {
                         rtdebug!("in callback");
@@ -545,9 +687,70 @@ mod test {
                         sched.enqueue_task(task.take());
                         Local::put(sched);
                     }
-                    Local::put(sched);
                 }
             }
         }
     }
+
+    #[test]
+    fn handle() {
+        use rt::comm::*;
+
+        do run_in_bare_thread {
+            let (port, chan) = oneshot::<()>();
+            let port_cell = Cell(port);
+            let chan_cell = Cell(chan);
+            let mut sched1 = ~new_test_uv_sched();
+            let handle1 = sched1.make_handle();
+            let handle1_cell = Cell(handle1);
+            let task1 = ~do Coroutine::new(&mut sched1.stack_pool) {
+                chan_cell.take().send(());
+            };
+            sched1.enqueue_task(task1);
+
+            let mut sched2 = ~new_test_uv_sched();
+            let task2 = ~do Coroutine::new(&mut sched2.stack_pool) {
+                port_cell.take().recv();
+                // Release the other scheduler's handle so it can exit
+                handle1_cell.take();
+            };
+            sched2.enqueue_task(task2);
+
+            let sched1_cell = Cell(sched1);
+            let _thread1 = do Thread::start {
+                let mut sched1 = sched1_cell.take();
+                sched1.run();
+            };
+
+            let sched2_cell = Cell(sched2);
+            let _thread2 = do Thread::start {
+                let mut sched2 = sched2_cell.take();
+                sched2.run();
+            };
+        }
+    }
+
+    #[test]
+    fn multithreading() {
+        use rt::comm::*;
+        use iter::Times;
+        use vec::OwnedVector;
+        use container::Container;
+
+        do run_in_mt_newsched_task {
+            let mut ports = ~[];
+            for 10.times {
+                let (port, chan) = oneshot();
+                let chan_cell = Cell(chan);
+                do spawntask_later {
+                    chan_cell.take().send(());
+                }
+                ports.push(port);
+            }
+
+            while !ports.is_empty() {
+                ports.pop().recv();
+            }
+        }
+    }
 }
diff --git a/src/libstd/rt/sleeper_list.rs b/src/libstd/rt/sleeper_list.rs
new file mode 100644
index 00000000000..dfcac8eb088
--- /dev/null
+++ b/src/libstd/rt/sleeper_list.rs
@@ -0,0 +1,55 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+//! Maintains a shared list of sleeping schedulers. Schedulers
+//! use this to wake each other up.
+
+use container::Container;
+use vec::OwnedVector;
+use option::{Option, Some, None};
+use cell::Cell;
+use unstable::sync::{Exclusive, exclusive};
+use rt::sched::{Scheduler, SchedHandle};
+use clone::Clone;
+
+pub struct SleeperList {
+    priv stack: ~Exclusive<~[SchedHandle]>
+}
+
+impl SleeperList {
+    pub fn new() -> SleeperList {
+        SleeperList {
+            stack: ~exclusive(~[])
+        }
+    }
+
+    pub fn push(&mut self, handle: SchedHandle) {
+        let handle = Cell(handle);
+        self.stack.with(|s| s.push(handle.take()));
+    }
+
+    pub fn pop(&mut self) -> Option<SchedHandle> {
+        do self.stack.with |s| {
+            if !s.is_empty() {
+                Some(s.pop())
+            } else {
+                None
+            }
+        }
+    }
+}
+
+impl Clone for SleeperList {
+    fn clone(&self) -> SleeperList {
+        SleeperList {
+            stack: self.stack.clone()
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/libstd/rt/test.rs b/src/libstd/rt/test.rs
index c60ae2bfeff..16b0aef5e26 100644
--- a/src/libstd/rt/test.rs
+++ b/src/libstd/rt/test.rs
@@ -9,13 +9,32 @@
 // except according to those terms.
 
 use uint;
-use option::*;
+use option::{Option, Some, None};
 use cell::Cell;
+use clone::Clone;
+use container::Container;
+use old_iter::MutableIter;
+use vec::OwnedVector;
 use result::{Result, Ok, Err};
+use unstable::run_in_bare_thread;
 use super::io::net::ip::{IpAddr, Ipv4};
 use rt::task::Task;
 use rt::thread::Thread;
 use rt::local::Local;
+use rt::sched::{Scheduler, Coroutine};
+use rt::sleeper_list::SleeperList;
+use rt::work_queue::WorkQueue;
+
+pub fn new_test_uv_sched() -> Scheduler {
+    use rt::uv::uvio::UvEventLoop;
+    use rt::work_queue::WorkQueue;
+    use rt::sleeper_list::SleeperList;
+
+    let mut sched = Scheduler::new(~UvEventLoop::new(), WorkQueue::new(), SleeperList::new());
+    // Don't wait for the Shutdown message
+    sched.no_sleep = true;
+    return sched;
+}
 
 /// Creates a new scheduler in a new thread and runs a task in it,
 /// then waits for the scheduler to exit. Failure of the task
@@ -28,7 +47,7 @@ pub fn run_in_newsched_task(f: ~fn()) {
     let f = Cell(f);
 
     do run_in_bare_thread {
-        let mut sched = ~UvEventLoop::new_scheduler();
+        let mut sched = ~new_test_uv_sched();
         let task = ~Coroutine::with_task(&mut sched.stack_pool,
                                          ~Task::without_unwinding(),
                                          f.take());
@@ -37,6 +56,64 @@ pub fn run_in_newsched_task(f: ~fn()) {
     }
 }
 
+/// Create more than one scheduler and run a function in a task
+/// in one of the schedulers. The schedulers will stay alive
+/// until the function `f` returns.
+pub fn run_in_mt_newsched_task(f: ~fn()) {
+    use rt::uv::uvio::UvEventLoop;
+    use rt::sched::Shutdown;
+
+    let f_cell = Cell(f);
+
+    do run_in_bare_thread {
+        static N: uint = 2;
+
+        let sleepers = SleeperList::new();
+        let work_queue = WorkQueue::new();
+
+        let mut handles = ~[];
+        let mut scheds = ~[];
+
+        for uint::range(0, N) |i| {
+            let loop_ = ~UvEventLoop::new();
+            let mut sched = ~Scheduler::new(loop_, work_queue.clone(), sleepers.clone());
+            let handle = sched.make_handle();
+            handles.push(handle);
+            scheds.push(sched);
+        }
+
+        let f_cell = Cell(f_cell.take());
+        let handles = Cell(handles);
+        let main_task = ~do Coroutine::new(&mut scheds[0].stack_pool) {
+            f_cell.take()();
+
+            let mut handles = handles.take();
+            // Tell schedulers to exit
+            for handles.each_mut |handle| {
+                handle.send(Shutdown);
+            }
+        };
+
+        scheds[0].enqueue_task(main_task);
+
+        let mut threads = ~[];
+
+        while !scheds.is_empty() {
+            let sched = scheds.pop();
+            let sched_cell = Cell(sched);
+            let thread = do Thread::start {
+                let mut sched = sched_cell.take();
+                sched.run();
+            };
+
+            threads.push(thread);
+        }
+
+        // Wait for schedulers
+        let _threads = threads;
+    }
+}
+
 /// Test tasks will abort on failure instead of unwinding
 pub fn spawntask(f: ~fn()) {
     use super::sched::*;
@@ -45,11 +122,7 @@ pub fn spawntask(f: ~fn()) {
     let task = ~Coroutine::with_task(&mut sched.stack_pool,
                                      ~Task::without_unwinding(),
                                      f);
-    do sched.switch_running_tasks_and_then(task) |task| {
-        let task = Cell(task);
-        let sched = Local::take::<Scheduler>();
-        sched.schedule_new_task(task.take());
-    }
+    sched.schedule_new_task(task);
 }
 
 /// Create a new task and run it right now. Aborts on failure
@@ -60,11 +133,8 @@ pub fn spawntask_immediately(f: ~fn()) {
     let task = ~Coroutine::with_task(&mut sched.stack_pool,
                                      ~Task::without_unwinding(),
                                      f);
-    do sched.switch_running_tasks_and_then(task) |task| {
-        let task = Cell(task);
-        do Local::borrow::<Scheduler> |sched| {
-            sched.enqueue_task(task.take());
-        }
+    do sched.switch_running_tasks_and_then(task) |sched, task| {
+        sched.enqueue_task(task);
     }
 }
 
@@ -95,11 +165,8 @@ pub fn spawntask_random(f: ~fn()) {
                                      f);
 
     if run_now {
-        do sched.switch_running_tasks_and_then(task) |task| {
-            let task = Cell(task);
-            do Local::borrow::<Scheduler> |sched| {
-                sched.enqueue_task(task.take());
-            }
+        do sched.switch_running_tasks_and_then(task) |sched, task| {
+            sched.enqueue_task(task);
         }
     } else {
         sched.enqueue_task(task);
@@ -122,10 +189,9 @@ pub fn spawntask_try(f: ~fn()) -> Result<(), ()> {
     // Switch to the scheduler
     let f = Cell(Cell(f));
     let sched = Local::take::<Scheduler>();
-    do sched.deschedule_running_task_and_then() |old_task| {
+    do sched.deschedule_running_task_and_then() |sched, old_task| {
         let old_task = Cell(old_task);
         let f = f.take();
-        let mut sched = Local::take::<Scheduler>();
         let new_task = ~do Coroutine::new(&mut sched.stack_pool) {
             do (|| {
                 (f.take())()
@@ -133,16 +199,13 @@ pub fn spawntask_try(f: ~fn()) -> Result<(), ()> {
                 // Check for failure then resume the parent task
                 unsafe { *failed_ptr = task::failing(); }
                 let sched = Local::take::<Scheduler>();
-                do sched.switch_running_tasks_and_then(old_task.take()) |new_task| {
-                    let new_task = Cell(new_task);
-                    do Local::borrow::<Scheduler> |sched| {
-                        sched.enqueue_task(new_task.take());
-                    }
+                do sched.switch_running_tasks_and_then(old_task.take()) |sched, new_task| {
+                    sched.enqueue_task(new_task);
                 }
             }
         };
 
-        sched.resume_task_immediately(new_task);
+        sched.enqueue_task(new_task);
     }
 
     if !failed { Ok(()) } else { Err(()) }
@@ -155,7 +218,7 @@ pub fn spawntask_thread(f: ~fn()) -> Thread {
 
     let f = Cell(f);
     let thread = do Thread::start {
-        let mut sched = ~UvEventLoop::new_scheduler();
+        let mut sched = ~new_test_uv_sched();
         let task = ~Coroutine::with_task(&mut sched.stack_pool,
                                          ~Task::without_unwinding(),
                                          f.take());
diff --git a/src/libstd/rt/tube.rs b/src/libstd/rt/tube.rs
index b2f475a6966..4482a92d916 100644
--- a/src/libstd/rt/tube.rs
+++ b/src/libstd/rt/tube.rs
@@ -72,7 +72,7 @@ impl<T> Tube<T> {
                 assert!(self.p.refcount() > 1); // There better be somebody to wake us up
                 assert!((*state).blocked_task.is_none());
                 let sched = Local::take::<Scheduler>();
-                do sched.deschedule_running_task_and_then |task| {
+                do sched.deschedule_running_task_and_then |_, task| {
                     (*state).blocked_task = Some(task);
                 }
                 rtdebug!("waking after tube recv");
@@ -107,11 +107,10 @@ mod test {
             let tube_clone = tube.clone();
             let tube_clone_cell = Cell(tube_clone);
             let sched = Local::take::<Scheduler>();
-            do sched.deschedule_running_task_and_then |task| {
+            do sched.deschedule_running_task_and_then |sched, task| {
                 let mut tube_clone = tube_clone_cell.take();
                 tube_clone.send(1);
-                let sched = Local::take::<Scheduler>();
-                sched.resume_task_immediately(task);
+                sched.enqueue_task(task);
             }
 
             assert!(tube.recv() == 1);
@@ -123,21 +122,17 @@ mod test {
         do run_in_newsched_task {
             let mut tube: Tube<int> = Tube::new();
             let tube_clone = tube.clone();
-            let tube_clone = Cell(Cell(Cell(tube_clone)));
+            let tube_clone = Cell(tube_clone);
             let sched = Local::take::<Scheduler>();
-            do sched.deschedule_running_task_and_then |task| {
-                let tube_clone = tube_clone.take();
-                do Local::borrow::<Scheduler> |sched| {
-                    let tube_clone = tube_clone.take();
-                    do sched.event_loop.callback {
-                        let mut tube_clone = tube_clone.take();
-                        // The task should be blocked on this now and
-                        // sending will wake it up.
-                        tube_clone.send(1);
-                    }
+            do sched.deschedule_running_task_and_then |sched, task| {
+                let tube_clone = Cell(tube_clone.take());
+                do sched.event_loop.callback {
+                    let mut tube_clone = tube_clone.take();
+                    // The task should be blocked on this now and
+                    // sending will wake it up.
+                    tube_clone.send(1);
                 }
-                let sched = Local::take::<Scheduler>();
-                sched.resume_task_immediately(task);
+                sched.enqueue_task(task);
             }
 
             assert!(tube.recv() == 1);
@@ -153,7 +148,7 @@ mod test {
             let tube_clone = tube.clone();
             let tube_clone = Cell(tube_clone);
             let sched = Local::take::<Scheduler>();
-            do sched.deschedule_running_task_and_then |task| {
+            do sched.deschedule_running_task_and_then |sched, task| {
                 callback_send(tube_clone.take(), 0);
 
                 fn callback_send(tube: Tube<int>, i: int) {
@@ -172,8 +167,7 @@ mod test {
                     }
                 }
 
-                let sched = Local::take::<Scheduler>();
-                sched.resume_task_immediately(task);
+                sched.enqueue_task(task);
             }
 
             for int::range(0, MAX) |i| {
diff --git a/src/libstd/rt/uv/async.rs b/src/libstd/rt/uv/async.rs
new file mode 100644
index 00000000000..6ed06cc10b7
--- /dev/null
+++ b/src/libstd/rt/uv/async.rs
@@ -0,0 +1,105 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use libc::{c_int, c_void};
+use option::Some;
+use rt::uv::uvll;
+use rt::uv::uvll::UV_ASYNC;
+use rt::uv::{Watcher, Loop, NativeHandle, AsyncCallback, NullCallback};
+use rt::uv::WatcherInterop;
+use rt::uv::status_to_maybe_uv_error;
+
+pub struct AsyncWatcher(*uvll::uv_async_t);
+impl Watcher for AsyncWatcher { }
+
+impl AsyncWatcher {
+    pub fn new(loop_: &mut Loop, cb: AsyncCallback) -> AsyncWatcher {
+        unsafe {
+            let handle = uvll::malloc_handle(UV_ASYNC);
+            assert!(handle.is_not_null());
+            let mut watcher: AsyncWatcher = NativeHandle::from_native_handle(handle);
+            watcher.install_watcher_data();
+            let data = watcher.get_watcher_data();
+            data.async_cb = Some(cb);
+            assert_eq!(0, uvll::async_init(loop_.native_handle(), handle, async_cb));
+            return watcher;
+        }
+
+        extern fn async_cb(handle: *uvll::uv_async_t, status: c_int) {
+            let mut watcher: AsyncWatcher = NativeHandle::from_native_handle(handle);
+            let status = status_to_maybe_uv_error(watcher.native_handle(), status);
+            let data = watcher.get_watcher_data();
+            let cb = data.async_cb.get_ref();
+            (*cb)(watcher, status);
+        }
+    }
+
+    pub fn send(&mut self) {
+        unsafe {
+            let handle = self.native_handle();
+            uvll::async_send(handle);
+        }
+    }
+
+    pub fn close(self, cb: NullCallback) {
+        let mut this = self;
+        let data = this.get_watcher_data();
+        assert!(data.close_cb.is_none());
+        data.close_cb = Some(cb);
+
+        unsafe {
+            uvll::close(self.native_handle(), close_cb);
+        }
+
+        extern fn close_cb(handle: *uvll::uv_stream_t) {
+            let mut watcher: AsyncWatcher = NativeHandle::from_native_handle(handle);
+            {
+                let data = watcher.get_watcher_data();
+                data.close_cb.swap_unwrap()();
+            }
+            watcher.drop_watcher_data();
+            unsafe { uvll::free_handle(handle as *c_void); }
+        }
+    }
+}
+
+impl NativeHandle<*uvll::uv_async_t> for AsyncWatcher {
+    fn from_native_handle(handle: *uvll::uv_async_t) -> AsyncWatcher {
+        AsyncWatcher(handle)
+    }
+    fn native_handle(&self) -> *uvll::uv_async_t {
+        match self { &AsyncWatcher(ptr) => ptr }
+    }
+}
+
+#[cfg(test)]
+mod test {
+
+    use super::*;
+    use rt::uv::Loop;
+    use unstable::run_in_bare_thread;
+    use rt::thread::Thread;
+    use cell::Cell;
+
+    #[test]
+    fn smoke_test() {
+        do run_in_bare_thread {
+            let mut loop_ = Loop::new();
+            let watcher = AsyncWatcher::new(&mut loop_, |w, _| w.close(||()) );
+            let watcher_cell = Cell(watcher);
+            let _thread = do Thread::start {
+                let mut watcher = watcher_cell.take();
+                watcher.send();
+            };
+            loop_.run();
+            loop_.close();
+        }
+    }
+}
diff --git a/src/libstd/rt/uv/idle.rs b/src/libstd/rt/uv/idle.rs
index 2cf0b5c4872..a81ab48696a 100644
--- a/src/libstd/rt/uv/idle.rs
+++ b/src/libstd/rt/uv/idle.rs
@@ -89,3 +89,65 @@ impl NativeHandle<*uvll::uv_idle_t> for IdleWatcher {
         match self { &IdleWatcher(ptr) => ptr }
     }
 }
+
+#[cfg(test)]
+mod test {
+
+    use rt::uv::Loop;
+    use super::*;
+    use unstable::run_in_bare_thread;
+
+    #[test]
+    #[ignore(reason = "valgrind - loop destroyed before watcher?")]
+    fn idle_new_then_close() {
+        do run_in_bare_thread {
+            let mut loop_ = Loop::new();
+            let idle_watcher = { IdleWatcher::new(&mut loop_) };
+            idle_watcher.close(||());
+        }
+    }
+
+    #[test]
+    fn idle_smoke_test() {
+        do run_in_bare_thread {
+            let mut loop_ = Loop::new();
+            let mut idle_watcher = { IdleWatcher::new(&mut loop_) };
+            let mut count = 10;
+            let count_ptr: *mut int = &mut count;
+            do idle_watcher.start |idle_watcher, status| {
+                let mut idle_watcher = idle_watcher;
+                assert!(status.is_none());
+                if unsafe { *count_ptr == 10 } {
+                    idle_watcher.stop();
+                    idle_watcher.close(||());
+                } else {
+                    unsafe { *count_ptr = *count_ptr + 1; }
+                }
+            }
+            loop_.run();
+            loop_.close();
+            assert_eq!(count, 10);
+        }
+    }
+
+    #[test]
+    fn idle_start_stop_start() {
+        do run_in_bare_thread {
+            let mut loop_ = Loop::new();
+            let mut idle_watcher = { IdleWatcher::new(&mut loop_) };
+            do idle_watcher.start |idle_watcher, status| {
+                let mut idle_watcher = idle_watcher;
+                assert!(status.is_none());
+                idle_watcher.stop();
+                do idle_watcher.start |idle_watcher, status| {
+                    assert!(status.is_none());
+                    let mut idle_watcher = idle_watcher;
+                    idle_watcher.stop();
+                    idle_watcher.close(||());
+                }
+            }
+            loop_.run();
+            loop_.close();
+        }
+    }
+}
diff --git a/src/libstd/rt/uv/mod.rs b/src/libstd/rt/uv/mod.rs
index 2bd657fd864..5f9e5660814 100644
--- a/src/libstd/rt/uv/mod.rs
+++ b/src/libstd/rt/uv/mod.rs
@@ -57,6 +57,7 @@ pub use self::file::FsRequest;
 pub use self::net::{StreamWatcher, TcpWatcher};
 pub use self::idle::IdleWatcher;
 pub use self::timer::TimerWatcher;
+pub use self::async::AsyncWatcher;
 
 /// The implementation of `rtio` for libuv
 pub mod uvio;
@@ -68,6 +69,7 @@ pub mod file;
 pub mod net;
 pub mod idle;
 pub mod timer;
+pub mod async;
 
 /// XXX: Loop(*handle) is buggy with destructors. Normal structs
 /// with dtors may not be destructured, but tuple structs can,
@@ -125,6 +127,7 @@ pub type IdleCallback = ~fn(IdleWatcher, Option<UvError>);
 pub type ConnectionCallback = ~fn(StreamWatcher, Option<UvError>);
 pub type FsCallback = ~fn(FsRequest, Option<UvError>);
 pub type TimerCallback = ~fn(TimerWatcher, Option<UvError>);
+pub type AsyncCallback = ~fn(AsyncWatcher, Option<UvError>);
 
 
 /// Callbacks used by StreamWatchers, set as custom data on the foreign handle
@@ -135,7 +138,8 @@ struct WatcherData {
     close_cb: Option<NullCallback>,
     alloc_cb: Option<AllocCallback>,
     idle_cb: Option<IdleCallback>,
-    timer_cb: Option<TimerCallback>
+    timer_cb: Option<TimerCallback>,
+    async_cb: Option<AsyncCallback>
 }
 
 pub trait WatcherInterop {
@@ -164,7 +168,8 @@ impl<H, W: Watcher + NativeHandle<*H>> WatcherInterop for W {
                 close_cb: None,
                 alloc_cb: None,
                 idle_cb: None,
-                timer_cb: None
+                timer_cb: None,
+                async_cb: None
             };
             let data = transmute::<~WatcherData, *c_void>(data);
             uvll::set_data_for_uv_handle(self.native_handle(), data);
@@ -364,57 +369,3 @@ fn loop_smoke_test() {
         loop_.close();
     }
 }
-
-#[test]
-#[ignore(reason = "valgrind - loop destroyed before watcher?")]
-fn idle_new_then_close() {
-    do run_in_bare_thread {
-        let mut loop_ = Loop::new();
-        let idle_watcher = { IdleWatcher::new(&mut loop_) };
-        idle_watcher.close(||());
-    }
-}
-
-#[test]
-fn idle_smoke_test() {
-    do run_in_bare_thread {
-        let mut loop_ = Loop::new();
-        let mut idle_watcher = { IdleWatcher::new(&mut loop_) };
-        let mut count = 10;
-        let count_ptr: *mut int = &mut count;
-        do idle_watcher.start |idle_watcher, status| {
-            let mut idle_watcher = idle_watcher;
-            assert!(status.is_none());
-            if unsafe { *count_ptr == 10 } {
-                idle_watcher.stop();
-                idle_watcher.close(||());
-            } else {
-                unsafe { *count_ptr = *count_ptr + 1; }
-            }
-        }
-        loop_.run();
-        loop_.close();
-        assert_eq!(count, 10);
-    }
-}
-
-#[test]
-fn idle_start_stop_start() {
-    do run_in_bare_thread {
-        let mut loop_ = Loop::new();
-        let mut idle_watcher = { IdleWatcher::new(&mut loop_) };
-        do idle_watcher.start |idle_watcher, status| {
-            let mut idle_watcher = idle_watcher;
-            assert!(status.is_none());
-            idle_watcher.stop();
-            do idle_watcher.start |idle_watcher, status| {
-                assert!(status.is_none());
-                let mut idle_watcher = idle_watcher;
-                idle_watcher.stop();
-                idle_watcher.close(||());
-            }
-        }
-        loop_.run();
-        loop_.close();
-    }
-}
diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs
index cacd67314eb..1ee6504d11f 100644
--- a/src/libstd/rt/uv/uvio.rs
+++ b/src/libstd/rt/uv/uvio.rs
@@ -12,6 +12,7 @@ use option::*;
 use result::*;
 use ops::Drop;
 use cell::{Cell, empty_cell};
+use cast;
 use cast::transmute;
 use clone::Clone;
 use rt::io::IoError;
@@ -23,6 +24,9 @@ use rt::sched::Scheduler;
 use rt::io::{standard_error, OtherIoError};
 use rt::tube::Tube;
 use rt::local::Local;
+use rt::work_queue::WorkQueue;
+use unstable::sync::{UnsafeAtomicRcBox, AtomicInt};
+use unstable::intrinsics;
 
 #[cfg(test)] use container::Container;
 #[cfg(test)] use uint;
@@ -39,11 +43,6 @@ pub impl UvEventLoop {
             uvio: UvIoFactory(Loop::new())
         }
     }
-
-    /// A convenience constructor
-    fn new_scheduler() -> Scheduler {
-        Scheduler::new(~UvEventLoop::new())
-    }
 }
 
 impl Drop for UvEventLoop {
@@ -82,6 +81,10 @@ impl EventLoop for UvEventLoop {
         }
     }
 
+    fn remote_callback(&mut self, f: ~fn()) -> ~RemoteCallbackObject {
+        ~UvRemoteCallback::new(self.uvio.uv_loop(), f)
+    }
+
     fn io<'a>(&'a mut self) -> Option<&'a mut IoFactoryObject> {
         Some(&mut self.uvio)
     }
@@ -101,6 +104,85 @@ fn test_callback_run_once() {
     }
 }
 
+pub struct UvRemoteCallback {
+    // The uv async handle for triggering the callback
+    async: AsyncWatcher,
+    // An atomic flag to tell the callback to exit,
+    // set from the dtor.
+    exit_flag: UnsafeAtomicRcBox<AtomicInt>
+}
+
+impl UvRemoteCallback {
+    pub fn new(loop_: &mut Loop, f: ~fn()) -> UvRemoteCallback {
+        let exit_flag = UnsafeAtomicRcBox::new(AtomicInt::new(0));
+        let exit_flag_clone = exit_flag.clone();
+        let async = do AsyncWatcher::new(loop_) |watcher, status| {
+            assert!(status.is_none());
+            f();
+            let exit_flag_ptr = exit_flag_clone.get();
+            unsafe {
+                if (*exit_flag_ptr).load() == 1 {
+                    watcher.close(||());
+                }
+            }
+        };
+        UvRemoteCallback {
+            async: async,
+            exit_flag: exit_flag
+        }
+    }
+}
+
+impl RemoteCallback for UvRemoteCallback {
+    fn fire(&mut self) { self.async.send() }
+}
+
+impl Drop for UvRemoteCallback {
+    fn finalize(&self) {
+        unsafe {
+            let mut this: &mut UvRemoteCallback = cast::transmute_mut(self);
+            let exit_flag_ptr = this.exit_flag.get();
+            (*exit_flag_ptr).store(1);
+            this.async.send();
+        }
+    }
+}
+
+#[cfg(test)]
+mod test_remote {
+    use super::*;
+    use cell;
+    use cell::Cell;
+    use rt::test::*;
+    use rt::thread::Thread;
+    use rt::tube::Tube;
+    use rt::rtio::EventLoop;
+    use rt::local::Local;
+    use rt::sched::Scheduler;
+
+    #[test]
+    fn test_uv_remote() {
+        do run_in_newsched_task {
+            let mut tube = Tube::new();
+            let tube_clone = tube.clone();
+            let remote_cell = cell::empty_cell();
+            do Local::borrow::<Scheduler>() |sched| {
+                let tube_clone = tube_clone.clone();
+                let tube_clone_cell = Cell(tube_clone);
+                let remote = do sched.event_loop.remote_callback {
+                    tube_clone_cell.take().send(1);
+                };
+                remote_cell.put_back(remote);
+            }
+            let _thread = do Thread::start {
+                remote_cell.take().fire();
+            };
+
+            assert!(tube.recv() == 1);
+        }
+    }
+}
+
 pub struct UvIoFactory(Loop);
 
 pub impl UvIoFactory {
@@ -123,12 +205,10 @@ impl IoFactory for UvIoFactory {
         assert!(scheduler.in_task_context());
 
         // Block this task and take ownership, switch to scheduler context
-        do scheduler.deschedule_running_task_and_then |task| {
+        do scheduler.deschedule_running_task_and_then |sched, task| {
 
             rtdebug!("connect: entered scheduler context");
-            do Local::borrow::<Scheduler> |scheduler| {
-                assert!(!scheduler.in_task_context());
-            }
+            assert!(!sched.in_task_context());
             let mut tcp_watcher = TcpWatcher::new(self.uv_loop());
             let task_cell = Cell(task);
 
@@ -168,7 +248,7 @@ impl IoFactory for UvIoFactory {
             Ok(_) => Ok(~UvTcpListener::new(watcher)),
             Err(uverr) => {
                 let scheduler = Local::take::<Scheduler>();
-                do scheduler.deschedule_running_task_and_then |task| {
+                do scheduler.deschedule_running_task_and_then |_, task| {
                     let task_cell = Cell(task);
                     do watcher.as_stream().close {
                         let scheduler = Local::take::<Scheduler>();
@@ -204,7 +284,7 @@ impl Drop for UvTcpListener {
     fn finalize(&self) {
         let watcher = self.watcher();
         let scheduler = Local::take::<Scheduler>();
-        do scheduler.deschedule_running_task_and_then |task| {
+        do scheduler.deschedule_running_task_and_then |_, task| {
             let task_cell = Cell(task);
             do watcher.as_stream().close {
                 let scheduler = Local::take::<Scheduler>();
@@ -266,7 +346,7 @@ impl Drop for UvTcpStream {
         rtdebug!("closing tcp stream");
         let watcher = self.watcher();
         let scheduler = Local::take::<Scheduler>();
-        do scheduler.deschedule_running_task_and_then |task| {
+        do scheduler.deschedule_running_task_and_then |_, task| {
             let task_cell = Cell(task);
             do watcher.close {
                 let scheduler = Local::take::<Scheduler>();
@@ -285,11 +365,9 @@ impl RtioTcpStream for UvTcpStream {
         assert!(scheduler.in_task_context());
         let watcher = self.watcher();
         let buf_ptr: *&mut [u8] = &buf;
-        do scheduler.deschedule_running_task_and_then |task| {
+        do scheduler.deschedule_running_task_and_then |sched, task| {
             rtdebug!("read: entered scheduler context");
-            do Local::borrow::<Scheduler> |scheduler| {
-                assert!(!scheduler.in_task_context());
-            }
+            assert!(!sched.in_task_context());
             let mut watcher = watcher;
             let task_cell = Cell(task);
             // XXX: We shouldn't reallocate these callbacks every
@@ -331,7 +409,7 @@ impl RtioTcpStream for UvTcpStream {
         assert!(scheduler.in_task_context());
         let watcher = self.watcher();
         let buf_ptr: *&[u8] = &buf;
-        do scheduler.deschedule_running_task_and_then |task| {
+        do scheduler.deschedule_running_task_and_then |_, task| {
             let mut watcher = watcher;
             let task_cell = Cell(task);
             let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
@@ -425,11 +503,9 @@ fn test_read_and_block() {
                 // Yield to the other task in hopes that it
                 // will trigger a read callback while we are
                 // not ready for it
-                do scheduler.deschedule_running_task_and_then |task| {
+                do scheduler.deschedule_running_task_and_then |sched, task| {
                     let task = Cell(task);
-                    do Local::borrow::<Scheduler> |scheduler| {
-                        scheduler.enqueue_task(task.take());
-                    }
+                    sched.enqueue_task(task.take());
                 }
             }