diff options
Diffstat (limited to 'src/libstd/rt/sched.rs')
| -rw-r--r-- | src/libstd/rt/sched.rs | 1027 |
1 files changed, 454 insertions, 573 deletions
diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs index ae4ca2b9783..0326c2cbfe5 100644 --- a/src/libstd/rt/sched.rs +++ b/src/libstd/rt/sched.rs @@ -10,7 +10,8 @@ use either::{Left, Right}; use option::{Option, Some, None}; -use cast::transmute; +use sys; +use cast::{transmute, transmute_mut_region, transmute_mut_unsafe}; use clone::Clone; use unstable::raw; @@ -27,6 +28,7 @@ use rt::local::Local; use rt::rtio::RemoteCallback; use rt::metrics::SchedMetrics; use borrow::{to_uint}; +use cell::Cell; /// The Scheduler is responsible for coordinating execution of Coroutines /// on a single thread. When the scheduler is running it is owned by @@ -59,11 +61,8 @@ pub struct Scheduler { stack_pool: StackPool, /// The event loop used to drive the scheduler and perform I/O event_loop: ~EventLoopObject, - /// The scheduler's saved context. - /// Always valid when a task is executing, otherwise not - priv saved_context: Context, - /// The currently executing task - current_task: Option<~Task>, + /// The scheduler runs on a special task. + sched_task: Option<~Task>, /// An action performed after a context switch on behalf of the /// code running before the context switch priv cleanup_job: Option<CleanupJob>, @@ -90,7 +89,6 @@ enum CleanupJob { } impl Scheduler { - pub fn in_task_context(&self) -> bool { self.current_task.is_some() } pub fn sched_id(&self) -> uint { to_uint(self) } @@ -103,15 +101,14 @@ impl Scheduler { } + // When you create a scheduler it isn't yet "in" a task, so the + // task field is None. pub fn new_special(event_loop: ~EventLoopObject, work_queue: WorkQueue<~Task>, sleeper_list: SleeperList, run_anything: bool) -> Scheduler { - // Lazily initialize the runtime TLS key - local_ptr::init_tls_key(); - Scheduler { sleeper_list: sleeper_list, message_queue: MessageQueue::new(), @@ -120,8 +117,7 @@ impl Scheduler { event_loop: event_loop, work_queue: work_queue, stack_pool: StackPool::new(), - saved_context: Context::empty(), - current_task: None, + sched_task: None, cleanup_job: None, metrics: SchedMetrics::new(), run_anything: run_anything @@ -132,8 +128,47 @@ impl Scheduler { // the scheduler itself doesn't have to call event_loop.run. // That will be important for embedding the runtime into external // event loops. - pub fn run(~self) -> ~Scheduler { - assert!(!self.in_task_context()); + + // Take a main task to run, and a scheduler to run it in. Create a + // scheduler task and bootstrap into it. + pub fn bootstrap(~self, task: ~Task) { + + // Initialize the TLS key. + local_ptr::init_tls_key(); + + // Create a task for the scheduler with an empty context. + let sched_task = Task::new_sched_task(); + + // Now that we have an empty task struct for the scheduler + // task, put it in TLS. + Local::put::(~sched_task); + + // Now, as far as all the scheduler state is concerned, we are + // inside the "scheduler" context. So we can act like the + // scheduler and resume the provided task. + self.resume_task_immediately(task); + + // Now we are back in the scheduler context, having + // successfully run the input task. Start by running the + // scheduler. Grab it out of TLS - performing the scheduler + // action will have given it away. + let sched = Local::take::<Scheduler>(); + sched.run(); + + // Now that we are done with the scheduler, clean up the + // scheduler task. Do so by removing it from TLS and manually + // cleaning up the memory it uses. As we didn't actually call + // task.run() on the scheduler task we never get through all + // the cleanup code it runs. + + rtdebug!("post sched.run(), cleaning up scheduler task"); + let mut stask = Local::take::<Task>(); + stask.destroyed = true; + } + + // This does not return a scheduler, as the scheduler is placed + // inside the task. + pub fn run(~self) { let mut self_sched = self; @@ -142,79 +177,88 @@ impl Scheduler { // schedulers. self_sched.event_loop.callback(Scheduler::run_sched_once); + // This is unsafe because we need to place the scheduler, with + // the event_loop inside, inside our task. But we still need a + // mutable reference to the event_loop to give it the "run" + // command. unsafe { - let event_loop: *mut ~EventLoopObject = { - let event_loop: *mut ~EventLoopObject = &mut self_sched.event_loop; - event_loop - }; + let event_loop: *mut ~EventLoopObject = &mut self_sched.event_loop; - // Give ownership of the scheduler (self) to the thread - Local::put(self_sched); + // Our scheduler must be in the task before the event loop + // is started. + let self_sched = Cell::new(self_sched); + do Local::borrow::<Task,()> |stask| { + stask.sched = Some(self_sched.take()); + }; (*event_loop).run(); } - - rtdebug!("run taking sched"); - let sched = Local::take::<Scheduler>(); - // XXX: Reenable this once we're using a per-scheduler queue. With a shared - // queue this is not true - //assert!(sched.work_queue.is_empty()); - rtdebug!("scheduler metrics: %s\n", { - use to_str::ToStr; - sched.metrics.to_str() - }); - return sched; } - fn run_sched_once() { + // One iteration of the scheduler loop, always run at least once. - let mut sched = Local::take::<Scheduler>(); - sched.metrics.turns += 1; - - // First, check the message queue for instructions. - // XXX: perf. Check for messages without atomics. - // It's ok if we miss messages occasionally, as long as - // we sync and check again before sleeping. - if sched.interpret_message_queue() { - // We performed a scheduling action. There may be other work - // to do yet, so let's try again later. - rtdebug!("run_sched_once, interpret_message_queue taking sched"); - let mut sched = Local::take::<Scheduler>(); - sched.metrics.messages_received += 1; - sched.event_loop.callback(Scheduler::run_sched_once); - Local::put(sched); - return; - } + // The model for this function is that you continue through it + // until you either use the scheduler while performing a schedule + // action, in which case you give it away and do not return, or + // you reach the end and sleep. In the case that a scheduler + // action is performed the loop is evented such that this function + // is called again. + fn run_sched_once() { - // Now, look in the work queue for tasks to run - rtdebug!("run_sched_once taking"); + // When we reach the scheduler context via the event loop we + // already have a scheduler stored in our local task, so we + // start off by taking it. This is the only path through the + // scheduler where we get the scheduler this way. let sched = Local::take::<Scheduler>(); - if sched.resume_task_from_queue() { - // We performed a scheduling action. There may be other work - // to do yet, so let's try again later. - do Local::borrow::<Scheduler, ()> |sched| { - sched.metrics.tasks_resumed_from_queue += 1; - sched.event_loop.callback(Scheduler::run_sched_once); + + // Our first task is to read mail to see if we have important + // messages. + + // 1) A wake message is easy, mutate sched struct and return + // it. + // 2) A shutdown is also easy, shutdown. + // 3) A pinned task - we resume immediately and do not return + // here. + + let result = sched.interpret_message_queue(); + let sched = match result { + Some(sched) => { + // We did not resume a task, so we returned. + sched } - return; - } + None => { + return; + } + }; + + let result = sched.resume_task_from_queue(); + let mut sched = match result { + Some(sched) => { + // Failed to dequeue a task, so we return. + sched + } + None => { + return; + } + }; // If we got here then there was no work to do. // Generate a SchedHandle and push it to the sleeper list so // somebody can wake us up later. - rtdebug!("no work to do"); - do Local::borrow::<Scheduler, ()> |sched| { - sched.metrics.wasted_turns += 1; - if !sched.sleepy && !sched.no_sleep { - rtdebug!("sleeping"); - sched.metrics.sleepy_times += 1; - sched.sleepy = true; - let handle = sched.make_handle(); - sched.sleeper_list.push(handle); - } else { - rtdebug!("not sleeping"); - } + sched.metrics.wasted_turns += 1; + if !sched.sleepy && !sched.no_sleep { + rtdebug!("scheduler has no work to do, going to sleep"); + sched.metrics.sleepy_times += 1; + sched.sleepy = true; + let handle = sched.make_handle(); + sched.sleeper_list.push(handle); + } else { + rtdebug!("not sleeping, already doing so or no_sleep set"); } + + // Finished a cycle without using the Scheduler. Place it back + // in TLS. + Local::put(sched); } pub fn make_handle(&mut self) -> SchedHandle { @@ -234,18 +278,6 @@ impl Scheduler { /// to the work queue directly. pub fn enqueue_task(&mut self, task: ~Task) { - // We don't want to queue tasks that belong on other threads, - // so we send them home at enqueue time. - - // The borrow checker doesn't like our disassembly of the - // Coroutine struct and partial use and mutation of the - // fields. So completely disassemble here and stop using? - - // XXX perf: I think we might be able to shuffle this code to - // only destruct when we need to. - - rtdebug!("a task was queued on: %u", self.sched_id()); - let this = self; // We push the task onto our local queue clone. @@ -283,30 +315,23 @@ impl Scheduler { // * Scheduler-context operations - fn interpret_message_queue(~self) -> bool { - assert!(!self.in_task_context()); - - rtdebug!("looking for scheduler messages"); + // This function returns None if the scheduler is "used", or it + // returns the still-available scheduler. + fn interpret_message_queue(~self) -> Option<~Scheduler> { let mut this = self; match this.message_queue.pop() { Some(PinnedTask(task)) => { - rtdebug!("recv BiasedTask message in sched: %u", - this.sched_id()); let mut task = task; - task.home = Some(Sched(this.make_handle())); + task.give_home(Sched(this.make_handle())); this.resume_task_immediately(task); - return true; + return None; } - Some(Wake) => { - rtdebug!("recv Wake message"); this.sleepy = false; - Local::put(this); - return true; + return Some(this); } Some(Shutdown) => { - rtdebug!("recv Shutdown message"); if this.sleepy { // There may be an outstanding handle on the // sleeper list. Pop them all to make sure that's @@ -325,12 +350,14 @@ impl Scheduler { // event loop references we will shut down. this.no_sleep = true; this.sleepy = false; - Local::put(this); - return true; + // YYY: Does a shutdown count as a "use" of the + // scheduler? This seems to work - so I'm leaving it + // this way despite not having a solid rational for + // why I should return the scheduler here. + return Some(this); } None => { - Local::put(this); - return false; + return Some(this); } } } @@ -338,7 +365,7 @@ impl Scheduler { /// Given an input Coroutine sends it back to its home scheduler. fn send_task_home(task: ~Task) { let mut task = task; - let mut home = task.home.take_unwrap(); + let mut home = task.take_unwrap_home(); match home { Sched(ref mut home_handle) => { home_handle.send(PinnedTask(task)); @@ -351,69 +378,45 @@ impl Scheduler { // Resume a task from the queue - but also take into account that // it might not belong here. - fn resume_task_from_queue(~self) -> bool { - assert!(!self.in_task_context()); - rtdebug!("looking in work queue for task to schedule"); + // If we perform a scheduler action we give away the scheduler ~ + // pointer, if it is still available we return it. + + fn resume_task_from_queue(~self) -> Option<~Scheduler> { + let mut this = self; - // The borrow checker imposes the possibly absurd requirement - // that we split this into two match expressions. This is due - // to the inspection of the internal bits of task, as that - // can't be in scope when we act on task. match this.work_queue.pop() { Some(task) => { - let action_id = { - let home = &task.home; - match home { - &Some(Sched(ref home_handle)) - if home_handle.sched_id != this.sched_id() => { - SendHome - } - &Some(AnySched) if this.run_anything => { - ResumeNow - } - &Some(AnySched) => { - Requeue - } - &Some(Sched(_)) => { - ResumeNow - } - &None => { - Homeless + let mut task = task; + let home = task.take_unwrap_home(); + match home { + Sched(home_handle) => { + if home_handle.sched_id != this.sched_id() { + task.give_home(Sched(home_handle)); + Scheduler::send_task_home(task); + return Some(this); + } else { + task.give_home(Sched(home_handle)); + this.resume_task_immediately(task); + return None; } } - }; - - match action_id { - SendHome => { - rtdebug!("sending task home"); - Scheduler::send_task_home(task); - Local::put(this); - return false; - } - ResumeNow => { - rtdebug!("resuming now"); + AnySched if this.run_anything => { + task.give_home(AnySched); this.resume_task_immediately(task); - return true; + return None; } - Requeue => { - rtdebug!("re-queueing") + AnySched => { + task.give_home(AnySched); this.enqueue_task(task); - Local::put(this); - return false; - } - Homeless => { - rtabort!("task home was None!"); + return Some(this); } } } - None => { - rtdebug!("no tasks in queue"); - Local::put(this); - return false; - } + return Some(this); + } } } @@ -422,33 +425,20 @@ impl Scheduler { /// Called by a running task to end execution, after which it will /// be recycled by the scheduler for reuse in a new task. pub fn terminate_current_task(~self) { + // Similar to deschedule running task and then, but cannot go through + // the task-blocking path. The task is already dying. let mut this = self; - assert!(this.in_task_context()); - - rtdebug!("ending running task"); - - // This task is post-cleanup, so it must be unkillable. This sequence - // of descheduling and recycling must not get interrupted by a kill. - // FIXME(#7544): Make this use an inner descheduler, like yield should. - this.current_task.get_mut_ref().death.unkillable += 1; - - do this.deschedule_running_task_and_then |sched, dead_task| { - match dead_task.wake() { - Some(dead_task) => { - let mut dead_task = dead_task; - dead_task.death.unkillable -= 1; // FIXME(#7544) ugh - let coroutine = dead_task.coroutine.take_unwrap(); - coroutine.recycle(&mut sched.stack_pool); - } - None => rtabort!("dead task killed before recycle"), - } + let stask = this.sched_task.take_unwrap(); + do this.change_task_context(stask) |sched, mut dead_task| { + let coroutine = dead_task.coroutine.take_unwrap(); + coroutine.recycle(&mut sched.stack_pool); } - - rtabort!("control reached end of task"); } - pub fn schedule_task(~self, task: ~Task) { - assert!(self.in_task_context()); + // If a scheduling action is performed, return None. If not, + // return Some(sched). + + pub fn schedule_task(~self, task: ~Task) -> Option<~Scheduler> { // is the task home? let is_home = task.is_home_no_tls(&self); @@ -461,55 +451,115 @@ impl Scheduler { if is_home || (!homed && this.run_anything) { // here we know we are home, execute now OR we know we // aren't homed, and that this sched doesn't care + rtdebug!("task: %u is on ok sched, executing", to_uint(task)); do this.switch_running_tasks_and_then(task) |sched, last_task| { sched.enqueue_blocked_task(last_task); } + return None; } else if !homed && !this.run_anything { // the task isn't homed, but it can't be run here this.enqueue_task(task); - Local::put(this); + return Some(this); } else { // task isn't home, so don't run it here, send it home Scheduler::send_task_home(task); - Local::put(this); + return Some(this); } } - // Core scheduling ops - - pub fn resume_task_immediately(~self, task: ~Task) { + // The primary function for changing contexts. In the current + // design the scheduler is just a slightly modified GreenTask, so + // all context swaps are from Task to Task. The only difference + // between the various cases is where the inputs come from, and + // what is done with the resulting task. That is specified by the + // cleanup function f, which takes the scheduler and the + // old task as inputs. + + pub fn change_task_context(~self, + next_task: ~Task, + f: &fn(&mut Scheduler, ~Task)) { let mut this = self; - assert!(!this.in_task_context()); - rtdebug!("scheduling a task"); - this.metrics.context_switches_sched_to_task += 1; + // The current task is grabbed from TLS, not taken as an input. + let current_task: ~Task = Local::take::<Task>(); - // Store the task in the scheduler so it can be grabbed later - this.current_task = Some(task); - this.enqueue_cleanup_job(DoNothing); + // These transmutes do something fishy with a closure. + let f_fake_region = unsafe { + transmute::<&fn(&mut Scheduler, ~Task), + &fn(&mut Scheduler, ~Task)>(f) + }; + let f_opaque = ClosureConverter::from_fn(f_fake_region); + + // The current task is placed inside an enum with the cleanup + // function. This enum is then placed inside the scheduler. + this.enqueue_cleanup_job(GiveTask(current_task, f_opaque)); - Local::put(this); + // The scheduler is then placed inside the next task. + let mut next_task = next_task; + next_task.sched = Some(this); - // Take pointers to both the task and scheduler's saved registers. + // However we still need an internal mutable pointer to the + // original task. The strategy here was "arrange memory, then + // get pointers", so we crawl back up the chain using + // transmute to eliminate borrowck errors. unsafe { - let sched = Local::unsafe_borrow::<Scheduler>(); - let (sched_context, _, next_task_context) = (*sched).get_contexts(); - let next_task_context = next_task_context.unwrap(); - // Context switch to the task, restoring it's registers - // and saving the scheduler's - Context::swap(sched_context, next_task_context); - let sched = Local::unsafe_borrow::<Scheduler>(); - // The running task should have passed ownership elsewhere - assert!((*sched).current_task.is_none()); + let sched: &mut Scheduler = + transmute_mut_region(*next_task.sched.get_mut_ref()); + + let current_task: &mut Task = match sched.cleanup_job { + Some(GiveTask(ref task, _)) => { + transmute_mut_region(*transmute_mut_unsafe(task)) + } + Some(DoNothing) => { + rtabort!("no next task"); + } + None => { + rtabort!("no cleanup job"); + } + }; + + let (current_task_context, next_task_context) = + Scheduler::get_contexts(current_task, next_task); + + // Done with everything - put the next task in TLS. This + // works because due to transmute the borrow checker + // believes that we have no internal pointers to + // next_task. + Local::put(next_task); + + // The raw context swap operation. The next action taken + // will be running the cleanup job from the context of the + // next task. + Context::swap(current_task_context, next_task_context); + } - // Running tasks may have asked us to do some cleanup + // When the context swaps back to the scheduler we immediately + // run the cleanup job, as expected by the previously called + // swap_contexts function. + unsafe { + let sched = Local::unsafe_borrow::<Scheduler>(); (*sched).run_cleanup_job(); // Must happen after running the cleanup job (of course). - // Might not be running in task context; if not, a later call to - // resume_task_immediately will take care of this. - (*sched).current_task.map(|t| t.death.check_killed()); + let task = Local::unsafe_borrow::<Task>(); + (*task).death.check_killed(); + } + } + + // There are a variety of "obvious" functions to be passed to + // change_task_context, so we can make a few "named cases". + + // Enqueue the old task on the current scheduler. + pub fn enqueue_old(sched: &mut Scheduler, task: ~Task) { + sched.enqueue_task(task); + } + + // Sometimes we just want the old API though. + + pub fn resume_task_immediately(~self, task: ~Task) { + do self.change_task_context(task) |sched, stask| { + sched.sched_task = Some(stask); } } @@ -533,152 +583,78 @@ impl Scheduler { /// in order to prevent that fn from performing further scheduling operations. /// Doing further scheduling could easily result in infinite recursion. pub fn deschedule_running_task_and_then(~self, f: &fn(&mut Scheduler, BlockedTask)) { + // Trickier - we need to get the scheduler task out of self + // and use it as the destination. let mut this = self; - assert!(this.in_task_context()); - - rtdebug!("blocking task"); - this.metrics.context_switches_task_to_sched += 1; - - unsafe { - let blocked_task = this.current_task.take_unwrap(); - let f_fake_region = transmute::<&fn(&mut Scheduler, BlockedTask), - &fn(&mut Scheduler, BlockedTask)>(f); - let f_opaque = ClosureConverter::from_fn(f_fake_region); - this.enqueue_cleanup_job(GiveTask(blocked_task, f_opaque)); - } - - Local::put(this); - - unsafe { - let sched = Local::unsafe_borrow::<Scheduler>(); - let (sched_context, last_task_context, _) = (*sched).get_contexts(); - let last_task_context = last_task_context.unwrap(); - Context::swap(last_task_context, sched_context); - - // We could be executing in a different thread now - let sched = Local::unsafe_borrow::<Scheduler>(); - (*sched).run_cleanup_job(); - - // As above, must happen after running the cleanup job. - (*sched).current_task.map(|t| t.death.check_killed()); - } + let stask = this.sched_task.take_unwrap(); + // Otherwise this is the same as below. + this.switch_running_tasks_and_then(stask, f); } - /// Switch directly to another task, without going through the scheduler. - /// You would want to think hard about doing this, e.g. if there are - /// pending I/O events it would be a bad idea. pub fn switch_running_tasks_and_then(~self, next_task: ~Task, f: &fn(&mut Scheduler, BlockedTask)) { - let mut this = self; - assert!(this.in_task_context()); - - rtdebug!("switching tasks"); - this.metrics.context_switches_task_to_task += 1; + // This is where we convert the BlockedTask-taking closure into one + // that takes just a Task, and is aware of the block-or-killed protocol. + do self.change_task_context(next_task) |sched, task| { + // Task might need to receive a kill signal instead of blocking. + // We can call the "and_then" only if it blocks successfully. + match BlockedTask::try_block(task) { + Left(killed_task) => sched.enqueue_task(killed_task), + Right(blocked_task) => f(sched, blocked_task), + } + } + } - let old_running_task = this.current_task.take_unwrap(); - let f_fake_region = unsafe { - transmute::<&fn(&mut Scheduler, BlockedTask), - &fn(&mut Scheduler, BlockedTask)>(f) + // A helper that looks up the scheduler and runs a task later by + // enqueuing it. + pub fn run_task_later(next_task: ~Task) { + // We aren't performing a scheduler operation, so we want to + // put the Scheduler back when we finish. + let next_task = Cell::new(next_task); + do Local::borrow::<Scheduler,()> |sched| { + sched.enqueue_task(next_task.take()); }; - let f_opaque = ClosureConverter::from_fn(f_fake_region); - this.enqueue_cleanup_job(GiveTask(old_running_task, f_opaque)); - this.current_task = Some(next_task); + } - Local::put(this); + // A helper that looks up the scheduler and runs a task. If it can + // be run now it is run now. + pub fn run_task(new_task: ~Task) { + let sched = Local::take::<Scheduler>(); + sched.schedule_task(new_task).map_consume(Local::put); + } + // Returns a mutable reference to both contexts involved in this + // swap. This is unsafe - we are getting mutable internal + // references to keep even when we don't own the tasks. It looks + // kinda safe because we are doing transmutes before passing in + // the arguments. + pub fn get_contexts<'a>(current_task: &mut Task, next_task: &mut Task) -> + (&'a mut Context, &'a mut Context) { + let current_task_context = + &mut current_task.coroutine.get_mut_ref().saved_context; + let next_task_context = + &mut next_task.coroutine.get_mut_ref().saved_context; unsafe { - let sched = Local::unsafe_borrow::<Scheduler>(); - let (_, last_task_context, next_task_context) = (*sched).get_contexts(); - let last_task_context = last_task_context.unwrap(); - let next_task_context = next_task_context.unwrap(); - Context::swap(last_task_context, next_task_context); - - // We could be executing in a different thread now - let sched = Local::unsafe_borrow::<Scheduler>(); - (*sched).run_cleanup_job(); - - // As above, must happen after running the cleanup job. - (*sched).current_task.map(|t| t.death.check_killed()); + (transmute_mut_region(current_task_context), + transmute_mut_region(next_task_context)) } } - - // * Other stuff pub fn enqueue_cleanup_job(&mut self, job: CleanupJob) { - assert!(self.cleanup_job.is_none()); self.cleanup_job = Some(job); } pub fn run_cleanup_job(&mut self) { rtdebug!("running cleanup job"); - - assert!(self.cleanup_job.is_some()); - let cleanup_job = self.cleanup_job.take_unwrap(); match cleanup_job { DoNothing => { } - GiveTask(task, f) => { - let f = f.to_fn(); - // Task might need to receive a kill signal instead of blocking. - // We can call the "and_then" only if it blocks successfully. - match BlockedTask::try_block(task) { - Left(killed_task) => self.enqueue_task(killed_task), - Right(blocked_task) => f(self, blocked_task), - } - } + GiveTask(task, f) => f.to_fn()(self, task) } } - /// Get mutable references to all the contexts that may be involved in a - /// context switch. - /// - /// Returns (the scheduler context, the optional context of the - /// task in the cleanup list, the optional context of the task in - /// the current task slot). When context switching to a task, - /// callers should first arrange for that task to be located in the - /// Scheduler's current_task slot and set up the - /// post-context-switch cleanup job. - pub fn get_contexts<'a>(&'a mut self) -> (&'a mut Context, - Option<&'a mut Context>, - Option<&'a mut Context>) { - let last_task = match self.cleanup_job { - Some(GiveTask(~ref task, _)) => { - Some(task) - } - Some(DoNothing) => { - None - } - None => fail!("all context switches should have a cleanup job") - }; - // XXX: Pattern matching mutable pointers above doesn't work - // because borrowck thinks the three patterns are conflicting - // borrows - unsafe { - let last_task = transmute::<Option<&Task>, Option<&mut Task>>(last_task); - let last_task_context = match last_task { - Some(t) => { - Some(&mut t.coroutine.get_mut_ref().saved_context) - } - None => { - None - } - }; - let next_task_context = match self.current_task { - Some(ref mut t) => { - Some(&mut t.coroutine.get_mut_ref().saved_context) - } - None => { - None - } - }; - // XXX: These transmutes can be removed after snapshot - return (transmute(&mut self.saved_context), - last_task_context, - transmute(next_task_context)); - } - } } // The cases for the below function. @@ -700,29 +676,73 @@ impl SchedHandle { // complaining type UnsafeTaskReceiver = raw::Closure; trait ClosureConverter { - fn from_fn(&fn(&mut Scheduler, BlockedTask)) -> Self; - fn to_fn(self) -> &fn(&mut Scheduler, BlockedTask); + fn from_fn(&fn(&mut Scheduler, ~Task)) -> Self; + fn to_fn(self) -> &fn(&mut Scheduler, ~Task); } impl ClosureConverter for UnsafeTaskReceiver { - fn from_fn(f: &fn(&mut Scheduler, BlockedTask)) -> UnsafeTaskReceiver { + fn from_fn(f: &fn(&mut Scheduler, ~Task)) -> UnsafeTaskReceiver { unsafe { transmute(f) } } - fn to_fn(self) -> &fn(&mut Scheduler, BlockedTask) { unsafe { transmute(self) } } + fn to_fn(self) -> &fn(&mut Scheduler, ~Task) { unsafe { transmute(self) } } } - #[cfg(test)] mod test { + use rt::test::*; + use unstable::run_in_bare_thread; + use borrow::to_uint; + use rt::local::*; + use rt::sched::{Scheduler}; + use uint; use int; use cell::Cell; - use unstable::run_in_bare_thread; - use task::spawn; - use rt::local::Local; - use rt::test::*; - use super::*; use rt::thread::Thread; - use borrow::to_uint; - use rt::task::{Task,Sched}; + use rt::task::{Task, Sched}; + use option::{Some}; + + #[test] + fn trivial_run_in_newsched_task_test() { + let mut task_ran = false; + let task_ran_ptr: *mut bool = &mut task_ran; + do run_in_newsched_task || { + unsafe { *task_ran_ptr = true }; + rtdebug!("executed from the new scheduler") + } + assert!(task_ran); + } + + #[test] + fn multiple_task_test() { + let total = 10; + let mut task_run_count = 0; + let task_run_count_ptr: *mut uint = &mut task_run_count; + do run_in_newsched_task || { + for uint::range(0,total) |_| { + do spawntask || { + unsafe { *task_run_count_ptr = *task_run_count_ptr + 1}; + } + } + } + assert!(task_run_count == total); + } + + #[test] + fn multiple_task_nested_test() { + let mut task_run_count = 0; + let task_run_count_ptr: *mut uint = &mut task_run_count; + do run_in_newsched_task || { + do spawntask || { + unsafe { *task_run_count_ptr = *task_run_count_ptr + 1 }; + do spawntask || { + unsafe { *task_run_count_ptr = *task_run_count_ptr + 1 }; + do spawntask || { + unsafe { *task_run_count_ptr = *task_run_count_ptr + 1 }; + } + } + } + } + assert!(task_run_count == 3); + } // Confirm that a sched_id actually is the uint form of the // pointer to the scheduler struct. @@ -745,46 +765,50 @@ mod test { } } - // A simple test to check if a homed task run on a single - // scheduler ends up executing while home. + + // A very simple test that confirms that a task executing on the + // home scheduler notices that it is home. #[test] fn test_home_sched() { do run_in_bare_thread { let mut task_ran = false; let task_ran_ptr: *mut bool = &mut task_ran; - let mut sched = ~new_test_uv_sched(); + let mut sched = ~new_test_uv_sched(); let sched_handle = sched.make_handle(); - let sched_id = sched.sched_id(); - let task = ~do Task::new_root_homed(&mut sched.stack_pool, - Sched(sched_handle)) { + let mut task = ~do Task::new_root_homed(&mut sched.stack_pool, + Sched(sched_handle)) { unsafe { *task_ran_ptr = true }; - let sched = Local::take::<Scheduler>(); - assert!(sched.sched_id() == sched_id); - Local::put::<Scheduler>(sched); + assert!(Task::on_appropriate_sched()); }; - sched.enqueue_task(task); - sched.run(); - assert!(task_ran); + + let on_exit: ~fn(bool) = |exit_status| rtassert!(exit_status); + task.death.on_exit = Some(on_exit); + + sched.bootstrap(task); } } - // A test for each state of schedule_task + // An advanced test that checks all four possible states that a + // (task,sched) can be in regarding homes. + #[test] fn test_schedule_home_states() { use rt::uv::uvio::UvEventLoop; - use rt::sched::Shutdown; use rt::sleeper_list::SleeperList; use rt::work_queue::WorkQueue; + use rt::sched::Shutdown; + use borrow; + use rt::comm::*; do run_in_bare_thread { let sleepers = SleeperList::new(); let work_queue = WorkQueue::new(); - // our normal scheduler + // Our normal scheduler let mut normal_sched = ~Scheduler::new( ~UvEventLoop::new(), work_queue.clone(), @@ -792,113 +816,93 @@ mod test { let normal_handle = Cell::new(normal_sched.make_handle()); - // our special scheduler + // Our special scheduler let mut special_sched = ~Scheduler::new_special( ~UvEventLoop::new(), work_queue.clone(), sleepers.clone(), - true); + false); let special_handle = Cell::new(special_sched.make_handle()); - let special_handle2 = Cell::new(special_sched.make_handle()); - let special_id = special_sched.sched_id(); + let t1_handle = special_sched.make_handle(); let t4_handle = special_sched.make_handle(); - let t1f = ~do Task::new_root_homed(&mut special_sched.stack_pool, - Sched(t1_handle)) || { - let is_home = Task::is_home_using_id(special_id); - rtdebug!("t1 should be home: %b", is_home); - assert!(is_home); - }; - let t1f = Cell::new(t1f); + // Four test tasks: + // 1) task is home on special + // 2) task not homed, sched doesn't care + // 3) task not homed, sched requeues + // 4) task not home, send home - let t2f = ~do Task::new_root(&mut normal_sched.stack_pool) { - let on_special = Task::on_special(); - rtdebug!("t2 should not be on special: %b", on_special); - assert!(!on_special); + let task1 = ~do Task::new_root_homed(&mut special_sched.stack_pool, + Sched(t1_handle)) || { + rtassert!(Task::on_appropriate_sched()); }; - let t2f = Cell::new(t2f); + rtdebug!("task1 id: **%u**", borrow::to_uint(task1)); - let t3f = ~do Task::new_root(&mut normal_sched.stack_pool) { - // not on special - let on_special = Task::on_special(); - rtdebug!("t3 should not be on special: %b", on_special); - assert!(!on_special); - }; - let t3f = Cell::new(t3f); - - let t4f = ~do Task::new_root_homed(&mut special_sched.stack_pool, - Sched(t4_handle)) { - // is home - let home = Task::is_home_using_id(special_id); - rtdebug!("t4 should be home: %b", home); - assert!(home); + let task2 = ~do Task::new_root(&mut normal_sched.stack_pool) { + rtassert!(Task::on_appropriate_sched()); }; - let t4f = Cell::new(t4f); - // we have four tests, make them as closures - let t1: ~fn() = || { - // task is home on special - let task = t1f.take(); - let sched = Local::take::<Scheduler>(); - sched.schedule_task(task); + let task3 = ~do Task::new_root(&mut normal_sched.stack_pool) { + rtassert!(Task::on_appropriate_sched()); }; - let t2: ~fn() = || { - // not homed, task doesn't care - let task = t2f.take(); - let sched = Local::take::<Scheduler>(); - sched.schedule_task(task); - }; - let t3: ~fn() = || { - // task not homed, must leave - let task = t3f.take(); - let sched = Local::take::<Scheduler>(); - sched.schedule_task(task); - }; - let t4: ~fn() = || { - // task not home, send home - let task = t4f.take(); - let sched = Local::take::<Scheduler>(); - sched.schedule_task(task); + + let task4 = ~do Task::new_root_homed(&mut special_sched.stack_pool, + Sched(t4_handle)) { + rtassert!(Task::on_appropriate_sched()); }; + rtdebug!("task4 id: **%u**", borrow::to_uint(task4)); + + let task1 = Cell::new(task1); + let task2 = Cell::new(task2); + let task3 = Cell::new(task3); + let task4 = Cell::new(task4); - let t1 = Cell::new(t1); - let t2 = Cell::new(t2); - let t3 = Cell::new(t3); - let t4 = Cell::new(t4); - - // build a main task that runs our four tests - let main_task = ~do Task::new_root(&mut normal_sched.stack_pool) { - // the two tasks that require a normal start location - t2.take()(); - t4.take()(); - normal_handle.take().send(Shutdown); - special_handle.take().send(Shutdown); + // Signal from the special task that we are done. + let (port, chan) = oneshot::<()>(); + let port = Cell::new(port); + let chan = Cell::new(chan); + + let normal_task = ~do Task::new_root(&mut normal_sched.stack_pool) { + rtdebug!("*about to submit task2*"); + Scheduler::run_task(task2.take()); + rtdebug!("*about to submit task4*"); + Scheduler::run_task(task4.take()); + rtdebug!("*normal_task done*"); + port.take().recv(); + let mut nh = normal_handle.take(); + nh.send(Shutdown); + let mut sh = special_handle.take(); + sh.send(Shutdown); }; - // task to run the two "special start" tests - let special_task = ~do Task::new_root_homed( - &mut special_sched.stack_pool, - Sched(special_handle2.take())) { - t1.take()(); - t3.take()(); + rtdebug!("normal task: %u", borrow::to_uint(normal_task)); + + let special_task = ~do Task::new_root(&mut special_sched.stack_pool) { + rtdebug!("*about to submit task1*"); + Scheduler::run_task(task1.take()); + rtdebug!("*about to submit task3*"); + Scheduler::run_task(task3.take()); + rtdebug!("*done with special_task*"); + chan.take().send(()); }; - // enqueue the main tasks - normal_sched.enqueue_task(special_task); - normal_sched.enqueue_task(main_task); + rtdebug!("special task: %u", borrow::to_uint(special_task)); + + let special_sched = Cell::new(special_sched); + let normal_sched = Cell::new(normal_sched); + let special_task = Cell::new(special_task); + let normal_task = Cell::new(normal_task); - let nsched_cell = Cell::new(normal_sched); let normal_thread = do Thread::start { - let sched = nsched_cell.take(); - sched.run(); + normal_sched.take().bootstrap(normal_task.take()); + rtdebug!("finished with normal_thread"); }; - let ssched_cell = Cell::new(special_sched); let special_thread = do Thread::start { - let sched = ssched_cell.take(); - sched.run(); + special_sched.take().bootstrap(special_task.take()); + rtdebug!("finished with special_sched"); }; normal_thread.join(); @@ -906,7 +910,6 @@ mod test { } } - // Do it a lot #[test] fn test_stress_schedule_task_states() { let n = stress_factor() * 120; @@ -916,123 +919,13 @@ mod test { } #[test] - fn test_simple_scheduling() { - do run_in_bare_thread { - let mut task_ran = false; - let task_ran_ptr: *mut bool = &mut task_ran; - - let mut sched = ~new_test_uv_sched(); - let task = ~do Task::new_root(&mut sched.stack_pool) { - unsafe { *task_ran_ptr = true; } - }; - sched.enqueue_task(task); - sched.run(); - assert!(task_ran); - } - } - - #[test] - fn test_several_tasks() { - do run_in_bare_thread { - let total = 10; - let mut task_count = 0; - let task_count_ptr: *mut int = &mut task_count; - - let mut sched = ~new_test_uv_sched(); - for int::range(0, total) |_| { - let task = ~do Task::new_root(&mut sched.stack_pool) { - unsafe { *task_count_ptr = *task_count_ptr + 1; } - }; - sched.enqueue_task(task); - } - sched.run(); - assert_eq!(task_count, total); - } - } - - #[test] - fn test_swap_tasks_then() { - do run_in_bare_thread { - let mut count = 0; - let count_ptr: *mut int = &mut count; - - let mut sched = ~new_test_uv_sched(); - let task1 = ~do Task::new_root(&mut sched.stack_pool) { - unsafe { *count_ptr = *count_ptr + 1; } - let mut sched = Local::take::<Scheduler>(); - let task2 = ~do Task::new_root(&mut sched.stack_pool) { - unsafe { *count_ptr = *count_ptr + 1; } - }; - // Context switch directly to the new task - do sched.switch_running_tasks_and_then(task2) |sched, task1| { - sched.enqueue_blocked_task(task1); - } - unsafe { *count_ptr = *count_ptr + 1; } - }; - sched.enqueue_task(task1); - sched.run(); - assert_eq!(count, 3); - } - } - - #[bench] #[test] #[ignore(reason = "long test")] - fn test_run_a_lot_of_tasks_queued() { - do run_in_bare_thread { - static MAX: int = 1000000; - let mut count = 0; - let count_ptr: *mut int = &mut count; - - let mut sched = ~new_test_uv_sched(); - - let start_task = ~do Task::new_root(&mut sched.stack_pool) { - run_task(count_ptr); - }; - sched.enqueue_task(start_task); - sched.run(); - - assert_eq!(count, MAX); - - fn run_task(count_ptr: *mut int) { - do Local::borrow::<Scheduler, ()> |sched| { - let task = ~do Task::new_root(&mut sched.stack_pool) { - unsafe { - *count_ptr = *count_ptr + 1; - if *count_ptr != MAX { - run_task(count_ptr); - } - } - }; - sched.enqueue_task(task); - } - }; - } - } - - #[test] - fn test_block_task() { - do run_in_bare_thread { - let mut sched = ~new_test_uv_sched(); - let task = ~do Task::new_root(&mut sched.stack_pool) { - let sched = Local::take::<Scheduler>(); - assert!(sched.in_task_context()); - do sched.deschedule_running_task_and_then() |sched, task| { - assert!(!sched.in_task_context()); - sched.enqueue_blocked_task(task); - } - }; - sched.enqueue_task(task); - sched.run(); - } - } - - #[test] fn test_io_callback() { // This is a regression test that when there are no schedulable tasks // in the work queue, but we are performing I/O, that once we do put // something in the work queue again the scheduler picks it up and doesn't // exit before emptying the work queue do run_in_newsched_task { - do spawn { + do spawntask { let sched = Local::take::<Scheduler>(); do sched.deschedule_running_task_and_then |sched, task| { let task = Cell::new(task); @@ -1053,34 +946,21 @@ mod test { do run_in_bare_thread { let (port, chan) = oneshot::<()>(); - let port_cell = Cell::new(port); - let chan_cell = Cell::new(chan); - let mut sched1 = ~new_test_uv_sched(); - let handle1 = sched1.make_handle(); - let handle1_cell = Cell::new(handle1); - let task1 = ~do Task::new_root(&mut sched1.stack_pool) { - chan_cell.take().send(()); - }; - sched1.enqueue_task(task1); - - let mut sched2 = ~new_test_uv_sched(); - let task2 = ~do Task::new_root(&mut sched2.stack_pool) { - port_cell.take().recv(); - // Release the other scheduler's handle so it can exit - handle1_cell.take(); - }; - sched2.enqueue_task(task2); + let port = Cell::new(port); + let chan = Cell::new(chan); - let sched1_cell = Cell::new(sched1); - let thread1 = do Thread::start { - let sched1 = sched1_cell.take(); - sched1.run(); + let _thread_one = do Thread::start { + let chan = Cell::new(chan.take()); + do run_in_newsched_task_core { + chan.take().send(()); + } }; - let sched2_cell = Cell::new(sched2); - let thread2 = do Thread::start { - let sched2 = sched2_cell.take(); - sched2.run(); + let _thread_two = do Thread::start { + let port = Cell::new(port.take()); + do run_in_newsched_task_core { + port.take().recv(); + } }; thread1.join(); @@ -1112,21 +992,21 @@ mod test { } } - #[test] + #[test] fn thread_ring() { use rt::comm::*; use comm::{GenericPort, GenericChan}; do run_in_mt_newsched_task { - let (end_port, end_chan) = oneshot(); + let (end_port, end_chan) = oneshot(); let n_tasks = 10; let token = 2000; - let (p, ch1) = stream(); + let (p, ch1) = stream(); let mut p = p; - ch1.send((token, end_chan)); - let mut i = 2; + ch1.send((token, end_chan)); + let mut i = 2; while i <= n_tasks { let (next_p, ch) = stream(); let imm_i = i; @@ -1151,9 +1031,9 @@ mod test { while (true) { match p.recv() { (1, end_chan) => { - debug!("%d\n", id); - end_chan.send(()); - return; + debug!("%d\n", id); + end_chan.send(()); + return; } (token, end_chan) => { debug!("thread: %d got token: %d", id, token); @@ -1178,15 +1058,16 @@ mod test { impl Drop for S { fn drop(&self) { - let _foo = @0; + let _foo = @0; } } let s = S { field: () }; do spawntask { - let _ss = &s; + let _ss = &s; } } } + } |
