about summary refs log tree commit diff
path: root/src
diff options
context:
space:
mode:
authorBrian Anderson <banderson@mozilla.com>2013-05-29 17:52:00 -0700
committerBrian Anderson <banderson@mozilla.com>2013-05-29 17:52:00 -0700
commit134bb0f3eeed69bbf6dc672bbbfbc802f1a018a9 (patch)
tree74f7efa3539745c6144b6d9775a3726677198220 /src
parentf343e6172b7132545c72e3e09e6afccc06fdcee7 (diff)
downloadrust-134bb0f3eeed69bbf6dc672bbbfbc802f1a018a9.tar.gz
rust-134bb0f3eeed69bbf6dc672bbbfbc802f1a018a9.zip
core::rt: Change the signature of context switching methods to avoid infinite recursion
Diffstat (limited to 'src')
-rw-r--r--src/libcore/rt/comm.rs4
-rw-r--r--src/libcore/rt/mod.rs7
-rw-r--r--src/libcore/rt/sched.rs63
-rw-r--r--src/libcore/rt/test.rs32
-rw-r--r--src/libcore/rt/tube.rs34
-rw-r--r--src/libcore/rt/uv/uvio.rs26
6 files changed, 66 insertions, 100 deletions
diff --git a/src/libcore/rt/comm.rs b/src/libcore/rt/comm.rs
index d108e20347a..8ff3887f779 100644
--- a/src/libcore/rt/comm.rs
+++ b/src/libcore/rt/comm.rs
@@ -159,7 +159,7 @@ impl<T> PortOne<T> {
 
         // Switch to the scheduler to put the ~Task into the Packet state.
         let sched = Local::take::<Scheduler>();
-        do sched.deschedule_running_task_and_then |task| {
+        do sched.deschedule_running_task_and_then |sched, task| {
             unsafe {
                 // Atomically swap the task pointer into the Packet state, issuing
                 // an acquire barrier to prevent reordering of the subsequent read
@@ -178,12 +178,10 @@ impl<T> PortOne<T> {
                         // triggering infinite recursion on the scheduler's stack.
                         let task: ~Coroutine = cast::transmute(task_as_state);
                         let task = Cell(task);
-                        let mut sched = Local::take::<Scheduler>();
                         do sched.event_loop.callback {
                             let sched = Local::take::<Scheduler>();
                             sched.resume_task_immediately(task.take());
                         }
-                        Local::put(sched);
                     }
                     _ => util::unreachable()
                 }
diff --git a/src/libcore/rt/mod.rs b/src/libcore/rt/mod.rs
index e23ad76a8c6..1113d7abe7d 100644
--- a/src/libcore/rt/mod.rs
+++ b/src/libcore/rt/mod.rs
@@ -238,12 +238,9 @@ fn test_context() {
         let task = ~do Coroutine::new(&mut sched.stack_pool) {
             assert_eq!(context(), TaskContext);
             let sched = Local::take::<Scheduler>();
-            do sched.deschedule_running_task_and_then() |task| {
+            do sched.deschedule_running_task_and_then() |sched, task| {
                 assert_eq!(context(), SchedulerContext);
-                let task = Cell(task);
-                do Local::borrow::<Scheduler> |sched| {
-                    sched.enqueue_task(task.take());
-                }
+                sched.enqueue_task(task);
             }
         };
         sched.enqueue_task(task);
diff --git a/src/libcore/rt/sched.rs b/src/libcore/rt/sched.rs
index c6d6bb9f39e..089c95cd7cd 100644
--- a/src/libcore/rt/sched.rs
+++ b/src/libcore/rt/sched.rs
@@ -280,11 +280,9 @@ pub impl Scheduler {
 
         rtdebug!("ending running task");
 
-        do self.deschedule_running_task_and_then |dead_task| {
+        do self.deschedule_running_task_and_then |sched, dead_task| {
             let dead_task = Cell(dead_task);
-            do Local::borrow::<Scheduler> |sched| {
-                dead_task.take().recycle(&mut sched.stack_pool);
-            }
+            dead_task.take().recycle(&mut sched.stack_pool);
         }
 
         abort!("control reached end of task");
@@ -293,22 +291,18 @@ pub impl Scheduler {
     fn schedule_new_task(~self, task: ~Coroutine) {
         assert!(self.in_task_context());
 
-        do self.switch_running_tasks_and_then(task) |last_task| {
+        do self.switch_running_tasks_and_then(task) |sched, last_task| {
             let last_task = Cell(last_task);
-            do Local::borrow::<Scheduler> |sched| {
-                sched.enqueue_task(last_task.take());
-            }
+            sched.enqueue_task(last_task.take());
         }
     }
 
     fn schedule_task(~self, task: ~Coroutine) {
         assert!(self.in_task_context());
 
-        do self.switch_running_tasks_and_then(task) |last_task| {
+        do self.switch_running_tasks_and_then(task) |sched, last_task| {
             let last_task = Cell(last_task);
-            do Local::borrow::<Scheduler> |sched| {
-                sched.enqueue_task(last_task.take());
-            }
+            sched.enqueue_task(last_task.take());
         }
     }
 
@@ -352,7 +346,11 @@ pub impl Scheduler {
     /// The closure here is a *stack* closure that lives in the
     /// running task.  It gets transmuted to the scheduler's lifetime
     /// and called while the task is blocked.
-    fn deschedule_running_task_and_then(~self, f: &fn(~Coroutine)) {
+    ///
+    /// This passes a Scheduler pointer to the fn after the context switch
+    /// in order to prevent that fn from performing further scheduling operations.
+    /// Doing further scheduling could easily result in infinite recursion.
+    fn deschedule_running_task_and_then(~self, f: &fn(&mut Scheduler, ~Coroutine)) {
         let mut this = self;
         assert!(this.in_task_context());
 
@@ -360,7 +358,8 @@ pub impl Scheduler {
 
         unsafe {
             let blocked_task = this.current_task.swap_unwrap();
-            let f_fake_region = transmute::<&fn(~Coroutine), &fn(~Coroutine)>(f);
+            let f_fake_region = transmute::<&fn(&mut Scheduler, ~Coroutine),
+                                            &fn(&mut Scheduler, ~Coroutine)>(f);
             let f_opaque = ClosureConverter::from_fn(f_fake_region);
             this.enqueue_cleanup_job(GiveTask(blocked_task, f_opaque));
         }
@@ -382,14 +381,18 @@ pub impl Scheduler {
     /// Switch directly to another task, without going through the scheduler.
     /// You would want to think hard about doing this, e.g. if there are
     /// pending I/O events it would be a bad idea.
-    fn switch_running_tasks_and_then(~self, next_task: ~Coroutine, f: &fn(~Coroutine)) {
+    fn switch_running_tasks_and_then(~self, next_task: ~Coroutine,
+                                     f: &fn(&mut Scheduler, ~Coroutine)) {
         let mut this = self;
         assert!(this.in_task_context());
 
         rtdebug!("switching tasks");
 
         let old_running_task = this.current_task.swap_unwrap();
-        let f_fake_region = unsafe { transmute::<&fn(~Coroutine), &fn(~Coroutine)>(f) };
+        let f_fake_region = unsafe {
+            transmute::<&fn(&mut Scheduler, ~Coroutine),
+                        &fn(&mut Scheduler, ~Coroutine)>(f)
+        };
         let f_opaque = ClosureConverter::from_fn(f_fake_region);
         this.enqueue_cleanup_job(GiveTask(old_running_task, f_opaque));
         this.current_task = Some(next_task);
@@ -426,7 +429,7 @@ pub impl Scheduler {
         let cleanup_job = self.cleanup_job.swap_unwrap();
         match cleanup_job {
             DoNothing => { }
-            GiveTask(task, f) => (f.to_fn())(task)
+            GiveTask(task, f) => (f.to_fn())(self, task)
         }
     }
 
@@ -535,12 +538,12 @@ pub impl Coroutine {
 // complaining
 type UnsafeTaskReceiver = sys::Closure;
 trait ClosureConverter {
-    fn from_fn(&fn(~Coroutine)) -> Self;
-    fn to_fn(self) -> &fn(~Coroutine);
+    fn from_fn(&fn(&mut Scheduler, ~Coroutine)) -> Self;
+    fn to_fn(self) -> &fn(&mut Scheduler, ~Coroutine);
 }
 impl ClosureConverter for UnsafeTaskReceiver {
-    fn from_fn(f: &fn(~Coroutine)) -> UnsafeTaskReceiver { unsafe { transmute(f) } }
-    fn to_fn(self) -> &fn(~Coroutine) { unsafe { transmute(self) } }
+    fn from_fn(f: &fn(&mut Scheduler, ~Coroutine)) -> UnsafeTaskReceiver { unsafe { transmute(f) } }
+    fn to_fn(self) -> &fn(&mut Scheduler, ~Coroutine) { unsafe { transmute(self) } }
 }
 
 #[cfg(test)]
@@ -604,11 +607,9 @@ mod test {
                     unsafe { *count_ptr = *count_ptr + 1; }
                 };
                 // Context switch directly to the new task
-                do sched.switch_running_tasks_and_then(task2) |task1| {
+                do sched.switch_running_tasks_and_then(task2) |sched, task1| {
                     let task1 = Cell(task1);
-                    do Local::borrow::<Scheduler> |sched| {
-                        sched.enqueue_task(task1.take());
-                    }
+                    sched.enqueue_task(task1.take());
                 }
                 unsafe { *count_ptr = *count_ptr + 1; }
             };
@@ -658,12 +659,10 @@ mod test {
             let task = ~do Coroutine::new(&mut sched.stack_pool) {
                 let sched = Local::take::<Scheduler>();
                 assert!(sched.in_task_context());
-                do sched.deschedule_running_task_and_then() |task| {
+                do sched.deschedule_running_task_and_then() |sched, task| {
                     let task = Cell(task);
-                    do Local::borrow::<Scheduler> |sched| {
-                        assert!(!sched.in_task_context());
-                        sched.enqueue_task(task.take());
-                    }
+                    assert!(!sched.in_task_context());
+                    sched.enqueue_task(task.take());
                 }
             };
             sched.enqueue_task(task);
@@ -680,8 +679,7 @@ mod test {
         do run_in_newsched_task {
             do spawn {
                 let sched = Local::take::<Scheduler>();
-                do sched.deschedule_running_task_and_then |task| {
-                    let mut sched = Local::take::<Scheduler>();
+                do sched.deschedule_running_task_and_then |sched, task| {
                     let task = Cell(task);
                     do sched.event_loop.callback_ms(10) {
                         rtdebug!("in callback");
@@ -689,7 +687,6 @@ mod test {
                         sched.enqueue_task(task.take());
                         Local::put(sched);
                     }
-                    Local::put(sched);
                 }
             }
         }
diff --git a/src/libcore/rt/test.rs b/src/libcore/rt/test.rs
index 1bbfe8d473d..16b0aef5e26 100644
--- a/src/libcore/rt/test.rs
+++ b/src/libcore/rt/test.rs
@@ -122,11 +122,7 @@ pub fn spawntask(f: ~fn()) {
     let task = ~Coroutine::with_task(&mut sched.stack_pool,
                                      ~Task::without_unwinding(),
                                      f);
-    do sched.switch_running_tasks_and_then(task) |task| {
-        let task = Cell(task);
-        let sched = Local::take::<Scheduler>();
-        sched.schedule_new_task(task.take());
-    }
+    sched.schedule_new_task(task);
 }
 
 /// Create a new task and run it right now. Aborts on failure
@@ -137,11 +133,8 @@ pub fn spawntask_immediately(f: ~fn()) {
     let task = ~Coroutine::with_task(&mut sched.stack_pool,
                                      ~Task::without_unwinding(),
                                      f);
-    do sched.switch_running_tasks_and_then(task) |task| {
-        let task = Cell(task);
-        do Local::borrow::<Scheduler> |sched| {
-            sched.enqueue_task(task.take());
-        }
+    do sched.switch_running_tasks_and_then(task) |sched, task| {
+        sched.enqueue_task(task);
     }
 }
 
@@ -172,11 +165,8 @@ pub fn spawntask_random(f: ~fn()) {
                                      f);
 
     if run_now {
-        do sched.switch_running_tasks_and_then(task) |task| {
-            let task = Cell(task);
-            do Local::borrow::<Scheduler> |sched| {
-                sched.enqueue_task(task.take());
-            }
+        do sched.switch_running_tasks_and_then(task) |sched, task| {
+            sched.enqueue_task(task);
         }
     } else {
         sched.enqueue_task(task);
@@ -199,10 +189,9 @@ pub fn spawntask_try(f: ~fn()) -> Result<(), ()> {
     // Switch to the scheduler
     let f = Cell(Cell(f));
     let sched = Local::take::<Scheduler>();
-    do sched.deschedule_running_task_and_then() |old_task| {
+    do sched.deschedule_running_task_and_then() |sched, old_task| {
         let old_task = Cell(old_task);
         let f = f.take();
-        let mut sched = Local::take::<Scheduler>();
         let new_task = ~do Coroutine::new(&mut sched.stack_pool) {
             do (|| {
                 (f.take())()
@@ -210,16 +199,13 @@ pub fn spawntask_try(f: ~fn()) -> Result<(), ()> {
                 // Check for failure then resume the parent task
                 unsafe { *failed_ptr = task::failing(); }
                 let sched = Local::take::<Scheduler>();
-                do sched.switch_running_tasks_and_then(old_task.take()) |new_task| {
-                    let new_task = Cell(new_task);
-                    do Local::borrow::<Scheduler> |sched| {
-                        sched.enqueue_task(new_task.take());
-                    }
+                do sched.switch_running_tasks_and_then(old_task.take()) |sched, new_task| {
+                    sched.enqueue_task(new_task);
                 }
             }
         };
 
-        sched.resume_task_immediately(new_task);
+        sched.enqueue_task(new_task);
     }
 
     if !failed { Ok(()) } else { Err(()) }
diff --git a/src/libcore/rt/tube.rs b/src/libcore/rt/tube.rs
index b2f475a6966..4482a92d916 100644
--- a/src/libcore/rt/tube.rs
+++ b/src/libcore/rt/tube.rs
@@ -72,7 +72,7 @@ impl<T> Tube<T> {
                 assert!(self.p.refcount() > 1); // There better be somebody to wake us up
                 assert!((*state).blocked_task.is_none());
                 let sched = Local::take::<Scheduler>();
-                do sched.deschedule_running_task_and_then |task| {
+                do sched.deschedule_running_task_and_then |_, task| {
                     (*state).blocked_task = Some(task);
                 }
                 rtdebug!("waking after tube recv");
@@ -107,11 +107,10 @@ mod test {
             let tube_clone = tube.clone();
             let tube_clone_cell = Cell(tube_clone);
             let sched = Local::take::<Scheduler>();
-            do sched.deschedule_running_task_and_then |task| {
+            do sched.deschedule_running_task_and_then |sched, task| {
                 let mut tube_clone = tube_clone_cell.take();
                 tube_clone.send(1);
-                let sched = Local::take::<Scheduler>();
-                sched.resume_task_immediately(task);
+                sched.enqueue_task(task);
             }
 
             assert!(tube.recv() == 1);
@@ -123,21 +122,17 @@ mod test {
         do run_in_newsched_task {
             let mut tube: Tube<int> = Tube::new();
             let tube_clone = tube.clone();
-            let tube_clone = Cell(Cell(Cell(tube_clone)));
+            let tube_clone = Cell(tube_clone);
             let sched = Local::take::<Scheduler>();
-            do sched.deschedule_running_task_and_then |task| {
-                let tube_clone = tube_clone.take();
-                do Local::borrow::<Scheduler> |sched| {
-                    let tube_clone = tube_clone.take();
-                    do sched.event_loop.callback {
-                        let mut tube_clone = tube_clone.take();
-                        // The task should be blocked on this now and
-                        // sending will wake it up.
-                        tube_clone.send(1);
-                    }
+            do sched.deschedule_running_task_and_then |sched, task| {
+                let tube_clone = Cell(tube_clone.take());
+                do sched.event_loop.callback {
+                    let mut tube_clone = tube_clone.take();
+                    // The task should be blocked on this now and
+                    // sending will wake it up.
+                    tube_clone.send(1);
                 }
-                let sched = Local::take::<Scheduler>();
-                sched.resume_task_immediately(task);
+                sched.enqueue_task(task);
             }
 
             assert!(tube.recv() == 1);
@@ -153,7 +148,7 @@ mod test {
             let tube_clone = tube.clone();
             let tube_clone = Cell(tube_clone);
             let sched = Local::take::<Scheduler>();
-            do sched.deschedule_running_task_and_then |task| {
+            do sched.deschedule_running_task_and_then |sched, task| {
                 callback_send(tube_clone.take(), 0);
 
                 fn callback_send(tube: Tube<int>, i: int) {
@@ -172,8 +167,7 @@ mod test {
                     }
                 }
 
-                let sched = Local::take::<Scheduler>();
-                sched.resume_task_immediately(task);
+                sched.enqueue_task(task);
             }
 
             for int::range(0, MAX) |i| {
diff --git a/src/libcore/rt/uv/uvio.rs b/src/libcore/rt/uv/uvio.rs
index e25b6140abb..1ee6504d11f 100644
--- a/src/libcore/rt/uv/uvio.rs
+++ b/src/libcore/rt/uv/uvio.rs
@@ -205,12 +205,10 @@ impl IoFactory for UvIoFactory {
         assert!(scheduler.in_task_context());
 
         // Block this task and take ownership, switch to scheduler context
-        do scheduler.deschedule_running_task_and_then |task| {
+        do scheduler.deschedule_running_task_and_then |sched, task| {
 
             rtdebug!("connect: entered scheduler context");
-            do Local::borrow::<Scheduler> |scheduler| {
-                assert!(!scheduler.in_task_context());
-            }
+            assert!(!sched.in_task_context());
             let mut tcp_watcher = TcpWatcher::new(self.uv_loop());
             let task_cell = Cell(task);
 
@@ -250,7 +248,7 @@ impl IoFactory for UvIoFactory {
             Ok(_) => Ok(~UvTcpListener::new(watcher)),
             Err(uverr) => {
                 let scheduler = Local::take::<Scheduler>();
-                do scheduler.deschedule_running_task_and_then |task| {
+                do scheduler.deschedule_running_task_and_then |_, task| {
                     let task_cell = Cell(task);
                     do watcher.as_stream().close {
                         let scheduler = Local::take::<Scheduler>();
@@ -286,7 +284,7 @@ impl Drop for UvTcpListener {
     fn finalize(&self) {
         let watcher = self.watcher();
         let scheduler = Local::take::<Scheduler>();
-        do scheduler.deschedule_running_task_and_then |task| {
+        do scheduler.deschedule_running_task_and_then |_, task| {
             let task_cell = Cell(task);
             do watcher.as_stream().close {
                 let scheduler = Local::take::<Scheduler>();
@@ -348,7 +346,7 @@ impl Drop for UvTcpStream {
         rtdebug!("closing tcp stream");
         let watcher = self.watcher();
         let scheduler = Local::take::<Scheduler>();
-        do scheduler.deschedule_running_task_and_then |task| {
+        do scheduler.deschedule_running_task_and_then |_, task| {
             let task_cell = Cell(task);
             do watcher.close {
                 let scheduler = Local::take::<Scheduler>();
@@ -367,11 +365,9 @@ impl RtioTcpStream for UvTcpStream {
         assert!(scheduler.in_task_context());
         let watcher = self.watcher();
         let buf_ptr: *&mut [u8] = &buf;
-        do scheduler.deschedule_running_task_and_then |task| {
+        do scheduler.deschedule_running_task_and_then |sched, task| {
             rtdebug!("read: entered scheduler context");
-            do Local::borrow::<Scheduler> |scheduler| {
-                assert!(!scheduler.in_task_context());
-            }
+            assert!(!sched.in_task_context());
             let mut watcher = watcher;
             let task_cell = Cell(task);
             // XXX: We shouldn't reallocate these callbacks every
@@ -413,7 +409,7 @@ impl RtioTcpStream for UvTcpStream {
         assert!(scheduler.in_task_context());
         let watcher = self.watcher();
         let buf_ptr: *&[u8] = &buf;
-        do scheduler.deschedule_running_task_and_then |task| {
+        do scheduler.deschedule_running_task_and_then |_, task| {
             let mut watcher = watcher;
             let task_cell = Cell(task);
             let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
@@ -507,11 +503,9 @@ fn test_read_and_block() {
                 // Yield to the other task in hopes that it
                 // will trigger a read callback while we are
                 // not ready for it
-                do scheduler.deschedule_running_task_and_then |task| {
+                do scheduler.deschedule_running_task_and_then |sched, task| {
                     let task = Cell(task);
-                    do Local::borrow::<Scheduler> |scheduler| {
-                        scheduler.enqueue_task(task.take());
-                    }
+                    sched.enqueue_task(task.take());
                 }
             }