about summary refs log tree commit diff
diff options
context:
space:
mode:
authorAlex Crichton <alex@alexcrichton.com>2013-12-13 16:26:02 -0800
committerAlex Crichton <alex@alexcrichton.com>2013-12-24 19:59:53 -0800
commit3893716390f2c4857b7e8b1705a6344f96b85bb6 (patch)
tree1a01099d27673c9906267ce82c024b33c61c1f13
parentafd4e2ad8dc4112b99c8d30996ff0bb5b0516b53 (diff)
downloadrust-3893716390f2c4857b7e8b1705a6344f96b85bb6.tar.gz
rust-3893716390f2c4857b7e8b1705a6344f96b85bb6.zip
Finalize the green::Pool type
The scheduler pool now has a much more simplified interface. There is now a
clear distinction between creating the pool and then interacting with the pool.
When a pool is created, none of the schedulers is active, and activity only
occurs later if a spawn is performed.

There are four operations that you can do on a pool:

1. Create a new pool. The only argument to this function is the configuration
   for the scheduler pool. Currently the only configuration parameter is the
   number of threads to initially spawn.

2. Spawn a task into this pool. This takes a procedure and task configuration
   options and spawns a new task into the pool of schedulers.

3. Spawn a new scheduler into the pool. This will return a handle on which to
   communicate with the scheduler in order to do something like a pinned task.

4. Shut down the scheduler pool. This will consume the scheduler pool, request
   all of the schedulers to shut down, and then wait on all the scheduler
   threads. Currently this will block the invoking OS thread, but I plan on
   making 'Thread::join' not a thread-blocking call.

These operations can be used to encode all current usage of M:N schedulers, as
well as providing a simple interface through which a pool can be modified. There
is currently no way to remove a scheduler from a pool of schedulers, as there's
no way to guarantee that a scheduler has exited. This may be added in the
future, however (as necessary).
-rw-r--r--src/libgreen/lib.rs276
-rw-r--r--src/libgreen/sched.rs5
-rw-r--r--src/libgreen/task.rs48
-rw-r--r--src/libnative/lib.rs5
-rw-r--r--src/libnative/task.rs50
-rw-r--r--src/libstd/rt/task.rs20
-rw-r--r--src/libstd/task.rs31
7 files changed, 185 insertions, 250 deletions
diff --git a/src/libgreen/lib.rs b/src/libgreen/lib.rs
index 6530316a627..f4903ea38d2 100644
--- a/src/libgreen/lib.rs
+++ b/src/libgreen/lib.rs
@@ -39,13 +39,15 @@ use std::rt::task::Task;
 use std::rt::rtio;
 use std::sync::deque;
 use std::sync::atomics::{SeqCst, AtomicUint, INIT_ATOMIC_UINT};
-use std::task::TaskResult;
+use std::task::TaskOpts;
 use std::vec;
 use std::util;
+use stdtask = std::rt::task;
 
-use sched::{Shutdown, Scheduler, SchedHandle};
+use sched::{Shutdown, Scheduler, SchedHandle, TaskFromFriend, NewNeighbor};
 use sleeper_list::SleeperList;
-use task::{GreenTask, HomeSched};
+use stack::StackPool;
+use task::GreenTask;
 
 mod macros;
 
