diff options
| author | toddaaro <github@opprobrio.us> | 2013-07-31 13:52:22 -0700 |
|---|---|---|
| committer | toddaaro <github@opprobrio.us> | 2013-08-01 15:18:29 -0700 |
| commit | 1d82fe5aea71b1c265634f32716b268972141efb (patch) | |
| tree | 876b41aeb9d01d450bdd08f855f572804f24066d | |
| parent | a5f55b3ead06886190d905cfc826bf1d072ff675 (diff) | |
| download | rust-1d82fe5aea71b1c265634f32716b268972141efb.tar.gz rust-1d82fe5aea71b1c265634f32716b268972141efb.zip | |
fixed incorrect handling of returned scheduler option and restructed scheduler functions slightly
| -rw-r--r-- | src/libstd/rt/comm.rs | 8 | ||||
| -rw-r--r-- | src/libstd/rt/mod.rs | 12 | ||||
| -rw-r--r-- | src/libstd/rt/sched.rs | 109 | ||||
| -rw-r--r-- | src/libstd/rt/task.rs | 4 | ||||
| -rw-r--r-- | src/libstd/rt/test.rs | 4 | ||||
| -rw-r--r-- | src/libstd/rt/uv/uvio.rs | 2 |
6 files changed, 64 insertions, 75 deletions
diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs index bb106edad94..5a671d877d2 100644 --- a/src/libstd/rt/comm.rs +++ b/src/libstd/rt/comm.rs @@ -131,10 +131,7 @@ impl<T> ChanOne<T> { // Port is blocked. Wake it up. let recvr = BlockedTask::cast_from_uint(task_as_state); do recvr.wake().map_consume |woken_task| { - let mut sched = Local::take::<Scheduler>(); - rtdebug!("rendezvous send"); - sched.metrics.rendezvous_sends += 1; - sched.schedule_task(woken_task); + Scheduler::run_task(woken_task); }; } } @@ -350,8 +347,7 @@ impl<T> Drop for ChanOne<T> { assert!((*this.packet()).payload.is_none()); let recvr = BlockedTask::cast_from_uint(task_as_state); do recvr.wake().map_consume |woken_task| { - let sched = Local::take::<Scheduler>(); - sched.schedule_task(woken_task); + Scheduler::run_task(woken_task); }; } } diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs index 73c30e5779c..f0f4b646103 100644 --- a/src/libstd/rt/mod.rs +++ b/src/libstd/rt/mod.rs @@ -259,6 +259,8 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int { let mut handles = ~[]; do nscheds.times { + rtdebug!("inserting a regular scheduler"); + // Every scheduler is driven by an I/O event loop. let loop_ = ~UvEventLoop::new(); let mut sched = ~Scheduler::new(loop_, work_queue.clone(), sleepers.clone()); @@ -344,6 +346,7 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int { // Run each remaining scheduler in a thread. while !scheds.is_empty() { + rtdebug!("creating regular schedulers"); let sched = scheds.pop(); let sched_cell = Cell::new(sched); let thread = do Thread::start { @@ -360,15 +363,21 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int { if use_main_sched { + rtdebug!("about to create the main scheduler task"); + let mut main_sched = main_sched.get(); let home = Sched(main_sched.make_handle()); - let mut main_task = ~Task::new_root_homed(&mut scheds[0].stack_pool, + let mut main_task = ~Task::new_root_homed(&mut main_sched.stack_pool, home, main.take()); main_task.death.on_exit = Some(on_exit.take()); + rtdebug!("boostrapping main_task"); + main_sched.bootstrap(main_task); } + rtdebug!("waiting for threads"); + // Wait for schedulers foreach thread in threads.consume_iter() { thread.join(); @@ -404,7 +413,6 @@ pub fn context() -> RuntimeContext { if unsafe { rust_try_get_task().is_not_null() } { return OldTaskContext; } else if Local::exists::<Task>() { - rtdebug!("either task or scheduler context in newrt"); // In this case we know it is a new runtime context, but we // need to check which one. Going to try borrowing task to // check. Task should always be in TLS, so hopefully this diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs index 816c963ad18..4abe69a7d13 100644 --- a/src/libstd/rt/sched.rs +++ b/src/libstd/rt/sched.rs @@ -142,11 +142,11 @@ impl Scheduler { local_ptr::init_tls_key(); // Create a task for the scheduler with an empty context. - let sched_task = Task::new_sched_task(); + let sched_task = ~Task::new_sched_task(); // Now that we have an empty task struct for the scheduler // task, put it in TLS. - Local::put::(~sched_task); + Local::put::(sched_task); // Now, as far as all the scheduler state is concerned, we are // inside the "scheduler" context. So we can act like the @@ -165,8 +165,6 @@ impl Scheduler { // cleaning up the memory it uses. As we didn't actually call // task.run() on the scheduler task we never get through all // the cleanup code it runs. - - rtdebug!("post sched.run(), cleaning up scheduler task"); let mut stask = Local::take::<Task>(); stask.destroyed = true; } @@ -224,6 +222,8 @@ impl Scheduler { // 2) A shutdown is also easy, shutdown. // 3) A pinned task - we resume immediately and do not return // here. + // 4) A message from another scheduler with a non-homed task + // to run here. let result = sched.interpret_message_queue(); let sched = match result { @@ -236,6 +236,8 @@ impl Scheduler { } }; + // Second activity is to try resuming a task from the queue. + let result = sched.resume_task_from_queue(); let mut sched = match result { Some(sched) => { @@ -333,8 +335,7 @@ impl Scheduler { return None; } Some(TaskFromFriend(task)) => { - this.schedule_task_sched_context(task); - return None; + return this.sched_schedule_task(task); } Some(Wake) => { this.sleepy = false; @@ -442,8 +443,6 @@ impl Scheduler { } } - // * Task-context operations - /// Called by a running task to end execution, after which it will /// be recycled by the scheduler for reuse in a new task. pub fn terminate_current_task(~self) { @@ -457,10 +456,17 @@ impl Scheduler { } } - // If a scheduling action is performed, return None. If not, - // return Some(sched). + // Scheduling a task requires a few checks to make sure the task + // ends up in the appropriate location. The run_anything flag on + // the scheduler and the home on the task need to be checked. This + // helper performs that check. It takes a function that specifies + // how to queue the the provided task if that is the correct + // action. This is a "core" function that requires handling the + // returned Option correctly. - pub fn schedule_task(~self, task: ~Task) -> Option<~Scheduler> { + pub fn schedule_task(~self, task: ~Task, + schedule_fn: ~fn(sched: ~Scheduler, task: ~Task)) + -> Option<~Scheduler> { // is the task home? let is_home = task.is_home_no_tls(&self); @@ -474,9 +480,7 @@ impl Scheduler { // here we know we are home, execute now OR we know we // aren't homed, and that this sched doesn't care rtdebug!("task: %u is on ok sched, executing", to_uint(task)); - do this.switch_running_tasks_and_then(task) |sched, last_task| { - sched.enqueue_blocked_task(last_task); - } + schedule_fn(this, task); return None; } else if !homed && !this.run_anything { // the task isn't homed, but it can't be run here @@ -489,35 +493,30 @@ impl Scheduler { } } - // BAD BAD BAD BAD BAD - // Do something instead of just copy-pasting this. - pub fn schedule_task_sched_context(~self, task: ~Task) -> Option<~Scheduler> { - - // is the task home? - let is_home = task.is_home_no_tls(&self); - - // does the task have a home? - let homed = task.homed(); - - let mut this = self; - - if is_home || (!homed && this.run_anything) { - // here we know we are home, execute now OR we know we - // aren't homed, and that this sched doesn't care - rtdebug!("task: %u is on ok sched, executing", to_uint(task)); - this.resume_task_immediately(task); - return None; - } else if !homed && !this.run_anything { - // the task isn't homed, but it can't be run here - this.enqueue_task(task); - return Some(this); - } else { - // task isn't home, so don't run it here, send it home - Scheduler::send_task_home(task); - return Some(this); + // There are two contexts in which schedule_task can be called: + // inside the scheduler, and inside a task. These contexts handle + // executing the task slightly differently. In the scheduler + // context case we want to receive the scheduler as an input, and + // manually deal with the option. In the task context case we want + // to use TLS to find the scheduler, and deal with the option + // inside the helper. + + pub fn sched_schedule_task(~self, task: ~Task) -> Option<~Scheduler> { + do self.schedule_task(task) |sched, next_task| { + sched.resume_task_immediately(next_task); } } + // Task context case - use TLS. + pub fn run_task(task: ~Task) { + let sched = Local::take::<Scheduler>(); + let opt = do sched.schedule_task(task) |sched, next_task| { + do sched.switch_running_tasks_and_then(next_task) |sched, last_task| { + sched.enqueue_blocked_task(last_task); + } + }; + opt.map_consume(Local::put); + } // The primary function for changing contexts. In the current // design the scheduler is just a slightly modified GreenTask, so @@ -586,7 +585,7 @@ impl Scheduler { Context::swap(current_task_context, next_task_context); } - // When the context swaps back to the scheduler we immediately + // When the context swaps back to this task we immediately // run the cleanup job, as expected by the previously called // swap_contexts function. unsafe { @@ -599,15 +598,8 @@ impl Scheduler { } } - // There are a variety of "obvious" functions to be passed to - // change_task_context, so we can make a few "named cases". - - // Enqueue the old task on the current scheduler. - pub fn enqueue_old(sched: &mut Scheduler, task: ~Task) { - sched.enqueue_task(task); - } - - // Sometimes we just want the old API though. + // Old API for task manipulation implemented over the new core + // function. pub fn resume_task_immediately(~self, task: ~Task) { do self.change_task_context(task) |sched, stask| { @@ -668,13 +660,6 @@ impl Scheduler { }; } - // A helper that looks up the scheduler and runs a task. If it can - // be run now it is run now. - pub fn run_task(new_task: ~Task) { - let sched = Local::take::<Scheduler>(); - sched.schedule_task(new_task).map_consume(Local::put); - } - // Returns a mutable reference to both contexts involved in this // swap. This is unsafe - we are getting mutable internal // references to keep even when we don't own the tasks. It looks @@ -692,8 +677,6 @@ impl Scheduler { } } - // * Other stuff - pub fn enqueue_cleanup_job(&mut self, job: CleanupJob) { self.cleanup_job = Some(job); } @@ -1004,22 +987,22 @@ mod test { let port = Cell::new(port); let chan = Cell::new(chan); - let _thread_one = do Thread::start { + let thread_one = do Thread::start { let chan = Cell::new(chan.take()); do run_in_newsched_task_core { chan.take().send(()); } }; - let _thread_two = do Thread::start { + let thread_two = do Thread::start { let port = Cell::new(port.take()); do run_in_newsched_task_core { port.take().recv(); } }; - thread1.join(); - thread2.join(); + thread_two.join(); + thread_one.join(); } } diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs index 13fdaded84b..fcc6ebeada6 100644 --- a/src/libstd/rt/task.rs +++ b/src/libstd/rt/task.rs @@ -209,7 +209,7 @@ impl Task { } pub fn run(&mut self, f: &fn()) { - + rtdebug!("run called on task: %u", borrow::to_uint(self)); self.unwinder.try(f); { let _ = self.taskgroup.take(); } self.death.collect_failure(!self.unwinder.unwinding); @@ -301,7 +301,7 @@ impl Task { impl Drop for Task { fn drop(&self) { - rtdebug!("called drop for a task"); + rtdebug!("called drop for a task: %u", borrow::to_uint(self)); assert!(self.destroyed) } } diff --git a/src/libstd/rt/test.rs b/src/libstd/rt/test.rs index d0970ec5866..22eb42e2ee8 100644 --- a/src/libstd/rt/test.rs +++ b/src/libstd/rt/test.rs @@ -130,7 +130,9 @@ pub fn run_in_mt_newsched_task(f: ~fn()) { while !scheds.is_empty() { let mut sched = scheds.pop(); - let bootstrap_task = ~do Task::new_root(&mut sched.stack_pool) || {}; + let bootstrap_task = ~do Task::new_root(&mut sched.stack_pool) || { + rtdebug!("bootstrapping non-primary scheduler"); + }; let bootstrap_task_cell = Cell::new(bootstrap_task); let sched_cell = Cell::new(sched); let thread = do Thread::start { diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs index e93333661cf..5be19752152 100644 --- a/src/libstd/rt/uv/uvio.rs +++ b/src/libstd/rt/uv/uvio.rs @@ -253,7 +253,7 @@ impl IoFactory for UvIoFactory { let scheduler = Local::take::<Scheduler>(); // Block this task and take ownership, switch to scheduler context - do scheduler.deschedule_running_task_and_then |_sched, task| { + do scheduler.deschedule_running_task_and_then |_, task| { rtdebug!("connect: entered scheduler context"); let mut tcp_watcher = TcpWatcher::new(self.uv_loop()); |
