about summary refs log tree commit diff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/libcore/rt/mod.rs1
-rw-r--r--src/libcore/rt/sched/local.rs100
-rw-r--r--src/libcore/rt/sched/mod.rs (renamed from src/libcore/rt/sched.rs)443
-rw-r--r--src/libcore/rt/uvio.rs362
4 files changed, 457 insertions, 449 deletions
diff --git a/src/libcore/rt/mod.rs b/src/libcore/rt/mod.rs
index 0f2a6cd7ef9..b1227af5f4c 100644
--- a/src/libcore/rt/mod.rs
+++ b/src/libcore/rt/mod.rs
@@ -32,6 +32,7 @@ macro_rules! rtdebug (
     ($( $arg:expr),+) => ( $(let _ = $arg)*; )
 )
 
+#[path = "sched/mod.rs"]
 mod sched;
 mod rtio;
 pub mod uvll;
diff --git a/src/libcore/rt/sched/local.rs b/src/libcore/rt/sched/local.rs
new file mode 100644
index 00000000000..d8001011114
--- /dev/null
+++ b/src/libcore/rt/sched/local.rs
@@ -0,0 +1,100 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+//! Access to the thread-local Scheduler
+
+use ptr::mut_null;
+use libc::c_void;
+use cast::transmute;
+
+use super::Scheduler;
+use tls = super::super::thread_local_storage;
+#[cfg(test)] use super::super::uvio::UvEventLoop;
+
+/// Give the Scheduler to thread-local storage
+pub fn put(sched: ~Scheduler) {
+    unsafe {
+        let key = tls_key();
+        let void_sched: *mut c_void = transmute::<~Scheduler, *mut c_void>(sched);
+        tls::set(key, void_sched);
+    }
+}
+
+/// Take ownership of the Scheduler from thread-local storage
+pub fn take() -> ~Scheduler {
+    unsafe {
+        let key = tls_key();
+        let void_sched: *mut c_void = tls::get(key);
+        assert!(void_sched.is_not_null());
+        let sched = transmute::<*mut c_void, ~Scheduler>(void_sched);
+        tls::set(key, mut_null());
+        return sched;
+    }
+}
+
+/// Borrow a mutable reference to the thread-local Scheduler
+/// # Safety Note
+/// Because this leaves the Scheduler in thread-local storage it is possible
+/// For the Scheduler pointer to be aliased
+pub unsafe fn borrow() -> &mut Scheduler {
+    unsafe {
+        let key = tls_key();
+        let mut void_sched: *mut c_void = tls::get(key);
+        assert!(void_sched.is_not_null());
+        {
+            let void_sched_ptr = &mut void_sched;
+            let sched: &mut ~Scheduler = {
+                transmute::<&mut *mut c_void, &mut ~Scheduler>(void_sched_ptr)
+            };
+            let sched: &mut Scheduler = &mut **sched;
+            return sched;
+        }
+    }
+}
+
+fn tls_key() -> tls::Key {
+    unsafe {
+        let key: *mut c_void = rust_get_sched_tls_key();
+        let key: &mut tls::Key = transmute(key);
+        return *key;
+    }
+}
+
+extern {
+    fn rust_get_sched_tls_key() -> *mut c_void;
+}
+
+#[test]
+fn thread_local_scheduler_smoke_test() {
+    let scheduler = ~UvEventLoop::new_scheduler();
+    put(scheduler);
+    let _scheduler = take();
+}
+
+#[test]
+fn thread_local_scheduler_two_instances() {
+    let scheduler = ~UvEventLoop::new_scheduler();
+    put(scheduler);
+    let _scheduler = take();
+    let scheduler = ~UvEventLoop::new_scheduler();
+    put(scheduler);
+    let _scheduler = take();
+}
+
+#[test]
+fn borrow_smoke_test() {
+    let scheduler = ~UvEventLoop::new_scheduler();
+    put(scheduler);
+    unsafe {
+        let _scheduler = borrow();
+    }
+    let _scheduler = take();
+}
+
diff --git a/src/libcore/rt/sched.rs b/src/libcore/rt/sched/mod.rs
index 25f446fb86d..f157e6a80e0 100644
--- a/src/libcore/rt/sched.rs
+++ b/src/libcore/rt/sched/mod.rs
@@ -11,18 +11,18 @@
 use option::*;
 use sys;
 use cast::transmute;
-use libc::c_void;
-use ptr::mut_null;
 
 use super::work_queue::WorkQueue;
 use super::stack::{StackPool, StackSegment};
-use super::rtio::{EventLoop, EventLoopObject};
+use super::rtio::{EventLoop, EventLoopObject, IoFactoryObject};
 use super::context::Context;
-use tls = super::thread_local_storage;
 
 #[cfg(test)] use super::uvio::UvEventLoop;
 #[cfg(test)] use unstable::run_in_bare_thread;
 #[cfg(test)] use int;
+#[cfg(test)] use cell::Cell;
+
+mod local;
 
 /// The Scheduler is responsible for coordinating execution of Tasks
 /// on a single thread. When the scheduler is running it is owned by
@@ -38,31 +38,25 @@ pub struct Scheduler {
     priv saved_context: Context,
     /// The currently executing task
     priv current_task: Option<~Task>,
-    /// A queue of jobs to perform immediately upon return from task
-    /// context to scheduler context.
-    /// XXX: This probably should be a single cleanup action and it
-    /// should run after a context switch, not on return from the
-    /// scheduler
-    priv cleanup_jobs: ~[CleanupJob]
+    /// An action performed after a context switch on behalf of the
+    /// code running before the context switch
+    priv cleanup_job: Option<CleanupJob>
 }
 
 // XXX: Some hacks to put a &fn in Scheduler without borrowck
 // complaining
 type UnsafeTaskReceiver = sys::Closure;
