about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--src/libcore/rt/sched/mod.rs115
-rw-r--r--src/libcore/rt/uvio.rs41
2 files changed, 94 insertions, 62 deletions
diff --git a/src/libcore/rt/sched/mod.rs b/src/libcore/rt/sched/mod.rs
index a2a440ba76e..2aaf0a44455 100644
--- a/src/libcore/rt/sched/mod.rs
+++ b/src/libcore/rt/sched/mod.rs
@@ -20,6 +20,7 @@ use super::context::Context;
 #[cfg(test)] use super::uvio::UvEventLoop;
 #[cfg(test)] use unstable::run_in_bare_thread;
 #[cfg(test)] use int;
+#[cfg(test)] use cell::Cell;
 
 mod local;
 
@@ -46,14 +47,14 @@ pub struct Scheduler {
 // complaining
 type UnsafeTaskReceiver = sys::Closure;
 trait HackAroundBorrowCk {
-    fn from_fn(&fn(&mut Scheduler, ~Task)) -> Self;
-    fn to_fn(self) -> &fn(&mut Scheduler, ~Task);
+    fn from_fn(&fn(~Task)) -> Self;
+    fn to_fn(self) -> &fn(~Task);
 }
 impl HackAroundBorrowCk for UnsafeTaskReceiver {
-    fn from_fn(f: &fn(&mut Scheduler, ~Task)) -> UnsafeTaskReceiver {
+    fn from_fn(f: &fn(~Task)) -> UnsafeTaskReceiver {
         unsafe { transmute(f) }
     }
-    fn to_fn(self) -> &fn(&mut Scheduler, ~Task) {
+    fn to_fn(self) -> &fn(~Task) {
         unsafe { transmute(self) }
     }
 }
@@ -97,10 +98,12 @@ pub impl Scheduler {
 
         let scheduler = Scheduler::unsafe_local_borrow();
         fn run_scheduler_once() {
-            let scheduler = Scheduler::unsafe_local_borrow();
+            let scheduler = Scheduler::local_take();
             if scheduler.resume_task_from_queue() {
                 // Ok, a task ran. Nice! We'll do it again later
-                scheduler.event_loop.callback(run_scheduler_once);
+                do Scheduler::local_borrow |scheduler| {
+                    scheduler.event_loop.callback(run_scheduler_once);
+                }
             }
         }
 
@@ -124,9 +127,13 @@ pub impl Scheduler {
         local::put(sched);
     }
 
+    fn local_take() -> ~Scheduler {
+        local::take()
+    }
+
     // * Scheduler-context operations
 
-    fn resume_task_from_queue(&mut self) -> bool {
+    fn resume_task_from_queue(~self) -> bool {
         assert!(!self.in_task_context());
 
         let mut self = self;
@@ -137,12 +144,14 @@ pub impl Scheduler {
             }
             None => {
                 rtdebug!("no tasks in queue");
+                local::put(self);
                 return false;
             }
         }
     }
 
-    fn resume_task_immediately(&mut self, task: ~Task) {
+    fn resume_task_immediately(~self, task: ~Task) {
+        let mut self = self;
         assert!(!self.in_task_context());
 
         rtdebug!("scheduling a task");
@@ -151,20 +160,22 @@ pub impl Scheduler {
         self.current_task = Some(task);
         self.enqueue_cleanup_job(DoNothing);
 
+        local::put(self);
+
         // Take pointers to both the task and scheduler's saved registers.
-        {
-            let (sched_context, _, next_task_context) = self.get_contexts();
-            let next_task_context = next_task_context.unwrap();
-            // Context switch to the task, restoring it's registers
-            // and saving the scheduler's
-            Context::swap(sched_context, next_task_context);
-        }
+        let sched = Scheduler::unsafe_local_borrow();
+        let (sched_context, _, next_task_context) = sched.get_contexts();
+        let next_task_context = next_task_context.unwrap();
+        // Context switch to the task, restoring it's registers
+        // and saving the scheduler's
+        Context::swap(sched_context, next_task_context);
 
+        let sched = Scheduler::unsafe_local_borrow();
         // The running task should have passed ownership elsewhere
-        assert!(self.current_task.is_none());
+        assert!(sched.current_task.is_none());
 
         // Running tasks may have asked us to do some cleanup
-        self.run_cleanup_job();
+        sched.run_cleanup_job();
     }
 
 
@@ -172,18 +183,23 @@ pub impl Scheduler {
 
     /// Called by a running task to end execution, after which it will
     /// be recycled by the scheduler for reuse in a new task.
-    fn terminate_current_task(&mut self) {
+    fn terminate_current_task(~self) {
+        let mut self = self;
         assert!(self.in_task_context());
 
         rtdebug!("ending running task");
 
         let dead_task = self.current_task.swap_unwrap();
         self.enqueue_cleanup_job(RecycleTask(dead_task));
-        {
-            let (sched_context, last_task_context, _) = self.get_contexts();
-            let last_task_context = last_task_context.unwrap();
-            Context::swap(last_task_context, sched_context);
-        }
+
+        local::put(self);
+
+        let sched = Scheduler::unsafe_local_borrow();
+        let (sched_context, last_task_context, _) = sched.get_contexts();
+        let last_task_context = last_task_context.unwrap();
+        Context::swap(last_task_context, sched_context);
+
+        // Control never reaches here
     }
 
     /// Block a running task, context switch to the scheduler, then pass the
@@ -194,22 +210,25 @@ pub impl Scheduler {
     /// The closure here is a *stack* closure that lives in the
     /// running task.  It gets transmuted to the scheduler's lifetime
     /// and called while the task is blocked.
-    fn deschedule_running_task_and_then(&mut self, f: &fn(&mut Scheduler, ~Task)) {
+    fn deschedule_running_task_and_then(~self, f: &fn(~Task)) {
+        let mut self = self;
         assert!(self.in_task_context());
 
         rtdebug!("blocking task");
 
         let blocked_task = self.current_task.swap_unwrap();
         let f_fake_region = unsafe {
-            transmute::<&fn(&mut Scheduler, ~Task), &fn(&mut Scheduler, ~Task)>(f)
+            transmute::<&fn(~Task), &fn(~Task)>(f)
         };
         let f_opaque = HackAroundBorrowCk::from_fn(f_fake_region);
         self.enqueue_cleanup_job(GiveTask(blocked_task, f_opaque));
-        {
-            let (sched_context, last_task_context, _) = self.get_contexts();
-            let last_task_context = last_task_context.unwrap();
-            Context::swap(last_task_context, sched_context);
-        }
+
+        local::put(self);
+
+        let sched = Scheduler::unsafe_local_borrow();
+        let (sched_context, last_task_context, _) = sched.get_contexts();
+        let last_task_context = last_task_context.unwrap();
+        Context::swap(last_task_context, sched_context);
 
         // We could be executing in a different thread now
         let sched = Scheduler::unsafe_local_borrow();
@@ -219,7 +238,8 @@ pub impl Scheduler {
     /// Switch directly to another task, without going through the scheduler.
     /// You would want to think hard about doing this, e.g. if there are
     /// pending I/O events it would be a bad idea.
-    fn resume_task_from_running_task_direct(&mut self, next_task: ~Task) {
+    fn resume_task_from_running_task_direct(~self, next_task: ~Task) {
+        let mut self = self;
         assert!(self.in_task_context());
 
         rtdebug!("switching tasks");
@@ -227,12 +247,14 @@ pub impl Scheduler {
         let old_running_task = self.current_task.swap_unwrap();
         self.enqueue_cleanup_job(RescheduleTask(old_running_task));
         self.current_task = Some(next_task);
-        {
-            let (_, last_task_context, next_task_context) = self.get_contexts();
-            let last_task_context = last_task_context.unwrap();
-            let next_task_context = next_task_context.unwrap();
-            Context::swap(last_task_context, next_task_context);
-        }
+
+        local::put(self);
+
+        let sched = Scheduler::unsafe_local_borrow();
+        let (_, last_task_context, next_task_context) = sched.get_contexts();
+        let last_task_context = last_task_context.unwrap();
+        let next_task_context = next_task_context.unwrap();
+        Context::swap(last_task_context, next_task_context);
 
         // We could be executing in a different thread now
         let sched = Scheduler::unsafe_local_borrow();
@@ -261,7 +283,7 @@ pub impl Scheduler {
                 self.task_queue.push_front(task);
             }
             RecycleTask(task) => task.recycle(&mut self.stack_pool),
-            GiveTask(task, f) => (f.to_fn())(self, task)
+            GiveTask(task, f) => (f.to_fn())(task)
         }
     }
 
@@ -338,7 +360,7 @@ pub impl Task {
 
             start();
 
-            let sched = Scheduler::unsafe_local_borrow();
+            let sched = Scheduler::local_take();
             sched.terminate_current_task();
         };
         return wrapper;
@@ -398,7 +420,7 @@ fn test_swap_tasks() {
         let mut sched = ~UvEventLoop::new_scheduler();
         let task1 = ~do Task::new(&mut sched.stack_pool) {
             unsafe { *count_ptr = *count_ptr + 1; }
-            let sched = Scheduler::unsafe_local_borrow();
+            let mut sched = Scheduler::local_take();
             let task2 = ~do Task::new(&mut sched.stack_pool) {
                 unsafe { *count_ptr = *count_ptr + 1; }
             };
@@ -463,7 +485,7 @@ fn test_run_a_lot_of_tasks_direct() {
         assert!(count == MAX);
 
         fn run_task(count_ptr: *mut int) {
-            let sched = Scheduler::unsafe_local_borrow();
+            let mut sched = Scheduler::local_take();
             let task = ~do Task::new(&mut sched.stack_pool) {
                 unsafe {
                     *count_ptr = *count_ptr + 1;
@@ -483,11 +505,14 @@ fn test_block_task() {
     do run_in_bare_thread {
         let mut sched = ~UvEventLoop::new_scheduler();
         let task = ~do Task::new(&mut sched.stack_pool) {
-            let sched = Scheduler::unsafe_local_borrow();
+            let sched = Scheduler::local_take();
             assert!(sched.in_task_context());
-            do sched.deschedule_running_task_and_then() |sched, task| {
-                assert!(!sched.in_task_context());
-                sched.task_queue.push_back(task);
+            do sched.deschedule_running_task_and_then() |task| {
+                let task = Cell(task);
+                do Scheduler::local_borrow |sched| {
+                    assert!(!sched.in_task_context());
+                    sched.task_queue.push_back(task.take());
+                }
             }
         };
         sched.task_queue.push_back(task);
diff --git a/src/libcore/rt/uvio.rs b/src/libcore/rt/uvio.rs
index a43ec07c2de..e3fed29ddd2 100644
--- a/src/libcore/rt/uvio.rs
+++ b/src/libcore/rt/uvio.rs
@@ -104,14 +104,16 @@ impl IoFactory for UvIoFactory {
         let result_cell = empty_cell();
         let result_cell_ptr: *Cell<Option<~StreamObject>> = &result_cell;
 
-        let scheduler = Scheduler::unsafe_local_borrow();
+        let scheduler = Scheduler::local_take();
         assert!(scheduler.in_task_context());
 
         // Block this task and take ownership, switch to scheduler context
-        do scheduler.deschedule_running_task_and_then |scheduler, task| {
+        do scheduler.deschedule_running_task_and_then |task| {
 
             rtdebug!("connect: entered scheduler context");
-            assert!(!scheduler.in_task_context());
+            do Scheduler::local_borrow |scheduler| {
+                assert!(!scheduler.in_task_context());
+            }
             let mut tcp_watcher = TcpWatcher::new(self.uv_loop());
             let task_cell = Cell(task);
 
@@ -131,7 +133,7 @@ impl IoFactory for UvIoFactory {
                 unsafe { (*result_cell_ptr).put_back(maybe_stream); }
 
                 // Context switch
-                let scheduler = Scheduler::unsafe_local_borrow();
+                let scheduler = Scheduler::local_take();
                 scheduler.resume_task_immediately(task_cell.take());
             }
         }
@@ -176,10 +178,10 @@ impl TcpListener for UvTcpListener {
 
         let server_tcp_watcher = self.watcher();
 
-        let scheduler = Scheduler::unsafe_local_borrow();
+        let scheduler = Scheduler::local_take();
         assert!(scheduler.in_task_context());
 
-        do scheduler.deschedule_running_task_and_then |_, task| {
+        do scheduler.deschedule_running_task_and_then |task| {
             let task_cell = Cell(task);
             let mut server_tcp_watcher = server_tcp_watcher;
             do server_tcp_watcher.listen |server_stream_watcher, status| {
@@ -199,7 +201,7 @@ impl TcpListener for UvTcpListener {
 
                 rtdebug!("resuming task from listen");
                 // Context switch
-                let scheduler = Scheduler::unsafe_local_borrow();
+                let scheduler = Scheduler::local_take();
                 scheduler.resume_task_immediately(task_cell.take());
             }
         }
@@ -239,13 +241,15 @@ impl Stream for UvStream {
         let result_cell = empty_cell();
         let result_cell_ptr: *Cell<Result<uint, ()>> = &result_cell;
 
-        let scheduler = Scheduler::unsafe_local_borrow();
+        let scheduler = Scheduler::local_take();
         assert!(scheduler.in_task_context());
         let watcher = self.watcher();
         let buf_ptr: *&mut [u8] = &buf;
-        do scheduler.deschedule_running_task_and_then |scheduler, task| {
+        do scheduler.deschedule_running_task_and_then |task| {
             rtdebug!("read: entered scheduler context");
-            assert!(!scheduler.in_task_context());
+            do Scheduler::local_borrow |scheduler| {
+                assert!(!scheduler.in_task_context());
+            }
             let mut watcher = watcher;
             let task_cell = Cell(task);
             // XXX: We shouldn't reallocate these callbacks every
@@ -271,7 +275,7 @@ impl Stream for UvStream {
 
                 unsafe { (*result_cell_ptr).put_back(result); }
 
-                let scheduler = Scheduler::unsafe_local_borrow();
+                let scheduler = Scheduler::local_take();
                 scheduler.resume_task_immediately(task_cell.take());
             }
         }
@@ -283,11 +287,11 @@ impl Stream for UvStream {
     fn write(&mut self, buf: &[u8]) -> Result<(), ()> {
         let result_cell = empty_cell();
         let result_cell_ptr: *Cell<Result<(), ()>> = &result_cell;
-        let scheduler = Scheduler::unsafe_local_borrow();
+        let scheduler = Scheduler::local_take();
         assert!(scheduler.in_task_context());
         let watcher = self.watcher();
         let buf_ptr: *&[u8] = &buf;
-        do scheduler.deschedule_running_task_and_then |_, task| {
+        do scheduler.deschedule_running_task_and_then |task| {
             let mut watcher = watcher;
             let task_cell = Cell(task);
             let buf = unsafe { &*buf_ptr };
@@ -302,7 +306,7 @@ impl Stream for UvStream {
 
                 unsafe { (*result_cell_ptr).put_back(result); }
 
-                let scheduler = Scheduler::unsafe_local_borrow();
+                let scheduler = Scheduler::local_take();
                 scheduler.resume_task_immediately(task_cell.take());
             }
         }
@@ -404,12 +408,15 @@ fn test_read_and_block() {
                 }
                 reads += 1;
 
-                let scheduler = Scheduler::unsafe_local_borrow();
+                let scheduler = Scheduler::local_take();
                 // Yield to the other task in hopes that it
                 // will trigger a read callback while we are
                 // not ready for it
-                do scheduler.deschedule_running_task_and_then |scheduler, task| {
-                    scheduler.task_queue.push_back(task);
+                do scheduler.deschedule_running_task_and_then |task| {
+                    let task = Cell(task);
+                    do Scheduler::local_borrow |scheduler| {
+                        scheduler.task_queue.push_back(task.take());
+                    }
                 }
             }