about summary refs log tree commit diff
path: root/src
diff options
context:
space:
mode:
authorBen Blum <bblum@andrew.cmu.edu>2012-07-23 19:27:44 -0400
committerBen Blum <bblum@andrew.cmu.edu>2012-07-23 20:09:16 -0400
commite6efb24f3fa47462a3febd052e5ccc21e2c3ec63 (patch)
tree205f7c9dd961b2bf85d20ce094f6d1878d853325 /src
parent7680f504c2c9934a617ff20b1d852ea191c6bc52 (diff)
downloadrust-e6efb24f3fa47462a3febd052e5ccc21e2c3ec63.tar.gz
rust-e6efb24f3fa47462a3febd052e5ccc21e2c3ec63.zip
Add task::task_builder interface for improved spawning (related #2585)
Diffstat (limited to 'src')
-rw-r--r--src/libcore/task.rs298
1 files changed, 220 insertions, 78 deletions
diff --git a/src/libcore/task.rs b/src/libcore/task.rs
index 4d187a01e7f..288f52919ac 100644
--- a/src/libcore/task.rs
+++ b/src/libcore/task.rs
@@ -35,6 +35,7 @@ export sched_mode;
 export sched_opts;
 export task_opts;
 export builder;
+export task_builder;
 
 export default_task_opts;
 export get_opts;
@@ -46,7 +47,6 @@ export run;
 export future_result;
 export future_task;
 export unsupervise;
-export parent;
 export run_listener;
 export run_with;
 
@@ -77,7 +77,7 @@ export osmain;
 /* Data types */
 
 /// A handle to a task
-enum task = task_id;
+enum task { task_handle(task_id) }
 
 /**
  * Indicates the manner in which a task exited.
@@ -192,6 +192,141 @@ enum builder {
     })
 }
 
+class dummy { let x: (); new() { self.x = (); } drop { } }
+
+// FIXME (#2585): Replace the 'consumed' bit with move mode on self
+enum task_builder = {
+    opts: task_opts,
+    gen_body: fn@(+fn~()) -> fn~(),
+    can_not_copy: option<dummy>,
+    mut consumed: bool,
+};
+
+/**
+ * Generate the base configuration for spawning a task, off of which more
+ * configuration methods can be chained.
+ * For example, task().unlinked().spawn is equivalent to spawn_unlinked.
+ */
+fn task() -> task_builder {
+    task_builder({
+        opts: default_task_opts(),
+        gen_body: |body| body, // Identity function
+        can_not_copy: none,
+        mut consumed: false,
+    })
+}
+
+impl private_methods for task_builder {
+    fn consume() -> task_builder {
+        if self.consumed {
+            fail ~"Cannot copy a task_builder"; // Fake move mode on self
+        }
+        self.consumed = true;
+        task_builder({ can_not_copy: none, mut consumed: false, with *self })
+    }
+}
+
+impl task_builder for task_builder {
+    /**
+     * Decouple the child task's failure from the parent's. If either fails,
+     * the other will not be killed.
+     */
+    fn unlinked() -> task_builder {
+        task_builder({
+            opts: { linked: false with self.opts },
+            can_not_copy: none,
+            with *self.consume()
+        })
+    }
+    /**
+     * Unidirectionally link the child task's failure with the parent's. The
+     * child's failure will not kill the parent, but the parent's will kill
+     * the child.
+     */
+    fn supervised() -> task_builder {
+        task_builder({
+            opts: { linked: false, parented: true with self.opts },
+            can_not_copy: none,
+            with *self.consume()
+        })
+    }
+    /**
+     * Link the child task's and parent task's failures. If either fails, the
+     * other will be killed.
+     */
+    fn linked() -> task_builder {
+        task_builder({
+            opts: { linked: true, parented: false with self.opts },
+            can_not_copy: none,
+            with *self.consume()
+        })
+    }
+
+    /// Configure a future result notification for this task.
+    fn future_result(blk: fn(-future::future<task_result>)) -> task_builder {
+        // Construct the future and give it to the caller.
+        let po = comm::port::<notification>();
+        let ch = comm::chan(po);
+
+        blk(do future::from_fn {
+            alt comm::recv(po) {
+              exit(_, result) { result }
+            }
+        });
+
+        // Reconfigure self to use a notify channel.
+        task_builder({
+            opts: { notify_chan: some(ch) with self.opts },
+            can_not_copy: none,
+            with *self.consume()
+        })
+    }
+    /// Configure a custom scheduler mode for the task.
+    fn sched_mode(mode: sched_mode) -> task_builder {
+        task_builder({
+            opts: { sched: some({ mode: mode, foreign_stack_size: none})
+                    with self.opts },
+            can_not_copy: none,
+            with *self.consume()
+        })
+    }
+    fn add_wrapper(wrapper: fn@(+fn~()) -> fn~()) -> task_builder {
+        let prev_gen_body = self.gen_body;
+        task_builder({
+            gen_body: |body| { wrapper(prev_gen_body(body)) },
+            can_not_copy: none,
+            with *self.consume()
+        })
+    }
+
+    /// Run the task.
+    fn spawn(+f: fn~()) {
+        let x = self.consume();
+        spawn_raw(x.opts, x.gen_body(f));
+    }
+    /// Runs a task, while transfering ownership of one argument to the child.
+    fn spawn_with<A: send>(+arg: A, +f: fn~(+A)) {
+        let arg = ~mut some(arg);
+        do self.spawn {
+            let mut my_arg = none;
+            my_arg <-> *arg;
+            f(option::unwrap(my_arg))
+        }
+    }
+    /// Runs a task with a listening port, returning the associated channel.
+    fn spawn_listener<A: send>(+f: fn~(comm::port<A>)) -> comm::chan<A> {
+        let setup_po = comm::port();
+        let setup_ch = comm::chan(setup_po);
+        do self.spawn {
+            let po = comm::port();
+            let ch = comm::chan(po);
+            comm::send(setup_ch, ch);
+            f(po);
+        }
+        comm::recv(setup_po)
+    }
+}
+
 
 /* Task construction */
 
@@ -362,11 +497,6 @@ fn unsupervise(builder: builder) {
     });
 }
 
