diff options
Diffstat (limited to 'src/libcore/task/mod.rs')
| -rw-r--r-- | src/libcore/task/mod.rs | 279 |
1 files changed, 114 insertions, 165 deletions
diff --git a/src/libcore/task/mod.rs b/src/libcore/task/mod.rs index fd695c16ea7..e58aa14572b 100644 --- a/src/libcore/task/mod.rs +++ b/src/libcore/task/mod.rs @@ -156,7 +156,7 @@ pub struct SchedOpts { pub struct TaskOpts { linked: bool, supervised: bool, - mut notify_chan: Option<Chan<TaskResult>>, + notify_chan: Option<Chan<TaskResult>>, sched: SchedOpts } @@ -176,9 +176,9 @@ pub struct TaskOpts { // FIXME (#3724): Replace the 'consumed' bit with move mode on self pub struct TaskBuilder { opts: TaskOpts, - mut gen_body: Option<~fn(v: ~fn()) -> ~fn()>, + gen_body: Option<~fn(v: ~fn()) -> ~fn()>, can_not_copy: Option<util::NonCopyable>, - mut consumed: bool, + consumed: bool, } /** @@ -191,13 +191,13 @@ pub fn task() -> TaskBuilder { opts: default_task_opts(), gen_body: None, can_not_copy: None, - mut consumed: false, + consumed: false, } } #[doc(hidden)] // FIXME #3538 priv impl TaskBuilder { - fn consume(&self) -> TaskBuilder { + fn consume(&mut self) -> TaskBuilder { if self.consumed { fail!(~"Cannot copy a task_builder"); // Fake move mode on self } @@ -219,57 +219,25 @@ priv impl TaskBuilder { } pub impl TaskBuilder { - /** - * Decouple the child task's failure from the parent's. If either fails, - * the other will not be killed. - */ - fn unlinked(&self) -> TaskBuilder { - let notify_chan = replace(&mut self.opts.notify_chan, None); - TaskBuilder { - opts: TaskOpts { - linked: false, - supervised: self.opts.supervised, - notify_chan: notify_chan, - sched: self.opts.sched - }, - can_not_copy: None, - .. self.consume() - } + /// Decouple the child task's failure from the parent's. If either fails, + /// the other will not be killed. + fn unlinked(&mut self) { + self.opts.linked = false; } - /** - * Unidirectionally link the child task's failure with the parent's. The - * child's failure will not kill the parent, but the parent's will kill - * the child. - */ - fn supervised(&self) -> TaskBuilder { - let notify_chan = replace(&mut self.opts.notify_chan, None); - TaskBuilder { - opts: TaskOpts { - linked: false, - supervised: true, - notify_chan: notify_chan, - sched: self.opts.sched - }, - can_not_copy: None, - .. self.consume() - } + + /// Unidirectionally link the child task's failure with the parent's. The + /// child's failure will not kill the parent, but the parent's will kill + /// the child. + fn supervised(&mut self) { + self.opts.supervised = true; + self.opts.linked = false; } - /** - * Link the child task's and parent task's failures. If either fails, the - * other will be killed. - */ - fn linked(&self) -> TaskBuilder { - let notify_chan = replace(&mut self.opts.notify_chan, None); - TaskBuilder { - opts: TaskOpts { - linked: true, - supervised: false, - notify_chan: notify_chan, - sched: self.opts.sched - }, - can_not_copy: None, - .. self.consume() - } + + /// Link the child task's and parent task's failures. If either fails, the + /// other will be killed. + fn linked(&mut self) { + self.opts.linked = true; + self.opts.supervised = false; } /** @@ -289,7 +257,7 @@ pub impl TaskBuilder { * # Failure * Fails if a future_result was already set for this task. */ - fn future_result(&self, blk: &fn(v: Port<TaskResult>)) -> TaskBuilder { + fn future_result(&mut self, blk: &fn(v: Port<TaskResult>)) { // FIXME (#3725): Once linked failure and notification are // handled in the library, I can imagine implementing this by just // registering an arbitrary number of task::on_exit handlers and @@ -305,30 +273,12 @@ pub impl TaskBuilder { blk(notify_pipe_po); // Reconfigure self to use a notify channel. - TaskBuilder { - opts: TaskOpts { - linked: self.opts.linked, - supervised: self.opts.supervised, - notify_chan: Some(notify_pipe_ch), - sched: self.opts.sched - }, - can_not_copy: None, - .. self.consume() - } + self.opts.notify_chan = Some(notify_pipe_ch); } + /// Configure a custom scheduler mode for the task. - fn sched_mode(&self, mode: SchedMode) -> TaskBuilder { - let notify_chan = replace(&mut self.opts.notify_chan, None); - TaskBuilder { - opts: TaskOpts { - linked: self.opts.linked, - supervised: self.opts.supervised, - notify_chan: notify_chan, - sched: SchedOpts { mode: mode, foreign_stack_size: None} - }, - can_not_copy: None, - .. self.consume() - } + fn sched_mode(&mut self, mode: SchedMode) { + self.opts.sched.mode = mode; } /** @@ -343,7 +293,7 @@ pub impl TaskBuilder { * generator by applying the task body which results from the * existing body generator to the new body generator. */ - fn add_wrapper(&self, wrapper: ~fn(v: ~fn()) -> ~fn()) -> TaskBuilder { + fn add_wrapper(&mut self, wrapper: ~fn(v: ~fn()) -> ~fn()) { let prev_gen_body = replace(&mut self.gen_body, None); let prev_gen_body = match prev_gen_body { Some(gen) => gen, @@ -360,18 +310,7 @@ pub impl TaskBuilder { }; f }; - let notify_chan = replace(&mut self.opts.notify_chan, None); - TaskBuilder { - opts: TaskOpts { - linked: self.opts.linked, - supervised: self.opts.supervised, - notify_chan: notify_chan, - sched: self.opts.sched - }, - gen_body: Some(next_gen_body), - can_not_copy: None, - .. self.consume() - } + self.gen_body = Some(next_gen_body); } /** @@ -386,7 +325,7 @@ pub impl TaskBuilder { * When spawning into a new scheduler, the number of threads requested * must be greater than zero. */ - fn spawn(&self, f: ~fn()) { + fn spawn(&mut self, f: ~fn()) { let gen_body = replace(&mut self.gen_body, None); let notify_chan = replace(&mut self.opts.notify_chan, None); let x = self.consume(); @@ -406,8 +345,9 @@ pub impl TaskBuilder { }; spawn::spawn_raw(opts, f); } + /// Runs a task, while transfering ownership of one argument to the child. - fn spawn_with<A:Owned>(&self, arg: A, f: ~fn(v: A)) { + fn spawn_with<A:Owned>(&mut self, arg: A, f: ~fn(v: A)) { let arg = Cell(arg); do self.spawn { f(arg.take()); @@ -427,16 +367,16 @@ pub impl TaskBuilder { * # Failure * Fails if a future_result was already set for this task. */ - fn try<T:Owned>(&self, f: ~fn() -> T) -> Result<T,()> { + fn try<T:Owned>(&mut self, f: ~fn() -> T) -> Result<T,()> { let (po, ch) = stream::<T>(); let mut result = None; - let fr_task_builder = self.future_result(|+r| { - result = Some(r); - }); - do fr_task_builder.spawn || { + self.future_result(|r| { result = Some(r); }); + + do self.spawn { ch.send(f()); } + match result.unwrap().recv() { Success => result::Ok(po.recv()), Failure => result::Err(()) @@ -468,26 +408,23 @@ pub fn default_task_opts() -> TaskOpts { /* Spawn convenience functions */ +/// Creates and executes a new child task +/// +/// Sets up a new task with its own call stack and schedules it to run +/// the provided unique closure. +/// +/// This function is equivalent to `task().spawn(f)`. pub fn spawn(f: ~fn()) { - /*! - * Creates and executes a new child task - * - * Sets up a new task with its own call stack and schedules it to run - * the provided unique closure. - * - * This function is equivalent to `task().spawn(f)`. - */ - - task().spawn(f) + let mut task = task(); + task.spawn(f) } +/// Creates a child task unlinked from the current one. If either this +/// task or the child task fails, the other will not be killed. pub fn spawn_unlinked(f: ~fn()) { - /*! - * Creates a child task unlinked from the current one. If either this - * task or the child task fails, the other will not be killed. - */ - - task().unlinked().spawn(f) + let mut task = task(); + task.unlinked(); + task.spawn(f) } pub fn spawn_supervised(f: ~fn()) { @@ -497,7 +434,9 @@ pub fn spawn_supervised(f: ~fn()) { * the child will be killed. */ - task().supervised().spawn(f) + let mut task = task(); + task.supervised(); + task.spawn(f) } pub fn spawn_with<A:Owned>(arg: A, f: ~fn(v: A)) { @@ -511,7 +450,8 @@ pub fn spawn_with<A:Owned>(arg: A, f: ~fn(v: A)) { * This function is equivalent to `task().spawn_with(arg, f)`. */ - task().spawn_with(arg, f) + let mut task = task(); + task.spawn_with(arg, f) } pub fn spawn_sched(mode: SchedMode, f: ~fn()) { @@ -527,7 +467,9 @@ pub fn spawn_sched(mode: SchedMode, f: ~fn()) { * greater than zero. */ - task().sched_mode(mode).spawn(f) + let mut task = task(); + task.sched_mode(mode); + task.spawn(f) } pub fn try<T:Owned>(f: ~fn() -> T) -> Result<T,()> { @@ -538,7 +480,9 @@ pub fn try<T:Owned>(f: ~fn() -> T) -> Result<T,()> { * This is equivalent to task().supervised().try. */ - task().supervised().try(f) + let mut task = task(); + task.supervised(); + task.try(f) } @@ -653,12 +597,13 @@ pub unsafe fn atomically<U>(f: &fn() -> U) -> U { #[test] #[should_fail] #[ignore(cfg(windows))] fn test_cant_dup_task_builder() { - let b = task().unlinked(); - do b.spawn { } + let mut builder = task(); + builder.unlinked(); + do builder.spawn {} // FIXME(#3724): For now, this is a -runtime- failure, because we haven't // got move mode on self. When 3724 is fixed, this test should fail to // compile instead, and should go in tests/compile-fail. - do b.spawn { } // b should have been consumed by the previous call + do builder.spawn {} // b should have been consumed by the previous call } // The following 8 tests test the following 2^3 combinations: @@ -701,43 +646,31 @@ fn test_spawn_unlinked_sup_fail_down() { #[test] #[should_fail] #[ignore(cfg(windows))] fn test_spawn_linked_sup_fail_up() { // child fails; parent fails let (po, _ch) = stream::<()>(); + // Unidirectional "parenting" shouldn't override bidirectional linked. // We have to cheat with opts - the interface doesn't support them because // they don't make sense (redundant with task().supervised()). - let opts = { - let mut opts = default_task_opts(); - opts.linked = true; - opts.supervised = true; - opts - }; + let mut b0 = task(); + b0.opts.linked = true; + b0.opts.supervised = true; - let b0 = task(); - let b1 = TaskBuilder { - opts: opts, - can_not_copy: None, - .. b0 - }; - do b1.spawn { fail!(); } + do b0.spawn { + fail!(); + } po.recv(); // We should get punted awake } #[test] #[should_fail] #[ignore(cfg(windows))] fn test_spawn_linked_sup_fail_down() { // parent fails; child fails // We have to cheat with opts - the interface doesn't support them because // they don't make sense (redundant with task().supervised()). - let opts = { - let mut opts = default_task_opts(); - opts.linked = true; - opts.supervised = true; - opts - }; - - let b0 = task(); - let b1 = TaskBuilder { - opts: opts, - can_not_copy: None, - .. b0 - }; - do b1.spawn { loop { task::yield(); } } + let mut b0 = task(); + b0.opts.linked = true; + b0.opts.supervised = true; + do b0.spawn { + loop { + task::yield(); + } + } fail!(); // *both* mechanisms would be wrong if this didn't kill the child } #[test] #[should_fail] #[ignore(cfg(windows))] @@ -756,7 +689,13 @@ fn test_spawn_linked_unsup_fail_down() { // parent fails; child fails #[test] #[should_fail] #[ignore(cfg(windows))] fn test_spawn_linked_unsup_default_opts() { // parent fails; child fails // Make sure the above test is the same as this one. - do task().linked().spawn { loop { task::yield(); } } + let mut builder = task(); + builder.linked(); + do builder.spawn { + loop { + task::yield(); + } + } fail!(); } @@ -814,7 +753,8 @@ fn test_spawn_linked_sup_propagate_sibling() { #[test] fn test_run_basic() { let (po, ch) = stream::<()>(); - do task().spawn { + let mut builder = task(); + do builder.spawn { ch.send(()); } po.recv(); @@ -822,24 +762,24 @@ fn test_run_basic() { #[cfg(test)] struct Wrapper { - mut f: Option<Chan<()>> + f: Option<Chan<()>> } #[test] fn test_add_wrapper() { let (po, ch) = stream::<()>(); - let b0 = task(); - let ch = Wrapper { f: Some(ch) }; - let b1 = do b0.add_wrapper |body| { - let ch = Wrapper { f: Some(ch.f.swap_unwrap()) }; + let mut b0 = task(); + let ch = Cell(ch); + do b0.add_wrapper |body| { + let ch = Cell(ch.take()); let result: ~fn() = || { - let ch = ch.f.swap_unwrap(); + let mut ch = ch.take(); body(); ch.send(()); }; result }; - do b1.spawn { } + do b0.spawn { } po.recv(); } @@ -847,12 +787,16 @@ fn test_add_wrapper() { #[ignore(cfg(windows))] fn test_future_result() { let mut result = None; - do task().future_result(|+r| { result = Some(r); }).spawn { } + let mut builder = task(); + builder.future_result(|r| result = Some(r)); + do builder.spawn {} assert!(result.unwrap().recv() == Success); result = None; - do task().future_result(|+r| - { result = Some(r); }).unlinked().spawn { + let mut builder = task(); + builder.future_result(|r| result = Some(r)); + builder.unlinked(); + do builder.spawn { fail!(); } assert!(result.unwrap().recv() == Failure); @@ -860,7 +804,9 @@ fn test_future_result() { #[test] #[should_fail] #[ignore(cfg(windows))] fn test_back_to_the_future_result() { - let _ = task().future_result(util::ignore).future_result(util::ignore); + let mut builder = task(); + builder.future_result(util::ignore); + builder.future_result(util::ignore); } #[test] @@ -922,12 +868,12 @@ fn test_spawn_sched_childs_on_default_sched() { // Assuming tests run on the default scheduler let default_id = unsafe { rt::rust_get_sched_id() }; - let ch = Wrapper { f: Some(ch) }; + let ch = Cell(ch); do spawn_sched(SingleThreaded) { let parent_sched_id = unsafe { rt::rust_get_sched_id() }; - let ch = Wrapper { f: Some(ch.f.swap_unwrap()) }; + let ch = Cell(ch.take()); do spawn { - let ch = ch.f.swap_unwrap(); + let ch = ch.take(); let child_sched_id = unsafe { rt::rust_get_sched_id() }; assert!(parent_sched_id != child_sched_id); assert!(child_sched_id == default_id); @@ -1035,7 +981,8 @@ fn test_avoid_copying_the_body_spawn() { #[test] fn test_avoid_copying_the_body_task_spawn() { do avoid_copying_the_body |f| { - do task().spawn || { + let mut builder = task(); + do builder.spawn || { f(); } } @@ -1062,7 +1009,9 @@ fn test_avoid_copying_the_body_unlinked() { #[test] fn test_platform_thread() { let (po, ch) = stream(); - do task().sched_mode(PlatformThread).spawn { + let mut builder = task(); + builder.sched_mode(PlatformThread); + do builder.spawn { ch.send(()); } po.recv(); |
