about summary refs log tree commit diff
diff options
context:
space:
mode:
author	bors <bors@rust-lang.org>	2014-02-14 00:26:47 -0800
committer	bors <bors@rust-lang.org>	2014-02-14 00:26:47 -0800
commit	03b324ff4481255a371bb234fc3e53bcb8d08e7e (patch)
tree	0ff6f6307f936d2b2eced588c60b3dd2bc568e6b
parent	2fe7bfe4d2de9942449d3656317e66bc9ec50204 (diff)
parent	2650b61505e5ed5ac3075451a73e64fd226f5b10 (diff)
download	rust-03b324ff4481255a371bb234fc3e53bcb8d08e7e.tar.gz
	rust-03b324ff4481255a371bb234fc3e53bcb8d08e7e.zip
auto merge of #12186 : alexcrichton/rust/no-sleep-2, r=brson
Any single-threaded task benchmark will spend a good chunk of time in `kqueue()` on osx and `epoll()` on linux, and the reason for this is that each time a task is terminated it will hit the syscall. When a task terminates, it context switches back to the scheduler thread, and the scheduler thread falls out of `run_sched_once` whenever it figures out that it did some work.

If we know that `epoll()` will return nothing, then we can continue to do work locally (only while there's work to be done). We must fall back to `epoll()` whenever there's active I/O in order to check whether it's ready or not, but without that (which is largely the case in benchmarks), we can prevent the costly syscall and can get a nice speedup.

I've separated the commits into preparation for this change and then the change itself, the last commit message has more details.
-rw-r--r--	src/libgreen/basic.rs	2
-rw-r--r--	src/libgreen/sched.rs	157
-rw-r--r--	src/librustuv/addrinfo.rs	2
-rw-r--r--	src/librustuv/file.rs	3
-rw-r--r--	src/librustuv/lib.rs	28
-rw-r--r--	src/librustuv/net.rs	8
-rw-r--r--	src/librustuv/pipe.rs	2
-rw-r--r--	src/librustuv/process.rs	2
-rw-r--r--	src/librustuv/stream.rs	7
-rw-r--r--	src/librustuv/timer.rs	26
-rw-r--r--	src/librustuv/uvio.rs	4
-rw-r--r--	src/libstd/rt/rtio.rs	1
12 files changed, 157 insertions, 85 deletions
diff --git a/src/libgreen/basic.rs b/src/libgreen/basic.rs
index 10a56b2b225..5bccf05f7b3 100644
--- a/src/libgreen/basic.rs
+++ b/src/libgreen/basic.rs
@@ -158,6 +158,8 @@ impl EventLoop for BasicLoop {
     }
 
     fn io<'a>(&'a mut self) -> Option<&'a mut IoFactory> { None }
