about summary refs log tree commit diff
path: root/src/libstd/rt
diff options
context:
space:
mode:
authorbors <bors@rust-lang.org>2013-08-20 09:42:00 -0700
committerbors <bors@rust-lang.org>2013-08-20 09:42:00 -0700
commit67c954e365970e4c2cd06f0c50724656d7010f45 (patch)
treed3b73e89a241ab0210e53d80b1846394139bf693 /src/libstd/rt
parent7f268128954fef84dcbcb7c9fe77e2a107e0bf69 (diff)
parente5ccf13668ed7b66d6efd9a1a03926e98546705d (diff)
downloadrust-67c954e365970e4c2cd06f0c50724656d7010f45.tar.gz
rust-67c954e365970e4c2cd06f0c50724656d7010f45.zip
auto merge of #8566 : toddaaro/rust/idle-opt+cleaning, r=catamorphism,brson
Instead of a furious storm of idle callbacks we just have one. This is a major performance gain - around 40% on my machine for the ping pong bench.

Also in this PR is a cleanup commit for the scheduler code. Was previously up as a separate PR, but bors load + imminent merge hell led me to roll them together. Was #8549.

Diffstat (limited to 'src/libstd/rt')
-rw-r--r--src/libstd/rt/rtio.rs13
-rw-r--r--src/libstd/rt/sched.rs567
-rw-r--r--src/libstd/rt/task.rs2
-rw-r--r--src/libstd/rt/util.rs3
-rw-r--r--src/libstd/rt/uv/idle.rs14
-rw-r--r--src/libstd/rt/uv/uvio.rs89
6 files changed, 386 insertions, 302 deletions
diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs
index a7c794fb5f1..e29c30ba033 100644
--- a/src/libstd/rt/rtio.rs
+++ b/src/libstd/rt/rtio.rs
@@ -24,10 +24,12 @@ pub type RtioTcpStreamObject = uvio::UvTcpStream;
 pub type RtioTcpListenerObject = uvio::UvTcpListener;
 pub type RtioUdpSocketObject = uvio::UvUdpSocket;
 pub type RtioTimerObject = uvio::UvTimer;