-fn parent(builder: builder) {
-    //! Configures the new task to be killed if the parent group is killed.
-    set_opts(builder, { parented: true with get_opts(builder) });
-}
-
 fn run_with<A:send>(-builder: builder,
                     +arg: A,
                     +f: fn~(+A)) {
@@ -428,7 +558,7 @@ fn spawn(+f: fn~()) {
      * This function is equivalent to `run(new_builder(), f)`.
      */
 
-    run(builder(), f);
+    task().spawn(f)
 }
 
 fn spawn_unlinked(+f: fn~()) {
@@ -437,9 +567,16 @@ fn spawn_unlinked(+f: fn~()) {
      * task or the child task fails, the other will not be killed.
      */
 
-    let b = builder();
-    unsupervise(b);
-    run(b, f);
+    task().unlinked().spawn(f)
+}
+
+fn spawn_supervised(+f: fn~()) {
+    /*!
+     * Creates a child task unlinked from the current one. If either this
+     * task or the child task fails, the other will not be killed.
+     */
+
+    task().supervised().spawn(f)
 }
 
 fn spawn_with<A:send>(+arg: A, +f: fn~(+A)) {
@@ -453,7 +590,7 @@ fn spawn_with<A:send>(+arg: A, +f: fn~(+A)) {
      * This function is equivalent to `run_with(builder(), arg, f)`.
      */
 
-    run_with(builder(), arg, f)
+    task().spawn_with(arg, f)
 }
 
 fn spawn_listener<A:send>(+f: fn~(comm::port<A>)) -> comm::chan<A> {
@@ -482,7 +619,7 @@ fn spawn_listener<A:send>(+f: fn~(comm::port<A>)) -> comm::chan<A> {
      * This function is equivalent to `run_listener(builder(), f)`.
      */
 
-    run_listener(builder(), f)
+    task().spawn_listener(f)
 }
 
 fn spawn_sched(mode: sched_mode, +f: fn~()) {
@@ -499,9 +636,7 @@ fn spawn_sched(mode: sched_mode, +f: fn~()) {
      * greater than zero.
      */
 
-    let mut builder = builder();
-    set_sched_mode(builder, mode);
-    run(builder, f);
+    task().sched_mode(mode).spawn(f)
 }
 
 fn try<T:send>(+f: fn~() -> T) -> result<T,()> {
@@ -518,13 +653,13 @@ fn try<T:send>(+f: fn~() -> T) -> result<T,()> {
 
     let po = comm::port();
     let ch = comm::chan(po);
-    let mut builder = builder();
-    unsupervise(builder);
-    let result = future_result(builder);
-    do run(builder) {
+
+    let mut result = none;
+
+    do task().unlinked().future_result(|-r| { result = some(r); }).spawn {
         comm::send(ch, f());
     }
-    alt future::get(result) {
+    alt future::get(option::unwrap(result)) {
       success { result::ok(comm::recv(po)) }
       failure { result::err(()) }
     }
@@ -553,7 +688,7 @@ fn failing() -> bool {
 fn get_task() -> task {
     //! Get a handle to the running task
 
-    task(rustrt::get_task_id())
+    task_handle(rustrt::get_task_id())
 }
 
 /**
@@ -1161,6 +1296,16 @@ fn test_spawn_raw_unsupervise() {
     }
 }
 
+#[test] #[should_fail] #[ignore(cfg(windows))]
+fn test_cant_dup_task_builder() {
+    let b = task().unlinked();
+    do b.spawn { }
+    // FIXME(#2585): For now, this is a -runtime- failure, because we haven't
+    // got modes on self. When 2585 is fixed, this test should fail to compile
+    // instead, and should go in tests/compile-fail.
+    do b.spawn { } // b should have been consumed by the previous call
+}
+
 // The following 8 tests test the following 2^3 combinations:
 // {un,}linked {un,}supervised failure propagation {up,down}wards.
 
@@ -1171,8 +1316,8 @@ fn test_spawn_raw_unsupervise() {
 fn test_spawn_unlinked_unsup_no_fail_down() { // grandchild sends on a port
     let po = comm::port();
     let ch = comm::chan(po);
-    do task::spawn_unlinked {
-        do task::spawn_unlinked {
+    do spawn_unlinked {
+        do spawn_unlinked {
             // Give middle task a chance to fail-but-not-kill-us.
             for iter::repeat(8192) { task::yield(); }
             comm::send(ch, ()); // If killed first, grandparent hangs.
@@ -1183,23 +1328,17 @@ fn test_spawn_unlinked_unsup_no_fail_down() { // grandchild sends on a port
 }
 #[test] #[ignore(cfg(windows))]
 fn test_spawn_unlinked_unsup_no_fail_up() { // child unlinked fails
-    do task::spawn_unlinked { fail; }
+    do spawn_unlinked { fail; }
 }
 #[test] #[ignore(cfg(windows))]
 fn test_spawn_unlinked_sup_no_fail_up() { // child unlinked fails
-    let builder = task::builder();
-    task::unsupervise(builder);
-    task::parent(builder);
-    do task::run(builder) { fail; }
+    do spawn_supervised { fail; }
     // Give child a chance to fail-but-not-kill-us.
     for iter::repeat(8192) { task::yield(); }
 }
 #[test] #[should_fail] #[ignore(cfg(windows))]
 fn test_spawn_unlinked_sup_fail_down() {
-    let builder = task::builder();
-    task::unsupervise(builder);
-    task::parent(builder);
-    do task::run(builder) { loop { task::yield(); } }
+    do spawn_supervised { loop { task::yield(); } }
     fail; // Shouldn't leave a child hanging around.
 }
 
@@ -1207,17 +1346,29 @@ fn test_spawn_unlinked_sup_fail_down() {
 fn test_spawn_linked_sup_fail_up() { // child fails; parent fails
     let po = comm::port::<()>();
     let _ch = comm::chan(po);
-    let builder = task::builder();
-    task::parent(builder);
     // Unidirectional "parenting" shouldn't override bidirectional linked.
-    do task::run(builder) { fail; }
+    // We have to cheat with opts - the interface doesn't support them because
+    // they don't make sense (redundant with task().supervised()).
+    let b0 = task();
+    let b1 = task_builder({
+        opts: { linked: true, parented: true with b0.opts },
+        can_not_copy: none,
+        with *b0
+    });
+    do b1.spawn { fail; }
     comm::recv(po); // We should get punted awake
 }
 #[test] #[should_fail] #[ignore(cfg(windows))]
 fn test_spawn_linked_sup_fail_down() { // parent fails; child fails
-    let builder = task::builder();
-    task::parent(builder);
-    do task::run(builder) { loop { task::yield(); } }
+    // We have to cheat with opts - the interface doesn't support them because
+    // they don't make sense (redundant with task().supervised()).
+    let b0 = task();
+    let b1 = task_builder({
+        opts: { linked: true, parented: true with b0.opts },
+        can_not_copy: none,
+        with *b0
+    });
+    do b1.spawn { loop { task::yield(); } }
     fail; // *both* mechanisms would be wrong if this didn't kill the child...
 }
 #[test] #[should_fail] #[ignore(cfg(windows))]
@@ -1225,13 +1376,19 @@ fn test_spawn_linked_unsup_fail_up() { // child fails; parent fails
     let po = comm::port::<()>();
     let _ch = comm::chan(po);
     // Default options are to spawn linked & unsupervised.
-    do task::spawn { fail; }
+    do spawn { fail; }
     comm::recv(po); // We should get punted awake
 }
 #[test] #[should_fail] #[ignore(cfg(windows))]
 fn test_spawn_linked_unsup_fail_down() { // parent fails; child fails
     // Default options are to spawn linked & unsupervised.
-    do task::spawn { loop { task::yield(); } }
+    do spawn { loop { task::yield(); } }
+    fail;
+}
+#[test] #[should_fail] #[ignore(cfg(windows))]
+fn test_spawn_linked_unsup_default_opts() { // parent fails; child fails
+    // Make sure the above test is the same as this one.
+    do task().linked().spawn { loop { task::yield(); } }
     fail;
 }
 
@@ -1240,14 +1397,8 @@ fn test_spawn_linked_unsup_fail_down() { // parent fails; child fails
 #[test] #[should_fail] // #[ignore(cfg(windows))]
 #[ignore] // FIXME (#1868) (bblum) make this work
 fn test_spawn_unlinked_sup_propagate_grandchild() {
-    let builder = task::builder();
-    task::unsupervise(builder);
-    task::parent(builder);
-    do task::run(builder) {
-        let builder = task::builder();
-        task::unsupervise(builder);
-        task::parent(builder);
-        do task::run(builder) {
+    do spawn_supervised {
+        do spawn_supervised {
             loop { task::yield(); }
         }
     }
@@ -1290,8 +1441,7 @@ fn test_spawn_raw_notify() {
 fn test_run_basic() {
     let po = comm::port();
     let ch = comm::chan(po);
-    let buildr = builder();
-    do run(buildr) {
+    do task().spawn {
         comm::send(ch, ());
     }
     comm::recv(po);
@@ -1301,30 +1451,29 @@ fn test_run_basic() {
 fn test_add_wrapper() {
     let po = comm::port();
     let ch = comm::chan(po);
-    let buildr = builder();
-    do add_wrapper(buildr) |body| {
+    let b0 = task();
+    let b1 = do b0.add_wrapper |body| {
         fn~() {
             body();
             comm::send(ch, ());
         }
-    }
-    do run(buildr) { }
+    };
+    do b1.spawn { }
     comm::recv(po);
 }
 
 #[test]
 #[ignore(cfg(windows))]
 fn test_future_result() {
-    let buildr = builder();
-    let result = future_result(buildr);
-    do run(buildr) { }
-    assert future::get(result) == success;
+    let mut result = none;
+    do task().future_result(|-r| { result = some(r); }).spawn { }
+    assert future::get(option::unwrap(result)) == success;
 
-    let buildr = builder();
-    let result = future_result(buildr);
-    unsupervise(buildr);
-    do run(buildr) { fail }
-    assert future::get(result) == failure;
+    result = none;
+    do task().future_result(|-r| { result = some(r); }).unlinked().spawn {
+        fail;
+    }
+    assert future::get(option::unwrap(result)) == failure;
 }
 
 #[test]
@@ -1525,20 +1674,18 @@ fn test_avoid_copying_the_body_spawn_listener() {
 }
 
 #[test]
-fn test_avoid_copying_the_body_run() {
+fn test_avoid_copying_the_body_task_spawn() {
     do avoid_copying_the_body |f| {
-        let buildr = builder();
-        do run(buildr) {
+        do task().spawn {
             f();
         }
     }
 }
 
 #[test]
-fn test_avoid_copying_the_body_run_listener() {
+fn test_avoid_copying_the_body_spawn_listener() {
     do avoid_copying_the_body |f| {
-        let buildr = builder();
-        run_listener(buildr, fn~(move f, _po: comm::port<int>) {
+        task().spawn_listener(fn~(move f, _po: comm::port<int>) {
             f();
         });
     }
@@ -1565,11 +1712,9 @@ fn test_avoid_copying_the_body_future_task() {
 }
 
 #[test]
-fn test_avoid_copying_the_body_unsupervise() {
+fn test_avoid_copying_the_body_unlinked() {
     do avoid_copying_the_body |f| {
-        let buildr = builder();
-        unsupervise(buildr);
-        do run(buildr) {
+        do spawn_unlinked {
             f();
         }
     }
@@ -1577,12 +1722,9 @@ fn test_avoid_copying_the_body_unsupervise() {
 
 #[test]
 fn test_osmain() {
-    let buildr = builder();
-    set_sched_mode(buildr, osmain);
-
     let po = comm::port();
     let ch = comm::chan(po);
-    do run(buildr) {
+    do task().sched_mode(osmain).spawn {
         comm::send(ch, ());
     }
     comm::recv(po);