diff options
| -rw-r--r-- | src/libcore/rt/sched/mod.rs | 115 | ||||
| -rw-r--r-- | src/libcore/rt/uvio.rs | 41 |
2 files changed, 94 insertions, 62 deletions
diff --git a/src/libcore/rt/sched/mod.rs b/src/libcore/rt/sched/mod.rs index a2a440ba76e..2aaf0a44455 100644 --- a/src/libcore/rt/sched/mod.rs +++ b/src/libcore/rt/sched/mod.rs @@ -20,6 +20,7 @@ use super::context::Context; #[cfg(test)] use super::uvio::UvEventLoop; #[cfg(test)] use unstable::run_in_bare_thread; #[cfg(test)] use int; +#[cfg(test)] use cell::Cell; mod local; @@ -46,14 +47,14 @@ pub struct Scheduler { // complaining type UnsafeTaskReceiver = sys::Closure; trait HackAroundBorrowCk { - fn from_fn(&fn(&mut Scheduler, ~Task)) -> Self; - fn to_fn(self) -> &fn(&mut Scheduler, ~Task); + fn from_fn(&fn(~Task)) -> Self; + fn to_fn(self) -> &fn(~Task); } impl HackAroundBorrowCk for UnsafeTaskReceiver { - fn from_fn(f: &fn(&mut Scheduler, ~Task)) -> UnsafeTaskReceiver { + fn from_fn(f: &fn(~Task)) -> UnsafeTaskReceiver { unsafe { transmute(f) } } - fn to_fn(self) -> &fn(&mut Scheduler, ~Task) { + fn to_fn(self) -> &fn(~Task) { unsafe { transmute(self) } } } @@ -97,10 +98,12 @@ pub impl Scheduler { let scheduler = Scheduler::unsafe_local_borrow(); fn run_scheduler_once() { - let scheduler = Scheduler::unsafe_local_borrow(); + let scheduler = Scheduler::local_take(); if scheduler.resume_task_from_queue() { // Ok, a task ran. Nice! We'll do it again later - scheduler.event_loop.callback(run_scheduler_once); + do Scheduler::local_borrow |scheduler| { + scheduler.event_loop.callback(run_scheduler_once); + } } } @@ -124,9 +127,13 @@ pub impl Scheduler { local::put(sched); } + fn local_take() -> ~Scheduler { + local::take() + } + // * Scheduler-context operations - fn resume_task_from_queue(&mut self) -> bool { + fn resume_task_from_queue(~self) -> bool { assert!(!self.in_task_context()); let mut self = self; @@ -137,12 +144,14 @@ pub impl Scheduler { } None => { rtdebug!("no tasks in queue"); + local::put(self); return false; } } } - fn resume_task_immediately(&mut self, task: ~Task) { + fn resume_task_immediately(~self, task: ~Task) { + let mut self = self; assert!(!self.in_task_context()); rtdebug!("scheduling a task"); @@ -151,20 +160,22 @@ pub impl Scheduler { self.current_task = Some(task); self.enqueue_cleanup_job(DoNothing); + local::put(self); + // Take pointers to both the task and scheduler's saved registers. - { - let (sched_context, _, next_task_context) = self.get_contexts(); - let next_task_context = next_task_context.unwrap(); - // Context switch to the task, restoring it's registers - // and saving the scheduler's - Context::swap(sched_context, next_task_context); - } + let sched = Scheduler::unsafe_local_borrow(); + let (sched_context, _, next_task_context) = sched.get_contexts(); + let next_task_context = next_task_context.unwrap(); + // Context switch to the task, restoring it's registers + // and saving the scheduler's + Context::swap(sched_context, next_task_context); + let sched = Scheduler::unsafe_local_borrow(); // The running task should have passed ownership elsewhere - assert!(self.current_task.is_none()); + assert!(sched.current_task.is_none()); // Running tasks may have asked us to do some cleanup - self.run_cleanup_job(); + sched.run_cleanup_job(); } @@ -172,18 +183,23 @@ pub impl Scheduler { /// Called by a running task to end execution, after which it will /// be recycled by the scheduler for reuse in a new task. - fn terminate_current_task(&mut self) { + fn terminate_current_task(~self) { + let mut self = self; assert!(self.in_task_context()); rtdebug!("ending running task"); let dead_task = self.current_task.swap_unwrap(); self.enqueue_cleanup_job(RecycleTask(dead_task)); - { - let (sched_context, last_task_context, _) = self.get_contexts(); - let last_task_context = last_task_context.unwrap(); - Context::swap(last_task_context, sched_context); - } + + local::put(self); + + let sched = Scheduler::unsafe_local_borrow(); + let (sched_context, last_task_context, _) = sched.get_contexts(); + let last_task_context = last_task_context.unwrap(); + Context::swap(last_task_context, sched_context); + + // Control never reaches here } /// Block a running task, context switch to the scheduler, then pass the @@ -194,22 +210,25 @@ pub impl Scheduler { /// The closure here is a *stack* closure that lives in the /// running task. It gets transmuted to the scheduler's lifetime /// and called while the task is blocked. - fn deschedule_running_task_and_then(&mut self, f: &fn(&mut Scheduler, ~Task)) { + fn deschedule_running_task_and_then(~self, f: &fn(~Task)) { + let mut self = self; assert!(self.in_task_context()); rtdebug!("blocking task"); let blocked_task = self.current_task.swap_unwrap(); let f_fake_region = unsafe { - transmute::<&fn(&mut Scheduler, ~Task), &fn(&mut Scheduler, ~Task)>(f) + transmute::<&fn(~Task), &fn(~Task)>(f) }; let f_opaque = HackAroundBorrowCk::from_fn(f_fake_region); self.enqueue_cleanup_job(GiveTask(blocked_task, f_opaque)); - { - let (sched_context, last_task_context, _) = self.get_contexts(); - let last_task_context = last_task_context.unwrap(); - Context::swap(last_task_context, sched_context); - } + + local::put(self); + + let sched = Scheduler::unsafe_local_borrow(); + let (sched_context, last_task_context, _) = sched.get_contexts(); + let last_task_context = last_task_context.unwrap(); + Context::swap(last_task_context, sched_context); // We could be executing in a different thread now let sched = Scheduler::unsafe_local_borrow(); @@ -219,7 +238,8 @@ pub impl Scheduler { /// Switch directly to another task, without going through the scheduler. /// You would want to think hard about doing this, e.g. if there are /// pending I/O events it would be a bad idea. - fn resume_task_from_running_task_direct(&mut self, next_task: ~Task) { + fn resume_task_from_running_task_direct(~self, next_task: ~Task) { + let mut self = self; assert!(self.in_task_context()); rtdebug!("switching tasks"); @@ -227,12 +247,14 @@ pub impl Scheduler { let old_running_task = self.current_task.swap_unwrap(); self.enqueue_cleanup_job(RescheduleTask(old_running_task)); self.current_task = Some(next_task); - { - let (_, last_task_context, next_task_context) = self.get_contexts(); - let last_task_context = last_task_context.unwrap(); - let next_task_context = next_task_context.unwrap(); - Context::swap(last_task_context, next_task_context); - } + + local::put(self); + + let sched = Scheduler::unsafe_local_borrow(); + let (_, last_task_context, next_task_context) = sched.get_contexts(); + let last_task_context = last_task_context.unwrap(); + let next_task_context = next_task_context.unwrap(); + Context::swap(last_task_context, next_task_context); // We could be executing in a different thread now let sched = Scheduler::unsafe_local_borrow(); @@ -261,7 +283,7 @@ pub impl Scheduler { self.task_queue.push_front(task); } RecycleTask(task) => task.recycle(&mut self.stack_pool), - GiveTask(task, f) => (f.to_fn())(self, task) + GiveTask(task, f) => (f.to_fn())(task) } } @@ -338,7 +360,7 @@ pub impl Task { start(); - let sched = Scheduler::unsafe_local_borrow(); + let sched = Scheduler::local_take(); sched.terminate_current_task(); }; return wrapper; @@ -398,7 +420,7 @@ fn test_swap_tasks() { let mut sched = ~UvEventLoop::new_scheduler(); let task1 = ~do Task::new(&mut sched.stack_pool) { unsafe { *count_ptr = *count_ptr + 1; } - let sched = Scheduler::unsafe_local_borrow(); + let mut sched = Scheduler::local_take(); let task2 = ~do Task::new(&mut sched.stack_pool) { unsafe { *count_ptr = *count_ptr + 1; } }; @@ -463,7 +485,7 @@ fn test_run_a_lot_of_tasks_direct() { assert!(count == MAX); fn run_task(count_ptr: *mut int) { - let sched = Scheduler::unsafe_local_borrow(); + let mut sched = Scheduler::local_take(); let task = ~do Task::new(&mut sched.stack_pool) { unsafe { *count_ptr = *count_ptr + 1; @@ -483,11 +505,14 @@ fn test_block_task() { do run_in_bare_thread { let mut sched = ~UvEventLoop::new_scheduler(); let task = ~do Task::new(&mut sched.stack_pool) { - let sched = Scheduler::unsafe_local_borrow(); + let sched = Scheduler::local_take(); assert!(sched.in_task_context()); - do sched.deschedule_running_task_and_then() |sched, task| { - assert!(!sched.in_task_context()); - sched.task_queue.push_back(task); + do sched.deschedule_running_task_and_then() |task| { + let task = Cell(task); + do Scheduler::local_borrow |sched| { + assert!(!sched.in_task_context()); + sched.task_queue.push_back(task.take()); + } } }; sched.task_queue.push_back(task); diff --git a/src/libcore/rt/uvio.rs b/src/libcore/rt/uvio.rs index a43ec07c2de..e3fed29ddd2 100644 --- a/src/libcore/rt/uvio.rs +++ b/src/libcore/rt/uvio.rs @@ -104,14 +104,16 @@ impl IoFactory for UvIoFactory { let result_cell = empty_cell(); let result_cell_ptr: *Cell<Option<~StreamObject>> = &result_cell; - let scheduler = Scheduler::unsafe_local_borrow(); + let scheduler = Scheduler::local_take(); assert!(scheduler.in_task_context()); // Block this task and take ownership, switch to scheduler context - do scheduler.deschedule_running_task_and_then |scheduler, task| { + do scheduler.deschedule_running_task_and_then |task| { rtdebug!("connect: entered scheduler context"); - assert!(!scheduler.in_task_context()); + do Scheduler::local_borrow |scheduler| { + assert!(!scheduler.in_task_context()); + } let mut tcp_watcher = TcpWatcher::new(self.uv_loop()); let task_cell = Cell(task); @@ -131,7 +133,7 @@ impl IoFactory for UvIoFactory { unsafe { (*result_cell_ptr).put_back(maybe_stream); } // Context switch - let scheduler = Scheduler::unsafe_local_borrow(); + let scheduler = Scheduler::local_take(); scheduler.resume_task_immediately(task_cell.take()); } } @@ -176,10 +178,10 @@ impl TcpListener for UvTcpListener { let server_tcp_watcher = self.watcher(); - let scheduler = Scheduler::unsafe_local_borrow(); + let scheduler = Scheduler::local_take(); assert!(scheduler.in_task_context()); - do scheduler.deschedule_running_task_and_then |_, task| { + do scheduler.deschedule_running_task_and_then |task| { let task_cell = Cell(task); let mut server_tcp_watcher = server_tcp_watcher; do server_tcp_watcher.listen |server_stream_watcher, status| { @@ -199,7 +201,7 @@ impl TcpListener for UvTcpListener { rtdebug!("resuming task from listen"); // Context switch - let scheduler = Scheduler::unsafe_local_borrow(); + let scheduler = Scheduler::local_take(); scheduler.resume_task_immediately(task_cell.take()); } } @@ -239,13 +241,15 @@ impl Stream for UvStream { let result_cell = empty_cell(); let result_cell_ptr: *Cell<Result<uint, ()>> = &result_cell; - let scheduler = Scheduler::unsafe_local_borrow(); + let scheduler = Scheduler::local_take(); assert!(scheduler.in_task_context()); let watcher = self.watcher(); let buf_ptr: *&mut [u8] = &buf; - do scheduler.deschedule_running_task_and_then |scheduler, task| { + do scheduler.deschedule_running_task_and_then |task| { rtdebug!("read: entered scheduler context"); - assert!(!scheduler.in_task_context()); + do Scheduler::local_borrow |scheduler| { + assert!(!scheduler.in_task_context()); + } let mut watcher = watcher; let task_cell = Cell(task); // XXX: We shouldn't reallocate these callbacks every @@ -271,7 +275,7 @@ impl Stream for UvStream { unsafe { (*result_cell_ptr).put_back(result); } - let scheduler = Scheduler::unsafe_local_borrow(); + let scheduler = Scheduler::local_take(); scheduler.resume_task_immediately(task_cell.take()); } } @@ -283,11 +287,11 @@ impl Stream for UvStream { fn write(&mut self, buf: &[u8]) -> Result<(), ()> { let result_cell = empty_cell(); let result_cell_ptr: *Cell<Result<(), ()>> = &result_cell; - let scheduler = Scheduler::unsafe_local_borrow(); + let scheduler = Scheduler::local_take(); assert!(scheduler.in_task_context()); let watcher = self.watcher(); let buf_ptr: *&[u8] = &buf; - do scheduler.deschedule_running_task_and_then |_, task| { + do scheduler.deschedule_running_task_and_then |task| { let mut watcher = watcher; let task_cell = Cell(task); let buf = unsafe { &*buf_ptr }; @@ -302,7 +306,7 @@ impl Stream for UvStream { unsafe { (*result_cell_ptr).put_back(result); } - let scheduler = Scheduler::unsafe_local_borrow(); + let scheduler = Scheduler::local_take(); scheduler.resume_task_immediately(task_cell.take()); } } @@ -404,12 +408,15 @@ fn test_read_and_block() { } reads += 1; - let scheduler = Scheduler::unsafe_local_borrow(); + let scheduler = Scheduler::local_take(); // Yield to the other task in hopes that it // will trigger a read callback while we are // not ready for it - do scheduler.deschedule_running_task_and_then |scheduler, task| { - scheduler.task_queue.push_back(task); + do scheduler.deschedule_running_task_and_then |task| { + let task = Cell(task); + do Scheduler::local_borrow |scheduler| { + scheduler.task_queue.push_back(task.take()); + } } } |
