diff options
| author | Alex Crichton <alex@alexcrichton.com> | 2013-11-21 16:55:40 -0800 |
|---|---|---|
| committer | Alex Crichton <alex@alexcrichton.com> | 2013-11-24 21:21:12 -0800 |
| commit | acca9e3834842ee8d8104abe9b8b9bb88861793c (patch) | |
| tree | 5b2ab145e0853d74661cae51726baceaca2ab884 /src/libstd/task | |
| parent | 85a1eff3a99c5d60bf34ca6f08f87ed341329af0 (diff) | |
| download | rust-acca9e3834842ee8d8104abe9b8b9bb88861793c.tar.gz rust-acca9e3834842ee8d8104abe9b8b9bb88861793c.zip | |
Remove linked failure from the runtime
The reasons for doing this are: * The model on which linked failure is based is inherently complex * The implementation is also very complex, and there are few remaining who fully understand the implementation * There are existing race conditions in the core context switching function of the scheduler, and possibly others. * It's unclear whether this model of linked failure maps well to a 1:1 threading model Linked failure is often a desired aspect of tasks, but we would like to take a much more conservative approach in re-implementing linked failure if at all. Closes #8674 Closes #8318 Closes #8863
Diffstat (limited to 'src/libstd/task')
| -rw-r--r-- | src/libstd/task/mod.rs | 649 | ||||
| -rw-r--r-- | src/libstd/task/spawn.rs | 531 |
2 files changed, 18 insertions, 1162 deletions
diff --git a/src/libstd/task/mod.rs b/src/libstd/task/mod.rs index f9b918d6d12..485fe9edf0e 100644 --- a/src/libstd/task/mod.rs +++ b/src/libstd/task/mod.rs @@ -62,16 +62,12 @@ use rt::in_green_task_context; use rt::local::Local; use rt::task::{UnwindResult, Success, Failure}; use send_str::{SendStr, IntoSendStr}; -use unstable::finally::Finally; use util; #[cfg(test)] use any::Any; -#[cfg(test)] use cast; #[cfg(test)] use comm::SharedChan; -#[cfg(test)] use comm; #[cfg(test)] use ptr; #[cfg(test)] use result; -#[cfg(test)] use task; pub mod spawn; @@ -86,8 +82,6 @@ pub mod spawn; /// children tasks complete, recommend using a result future. pub type TaskResult = Result<(), ~Any>; -pub struct LinkedFailure; - pub struct TaskResultPort { priv port: Port<UnwindResult> } @@ -141,24 +135,11 @@ pub struct SchedOpts { * * # Fields * - * * linked - Propagate failure bidirectionally between child and parent. - * True by default. If both this and 'supervised' are false, then - * either task's failure will not affect the other ("unlinked"). - * - * * supervised - Propagate failure unidirectionally from parent to child, - * but not from child to parent. False by default. - * * * watched - Make parent task collect exit status notifications from child * before reporting its own exit status. (This delays the parent * task's death and cleanup until after all transitively watched * children also exit.) True by default. * - * * indestructible - Configures the task to ignore kill signals received from - * linked failure. This may cause process hangs during - * failure if not used carefully, but causes task blocking - * code paths (e.g. port recv() calls) to be faster by 2 - * atomic operations. False by default. - * * * notify_chan - Enable lifecycle notifications on the given channel * * * name - A name for the task-to-be, for identification in failure messages. @@ -169,10 +150,7 @@ pub struct SchedOpts { * scheduler other tasks will be impeded or even blocked indefinitely. */ pub struct TaskOpts { - priv linked: bool, - priv supervised: bool, priv watched: bool, - priv indestructible: bool, priv notify_chan: Option<Chan<UnwindResult>>, name: Option<SendStr>, sched: SchedOpts, @@ -191,13 +169,10 @@ pub struct TaskOpts { // when you try to reuse the builder to spawn a new task. We'll just // sidestep that whole issue by making builders uncopyable and making // the run function move them in. - -// FIXME (#3724): Replace the 'consumed' bit with move mode on self pub struct TaskBuilder { opts: TaskOpts, priv gen_body: Option<proc(v: proc()) -> proc()>, priv can_not_copy: Option<util::NonCopyable>, - priv consumed: bool, } /** @@ -210,25 +185,17 @@ pub fn task() -> TaskBuilder { opts: default_task_opts(), gen_body: None, can_not_copy: None, - consumed: false, } } impl TaskBuilder { - fn consume(&mut self) -> TaskBuilder { - if self.consumed { - fail!("Cannot copy a task_builder"); // Fake move mode on self - } - self.consumed = true; + fn consume(mut self) -> TaskBuilder { let gen_body = self.gen_body.take(); let notify_chan = self.opts.notify_chan.take(); let name = self.opts.name.take(); TaskBuilder { opts: TaskOpts { - linked: self.opts.linked, - supervised: self.opts.supervised, watched: self.opts.watched, - indestructible: self.opts.indestructible, notify_chan: notify_chan, name: name, sched: self.opts.sched, @@ -236,34 +203,9 @@ impl TaskBuilder { }, gen_body: gen_body, can_not_copy: None, - consumed: false } } - /// Decouple the child task's failure from the parent's. If either fails, - /// the other will not be killed. - pub fn unlinked(&mut self) { - self.opts.linked = false; - self.opts.watched = false; - } - - /// Unidirectionally link the child task's failure with the parent's. The - /// child's failure will not kill the parent, but the parent's will kill - /// the child. - pub fn supervised(&mut self) { - self.opts.supervised = true; - self.opts.linked = false; - self.opts.watched = false; - } - - /// Link the child task's and parent task's failures. If either fails, the - /// other will be killed. - pub fn linked(&mut self) { - self.opts.linked = true; - self.opts.supervised = false; - self.opts.watched = true; - } - /// Cause the parent task to collect the child's exit status (and that of /// all transitively-watched grandchildren) before reporting its own. pub fn watched(&mut self) { @@ -276,13 +218,6 @@ impl TaskBuilder { self.opts.watched = false; } - /// Cause the child task to ignore any kill signals received from linked - /// failure. This optimizes context switching, at the possible expense of - /// process hangs in the case of unexpected failure. - pub fn indestructible(&mut self) { - self.opts.indestructible = true; - } - /// Get a future representing the exit status of the task. /// /// Taking the value of the future will block until the child task @@ -372,16 +307,13 @@ impl TaskBuilder { * When spawning into a new scheduler, the number of threads requested * must be greater than zero. */ - pub fn spawn(&mut self, f: proc()) { + pub fn spawn(mut self, f: proc()) { let gen_body = self.gen_body.take(); let notify_chan = self.opts.notify_chan.take(); let name = self.opts.name.take(); let x = self.consume(); let opts = TaskOpts { - linked: x.opts.linked, - supervised: x.opts.supervised, watched: x.opts.watched, - indestructible: x.opts.indestructible, notify_chan: notify_chan, name: name, sched: x.opts.sched, @@ -398,14 +330,6 @@ impl TaskBuilder { spawn::spawn_raw(opts, f); } - /// Runs a task, while transferring ownership of one argument to the child. - pub fn spawn_with<A:Send>(&mut self, arg: A, f: proc(v: A)) { - let arg = Cell::new(arg); - do self.spawn { - f(arg.take()); - } - } - /** * Execute a function in another task and return either the return value * of the function or result::err. @@ -419,7 +343,7 @@ impl TaskBuilder { * # Failure * Fails if a future_result was already set for this task. */ - pub fn try<T:Send>(&mut self, f: proc() -> T) -> Result<T, ~Any> { + pub fn try<T:Send>(mut self, f: proc() -> T) -> Result<T, ~Any> { let (po, ch) = stream::<T>(); let result = self.future_result(); @@ -447,10 +371,7 @@ pub fn default_task_opts() -> TaskOpts { */ TaskOpts { - linked: true, - supervised: false, watched: true, - indestructible: false, notify_chan: None, name: None, sched: SchedOpts { @@ -469,56 +390,10 @@ pub fn default_task_opts() -> TaskOpts { /// /// This function is equivalent to `task().spawn(f)`. pub fn spawn(f: proc()) { - let mut task = task(); + let task = task(); task.spawn(f) } -/// Creates a child task unlinked from the current one. If either this -/// task or the child task fails, the other will not be killed. -pub fn spawn_unlinked(f: proc()) { - let mut task = task(); - task.unlinked(); - task.spawn(f) -} - -pub fn spawn_supervised(f: proc()) { - /*! - * Creates a child task supervised by the current one. If the child - * task fails, the parent will not be killed, but if the parent fails, - * the child will be killed. - */ - - let mut task = task(); - task.supervised(); - task.spawn(f) -} - -/// Creates a child task that cannot be killed by linked failure. This causes -/// its context-switch path to be faster by 2 atomic swap operations. -/// (Note that this convenience wrapper still uses linked-failure, so the -/// child's children will still be killable by the parent. For the fastest -/// possible spawn mode, use task::task().unlinked().indestructible().spawn.) -pub fn spawn_indestructible(f: proc()) { - let mut task = task(); - task.indestructible(); - task.spawn(f) -} - -pub fn spawn_with<A:Send>(arg: A, f: proc(v: A)) { - /*! - * Runs a task, while transferring ownership of one argument to the - * child. - * - * This is useful for transferring ownership of noncopyables to - * another task. - * - * This function is equivalent to `task().spawn_with(arg, f)`. - */ - - let mut task = task(); - task.spawn_with(arg, f) -} - pub fn spawn_sched(mode: SchedMode, f: proc()) { /*! * Creates a new task on a new or existing scheduler. @@ -545,8 +420,7 @@ pub fn try<T:Send>(f: proc() -> T) -> Result<T, ~Any> { * This is equivalent to task().supervised().try. */ - let mut task = task(); - task.supervised(); + let task = task(); task.try(f) } @@ -590,159 +464,6 @@ pub fn failing() -> bool { } } -/** - * Temporarily make the task unkillable - * - * # Example - * - * ``` - * do task::unkillable { - * // detach / deschedule / destroy must all be called together - * rustrt::rust_port_detach(po); - * // This must not result in the current task being killed - * task::deschedule(); - * rustrt::rust_port_destroy(po); - * } - * ``` - */ -pub fn unkillable<U>(f: || -> U) -> U { - use rt::task::Task; - - unsafe { - if in_green_task_context() { - // The inhibits/allows might fail and need to borrow the task. - let t: *mut Task = Local::unsafe_borrow(); - do (|| { - (*t).death.inhibit_kill((*t).unwinder.unwinding); - f() - }).finally { - (*t).death.allow_kill((*t).unwinder.unwinding); - } - } else { - // FIXME(#3095): This should be an rtabort as soon as the scheduler - // no longer uses a workqueue implemented with an Exclusive. - f() - } - } -} - -/** - * Makes killable a task marked as unkillable. This - * is meant to be used only nested in unkillable. - * - * # Example - * - * ``` - * do task::unkillable { - * do task::rekillable { - * // Task is killable - * } - * // Task is unkillable again - * } - */ -pub fn rekillable<U>(f: || -> U) -> U { - use rt::task::Task; - - unsafe { - if in_green_task_context() { - let t: *mut Task = Local::unsafe_borrow(); - do (|| { - (*t).death.allow_kill((*t).unwinder.unwinding); - f() - }).finally { - (*t).death.inhibit_kill((*t).unwinder.unwinding); - } - } else { - // FIXME(#3095): As in unkillable(). - f() - } - } -} - -#[ignore(reason = "linked failure")] -#[test] -fn test_kill_unkillable_task() { - use rt::test::*; - - // Attempt to test that when a kill signal is received at the start of an - // unkillable section, 'unkillable' unwinds correctly. This is actually - // quite a difficult race to expose, as the kill has to happen on a second - // CPU, *after* the spawner is already switched-back-to (and passes the - // killed check at the start of its timeslice). As far as I know, it's not - // possible to make this race deterministic, or even more likely to happen. - do run_in_uv_task { - do task::try { - do task::spawn { - fail!(); - } - do task::unkillable { } - }; - } -} - -#[test] -#[ignore(cfg(windows))] -fn test_kill_rekillable_task() { - use rt::test::*; - - // Tests that when a kill signal is received, 'rekillable' and - // 'unkillable' unwind correctly in conjunction with each other. - do run_in_uv_task { - do task::try { - do task::unkillable { - do task::rekillable { - do task::spawn { - fail!(); - } - } - } - }; - } -} - -#[test] -#[should_fail] -#[ignore(cfg(windows))] -fn test_rekillable_not_nested() { - do rekillable { - // This should fail before - // receiving anything since - // this block should be nested - // into a unkillable block. - deschedule(); - } -} - - -#[test] -#[ignore(cfg(windows))] -fn test_rekillable_nested_failure() { - - let result = do task::try { - do unkillable { - do rekillable { - let (port,chan) = comm::stream(); - do task::spawn { chan.send(()); fail!(); } - port.recv(); // wait for child to exist - port.recv(); // block forever, expect to get killed. - } - } - }; - assert!(result.is_err()); -} - - -#[test] #[should_fail] #[ignore(cfg(windows))] -fn test_cant_dup_task_builder() { - let mut builder = task(); - builder.unlinked(); - do builder.spawn {} - // FIXME(#3724): For now, this is a -runtime- failure, because we haven't - // got move mode on self. When 3724 is fixed, this test should fail to - // compile instead, and should go in tests/compile-fail. - do builder.spawn {} // b should have been consumed by the previous call -} - // The following 8 tests test the following 2^3 combinations: // {un,}linked {un,}supervised failure propagation {up,down}wards. @@ -752,207 +473,6 @@ fn test_cant_dup_task_builder() { #[cfg(test)] fn block_forever() { let (po, _ch) = stream::<()>(); po.recv(); } -#[ignore(reason = "linked failure")] -#[test] -fn test_spawn_unlinked_unsup_no_fail_down() { // grandchild sends on a port - use rt::test::run_in_uv_task; - do run_in_uv_task { - let (po, ch) = stream(); - let ch = SharedChan::new(ch); - do spawn_unlinked { - let ch = ch.clone(); - do spawn_unlinked { - // Give middle task a chance to fail-but-not-kill-us. - do 16.times { task::deschedule(); } - ch.send(()); // If killed first, grandparent hangs. - } - fail!(); // Shouldn't kill either (grand)parent or (grand)child. - } - po.recv(); - } -} -#[ignore(reason = "linked failure")] -#[test] -fn test_spawn_unlinked_unsup_no_fail_up() { // child unlinked fails - use rt::test::run_in_uv_task; - do run_in_uv_task { - do spawn_unlinked { fail!(); } - } -} -#[ignore(reason = "linked failure")] -#[test] -fn test_spawn_unlinked_sup_no_fail_up() { // child unlinked fails - use rt::test::run_in_uv_task; - do run_in_uv_task { - do spawn_supervised { fail!(); } - // Give child a chance to fail-but-not-kill-us. - do 16.times { task::deschedule(); } - } -} -#[ignore(reason = "linked failure")] -#[test] -fn test_spawn_unlinked_sup_fail_down() { - use rt::test::run_in_uv_task; - do run_in_uv_task { - let result: Result<(), ~Any> = do try { - do spawn_supervised { block_forever(); } - fail!(); // Shouldn't leave a child hanging around. - }; - assert!(result.is_err()); - } -} - -#[ignore(reason = "linked failure")] -#[test] -fn test_spawn_linked_sup_fail_up() { // child fails; parent fails - use rt::test::run_in_uv_task; - do run_in_uv_task { - let result: Result<(), ~Any> = do try { - // Unidirectional "parenting" shouldn't override bidirectional linked. - // We have to cheat with opts - the interface doesn't support them because - // they don't make sense (redundant with task().supervised()). - let mut b0 = task(); - b0.opts.linked = true; - b0.opts.supervised = true; - - do b0.spawn { - fail!(); - } - block_forever(); // We should get punted awake - }; - assert!(result.is_err()); - } -} -#[ignore(reason = "linked failure")] -#[test] -fn test_spawn_linked_sup_fail_down() { // parent fails; child fails - use rt::test::run_in_uv_task; - do run_in_uv_task { - let result: Result<(), ~Any> = do try { - // We have to cheat with opts - the interface doesn't support them because - // they don't make sense (redundant with task().supervised()). - let mut b0 = task(); - b0.opts.linked = true; - b0.opts.supervised = true; - do b0.spawn { block_forever(); } - fail!(); // *both* mechanisms would be wrong if this didn't kill the child - }; - assert!(result.is_err()); - } -} -#[ignore(reason = "linked failure")] -#[test] -fn test_spawn_linked_unsup_fail_up() { // child fails; parent fails - use rt::test::run_in_uv_task; - do run_in_uv_task { - let result: Result<(), ~Any> = do try { - // Default options are to spawn linked & unsupervised. - do spawn { fail!(); } - block_forever(); // We should get punted awake - }; - assert!(result.is_err()); - } -} -#[ignore(reason = "linked failure")] -#[test] -fn test_spawn_linked_unsup_fail_down() { // parent fails; child fails - use rt::test::run_in_uv_task; - do run_in_uv_task { - let result: Result<(), ~Any> = do try { - // Default options are to spawn linked & unsupervised. - do spawn { block_forever(); } - fail!(); - }; - assert!(result.is_err()); - } -} -#[ignore(reason = "linked failure")] -#[test] -fn test_spawn_linked_unsup_default_opts() { // parent fails; child fails - use rt::test::run_in_uv_task; - do run_in_uv_task { - let result: Result<(), ~Any> = do try { - // Make sure the above test is the same as this one. - let mut builder = task(); - builder.linked(); - do builder.spawn { block_forever(); } - fail!(); - }; - assert!(result.is_err()); - } -} - -// A couple bonus linked failure tests - testing for failure propagation even -// when the middle task exits successfully early before kill signals are sent. - -#[ignore(reason = "linked failure")] -#[test] -fn test_spawn_failure_propagate_grandchild() { - use rt::test::run_in_uv_task; - do run_in_uv_task { - let result: Result<(), ~Any> = do try { - // Middle task exits; does grandparent's failure propagate across the gap? - do spawn_supervised { - do spawn_supervised { block_forever(); } - } - do 16.times { task::deschedule(); } - fail!(); - }; - assert!(result.is_err()); - } -} - -#[ignore(reason = "linked failure")] -#[test] -fn test_spawn_failure_propagate_secondborn() { - use rt::test::run_in_uv_task; - do run_in_uv_task { - let result: Result<(), ~Any> = do try { - // First-born child exits; does parent's failure propagate to sibling? - do spawn_supervised { - do spawn { block_forever(); } // linked - } - do 16.times { task::deschedule(); } - fail!(); - }; - assert!(result.is_err()); - } -} - -#[ignore(reason = "linked failure")] -#[test] -fn test_spawn_failure_propagate_nephew_or_niece() { - use rt::test::run_in_uv_task; - do run_in_uv_task { - let result: Result<(), ~Any> = do try { - // Our sibling exits; does our failure propagate to sibling's child? - do spawn { // linked - do spawn_supervised { block_forever(); } - } - do 16.times { task::deschedule(); } - fail!(); - }; - assert!(result.is_err()); - } -} - -#[ignore(reason = "linked failure")] -#[test] -fn test_spawn_linked_sup_propagate_sibling() { - use rt::test::run_in_uv_task; - do run_in_uv_task { - let result: Result<(), ~Any> = do try { - // Middle sibling exits - does eldest's failure propagate to youngest? - do spawn { // linked - do spawn { block_forever(); } // linked - } - do 16.times { task::deschedule(); } - fail!(); - }; - assert!(result.is_err()); - } -} - #[test] fn test_unnamed_task() { use rt::test::run_in_uv_task; @@ -1014,7 +534,7 @@ fn test_send_named_task() { #[test] fn test_run_basic() { let (po, ch) = stream::<()>(); - let mut builder = task(); + let builder = task(); do builder.spawn { ch.send(()); } @@ -1053,7 +573,6 @@ fn test_future_result() { let mut builder = task(); let result = builder.future_result(); - builder.unlinked(); do builder.spawn { fail!(); } @@ -1224,7 +743,7 @@ fn test_avoid_copying_the_body_spawn() { #[test] fn test_avoid_copying_the_body_task_spawn() { do avoid_copying_the_body |f| { - let mut builder = task(); + let builder = task(); do builder.spawn || { f(); } @@ -1241,86 +760,6 @@ fn test_avoid_copying_the_body_try() { } #[test] -fn test_avoid_copying_the_body_unlinked() { - do avoid_copying_the_body |f| { - do spawn_unlinked || { - f(); - } - } -} - -#[ignore(reason = "linked failure")] -#[test] -#[should_fail] -fn test_unkillable() { - let (po, ch) = stream(); - - // We want to do this after failing - do spawn_unlinked { - do 10.times { deschedule() } - ch.send(()); - } - - do spawn { - deschedule(); - // We want to fail after the unkillable task - // blocks on recv - fail!(); - } - - unsafe { - do unkillable { - let p = ~0; - let pp: *uint = cast::transmute(p); - - // If we are killed here then the box will leak - po.recv(); - - let _p: ~int = cast::transmute(pp); - } - } - - // Now we can be killed - po.recv(); -} - -#[ignore(reason = "linked failure")] -#[test] -#[should_fail] -fn test_unkillable_nested() { - let (po, ch) = comm::stream(); - - // We want to do this after failing - do spawn_unlinked || { - do 10.times { deschedule() } - ch.send(()); - } - - do spawn { - deschedule(); - // We want to fail after the unkillable task - // blocks on recv - fail!(); - } - - unsafe { - do unkillable { - do unkillable {} // Here's the difference from the previous test. - let p = ~0; - let pp: *uint = cast::transmute(p); - - // If we are killed here then the box will leak - po.recv(); - - let _p: ~int = cast::transmute(pp); - } - } - - // Now we can be killed - po.recv(); -} - -#[test] fn test_child_doesnt_ref_parent() { // If the child refcounts the parent task, this will stack overflow when // climbing the task tree to dereference each ancestor. (See #1789) @@ -1350,67 +789,6 @@ fn test_simple_newsched_spawn() { } } -#[ignore(reason = "linked failure")] -#[test] -fn test_spawn_watched() { - use rt::test::run_in_uv_task; - do run_in_uv_task { - let result = do try { - let mut t = task(); - t.unlinked(); - t.watched(); - do t.spawn { - let mut t = task(); - t.unlinked(); - t.watched(); - do t.spawn { - task::deschedule(); - fail!(); - } - } - }; - assert!(result.is_err()); - } -} - -#[ignore(reason = "linked failure")] -#[test] -fn test_indestructible() { - use rt::test::run_in_uv_task; - do run_in_uv_task { - let result = do try { - let mut t = task(); - t.watched(); - t.supervised(); - t.indestructible(); - do t.spawn { - let (p1, _c1) = stream::<()>(); - let (p2, c2) = stream::<()>(); - let (p3, c3) = stream::<()>(); - let mut t = task(); - t.unwatched(); - do t.spawn { - do (|| { - p1.recv(); // would deadlock if not killed - }).finally { - c2.send(()); - }; - } - let mut t = task(); - t.unwatched(); - do t.spawn { - p3.recv(); - task::deschedule(); - fail!(); - } - c3.send(()); - p2.recv(); - } - }; - assert!(result.is_ok()); - } -} - #[test] fn test_try_fail_message_static_str() { match do try { @@ -1455,19 +833,6 @@ fn test_try_fail_message_any() { } } -#[ignore(reason = "linked failure")] -#[test] -fn test_try_fail_message_linked() { - match do try { - do spawn { - fail!() - } - } { - Err(ref e) if e.is::<LinkedFailure>() => {} - Err(_) | Ok(()) => fail!() - } -} - #[test] fn test_try_fail_message_unit_struct() { struct Juju; diff --git a/src/libstd/task/spawn.rs b/src/libstd/task/spawn.rs index 66a2e8cc5e0..578839d4542 100644 --- a/src/libstd/task/spawn.rs +++ b/src/libstd/task/spawn.rs @@ -9,6 +9,10 @@ // except according to those terms. /*!************************************************************************** + * + * WARNING: linked failure has been removed since this doc comment was written, + * but it was so pretty that I didn't want to remove it. + * * Spawning & linked failure * * Several data structures are involved in task management to allow properly @@ -73,541 +77,30 @@ use prelude::*; -use cast::transmute; -use cast; use cell::Cell; -use comm::{Chan, GenericChan, oneshot}; -use container::MutableMap; -use hashmap::{HashSet, HashSetMoveIterator}; -use local_data; +use comm::{GenericChan, oneshot}; use rt::local::Local; use rt::sched::{Scheduler, Shutdown, TaskFromFriend}; use rt::task::{Task, Sched}; -use rt::task::{UnwindResult, Success, Failure}; +use rt::task::UnwindResult; use rt::thread::Thread; use rt::work_queue::WorkQueue; -use rt::{in_green_task_context, new_event_loop, KillHandle}; -use task::LinkedFailure; +use rt::{in_green_task_context, new_event_loop}; use task::SingleThreaded; use task::TaskOpts; -use task::unkillable; -use uint; -use unstable::sync::Exclusive; -use util; #[cfg(test)] use task::default_task_opts; #[cfg(test)] use comm; #[cfg(test)] use task; -struct TaskSet(HashSet<KillHandle>); - -impl TaskSet { - #[inline] - fn new() -> TaskSet { - TaskSet(HashSet::new()) - } - #[inline] - fn insert(&mut self, task: KillHandle) { - let didnt_overwrite = (**self).insert(task); - assert!(didnt_overwrite); - } - #[inline] - fn remove(&mut self, task: &KillHandle) { - let was_present = (**self).remove(task); - assert!(was_present); - } - #[inline] - fn move_iter(self) -> HashSetMoveIterator<KillHandle> { - (*self).move_iter() - } -} - -// One of these per group of linked-failure tasks. -struct TaskGroupData { - // All tasks which might kill this group. When this is empty, the group - // can be "GC"ed (i.e., its link in the ancestor list can be removed). - members: TaskSet, - // All tasks unidirectionally supervised by (directly or transitively) - // tasks in this group. - descendants: TaskSet, -} -type TaskGroupArc = Exclusive<Option<TaskGroupData>>; - -type TaskGroupInner<'self> = &'self mut Option<TaskGroupData>; - -// A taskgroup is 'dead' when nothing can cause it to fail; only members can. -fn taskgroup_is_dead(tg: &TaskGroupData) -> bool { - tg.members.is_empty() -} - -// A list-like structure by which taskgroups keep track of all ancestor groups -// which may kill them. Needed for tasks to be able to remove themselves from -// ancestor groups upon exit. The list has a node for each "generation", and -// ends either at the root taskgroup (which has no ancestors) or at a -// taskgroup which was spawned-unlinked. Tasks from intermediate generations -// have references to the middle of the list; when intermediate generations -// die, their node in the list will be collected at a descendant's spawn-time. -struct AncestorNode { - // Since the ancestor list is recursive, we end up with references to - // exclusives within other exclusives. This is dangerous business (if - // circular references arise, deadlock and memory leaks are imminent). - // Hence we assert that this counter monotonically decreases as we - // approach the tail of the list. - generation: uint, - // Handle to the tasks in the group of the current generation. - parent_group: TaskGroupArc, - // Recursive rest of the list. - ancestors: AncestorList, -} - -struct AncestorList(Option<Exclusive<AncestorNode>>); - -// Accessors for taskgroup arcs and ancestor arcs that wrap the unsafety. -#[inline] -fn access_group<U>(x: &TaskGroupArc, blk: |TaskGroupInner| -> U) -> U { - unsafe { - x.with(blk) - } -} - -#[inline] -fn access_ancestors<U>( - x: &Exclusive<AncestorNode>, - blk: |x: &mut AncestorNode| -> U) - -> U { - unsafe { - x.with(blk) - } -} - -#[inline] #[cfg(test)] -fn check_generation(younger: uint, older: uint) { assert!(younger > older); } -#[inline] #[cfg(not(test))] -fn check_generation(_younger: uint, _older: uint) { } - -#[inline] #[cfg(test)] -fn incr_generation(ancestors: &AncestorList) -> uint { - ancestors.as_ref().map_default(0, |arc| access_ancestors(arc, |a| a.generation+1)) -} -#[inline] #[cfg(not(test))] -fn incr_generation(_ancestors: &AncestorList) -> uint { 0 } - -// Iterates over an ancestor list. -// (1) Runs forward_blk on each ancestral taskgroup in the list -// (2) If forward_blk "break"s, runs optional bail_blk on all ancestral -// taskgroups that forward_blk already ran on successfully (Note: bail_blk -// is NOT called on the block that forward_blk broke on!). -// (3) As a bonus, coalesces away all 'dead' taskgroup nodes in the list. -fn each_ancestor(list: &mut AncestorList, - bail_blk: |TaskGroupInner|, - forward_blk: |TaskGroupInner| -> bool) - -> bool { - // "Kickoff" call - there was no last generation. - return !coalesce(list, bail_blk, forward_blk, uint::max_value); - - // Recursively iterates, and coalesces afterwards if needed. Returns - // whether or not unwinding is needed (i.e., !successful iteration). - fn coalesce(list: &mut AncestorList, - bail_blk: |TaskGroupInner|, - forward_blk: |TaskGroupInner| -> bool, - last_generation: uint) -> bool { - let (coalesce_this, early_break) = - iterate(list, bail_blk, forward_blk, last_generation); - // What should our next ancestor end up being? - if coalesce_this.is_some() { - // Needed coalesce. Our next ancestor becomes our old - // ancestor's next ancestor. ("next = old_next->next;") - *list = coalesce_this.unwrap(); - } - return early_break; - } - - // Returns an optional list-to-coalesce and whether unwinding is needed. - // Option<ancestor_list>: - // Whether or not the ancestor taskgroup being iterated over is - // dead or not; i.e., it has no more tasks left in it, whether or not - // it has descendants. If dead, the caller shall coalesce it away. - // bool: - // True if the supplied block did 'break', here or in any recursive - // calls. If so, must call the unwinder on all previous nodes. - fn iterate(ancestors: &mut AncestorList, - bail_blk: |TaskGroupInner|, - forward_blk: |TaskGroupInner| -> bool, - last_generation: uint) - -> (Option<AncestorList>, bool) { - // At each step of iteration, three booleans are at play which govern - // how the iteration should behave. - // 'nobe_is_dead' - Should the list should be coalesced at this point? - // Largely unrelated to the other two. - // 'need_unwind' - Should we run the bail_blk at this point? (i.e., - // do_continue was false not here, but down the line) - // 'do_continue' - Did the forward_blk succeed at this point? (i.e., - // should we recurse? or should our callers unwind?) - - let forward_blk = Cell::new(forward_blk); - - // The map defaults to None, because if ancestors is None, we're at - // the end of the list, which doesn't make sense to coalesce. - do ancestors.as_ref().map_default((None,false)) |ancestor_arc| { - // NB: Takes a lock! (this ancestor node) - do access_ancestors(ancestor_arc) |nobe| { - // Argh, but we couldn't give it to coalesce() otherwise. - let forward_blk = forward_blk.take(); - // Check monotonicity - check_generation(last_generation, nobe.generation); - /*##########################################################* - * Step 1: Look at this ancestor group (call iterator block). - *##########################################################*/ - let mut nobe_is_dead = false; - let do_continue = - // NB: Takes a lock! (this ancestor node's parent group) - do access_group(&nobe.parent_group) |tg_opt| { - // Decide whether this group is dead. Note that the - // group being *dead* is disjoint from it *failing*. - nobe_is_dead = match *tg_opt { - Some(ref tg) => taskgroup_is_dead(tg), - None => nobe_is_dead - }; - // Call iterator block. (If the group is dead, it's - // safe to skip it. This will leave our KillHandle - // hanging around in the group even after it's freed, - // but that's ok because, by virtue of the group being - // dead, nobody will ever kill-all (for) over it.) - if nobe_is_dead { true } else { forward_blk(tg_opt) } - }; - /*##########################################################* - * Step 2: Recurse on the rest of the list; maybe coalescing. - *##########################################################*/ - // 'need_unwind' is only set if blk returned true above, *and* - // the recursive call early-broke. - let mut need_unwind = false; - if do_continue { - // NB: Takes many locks! (ancestor nodes & parent groups) - need_unwind = coalesce(&mut nobe.ancestors, |tg| bail_blk(tg), - forward_blk, nobe.generation); - } - /*##########################################################* - * Step 3: Maybe unwind; compute return info for our caller. - *##########################################################*/ - if need_unwind && !nobe_is_dead { - do access_group(&nobe.parent_group) |tg_opt| { - bail_blk(tg_opt) - } - } - // Decide whether our caller should unwind. - need_unwind = need_unwind || !do_continue; - // Tell caller whether or not to coalesce and/or unwind - if nobe_is_dead { - // Swap the list out here; the caller replaces us with it. - let rest = util::replace(&mut nobe.ancestors, - AncestorList(None)); - (Some(rest), need_unwind) - } else { - (None, need_unwind) - } - } - } - } -} - -// One of these per task. -pub struct Taskgroup { - // List of tasks with whose fates this one's is intertwined. - priv tasks: TaskGroupArc, // 'none' means the group has failed. - // Lists of tasks who will kill us if they fail, but whom we won't kill. - priv ancestors: AncestorList, - priv notifier: Option<AutoNotify>, -} - -impl Drop for Taskgroup { - // Runs on task exit. - fn drop(&mut self) { - // If we are failing, the whole taskgroup needs to die. - do RuntimeGlue::with_task_handle_and_failing |me, failing| { - if failing { - for x in self.notifier.mut_iter() { - x.task_result = Some(Failure(~LinkedFailure as ~Any)); - } - // Take everybody down with us. After this point, every - // other task in the group will see 'tg' as none, which - // indicates the whole taskgroup is failing (and forbids - // new spawns from succeeding). - let tg = do access_group(&self.tasks) |tg| { tg.take() }; - // It's safe to send kill signals outside the lock, because - // we have a refcount on all kill-handles in the group. - kill_taskgroup(tg, me); - } else { - // Remove ourselves from the group(s). - do access_group(&self.tasks) |tg| { - leave_taskgroup(tg, me, true); - } - } - // It doesn't matter whether this happens before or after dealing - // with our own taskgroup, so long as both happen before we die. - // We remove ourself from every ancestor we can, so no cleanup; no - // break. - do each_ancestor(&mut self.ancestors, |_| {}) |ancestor_group| { - leave_taskgroup(ancestor_group, me, false); - true - }; - } - } -} - -pub fn Taskgroup(tasks: TaskGroupArc, - ancestors: AncestorList, - mut notifier: Option<AutoNotify>) -> Taskgroup { - for x in notifier.mut_iter() { - x.task_result = Some(Success); - } - - Taskgroup { - tasks: tasks, - ancestors: ancestors, - notifier: notifier - } -} - -struct AutoNotify { - notify_chan: Chan<UnwindResult>, - - // XXX: By value self drop would allow this to be a plain UnwindResult - task_result: Option<UnwindResult>, -} - -impl AutoNotify { - pub fn new(chan: Chan<UnwindResult>) -> AutoNotify { - AutoNotify { - notify_chan: chan, - - // Un-set above when taskgroup successfully made. - task_result: Some(Failure(~("AutoNotify::new()") as ~Any)) - } - } -} - -impl Drop for AutoNotify { - fn drop(&mut self) { - let result = self.task_result.take_unwrap(); - - self.notify_chan.send(result); - } -} - -fn enlist_in_taskgroup(state: TaskGroupInner, me: KillHandle, - is_member: bool) -> bool { - let me = Cell::new(me); // :( - // If 'None', the group was failing. Can't enlist. - do state.as_mut().map_default(false) |group| { - (if is_member { - &mut group.members - } else { - &mut group.descendants - }).insert(me.take()); - true - } -} - -// NB: Runs in destructor/post-exit context. Can't 'fail'. -fn leave_taskgroup(state: TaskGroupInner, me: &KillHandle, is_member: bool) { - let me = Cell::new(me); // :( - // If 'None', already failing and we've already gotten a kill signal. - do state.as_mut().map |group| { - (if is_member { - &mut group.members - } else { - &mut group.descendants - }).remove(me.take()); - }; -} - -// NB: Runs in destructor/post-exit context. Can't 'fail'. -fn kill_taskgroup(state: Option<TaskGroupData>, me: &KillHandle) { - // Might already be None, if somebody is failing simultaneously. - // That's ok; only one task needs to do the dirty work. (Might also - // see 'None' if somebody already failed and we got a kill signal.) - do state.map |TaskGroupData { members: members, descendants: descendants }| { - for sibling in members.move_iter() { - // Skip self - killing ourself won't do much good. - if &sibling != me { - RuntimeGlue::kill_task(sibling); - } - } - for child in descendants.move_iter() { - assert!(&child != me); - RuntimeGlue::kill_task(child); - } - }; - // (note: multiple tasks may reach this point) -} - -// FIXME (#2912): Work around core-vs-coretest function duplication. Can't use -// a proper closure because the #[test]s won't understand. Have to fake it. -fn taskgroup_key() -> local_data::Key<@@mut Taskgroup> { - unsafe { cast::transmute(-2) } -} - -// Transitionary. -struct RuntimeGlue; -impl RuntimeGlue { - fn kill_task(mut handle: KillHandle) { - do handle.kill().map |killed_task| { - let killed_task = Cell::new(killed_task); - do Local::borrow |sched: &mut Scheduler| { - sched.enqueue_task(killed_task.take()); - } - }; - } - - fn with_task_handle_and_failing(blk: |&KillHandle, bool|) { - assert!(in_green_task_context()); - unsafe { - // Can't use safe borrow, because the taskgroup destructor needs to - // access the scheduler again to send kill signals to other tasks. - let me: *mut Task = Local::unsafe_borrow(); - blk((*me).death.kill_handle.get_ref(), (*me).unwinder.unwinding) - } - } - - fn with_my_taskgroup<U>(blk: |&Taskgroup| -> U) -> U { - assert!(in_green_task_context()); - unsafe { - // Can't use safe borrow, because creating new hashmaps for the - // tasksets requires an rng, which needs to borrow the sched. - let me: *mut Task = Local::unsafe_borrow(); - blk(match (*me).taskgroup { - None => { - // First task in its (unlinked/unsupervised) taskgroup. - // Lazily initialize. - let mut members = TaskSet::new(); - let my_handle = (*me).death.kill_handle.get_ref().clone(); - members.insert(my_handle); - let tasks = Exclusive::new(Some(TaskGroupData { - members: members, - descendants: TaskSet::new(), - })); - let group = Taskgroup(tasks, AncestorList(None), None); - (*me).taskgroup = Some(group); - (*me).taskgroup.get_ref() - } - Some(ref group) => group, - }) - } - } -} - -// Returns 'None' in the case where the child's TG should be lazily initialized. -fn gen_child_taskgroup(linked: bool, supervised: bool) - -> Option<(TaskGroupArc, AncestorList)> { - if linked || supervised { - // with_my_taskgroup will lazily initialize the parent's taskgroup if - // it doesn't yet exist. We don't want to call it in the unlinked case. - do RuntimeGlue::with_my_taskgroup |spawner_group| { - let ancestors = AncestorList(spawner_group.ancestors.as_ref().map(|x| x.clone())); - if linked { - // Child is in the same group as spawner. - // Child's ancestors are spawner's ancestors. - Some((spawner_group.tasks.clone(), ancestors)) - } else { - // Child is in a separate group from spawner. - let g = Exclusive::new(Some(TaskGroupData { - members: TaskSet::new(), - descendants: TaskSet::new(), - })); - let a = if supervised { - let new_generation = incr_generation(&ancestors); - assert!(new_generation < uint::max_value); - // Child's ancestors start with the spawner. - // Build a new node in the ancestor list. - AncestorList(Some(Exclusive::new(AncestorNode { - generation: new_generation, - parent_group: spawner_group.tasks.clone(), - ancestors: ancestors, - }))) - } else { - // Child has no ancestors. - AncestorList(None) - }; - Some((g, a)) - } - } - } else { - None - } -} - -// Set up membership in taskgroup and descendantship in all ancestor -// groups. If any enlistment fails, Some task was already failing, so -// don't let the child task run, and undo every successful enlistment. -fn enlist_many(child: &KillHandle, child_arc: &TaskGroupArc, - ancestors: &mut AncestorList) -> bool { - // Join this taskgroup. - let mut result = do access_group(child_arc) |child_tg| { - enlist_in_taskgroup(child_tg, child.clone(), true) // member - }; - if result { - // Unwinding function in case any ancestral enlisting fails - let bail: |TaskGroupInner| = |tg| { leave_taskgroup(tg, child, false) }; - // Attempt to join every ancestor group. - result = do each_ancestor(ancestors, bail) |ancestor_tg| { - // Enlist as a descendant, not as an actual member. - // Descendants don't kill ancestor groups on failure. - enlist_in_taskgroup(ancestor_tg, child.clone(), false) - }; - // If any ancestor group fails, need to exit this group too. - if !result { - do access_group(child_arc) |child_tg| { - leave_taskgroup(child_tg, child, true); // member - } - } - } - result -} - pub fn spawn_raw(mut opts: TaskOpts, f: proc()) { assert!(in_green_task_context()); - let child_data = Cell::new(gen_child_taskgroup(opts.linked, opts.supervised)); - let indestructible = opts.indestructible; - - let child_wrapper: proc() = || { - // Child task runs this code. - - // If child data is 'None', the enlist is vacuously successful. - let enlist_success = do child_data.take().map_default(true) |child_data| { - let child_data = Cell::new(child_data); // :( - do Local::borrow |me: &mut Task| { - let (child_tg, ancestors) = child_data.take(); - let mut ancestors = ancestors; - let handle = me.death.kill_handle.get_ref(); - // Atomically try to get into all of our taskgroups. - if enlist_many(handle, &child_tg, &mut ancestors) { - // Got in. We can run the provided child body, and can also run - // the taskgroup's exit-time-destructor afterward. - me.taskgroup = Some(Taskgroup(child_tg, ancestors, None)); - true - } else { - false - } - } - }; - - // Should be run after the local-borrowed task is returned. - let f_cell = Cell::new(f); - if enlist_success { - if indestructible { - do unkillable { f_cell.take()() } - } else { - f_cell.take()() - } - } - }; - let mut task = if opts.sched.mode != SingleThreaded { if opts.watched { - Task::build_child(opts.stack_size, child_wrapper) + Task::build_child(opts.stack_size, f) } else { - Task::build_root(opts.stack_size, child_wrapper) + Task::build_root(opts.stack_size, f) } } else { unsafe { @@ -634,9 +127,9 @@ pub fn spawn_raw(mut opts: TaskOpts, f: proc()) { // Pin the new task to the new scheduler let new_task = if opts.watched { - Task::build_homed_child(opts.stack_size, child_wrapper, Sched(new_sched_handle)) + Task::build_homed_child(opts.stack_size, f, Sched(new_sched_handle)) } else { - Task::build_homed_root(opts.stack_size, child_wrapper, Sched(new_sched_handle)) + Task::build_homed_root(opts.stack_size, f, Sched(new_sched_handle)) }; // Create a task that will later be used to join with the new scheduler @@ -711,7 +204,6 @@ fn test_spawn_raw_simple() { #[test] fn test_spawn_raw_unsupervise() { let opts = task::TaskOpts { - linked: false, watched: false, notify_chan: None, .. default_task_opts() @@ -740,7 +232,6 @@ fn test_spawn_raw_notify_failure() { let (notify_po, notify_ch) = comm::stream(); let opts = task::TaskOpts { - linked: false, watched: false, notify_chan: Some(notify_ch), .. default_task_opts() |
