about summary refs log tree commit diff
diff options
context:
space:
mode:
authorBrian Anderson <banderson@mozilla.com>2013-04-15 16:19:01 -0700
committerBrian Anderson <banderson@mozilla.com>2013-04-15 17:45:34 -0700
commit68583a25a0b31bc113cf1f4ec479339cbf876e4d (patch)
treea0d30ca211a59ac96ddec1c01a2596c563752c04
parentebefe07792caf17c03c6f90fb1979d4e6c935001 (diff)
downloadrust-68583a25a0b31bc113cf1f4ec479339cbf876e4d.tar.gz
rust-68583a25a0b31bc113cf1f4ec479339cbf876e4d.zip
core::rt: Restructure context switches to take ownership of the Scheduler
In order to do a context switch you have to give up ownership of the scheduler,
effectively passing it to the next execution context. This could help avoid
some situations here tasks retain unsafe pointers to schedulers between context
switches, across which they may have changed threads.

There are still a number of uses of unsafe scheduler pointers.
-rw-r--r--src/libcore/rt/sched/mod.rs115
-rw-r--r--src/libcore/rt/uvio.rs41
2 files changed, 94 insertions, 62 deletions
diff --git a/src/libcore/rt/sched/mod.rs b/src/libcore/rt/sched/mod.rs
index a2a440ba76e..2aaf0a44455 100644
--- a/src/libcore/rt/sched/mod.rs
+++ b/src/libcore/rt/sched/mod.rs
@@ -20,6 +20,7 @@ use super::context::Context;
 #[cfg(test)] use super::uvio::UvEventLoop;
 #[cfg(test)] use unstable::run_in_bare_thread;
 #[cfg(test)] use int;
+#[cfg(test)] use cell::Cell;
 
 mod local;
 