@@ -103,37 +105,17 @@ pub fn start(argc: int, argv: **u8, main: proc()) -> int {
 /// This function will not return until all schedulers in the associated pool
 /// have returned.
 pub fn run(main: proc()) -> int {
-    let config = Config {
-        shutdown_after_main_exits: true,
-        ..Config::new()
-    };
-    Pool::spawn(config, main).wait();
+    let mut pool = Pool::new(Config::new());
+    pool.spawn(TaskOpts::new(), main);
+    unsafe { stdtask::wait_for_completion(); }
+    pool.shutdown();
     os::get_exit_status()
 }
 
 /// Configuration of how an M:N pool of schedulers is spawned.
 pub struct Config {
-    /// If this flag is set, then when schedulers are spawned via the `start`
-    /// and `run` functions the thread invoking `start` and `run` will have a
-    /// scheduler spawned on it. This scheduler will be "special" in that the
-    /// main task will be pinned to the scheduler and it will not participate in
-    /// work stealing.
-    ///
-    /// If the `spawn` function is used to create a pool of schedulers, then
-    /// this option has no effect.
-    use_main_thread: bool,
-
     /// The number of schedulers (OS threads) to spawn into this M:N pool.
     threads: uint,
-
-    /// When the main function exits, this flag will dictate whether a shutdown
-    /// is requested of all schedulers. If this flag is `true`, this means that
-    /// schedulers will shut down as soon as possible after the main task exits
-    /// (but some may stay alive longer for things like I/O or other tasks).
-    ///
-    /// If this flag is `false`, then no action is taken when the `main` task
-    /// exits. The scheduler pool is then shut down via the `wait()` function.
-    shutdown_after_main_exits: bool,
 }
 
 impl Config {
@@ -141,9 +123,7 @@ impl Config {
     /// variables of this process.
     pub fn new() -> Config {
         Config {
-            use_main_thread: false,
             threads: rt::default_sched_threads(),
-            shutdown_after_main_exits: false,
         }
     }
 }
@@ -151,8 +131,14 @@ impl Config {
 /// A structure representing a handle to a pool of schedulers. This handle is
 /// used to keep the pool alive and also reap the status from the pool.
 pub struct Pool {
+    priv id: uint,
     priv threads: ~[Thread<()>],
-    priv handles: Option<~[SchedHandle]>,
+    priv handles: ~[SchedHandle],
+    priv stealers: ~[deque::Stealer<~task::GreenTask>],
+    priv next_friend: uint,
+    priv stack_pool: StackPool,
+    priv deque_pool: deque::BufferPool<~task::GreenTask>,
+    priv sleepers: SleeperList,
 }
 
 impl Pool {
@@ -160,177 +146,125 @@ impl Pool {
     ///
     /// This will configure the pool according to the `config` parameter, and
     /// initially run `main` inside the pool of schedulers.
-    pub fn spawn(config: Config, main: proc()) -> Pool {
+    pub fn new(config: Config) -> Pool {
         static mut POOL_ID: AtomicUint = INIT_ATOMIC_UINT;
 
-        let Config {
-            threads: nscheds,
-            use_main_thread: use_main_sched,
-            shutdown_after_main_exits
-        } = config;
-
-        let mut main = Some(main);
-        let pool_id = unsafe { POOL_ID.fetch_add(1, SeqCst) };
+        let Config { threads: nscheds } = config;
+        assert!(nscheds > 0);
 
-        // The shared list of sleeping schedulers.
-        let sleepers = SleeperList::new();
+        // The pool of schedulers that will be returned from this function
+        let mut pool = Pool {
+            threads: ~[],
+            handles: ~[],
+            stealers: ~[],
+            id: unsafe { POOL_ID.fetch_add(1, SeqCst) },
+            sleepers: SleeperList::new(),
+            stack_pool: StackPool::new(),
+            deque_pool: deque::BufferPool::new(),
+            next_friend: 0,
+        };
 
         // Create a work queue for each scheduler, ntimes. Create an extra
         // for the main thread if that flag is set. We won't steal from it.
-        let mut pool = deque::BufferPool::new();
-        let arr = vec::from_fn(nscheds, |_| pool.deque());
+        let arr = vec::from_fn(nscheds, |_| pool.deque_pool.deque());
         let (workers, stealers) = vec::unzip(arr.move_iter());
+        pool.stealers = stealers;
 
-        // The schedulers.
-        let mut scheds = ~[];
-        // Handles to the schedulers. When the main task ends these will be
-        // sent the Shutdown message to terminate the schedulers.
-        let mut handles = ~[];
-
+        // Now that we've got all our work queues, create one scheduler per
+        // queue, spawn the scheduler into a thread, and be sure to keep a
+        // handle to the scheduler and the thread to keep them alive.
         for worker in workers.move_iter() {
             rtdebug!("inserting a regular scheduler");
 
-            // Every scheduler is driven by an I/O event loop.
-            let loop_ = new_event_loop();
-            let mut sched = ~Scheduler::new(pool_id,
-                                            loop_,
+            let mut sched = ~Scheduler::new(pool.id,
+                                            new_event_loop(),
                                             worker,
-                                            stealers.clone(),
-                                            sleepers.clone());
-            let handle = sched.make_handle();
-
-            scheds.push(sched);
-            handles.push(handle);
-        }
-
-        // If we need a main-thread task then create a main thread scheduler
-        // that will reject any task that isn't pinned to it
-        let main_sched = if use_main_sched {
-
-            // Create a friend handle.
-            let mut friend_sched = scheds.pop();
-            let friend_handle = friend_sched.make_handle();
-            scheds.push(friend_sched);
-
-            // This scheduler needs a queue that isn't part of the stealee
-            // set.
-            let (worker, _) = pool.deque();
-
-            let main_loop = new_event_loop();
-            let mut main_sched = ~Scheduler::new_special(pool_id,
-                                                         main_loop,
-                                                         worker,
-                                                         stealers.clone(),
-                                                         sleepers.clone(),
-                                                         false,
-                                                         Some(friend_handle));
-            let mut main_handle = main_sched.make_handle();
-            // Allow the scheduler to exit when the main task exits.
-            // Note: sending the shutdown message also prevents the scheduler
-            // from pushing itself to the sleeper list, which is used for
-            // waking up schedulers for work stealing; since this is a
-            // non-work-stealing scheduler it should not be adding itself
-            // to the list.
-            main_handle.send(Shutdown);
-            Some(main_sched)
-        } else {
-            None
-        };
-
-        // The pool of schedulers that will be returned from this function
-        let mut pool = Pool { threads: ~[], handles: None };
-
-        // When the main task exits, after all the tasks in the main
-        // task tree, shut down the schedulers and set the exit code.
-        let mut on_exit = if shutdown_after_main_exits {
-            let handles = handles;
-            Some(proc(exit_success: TaskResult) {
-                let mut handles = handles;
-                for handle in handles.mut_iter() {
-                    handle.send(Shutdown);
-                }
-                if exit_success.is_err() {
-                    os::set_exit_status(rt::DEFAULT_ERROR_CODE);
-                }
-            })
-        } else {
-            pool.handles = Some(handles);
-            None
-        };
-
-        if !use_main_sched {
-
-            // In the case where we do not use a main_thread scheduler we
-            // run the main task in one of our threads.
-
-            let mut main = GreenTask::new(&mut scheds[0].stack_pool, None,
-                                          main.take_unwrap());
-            let mut main_task = ~Task::new();
-            main_task.name = Some(SendStrStatic("<main>"));
-            main_task.death.on_exit = on_exit.take();
-            main.put_task(main_task);
-
-            let sched = scheds.pop();
-            let main = main;
-            let thread = do Thread::start {
-                sched.bootstrap(main);
-            };
-            pool.threads.push(thread);
-        }
-
-        // Run each remaining scheduler in a thread.
-        for sched in scheds.move_rev_iter() {
-            rtdebug!("creating regular schedulers");
-            let thread = do Thread::start {
+                                            pool.stealers.clone(),
+                                            pool.sleepers.clone());
+            pool.handles.push(sched.make_handle());
+            let sched = sched;
+            pool.threads.push(do Thread::start {
                 let mut sched = sched;
                 let mut task = do GreenTask::new(&mut sched.stack_pool, None) {
                     rtdebug!("boostraping a non-primary scheduler");
                 };
                 task.put_task(~Task::new());
                 sched.bootstrap(task);
-            };
-            pool.threads.push(thread);
+            });
         }
 
-        // If we do have a main thread scheduler, run it now.
+        return pool;
+    }
 
-        if use_main_sched {
-            rtdebug!("about to create the main scheduler task");
+    pub fn shutdown(mut self) {
+        self.stealers = ~[];
 
-            let mut main_sched = main_sched.unwrap();
+        for mut handle in util::replace(&mut self.handles, ~[]).move_iter() {
+            handle.send(Shutdown);
+        }
+        for thread in util::replace(&mut self.threads, ~[]).move_iter() {
+            thread.join();
+        }
+    }
 
-            let home = HomeSched(main_sched.make_handle());
-            let mut main = GreenTask::new_homed(&mut main_sched.stack_pool, None,
-                                                home, main.take_unwrap());
-            let mut main_task = ~Task::new();
-            main_task.name = Some(SendStrStatic("<main>"));
-            main_task.death.on_exit = on_exit.take();
-            main.put_task(main_task);
-            rtdebug!("bootstrapping main_task");
+    pub fn spawn(&mut self, opts: TaskOpts, f: proc()) {
+        let task = GreenTask::configure(&mut self.stack_pool, opts, f);
 
-            main_sched.bootstrap(main);
+        // Figure out someone to send this task to
+        let idx = self.next_friend;
+        self.next_friend += 1;
+        if self.next_friend >= self.handles.len() {
+            self.next_friend = 0;
         }
 
-        return pool;
+        // Jettison the task away!
+        self.handles[idx].send(TaskFromFriend(task));
     }
 
-    /// Waits for the pool of schedulers to exit. If the pool was spawned to
-    /// shutdown after the main task exits, this will simply wait for all the
-    /// scheudlers to exit. If the pool was not spawned like that, this function
-    /// will trigger shutdown of all the active schedulers. The schedulers will
-    /// exit once all tasks in this pool of schedulers has exited.
-    pub fn wait(&mut self) {
-        match self.handles.take() {
-            Some(mut handles) => {
-                for handle in handles.mut_iter() {
-                    handle.send(Shutdown);
-                }
-            }
-            None => {}
+    /// Spawns a new scheduler into this M:N pool. A handle is returned to the
+    /// scheduler for use. The scheduler will not exit as long as this handle is
+    /// active.
+    ///
+    /// The scheduler spawned will participate in work stealing with all of the
+    /// other schedulers currently in the scheduler pool.
+    pub fn spawn_sched(&mut self) -> SchedHandle {
+        let (worker, stealer) = self.deque_pool.deque();
+        self.stealers.push(stealer.clone());
+
+        // Tell all existing schedulers about this new scheduler so they can all
+        // steal work from it
+        for handle in self.handles.mut_iter() {
+            handle.send(NewNeighbor(stealer.clone()));
         }
 
-        for thread in util::replace(&mut self.threads, ~[]).move_iter() {
-            thread.join();
+        // Create the new scheduler, using the same sleeper list as all the
+        // other schedulers as well as having a stealer handle to all other
+        // schedulers.
+        let mut sched = ~Scheduler::new(self.id,
+                                        new_event_loop(),
+                                        worker,
+                                        self.stealers.clone(),
+                                        self.sleepers.clone());
+        let ret = sched.make_handle();
+        self.handles.push(sched.make_handle());
+        let sched = sched;
+        self.threads.push(do Thread::start {
+            let mut sched = sched;
+            let mut task = do GreenTask::new(&mut sched.stack_pool, None) {
+                rtdebug!("boostraping a non-primary scheduler");
+            };
+            task.put_task(~Task::new());
+            sched.bootstrap(task);
+        });
+
+        return ret;
+    }
+}
+
+impl Drop for Pool {
+    fn drop(&mut self) {
+        if self.threads.len() > 0 {
+            fail!("dropping a M:N scheduler pool that wasn't shut down");
         }
     }
 }
diff --git a/src/libgreen/sched.rs b/src/libgreen/sched.rs
index b0a49f2450a..e349ae1e601 100644
--- a/src/libgreen/sched.rs
+++ b/src/libgreen/sched.rs
@@ -393,6 +393,10 @@ impl Scheduler {
                 stask.put_with_sched(self);
                 return None;
             }
+            Some(NewNeighbor(neighbor)) => {
+                self.work_queues.push(neighbor);
+                return Some((self, stask));
+            }
             None => {
                 return Some((self, stask));
             }
@@ -831,6 +835,7 @@ type SchedulingFn = extern "Rust" fn (~Scheduler, ~GreenTask, ~GreenTask);
 pub enum SchedMessage {
     Wake,
     Shutdown,
+    NewNeighbor(deque::Stealer<~GreenTask>),
     PinnedTask(~GreenTask),
     TaskFromFriend(~GreenTask),
     RunOnce(~GreenTask),
diff --git a/src/libgreen/task.rs b/src/libgreen/task.rs
index 72e72f2cd99..e07d7f2413f 100644
--- a/src/libgreen/task.rs
+++ b/src/libgreen/task.rs
@@ -55,12 +55,15 @@ pub enum Home {
 }
 
 impl GreenTask {
+    /// Creates a new green task which is not homed to any particular scheduler
+    /// and will not have any contained Task structure.
     pub fn new(stack_pool: &mut StackPool,
                stack_size: Option<uint>,
                start: proc()) -> ~GreenTask {
         GreenTask::new_homed(stack_pool, stack_size, AnySched, start)
     }
 
+    /// Creates a new task (like `new`), but specifies the home for new task.
     pub fn new_homed(stack_pool: &mut StackPool,
                      stack_size: Option<uint>,
                      home: Home,
@@ -71,6 +74,8 @@ impl GreenTask {
         return ops;
     }
 
+    /// Creates a new green task with the specified coroutine and type, this is
+    /// useful when creating scheduler tasks.
     pub fn new_typed(coroutine: Option<Coroutine>,
                      task_type: TaskType) -> ~GreenTask {
         ~GreenTask {
@@ -84,6 +89,31 @@ impl GreenTask {
         }
     }
 
+    /// Creates a new green task with the given configuration options for the
+    /// contained Task object. The given stack pool is also used to allocate a
+    /// new stack for this task.
+    pub fn configure(pool: &mut StackPool,
+                     opts: TaskOpts,
+                     f: proc()) -> ~GreenTask {
+        let TaskOpts {
+            watched: _watched,
+            notify_chan, name, stack_size
+        } = opts;
+
+        let mut green = GreenTask::new(pool, stack_size, f);
+        let mut task = ~Task::new();
+        task.name = name;
+        match notify_chan {
+            Some(chan) => {
+                let on_exit = proc(task_result) { chan.send(task_result) };
+                task.death.on_exit = Some(on_exit);
+            }
+            None => {}
+        }
+        green.put_task(task);
+        return green;
+    }
+
     /// Just like the `maybe_take_runtime` function, this function should *not*
     /// exist. Usage of this function is _strongly_ discouraged. This is an
     /// absolute last resort necessary for converting a libstd task to a green
@@ -367,11 +397,6 @@ impl Runtime for GreenTask {
     fn spawn_sibling(mut ~self, cur_task: ~Task, opts: TaskOpts, f: proc()) {
         self.put_task(cur_task);
 
-        let TaskOpts {
-            watched: _watched,
-            notify_chan, name, stack_size
-        } = opts;
-
         // Spawns a task into the current scheduler. We allocate the new task's
         // stack from the scheduler's stack pool, and then configure it
         // accordingly to `opts`. Afterwards we bootstrap it immediately by
@@ -379,18 +404,7 @@ impl Runtime for GreenTask {
         //
         // Upon returning, our task is back in TLS and we're good to return.
         let mut sched = self.sched.take_unwrap();
-        let mut sibling = GreenTask::new(&mut sched.stack_pool, stack_size, f);
-        let mut sibling_task = ~Task::new();
-        sibling_task.name = name;
-        match notify_chan {
-            Some(chan) => {
-                let on_exit = proc(task_result) { chan.send(task_result) };
-                sibling_task.death.on_exit = Some(on_exit);
-            }
-            None => {}
-        }
-
-        sibling.task = Some(sibling_task);
+        let sibling = GreenTask::configure(&mut sched.stack_pool, opts, f);
         sched.run_task(self, sibling)
     }
 
diff --git a/src/libnative/lib.rs b/src/libnative/lib.rs
index 40d8f6f0b46..b97d9127277 100644
--- a/src/libnative/lib.rs
+++ b/src/libnative/lib.rs
@@ -32,6 +32,7 @@
 
 use std::os;
 use std::rt;
+use stdtask = std::rt::task;
 
 pub mod io;
 pub mod task;
@@ -81,7 +82,9 @@ pub fn start(argc: int, argv: **u8, main: proc()) -> int {
 pub fn run(main: proc()) -> int {
     // Create a task, run the procedure in it, and then wait for everything.
     task::run(task::new(), main);
-    task::wait_for_completion();
+
+    // Block this OS task waiting for everything to finish.
+    unsafe { stdtask::wait_for_completion() }
 
     os::get_exit_status()
 }
diff --git a/src/libnative/task.rs b/src/libnative/task.rs
index f0502a43990..1aa32bc8a26 100644
--- a/src/libnative/task.rs
+++ b/src/libnative/task.rs
@@ -21,49 +21,13 @@ use std::rt::rtio;
 use std::rt::task::{Task, BlockedTask};
 use std::rt::thread::Thread;
 use std::rt;
-use std::sync::atomics::{AtomicUint, SeqCst, INIT_ATOMIC_UINT};
-use std::task::{TaskOpts, default_task_opts};
-use std::unstable::mutex::{Mutex, MUTEX_INIT};
+use std::task::TaskOpts;
+use std::unstable::mutex::Mutex;
 use std::unstable::stack;
 
 use io;
 use task;
 
-static mut THREAD_CNT: AtomicUint = INIT_ATOMIC_UINT;
-static mut LOCK: Mutex = MUTEX_INIT;
-
-/// Waits for all spawned threads to finish completion. This should only be used
-/// by the main task in order to wait for all other tasks to terminate.
-///
-/// This mirrors the same semantics as the green scheduling model.
-pub fn wait_for_completion() {
-    static mut someone_waited: bool = false;
-
-    unsafe {
-        LOCK.lock();
-        assert!(!someone_waited);
-        someone_waited = true;
-        while THREAD_CNT.load(SeqCst) > 0 {
-            LOCK.wait();
-        }
-        LOCK.unlock();
-        LOCK.destroy();
-    }
-
-}
-
-// Signal that a thread has finished execution, possibly waking up a blocker
-// waiting for all threads to have finished.
-fn signal_done() {
-    unsafe {
-        LOCK.lock();
-        if THREAD_CNT.fetch_sub(1, SeqCst) == 1 {
-            LOCK.signal();
-        }
-        LOCK.unlock();
-    }
-}
-
 /// Creates a new Task which is ready to execute as a 1:1 task.
 pub fn new() -> ~Task {
     let mut task = ~Task::new();
@@ -75,15 +39,12 @@ pub fn new() -> ~Task {
 
 /// Spawns a function with the default configuration
 pub fn spawn(f: proc()) {
-    spawn_opts(default_task_opts(), f)
+    spawn_opts(TaskOpts::new(), f)
 }
 
 /// Spawns a new task given the configuration options and a procedure to run
 /// inside the task.
 pub fn spawn_opts(opts: TaskOpts, f: proc()) {
-    // must happen before the spawn, no need to synchronize with a lock.
-    unsafe { THREAD_CNT.fetch_add(1, SeqCst); }
-
     let TaskOpts {
         watched: _watched,
         notify_chan, name, stack_size
@@ -117,7 +78,6 @@ pub fn spawn_opts(opts: TaskOpts, f: proc()) {
         }
 
         run(task, f);
-        signal_done();
     })
 }
 
@@ -290,7 +250,7 @@ mod tests {
 
     #[test]
     fn smoke_opts() {
-        let mut opts = task::default_task_opts();
+        let mut opts = TaskOpts::new();
         opts.name = Some(SendStrStatic("test"));
         opts.stack_size = Some(20 * 4096);
         let (p, c) = Chan::new();
@@ -301,7 +261,7 @@ mod tests {
 
     #[test]
     fn smoke_opts_fail() {
-        let mut opts = task::default_task_opts();
+        let mut opts = TaskOpts::new();
         let (p, c) = Chan::new();
         opts.notify_chan = Some(c);
         spawn_opts(opts, proc() { fail!() });
diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs
index c0164891cd4..91e285b1061 100644
--- a/src/libstd/rt/task.rs
+++ b/src/libstd/rt/task.rs
@@ -33,12 +33,16 @@ use rt::rtio::LocalIo;
 use rt::unwind::Unwinder;
 use send_str::SendStr;
 use sync::arc::UnsafeArc;
-use sync::atomics::{AtomicUint, SeqCst};
+use sync::atomics::{AtomicUint, SeqCst, INIT_ATOMIC_UINT};
 use task::{TaskResult, TaskOpts};
 use unstable::finally::Finally;
+use unstable::mutex::{Mutex, MUTEX_INIT};
 
 #[cfg(stage0)] pub use rt::unwind::begin_unwind;
 
+static mut TASK_COUNT: AtomicUint = INIT_ATOMIC_UINT;
+static mut TASK_LOCK: Mutex = MUTEX_INIT;
+
 // The Task struct represents all state associated with a rust
 // task. There are at this point two primary "subtypes" of task,
 // however instead of using a subtype we just have a "task_type" field
@@ -117,6 +121,7 @@ impl Task {
             *cast::transmute::<&~Task, &*mut Task>(&self)
         };
         Local::put(self);
+        unsafe { TASK_COUNT.fetch_add(1, SeqCst); }
 
         // The only try/catch block in the world. Attempt to run the task's
         // client-specified code and catch any failures.
@@ -180,6 +185,11 @@ impl Task {
         unsafe {
             let me: *mut Task = Local::unsafe_borrow();
             (*me).death.collect_failure((*me).unwinder.result());
+            if TASK_COUNT.fetch_sub(1, SeqCst) == 1 {
+                TASK_LOCK.lock();
+                TASK_LOCK.signal();
+                TASK_LOCK.unlock();
+            }
         }
         let mut me: ~Task = Local::take();
         me.destroyed = true;
@@ -376,6 +386,14 @@ impl Drop for Death {
     }
 }
 
+pub unsafe fn wait_for_completion() {
+    TASK_LOCK.lock();
+    while TASK_COUNT.load(SeqCst) > 0 {
+        TASK_LOCK.wait();
+    }
+    TASK_LOCK.unlock();
+}
+
 #[cfg(test)]
 mod test {
     use super::*;
diff --git a/src/libstd/task.rs b/src/libstd/task.rs
index 3b9cde5f44d..836390fb416 100644
--- a/src/libstd/task.rs
+++ b/src/libstd/task.rs
@@ -131,7 +131,7 @@ pub struct TaskBuilder {
  */
 pub fn task() -> TaskBuilder {
     TaskBuilder {
-        opts: default_task_opts(),
+        opts: TaskOpts::new(),
         gen_body: None,
         can_not_copy: None,
     }
@@ -301,22 +301,23 @@ impl TaskBuilder {
     }
 }
 
-
 /* Task construction */
 
-pub fn default_task_opts() -> TaskOpts {
-    /*!
-     * The default task options
-     *
-     * By default all tasks are supervised by their parent, are spawned
-     * into the same scheduler, and do not post lifecycle notifications.
-     */
-
-    TaskOpts {
-        watched: true,
-        notify_chan: None,
-        name: None,
-        stack_size: None
+impl TaskOpts {
+    pub fn new() -> TaskOpts {
+        /*!
+         * The default task options
+         *
+         * By default all tasks are supervised by their parent, are spawned
+         * into the same scheduler, and do not post lifecycle notifications.
+         */
+
+        TaskOpts {
+            watched: true,
+            notify_chan: None,
+            name: None,
+            stack_size: None
+        }
     }
 }