+pub type PausibleIdleCallback = uvio::UvPausibleIdleCallback;
 
 pub trait EventLoop {
     fn run(&mut self);
     fn callback(&mut self, ~fn());
+    fn pausible_idle_callback(&mut self) -> ~PausibleIdleCallback;
     fn callback_ms(&mut self, ms: u64, ~fn());
     fn remote_callback(&mut self, ~fn()) -> ~RemoteCallbackObject;
     /// The asynchronous I/O services. Not all event loops may provide one
@@ -35,11 +37,12 @@ pub trait EventLoop {
 }
 
 pub trait RemoteCallback {
-    /// Trigger the remote callback. Note that the number of times the callback
-    /// is run is not guaranteed. All that is guaranteed is that, after calling 'fire',
-    /// the callback will be called at least once, but multiple callbacks may be coalesced
-    /// and callbacks may be called more often requested. Destruction also triggers the
-    /// callback.
+    /// Trigger the remote callback. Note that the number of times the
+    /// callback is run is not guaranteed. All that is guaranteed is
+    /// that, after calling 'fire', the callback will be called at
+    /// least once, but multiple callbacks may be coalesced and
+    /// callbacks may be called more often requested. Destruction also
+    /// triggers the callback.
     fn fire(&mut self);
 }
 
diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs
index e65a45f0e07..3f7f2b5d653 100644
--- a/src/libstd/rt/sched.rs
+++ b/src/libstd/rt/sched.rs
@@ -23,7 +23,7 @@ use super::message_queue::MessageQueue;
 use rt::kill::BlockedTask;
 use rt::local_ptr;
 use rt::local::Local;
-use rt::rtio::RemoteCallback;
+use rt::rtio::{RemoteCallback, PausibleIdleCallback};
 use rt::metrics::SchedMetrics;
 use borrow::{to_uint};
 use cell::Cell;
@@ -31,10 +31,11 @@ use rand::{XorShiftRng, RngUtil};
 use iterator::{range};
 use vec::{OwnedVector};
 
-/// The Scheduler is responsible for coordinating execution of Coroutines
-/// on a single thread. When the scheduler is running it is owned by
-/// thread local storage and the running task is owned by the
-/// scheduler.
+/// A scheduler is responsible for coordinating the execution of Tasks
+/// on a single thread. The scheduler runs inside a slightly modified
+/// Rust Task. When not running this task is stored in the scheduler
+/// struct. The scheduler struct acts like a baton, all scheduling
+/// actions are transfers of the baton.
 ///
 /// XXX: This creates too many callbacks to run_sched_once, resulting
 /// in too much allocation and too many events.
@@ -64,11 +65,12 @@ pub struct Scheduler {
     stack_pool: StackPool,
     /// The event loop used to drive the scheduler and perform I/O
     event_loop: ~EventLoopObject,
-    /// The scheduler runs on a special task.
+    /// The scheduler runs on a special task. When it is not running
+    /// it is stored here instead of the work queue.
     sched_task: Option<~Task>,
     /// An action performed after a context switch on behalf of the
     /// code running before the context switch
-    priv cleanup_job: Option<CleanupJob>,
+    cleanup_job: Option<CleanupJob>,
     metrics: SchedMetrics,
     /// Should this scheduler run any task, or only pinned tasks?
     run_anything: bool,
@@ -76,31 +78,14 @@ pub struct Scheduler {
     /// them to.
     friend_handle: Option<SchedHandle>,
     /// A fast XorShift rng for scheduler use
-    rng: XorShiftRng
-
-}
-
-pub struct SchedHandle {
-    priv remote: ~RemoteCallbackObject,
-    priv queue: MessageQueue<SchedMessage>,
-    sched_id: uint
-}
-
-pub enum SchedMessage {
-    Wake,
-    Shutdown,
-    PinnedTask(~Task),
-    TaskFromFriend(~Task)
-}
-
-enum CleanupJob {
-    DoNothing,
-    GiveTask(~Task, UnsafeTaskReceiver)
+    rng: XorShiftRng,
+    /// A toggleable idle callback
+    idle_callback: ~PausibleIdleCallback
 }
 
 impl Scheduler {
 
-    pub fn sched_id(&self) -> uint { to_uint(self) }
+    // * Initialization Functions
 
     pub fn new(event_loop: ~EventLoopObject,
                work_queue: WorkQueue<~Task>,
@@ -114,8 +99,6 @@ impl Scheduler {
 
     }
 
-    // When you create a scheduler it isn't yet "in" a task, so the
-    // task field is None.
     pub fn new_special(event_loop: ~EventLoopObject,
                        work_queue: WorkQueue<~Task>,
                        work_queues: ~[WorkQueue<~Task>],
@@ -124,6 +107,9 @@ impl Scheduler {
                        friend: Option<SchedHandle>)
         -> Scheduler {
 
+        let mut event_loop = event_loop;
+        let idle_callback = event_loop.pausible_idle_callback();
+
         Scheduler {
             sleeper_list: sleeper_list,
             message_queue: MessageQueue::new(),
@@ -138,7 +124,8 @@ impl Scheduler {
             metrics: SchedMetrics::new(),
             run_anything: run_anything,
             friend_handle: friend,
-            rng: XorShiftRng::new()
+            rng: XorShiftRng::new(),
+            idle_callback: idle_callback
         }
     }
 
@@ -151,6 +138,8 @@ impl Scheduler {
     // scheduler task and bootstrap into it.
     pub fn bootstrap(~self, task: ~Task) {
 
+        let mut this = self;
+
         // Initialize the TLS key.
         local_ptr::init_tls_key();
 
@@ -161,10 +150,15 @@ impl Scheduler {
         // task, put it in TLS.
         Local::put::(sched_task);
 
+        // Before starting our first task, make sure the idle callback
+        // is active. As we do not start in the sleep state this is
+        // important.
+        this.idle_callback.start(Scheduler::run_sched_once);
+
         // Now, as far as all the scheduler state is concerned, we are
         // inside the "scheduler" context. So we can act like the
         // scheduler and resume the provided task.
-        self.resume_task_immediately(task);
+        this.resume_task_immediately(task);
 
         // Now we are back in the scheduler context, having
         // successfully run the input task. Start by running the
@@ -173,7 +167,6 @@ impl Scheduler {
         let sched = Local::take::<Scheduler>();
 
         rtdebug!("starting scheduler %u", sched.sched_id());
-
         sched.run();
 
         // Now that we are done with the scheduler, clean up the
@@ -189,6 +182,9 @@ impl Scheduler {
         let message = stask.sched.get_mut_ref().message_queue.pop();
         assert!(message.is_none());
 
+        // Close the idle callback.
+        stask.sched.get_mut_ref().idle_callback.close();
+
         stask.destroyed = true;
     }
 
@@ -198,11 +194,6 @@ impl Scheduler {
 
         let mut self_sched = self;
 
-        // Always run through the scheduler loop at least once so that
-        // we enter the sleep state and can then be woken up by other
-        // schedulers.
-        self_sched.event_loop.callback(Scheduler::run_sched_once);
-
         // This is unsafe because we need to place the scheduler, with
         // the event_loop inside, inside our task. But we still need a
         // mutable reference to the event_loop to give it the "run"
@@ -221,11 +212,11 @@ impl Scheduler {
         }
     }
 
-    // One iteration of the scheduler loop, always run at least once.
+    // * Execution Functions - Core Loop Logic
 
     // The model for this function is that you continue through it
     // until you either use the scheduler while performing a schedule
-    // action, in which case you give it away and do not return, or
+    // action, in which case you give it away and return early, or
     // you reach the end and sleep. In the case that a scheduler
     // action is performed the loop is evented such that this function
     // is called again.
@@ -235,41 +226,24 @@ impl Scheduler {
         // already have a scheduler stored in our local task, so we
         // start off by taking it. This is the only path through the
         // scheduler where we get the scheduler this way.
-        let sched = Local::take::<Scheduler>();
+        let mut sched = Local::take::<Scheduler>();
 
-        // Our first task is to read mail to see if we have important
-        // messages.
-
-        // 1) A wake message is easy, mutate sched struct and return
-        //    it.
-        // 2) A shutdown is also easy, shutdown.
-        // 3) A pinned task - we resume immediately and do not return
-        //    here.
-        // 4) A message from another scheduler with a non-homed task
-        //    to run here.
-
-        let result = sched.interpret_message_queue();
-        let sched = match result {
-            Some(sched) => {
-                // We did not resume a task, so we returned.
-                sched
-            }
-            None => {
-                return;
-            }
-        };
+        // Assume that we need to continue idling unless we reach the
+        // end of this function without performing an action.
+        sched.idle_callback.resume();
 
-        // Second activity is to try resuming a task from the queue.
+        // First we check for scheduler messages, these are higher
+        // priority than regular tasks.
+        let sched = match sched.interpret_message_queue() {
+            Some(sched) => sched,
+            None => return
+        };
 
-        let result = sched.do_work();
-        let mut sched = match result {
-            Some(sched) => {
-                // Failed to dequeue a task, so we return.
-                sched
-            }
-            None => {
-                return;
-            }
+        // This helper will use a randomized work-stealing algorithm
+        // to find work.
+        let mut sched = match sched.do_work() {
+            Some(sched) => sched,
+            None => return
         };
 
         // If we got here then there was no work to do.
@@ -282,8 +256,13 @@ impl Scheduler {
             sched.sleepy = true;
             let handle = sched.make_handle();
             sched.sleeper_list.push(handle);
+            // Since we are sleeping, deactivate the idle callback.
+            sched.idle_callback.pause();
         } else {
             rtdebug!("not sleeping, already doing so or no_sleep set");
+            // We may not be sleeping, but we still need to deactivate
+            // the idle callback.
+            sched.idle_callback.pause();
         }
 
         // Finished a cycle without using the Scheduler. Place it back
@@ -291,85 +270,33 @@ impl Scheduler {
         Local::put(sched);
     }
 
-    pub fn make_handle(&mut self) -> SchedHandle {
-        let remote = self.event_loop.remote_callback(Scheduler::run_sched_once);
-
-        return SchedHandle {
-            remote: remote,
-            queue: self.message_queue.clone(),
-            sched_id: self.sched_id()
-        };
-    }
-
-    /// Schedule a task to be executed later.
-    ///
-    /// Pushes the task onto the work stealing queue and tells the
-    /// event loop to run it later. Always use this instead of pushing
-    /// to the work queue directly.
-    pub fn enqueue_task(&mut self, task: ~Task) {
-
-        let this = self;
-
-        // We push the task onto our local queue clone.
-        this.work_queue.push(task);
-        this.event_loop.callback(Scheduler::run_sched_once);
-
-        // We've made work available. Notify a
-        // sleeping scheduler.
-
-        // XXX: perf. Check for a sleeper without
-        // synchronizing memory.  It's not critical
-        // that we always find it.
-
-        // XXX: perf. If there's a sleeper then we
-        // might as well just send it the task
-        // directly instead of pushing it to the
-        // queue. That is essentially the intent here
-        // and it is less work.
-        match this.sleeper_list.pop() {
-            Some(handle) => {
-                let mut handle = handle;
-                handle.send(Wake)
-            }
-            None => { (/* pass */) }
-        };
-    }
-
-    /// As enqueue_task, but with the possibility for the blocked task to
-    /// already have been killed.
-    pub fn enqueue_blocked_task(&mut self, blocked_task: BlockedTask) {
-        do blocked_task.wake().map_move |task| {
-            self.enqueue_task(task);
-        };
-    }
-
-    // * Scheduler-context operations
-
     // This function returns None if the scheduler is "used", or it
-    // returns the still-available scheduler.
+    // returns the still-available scheduler. At this point all
+    // message-handling will count as a turn of work, and as a result
+    // return None.
     fn interpret_message_queue(~self) -> Option<~Scheduler> {
 
         let mut this = self;
         match this.message_queue.pop() {
             Some(PinnedTask(task)) => {
-                this.event_loop.callback(Scheduler::run_sched_once);
                 let mut task = task;
                 task.give_home(Sched(this.make_handle()));
                 this.resume_task_immediately(task);
                 return None;
             }
             Some(TaskFromFriend(task)) => {
-                this.event_loop.callback(Scheduler::run_sched_once);
                 rtdebug!("got a task from a friend. lovely!");
-                return this.sched_schedule_task(task);
+                this.process_task(task,
+                                  Scheduler::resume_task_immediately_cl).map_move(Local::put);
+                return None;
             }
             Some(Wake) => {
-                this.event_loop.callback(Scheduler::run_sched_once);
                 this.sleepy = false;
-                return Some(this);
+                Local::put(this);
+                return None;
             }
             Some(Shutdown) => {
-                this.event_loop.callback(Scheduler::run_sched_once);
+                rtdebug!("shutting down");
                 if this.sleepy {
                     // There may be an outstanding handle on the
                     // sleeper list.  Pop them all to make sure that's
@@ -388,11 +315,8 @@ impl Scheduler {
                 // event loop references we will shut down.
                 this.no_sleep = true;
                 this.sleepy = false;
-                // YYY: Does a shutdown count as a "use" of the
-                // scheduler? This seems to work - so I'm leaving it
-                // this way despite not having a solid rational for
-                // why I should return the scheduler here.
-                return Some(this);
+                Local::put(this);
+                return None;
             }
             None => {
                 return Some(this);
@@ -400,30 +324,19 @@ impl Scheduler {
         }
     }
 
-    /// Given an input Coroutine sends it back to its home scheduler.
-    fn send_task_home(task: ~Task) {
-        let mut task = task;
-        let mut home = task.take_unwrap_home();
-        match home {
-            Sched(ref mut home_handle) => {
-                home_handle.send(PinnedTask(task));
-            }
-            AnySched => {
-                rtabort!("error: cannot send anysched task home");
-            }
-        }
-    }
+    fn do_work(~self) -> Option<~Scheduler> {
+        let mut this = self;
 
-    /// Take a non-homed task we aren't allowed to run here and send
-    /// it to the designated friend scheduler to execute.
-    fn send_to_friend(&mut self, task: ~Task) {
-        rtdebug!("sending a task to friend");
-        match self.friend_handle {
-            Some(ref mut handle) => {
-                handle.send(TaskFromFriend(task));
+        rtdebug!("scheduler calling do work");
+        match this.find_work() {
+            Some(task) => {
+                rtdebug!("found some work! processing the task");
+                return this.process_task(task,
+                                         Scheduler::resume_task_immediately_cl);
             }
             None => {
-                rtabort!("tried to send task to a friend but scheduler has no friends");
+                rtdebug!("no work was found, returning the scheduler struct");
+                return Some(this);
             }
         }
     }
@@ -447,8 +360,8 @@ impl Scheduler {
             None => {
                 // Our naive stealing, try kinda hard.
                 rtdebug!("scheduler trying to steal");
-                let _len = self.work_queues.len();
-                return self.try_steals(2);
+                let len = self.work_queues.len();
+                return self.try_steals(len/2);
             }
         }
     }
@@ -462,7 +375,8 @@ impl Scheduler {
             let work_queues = &mut self.work_queues;
             match work_queues[index].steal() {
                 Some(task) => {
-                    rtdebug!("found task by stealing"); return Some(task)
+                    rtdebug!("found task by stealing");
+                    return Some(task)
                 }
                 None => ()
             }
@@ -471,8 +385,11 @@ impl Scheduler {
         return None;
     }
 
-    // Given a task, execute it correctly.
-    fn process_task(~self, task: ~Task) -> Option<~Scheduler> {
+    // * Task Routing Functions - Make sure tasks send up in the right
+    // place.
+
+    fn process_task(~self, task: ~Task,
+                    schedule_fn: SchedulingFn) -> Option<~Scheduler> {
         let mut this = self;
         let mut task = task;
 
@@ -489,15 +406,13 @@ impl Scheduler {
                 } else {
                     rtdebug!("running task here");
                     task.give_home(Sched(home_handle));
-                    this.resume_task_immediately(task);
-                    return None;
+                    return schedule_fn(this, task);
                 }
             }
             AnySched if this.run_anything => {
                 rtdebug!("running anysched task here");
                 task.give_home(AnySched);
-                this.resume_task_immediately(task);
-                return None;
+                return schedule_fn(this, task);
             }
             AnySched => {
                 rtdebug!("sending task to friend");
@@ -508,98 +423,71 @@ impl Scheduler {
         }
     }
 
-    // Bundle the helpers together.
-    fn do_work(~self) -> Option<~Scheduler> {
-        let mut this = self;
-
-        rtdebug!("scheduler calling do work");
-        match this.find_work() {
-            Some(task) => {
-                rtdebug!("found some work! processing the task");
-                return this.process_task(task);
+    fn send_task_home(task: ~Task) {
+        let mut task = task;
+        let mut home = task.take_unwrap_home();
+        match home {
+            Sched(ref mut home_handle) => {
+                home_handle.send(PinnedTask(task));
             }
-            None => {
-                rtdebug!("no work was found, returning the scheduler struct");
-                return Some(this);
+            AnySched => {
+                        rtabort!("error: cannot send anysched task home");
             }
         }
     }
 
-    /// Called by a running task to end execution, after which it will
-    /// be recycled by the scheduler for reuse in a new task.
-    pub fn terminate_current_task(~self) {
-        // Similar to deschedule running task and then, but cannot go through
-        // the task-blocking path. The task is already dying.
-        let mut this = self;
-        let stask = this.sched_task.take_unwrap();
-        do this.change_task_context(stask) |sched, mut dead_task| {
-            let coroutine = dead_task.coroutine.take_unwrap();
-            coroutine.recycle(&mut sched.stack_pool);
+    /// Take a non-homed task we aren't allowed to run here and send
+    /// it to the designated friend scheduler to execute.
+    fn send_to_friend(&mut self, task: ~Task) {
+        rtdebug!("sending a task to friend");
+        match self.friend_handle {
+            Some(ref mut handle) => {
+                handle.send(TaskFromFriend(task));
+            }
+            None => {
+                rtabort!("tried to send task to a friend but scheduler has no friends");
+            }
         }
     }
 
-    // Scheduling a task requires a few checks to make sure the task
-    // ends up in the appropriate location. The run_anything flag on
-    // the scheduler and the home on the task need to be checked. This
-    // helper performs that check. It takes a function that specifies
-    // how to queue the the provided task if that is the correct
-    // action. This is a "core" function that requires handling the
-    // returned Option correctly.
-
-    pub fn schedule_task(~self, task: ~Task,
-                         schedule_fn: ~fn(sched: ~Scheduler, task: ~Task))
-        -> Option<~Scheduler> {
-
-        // is the task home?
-        let is_home = task.is_home_no_tls(&self);
+    /// Schedule a task to be executed later.
+    ///
+    /// Pushes the task onto the work stealing queue and tells the
+    /// event loop to run it later. Always use this instead of pushing
+    /// to the work queue directly.
+    pub fn enqueue_task(&mut self, task: ~Task) {
 
-        // does the task have a home?
-        let homed = task.homed();
+        let this = self;
 
-        let mut this = self;
+        // We push the task onto our local queue clone.
+        this.work_queue.push(task);
+        this.idle_callback.resume();
 
-        if is_home || (!homed && this.run_anything) {
-            // here we know we are home, execute now OR we know we
-            // aren't homed, and that this sched doesn't care
-            rtdebug!("task: %u is on ok sched, executing", to_uint(task));
-            schedule_fn(this, task);
-            return None;
-        } else if !homed && !this.run_anything {
-            // the task isn't homed, but it can't be run here
-            this.send_to_friend(task);
-            return Some(this);
-        } else {
-            // task isn't home, so don't run it here, send it home
-            Scheduler::send_task_home(task);
-            return Some(this);
-        }
-    }
+        // We've made work available. Notify a
+        // sleeping scheduler.
 
-    // There are two contexts in which schedule_task can be called:
-    // inside the scheduler, and inside a task. These contexts handle
-    // executing the task slightly differently. In the scheduler
-    // context case we want to receive the scheduler as an input, and
-    // manually deal with the option. In the task context case we want
-    // to use TLS to find the scheduler, and deal with the option
-    // inside the helper.
-
-    pub fn sched_schedule_task(~self, task: ~Task) -> Option<~Scheduler> {
-        do self.schedule_task(task) |sched, next_task| {
-            sched.resume_task_immediately(next_task);
-        }
+        // XXX: perf. Check for a sleeper without
+        // synchronizing memory.  It's not critical
+        // that we always find it.
+        match this.sleeper_list.pop() {
+            Some(handle) => {
+                        let mut handle = handle;
+                handle.send(Wake)
+            }
+            None => { (/* pass */) }
+        };
     }
 
-    // Task context case - use TLS.
-    pub fn run_task(task: ~Task) {
-        let sched = Local::take::<Scheduler>();
-        let opt = do sched.schedule_task(task) |sched, next_task| {
-            do sched.switch_running_tasks_and_then(next_task) |sched, last_task| {
-                sched.enqueue_blocked_task(last_task);
-            }
+    /// As enqueue_task, but with the possibility for the blocked task to
+    /// already have been killed.
+    pub fn enqueue_blocked_task(&mut self, blocked_task: BlockedTask) {
+        do blocked_task.wake().map_move |task| {
+            self.enqueue_task(task);
         };
-        opt.map_move(Local::put);
     }
 
+    // * Core Context Switching Functions
+
     // The primary function for changing contexts. In the current
     // design the scheduler is just a slightly modified GreenTask, so
     // all context swaps are from Task to Task. The only difference
@@ -629,7 +517,7 @@ impl Scheduler {
 
         // The current task is placed inside an enum with the cleanup
         // function. This enum is then placed inside the scheduler.
-        this.enqueue_cleanup_job(GiveTask(current_task, f_opaque));
+        this.cleanup_job = Some(CleanupJob::new(current_task, f_opaque));
 
         // The scheduler is then placed inside the next task.
         let mut next_task = next_task;
@@ -645,12 +533,9 @@ impl Scheduler {
                 transmute_mut_region(*next_task.sched.get_mut_ref());
 
             let current_task: &mut Task = match sched.cleanup_job {
-                Some(GiveTask(ref task, _)) => {
+                Some(CleanupJob { task: ref task, _ }) => {
                     transmute_mut_region(*transmute_mut_unsafe(task))
                 }
-                Some(DoNothing) => {
-                    rtabort!("no next task");
-                }
                 None => {
                     rtabort!("no cleanup job");
                 }
@@ -684,19 +569,42 @@ impl Scheduler {
         }
     }
 
-    // Old API for task manipulation implemented over the new core
-    // function.
+    // Returns a mutable reference to both contexts involved in this
+    // swap. This is unsafe - we are getting mutable internal
+    // references to keep even when we don't own the tasks. It looks
+    // kinda safe because we are doing transmutes before passing in
+    // the arguments.
+    pub fn get_contexts<'a>(current_task: &mut Task, next_task: &mut Task) ->
+        (&'a mut Context, &'a mut Context) {
+        let current_task_context =
+            &mut current_task.coroutine.get_mut_ref().saved_context;
+        let next_task_context =
+                &mut next_task.coroutine.get_mut_ref().saved_context;
+        unsafe {
+            (transmute_mut_region(current_task_context),
+             transmute_mut_region(next_task_context))
+        }
+    }
+
+    // * Context Swapping Helpers - Here be ugliness!
 
-    pub fn resume_task_immediately(~self, task: ~Task) {
+    pub fn resume_task_immediately(~self, task: ~Task) -> Option<~Scheduler> {
         do self.change_task_context(task) |sched, stask| {
             sched.sched_task = Some(stask);
         }
+        return None;
     }
 
+    fn resume_task_immediately_cl(sched: ~Scheduler,
+                                  task: ~Task) -> Option<~Scheduler> {
+        sched.resume_task_immediately(task)
+    }
+
+
     pub fn resume_blocked_task_immediately(~self, blocked_task: BlockedTask) {
         match blocked_task.wake() {
-            Some(task) => self.resume_task_immediately(task),
-            None => Local::put(self),
+            Some(task) => { self.resume_task_immediately(task); }
+            None => Local::put(self)
         };
     }
 
@@ -735,54 +643,75 @@ impl Scheduler {
         }
     }
 
-    // A helper that looks up the scheduler and runs a task later by
-    // enqueuing it.
+    fn switch_task(sched: ~Scheduler, task: ~Task) -> Option<~Scheduler> {
+        do sched.switch_running_tasks_and_then(task) |sched, last_task| {
+            sched.enqueue_blocked_task(last_task);
+        };
+        return None;
+    }
+
+    // * Task Context Helpers
+
+    /// Called by a running task to end execution, after which it will
+    /// be recycled by the scheduler for reuse in a new task.
+    pub fn terminate_current_task(~self) {
+        // Similar to deschedule running task and then, but cannot go through
+        // the task-blocking path. The task is already dying.
+        let mut this = self;
+        let stask = this.sched_task.take_unwrap();
+        do this.change_task_context(stask) |sched, mut dead_task| {
+            let coroutine = dead_task.coroutine.take_unwrap();
+            coroutine.recycle(&mut sched.stack_pool);
+        }
+    }
+
+    pub fn run_task(task: ~Task) {
+        let sched = Local::take::<Scheduler>();
+        sched.process_task(task, Scheduler::switch_task).map_move(Local::put);
+    }
+
     pub fn run_task_later(next_task: ~Task) {
-        // We aren't performing a scheduler operation, so we want to
-        // put the Scheduler back when we finish.
         let next_task = Cell::new(next_task);
         do Local::borrow::<Scheduler,()> |sched| {
             sched.enqueue_task(next_task.take());
         };
     }
 
-    // Returns a mutable reference to both contexts involved in this
-    // swap. This is unsafe - we are getting mutable internal
-    // references to keep even when we don't own the tasks. It looks
-    // kinda safe because we are doing transmutes before passing in
-    // the arguments.
-    pub fn get_contexts<'a>(current_task: &mut Task, next_task: &mut Task) ->
-        (&'a mut Context, &'a mut Context) {
-        let current_task_context =
-            &mut current_task.coroutine.get_mut_ref().saved_context;
-        let next_task_context =
-            &mut next_task.coroutine.get_mut_ref().saved_context;
-        unsafe {
-            (transmute_mut_region(current_task_context),
-             transmute_mut_region(next_task_context))
-        }
-    }
+    // * Utility Functions
 
-    pub fn enqueue_cleanup_job(&mut self, job: CleanupJob) {
-        self.cleanup_job = Some(job);
-    }
+    pub fn sched_id(&self) -> uint { to_uint(self) }
 
     pub fn run_cleanup_job(&mut self) {
-        rtdebug!("running cleanup job");
         let cleanup_job = self.cleanup_job.take_unwrap();
-        match cleanup_job {
-            DoNothing => { }
-            GiveTask(task, f) => f.to_fn()(self, task)
-        }
+        cleanup_job.run(self);
+    }
+
+    pub fn make_handle(&mut self) -> SchedHandle {
+        let remote = self.event_loop.remote_callback(Scheduler::run_sched_once);
+
+        return SchedHandle {
+            remote: remote,
+            queue: self.message_queue.clone(),
+            sched_id: self.sched_id()
+        };
     }
 }
 
-// The cases for the below function.
-enum ResumeAction {
-    SendHome,
-    Requeue,
-    ResumeNow,
-    Homeless
+// Supporting types
+
+type SchedulingFn = ~fn(~Scheduler, ~Task) -> Option<~Scheduler>;
+
+pub enum SchedMessage {
+    Wake,
+    Shutdown,
+    PinnedTask(~Task),
+    TaskFromFriend(~Task)
+}
+
+pub struct SchedHandle {
+    priv remote: ~RemoteCallbackObject,
+    priv queue: MessageQueue<SchedMessage>,
+    sched_id: uint
 }
 
 impl SchedHandle {
@@ -792,6 +721,25 @@ impl SchedHandle {
     }
 }
 
+struct CleanupJob {
+    task: ~Task,
+    f: UnsafeTaskReceiver
+}
+
+impl CleanupJob {
+    pub fn new(task: ~Task, f: UnsafeTaskReceiver) -> CleanupJob {
+        CleanupJob {
+            task: task,
+            f: f
+        }
+    }
+
+    pub fn run(self, sched: &mut Scheduler) {
+        let CleanupJob { task: task, f: f } = self;
+        f.to_fn()(sched, task)
+    }
+}
+
 // XXX: Some hacks to put a &fn in Scheduler without borrowck
 // complaining
 type UnsafeTaskReceiver = raw::Closure;
@@ -1098,6 +1046,51 @@ mod test {
         }
     }
 
+    // A regression test that the final message is always handled.
+    // Used to deadlock because Shutdown was never recvd.
+    #[test]
+    fn no_missed_messages() {
+        use rt::work_queue::WorkQueue;
+        use rt::sleeper_list::SleeperList;
+        use rt::stack::StackPool;
+        use rt::uv::uvio::UvEventLoop;
+        use rt::sched::{Shutdown, TaskFromFriend};
+        use util;
+
+        do run_in_bare_thread {
+            do stress_factor().times {
+                let sleepers = SleeperList::new();
+                let queue = WorkQueue::new();
+                let queues = ~[queue.clone()];
+
+                let mut sched = ~Scheduler::new(
+                    ~UvEventLoop::new(),
+                    queue,
+                    queues.clone(),
+                    sleepers.clone());
+
+                let mut handle = sched.make_handle();
+
+                let sched = Cell::new(sched);
+
+                let thread = do Thread::start {
+                    let mut sched = sched.take();
+                    let bootstrap_task = ~Task::new_root(&mut sched.stack_pool, None, ||());
+                    sched.bootstrap(bootstrap_task);
+                };
+
+                let mut stack_pool = StackPool::new();
+                let task = ~Task::new_root(&mut stack_pool, None, ||());
+                handle.send(TaskFromFriend(task));
+
+                handle.send(Shutdown);
+                util::ignore(handle);
+
+                thread.join();
+            }
+        }
+    }
+
     #[test]
     fn multithreading() {
         use rt::comm::*;
diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs
index 95d60c11df6..3b8eb87f8af 100644
--- a/src/libstd/rt/task.rs
+++ b/src/libstd/rt/task.rs
@@ -370,7 +370,7 @@ impl Coroutine {
 
                 // Again - might work while safe, or it might not.
                 do Local::borrow::<Scheduler,()> |sched| {
-                    (sched).run_cleanup_job();
+                    sched.run_cleanup_job();
                 }
 
                 // To call the run method on a task we need a direct
diff --git a/src/libstd/rt/util.rs b/src/libstd/rt/util.rs
index c81f3ec9a79..1f29830aa04 100644
--- a/src/libstd/rt/util.rs
+++ b/src/libstd/rt/util.rs
@@ -66,8 +66,7 @@ pub fn default_sched_threads() -> uint {
 pub fn dumb_println(s: &str) {
     use io::WriterUtil;
     let dbg = ::libc::STDERR_FILENO as ::io::fd_t;
-    dbg.write_str(s);
-    dbg.write_str("\n");
+    dbg.write_str(s + "\n");
 }
 
 pub fn abort(msg: &str) -> ! {
diff --git a/src/libstd/rt/uv/idle.rs b/src/libstd/rt/uv/idle.rs
index b73be9f7250..a21146620ca 100644
--- a/src/libstd/rt/uv/idle.rs
+++ b/src/libstd/rt/uv/idle.rs
@@ -48,6 +48,20 @@ impl IdleWatcher {
         }
     }
 
+    pub fn restart(&mut self) {
+        unsafe {
+            assert!(0 == uvll::idle_start(self.native_handle(), idle_cb))
+        };
+
+        extern fn idle_cb(handle: *uvll::uv_idle_t, status: c_int) {
+            let mut idle_watcher: IdleWatcher = NativeHandle::from_native_handle(handle);
+            let data = idle_watcher.get_watcher_data();
+            let cb: &IdleCallback = data.idle_cb.get_ref();
+            let status = status_to_maybe_uv_error(idle_watcher, status);
+            (*cb)(idle_watcher, status);
+        }
+    }
+
     pub fn stop(&mut self) {
         // NB: Not resetting the Rust idle_cb to None here because `stop` is
         // likely called from *within* the idle callback, causing a use after
diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs
index d4794da9b0f..078dce4f0c8 100644
--- a/src/libstd/rt/uv/uvio.rs
+++ b/src/libstd/rt/uv/uvio.rs
@@ -117,6 +117,15 @@ impl EventLoop for UvEventLoop {
         }
     }
 
+    fn pausible_idle_callback(&mut self) -> ~PausibleIdleCallback {
+        let idle_watcher = IdleWatcher::new(self.uvio.uv_loop());
+        return ~UvPausibleIdleCallback {
+            watcher: idle_watcher,
+            idle_flag: false,
+            closed: false
+        };
+    }
+
     fn callback_ms(&mut self, ms: u64, f: ~fn()) {
         let mut timer =  TimerWatcher::new(self.uvio.uv_loop());
         do timer.start(ms, 0) |timer, status| {
@@ -135,6 +144,44 @@ impl EventLoop for UvEventLoop {
     }
 }
 
+pub struct UvPausibleIdleCallback {
+    watcher: IdleWatcher,
+    idle_flag: bool,
+    closed: bool
+}
+
+impl UvPausibleIdleCallback {
+    #[inline]
+    pub fn start(&mut self, f: ~fn()) {
+        do self.watcher.start |_idle_watcher, _status| {
+            f();
+        };
+        self.idle_flag = true;
+    }
+    #[inline]
+    pub fn pause(&mut self) {
+        if self.idle_flag == true {
+            self.watcher.stop();
+            self.idle_flag = false;
+        }
+    }
+    #[inline]
+    pub fn resume(&mut self) {
+        if self.idle_flag == false {
+            self.watcher.restart();
+            self.idle_flag = true;
+        }
+    }
+    #[inline]
+    pub fn close(&mut self) {
+        self.pause();
+        if !self.closed {
+            self.closed = true;
+            self.watcher.close(||());
+        }
+    }
+}
+
 #[test]
 fn test_callback_run_once() {
     do run_in_bare_thread {
@@ -163,14 +210,39 @@ impl UvRemoteCallback {
         let exit_flag_clone = exit_flag.clone();
         let async = do AsyncWatcher::new(loop_) |watcher, status| {
             assert!(status.is_none());
+
+            // The synchronization logic here is subtle. To review,
+            // the uv async handle type promises that, after it is
+            // triggered the remote callback is definitely called at
+            // least once. UvRemoteCallback needs to maintain those
+            // semantics while also shutting down cleanly from the
+            // dtor. In our case that means that, when the
+            // UvRemoteCallback dtor calls `async.send()`, here `f` is
+            // always called later.
+
+            // In the dtor both the exit flag is set and the async
+            // callback fired under a lock.  Here, before calling `f`,
+            // we take the lock and check the flag. Because we are
+            // checking the flag before calling `f`, and the flag is
+            // set under the same lock as the send, then if the flag
+            // is set then we're guaranteed to call `f` after the
+            // final send.
+
+            // If the check was done after `f()` then there would be a
+            // period between that call and the check where the dtor
+            // could be called in the other thread, missing the final
+            // callback while still destroying the handle.
+
+            let should_exit = unsafe {
+                exit_flag_clone.with_imm(|&should_exit| should_exit)
+            };
+
             f();
-            unsafe {
-                do exit_flag_clone.with_imm |&should_exit| {
-                    if should_exit {
-                        watcher.close(||());
-                    }
-                }
+
+            if should_exit {
+                watcher.close(||());
             }
+
         };
         UvRemoteCallback {
             async: async,
@@ -219,7 +291,10 @@ mod test_remote {
                 let tube_clone = tube_clone.clone();
                 let tube_clone_cell = Cell::new(tube_clone);
                 let remote = do sched.event_loop.remote_callback {
-                    tube_clone_cell.take().send(1);
+                    // This could be called multiple times
+                    if !tube_clone_cell.is_empty() {
+                        tube_clone_cell.take().send(1);
+                    }
                 };
                 remote_cell.put_back(remote);
             }