diff options
| author | Brian Anderson <banderson@mozilla.com> | 2013-05-29 17:52:00 -0700 |
|---|---|---|
| committer | Brian Anderson <banderson@mozilla.com> | 2013-05-29 17:52:00 -0700 |
| commit | 134bb0f3eeed69bbf6dc672bbbfbc802f1a018a9 (patch) | |
| tree | 74f7efa3539745c6144b6d9775a3726677198220 /src | |
| parent | f343e6172b7132545c72e3e09e6afccc06fdcee7 (diff) | |
| download | rust-134bb0f3eeed69bbf6dc672bbbfbc802f1a018a9.tar.gz rust-134bb0f3eeed69bbf6dc672bbbfbc802f1a018a9.zip | |
core::rt: Change the signature of context switching methods to avoid infinite recursion
Diffstat (limited to 'src')
| -rw-r--r-- | src/libcore/rt/comm.rs | 4 | ||||
| -rw-r--r-- | src/libcore/rt/mod.rs | 7 | ||||
| -rw-r--r-- | src/libcore/rt/sched.rs | 63 | ||||
| -rw-r--r-- | src/libcore/rt/test.rs | 32 | ||||
| -rw-r--r-- | src/libcore/rt/tube.rs | 34 | ||||
| -rw-r--r-- | src/libcore/rt/uv/uvio.rs | 26 |
6 files changed, 66 insertions, 100 deletions
diff --git a/src/libcore/rt/comm.rs b/src/libcore/rt/comm.rs index d108e20347a..8ff3887f779 100644 --- a/src/libcore/rt/comm.rs +++ b/src/libcore/rt/comm.rs @@ -159,7 +159,7 @@ impl<T> PortOne<T> { // Switch to the scheduler to put the ~Task into the Packet state. let sched = Local::take::<Scheduler>(); - do sched.deschedule_running_task_and_then |task| { + do sched.deschedule_running_task_and_then |sched, task| { unsafe { // Atomically swap the task pointer into the Packet state, issuing // an acquire barrier to prevent reordering of the subsequent read @@ -178,12 +178,10 @@ impl<T> PortOne<T> { // triggering infinite recursion on the scheduler's stack. let task: ~Coroutine = cast::transmute(task_as_state); let task = Cell(task); - let mut sched = Local::take::<Scheduler>(); do sched.event_loop.callback { let sched = Local::take::<Scheduler>(); sched.resume_task_immediately(task.take()); } - Local::put(sched); } _ => util::unreachable() } diff --git a/src/libcore/rt/mod.rs b/src/libcore/rt/mod.rs index e23ad76a8c6..1113d7abe7d 100644 --- a/src/libcore/rt/mod.rs +++ b/src/libcore/rt/mod.rs @@ -238,12 +238,9 @@ fn test_context() { let task = ~do Coroutine::new(&mut sched.stack_pool) { assert_eq!(context(), TaskContext); let sched = Local::take::<Scheduler>(); - do sched.deschedule_running_task_and_then() |task| { + do sched.deschedule_running_task_and_then() |sched, task| { assert_eq!(context(), SchedulerContext); - let task = Cell(task); - do Local::borrow::<Scheduler> |sched| { - sched.enqueue_task(task.take()); - } + sched.enqueue_task(task); } }; sched.enqueue_task(task); diff --git a/src/libcore/rt/sched.rs b/src/libcore/rt/sched.rs index c6d6bb9f39e..089c95cd7cd 100644 --- a/src/libcore/rt/sched.rs +++ b/src/libcore/rt/sched.rs @@ -280,11 +280,9 @@ pub impl Scheduler { rtdebug!("ending running task"); - do self.deschedule_running_task_and_then |dead_task| { + do self.deschedule_running_task_and_then |sched, dead_task| { let dead_task = Cell(dead_task); - do Local::borrow::<Scheduler> |sched| { - dead_task.take().recycle(&mut sched.stack_pool); - } + dead_task.take().recycle(&mut sched.stack_pool); } abort!("control reached end of task"); @@ -293,22 +291,18 @@ pub impl Scheduler { fn schedule_new_task(~self, task: ~Coroutine) { assert!(self.in_task_context()); - do self.switch_running_tasks_and_then(task) |last_task| { + do self.switch_running_tasks_and_then(task) |sched, last_task| { let last_task = Cell(last_task); - do Local::borrow::<Scheduler> |sched| { - sched.enqueue_task(last_task.take()); - } + sched.enqueue_task(last_task.take()); } } fn schedule_task(~self, task: ~Coroutine) { assert!(self.in_task_context()); - do self.switch_running_tasks_and_then(task) |last_task| { + do self.switch_running_tasks_and_then(task) |sched, last_task| { let last_task = Cell(last_task); - do Local::borrow::<Scheduler> |sched| { - sched.enqueue_task(last_task.take()); - } + sched.enqueue_task(last_task.take()); } } @@ -352,7 +346,11 @@ pub impl Scheduler { /// The closure here is a *stack* closure that lives in the /// running task. It gets transmuted to the scheduler's lifetime /// and called while the task is blocked. - fn deschedule_running_task_and_then(~self, f: &fn(~Coroutine)) { + /// + /// This passes a Scheduler pointer to the fn after the context switch + /// in order to prevent that fn from performing further scheduling operations. + /// Doing further scheduling could easily result in infinite recursion. + fn deschedule_running_task_and_then(~self, f: &fn(&mut Scheduler, ~Coroutine)) { let mut this = self; assert!(this.in_task_context()); @@ -360,7 +358,8 @@ pub impl Scheduler { unsafe { let blocked_task = this.current_task.swap_unwrap(); - let f_fake_region = transmute::<&fn(~Coroutine), &fn(~Coroutine)>(f); + let f_fake_region = transmute::<&fn(&mut Scheduler, ~Coroutine), + &fn(&mut Scheduler, ~Coroutine)>(f); let f_opaque = ClosureConverter::from_fn(f_fake_region); this.enqueue_cleanup_job(GiveTask(blocked_task, f_opaque)); } @@ -382,14 +381,18 @@ pub impl Scheduler { /// Switch directly to another task, without going through the scheduler. /// You would want to think hard about doing this, e.g. if there are /// pending I/O events it would be a bad idea. - fn switch_running_tasks_and_then(~self, next_task: ~Coroutine, f: &fn(~Coroutine)) { + fn switch_running_tasks_and_then(~self, next_task: ~Coroutine, + f: &fn(&mut Scheduler, ~Coroutine)) { let mut this = self; assert!(this.in_task_context()); rtdebug!("switching tasks"); let old_running_task = this.current_task.swap_unwrap(); - let f_fake_region = unsafe { transmute::<&fn(~Coroutine), &fn(~Coroutine)>(f) }; + let f_fake_region = unsafe { + transmute::<&fn(&mut Scheduler, ~Coroutine), + &fn(&mut Scheduler, ~Coroutine)>(f) + }; let f_opaque = ClosureConverter::from_fn(f_fake_region); this.enqueue_cleanup_job(GiveTask(old_running_task, f_opaque)); this.current_task = Some(next_task); @@ -426,7 +429,7 @@ pub impl Scheduler { let cleanup_job = self.cleanup_job.swap_unwrap(); match cleanup_job { DoNothing => { } - GiveTask(task, f) => (f.to_fn())(task) + GiveTask(task, f) => (f.to_fn())(self, task) } } @@ -535,12 +538,12 @@ pub impl Coroutine { // complaining type UnsafeTaskReceiver = sys::Closure; trait ClosureConverter { - fn from_fn(&fn(~Coroutine)) -> Self; - fn to_fn(self) -> &fn(~Coroutine); + fn from_fn(&fn(&mut Scheduler, ~Coroutine)) -> Self; + fn to_fn(self) -> &fn(&mut Scheduler, ~Coroutine); } impl ClosureConverter for UnsafeTaskReceiver { - fn from_fn(f: &fn(~Coroutine)) -> UnsafeTaskReceiver { unsafe { transmute(f) } } - fn to_fn(self) -> &fn(~Coroutine) { unsafe { transmute(self) } } + fn from_fn(f: &fn(&mut Scheduler, ~Coroutine)) -> UnsafeTaskReceiver { unsafe { transmute(f) } } + fn to_fn(self) -> &fn(&mut Scheduler, ~Coroutine) { unsafe { transmute(self) } } } #[cfg(test)] @@ -604,11 +607,9 @@ mod test { unsafe { *count_ptr = *count_ptr + 1; } }; // Context switch directly to the new task - do sched.switch_running_tasks_and_then(task2) |task1| { + do sched.switch_running_tasks_and_then(task2) |sched, task1| { let task1 = Cell(task1); - do Local::borrow::<Scheduler> |sched| { - sched.enqueue_task(task1.take()); - } + sched.enqueue_task(task1.take()); } unsafe { *count_ptr = *count_ptr + 1; } }; @@ -658,12 +659,10 @@ mod test { let task = ~do Coroutine::new(&mut sched.stack_pool) { let sched = Local::take::<Scheduler>(); assert!(sched.in_task_context()); - do sched.deschedule_running_task_and_then() |task| { + do sched.deschedule_running_task_and_then() |sched, task| { let task = Cell(task); - do Local::borrow::<Scheduler> |sched| { - assert!(!sched.in_task_context()); - sched.enqueue_task(task.take()); - } + assert!(!sched.in_task_context()); + sched.enqueue_task(task.take()); } }; sched.enqueue_task(task); @@ -680,8 +679,7 @@ mod test { do run_in_newsched_task { do spawn { let sched = Local::take::<Scheduler>(); - do sched.deschedule_running_task_and_then |task| { - let mut sched = Local::take::<Scheduler>(); + do sched.deschedule_running_task_and_then |sched, task| { let task = Cell(task); do sched.event_loop.callback_ms(10) { rtdebug!("in callback"); @@ -689,7 +687,6 @@ mod test { sched.enqueue_task(task.take()); Local::put(sched); } - Local::put(sched); } } } diff --git a/src/libcore/rt/test.rs b/src/libcore/rt/test.rs index 1bbfe8d473d..16b0aef5e26 100644 --- a/src/libcore/rt/test.rs +++ b/src/libcore/rt/test.rs @@ -122,11 +122,7 @@ pub fn spawntask(f: ~fn()) { let task = ~Coroutine::with_task(&mut sched.stack_pool, ~Task::without_unwinding(), f); - do sched.switch_running_tasks_and_then(task) |task| { - let task = Cell(task); - let sched = Local::take::<Scheduler>(); - sched.schedule_new_task(task.take()); - } + sched.schedule_new_task(task); } /// Create a new task and run it right now. Aborts on failure @@ -137,11 +133,8 @@ pub fn spawntask_immediately(f: ~fn()) { let task = ~Coroutine::with_task(&mut sched.stack_pool, ~Task::without_unwinding(), f); - do sched.switch_running_tasks_and_then(task) |task| { - let task = Cell(task); - do Local::borrow::<Scheduler> |sched| { - sched.enqueue_task(task.take()); - } + do sched.switch_running_tasks_and_then(task) |sched, task| { + sched.enqueue_task(task); } } @@ -172,11 +165,8 @@ pub fn spawntask_random(f: ~fn()) { f); if run_now { - do sched.switch_running_tasks_and_then(task) |task| { - let task = Cell(task); - do Local::borrow::<Scheduler> |sched| { - sched.enqueue_task(task.take()); - } + do sched.switch_running_tasks_and_then(task) |sched, task| { + sched.enqueue_task(task); } } else { sched.enqueue_task(task); @@ -199,10 +189,9 @@ pub fn spawntask_try(f: ~fn()) -> Result<(), ()> { // Switch to the scheduler let f = Cell(Cell(f)); let sched = Local::take::<Scheduler>(); - do sched.deschedule_running_task_and_then() |old_task| { + do sched.deschedule_running_task_and_then() |sched, old_task| { let old_task = Cell(old_task); let f = f.take(); - let mut sched = Local::take::<Scheduler>(); let new_task = ~do Coroutine::new(&mut sched.stack_pool) { do (|| { (f.take())() @@ -210,16 +199,13 @@ pub fn spawntask_try(f: ~fn()) -> Result<(), ()> { // Check for failure then resume the parent task unsafe { *failed_ptr = task::failing(); } let sched = Local::take::<Scheduler>(); - do sched.switch_running_tasks_and_then(old_task.take()) |new_task| { - let new_task = Cell(new_task); - do Local::borrow::<Scheduler> |sched| { - sched.enqueue_task(new_task.take()); - } + do sched.switch_running_tasks_and_then(old_task.take()) |sched, new_task| { + sched.enqueue_task(new_task); } } }; - sched.resume_task_immediately(new_task); + sched.enqueue_task(new_task); } if !failed { Ok(()) } else { Err(()) } diff --git a/src/libcore/rt/tube.rs b/src/libcore/rt/tube.rs index b2f475a6966..4482a92d916 100644 --- a/src/libcore/rt/tube.rs +++ b/src/libcore/rt/tube.rs @@ -72,7 +72,7 @@ impl<T> Tube<T> { assert!(self.p.refcount() > 1); // There better be somebody to wake us up assert!((*state).blocked_task.is_none()); let sched = Local::take::<Scheduler>(); - do sched.deschedule_running_task_and_then |task| { + do sched.deschedule_running_task_and_then |_, task| { (*state).blocked_task = Some(task); } rtdebug!("waking after tube recv"); @@ -107,11 +107,10 @@ mod test { let tube_clone = tube.clone(); let tube_clone_cell = Cell(tube_clone); let sched = Local::take::<Scheduler>(); - do sched.deschedule_running_task_and_then |task| { + do sched.deschedule_running_task_and_then |sched, task| { let mut tube_clone = tube_clone_cell.take(); tube_clone.send(1); - let sched = Local::take::<Scheduler>(); - sched.resume_task_immediately(task); + sched.enqueue_task(task); } assert!(tube.recv() == 1); @@ -123,21 +122,17 @@ mod test { do run_in_newsched_task { let mut tube: Tube<int> = Tube::new(); let tube_clone = tube.clone(); - let tube_clone = Cell(Cell(Cell(tube_clone))); + let tube_clone = Cell(tube_clone); let sched = Local::take::<Scheduler>(); - do sched.deschedule_running_task_and_then |task| { - let tube_clone = tube_clone.take(); - do Local::borrow::<Scheduler> |sched| { - let tube_clone = tube_clone.take(); - do sched.event_loop.callback { - let mut tube_clone = tube_clone.take(); - // The task should be blocked on this now and - // sending will wake it up. - tube_clone.send(1); - } + do sched.deschedule_running_task_and_then |sched, task| { + let tube_clone = Cell(tube_clone.take()); + do sched.event_loop.callback { + let mut tube_clone = tube_clone.take(); + // The task should be blocked on this now and + // sending will wake it up. + tube_clone.send(1); } - let sched = Local::take::<Scheduler>(); - sched.resume_task_immediately(task); + sched.enqueue_task(task); } assert!(tube.recv() == 1); @@ -153,7 +148,7 @@ mod test { let tube_clone = tube.clone(); let tube_clone = Cell(tube_clone); let sched = Local::take::<Scheduler>(); - do sched.deschedule_running_task_and_then |task| { + do sched.deschedule_running_task_and_then |sched, task| { callback_send(tube_clone.take(), 0); fn callback_send(tube: Tube<int>, i: int) { @@ -172,8 +167,7 @@ mod test { } } - let sched = Local::take::<Scheduler>(); - sched.resume_task_immediately(task); + sched.enqueue_task(task); } for int::range(0, MAX) |i| { diff --git a/src/libcore/rt/uv/uvio.rs b/src/libcore/rt/uv/uvio.rs index e25b6140abb..1ee6504d11f 100644 --- a/src/libcore/rt/uv/uvio.rs +++ b/src/libcore/rt/uv/uvio.rs @@ -205,12 +205,10 @@ impl IoFactory for UvIoFactory { assert!(scheduler.in_task_context()); // Block this task and take ownership, switch to scheduler context - do scheduler.deschedule_running_task_and_then |task| { + do scheduler.deschedule_running_task_and_then |sched, task| { rtdebug!("connect: entered scheduler context"); - do Local::borrow::<Scheduler> |scheduler| { - assert!(!scheduler.in_task_context()); - } + assert!(!sched.in_task_context()); let mut tcp_watcher = TcpWatcher::new(self.uv_loop()); let task_cell = Cell(task); @@ -250,7 +248,7 @@ impl IoFactory for UvIoFactory { Ok(_) => Ok(~UvTcpListener::new(watcher)), Err(uverr) => { let scheduler = Local::take::<Scheduler>(); - do scheduler.deschedule_running_task_and_then |task| { + do scheduler.deschedule_running_task_and_then |_, task| { let task_cell = Cell(task); do watcher.as_stream().close { let scheduler = Local::take::<Scheduler>(); @@ -286,7 +284,7 @@ impl Drop for UvTcpListener { fn finalize(&self) { let watcher = self.watcher(); let scheduler = Local::take::<Scheduler>(); - do scheduler.deschedule_running_task_and_then |task| { + do scheduler.deschedule_running_task_and_then |_, task| { let task_cell = Cell(task); do watcher.as_stream().close { let scheduler = Local::take::<Scheduler>(); @@ -348,7 +346,7 @@ impl Drop for UvTcpStream { rtdebug!("closing tcp stream"); let watcher = self.watcher(); let scheduler = Local::take::<Scheduler>(); - do scheduler.deschedule_running_task_and_then |task| { + do scheduler.deschedule_running_task_and_then |_, task| { let task_cell = Cell(task); do watcher.close { let scheduler = Local::take::<Scheduler>(); @@ -367,11 +365,9 @@ impl RtioTcpStream for UvTcpStream { assert!(scheduler.in_task_context()); let watcher = self.watcher(); let buf_ptr: *&mut [u8] = &buf; - do scheduler.deschedule_running_task_and_then |task| { + do scheduler.deschedule_running_task_and_then |sched, task| { rtdebug!("read: entered scheduler context"); - do Local::borrow::<Scheduler> |scheduler| { - assert!(!scheduler.in_task_context()); - } + assert!(!sched.in_task_context()); let mut watcher = watcher; let task_cell = Cell(task); // XXX: We shouldn't reallocate these callbacks every @@ -413,7 +409,7 @@ impl RtioTcpStream for UvTcpStream { assert!(scheduler.in_task_context()); let watcher = self.watcher(); let buf_ptr: *&[u8] = &buf; - do scheduler.deschedule_running_task_and_then |task| { + do scheduler.deschedule_running_task_and_then |_, task| { let mut watcher = watcher; let task_cell = Cell(task); let buf = unsafe { slice_to_uv_buf(*buf_ptr) }; @@ -507,11 +503,9 @@ fn test_read_and_block() { // Yield to the other task in hopes that it // will trigger a read callback while we are // not ready for it - do scheduler.deschedule_running_task_and_then |task| { + do scheduler.deschedule_running_task_and_then |sched, task| { let task = Cell(task); - do Local::borrow::<Scheduler> |scheduler| { - scheduler.enqueue_task(task.take()); - } + sched.enqueue_task(task.take()); } } |
