diff options
Diffstat (limited to 'src/libstd/task/spawn.rs')
| -rw-r--r-- | src/libstd/task/spawn.rs | 531 |
1 files changed, 11 insertions, 520 deletions
diff --git a/src/libstd/task/spawn.rs b/src/libstd/task/spawn.rs index 66a2e8cc5e0..578839d4542 100644 --- a/src/libstd/task/spawn.rs +++ b/src/libstd/task/spawn.rs @@ -9,6 +9,10 @@ // except according to those terms. /*!************************************************************************** + * + * WARNING: linked failure has been removed since this doc comment was written, + * but it was so pretty that I didn't want to remove it. + * * Spawning & linked failure * * Several data structures are involved in task management to allow properly @@ -73,541 +77,30 @@ use prelude::*; -use cast::transmute; -use cast; use cell::Cell; -use comm::{Chan, GenericChan, oneshot}; -use container::MutableMap; -use hashmap::{HashSet, HashSetMoveIterator}; -use local_data; +use comm::{GenericChan, oneshot}; use rt::local::Local; use rt::sched::{Scheduler, Shutdown, TaskFromFriend}; use rt::task::{Task, Sched}; -use rt::task::{UnwindResult, Success, Failure}; +use rt::task::UnwindResult; use rt::thread::Thread; use rt::work_queue::WorkQueue; -use rt::{in_green_task_context, new_event_loop, KillHandle}; -use task::LinkedFailure; +use rt::{in_green_task_context, new_event_loop}; use task::SingleThreaded; use task::TaskOpts; -use task::unkillable; -use uint; -use unstable::sync::Exclusive; -use util; #[cfg(test)] use task::default_task_opts; #[cfg(test)] use comm; #[cfg(test)] use task; -struct TaskSet(HashSet<KillHandle>); - -impl TaskSet { - #[inline] - fn new() -> TaskSet { - TaskSet(HashSet::new()) - } - #[inline] - fn insert(&mut self, task: KillHandle) { - let didnt_overwrite = (**self).insert(task); - assert!(didnt_overwrite); - } - #[inline] - fn remove(&mut self, task: &KillHandle) { - let was_present = (**self).remove(task); - assert!(was_present); - } - #[inline] - fn move_iter(self) -> HashSetMoveIterator<KillHandle> { - (*self).move_iter() - } -} - -// One of these per group of linked-failure tasks. -struct TaskGroupData { - // All tasks which might kill this group. When this is empty, the group - // can be "GC"ed (i.e., its link in the ancestor list can be removed). - members: TaskSet, - // All tasks unidirectionally supervised by (directly or transitively) - // tasks in this group. - descendants: TaskSet, -} -type TaskGroupArc = Exclusive<Option<TaskGroupData>>; - -type TaskGroupInner<'self> = &'self mut Option<TaskGroupData>; - -// A taskgroup is 'dead' when nothing can cause it to fail; only members can. -fn taskgroup_is_dead(tg: &TaskGroupData) -> bool { - tg.members.is_empty() -} - -// A list-like structure by which taskgroups keep track of all ancestor groups -// which may kill them. Needed for tasks to be able to remove themselves from -// ancestor groups upon exit. The list has a node for each "generation", and -// ends either at the root taskgroup (which has no ancestors) or at a -// taskgroup which was spawned-unlinked. Tasks from intermediate generations -// have references to the middle of the list; when intermediate generations -// die, their node in the list will be collected at a descendant's spawn-time. -struct AncestorNode { - // Since the ancestor list is recursive, we end up with references to - // exclusives within other exclusives. This is dangerous business (if - // circular references arise, deadlock and memory leaks are imminent). - // Hence we assert that this counter monotonically decreases as we - // approach the tail of the list. - generation: uint, - // Handle to the tasks in the group of the current generation. - parent_group: TaskGroupArc, - // Recursive rest of the list. - ancestors: AncestorList, -} - -struct AncestorList(Option<Exclusive<AncestorNode>>); - -// Accessors for taskgroup arcs and ancestor arcs that wrap the unsafety. -#[inline] -fn access_group<U>(x: &TaskGroupArc, blk: |TaskGroupInner| -> U) -> U { - unsafe { - x.with(blk) - } -} - -#[inline] -fn access_ancestors<U>( - x: &Exclusive<AncestorNode>, - blk: |x: &mut AncestorNode| -> U) - -> U { - unsafe { - x.with(blk) - } -} - -#[inline] #[cfg(test)] -fn check_generation(younger: uint, older: uint) { assert!(younger > older); } -#[inline] #[cfg(not(test))] -fn check_generation(_younger: uint, _older: uint) { } - -#[inline] #[cfg(test)] -fn incr_generation(ancestors: &AncestorList) -> uint { - ancestors.as_ref().map_default(0, |arc| access_ancestors(arc, |a| a.generation+1)) -} -#[inline] #[cfg(not(test))] -fn incr_generation(_ancestors: &AncestorList) -> uint { 0 } - -// Iterates over an ancestor list. -// (1) Runs forward_blk on each ancestral taskgroup in the list -// (2) If forward_blk "break"s, runs optional bail_blk on all ancestral -// taskgroups that forward_blk already ran on successfully (Note: bail_blk -// is NOT called on the block that forward_blk broke on!). -// (3) As a bonus, coalesces away all 'dead' taskgroup nodes in the list. -fn each_ancestor(list: &mut AncestorList, - bail_blk: |TaskGroupInner|, - forward_blk: |TaskGroupInner| -> bool) - -> bool { - // "Kickoff" call - there was no last generation. - return !coalesce(list, bail_blk, forward_blk, uint::max_value); - - // Recursively iterates, and coalesces afterwards if needed. Returns - // whether or not unwinding is needed (i.e., !successful iteration). - fn coalesce(list: &mut AncestorList, - bail_blk: |TaskGroupInner|, - forward_blk: |TaskGroupInner| -> bool, - last_generation: uint) -> bool { - let (coalesce_this, early_break) = - iterate(list, bail_blk, forward_blk, last_generation); - // What should our next ancestor end up being? - if coalesce_this.is_some() { - // Needed coalesce. Our next ancestor becomes our old - // ancestor's next ancestor. ("next = old_next->next;") - *list = coalesce_this.unwrap(); - } - return early_break; - } - - // Returns an optional list-to-coalesce and whether unwinding is needed. - // Option<ancestor_list>: - // Whether or not the ancestor taskgroup being iterated over is - // dead or not; i.e., it has no more tasks left in it, whether or not - // it has descendants. If dead, the caller shall coalesce it away. - // bool: - // True if the supplied block did 'break', here or in any recursive - // calls. If so, must call the unwinder on all previous nodes. - fn iterate(ancestors: &mut AncestorList, - bail_blk: |TaskGroupInner|, - forward_blk: |TaskGroupInner| -> bool, - last_generation: uint) - -> (Option<AncestorList>, bool) { - // At each step of iteration, three booleans are at play which govern - // how the iteration should behave. - // 'nobe_is_dead' - Should the list should be coalesced at this point? - // Largely unrelated to the other two. - // 'need_unwind' - Should we run the bail_blk at this point? (i.e., - // do_continue was false not here, but down the line) - // 'do_continue' - Did the forward_blk succeed at this point? (i.e., - // should we recurse? or should our callers unwind?) - - let forward_blk = Cell::new(forward_blk); - - // The map defaults to None, because if ancestors is None, we're at - // the end of the list, which doesn't make sense to coalesce. - do ancestors.as_ref().map_default((None,false)) |ancestor_arc| { - // NB: Takes a lock! (this ancestor node) - do access_ancestors(ancestor_arc) |nobe| { - // Argh, but we couldn't give it to coalesce() otherwise. - let forward_blk = forward_blk.take(); - // Check monotonicity - check_generation(last_generation, nobe.generation); - /*##########################################################* - * Step 1: Look at this ancestor group (call iterator block). - *##########################################################*/ - let mut nobe_is_dead = false; - let do_continue = - // NB: Takes a lock! (this ancestor node's parent group) - do access_group(&nobe.parent_group) |tg_opt| { - // Decide whether this group is dead. Note that the - // group being *dead* is disjoint from it *failing*. - nobe_is_dead = match *tg_opt { - Some(ref tg) => taskgroup_is_dead(tg), - None => nobe_is_dead - }; - // Call iterator block. (If the group is dead, it's - // safe to skip it. This will leave our KillHandle - // hanging around in the group even after it's freed, - // but that's ok because, by virtue of the group being - // dead, nobody will ever kill-all (for) over it.) - if nobe_is_dead { true } else { forward_blk(tg_opt) } - }; - /*##########################################################* - * Step 2: Recurse on the rest of the list; maybe coalescing. - *##########################################################*/ - // 'need_unwind' is only set if blk returned true above, *and* - // the recursive call early-broke. - let mut need_unwind = false; - if do_continue { - // NB: Takes many locks! (ancestor nodes & parent groups) - need_unwind = coalesce(&mut nobe.ancestors, |tg| bail_blk(tg), - forward_blk, nobe.generation); - } - /*##########################################################* - * Step 3: Maybe unwind; compute return info for our caller. - *##########################################################*/ - if need_unwind && !nobe_is_dead { - do access_group(&nobe.parent_group) |tg_opt| { - bail_blk(tg_opt) - } - } - // Decide whether our caller should unwind. - need_unwind = need_unwind || !do_continue; - // Tell caller whether or not to coalesce and/or unwind - if nobe_is_dead { - // Swap the list out here; the caller replaces us with it. - let rest = util::replace(&mut nobe.ancestors, - AncestorList(None)); - (Some(rest), need_unwind) - } else { - (None, need_unwind) - } - } - } - } -} - -// One of these per task. -pub struct Taskgroup { - // List of tasks with whose fates this one's is intertwined. - priv tasks: TaskGroupArc, // 'none' means the group has failed. - // Lists of tasks who will kill us if they fail, but whom we won't kill. - priv ancestors: AncestorList, - priv notifier: Option<AutoNotify>, -} - -impl Drop for Taskgroup { - // Runs on task exit. - fn drop(&mut self) { - // If we are failing, the whole taskgroup needs to die. - do RuntimeGlue::with_task_handle_and_failing |me, failing| { - if failing { - for x in self.notifier.mut_iter() { - x.task_result = Some(Failure(~LinkedFailure as ~Any)); - } - // Take everybody down with us. After this point, every - // other task in the group will see 'tg' as none, which - // indicates the whole taskgroup is failing (and forbids - // new spawns from succeeding). - let tg = do access_group(&self.tasks) |tg| { tg.take() }; - // It's safe to send kill signals outside the lock, because - // we have a refcount on all kill-handles in the group. - kill_taskgroup(tg, me); - } else { - // Remove ourselves from the group(s). - do access_group(&self.tasks) |tg| { - leave_taskgroup(tg, me, true); - } - } - // It doesn't matter whether this happens before or after dealing - // with our own taskgroup, so long as both happen before we die. - // We remove ourself from every ancestor we can, so no cleanup; no - // break. - do each_ancestor(&mut self.ancestors, |_| {}) |ancestor_group| { - leave_taskgroup(ancestor_group, me, false); - true - }; - } - } -} - -pub fn Taskgroup(tasks: TaskGroupArc, - ancestors: AncestorList, - mut notifier: Option<AutoNotify>) -> Taskgroup { - for x in notifier.mut_iter() { - x.task_result = Some(Success); - } - - Taskgroup { - tasks: tasks, - ancestors: ancestors, - notifier: notifier - } -} - -struct AutoNotify { - notify_chan: Chan<UnwindResult>, - - // XXX: By value self drop would allow this to be a plain UnwindResult - task_result: Option<UnwindResult>, -} - -impl AutoNotify { - pub fn new(chan: Chan<UnwindResult>) -> AutoNotify { - AutoNotify { - notify_chan: chan, - - // Un-set above when taskgroup successfully made. - task_result: Some(Failure(~("AutoNotify::new()") as ~Any)) - } - } -} - -impl Drop for AutoNotify { - fn drop(&mut self) { - let result = self.task_result.take_unwrap(); - - self.notify_chan.send(result); - } -} - -fn enlist_in_taskgroup(state: TaskGroupInner, me: KillHandle, - is_member: bool) -> bool { - let me = Cell::new(me); // :( - // If 'None', the group was failing. Can't enlist. - do state.as_mut().map_default(false) |group| { - (if is_member { - &mut group.members - } else { - &mut group.descendants - }).insert(me.take()); - true - } -} - -// NB: Runs in destructor/post-exit context. Can't 'fail'. -fn leave_taskgroup(state: TaskGroupInner, me: &KillHandle, is_member: bool) { - let me = Cell::new(me); // :( - // If 'None', already failing and we've already gotten a kill signal. - do state.as_mut().map |group| { - (if is_member { - &mut group.members - } else { - &mut group.descendants - }).remove(me.take()); - }; -} - -// NB: Runs in destructor/post-exit context. Can't 'fail'. -fn kill_taskgroup(state: Option<TaskGroupData>, me: &KillHandle) { - // Might already be None, if somebody is failing simultaneously. - // That's ok; only one task needs to do the dirty work. (Might also - // see 'None' if somebody already failed and we got a kill signal.) - do state.map |TaskGroupData { members: members, descendants: descendants }| { - for sibling in members.move_iter() { - // Skip self - killing ourself won't do much good. - if &sibling != me { - RuntimeGlue::kill_task(sibling); - } - } - for child in descendants.move_iter() { - assert!(&child != me); - RuntimeGlue::kill_task(child); - } - }; - // (note: multiple tasks may reach this point) -} - -// FIXME (#2912): Work around core-vs-coretest function duplication. Can't use -// a proper closure because the #[test]s won't understand. Have to fake it. -fn taskgroup_key() -> local_data::Key<@@mut Taskgroup> { - unsafe { cast::transmute(-2) } -} - -// Transitionary. -struct RuntimeGlue; -impl RuntimeGlue { - fn kill_task(mut handle: KillHandle) { - do handle.kill().map |killed_task| { - let killed_task = Cell::new(killed_task); - do Local::borrow |sched: &mut Scheduler| { - sched.enqueue_task(killed_task.take()); - } - }; - } - - fn with_task_handle_and_failing(blk: |&KillHandle, bool|) { - assert!(in_green_task_context()); - unsafe { - // Can't use safe borrow, because the taskgroup destructor needs to - // access the scheduler again to send kill signals to other tasks. - let me: *mut Task = Local::unsafe_borrow(); - blk((*me).death.kill_handle.get_ref(), (*me).unwinder.unwinding) - } - } - - fn with_my_taskgroup<U>(blk: |&Taskgroup| -> U) -> U { - assert!(in_green_task_context()); - unsafe { - // Can't use safe borrow, because creating new hashmaps for the - // tasksets requires an rng, which needs to borrow the sched. - let me: *mut Task = Local::unsafe_borrow(); - blk(match (*me).taskgroup { - None => { - // First task in its (unlinked/unsupervised) taskgroup. - // Lazily initialize. - let mut members = TaskSet::new(); - let my_handle = (*me).death.kill_handle.get_ref().clone(); - members.insert(my_handle); - let tasks = Exclusive::new(Some(TaskGroupData { - members: members, - descendants: TaskSet::new(), - })); - let group = Taskgroup(tasks, AncestorList(None), None); - (*me).taskgroup = Some(group); - (*me).taskgroup.get_ref() - } - Some(ref group) => group, - }) - } - } -} - -// Returns 'None' in the case where the child's TG should be lazily initialized. -fn gen_child_taskgroup(linked: bool, supervised: bool) - -> Option<(TaskGroupArc, AncestorList)> { - if linked || supervised { - // with_my_taskgroup will lazily initialize the parent's taskgroup if - // it doesn't yet exist. We don't want to call it in the unlinked case. - do RuntimeGlue::with_my_taskgroup |spawner_group| { - let ancestors = AncestorList(spawner_group.ancestors.as_ref().map(|x| x.clone())); - if linked { - // Child is in the same group as spawner. - // Child's ancestors are spawner's ancestors. - Some((spawner_group.tasks.clone(), ancestors)) - } else { - // Child is in a separate group from spawner. - let g = Exclusive::new(Some(TaskGroupData { - members: TaskSet::new(), - descendants: TaskSet::new(), - })); - let a = if supervised { - let new_generation = incr_generation(&ancestors); - assert!(new_generation < uint::max_value); - // Child's ancestors start with the spawner. - // Build a new node in the ancestor list. - AncestorList(Some(Exclusive::new(AncestorNode { - generation: new_generation, - parent_group: spawner_group.tasks.clone(), - ancestors: ancestors, - }))) - } else { - // Child has no ancestors. - AncestorList(None) - }; - Some((g, a)) - } - } - } else { - None - } -} - -// Set up membership in taskgroup and descendantship in all ancestor -// groups. If any enlistment fails, Some task was already failing, so -// don't let the child task run, and undo every successful enlistment. -fn enlist_many(child: &KillHandle, child_arc: &TaskGroupArc, - ancestors: &mut AncestorList) -> bool { - // Join this taskgroup. - let mut result = do access_group(child_arc) |child_tg| { - enlist_in_taskgroup(child_tg, child.clone(), true) // member - }; - if result { - // Unwinding function in case any ancestral enlisting fails - let bail: |TaskGroupInner| = |tg| { leave_taskgroup(tg, child, false) }; - // Attempt to join every ancestor group. - result = do each_ancestor(ancestors, bail) |ancestor_tg| { - // Enlist as a descendant, not as an actual member. - // Descendants don't kill ancestor groups on failure. - enlist_in_taskgroup(ancestor_tg, child.clone(), false) - }; - // If any ancestor group fails, need to exit this group too. - if !result { - do access_group(child_arc) |child_tg| { - leave_taskgroup(child_tg, child, true); // member - } - } - } - result -} - pub fn spawn_raw(mut opts: TaskOpts, f: proc()) { assert!(in_green_task_context()); - let child_data = Cell::new(gen_child_taskgroup(opts.linked, opts.supervised)); - let indestructible = opts.indestructible; - - let child_wrapper: proc() = || { - // Child task runs this code. - - // If child data is 'None', the enlist is vacuously successful. - let enlist_success = do child_data.take().map_default(true) |child_data| { - let child_data = Cell::new(child_data); // :( - do Local::borrow |me: &mut Task| { - let (child_tg, ancestors) = child_data.take(); - let mut ancestors = ancestors; - let handle = me.death.kill_handle.get_ref(); - // Atomically try to get into all of our taskgroups. - if enlist_many(handle, &child_tg, &mut ancestors) { - // Got in. We can run the provided child body, and can also run - // the taskgroup's exit-time-destructor afterward. - me.taskgroup = Some(Taskgroup(child_tg, ancestors, None)); - true - } else { - false - } - } - }; - - // Should be run after the local-borrowed task is returned. - let f_cell = Cell::new(f); - if enlist_success { - if indestructible { - do unkillable { f_cell.take()() } - } else { - f_cell.take()() - } - } - }; - let mut task = if opts.sched.mode != SingleThreaded { if opts.watched { - Task::build_child(opts.stack_size, child_wrapper) + Task::build_child(opts.stack_size, f) } else { - Task::build_root(opts.stack_size, child_wrapper) + Task::build_root(opts.stack_size, f) } } else { unsafe { @@ -634,9 +127,9 @@ pub fn spawn_raw(mut opts: TaskOpts, f: proc()) { // Pin the new task to the new scheduler let new_task = if opts.watched { - Task::build_homed_child(opts.stack_size, child_wrapper, Sched(new_sched_handle)) + Task::build_homed_child(opts.stack_size, f, Sched(new_sched_handle)) } else { - Task::build_homed_root(opts.stack_size, child_wrapper, Sched(new_sched_handle)) + Task::build_homed_root(opts.stack_size, f, Sched(new_sched_handle)) }; // Create a task that will later be used to join with the new scheduler @@ -711,7 +204,6 @@ fn test_spawn_raw_simple() { #[test] fn test_spawn_raw_unsupervise() { let opts = task::TaskOpts { - linked: false, watched: false, notify_chan: None, .. default_task_opts() @@ -740,7 +232,6 @@ fn test_spawn_raw_notify_failure() { let (notify_po, notify_ch) = comm::stream(); let opts = task::TaskOpts { - linked: false, watched: false, notify_chan: Some(notify_ch), .. default_task_opts() |