@@ -46,14 +47,14 @@ pub struct Scheduler {
 // complaining
 type UnsafeTaskReceiver = sys::Closure;
 trait HackAroundBorrowCk {
-    fn from_fn(&fn(&mut Scheduler, ~Task)) -> Self;
-    fn to_fn(self) -> &fn(&mut Scheduler, ~Task);
+    fn from_fn(&fn(~Task)) -> Self;
+    fn to_fn(self) -> &fn(~Task);
 }
 impl HackAroundBorrowCk for UnsafeTaskReceiver {
-    fn from_fn(f: &fn(&mut Scheduler, ~Task)) -> UnsafeTaskReceiver {
+    fn from_fn(f: &fn(~Task)) -> UnsafeTaskReceiver {
         unsafe { transmute(f) }
     }
-    fn to_fn(self) -> &fn(&mut Scheduler, ~Task) {
+    fn to_fn(self) -> &fn(~Task) {
         unsafe { transmute(self) }
     }
 }
@@ -97,10 +98,12 @@ pub impl Scheduler {
 
         let scheduler = Scheduler::unsafe_local_borrow();
         fn run_scheduler_once() {
-            let scheduler = Scheduler::unsafe_local_borrow();
+            let scheduler = Scheduler::local_take();
             if scheduler.resume_task_from_queue() {
                 // Ok, a task ran. Nice! We'll do it again later
-                scheduler.event_loop.callback(run_scheduler_once);
+                do Scheduler::local_borrow |scheduler| {
+                    scheduler.event_loop.callback(run_scheduler_once);
+                }
             }
         }
 
@@ -124,9 +127,13 @@ pub impl Scheduler {
         local::put(sched);
     }
 
+    fn local_take() -> ~Scheduler {
+        local::take()
+    }
+
     // * Scheduler-context operations
 
-    fn resume_task_from_queue(&mut self) -> bool {
+    fn resume_task_from_queue(~self) -> bool {
         assert!(!self.in_task_context());
 
         let mut self = self;
@@ -137,12 +144,14 @@ pub impl Scheduler {
             }
             None => {
                 rtdebug!("no tasks in queue");
+                local::put(self);
                 return false;
             }
         }
     }
 
-    fn resume_task_immediately(&mut self, task: ~Task) {
+    fn resume_task_immediately(~self, task: ~Task) {
+        let mut self = self;
         assert!(!self.in_task_context());
 
         rtdebug!("scheduling a task");
@@ -151,20 +160,22 @@ pub impl Scheduler {
         self.current_task = Some(task);
         self.enqueue_cleanup_job(DoNothing);
 
+        local::put(self);
+
         // Take pointers to both the task and scheduler's saved registers.
-        {
-            let (sched_context, _, next_task_context) = self.get_contexts();
-            let next_task_context = next_task_context.unwrap();
-            // Context switch to the task, restoring it's registers
-            // and saving the scheduler's
-            Context::swap(sched_context, next_task_context);
-        }
+        let sched = Scheduler::unsafe_local_borrow();
+        let (sched_context, _, next_task_context) = sched.get_contexts();
+        let next_task_context = next_task_context.unwrap();
+        // Context switch to the task, restoring it's registers
+        // and saving the scheduler's
+        Context::swap(sched_context, next_task_context);
 
+        let sched = Scheduler::unsafe_local_borrow();
         // The running task should have passed ownership elsewhere
-        assert!(self.current_task.is_none());
+        assert!(sched.current_task.is_none());
 
         // Running tasks may have asked us to do some cleanup
-        self.run_cleanup_job();
+        sched.run_cleanup_job();
     }
 
 
@@ -172,18 +183,23 @@ pub impl Scheduler {
 
     /// Called by a running task to end execution, after which it will
     /// be recycled by the scheduler for reuse in a new task.
-    fn terminate_current_task(&mut self) {
+    fn terminate_current_task(~self) {
+        let mut self = self;
         assert!(self.in_task_context());
 
         rtdebug!("ending running task");
 
         let dead_task = self.current_task.swap_unwrap();
         self.enqueue_cleanup_job(RecycleTask(dead_task));
-        {
-            let (sched_context, last_task_context, _) = self.get_contexts();
-            let last_task_context = last_task_context.unwrap();
-            Context::swap(last_task_context, sched_context);
-        }
+
+        local::put(self);
+
+        let sched = Scheduler::unsafe_local_borrow();
+        let (sched_context, last_task_context, _) = sched.get_contexts();
+        let last_task_context = last_task_context.unwrap();
+        Context::swap(last_task_context, sched_context);
+
+        // Control never reaches here
     }
 
     /// Block a running task, context switch to the scheduler, then pass the
@@ -194,22 +210,25 @@ pub impl Scheduler {
     /// The closure here is a *stack* closure that lives in the
     /// running task.  It gets transmuted to the scheduler's lifetime
     /// and called while the task is blocked.
-    fn deschedule_running_task_and_then(&mut self, f: &fn(&mut Scheduler, ~Task)) {
+    fn deschedule_running_task_and_then(~self, f: &fn(~Task)) {
+        let mut self = self;
         assert!(self.in_task_context());
 
         rtdebug!("blocking task");
 
         let blocked_task = self.current_task.swap_unwrap();
         let f_fake_region = unsafe {
-            transmute::<&fn(&mut Scheduler, ~Task), &fn(&mut Scheduler, ~Task)>(f)
+            transmute::<&fn(~Task), &fn(~Task)>(f)
         };
         let f_opaque = HackAroundBorrowCk::from_fn(f_fake_region);
         self.enqueue_cleanup_job(GiveTask(blocked_task, f_opaque));
-        {
-            let (sched_context, last_task_context, _) = self.get_contexts();
-            let last_task_context = last_task_context.unwrap();
-            Context::swap(last_task_context, sched_context);
-        }
+
+        local::put(self);
+
+        let sched = Scheduler::unsafe_local_borrow();
+        let (sched_context, last_task_context, _) = sched.get_contexts();
+        let last_task_context = last_task_context.unwrap();
+        Context::swap(last_task_context, sched_context);
 
         // We could be executing in a different thread now
         let sched = Scheduler::unsafe_local_borrow();
@@ -219,7 +238,8 @@ pub impl Scheduler {
     /// Switch directly to another task, without going through the scheduler.
     /// You would want to think hard about doing this, e.g. if there are
     /// pending I/O events it would be a bad idea.
-    fn resume_task_from_running_task_direct(&mut self, next_task: ~Task) {
+    fn resume_task_from_running_task_direct(~self, next_task: ~Task) {
+        let mut self = self;
         assert!(self.in_task_context());
 
         rtdebug!("switching tasks");
@@ -227,12 +247,14 @@ pub impl Scheduler {
         let old_running_task = self.current_task.swap_unwrap();
         self.enqueue_cleanup_job(RescheduleTask(old_running_task));
         self.current_task = Some(next_task);
-        {
-            let (_, last_task_context, next_task_context) = self.get_contexts();
-            let last_task_context = last_task_context.unwrap();
-            let next_task_context = next_task_context.unwrap();
-            Context::swap(last_task_context, next_task_context);
-        }
+
+        local::put(self);
+
+        let sched = Scheduler::unsafe_local_borrow();
+        let (_, last_task_context, next_task_context) = sched.get_contexts();
+        let last_task_context = last_task_context.unwrap();
+        let next_task_context = next_task_context.unwrap();
+        Context::swap(last_task_context, next_task_context);
 
         // We could be executing in a different thread now
         let sched = Scheduler::unsafe_local_borrow();
@@ -261,7 +283,7 @@ pub impl Scheduler {
                 self.task_queue.push_front(task);
             }
             RecycleTask(task) => task.recycle(&mut self.stack_pool),
-            GiveTask(task, f) => (f.to_fn())(self, task)
+            GiveTask(task, f) => (f.to_fn())(task)
         }
     }
 
@@ -338,7 +360,7 @@ pub impl Task {
 
             start();
 
-            let sched = Scheduler::unsafe_local_borrow();
+            let sched = Scheduler::local_take();
             sched.terminate_current_task();
         };
         return wrapper;
@@ -398,7 +420,7 @@ fn test_swap_tasks() {
         let mut sched = ~UvEventLoop::new_scheduler();
         let task1 = ~do Task::new(&mut sched.stack_pool) {
             unsafe { *count_ptr = *count_ptr + 1; }
-            let sched = Scheduler::unsafe_local_borrow();
+            let mut sched = Scheduler::local_take();
             let task2 = ~do Task::new(&mut sched.stack_pool) {
                 unsafe { *count_ptr = *count_ptr + 1; }
             };
@@ -463,7 +485,7 @@ fn test_run_a_lot_of_tasks_direct() {
         assert!(count == MAX);
 
         fn run_task(count_ptr: *mut int) {
-            let sched = Scheduler::unsafe_local_borrow();
+            let mut sched = Scheduler::local_take();
             let task = ~do Task::new(&mut sched.stack_pool) {
                 unsafe {
                     *count_ptr = *count_ptr + 1;
@@ -483,11 +505,14 @@ fn test_block_task() {
     do run_in_bare_thread {
         let mut sched = ~UvEventLoop::new_scheduler();
         let task = ~do Task::new(&mut sched.stack_pool) {
-            let sched = Scheduler::unsafe_local_borrow();
+            let sched = Scheduler::local_take();
             assert!(sched.in_task_context());
-            do sched.deschedule_running_task_and_then() |sched, task| {
-                assert!(!sched.in_task_context());
-                sched.task_queue.push_back(task);
+            do sched.deschedule_running_task_and_then() |task| {
+                let task = Cell(task);
+                do Scheduler::local_borrow |sched| {
+                    assert!(!sched.in_task_context());
+                    sched.task_queue.push_back(task.take());
+                }
             }
         };
         sched.task_queue.push_back(task);
diff --git a/src/libcore/rt/uvio.rs b/src/libcore/rt/uvio.rs
index a43ec07c2de..e3fed29ddd2 100644
--- a/src/libcore/rt/uvio.rs
+++ b/src/libcore/rt/uvio.rs
@@ -104,14 +104,16 @@ impl IoFactory for UvIoFactory {
         let result_cell = empty_cell();
         let result_cell_ptr: *Cell<Option<~StreamObject>> = &result_cell;
 
-        let scheduler = Scheduler::unsafe_local_borrow();
+        let scheduler = Scheduler::local_take();
         assert!(scheduler.in_task_context());
 
         // Block this task and take ownership, switch to scheduler context
-        do scheduler.deschedule_running_task_and_then |scheduler, task| {
+        do scheduler.deschedule_running_task_and_then |task| {
 
             rtdebug!("connect: entered scheduler context");
-            assert!(!scheduler.in_task_context());
+            do Scheduler::local_borrow |scheduler| {
+                assert!(!scheduler.in_task_context());
+            }
             let mut tcp_watcher = TcpWatcher::new(self.uv_loop());
             let task_cell = Cell(task);
 
@@ -131,7 +133,7 @@ impl IoFactory for UvIoFactory {
                 unsafe { (*result_cell_ptr).put_back(maybe_stream); }
 
                 // Context switch
-                let scheduler = Scheduler::unsafe_local_borrow();
+                let scheduler = Scheduler::local_take();
                 scheduler.resume_task_immediately(task_cell.take());
             }
         }
@@ -176,10 +178,10 @@ impl TcpListener for UvTcpListener {
 
         let server_tcp_watcher = self.watcher();
 
-        let scheduler = Scheduler::unsafe_local_borrow();
+        let scheduler = Scheduler::local_take();
         assert!(scheduler.in_task_context());
 
-        do scheduler.deschedule_running_task_and_then |_, task| {
+        do scheduler.deschedule_running_task_and_then |task| {
             let task_cell = Cell(task);
             let mut server_tcp_watcher = server_tcp_watcher;
             do server_tcp_watcher.listen |server_stream_watcher, status| {
@@ -199,7 +201,7 @@ impl TcpListener for UvTcpListener {
 
                 rtdebug!("resuming task from listen");
                 // Context switch
-                let scheduler = Scheduler::unsafe_local_borrow();
+                let scheduler = Scheduler::local_take();
                 scheduler.resume_task_immediately(task_cell.take());
             }
         }
@@ -239,13 +241,15 @@ impl Stream for UvStream {
         let result_cell = empty_cell();
         let result_cell_ptr: *Cell<Result<uint, ()>> = &result_cell;
 
-        let scheduler = Scheduler::unsafe_local_borrow();
+        let scheduler = Scheduler::local_take();
         assert!(scheduler.in_task_context());
         let watcher = self.watcher();
         let buf_ptr: *&mut [u8] = &buf;
-        do scheduler.deschedule_running_task_and_then |scheduler, task| {
+        do scheduler.deschedule_running_task_and_then |task| {
             rtdebug!("read: entered scheduler context");
-            assert!(!scheduler.in_task_context());
+            do Scheduler::local_borrow |scheduler| {
+                assert!(!scheduler.in_task_context());
+            }
             let mut watcher = watcher;
             let task_cell = Cell(task);
             // XXX: We shouldn't reallocate these callbacks every
@@ -271,7 +275,7 @@ impl Stream for UvStream {
 
                 unsafe { (*result_cell_ptr).put_back(result); }
 
-                let scheduler = Scheduler::unsafe_local_borrow();
+                let scheduler = Scheduler::local_take();
                 scheduler.resume_task_immediately(task_cell.take());
             }
         }
@@ -283,11 +287,11 @@ impl Stream for UvStream {
     fn write(&mut self, buf: &[u8]) -> Result<(), ()> {
         let result_cell = empty_cell();
         let result_cell_ptr: *Cell<Result<(), ()>> = &result_cell;
-        let scheduler = Scheduler::unsafe_local_borrow();
+        let scheduler = Scheduler::local_take();
         assert!(scheduler.in_task_context());
         let watcher = self.watcher();
         let buf_ptr: *&[u8] = &buf;
-        do scheduler.deschedule_running_task_and_then |_, task| {
+        do scheduler.deschedule_running_task_and_then |task| {
             let mut watcher = watcher;
             let task_cell = Cell(task);
             let buf = unsafe { &*buf_ptr };
@@ -302,7 +306,7 @@ impl Stream for UvStream {
 
                 unsafe { (*result_cell_ptr).put_back(result); }
 
-                let scheduler = Scheduler::unsafe_local_borrow();
+                let scheduler = Scheduler::local_take();
                 scheduler.resume_task_immediately(task_cell.take());
             }
         }
@@ -404,12 +408,15 @@ fn test_read_and_block() {
                 }
                 reads += 1;
 
-                let scheduler = Scheduler::unsafe_local_borrow();
+                let scheduler = Scheduler::local_take();
                 // Yield to the other task in hopes that it
                 // will trigger a read callback while we are
                 // not ready for it
-                do scheduler.deschedule_running_task_and_then |scheduler, task| {
-                    scheduler.task_queue.push_back(task);
+                do scheduler.deschedule_running_task_and_then |task| {
+                    let task = Cell(task);
+                    do Scheduler::local_borrow |scheduler| {
+                        scheduler.task_queue.push_back(task.take());
+                    }
                 }
             }