+
+    fn has_active_io(&self) -> bool { false }
 }
 
 struct BasicRemote {
diff --git a/src/libgreen/sched.rs b/src/libgreen/sched.rs
index b224b0cabf3..4b1c4e3b425 100644
--- a/src/libgreen/sched.rs
+++ b/src/libgreen/sched.rs
@@ -252,12 +252,23 @@ impl Scheduler {
 
     // * Execution Functions - Core Loop Logic
 
-    // The model for this function is that you continue through it
-    // until you either use the scheduler while performing a schedule
-    // action, in which case you give it away and return early, or
-    // you reach the end and sleep. In the case that a scheduler
-    // action is performed the loop is evented such that this function
-    // is called again.
+    // This function is run from the idle callback on the uv loop, indicating
+    // that there are no I/O events pending. When this function returns, we will
+    // fall back to epoll() in the uv event loop, waiting for more things to
+    // happen. We may come right back off epoll() if the idle callback is still
+    // active, in which case we're truly just polling to see if I/O events are
+    // complete.
+    //
+    // The model for this function is to execute as much work as possible while
+    // still fairly considering I/O tasks. Falling back to epoll() frequently is
+    // often quite expensive, so we attempt to avoid it as much as possible. If
+    // we have any active I/O on the event loop, then we're forced to fall back
+    // to epoll() in order to provide fairness, but as long as we're doing work
+    // and there's no active I/O, we can continue to do work.
+    //
+    // If we try really hard to do some work, but no work is available to be
+    // done, then we fall back to epoll() to block this thread waiting for more
+    // work (instead of busy waiting).
     fn run_sched_once(mut ~self, stask: ~GreenTask) {
         // Make sure that we're not lying in that the `stask` argument is indeed
         // the scheduler task for this scheduler.
@@ -269,26 +280,46 @@ impl Scheduler {
 
         // First we check for scheduler messages, these are higher
         // priority than regular tasks.
-        let (sched, stask) =
-            match self.interpret_message_queue(stask, DontTryTooHard) {
-                Some(pair) => pair,
-                None => return
-            };
-
-        // This helper will use a randomized work-stealing algorithm
-        // to find work.
-        let (sched, stask) = match sched.do_work(stask) {
-            Some(pair) => pair,
-            None => return
-        };
+        let (mut sched, mut stask, mut did_work) =
+            self.interpret_message_queue(stask, DontTryTooHard);
 
-        // Now, before sleeping we need to find out if there really
-        // were any messages. Give it your best!
-        let (mut sched, stask) =
-            match sched.interpret_message_queue(stask, GiveItYourBest) {
-                Some(pair) => pair,
-                None => return
+        // After processing a message, we consider doing some more work on the
+        // event loop. The "keep going" condition changes after the first
+        // iteration because we don't want to spin here infinitely.
+        //
+        // Once we start doing work we can keep doing work so long as the
+        // iteration does something. Note that we don't want to starve the
+        // message queue here, so each iteration when we're done working we
+        // check the message queue regardless of whether we did work or not.
+        let mut keep_going = !did_work || !sched.event_loop.has_active_io();
+        while keep_going {
+            let (a, b, c) = match sched.do_work(stask) {
+                (sched, task, false) => {
+                    sched.interpret_message_queue(task, GiveItYourBest)
+                }
+                (sched, task, true) => {
+                    let (sched, task, _) =
+                        sched.interpret_message_queue(task, GiveItYourBest);
+                    (sched, task, true)
+                }
             };
+            sched = a;
+            stask = b;
+            did_work = c;
+
+            // We only keep going if we managed to do something productive and
+            // also don't have any active I/O. If we didn't do anything, we
+            // should consider going to sleep, and if we have active I/O we need
+            // to poll for completion.
+            keep_going = did_work && !sched.event_loop.has_active_io();
+        }
+
+        // If we ever did some work, then we shouldn't put our scheduler
+        // entirely to sleep just yet. Leave the idle callback active and fall
+        // back to epoll() to see what's going on.
+        if did_work {
+            return stask.put_with_sched(sched);
+        }
 
         // If we got here then there was no work to do.
         // Generate a SchedHandle and push it to the sleeper list so
@@ -318,7 +349,7 @@ impl Scheduler {
     // return None.
     fn interpret_message_queue(mut ~self, stask: ~GreenTask,
                                effort: EffortLevel)
-        -> Option<(~Scheduler, ~GreenTask)>
+            -> (~Scheduler, ~GreenTask, bool)
     {
 
         let msg = if effort == DontTryTooHard {
@@ -349,25 +380,25 @@ impl Scheduler {
             Some(PinnedTask(task)) => {
                 let mut task = task;
                 task.give_home(HomeSched(self.make_handle()));
-                self.resume_task_immediately(stask, task).put();
-                return None;
+                let (sched, task) = self.resume_task_immediately(stask, task);
+                (sched, task, true)
             }
             Some(TaskFromFriend(task)) => {
                 rtdebug!("got a task from a friend. lovely!");
-                self.process_task(stask, task,
-                                  Scheduler::resume_task_immediately_cl);
-                return None;
+                let (sched, task) =
+                    self.process_task(stask, task,
+                                      Scheduler::resume_task_immediately_cl);
+                (sched, task, true)
             }
             Some(RunOnce(task)) => {
                 // bypass the process_task logic to force running this task once
                 // on this home scheduler. This is often used for I/O (homing).
-                self.resume_task_immediately(stask, task).put();
-                return None;
+                let (sched, task) = self.resume_task_immediately(stask, task);
+                (sched, task, true)
             }
             Some(Wake) => {
                 self.sleepy = false;
-                stask.put_with_sched(self);
-                return None;
+                (self, stask, true)
             }
             Some(Shutdown) => {
                 rtdebug!("shutting down");
@@ -389,31 +420,30 @@ impl Scheduler {
                 // event loop references we will shut down.
                 self.no_sleep = true;
                 self.sleepy = false;
-                stask.put_with_sched(self);
-                return None;
+                (self, stask, true)
             }
             Some(NewNeighbor(neighbor)) => {
                 self.work_queues.push(neighbor);
-                return Some((self, stask));
-            }
-            None => {
-                return Some((self, stask));
+                (self, stask, false)
             }
+            None => (self, stask, false)
         }
     }
 
-    fn do_work(mut ~self, stask: ~GreenTask) -> Option<(~Scheduler, ~GreenTask)> {
+    fn do_work(mut ~self,
+               stask: ~GreenTask) -> (~Scheduler, ~GreenTask, bool) {
         rtdebug!("scheduler calling do work");
         match self.find_work() {
             Some(task) => {
                 rtdebug!("found some work! running the task");
-                self.process_task(stask, task,
-                                  Scheduler::resume_task_immediately_cl);
-                return None;
+                let (sched, task) =
+                    self.process_task(stask, task,
+                                      Scheduler::resume_task_immediately_cl);
+                (sched, task, true)
             }
             None => {
                 rtdebug!("no work was found, returning the scheduler struct");
-                return Some((self, stask));
+                (self, stask, false)
             }
         }
     }
@@ -486,7 +516,8 @@ impl Scheduler {
     // place.
 
     fn process_task(mut ~self, cur: ~GreenTask,
-                    mut next: ~GreenTask, schedule_fn: SchedulingFn) {
+                    mut next: ~GreenTask,
+                    schedule_fn: SchedulingFn) -> (~Scheduler, ~GreenTask) {
         rtdebug!("processing a task");
 
         match next.take_unwrap_home() {
@@ -495,23 +526,23 @@ impl Scheduler {
                     rtdebug!("sending task home");
                     next.give_home(HomeSched(home_handle));
                     Scheduler::send_task_home(next);
-                    cur.put_with_sched(self);
+                    (self, cur)
                 } else {
                     rtdebug!("running task here");
                     next.give_home(HomeSched(home_handle));
-                    schedule_fn(self, cur, next);
+                    schedule_fn(self, cur, next)
                 }
             }
             AnySched if self.run_anything => {
                 rtdebug!("running anysched task here");
                 next.give_home(AnySched);
-                schedule_fn(self, cur, next);
+                schedule_fn(self, cur, next)
             }
             AnySched => {
                 rtdebug!("sending task to friend");
                 next.give_home(AnySched);
                 self.send_to_friend(next);
-                cur.put_with_sched(self);
+                (self, cur)
             }
         }
     }
@@ -664,18 +695,19 @@ impl Scheduler {
     // * Context Swapping Helpers - Here be ugliness!
 
     pub fn resume_task_immediately(~self, cur: ~GreenTask,
-                                   next: ~GreenTask) -> ~GreenTask {
+                                   next: ~GreenTask) -> (~Scheduler, ~GreenTask) {
         assert!(cur.is_sched());
-        self.change_task_context(cur, next, |sched, stask| {
+        let mut cur = self.change_task_context(cur, next, |sched, stask| {
             assert!(sched.sched_task.is_none());
             sched.sched_task = Some(stask);
-        })
+        });
+        (cur.sched.take_unwrap(), cur)
     }
 
     fn resume_task_immediately_cl(sched: ~Scheduler,
                                   cur: ~GreenTask,
-                                  next: ~GreenTask) {
-        sched.resume_task_immediately(cur, next).put()
+                                  next: ~GreenTask) -> (~Scheduler, ~GreenTask) {
+        sched.resume_task_immediately(cur, next)
     }
 
     /// Block a running task, context switch to the scheduler, then pass the
@@ -741,15 +773,17 @@ impl Scheduler {
         cur.put();
     }
 
-    fn switch_task(sched: ~Scheduler, cur: ~GreenTask, next: ~GreenTask) {
-        sched.change_task_context(cur, next, |sched, last_task| {
+    fn switch_task(sched: ~Scheduler, cur: ~GreenTask,
+                   next: ~GreenTask) -> (~Scheduler, ~GreenTask) {
+        let mut cur = sched.change_task_context(cur, next, |sched, last_task| {
             if last_task.is_sched() {
                 assert!(sched.sched_task.is_none());
                 sched.sched_task = Some(last_task);
             } else {
                 sched.enqueue_task(last_task);
             }
-        }).put()
+        });
+        (cur.sched.take_unwrap(), cur)
     }
 
     // * Task Context Helpers
@@ -769,7 +803,9 @@ impl Scheduler {
     }
 
     pub fn run_task(~self, cur: ~GreenTask, next: ~GreenTask) {
-        self.process_task(cur, next, Scheduler::switch_task);
+        let (sched, task) =
+            self.process_task(cur, next, Scheduler::switch_task);
+        task.put_with_sched(sched);
     }
 
     pub fn run_task_later(mut cur: ~GreenTask, next: ~GreenTask) {
@@ -836,7 +872,8 @@ impl Scheduler {
 
 // Supporting types
 
-type SchedulingFn = extern "Rust" fn (~Scheduler, ~GreenTask, ~GreenTask);
+type SchedulingFn = fn (~Scheduler, ~GreenTask, ~GreenTask)
+                            -> (~Scheduler, ~GreenTask);
 
 pub enum SchedMessage {
     Wake,
diff --git a/src/librustuv/addrinfo.rs b/src/librustuv/addrinfo.rs
index 2740671c00d..5d6af2969b8 100644
--- a/src/librustuv/addrinfo.rs
+++ b/src/librustuv/addrinfo.rs
@@ -86,7 +86,7 @@ impl GetAddrInfoRequest {
                 req.defuse(); // uv callback now owns this request
                 let mut cx = Ctx { slot: None, status: 0, addrinfo: None };
 
-                wait_until_woken_after(&mut cx.slot, || {
+                wait_until_woken_after(&mut cx.slot, loop_, || {
                     req.set_data(&cx);
                 });
 
diff --git a/src/librustuv/file.rs b/src/librustuv/file.rs
index 2cef2664c2f..e66452041a5 100644
--- a/src/librustuv/file.rs
+++ b/src/librustuv/file.rs
@@ -304,7 +304,8 @@ fn execute(f: |*uvll::uv_fs_t, uvll::uv_fs_cb| -> c_int)
         0 => {
             req.fired = true;
             let mut slot = None;
-            wait_until_woken_after(&mut slot, || {
+            let loop_ = unsafe { uvll::get_loop_from_fs_req(req.req) };
+            wait_until_woken_after(&mut slot, &Loop::wrap(loop_), || {
                 unsafe { uvll::set_data_for_req(req.req, &slot) }
             });
             match req.get_result() {
diff --git a/src/librustuv/lib.rs b/src/librustuv/lib.rs
index b71dbe05ad2..8c263c5e5f7 100644
--- a/src/librustuv/lib.rs
+++ b/src/librustuv/lib.rs
@@ -47,7 +47,7 @@ via `close` and `delete` methods.
 use std::cast;
 use std::io;
 use std::io::IoError;
-use std::libc::c_int;
+use std::libc::{c_int, c_void};
 use std::ptr::null;
 use std::ptr;
 use std::rt::local::Local;
@@ -95,6 +95,10 @@ pub mod stream;
 pub trait UvHandle<T> {
     fn uv_handle(&self) -> *T;
 
+    fn uv_loop(&self) -> Loop {
+        Loop::wrap(unsafe { uvll::get_loop_for_uv_handle(self.uv_handle()) })
+    }
+
     // FIXME(#8888) dummy self
     fn alloc(_: Option<Self>, ty: uvll::uv_handle_type) -> *T {
         unsafe {
@@ -136,7 +140,7 @@ pub trait UvHandle<T> {
             uvll::uv_close(self.uv_handle() as *uvll::uv_handle_t, close_cb);
             uvll::set_data_for_uv_handle(self.uv_handle(), ptr::null::<()>());
 
-            wait_until_woken_after(&mut slot, || {
+            wait_until_woken_after(&mut slot, &self.uv_loop(), || {
                 uvll::set_data_for_uv_handle(self.uv_handle(), &slot);
             })
         }
@@ -195,16 +199,20 @@ impl Drop for ForbidUnwind {
     }
 }
 
-fn wait_until_woken_after(slot: *mut Option<BlockedTask>, f: ||) {
+fn wait_until_woken_after(slot: *mut Option<BlockedTask>,
+                          loop_: &Loop,
+                          f: ||) {
     let _f = ForbidUnwind::new("wait_until_woken_after");
     unsafe {
         assert!((*slot).is_none());
         let task: ~Task = Local::take();
+        loop_.modify_blockers(1);
         task.deschedule(1, |task| {
             *slot = Some(task);
             f();
             Ok(())
         });
+        loop_.modify_blockers(-1);
     }
 }
 
@@ -273,6 +281,7 @@ impl Loop {
     pub fn new() -> Loop {
         let handle = unsafe { uvll::loop_new() };
         assert!(handle.is_not_null());
+        unsafe { uvll::set_data_for_uv_loop(handle, 0 as *c_void) }
         Loop::wrap(handle)
     }
 
@@ -285,6 +294,19 @@ impl Loop {
     pub fn close(&mut self) {
         unsafe { uvll::uv_loop_delete(self.handle) };
     }
+
+    // The 'data' field of the uv_loop_t is used to count the number of tasks
+    // that are currently blocked waiting for I/O to complete.
+    fn modify_blockers(&self, amt: uint) {
+        unsafe {
+            let cur = uvll::get_data_for_uv_loop(self.handle) as uint;
+            uvll::set_data_for_uv_loop(self.handle, (cur + amt) as *c_void)
+        }
+    }
+
+    fn get_blockers(&self) -> uint {
+        unsafe { uvll::get_data_for_uv_loop(self.handle) as uint }
+    }
 }
 
 // FIXME: Need to define the error constants like EOF so they can be
diff --git a/src/librustuv/net.rs b/src/librustuv/net.rs
index 551e2c9faf7..a091829f297 100644
--- a/src/librustuv/net.rs
+++ b/src/librustuv/net.rs
@@ -216,7 +216,7 @@ impl TcpWatcher {
             0 => {
                 req.defuse(); // uv callback now owns this request
                 let mut cx = Ctx { status: 0, task: None };
-                wait_until_woken_after(&mut cx.task, || {
+                wait_until_woken_after(&mut cx.task, &io.loop_, || {
                     req.set_data(&cx);
                 });
                 match cx.status {
@@ -498,6 +498,7 @@ impl rtio::RtioUdpSocket for UdpWatcher {
             buf: Option<Buf>,
             result: Option<(ssize_t, Option<ip::SocketAddr>)>,
         }
+        let loop_ = self.uv_loop();
         let m = self.fire_homing_missile();
         let _g = self.read_access.grant(m);
 
@@ -511,7 +512,7 @@ impl rtio::RtioUdpSocket for UdpWatcher {
                     result: None,
                 };
                 let handle = self.handle;
-                wait_until_woken_after(&mut cx.task, || {
+                wait_until_woken_after(&mut cx.task, &loop_, || {
                     unsafe { uvll::set_data_for_uv_handle(handle, &cx) }
                 });
                 match cx.result.take_unwrap() {
@@ -571,6 +572,7 @@ impl rtio::RtioUdpSocket for UdpWatcher {
         struct Ctx { task: Option<BlockedTask>, result: c_int }
 
         let m = self.fire_homing_missile();
+        let loop_ = self.uv_loop();
         let _g = self.write_access.grant(m);
 
         let mut req = Request::new(uvll::UV_UDP_SEND);
@@ -586,7 +588,7 @@ impl rtio::RtioUdpSocket for UdpWatcher {
             0 => {
                 req.defuse(); // uv callback now owns this request
                 let mut cx = Ctx { task: None, result: 0 };
-                wait_until_woken_after(&mut cx.task, || {
+                wait_until_woken_after(&mut cx.task, &loop_, || {
                     req.set_data(&cx);
                 });
                 match cx.result {
diff --git a/src/librustuv/pipe.rs b/src/librustuv/pipe.rs
index c312f112d28..24ac17700cc 100644
--- a/src/librustuv/pipe.rs
+++ b/src/librustuv/pipe.rs
@@ -92,7 +92,7 @@ impl PipeWatcher {
         let mut req = Request::new(uvll::UV_CONNECT);
         let pipe = PipeWatcher::new(io, false);
 
-        wait_until_woken_after(&mut cx.task, || {
+        wait_until_woken_after(&mut cx.task, &io.loop_, || {
             unsafe {
                 uvll::uv_pipe_connect(req.handle,
                                       pipe.handle(),
diff --git a/src/librustuv/process.rs b/src/librustuv/process.rs
index 7b7a16d7084..e1f94d8c4df 100644
--- a/src/librustuv/process.rs
+++ b/src/librustuv/process.rs
@@ -211,7 +211,7 @@ impl RtioProcess for Process {
                 // If there's no exit code previously listed, then the
                 // process's exit callback has yet to be invoked. We just
                 // need to deschedule ourselves and wait to be reawoken.
-                wait_until_woken_after(&mut self.to_wake, || {});
+                wait_until_woken_after(&mut self.to_wake, &self.uv_loop(), || {});
                 assert!(self.exit_status.is_some());
             }
         }
diff --git a/src/librustuv/stream.rs b/src/librustuv/stream.rs
index 262952f8890..f7bf2f051eb 100644
--- a/src/librustuv/stream.rs
+++ b/src/librustuv/stream.rs
@@ -13,6 +13,7 @@ use std::libc::{c_int, size_t, ssize_t};
 use std::ptr;
 use std::rt::task::BlockedTask;
 
+use Loop;
 use super::{UvError, Buf, slice_to_uv_buf, Request, wait_until_woken_after,
             ForbidUnwind, wakeup};
 use uvll;
@@ -87,7 +88,8 @@ impl StreamWatcher {
             uvll::uv_read_start(self.handle, alloc_cb, read_cb)
         } {
             0 => {
-                wait_until_woken_after(&mut rcx.task, || {});
+                let loop_ = unsafe { uvll::get_loop_for_uv_handle(self.handle) };
+                wait_until_woken_after(&mut rcx.task, &Loop::wrap(loop_), || {});
                 match rcx.result {
                     n if n < 0 => Err(UvError(n as c_int)),
                     n => Ok(n as uint),
@@ -121,7 +123,8 @@ impl StreamWatcher {
                 let mut wcx = WriteContext { result: 0, task: None, };
                 req.defuse(); // uv callback now owns this request
 
-                wait_until_woken_after(&mut wcx.task, || {
+                let loop_ = unsafe { uvll::get_loop_for_uv_handle(self.handle) };
+                wait_until_woken_after(&mut wcx.task, &Loop::wrap(loop_), || {
                     req.set_data(&wcx);
                 });
                 self.last_write_req = Some(Request::wrap(req.handle));
diff --git a/src/librustuv/timer.rs b/src/librustuv/timer.rs
index 0ce2501d2cc..8c80cc99145 100644
--- a/src/librustuv/timer.rs
+++ b/src/librustuv/timer.rs
@@ -9,13 +9,12 @@
 // except according to those terms.
 
 use std::libc::c_int;
-use std::mem::replace;
-use std::rt::local::Local;
+use std::mem;
 use std::rt::rtio::RtioTimer;
-use std::rt::task::{BlockedTask, Task};
+use std::rt::task::BlockedTask;
 
 use homing::{HomeHandle, HomingIO};
-use super::{UvHandle, ForbidUnwind, ForbidSwitch};
+use super::{UvHandle, ForbidUnwind, ForbidSwitch, wait_until_woken_after};
 use uvio::UvIoFactory;
 use uvll;
 
@@ -23,11 +22,12 @@ pub struct TimerWatcher {
     handle: *uvll::uv_timer_t,
     home: HomeHandle,
     action: Option<NextAction>,
+    blocker: Option<BlockedTask>,
     id: uint, // see comments in timer_cb
 }
 
 pub enum NextAction {
-    WakeTask(BlockedTask),
+    WakeTask,
     SendOnce(Chan<()>),
     SendMany(Chan<()>, uint),
 }
@@ -41,6 +41,7 @@ impl TimerWatcher {
         let me = ~TimerWatcher {
             handle: handle,
             action: None,
+            blocker: None,
             home: io.make_handle(),
             id: 0,
         };
@@ -76,7 +77,7 @@ impl RtioTimer for TimerWatcher {
         let missile = self.fire_homing_missile();
         self.id += 1;
         self.stop();
-        let _missile = match replace(&mut self.action, None) {
+        let _missile = match mem::replace(&mut self.action, None) {
             None => missile, // no need to do a homing dance
             Some(action) => {
                 drop(missile);      // un-home ourself
@@ -89,11 +90,9 @@ impl RtioTimer for TimerWatcher {
         // started, then we need to call stop on the timer.
         let _f = ForbidUnwind::new("timer");
 
-        let task: ~Task = Local::take();
-        task.deschedule(1, |task| {
-            self.action = Some(WakeTask(task));
+        self.action = Some(WakeTask);
+        wait_until_woken_after(&mut self.blocker, &self.uv_loop(), || {
             self.start(msecs, 0);
-            Ok(())
         });
         self.stop();
     }
@@ -108,7 +107,7 @@ impl RtioTimer for TimerWatcher {
             self.id += 1;
             self.stop();
             self.start(msecs, 0);
-            replace(&mut self.action, Some(SendOnce(chan)))
+            mem::replace(&mut self.action, Some(SendOnce(chan)))
         };
 
         return port;
@@ -124,7 +123,7 @@ impl RtioTimer for TimerWatcher {
             self.id += 1;
             self.stop();
             self.start(msecs, msecs);
-            replace(&mut self.action, Some(SendMany(chan, self.id)))
+            mem::replace(&mut self.action, Some(SendMany(chan, self.id)))
         };
 
         return port;
@@ -137,7 +136,8 @@ extern fn timer_cb(handle: *uvll::uv_timer_t, status: c_int) {
     let timer: &mut TimerWatcher = unsafe { UvHandle::from_uv_handle(&handle) };
 
     match timer.action.take_unwrap() {
-        WakeTask(task) => {
+        WakeTask => {
+            let task = timer.blocker.take_unwrap();
             let _ = task.wake().map(|t| t.reawaken());
         }
         SendOnce(chan) => { let _ = chan.try_send(()); }
diff --git a/src/librustuv/uvio.rs b/src/librustuv/uvio.rs
index 54db4b4d3d1..14406cb2a6a 100644
--- a/src/librustuv/uvio.rs
+++ b/src/librustuv/uvio.rs
@@ -99,6 +99,10 @@ impl rtio::EventLoop for UvEventLoop {
         let factory = &mut self.uvio as &mut rtio::IoFactory;
         Some(factory)
     }
+
+    fn has_active_io(&self) -> bool {
+        self.uvio.loop_.get_blockers() > 0
+    }
 }
 
 #[cfg(not(test))]
diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs
index 39623e329ea..5573f8ec02e 100644
--- a/src/libstd/rt/rtio.rs
+++ b/src/libstd/rt/rtio.rs
@@ -41,6 +41,7 @@ pub trait EventLoop {
 
     /// The asynchronous I/O services. Not all event loops may provide one.
     fn io<'a>(&'a mut self) -> Option<&'a mut IoFactory>;
+    fn has_active_io(&self) -> bool;
 }
 
 pub trait RemoteCallback {