Diffstat (limited to 'src/libstd/rt/sched.rs')
-rw-r--r--  src/libstd/rt/sched.rs  1027
1 files changed, 454 insertions, 573 deletions
diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs
index ae4ca2b9783..0326c2cbfe5 100644
--- a/src/libstd/rt/sched.rs
+++ b/src/libstd/rt/sched.rs
@@ -10,7 +10,8 @@
 
 use either::{Left, Right};
 use option::{Option, Some, None};
-use cast::transmute;
+use sys;
+use cast::{transmute, transmute_mut_region, transmute_mut_unsafe};
 use clone::Clone;
 use unstable::raw;
 
@@ -27,6 +28,7 @@ use rt::local::Local;
 use rt::rtio::RemoteCallback;
 use rt::metrics::SchedMetrics;
 use borrow::{to_uint};
+use cell::Cell;
 
 /// The Scheduler is responsible for coordinating execution of Coroutines
 /// on a single thread. When the scheduler is running it is owned by
@@ -59,11 +61,8 @@ pub struct Scheduler {
     stack_pool: StackPool,
     /// The event loop used to drive the scheduler and perform I/O
     event_loop: ~EventLoopObject,
-    /// The scheduler's saved context.
-    /// Always valid when a task is executing, otherwise not
-    priv saved_context: Context,
-    /// The currently executing task
-    current_task: Option<~Task>,
+    /// The scheduler runs on a special task.
+    sched_task: Option<~Task>,
     /// An action performed after a context switch on behalf of the
     /// code running before the context switch
     priv cleanup_job: Option<CleanupJob>,
@@ -90,7 +89,6 @@ enum CleanupJob {
 }
 
 impl Scheduler {
-    pub fn in_task_context(&self) -> bool { self.current_task.is_some() }
 
     pub fn sched_id(&self) -> uint { to_uint(self) }
 
@@ -103,15 +101,14 @@ impl Scheduler {
 
     }
 
+    // When you create a scheduler it isn't yet "in" a task, so the
+    // task field is None.
     pub fn new_special(event_loop: ~EventLoopObject,
                        work_queue: WorkQueue<~Task>,
                        sleeper_list: SleeperList,
                        run_anything: bool)
         -> Scheduler {
 
-        // Lazily initialize the runtime TLS key
-        local_ptr::init_tls_key();
-
         Scheduler {
             sleeper_list: sleeper_list,
             message_queue: MessageQueue::new(),
@@ -120,8 +117,7 @@ impl Scheduler {
             event_loop: event_loop,
             work_queue: work_queue,
             stack_pool: StackPool::new(),
-            saved_context: Context::empty(),
-            current_task: None,
+            sched_task: None,
             cleanup_job: None,
             metrics: SchedMetrics::new(),
             run_anything: run_anything
@@ -132,8 +128,47 @@ impl Scheduler {
     // the scheduler itself doesn't have to call event_loop.run.
     // That will be important for embedding the runtime into external
     // event loops.
-    pub fn run(~self) -> ~Scheduler {
-        assert!(!self.in_task_context());
+
+    // Take a main task to run, and a scheduler to run it in. Create a
+    // scheduler task and bootstrap into it.
+    pub fn bootstrap(~self, task: ~Task) {
+
+        // Initialize the TLS key.
+        local_ptr::init_tls_key();
+
+        // Create a task for the scheduler with an empty context.
+        let sched_task = Task::new_sched_task();
+
+        // Now that we have an empty task struct for the scheduler
+        // task, put it in TLS.
+        Local::put::<Task>(~sched_task);
+
+        // Now, as far as all the scheduler state is concerned, we are
+        // inside the "scheduler" context. So we can act like the
+        // scheduler and resume the provided task.
+        self.resume_task_immediately(task);
+
+        // Now we are back in the scheduler context, having
+        // successfully run the input task. Start by running the
+        // scheduler. Grab it out of TLS - performing the scheduler
+        // action will have given it away.
+        let sched = Local::take::<Scheduler>();
+        sched.run();
+
+        // Now that we are done with the scheduler, clean up the
+        // scheduler task. Do so by removing it from TLS and manually
+        // cleaning up the memory it uses. As we didn't actually call
+        // task.run() on the scheduler task we never get through all
+        // the cleanup code it runs.
+
+        rtdebug!("post sched.run(), cleaning up scheduler task");
+        let mut stask = Local::take::<Task>();
+        stask.destroyed = true;
+    }
+
+    // This does not return a scheduler, as the scheduler is placed
+    // inside the task.
+    pub fn run(~self) {
 
         let mut self_sched = self;
 
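A minimal sketch of how bootstrap is meant to be driven, following the
pattern of test_home_sched later in this diff. run_in_bare_thread and
new_test_uv_sched are the rt::test helpers used by the tests below;
the sketch is illustrative, not part of the patch.

    // Sketch: drive a fresh scheduler through bootstrap.
    do run_in_bare_thread {
        let mut sched = ~new_test_uv_sched();
        let task = ~do Task::new_root(&mut sched.stack_pool) {
            rtdebug!("running on the bootstrapped scheduler")
        };
        // bootstrap consumes the scheduler: it creates the sched
        // task, resumes `task`, runs the event loop, and tears down.
        sched.bootstrap(task);
    }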
@@ -142,79 +177,88 @@ impl Scheduler {
         // schedulers.
         self_sched.event_loop.callback(Scheduler::run_sched_once);
 
+        // This is unsafe because we need to place the scheduler,
+        // which owns the event_loop, inside our task, while still
+        // holding a mutable reference to the event_loop so that we
+        // can give it the "run" command.
         unsafe {
-            let event_loop: *mut ~EventLoopObject = {
-                let event_loop: *mut ~EventLoopObject = &mut self_sched.event_loop;
-                event_loop
-            };
+            let event_loop: *mut ~EventLoopObject = &mut self_sched.event_loop;
 
-            // Give ownership of the scheduler (self) to the thread
-            Local::put(self_sched);
+            // Our scheduler must be in the task before the event loop
+            // is started.
+            let self_sched = Cell::new(self_sched);
+            do Local::borrow::<Task,()> |stask| {
+                stask.sched = Some(self_sched.take());
+            };
 
             (*event_loop).run();
         }
-
-        rtdebug!("run taking sched");
-        let sched = Local::take::<Scheduler>();
-        // XXX: Reenable this once we're using a per-scheduler queue. With a shared
-        // queue this is not true
-        //assert!(sched.work_queue.is_empty());
-        rtdebug!("scheduler metrics: %s\n", {
-            use to_str::ToStr;
-            sched.metrics.to_str()
-        });
-        return sched;
     }
 
-    fn run_sched_once() {
+    // One iteration of the scheduler loop, always run at least once.
 
-        let mut sched = Local::take::<Scheduler>();
-        sched.metrics.turns += 1;
-
-        // First, check the message queue for instructions.
-        // XXX: perf. Check for messages without atomics.
-        // It's ok if we miss messages occasionally, as long as
-        // we sync and check again before sleeping.
-        if sched.interpret_message_queue() {
-            // We performed a scheduling action. There may be other work
-            // to do yet, so let's try again later.
-            rtdebug!("run_sched_once, interpret_message_queue taking sched");
-            let mut sched = Local::take::<Scheduler>();
-            sched.metrics.messages_received += 1;
-            sched.event_loop.callback(Scheduler::run_sched_once);
-            Local::put(sched);
-            return;
-        }
+    // The model for this function is that you continue through it
+    // until you either use the scheduler while performing a
+    // scheduling action, in which case you give it away and do not
+    // return, or you reach the end and sleep. Whenever a scheduling
+    // action is performed, the event loop is asked to call this
+    // function again, so the loop keeps turning.
+    fn run_sched_once() {
 
-        // Now, look in the work queue for tasks to run
-        rtdebug!("run_sched_once taking");
+        // When we reach the scheduler context via the event loop we
+        // already have a scheduler stored in our local task, so we
+        // start off by taking it. This is the only path through the
+        // scheduler where we get the scheduler this way.
         let sched = Local::take::<Scheduler>();
-        if sched.resume_task_from_queue() {
-            // We performed a scheduling action. There may be other work
-            // to do yet, so let's try again later.
-            do Local::borrow::<Scheduler, ()> |sched| {
-                sched.metrics.tasks_resumed_from_queue += 1;
-                sched.event_loop.callback(Scheduler::run_sched_once);
+
+        // Our first task is to read mail to see if we have important
+        // messages.
+
+        // 1) A wake message is easy: mutate the sched struct and
+        //    return it.
+        // 2) A shutdown is also easy: shut down.
+        // 3) A pinned task: we resume it immediately and do not
+        //    return here.
+
+        let result = sched.interpret_message_queue();
+        let sched = match result {
+            Some(sched) => {
+                // No task was resumed: the scheduler came back to us.
+                sched
             }
-            return;
-        }
+            None => {
+                return;
+            }
+        };
+
+        let result = sched.resume_task_from_queue();
+        let mut sched = match result {
+            Some(sched) => {
+                // No task was resumed: we still hold the scheduler.
+                sched
+            }
+            None => {
+                return;
+            }
+        };
 
         // If we got here then there was no work to do.
         // Generate a SchedHandle and push it to the sleeper list so
         // somebody can wake us up later.
-        rtdebug!("no work to do");
-        do Local::borrow::<Scheduler, ()> |sched| {
-            sched.metrics.wasted_turns += 1;
-            if !sched.sleepy && !sched.no_sleep {
-                rtdebug!("sleeping");
-                sched.metrics.sleepy_times += 1;
-                sched.sleepy = true;
-                let handle = sched.make_handle();
-                sched.sleeper_list.push(handle);
-            } else {
-                rtdebug!("not sleeping");
-            }
+        sched.metrics.wasted_turns += 1;
+        if !sched.sleepy && !sched.no_sleep {
+            rtdebug!("scheduler has no work to do, going to sleep");
+            sched.metrics.sleepy_times += 1;
+            sched.sleepy = true;
+            let handle = sched.make_handle();
+            sched.sleeper_list.push(handle);
+        } else {
+            rtdebug!("not sleeping, already doing so or no_sleep set");
         }
+
+        // Finished a cycle without using the Scheduler. Place it back
+        // in TLS.
+        Local::put(sched);
     }
 
     pub fn make_handle(&mut self) -> SchedHandle {
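The Option<~Scheduler> convention that run_sched_once relies on can be
distilled into a short sketch. try_something below is a hypothetical
stand-in for interpret_message_queue and resume_task_from_queue:

    // Sketch of the "used or returned" protocol.
    let sched = Local::take::<Scheduler>();
    match sched.try_something() {
        // Not consumed by a context switch: we must put the
        // scheduler back in TLS ourselves.
        Some(sched) => Local::put(sched),
        // A scheduling action gave the scheduler away; the resumed
        // task's context now owns it, so just return.
        None => return
    }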
@@ -234,18 +278,6 @@ impl Scheduler {
     /// to the work queue directly.
     pub fn enqueue_task(&mut self, task: ~Task) {
 
-        // We don't want to queue tasks that belong on other threads,
-        // so we send them home at enqueue time.
-
-        // The borrow checker doesn't like our disassembly of the
-        // Coroutine struct and partial use and mutation of the
-        // fields. So completely disassemble here and stop using?
-
-        // XXX perf: I think we might be able to shuffle this code to
-        // only destruct when we need to.
-
-        rtdebug!("a task was queued on: %u", self.sched_id());
-
         let this = self;
 
         // We push the task onto our local queue clone.
@@ -283,30 +315,23 @@ impl Scheduler {
 
     // * Scheduler-context operations
 
-    fn interpret_message_queue(~self) -> bool {
-        assert!(!self.in_task_context());
-
-        rtdebug!("looking for scheduler messages");
+    // This function returns None if the scheduler is "used", i.e.
+    // given away in a context switch; otherwise it returns the
+    // still-available scheduler.
+    fn interpret_message_queue(~self) -> Option<~Scheduler> {
 
         let mut this = self;
         match this.message_queue.pop() {
             Some(PinnedTask(task)) => {
-                rtdebug!("recv BiasedTask message in sched: %u",
-                         this.sched_id());
                 let mut task = task;
-                task.home = Some(Sched(this.make_handle()));
+                task.give_home(Sched(this.make_handle()));
                 this.resume_task_immediately(task);
-                return true;
+                return None;
             }
-
             Some(Wake) => {
-                rtdebug!("recv Wake message");
                 this.sleepy = false;
-                Local::put(this);
-                return true;
+                return Some(this);
             }
             Some(Shutdown) => {
-                rtdebug!("recv Shutdown message");
                 if this.sleepy {
                     // There may be an outstanding handle on the
                     // sleeper list.  Pop them all to make sure that's
@@ -325,12 +350,14 @@ impl Scheduler {
                 // event loop references we will shut down.
                 this.no_sleep = true;
                 this.sleepy = false;
-                Local::put(this);
-                return true;
+                // YYY: Does a shutdown count as a "use" of the
+                // scheduler? This seems to work - so I'm leaving it
+                // this way despite not having a solid rationale for
+                // why I should return the scheduler here.
+                return Some(this);
             }
             None => {
-                Local::put(this);
-                return false;
+                return Some(this);
             }
         }
     }
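For reference, the sending half of this mailbox is a SchedHandle
obtained from make_handle. A sketch in the style of the shutdown
sequence in test_schedule_home_states below, assuming `sched` is a
&mut Scheduler:

    // Sketch: messaging a scheduler from outside its thread.
    let mut handle = sched.make_handle();
    handle.send(Wake);      // pop the scheduler out of sleep
    handle.send(Shutdown);  // let the event loop wind down
    // handle.send(PinnedTask(task)) would pin `task` to this sched.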
@@ -338,7 +365,7 @@ impl Scheduler {
     /// Given an input Coroutine sends it back to its home scheduler.
     fn send_task_home(task: ~Task) {
         let mut task = task;
-        let mut home = task.home.take_unwrap();
+        let mut home = task.take_unwrap_home();
         match home {
             Sched(ref mut home_handle) => {
                 home_handle.send(PinnedTask(task));
@@ -351,69 +378,45 @@ impl Scheduler {
 
     // Resume a task from the queue - but also take into account that
     // it might not belong here.
-    fn resume_task_from_queue(~self) -> bool {
-        assert!(!self.in_task_context());
 
-        rtdebug!("looking in work queue for task to schedule");
+    // If we perform a scheduling action we give away the ~Scheduler
+    // pointer; if it is still available, we return it.
+
+    fn resume_task_from_queue(~self) -> Option<~Scheduler> {
+
         let mut this = self;
 
-        // The borrow checker imposes the possibly absurd requirement
-        // that we split this into two match expressions. This is due
-        // to the inspection of the internal bits of task, as that
-        // can't be in scope when we act on task.
         match this.work_queue.pop() {
             Some(task) => {
-                let action_id = {
-                    let home = &task.home;
-                    match home {
-                        &Some(Sched(ref home_handle))
-                        if home_handle.sched_id != this.sched_id() => {
-                            SendHome
-                        }
-                        &Some(AnySched) if this.run_anything => {
-                            ResumeNow
-                        }
-                        &Some(AnySched) => {
-                            Requeue
-                        }
-                        &Some(Sched(_)) => {
-                            ResumeNow
-                        }
-                        &None => {
-                            Homeless
+                let mut task = task;
+                let home = task.take_unwrap_home();
+                match home {
+                    Sched(home_handle) => {
+                        if home_handle.sched_id != this.sched_id() {
+                            task.give_home(Sched(home_handle));
+                            Scheduler::send_task_home(task);
+                            return Some(this);
+                        } else {
+                            task.give_home(Sched(home_handle));
+                            this.resume_task_immediately(task);
+                            return None;
                         }
                     }
-                };
-
-                match action_id {
-                    SendHome => {
-                        rtdebug!("sending task home");
-                        Scheduler::send_task_home(task);
-                        Local::put(this);
-                        return false;
-                    }
-                    ResumeNow => {
-                        rtdebug!("resuming now");
+                    AnySched if this.run_anything => {
+                        task.give_home(AnySched);
                         this.resume_task_immediately(task);
-                        return true;
+                        return None;
                     }
-                    Requeue => {
-                        rtdebug!("re-queueing")
+                    AnySched => {
+                        task.give_home(AnySched);
                         this.enqueue_task(task);
-                        Local::put(this);
-                        return false;
-                    }
-                    Homeless => {
-                        rtabort!("task home was None!");
+                        return Some(this);
                     }
                 }
             }
-
             None => {
-               rtdebug!("no tasks in queue");
-               Local::put(this);
-               return false;
-           }
+                return Some(this);
+            }
         }
     }
 
@@ -422,33 +425,20 @@ impl Scheduler {
     /// Called by a running task to end execution, after which it will
     /// be recycled by the scheduler for reuse in a new task.
     pub fn terminate_current_task(~self) {
+        // Similar to deschedule_running_task_and_then, but it cannot
+        // go through the task-blocking path: the task is already dying.
         let mut this = self;
-        assert!(this.in_task_context());
-
-        rtdebug!("ending running task");
-
-        // This task is post-cleanup, so it must be unkillable. This sequence
-        // of descheduling and recycling must not get interrupted by a kill.
-        // FIXME(#7544): Make this use an inner descheduler, like yield should.
-        this.current_task.get_mut_ref().death.unkillable += 1;
-
-        do this.deschedule_running_task_and_then |sched, dead_task| {
-            match dead_task.wake() {
-                Some(dead_task) => {
-                    let mut dead_task = dead_task;
-                    dead_task.death.unkillable -= 1; // FIXME(#7544) ugh
-                    let coroutine = dead_task.coroutine.take_unwrap();
-                    coroutine.recycle(&mut sched.stack_pool);
-                }
-                None => rtabort!("dead task killed before recycle"),
-            }
+        let stask = this.sched_task.take_unwrap();
+        do this.change_task_context(stask) |sched, mut dead_task| {
+            let coroutine = dead_task.coroutine.take_unwrap();
+            coroutine.recycle(&mut sched.stack_pool);
         }
-
-        rtabort!("control reached end of task");
     }
 
-    pub fn schedule_task(~self, task: ~Task) {
-        assert!(self.in_task_context());
+    // If a scheduling action is performed, return None. If not,
+    // return Some(sched).
+
+    pub fn schedule_task(~self, task: ~Task) -> Option<~Scheduler> {
 
         // is the task home?
         let is_home = task.is_home_no_tls(&self);
@@ -461,55 +451,115 @@ impl Scheduler {
         if is_home || (!homed && this.run_anything) {
             // here we know we are home, execute now OR we know we
             // aren't homed, and that this sched doesn't care
+            rtdebug!("task: %u is on ok sched, executing", to_uint(task));
             do this.switch_running_tasks_and_then(task) |sched, last_task| {
                 sched.enqueue_blocked_task(last_task);
             }
+            return None;
         } else if !homed && !this.run_anything {
             // the task isn't homed, but it can't be run here
             this.enqueue_task(task);
-            Local::put(this);
+            return Some(this);
         } else {
             // task isn't home, so don't run it here, send it home
             Scheduler::send_task_home(task);
-            Local::put(this);
+            return Some(this);
         }
     }
 
-    // Core scheduling ops
-
-    pub fn resume_task_immediately(~self, task: ~Task) {
+    // The primary function for changing contexts. In the current
+    // design the scheduler is just a slightly modified GreenTask, so
+    // all context swaps are from Task to Task. The only difference
+    // between the various cases is where the inputs come from, and
+    // what is done with the resulting task. That is specified by the
+    // cleanup function f, which takes the scheduler and the
+    // old task as inputs.
+
+    pub fn change_task_context(~self,
+                               next_task: ~Task,
+                               f: &fn(&mut Scheduler, ~Task)) {
         let mut this = self;
-        assert!(!this.in_task_context());
 
-        rtdebug!("scheduling a task");
-        this.metrics.context_switches_sched_to_task += 1;
+        // The current task is grabbed from TLS, not taken as an input.
+        let current_task: ~Task = Local::take::<Task>();
 
-        // Store the task in the scheduler so it can be grabbed later
-        this.current_task = Some(task);
-        this.enqueue_cleanup_job(DoNothing);
+        // These transmutes do something fishy with a closure.
+        let f_fake_region = unsafe {
+            transmute::<&fn(&mut Scheduler, ~Task),
+                        &fn(&mut Scheduler, ~Task)>(f)
+        };
+        let f_opaque = ClosureConverter::from_fn(f_fake_region);
+
+        // The current task is placed inside an enum with the cleanup
+        // function. This enum is then placed inside the scheduler.
+        this.enqueue_cleanup_job(GiveTask(current_task, f_opaque));
 
-        Local::put(this);
+        // The scheduler is then placed inside the next task.
+        let mut next_task = next_task;
+        next_task.sched = Some(this);
 
-        // Take pointers to both the task and scheduler's saved registers.
+        // However we still need an internal mutable pointer to the
+        // original task. The strategy here was "arrange memory, then
+        // get pointers", so we crawl back up the chain using
+        // transmute to eliminate borrowck errors.
         unsafe {
-            let sched = Local::unsafe_borrow::<Scheduler>();
-            let (sched_context, _, next_task_context) = (*sched).get_contexts();
-            let next_task_context = next_task_context.unwrap();
-            // Context switch to the task, restoring its registers
-            // and saving the scheduler's
-            Context::swap(sched_context, next_task_context);
 
-            let sched = Local::unsafe_borrow::<Scheduler>();
-            // The running task should have passed ownership elsewhere
-            assert!((*sched).current_task.is_none());
+            let sched: &mut Scheduler =
+                transmute_mut_region(*next_task.sched.get_mut_ref());
+
+            let current_task: &mut Task = match sched.cleanup_job {
+                Some(GiveTask(ref task, _)) => {
+                    transmute_mut_region(*transmute_mut_unsafe(task))
+                }
+                Some(DoNothing) => {
+                    rtabort!("no next task");
+                }
+                None => {
+                    rtabort!("no cleanup job");
+                }
+            };
+
+            let (current_task_context, next_task_context) =
+                Scheduler::get_contexts(current_task, next_task);
+
+            // Done with everything - put the next task in TLS. This
+            // works because, thanks to the transmute, the borrow
+            // checker believes that we have no internal pointers to
+            // next_task.
+            Local::put(next_task);
+
+            // The raw context swap operation. The next action taken
+            // will be running the cleanup job from the context of the
+            // next task.
+            Context::swap(current_task_context, next_task_context);
+        }
 
-            // Running tasks may have asked us to do some cleanup
+        // When the context swaps back to the scheduler we immediately
+        // run the cleanup job, as the Context::swap call above
+        // expects.
+        unsafe {
+            let sched = Local::unsafe_borrow::<Scheduler>();
             (*sched).run_cleanup_job();
 
             // Must happen after running the cleanup job (of course).
-            // Might not be running in task context; if not, a later call to
-            // resume_task_immediately will take care of this.
-            (*sched).current_task.map(|t| t.death.check_killed());
+            let task = Local::unsafe_borrow::<Task>();
+            (*task).death.check_killed();
+        }
+    }
+
+    // There are a variety of "obvious" functions to be passed to
+    // change_task_context, so we can make a few "named cases".
+
+    // Enqueue the old task on the current scheduler.
+    pub fn enqueue_old(sched: &mut Scheduler, task: ~Task) {
+        sched.enqueue_task(task);
+    }
+
+    // Sometimes we just want the old API though.
+
+    pub fn resume_task_immediately(~self, task: ~Task) {
+        do self.change_task_context(task) |sched, stask| {
+            sched.sched_task = Some(stask);
         }
     }
 
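Every swap in this design is change_task_context plus a policy closure
for the outgoing task; resume_task_immediately above is one instance.
A requeueing policy, mirroring Scheduler::enqueue_old, is just as
short (sketch; next_task is assumed to be a ~Task in scope):

    // Sketch: switch to next_task, requeueing the old task. The
    // closure runs after the swap, in the next task's context.
    let sched = Local::take::<Scheduler>();
    do sched.change_task_context(next_task) |sched, last_task| {
        sched.enqueue_task(last_task);
    }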
@@ -533,152 +583,78 @@ impl Scheduler {
     /// in order to prevent that fn from performing further scheduling operations.
     /// Doing further scheduling could easily result in infinite recursion.
     pub fn deschedule_running_task_and_then(~self, f: &fn(&mut Scheduler, BlockedTask)) {
+        // Trickier - we need to get the scheduler task out of self
+        // and use it as the destination.
         let mut this = self;
-        assert!(this.in_task_context());
-
-        rtdebug!("blocking task");
-        this.metrics.context_switches_task_to_sched += 1;
-
-        unsafe {
-            let blocked_task = this.current_task.take_unwrap();
-            let f_fake_region = transmute::<&fn(&mut Scheduler, BlockedTask),
-                                            &fn(&mut Scheduler, BlockedTask)>(f);
-            let f_opaque = ClosureConverter::from_fn(f_fake_region);
-            this.enqueue_cleanup_job(GiveTask(blocked_task, f_opaque));
-        }
-
-        Local::put(this);
-
-        unsafe {
-            let sched = Local::unsafe_borrow::<Scheduler>();
-            let (sched_context, last_task_context, _) = (*sched).get_contexts();
-            let last_task_context = last_task_context.unwrap();
-            Context::swap(last_task_context, sched_context);
-
-            // We could be executing in a different thread now
-            let sched = Local::unsafe_borrow::<Scheduler>();
-            (*sched).run_cleanup_job();
-
-            // As above, must happen after running the cleanup job.
-            (*sched).current_task.map(|t| t.death.check_killed());
-        }
+        let stask = this.sched_task.take_unwrap();
+        // Otherwise this is the same as below.
+        this.switch_running_tasks_and_then(stask, f);
     }
 
-    /// Switch directly to another task, without going through the scheduler.
-    /// You would want to think hard about doing this, e.g. if there are
-    /// pending I/O events it would be a bad idea.
     pub fn switch_running_tasks_and_then(~self, next_task: ~Task,
                                          f: &fn(&mut Scheduler, BlockedTask)) {
-        let mut this = self;
-        assert!(this.in_task_context());
-
-        rtdebug!("switching tasks");
-        this.metrics.context_switches_task_to_task += 1;
+        // This is where we convert the BlockedTask-taking closure into one
+        // that takes just a Task, and is aware of the block-or-killed protocol.
+        do self.change_task_context(next_task) |sched, task| {
+            // Task might need to receive a kill signal instead of blocking.
+            // We can call the "and_then" only if it blocks successfully.
+            match BlockedTask::try_block(task) {
+                Left(killed_task) => sched.enqueue_task(killed_task),
+                Right(blocked_task) => f(sched, blocked_task),
+            }
+        }
+    }
 
-        let old_running_task = this.current_task.take_unwrap();
-        let f_fake_region = unsafe {
-            transmute::<&fn(&mut Scheduler, BlockedTask),
-                        &fn(&mut Scheduler, BlockedTask)>(f)
+    // A helper that looks up the scheduler and runs a task later by
+    // enqueuing it.
+    pub fn run_task_later(next_task: ~Task) {
+        // We aren't performing a scheduler operation, so we want to
+        // put the Scheduler back when we finish.
+        let next_task = Cell::new(next_task);
+        do Local::borrow::<Scheduler,()> |sched| {
+            sched.enqueue_task(next_task.take());
         };
-        let f_opaque = ClosureConverter::from_fn(f_fake_region);
-        this.enqueue_cleanup_job(GiveTask(old_running_task, f_opaque));
-        this.current_task = Some(next_task);
+    }
 
-        Local::put(this);
+    // A helper that looks up the scheduler and runs a task. If the
+    // task can run here it runs now; otherwise schedule_task queues
+    // it or sends it home.
+    pub fn run_task(new_task: ~Task) {
+        let sched = Local::take::<Scheduler>();
+        sched.schedule_task(new_task).map_consume(Local::put);
+    }
 
+    // Returns mutable references to both contexts involved in this
+    // swap. This is unsafe: we keep mutable internal references even
+    // though we don't own the tasks. It only looks safe because the
+    // transmutes are done before the arguments are passed in.
+    pub fn get_contexts<'a>(current_task: &mut Task, next_task: &mut Task) ->
+        (&'a mut Context, &'a mut Context) {
+        let current_task_context =
+            &mut current_task.coroutine.get_mut_ref().saved_context;
+        let next_task_context =
+            &mut next_task.coroutine.get_mut_ref().saved_context;
         unsafe {
-            let sched = Local::unsafe_borrow::<Scheduler>();
-            let (_, last_task_context, next_task_context) = (*sched).get_contexts();
-            let last_task_context = last_task_context.unwrap();
-            let next_task_context = next_task_context.unwrap();
-            Context::swap(last_task_context, next_task_context);
-
-            // We could be executing in a different thread now
-            let sched = Local::unsafe_borrow::<Scheduler>();
-            (*sched).run_cleanup_job();
-
-            // As above, must happen after running the cleanup job.
-            (*sched).current_task.map(|t| t.death.check_killed());
+            (transmute_mut_region(current_task_context),
+             transmute_mut_region(next_task_context))
         }
     }
 
-
-
     // * Other stuff
 
     pub fn enqueue_cleanup_job(&mut self, job: CleanupJob) {
-        assert!(self.cleanup_job.is_none());
         self.cleanup_job = Some(job);
     }
 
     pub fn run_cleanup_job(&mut self) {
         rtdebug!("running cleanup job");
-
-        assert!(self.cleanup_job.is_some());
-
         let cleanup_job = self.cleanup_job.take_unwrap();
         match cleanup_job {
             DoNothing => { }
-            GiveTask(task, f) => {
-                let f = f.to_fn();
-                // Task might need to receive a kill signal instead of blocking.
-                // We can call the "and_then" only if it blocks successfully.
-                match BlockedTask::try_block(task) {
-                    Left(killed_task) => self.enqueue_task(killed_task),
-                    Right(blocked_task) => f(self, blocked_task),
-                }
-            }
+            GiveTask(task, f) => f.to_fn()(self, task)
         }
     }
 
-    /// Get mutable references to all the contexts that may be involved in a
-    /// context switch.
-    ///
-    /// Returns (the scheduler context, the optional context of the
-    /// task in the cleanup list, the optional context of the task in
-    /// the current task slot).  When context switching to a task,
-    /// callers should first arrange for that task to be located in the
-    /// Scheduler's current_task slot and set up the
-    /// post-context-switch cleanup job.
-    pub fn get_contexts<'a>(&'a mut self) -> (&'a mut Context,
-                                              Option<&'a mut Context>,
-                                              Option<&'a mut Context>) {
-        let last_task = match self.cleanup_job {
-            Some(GiveTask(~ref task, _)) => {
-                Some(task)
-            }
-            Some(DoNothing) => {
-                None
-            }
-            None => fail!("all context switches should have a cleanup job")
-        };
-        // XXX: Pattern matching mutable pointers above doesn't work
-        // because borrowck thinks the three patterns are conflicting
-        // borrows
-        unsafe {
-            let last_task = transmute::<Option<&Task>, Option<&mut Task>>(last_task);
-            let last_task_context = match last_task {
-                Some(t) => {
-                    Some(&mut t.coroutine.get_mut_ref().saved_context)
-                }
-                None => {
-                    None
-                }
-            };
-            let next_task_context = match self.current_task {
-                Some(ref mut t) => {
-                    Some(&mut t.coroutine.get_mut_ref().saved_context)
-                }
-                None => {
-                    None
-                }
-            };
-            // XXX: These transmutes can be removed after snapshot
-            return (transmute(&mut self.saved_context),
-                    last_task_context,
-                    transmute(next_task_context));
-        }
-    }
 }
 
 // The cases for the below function.
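From the task's side, the block-or-killed protocol above is reached
through deschedule_running_task_and_then. A sketch matching its use in
test_io_callback below:

    // Sketch: block the running task and hand its BlockedTask to a
    // wakeup source - here simply the run queue.
    let sched = Local::take::<Scheduler>();
    do sched.deschedule_running_task_and_then |sched, task| {
        // `task` blocked successfully (kills are handled by the
        // wrapper); enqueue_blocked_task wakes it on a later turn.
        sched.enqueue_blocked_task(task);
    }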
@@ -700,29 +676,73 @@ impl SchedHandle {
 // complaining
 type UnsafeTaskReceiver = raw::Closure;
 trait ClosureConverter {
-    fn from_fn(&fn(&mut Scheduler, BlockedTask)) -> Self;
-    fn to_fn(self) -> &fn(&mut Scheduler, BlockedTask);
+    fn from_fn(&fn(&mut Scheduler, ~Task)) -> Self;
+    fn to_fn(self) -> &fn(&mut Scheduler, ~Task);
 }
 impl ClosureConverter for UnsafeTaskReceiver {
-    fn from_fn(f: &fn(&mut Scheduler, BlockedTask)) -> UnsafeTaskReceiver {
+    fn from_fn(f: &fn(&mut Scheduler, ~Task)) -> UnsafeTaskReceiver {
         unsafe { transmute(f) }
     }
-    fn to_fn(self) -> &fn(&mut Scheduler, BlockedTask) { unsafe { transmute(self) } }
+    fn to_fn(self) -> &fn(&mut Scheduler, ~Task) { unsafe { transmute(self) } }
 }
 
-
 #[cfg(test)]
 mod test {
+    use rt::test::*;
+    use unstable::run_in_bare_thread;
+    use borrow::to_uint;
+    use rt::local::*;
+    use rt::sched::{Scheduler};
+    use uint;
     use int;
     use cell::Cell;
-    use unstable::run_in_bare_thread;
-    use task::spawn;
-    use rt::local::Local;
-    use rt::test::*;
-    use super::*;
     use rt::thread::Thread;
-    use borrow::to_uint;
-    use rt::task::{Task,Sched};
+    use rt::task::{Task, Sched};
+    use option::{Some};
+
+    #[test]
+    fn trivial_run_in_newsched_task_test() {
+        let mut task_ran = false;
+        let task_ran_ptr: *mut bool = &mut task_ran;
+        do run_in_newsched_task || {
+            unsafe { *task_ran_ptr = true };
+            rtdebug!("executed from the new scheduler")
+        }
+        assert!(task_ran);
+    }
+
+    #[test]
+    fn multiple_task_test() {
+        let total = 10;
+        let mut task_run_count = 0;
+        let task_run_count_ptr: *mut uint = &mut task_run_count;
+        do run_in_newsched_task || {
+            for uint::range(0, total) |_| {
+                do spawntask || {
+                    unsafe { *task_run_count_ptr = *task_run_count_ptr + 1};
+                }
+            }
+        }
+        assert!(task_run_count == total);
+    }
+
+    #[test]
+    fn multiple_task_nested_test() {
+        let mut task_run_count = 0;
+        let task_run_count_ptr: *mut uint = &mut task_run_count;
+        do run_in_newsched_task || {
+            do spawntask || {
+                unsafe { *task_run_count_ptr = *task_run_count_ptr + 1 };
+                do spawntask || {
+                    unsafe { *task_run_count_ptr = *task_run_count_ptr + 1 };
+                    do spawntask || {
+                        unsafe { *task_run_count_ptr = *task_run_count_ptr + 1 };
+                    }
+                }
+            }
+        }
+        assert!(task_run_count == 3);
+    }
 
     // Confirm that a sched_id actually is the uint form of the
     // pointer to the scheduler struct.
@@ -745,46 +765,50 @@ mod test {
         }
     }
 
-    // A simple test to check if a homed task run on a single
-    // scheduler ends up executing while home.
+
+    // A very simple test that confirms that a task executing on the
+    // home scheduler notices that it is home.
     #[test]
     fn test_home_sched() {
         do run_in_bare_thread {
             let mut task_ran = false;
             let task_ran_ptr: *mut bool = &mut task_ran;
-            let mut sched = ~new_test_uv_sched();
 
+            let mut sched = ~new_test_uv_sched();
             let sched_handle = sched.make_handle();
-            let sched_id = sched.sched_id();
 
-            let task = ~do Task::new_root_homed(&mut sched.stack_pool,
-                                                 Sched(sched_handle)) {
+            let mut task = ~do Task::new_root_homed(&mut sched.stack_pool,
+                                                Sched(sched_handle)) {
                 unsafe { *task_ran_ptr = true };
-                let sched = Local::take::<Scheduler>();
-                assert!(sched.sched_id() == sched_id);
-                Local::put::<Scheduler>(sched);
+                assert!(Task::on_appropriate_sched());
             };
-            sched.enqueue_task(task);
-            sched.run();
-            assert!(task_ran);
+
+            let on_exit: ~fn(bool) = |exit_status| rtassert!(exit_status);
+            task.death.on_exit = Some(on_exit);
+
+            sched.bootstrap(task);
         }
     }
 
-    // A test for each state of schedule_task
+    // An advanced test that checks all four possible states that a
+    // (task,sched) can be in regarding homes.
+
     #[test]
     fn test_schedule_home_states() {
 
         use rt::uv::uvio::UvEventLoop;
-        use rt::sched::Shutdown;
         use rt::sleeper_list::SleeperList;
         use rt::work_queue::WorkQueue;
+        use rt::sched::Shutdown;
+        use borrow;
+        use rt::comm::*;
 
         do run_in_bare_thread {
 
             let sleepers = SleeperList::new();
             let work_queue = WorkQueue::new();
 
-            // our normal scheduler
+            // Our normal scheduler
             let mut normal_sched = ~Scheduler::new(
                 ~UvEventLoop::new(),
                 work_queue.clone(),
@@ -792,113 +816,93 @@ mod test {
 
             let normal_handle = Cell::new(normal_sched.make_handle());
 
-            // our special scheduler
+            // Our special scheduler
             let mut special_sched = ~Scheduler::new_special(
                 ~UvEventLoop::new(),
                 work_queue.clone(),
                 sleepers.clone(),
-                true);
+                false);
 
             let special_handle = Cell::new(special_sched.make_handle());
-            let special_handle2 = Cell::new(special_sched.make_handle());
-            let special_id = special_sched.sched_id();
+
             let t1_handle = special_sched.make_handle();
             let t4_handle = special_sched.make_handle();
 
-            let t1f = ~do Task::new_root_homed(&mut special_sched.stack_pool,
-                                               Sched(t1_handle)) || {
-                let is_home = Task::is_home_using_id(special_id);
-                rtdebug!("t1 should be home: %b", is_home);
-                assert!(is_home);
-            };
-            let t1f = Cell::new(t1f);
+            // Four test tasks:
+            //   1) task is home on special
+            //   2) task not homed, sched doesn't care
+            //   3) task not homed, sched requeues
+            //   4) task is homed on another sched, send it home
 
-            let t2f = ~do Task::new_root(&mut normal_sched.stack_pool) {
-                let on_special = Task::on_special();
-                rtdebug!("t2 should not be on special: %b", on_special);
-                assert!(!on_special);
+            let task1 = ~do Task::new_root_homed(&mut special_sched.stack_pool,
+                                                 Sched(t1_handle)) || {
+                rtassert!(Task::on_appropriate_sched());
             };
-            let t2f = Cell::new(t2f);
+            rtdebug!("task1 id: **%u**", borrow::to_uint(task1));
 
-            let t3f = ~do Task::new_root(&mut normal_sched.stack_pool) {
-                // not on special
-                let on_special = Task::on_special();
-                rtdebug!("t3 should not be on special: %b", on_special);
-                assert!(!on_special);
-            };
-            let t3f = Cell::new(t3f);
-
-            let t4f = ~do Task::new_root_homed(&mut special_sched.stack_pool,
-                                          Sched(t4_handle)) {
-                // is home
-                let home = Task::is_home_using_id(special_id);
-                rtdebug!("t4 should be home: %b", home);
-                assert!(home);
+            let task2 = ~do Task::new_root(&mut normal_sched.stack_pool) {
+                rtassert!(Task::on_appropriate_sched());
             };
-            let t4f = Cell::new(t4f);
 
-            // we have four tests, make them as closures
-            let t1: ~fn() = || {
-                // task is home on special
-                let task = t1f.take();
-                let sched = Local::take::<Scheduler>();
-                sched.schedule_task(task);
+            let task3 = ~do Task::new_root(&mut normal_sched.stack_pool) {
+                rtassert!(Task::on_appropriate_sched());
             };
-            let t2: ~fn() = || {
-                // not homed, task doesn't care
-                let task = t2f.take();
-                let sched = Local::take::<Scheduler>();
-                sched.schedule_task(task);
-            };
-            let t3: ~fn() = || {
-                // task not homed, must leave
-                let task = t3f.take();
-                let sched = Local::take::<Scheduler>();
-                sched.schedule_task(task);
-            };
-            let t4: ~fn() = || {
-                // task not home, send home
-                let task = t4f.take();
-                let sched = Local::take::<Scheduler>();
-                sched.schedule_task(task);
+
+            let task4 = ~do Task::new_root_homed(&mut special_sched.stack_pool,
+                                                 Sched(t4_handle)) {
+                rtassert!(Task::on_appropriate_sched());
             };
+            rtdebug!("task4 id: **%u**", borrow::to_uint(task4));
+
+            let task1 = Cell::new(task1);
+            let task2 = Cell::new(task2);
+            let task3 = Cell::new(task3);
+            let task4 = Cell::new(task4);
 
-            let t1 = Cell::new(t1);
-            let t2 = Cell::new(t2);
-            let t3 = Cell::new(t3);
-            let t4 = Cell::new(t4);
-
-            // build a main task that runs our four tests
-            let main_task = ~do Task::new_root(&mut normal_sched.stack_pool) {
-                // the two tasks that require a normal start location
-                t2.take()();
-                t4.take()();
-                normal_handle.take().send(Shutdown);
-                special_handle.take().send(Shutdown);
+            // Signal from the special task that we are done.
+            let (port, chan) = oneshot::<()>();
+            let port = Cell::new(port);
+            let chan = Cell::new(chan);
+
+            let normal_task = ~do Task::new_root(&mut normal_sched.stack_pool) {
+                rtdebug!("*about to submit task2*");
+                Scheduler::run_task(task2.take());
+                rtdebug!("*about to submit task4*");
+                Scheduler::run_task(task4.take());
+                rtdebug!("*normal_task done*");
+                port.take().recv();
+                let mut nh = normal_handle.take();
+                nh.send(Shutdown);
+                let mut sh = special_handle.take();
+                sh.send(Shutdown);
             };
 
-            // task to run the two "special start" tests
-            let special_task = ~do Task::new_root_homed(
-                &mut special_sched.stack_pool,
-                Sched(special_handle2.take())) {
-                t1.take()();
-                t3.take()();
+            rtdebug!("normal task: %u", borrow::to_uint(normal_task));
+
+            let special_task = ~do Task::new_root(&mut special_sched.stack_pool) {
+                rtdebug!("*about to submit task1*");
+                Scheduler::run_task(task1.take());
+                rtdebug!("*about to submit task3*");
+                Scheduler::run_task(task3.take());
+                rtdebug!("*done with special_task*");
+                chan.take().send(());
             };
 
-            // enqueue the main tasks
-            normal_sched.enqueue_task(special_task);
-            normal_sched.enqueue_task(main_task);
+            rtdebug!("special task: %u", borrow::to_uint(special_task));
+
+            let special_sched = Cell::new(special_sched);
+            let normal_sched = Cell::new(normal_sched);
+            let special_task = Cell::new(special_task);
+            let normal_task = Cell::new(normal_task);
 
-            let nsched_cell = Cell::new(normal_sched);
             let normal_thread = do Thread::start {
-                let sched = nsched_cell.take();
-                sched.run();
+                normal_sched.take().bootstrap(normal_task.take());
+                rtdebug!("finished with normal_thread");
             };
 
-            let ssched_cell = Cell::new(special_sched);
             let special_thread = do Thread::start {
-                let sched = ssched_cell.take();
-                sched.run();
+                special_sched.take().bootstrap(special_task.take());
+                rtdebug!("finished with special_sched");
             };
 
             normal_thread.join();
@@ -906,7 +910,6 @@ mod test {
         }
     }
 
-    // Do it a lot
     #[test]
     fn test_stress_schedule_task_states() {
         let n = stress_factor() * 120;
@@ -916,123 +919,13 @@ mod test {
     }
 
     #[test]
-    fn test_simple_scheduling() {
-        do run_in_bare_thread {
-            let mut task_ran = false;
-            let task_ran_ptr: *mut bool = &mut task_ran;
-
-            let mut sched = ~new_test_uv_sched();
-            let task = ~do Task::new_root(&mut sched.stack_pool) {
-                unsafe { *task_ran_ptr = true; }
-            };
-            sched.enqueue_task(task);
-            sched.run();
-            assert!(task_ran);
-        }
-    }
-
-    #[test]
-    fn test_several_tasks() {
-        do run_in_bare_thread {
-            let total = 10;
-            let mut task_count = 0;
-            let task_count_ptr: *mut int = &mut task_count;
-
-            let mut sched = ~new_test_uv_sched();
-            for int::range(0, total) |_| {
-                let task = ~do Task::new_root(&mut sched.stack_pool) {
-                    unsafe { *task_count_ptr = *task_count_ptr + 1; }
-                };
-                sched.enqueue_task(task);
-            }
-            sched.run();
-            assert_eq!(task_count, total);
-        }
-    }
-
-    #[test]
-    fn test_swap_tasks_then() {
-        do run_in_bare_thread {
-            let mut count = 0;
-            let count_ptr: *mut int = &mut count;
-
-            let mut sched = ~new_test_uv_sched();
-            let task1 = ~do Task::new_root(&mut sched.stack_pool) {
-                unsafe { *count_ptr = *count_ptr + 1; }
-                let mut sched = Local::take::<Scheduler>();
-                let task2 = ~do Task::new_root(&mut sched.stack_pool) {
-                    unsafe { *count_ptr = *count_ptr + 1; }
-                };
-                // Context switch directly to the new task
-                do sched.switch_running_tasks_and_then(task2) |sched, task1| {
-                    sched.enqueue_blocked_task(task1);
-                }
-                unsafe { *count_ptr = *count_ptr + 1; }
-            };
-            sched.enqueue_task(task1);
-            sched.run();
-            assert_eq!(count, 3);
-        }
-    }
-
-    #[bench] #[test] #[ignore(reason = "long test")]
-    fn test_run_a_lot_of_tasks_queued() {
-        do run_in_bare_thread {
-            static MAX: int = 1000000;
-            let mut count = 0;
-            let count_ptr: *mut int = &mut count;
-
-            let mut sched = ~new_test_uv_sched();
-
-            let start_task = ~do Task::new_root(&mut sched.stack_pool) {
-                run_task(count_ptr);
-            };
-            sched.enqueue_task(start_task);
-            sched.run();
-
-            assert_eq!(count, MAX);
-
-            fn run_task(count_ptr: *mut int) {
-                do Local::borrow::<Scheduler, ()> |sched| {
-                    let task = ~do Task::new_root(&mut sched.stack_pool) {
-                        unsafe {
-                            *count_ptr = *count_ptr + 1;
-                            if *count_ptr != MAX {
-                                run_task(count_ptr);
-                            }
-                        }
-                    };
-                    sched.enqueue_task(task);
-                }
-            };
-        }
-    }
-
-    #[test]
-    fn test_block_task() {
-        do run_in_bare_thread {
-            let mut sched = ~new_test_uv_sched();
-            let task = ~do Task::new_root(&mut sched.stack_pool) {
-                let sched = Local::take::<Scheduler>();
-                assert!(sched.in_task_context());
-                do sched.deschedule_running_task_and_then() |sched, task| {
-                    assert!(!sched.in_task_context());
-                    sched.enqueue_blocked_task(task);
-                }
-            };
-            sched.enqueue_task(task);
-            sched.run();
-        }
-    }
-
-    #[test]
     fn test_io_callback() {
         // This is a regression test: when there are no schedulable tasks
         // in the work queue but we are performing I/O, the scheduler must
         // pick up anything put back on the work queue and must not
         // exit before emptying it.
         do run_in_newsched_task {
-            do spawn {
+            do spawntask {
                 let sched = Local::take::<Scheduler>();
                 do sched.deschedule_running_task_and_then |sched, task| {
                     let task = Cell::new(task);
@@ -1053,34 +946,21 @@ mod test {
 
         do run_in_bare_thread {
             let (port, chan) = oneshot::<()>();
-            let port_cell = Cell::new(port);
-            let chan_cell = Cell::new(chan);
-            let mut sched1 = ~new_test_uv_sched();
-            let handle1 = sched1.make_handle();
-            let handle1_cell = Cell::new(handle1);
-            let task1 = ~do Task::new_root(&mut sched1.stack_pool) {
-                chan_cell.take().send(());
-            };
-            sched1.enqueue_task(task1);
-
-            let mut sched2 = ~new_test_uv_sched();
-            let task2 = ~do Task::new_root(&mut sched2.stack_pool) {
-                port_cell.take().recv();
-                // Release the other scheduler's handle so it can exit
-                handle1_cell.take();
-            };
-            sched2.enqueue_task(task2);
+            let port = Cell::new(port);
+            let chan = Cell::new(chan);
 
-            let sched1_cell = Cell::new(sched1);
-            let thread1 = do Thread::start {
-                let sched1 = sched1_cell.take();
-                sched1.run();
+            let thread1 = do Thread::start {
+                let chan = Cell::new(chan.take());
+                do run_in_newsched_task_core {
+                    chan.take().send(());
+                }
             };
 
-            let sched2_cell = Cell::new(sched2);
-            let thread2 = do Thread::start {
-                let sched2 = sched2_cell.take();
-                sched2.run();
+            let thread2 = do Thread::start {
+                let port = Cell::new(port.take());
+                do run_in_newsched_task_core {
+                    port.take().recv();
+                }
             };
 
             thread1.join();
@@ -1112,21 +992,21 @@ mod test {
         }
     }
 
     #[test]
     fn thread_ring() {
         use rt::comm::*;
         use comm::{GenericPort, GenericChan};
 
         do run_in_mt_newsched_task {
             let (end_port, end_chan) = oneshot();
 
             let n_tasks = 10;
             let token = 2000;
 
             let (p, ch1) = stream();
             let mut p = p;
             ch1.send((token, end_chan));
             let mut i = 2;
             while i <= n_tasks {
                 let (next_p, ch) = stream();
                 let imm_i = i;
@@ -1151,9 +1031,9 @@ mod test {
             while (true) {
                 match p.recv() {
                     (1, end_chan) => {
                         debug!("%d\n", id);
                         end_chan.send(());
                         return;
                     }
                     (token, end_chan) => {
                         debug!("thread: %d   got token: %d", id, token);
@@ -1178,15 +1058,16 @@ mod test {
 
             impl Drop for S {
                 fn drop(&self) {
                     let _foo = @0;
                 }
             }
 
             let s = S { field: () };
 
             do spawntask {
                 let _ss = &s;
             }
         }
     }
 }