-trait HackAroundBorrowCk {
-    fn from_fn(&fn(&mut Scheduler, ~Task)) -> Self;
-    fn to_fn(self) -> &fn(&mut Scheduler, ~Task);
+trait ClosureConverter {
+    fn from_fn(&fn(~Task)) -> Self;
+    fn to_fn(self) -> &fn(~Task);
 }
-impl HackAroundBorrowCk for UnsafeTaskReceiver {
-    fn from_fn(f: &fn(&mut Scheduler, ~Task)) -> UnsafeTaskReceiver {
-        unsafe { transmute(f) }
-    }
-    fn to_fn(self) -> &fn(&mut Scheduler, ~Task) {
-        unsafe { transmute(self) }
-    }
+impl ClosureConverter for UnsafeTaskReceiver {
+    fn from_fn(f: &fn(~Task)) -> UnsafeTaskReceiver { unsafe { transmute(f) } }
+    fn to_fn(self) -> &fn(~Task) { unsafe { transmute(self) } }
 }
 
 enum CleanupJob {
+    DoNothing,
     RescheduleTask(~Task),
     RecycleTask(~Task),
     GiveTask(~Task, UnsafeTaskReceiver)
@@ -84,7 +78,7 @@ pub impl Scheduler {
             stack_pool: StackPool::new(),
             saved_context: Context::empty(),
             current_task: None,
-            cleanup_jobs: ~[]
+            cleanup_job: None
         }
     }
 
@@ -96,39 +90,52 @@ pub impl Scheduler {
         assert!(!self.in_task_context());
 
         // Give ownership of the scheduler (self) to the thread
-        do self.install |scheduler| {
-            fn run_scheduler_once() {
-                do Scheduler::local |scheduler| {
-                    if scheduler.resume_task_from_queue() {
-                        // Ok, a task ran. Nice! We'll do it again later
-                        scheduler.event_loop.callback(run_scheduler_once);
-                    }
+        local::put(self);
+
+        let scheduler = unsafe { local::borrow() };
+        fn run_scheduler_once() {
+            let scheduler = Scheduler::take_local();
+            if scheduler.resume_task_from_queue() {
+                // Ok, a task ran. Nice! We'll do it again later
+                do Scheduler::borrow_local |scheduler| {
+                    scheduler.event_loop.callback(run_scheduler_once);
                 }
             }
-
-            scheduler.event_loop.callback(run_scheduler_once);
-            scheduler.event_loop.run();
         }
+
+        scheduler.event_loop.callback(run_scheduler_once);
+        scheduler.event_loop.run();
+
+        return local::take();
     }
 
-    fn install(~self, f: &fn(&mut Scheduler)) -> ~Scheduler {
-        let mut tlsched = ThreadLocalScheduler::new();
-        tlsched.put_scheduler(self);
-        {
-            let sched = tlsched.get_scheduler();
-            f(sched);
+    /// Get a mutable pointer to the thread-local I/O
+    /// # Safety Note
+    /// This allows other mutable aliases to the scheduler, both in the current
+    /// execution context and other execution contexts.
+    unsafe fn borrow_local_io() -> &mut IoFactoryObject {
+        unsafe {
+            let io = local::borrow().event_loop.io().unwrap();
+            transmute::<&mut IoFactoryObject, &mut IoFactoryObject>(io)
         }
-        return tlsched.take_scheduler();
     }
 
-    fn local(f: &fn(&mut Scheduler)) {
-        let mut tlsched = ThreadLocalScheduler::new();
-        f(tlsched.get_scheduler());
+    /// Borrow the thread-local scheduler from thread-local storage.
+    /// While the scheduler is borrowed it is not available in TLS.
+    fn borrow_local(f: &fn(&mut Scheduler)) {
+        let mut sched = local::take();
+        f(sched);
+        local::put(sched);
+    }
+
+    /// Take ownership of the scheduler from thread local storage
+    fn take_local() -> ~Scheduler {
+        local::take()
     }
 
     // * Scheduler-context operations
 
-    fn resume_task_from_queue(&mut self) -> bool {
+    fn resume_task_from_queue(~self) -> bool {
         assert!(!self.in_task_context());
 
         let mut self = self;
@@ -139,24 +146,38 @@ pub impl Scheduler {
             }
             None => {
                 rtdebug!("no tasks in queue");
+                local::put(self);
                 return false;
             }
         }
     }
 
-    fn resume_task_immediately(&mut self, task: ~Task) {
+    fn resume_task_immediately(~self, task: ~Task) {
+        let mut self = self;
         assert!(!self.in_task_context());
 
         rtdebug!("scheduling a task");
 
         // Store the task in the scheduler so it can be grabbed later
         self.current_task = Some(task);
-        self.swap_in_task();
+        self.enqueue_cleanup_job(DoNothing);
+
+        local::put(self);
+
+        // Take pointers to both the task and scheduler's saved registers.
+        let sched = unsafe { local::borrow() };
+        let (sched_context, _, next_task_context) = sched.get_contexts();
+        let next_task_context = next_task_context.unwrap();
+        // Context switch to the task, restoring it's registers
+        // and saving the scheduler's
+        Context::swap(sched_context, next_task_context);
+
+        let sched = unsafe { local::borrow() };
         // The running task should have passed ownership elsewhere
-        assert!(self.current_task.is_none());
+        assert!(sched.current_task.is_none());
 
         // Running tasks may have asked us to do some cleanup
-        self.run_cleanup_jobs();
+        sched.run_cleanup_job();
     }
 
 
@@ -164,15 +185,23 @@ pub impl Scheduler {
 
     /// Called by a running task to end execution, after which it will
     /// be recycled by the scheduler for reuse in a new task.
-    fn terminate_current_task(&mut self) {
+    fn terminate_current_task(~self) {
+        let mut self = self;
         assert!(self.in_task_context());
 
         rtdebug!("ending running task");
 
         let dead_task = self.current_task.swap_unwrap();
         self.enqueue_cleanup_job(RecycleTask(dead_task));
-        let dead_task = self.task_from_last_cleanup_job();
-        self.swap_out_task(dead_task);
+
+        local::put(self);
+
+        let sched = unsafe { local::borrow() };
+        let (sched_context, last_task_context, _) = sched.get_contexts();
+        let last_task_context = last_task_context.unwrap();
+        Context::swap(last_task_context, sched_context);
+
+        // Control never reaches here
     }
 
     /// Block a running task, context switch to the scheduler, then pass the
@@ -183,127 +212,120 @@ pub impl Scheduler {
     /// The closure here is a *stack* closure that lives in the
     /// running task.  It gets transmuted to the scheduler's lifetime
     /// and called while the task is blocked.
-    fn block_running_task_and_then(&mut self, f: &fn(&mut Scheduler, ~Task)) {
+    fn deschedule_running_task_and_then(~self, f: &fn(~Task)) {
+        let mut self = self;
         assert!(self.in_task_context());
 
         rtdebug!("blocking task");
 
         let blocked_task = self.current_task.swap_unwrap();
-        let f_fake_region = unsafe {
-            transmute::<&fn(&mut Scheduler, ~Task), &fn(&mut Scheduler, ~Task)>(f)
-        };
-        let f_opaque = HackAroundBorrowCk::from_fn(f_fake_region);
+        let f_fake_region = unsafe { transmute::<&fn(~Task), &fn(~Task)>(f) };
+        let f_opaque = ClosureConverter::from_fn(f_fake_region);
         self.enqueue_cleanup_job(GiveTask(blocked_task, f_opaque));
-        let blocked_task = self.task_from_last_cleanup_job();
 
-        self.swap_out_task(blocked_task);
+        local::put(self);
+
+        let sched = unsafe { local::borrow() };
+        let (sched_context, last_task_context, _) = sched.get_contexts();
+        let last_task_context = last_task_context.unwrap();
+        Context::swap(last_task_context, sched_context);
+
+        // We could be executing in a different thread now
+        let sched = unsafe { local::borrow() };
+        sched.run_cleanup_job();
     }
 
     /// Switch directly to another task, without going through the scheduler.
     /// You would want to think hard about doing this, e.g. if there are
     /// pending I/O events it would be a bad idea.
-    fn resume_task_from_running_task_direct(&mut self, next_task: ~Task) {
+    fn resume_task_from_running_task_direct(~self, next_task: ~Task) {
+        let mut self = self;
         assert!(self.in_task_context());
 
         rtdebug!("switching tasks");
 
         let old_running_task = self.current_task.swap_unwrap();
         self.enqueue_cleanup_job(RescheduleTask(old_running_task));
-        let old_running_task = self.task_from_last_cleanup_job();
-
         self.current_task = Some(next_task);
-        self.swap_in_task_from_running_task(old_running_task);
-    }
 
+        local::put(self);
 
-    // * Context switching
+        let sched = unsafe { local::borrow() };
+        let (_, last_task_context, next_task_context) = sched.get_contexts();
+        let last_task_context = last_task_context.unwrap();
+        let next_task_context = next_task_context.unwrap();
+        Context::swap(last_task_context, next_task_context);
 
-    // NB: When switching to a task callers are expected to first set
-    // self.running_task. When switching away from a task likewise move
-    // out of the self.running_task
-
-    priv fn swap_in_task(&mut self) {
-        // Take pointers to both the task and scheduler's saved registers.
-        let running_task: &~Task = self.current_task.get_ref();
-        let task_context = &running_task.saved_context;
-        let scheduler_context = &mut self.saved_context;
-
-        // Context switch to the task, restoring it's registers
-        // and saving the scheduler's
-        Context::swap(scheduler_context, task_context);
-    }
-
-    priv fn swap_out_task(&mut self, running_task: &mut Task) {
-        let task_context = &mut running_task.saved_context;
-        let scheduler_context = &self.saved_context;
-        Context::swap(task_context, scheduler_context);
+        // We could be executing in a different thread now
+        let sched = unsafe { local::borrow() };
+        sched.run_cleanup_job();
     }
 
-    priv fn swap_in_task_from_running_task(&mut self, running_task: &mut Task) {
-        let running_task_context = &mut running_task.saved_context;
-        let next_context = &self.current_task.get_ref().saved_context;
-        Context::swap(running_task_context, next_context);
-    }
-
-
     // * Other stuff
 
     fn in_task_context(&self) -> bool { self.current_task.is_some() }
 
     fn enqueue_cleanup_job(&mut self, job: CleanupJob) {
-        self.cleanup_jobs.unshift(job);
+        assert!(self.cleanup_job.is_none());
+        self.cleanup_job = Some(job);
     }
 
-    fn run_cleanup_jobs(&mut self) {
-        assert!(!self.in_task_context());
-        rtdebug!("running cleanup jobs");
+    fn run_cleanup_job(&mut self) {
+        rtdebug!("running cleanup job");
 
-        while !self.cleanup_jobs.is_empty() {
-            match self.cleanup_jobs.pop() {
-                RescheduleTask(task) => {
-                    // NB: Pushing to the *front* of the queue
-                    self.task_queue.push_front(task);
-                }
-                RecycleTask(task) => task.recycle(&mut self.stack_pool),
-                GiveTask(task, f) => (f.to_fn())(self, task)
+        assert!(self.cleanup_job.is_some());
+
+        let cleanup_job = self.cleanup_job.swap_unwrap();
+        match cleanup_job {
+            DoNothing => { }
+            RescheduleTask(task) => {
+                // NB: Pushing to the *front* of the queue
+                self.task_queue.push_front(task);
             }
+            RecycleTask(task) => task.recycle(&mut self.stack_pool),
+            GiveTask(task, f) => (f.to_fn())(task)
         }
     }
 
-    // XXX: Hack. This should return &'self mut but I don't know how to
-    // make the borrowcheck happy
-    #[cfg(stage0)]
-    fn task_from_last_cleanup_job(&mut self) -> &mut Task {
-        assert!(!self.cleanup_jobs.is_empty());
-        let last_job: &'self mut CleanupJob = &mut self.cleanup_jobs[0];
-        let last_task: &'self Task = match last_job {
-            &RescheduleTask(~ref task) => task,
-            &RecycleTask(~ref task) => task,
-            &GiveTask(~ref task, _) => task,
-        };
-        // XXX: Pattern matching mutable pointers above doesn't work
-        // because borrowck thinks the three patterns are conflicting
-        // borrows
-        return unsafe { transmute::<&Task, &mut Task>(last_task) };
-    }
-
-    // XXX: Hack. This should return &'self mut but I don't know how to
-    // make the borrowcheck happy
-    #[cfg(stage1)]
-    #[cfg(stage2)]
-    #[cfg(stage3)]
-    fn task_from_last_cleanup_job<'a>(&'a mut self) -> &mut Task {
-        assert!(!self.cleanup_jobs.is_empty());
-        let last_job: &'a mut CleanupJob = &mut self.cleanup_jobs[0];
-        let last_task: &'a Task = match last_job {
-            &RescheduleTask(~ref task) => task,
-            &RecycleTask(~ref task) => task,
-            &GiveTask(~ref task, _) => task,
+    /// Get mutable references to all the contexts that may be involved in a
+    /// context switch.
+    ///
+    /// Returns (the scheduler context, the optional context of the
+    /// task in the cleanup list, the optional context of the task in
+    /// the current task slot).  When context switching to a task,
+    /// callers should first arrange for that task to be located in the
+    /// Scheduler's current_task slot and set up the
+    /// post-context-switch cleanup job.
+    fn get_contexts<'a>(&'a mut self) -> (&'a mut Context,
+                                          Option<&'a mut Context>,
+                                          Option<&'a mut Context>) {
+        let last_task = match self.cleanup_job {
+            Some(RescheduleTask(~ref task)) |
+            Some(RecycleTask(~ref task)) |
+            Some(GiveTask(~ref task, _)) => {
+                Some(task)
+            }
+            Some(DoNothing) => {
+                None
+            }
+            None => fail!(fmt!("all context switches should have a cleanup job"))
         };
         // XXX: Pattern matching mutable pointers above doesn't work
         // because borrowck thinks the three patterns are conflicting
         // borrows
-        return unsafe { transmute::<&Task, &mut Task>(last_task) };
+        unsafe {
+            let last_task = transmute::<Option<&Task>, Option<&mut Task>>(last_task);
+            let last_task_context = match last_task {
+                Some(ref t) => Some(&mut t.saved_context), None => None
+            };
+            let next_task_context = match self.current_task {
+                Some(ref mut t) => Some(&mut t.saved_context), None => None
+            };
+            // XXX: These transmutes can be removed after snapshot
+            return (transmute(&mut self.saved_context),
+                    last_task_context,
+                    transmute(next_task_context));
+        }
     }
 }
 
@@ -333,10 +355,15 @@ pub impl Task {
     priv fn build_start_wrapper(start: ~fn()) -> ~fn() {
         // XXX: The old code didn't have this extra allocation
         let wrapper: ~fn() = || {
+            // This is the first code to execute after the initial
+            // context switch to the task. The previous context may
+            // have asked us to do some cleanup.
+            let sched = unsafe { local::borrow() };
+            sched.run_cleanup_job();
+
             start();
 
-            let mut sched = ThreadLocalScheduler::new();
-            let sched = sched.get_scheduler();
+            let sched = Scheduler::take_local();
             sched.terminate_current_task();
         };
         return wrapper;
@@ -352,112 +379,6 @@ pub impl Task {
     }
 }
 
-// NB: This is a type so we can use make use of the &self region.
-struct ThreadLocalScheduler(tls::Key);
-
-impl ThreadLocalScheduler {
-    fn new() -> ThreadLocalScheduler {
-        unsafe {
-            // NB: This assumes that the TLS key has been created prior.
-            // Currently done in rust_start.
-            let key: *mut c_void = rust_get_sched_tls_key();
-            let key: &mut tls::Key = transmute(key);
-            ThreadLocalScheduler(*key)
-        }
-    }
-
-    fn put_scheduler(&mut self, scheduler: ~Scheduler) {
-        unsafe {
-            let key = match self { &ThreadLocalScheduler(key) => key };
-            let value: *mut c_void = transmute::<~Scheduler, *mut c_void>(scheduler);
-            tls::set(key, value);
-        }
-    }
-
-    #[cfg(stage0)]
-    fn get_scheduler(&mut self) -> &'self mut Scheduler {
-        unsafe {
-            let key = match self { &ThreadLocalScheduler(key) => key };
-            let mut value: *mut c_void = tls::get(key);
-            assert!(value.is_not_null());
-            {
-                let value_ptr = &mut value;
-                let sched: &mut ~Scheduler = {
-                    transmute::<&mut *mut c_void, &mut ~Scheduler>(value_ptr)
-                };
-                let sched: &mut Scheduler = &mut **sched;
-                return sched;
-            }
-        }
-    }
-
-    #[cfg(stage1)]
-    #[cfg(stage2)]
-    #[cfg(stage3)]
-    fn get_scheduler<'a>(&'a mut self) -> &'a mut Scheduler {
-        unsafe {
-            let key = match self { &ThreadLocalScheduler(key) => key };
-            let mut value: *mut c_void = tls::get(key);
-            assert!(value.is_not_null());
-            {
-                let value_ptr = &mut value;
-                let sched: &mut ~Scheduler = {
-                    transmute::<&mut *mut c_void, &mut ~Scheduler>(value_ptr)
-                };
-                let sched: &mut Scheduler = &mut **sched;
-                return sched;
-            }
-        }
-    }
-
-    fn take_scheduler(&mut self) -> ~Scheduler {
-        unsafe {
-            let key = match self { &ThreadLocalScheduler(key) => key };
-            let value: *mut c_void = tls::get(key);
-            assert!(value.is_not_null());
-            let sched = transmute(value);
-            tls::set(key, mut_null());
-            return sched;
-        }
-    }
-}
-
-extern {
-    fn rust_get_sched_tls_key() -> *mut c_void;
-}
-
-#[test]
-fn thread_local_scheduler_smoke_test() {
-    let scheduler = ~UvEventLoop::new_scheduler();
-    let mut tls_scheduler = ThreadLocalScheduler::new();
-    tls_scheduler.put_scheduler(scheduler);
-    {
-        let _scheduler = tls_scheduler.get_scheduler();
-    }
-    let _scheduler = tls_scheduler.take_scheduler();
-}
-
-#[test]
-fn thread_local_scheduler_two_instances() {
-    let scheduler = ~UvEventLoop::new_scheduler();
-    let mut tls_scheduler = ThreadLocalScheduler::new();
-    tls_scheduler.put_scheduler(scheduler);
-    {
-
-        let _scheduler = tls_scheduler.get_scheduler();
-    }
-    {
-        let scheduler = tls_scheduler.take_scheduler();
-        tls_scheduler.put_scheduler(scheduler);
-    }
-
-    let mut tls_scheduler = ThreadLocalScheduler::new();
-    {
-        let _scheduler = tls_scheduler.get_scheduler();
-    }
-    let _scheduler = tls_scheduler.take_scheduler();
-}
-
 #[test]
 fn test_simple_scheduling() {
     do run_in_bare_thread {
@@ -502,13 +423,12 @@ fn test_swap_tasks() {
         let mut sched = ~UvEventLoop::new_scheduler();
         let task1 = ~do Task::new(&mut sched.stack_pool) {
             unsafe { *count_ptr = *count_ptr + 1; }
-            do Scheduler::local |sched| {
-                let task2 = ~do Task::new(&mut sched.stack_pool) {
-                    unsafe { *count_ptr = *count_ptr + 1; }
-                };
-                // Context switch directly to the new task
-                sched.resume_task_from_running_task_direct(task2);
-            }
+            let mut sched = Scheduler::take_local();
+            let task2 = ~do Task::new(&mut sched.stack_pool) {
+                unsafe { *count_ptr = *count_ptr + 1; }
+            };
+            // Context switch directly to the new task
+            sched.resume_task_from_running_task_direct(task2);
             unsafe { *count_ptr = *count_ptr + 1; }
         };
         sched.task_queue.push_back(task1);
@@ -535,7 +455,7 @@ fn test_run_a_lot_of_tasks_queued() {
         assert!(count == MAX);
 
         fn run_task(count_ptr: *mut int) {
-            do Scheduler::local |sched| {
+            do Scheduler::borrow_local |sched| {
                 let task = ~do Task::new(&mut sched.stack_pool) {
                     unsafe {
                         *count_ptr = *count_ptr + 1;
@@ -568,18 +488,17 @@ fn test_run_a_lot_of_tasks_direct() {
         assert!(count == MAX);
 
         fn run_task(count_ptr: *mut int) {
-            do Scheduler::local |sched| {
-                let task = ~do Task::new(&mut sched.stack_pool) {
-                    unsafe {
-                        *count_ptr = *count_ptr + 1;
-                        if *count_ptr != MAX {
-                            run_task(count_ptr);
-                        }
+            let mut sched = Scheduler::take_local();
+            let task = ~do Task::new(&mut sched.stack_pool) {
+                unsafe {
+                    *count_ptr = *count_ptr + 1;
+                    if *count_ptr != MAX {
+                        run_task(count_ptr);
                     }
-                };
-                // Context switch directly to the new task
-                sched.resume_task_from_running_task_direct(task);
-            }
+                }
+            };
+            // Context switch directly to the new task
+            sched.resume_task_from_running_task_direct(task);
         };
     }
 }
@@ -589,11 +508,13 @@ fn test_block_task() {
     do run_in_bare_thread {
         let mut sched = ~UvEventLoop::new_scheduler();
         let task = ~do Task::new(&mut sched.stack_pool) {
-            do Scheduler::local |sched| {
-                assert!(sched.in_task_context());
-                do sched.block_running_task_and_then() |sched, task| {
+            let sched = Scheduler::take_local();
+            assert!(sched.in_task_context());
+            do sched.deschedule_running_task_and_then() |task| {
+                let task = Cell(task);
+                do Scheduler::borrow_local |sched| {
                     assert!(!sched.in_task_context());
-                    sched.task_queue.push_back(task);
+                    sched.task_queue.push_back(task.take());
                 }
             }
         };
diff --git a/src/libcore/rt/uvio.rs b/src/libcore/rt/uvio.rs
index 7162ed27a9d..fe7b0a71dbf 100644
--- a/src/libcore/rt/uvio.rs
+++ b/src/libcore/rt/uvio.rs
@@ -120,37 +120,37 @@ impl IoFactory for UvIoFactory {
         let result_cell = empty_cell();
         let result_cell_ptr: *Cell<Option<~StreamObject>> = &result_cell;
 
-        do Scheduler::local |scheduler| {
-            assert!(scheduler.in_task_context());
+        let scheduler = Scheduler::take_local();
+        assert!(scheduler.in_task_context());
 
-            // Block this task and take ownership, switch to scheduler context
-            do scheduler.block_running_task_and_then |scheduler, task| {
+        // Block this task and take ownership, switch to scheduler context
+        do scheduler.deschedule_running_task_and_then |task| {
 
-                rtdebug!("connect: entered scheduler context");
+            rtdebug!("connect: entered scheduler context");
+            do Scheduler::borrow_local |scheduler| {
                 assert!(!scheduler.in_task_context());
-                let mut tcp_watcher = TcpWatcher::new(self.uv_loop());
-                let task_cell = Cell(task);
-
-                // Wait for a connection
-                do tcp_watcher.connect(addr) |stream_watcher, status| {
-                    rtdebug!("connect: in connect callback");
-                    let maybe_stream = if status.is_none() {
-                        rtdebug!("status is none");
-                        Some(~UvStream(stream_watcher))
-                    } else {
-                        rtdebug!("status is some");
-                        stream_watcher.close(||());
-                        None
-                    };
-
-                    // Store the stream in the task's stack
-                    unsafe { (*result_cell_ptr).put_back(maybe_stream); }
-
-                    // Context switch
-                    do Scheduler::local |scheduler| {
-                        scheduler.resume_task_immediately(task_cell.take());
-                    }
-                }
+            }
+            let mut tcp_watcher = TcpWatcher::new(self.uv_loop());
+            let task_cell = Cell(task);
+
+            // Wait for a connection
+            do tcp_watcher.connect(addr) |stream_watcher, status| {
+                rtdebug!("connect: in connect callback");
+                let maybe_stream = if status.is_none() {
+                    rtdebug!("status is none");
+                    Some(~UvStream(stream_watcher))
+                } else {
+                    rtdebug!("status is some");
+                    stream_watcher.close(||());
+                    None
+                };
+
+                // Store the stream in the task's stack
+                unsafe { (*result_cell_ptr).put_back(maybe_stream); }
+
+                // Context switch
+                let scheduler = Scheduler::take_local();
+                scheduler.resume_task_immediately(task_cell.take());
             }
         }
 
@@ -194,33 +194,31 @@ impl TcpListener for UvTcpListener {
 
         let server_tcp_watcher = self.watcher();
 
-        do Scheduler::local |scheduler| {
-            assert!(scheduler.in_task_context());
-
-            do scheduler.block_running_task_and_then |_, task| {
-                let task_cell = Cell(task);
-                let mut server_tcp_watcher = server_tcp_watcher;
-                do server_tcp_watcher.listen |server_stream_watcher, status| {
-                    let maybe_stream = if status.is_none() {
-                        let mut server_stream_watcher = server_stream_watcher;
-                        let mut loop_ = loop_from_watcher(&server_stream_watcher);
-                        let mut client_tcp_watcher = TcpWatcher::new(&mut loop_);
-                        let mut client_tcp_watcher = client_tcp_watcher.as_stream();
-                        // XXX: Need's to be surfaced in interface
-                        server_stream_watcher.accept(client_tcp_watcher);
-                        Some(~UvStream::new(client_tcp_watcher))
-                    } else {
-                        None
-                    };
-
-                    unsafe { (*result_cell_ptr).put_back(maybe_stream); }
-
-                    rtdebug!("resuming task from listen");
-                    // Context switch
-                    do Scheduler::local |scheduler| {
-                        scheduler.resume_task_immediately(task_cell.take());
-                    }
-                }
+        let scheduler = Scheduler::take_local();
+        assert!(scheduler.in_task_context());
+
+        do scheduler.deschedule_running_task_and_then |task| {
+            let task_cell = Cell(task);
+            let mut server_tcp_watcher = server_tcp_watcher;
+            do server_tcp_watcher.listen |server_stream_watcher, status| {
+                let maybe_stream = if status.is_none() {
+                    let mut server_stream_watcher = server_stream_watcher;
+                    let mut loop_ = loop_from_watcher(&server_stream_watcher);
+                    let mut client_tcp_watcher = TcpWatcher::new(&mut loop_);
+                    let mut client_tcp_watcher = client_tcp_watcher.as_stream();
+                    // XXX: Need's to be surfaced in interface
+                    server_stream_watcher.accept(client_tcp_watcher);
+                    Some(~UvStream::new(client_tcp_watcher))
+                } else {
+                    None
+                };
+
+                unsafe { (*result_cell_ptr).put_back(maybe_stream); }
+
+                rtdebug!("resuming task from listen");
+                // Context switch
+                let scheduler = Scheduler::take_local();
+                scheduler.resume_task_immediately(task_cell.take());
             }
         }
 
@@ -259,42 +257,42 @@ impl Stream for UvStream {
         let result_cell = empty_cell();
         let result_cell_ptr: *Cell<Result<uint, ()>> = &result_cell;
 
-        do Scheduler::local |scheduler| {
-            assert!(scheduler.in_task_context());
-            let watcher = self.watcher();
-            let buf_ptr: *&mut [u8] = &buf;
-            do scheduler.block_running_task_and_then |scheduler, task| {
-                rtdebug!("read: entered scheduler context");
+        let scheduler = Scheduler::take_local();
+        assert!(scheduler.in_task_context());
+        let watcher = self.watcher();
+        let buf_ptr: *&mut [u8] = &buf;
+        do scheduler.deschedule_running_task_and_then |task| {
+            rtdebug!("read: entered scheduler context");
+            do Scheduler::borrow_local |scheduler| {
                 assert!(!scheduler.in_task_context());
+            }
+            let mut watcher = watcher;
+            let task_cell = Cell(task);
+            // XXX: We shouldn't reallocate these callbacks every
+            // call to read
+            let alloc: AllocCallback = |_| unsafe {
+                slice_to_uv_buf(*buf_ptr)
+            };
+            do watcher.read_start(alloc) |watcher, nread, _buf, status| {
+
+                // Stop reading so that no read callbacks are
+                // triggered before the user calls `read` again.
+                // XXX: Is there a performance impact to calling
+                // stop here?
                 let mut watcher = watcher;
-                let task_cell = Cell(task);
-                // XXX: We shouldn't reallocate these callbacks every
-                // call to read
-                let alloc: AllocCallback = |_| unsafe {
-                    slice_to_uv_buf(*buf_ptr)
+                watcher.read_stop();
+
+                let result = if status.is_none() {
+                    assert!(nread >= 0);
+                    Ok(nread as uint)
+                } else {
+                    Err(())
                 };
-                do watcher.read_start(alloc) |watcher, nread, _buf, status| {
-
-                    // Stop reading so that no read callbacks are
-                    // triggered before the user calls `read` again.
-                    // XXX: Is there a performance impact to calling
-                    // stop here?
-                    let mut watcher = watcher;
-                    watcher.read_stop();
-
-                    let result = if status.is_none() {
-                        assert!(nread >= 0);
-                        Ok(nread as uint)
-                    } else {
-                        Err(())
-                    };
-
-                    unsafe { (*result_cell_ptr).put_back(result); }
-
-                    do Scheduler::local |scheduler| {
-                        scheduler.resume_task_immediately(task_cell.take());
-                    }
-                }
+
+                unsafe { (*result_cell_ptr).put_back(result); }
+
+                let scheduler = Scheduler::take_local();
+                scheduler.resume_task_immediately(task_cell.take());
             }
         }
 
@@ -305,29 +303,27 @@ impl Stream for UvStream {
     fn write(&mut self, buf: &[u8]) -> Result<(), ()> {
         let result_cell = empty_cell();
         let result_cell_ptr: *Cell<Result<(), ()>> = &result_cell;
-        do Scheduler::local |scheduler| {
-            assert!(scheduler.in_task_context());
-            let watcher = self.watcher();
-            let buf_ptr: *&[u8] = &buf;
-            do scheduler.block_running_task_and_then |_, task| {
-                let mut watcher = watcher;
-                let task_cell = Cell(task);
-                let buf = unsafe { &*buf_ptr };
-                // XXX: OMGCOPIES
-                let buf = buf.to_vec();
-                do watcher.write(buf) |_watcher, status| {
-                    let result = if status.is_none() {
-                        Ok(())
-                    } else {
-                        Err(())
-                    };
-
-                    unsafe { (*result_cell_ptr).put_back(result); }
-
-                    do Scheduler::local |scheduler| {
-                        scheduler.resume_task_immediately(task_cell.take());
-                    }
-                }
+        let scheduler = Scheduler::take_local();
+        assert!(scheduler.in_task_context());
+        let watcher = self.watcher();
+        let buf_ptr: *&[u8] = &buf;
+        do scheduler.deschedule_running_task_and_then |task| {
+            let mut watcher = watcher;
+            let task_cell = Cell(task);
+            let buf = unsafe { &*buf_ptr };
+            // XXX: OMGCOPIES
+            let buf = buf.to_vec();
+            do watcher.write(buf) |_watcher, status| {
+                let result = if status.is_none() {
+                    Ok(())
+                } else {
+                    Err(())
+                };
+
+                unsafe { (*result_cell_ptr).put_back(result); }
+
+                let scheduler = Scheduler::take_local();
+                scheduler.resume_task_immediately(task_cell.take());
             }
         }
 
@@ -342,12 +338,10 @@ fn test_simple_io_no_connect() {
     do run_in_bare_thread {
         let mut sched = ~UvEventLoop::new_scheduler();
         let task = ~do Task::new(&mut sched.stack_pool) {
-            do Scheduler::local |sched| {
-                let io = sched.event_loop.io().unwrap();
-                let addr = Ipv4(127, 0, 0, 1, 2926);
-                let maybe_chan = io.connect(addr);
-                assert!(maybe_chan.is_none());
-            }
+            let io = unsafe { Scheduler::borrow_local_io() };
+            let addr = Ipv4(127, 0, 0, 1, 2926);
+            let maybe_chan = io.connect(addr);
+            assert!(maybe_chan.is_none());
         };
         sched.task_queue.push_back(task);
         sched.run();
@@ -362,29 +356,25 @@ fn test_simple_tcp_server_and_client() {
         let addr = Ipv4(127, 0, 0, 1, 2929);
 
         let client_task = ~do Task::new(&mut sched.stack_pool) {
-            do Scheduler::local |sched| {
-                let io = sched.event_loop.io().unwrap();
-                let mut stream = io.connect(addr).unwrap();
-                stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
-                stream.close();
-            }
+            let io = unsafe { Scheduler::borrow_local_io() };
+            let mut stream = io.connect(addr).unwrap();
+            stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
+            stream.close();
         };
 
         let server_task = ~do Task::new(&mut sched.stack_pool) {
-            do Scheduler::local |sched| {
-                let io = sched.event_loop.io().unwrap();
-                let mut listener = io.bind(addr).unwrap();
-                let mut stream = listener.listen().unwrap();
-                let mut buf = [0, .. 2048];
-                let nread = stream.read(buf).unwrap();
-                assert!(nread == 8);
-                for uint::range(0, nread) |i| {
-                    rtdebug!("%u", buf[i] as uint);
-                    assert!(buf[i] == i as u8);
-                }
-                stream.close();
-                listener.close();
+            let io = unsafe { Scheduler::borrow_local_io() };
+            let mut listener = io.bind(addr).unwrap();
+            let mut stream = listener.listen().unwrap();
+            let mut buf = [0, .. 2048];
+            let nread = stream.read(buf).unwrap();
+            assert!(nread == 8);
+            for uint::range(0, nread) |i| {
+                rtdebug!("%u", buf[i] as uint);
+                assert!(buf[i] == i as u8);
             }
+            stream.close();
+            listener.close();
         };
 
         // Start the server first so it listens before the client connects
@@ -401,53 +391,51 @@ fn test_read_and_block() {
         let addr = Ipv4(127, 0, 0, 1, 2930);
 
         let client_task = ~do Task::new(&mut sched.stack_pool) {
-            do Scheduler::local |sched| {
-                let io = sched.event_loop.io().unwrap();
-                let mut stream = io.connect(addr).unwrap();
-                stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
-                stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
-                stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
-                stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
-                stream.close();
-            }
+            let io = unsafe { Scheduler::borrow_local_io() };
+            let mut stream = io.connect(addr).unwrap();
+            stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
+            stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
+            stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
+            stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
+            stream.close();
         };
 
         let server_task = ~do Task::new(&mut sched.stack_pool) {
-            do Scheduler::local |sched| {
-                let io = sched.event_loop.io().unwrap();
-                let mut listener = io.bind(addr).unwrap();
-                let mut stream = listener.listen().unwrap();
-                let mut buf = [0, .. 2048];
-
-                let expected = 32;
-                let mut current = 0;
-                let mut reads = 0;
-
-                while current < expected {
-                    let nread = stream.read(buf).unwrap();
-                    for uint::range(0, nread) |i| {
-                        let val = buf[i] as uint;
-                        assert!(val == current % 8);
-                        current += 1;
-                    }
-                    reads += 1;
-
-                    do Scheduler::local |scheduler| {
-                        // Yield to the other task in hopes that it
-                        // will trigger a read callback while we are
-                        // not ready for it
-                        do scheduler.block_running_task_and_then |scheduler, task| {
-                            scheduler.task_queue.push_back(task);
-                        }
+            let io = unsafe { Scheduler::borrow_local_io() };
+            let mut listener = io.bind(addr).unwrap();
+            let mut stream = listener.listen().unwrap();
+            let mut buf = [0, .. 2048];
+
+            let expected = 32;
+            let mut current = 0;
+            let mut reads = 0;
+
+            while current < expected {
+                let nread = stream.read(buf).unwrap();
+                for uint::range(0, nread) |i| {
+                    let val = buf[i] as uint;
+                    assert!(val == current % 8);
+                    current += 1;
+                }
+                reads += 1;
+
+                let scheduler = Scheduler::take_local();
+                // Yield to the other task in hopes that it
+                // will trigger a read callback while we are
+                // not ready for it
+                do scheduler.deschedule_running_task_and_then |task| {
+                    let task = Cell(task);
+                    do Scheduler::borrow_local |scheduler| {
+                        scheduler.task_queue.push_back(task.take());
                     }
                 }
+            }
 
-                // Make sure we had multiple reads
-                assert!(reads > 1);
+            // Make sure we had multiple reads
+            assert!(reads > 1);
 
-                stream.close();
-                listener.close();
-            }
+            stream.close();
+            listener.close();
         };
 
         // Start the server first so it listens before the client connects
@@ -464,19 +452,17 @@ fn test_read_read_read() {
         let addr = Ipv4(127, 0, 0, 1, 2931);
 
         let client_task = ~do Task::new(&mut sched.stack_pool) {
-            do Scheduler::local |sched| {
-                let io = sched.event_loop.io().unwrap();
-                let mut stream = io.connect(addr).unwrap();
-                let mut buf = [0, .. 2048];
-                let mut total_bytes_read = 0;
-                while total_bytes_read < 500000000 {
-                    let nread = stream.read(buf).unwrap();
-                    rtdebug!("read %u bytes", nread as uint);
-                    total_bytes_read += nread;
-                }
-                rtdebug_!("read %u bytes total", total_bytes_read as uint);
-                stream.close();
+            let io = unsafe { Scheduler::borrow_local_io() };
+            let mut stream = io.connect(addr).unwrap();
+            let mut buf = [0, .. 2048];
+            let mut total_bytes_read = 0;
+            while total_bytes_read < 500000000 {
+                let nread = stream.read(buf).unwrap();
+                rtdebug!("read %u bytes", nread as uint);
+                total_bytes_read += nread;
             }
+            rtdebug_!("read %u bytes total", total_bytes_read as uint);
+            stream.close();
         };
 
         sched.task_queue.push_back(client_task);