author    Brian Anderson <banderson@mozilla.com>  2013-06-15 19:31:46 -0700
committer Brian Anderson <banderson@mozilla.com>  2013-06-15 19:31:46 -0700
commit    3208fc36bf2c7e99451e21171f82dafef2ea51dc (patch)
tree      0ac94c64e1601824d3e326964580a3b84295737a /src/libstd/rt
parent    b08c4467980bc712995d421dd50c1cca2948b67b (diff)
parent    505ef7e710ff890c0027fadad54997041b7ee93b (diff)
Merge remote-tracking branch 'brson/io-wip' into io
Conflicts:
	src/libstd/rt/sched.rs
	src/libstd/rt/task.rs
	src/libstd/rt/test.rs
	src/libstd/task/mod.rs
	src/libstd/task/spawn.rs
Diffstat (limited to 'src/libstd/rt')
-rw-r--r--  src/libstd/rt/join_latch.rs | 645
-rw-r--r--  src/libstd/rt/metrics.rs    |  16
-rw-r--r--  src/libstd/rt/mod.rs        |   7
-rw-r--r--  src/libstd/rt/sched.rs      |  38
-rw-r--r--  src/libstd/rt/task.rs       |  60
-rw-r--r--  src/libstd/rt/test.rs       |  98
6 files changed, 778 insertions(+), 86 deletions(-)
diff --git a/src/libstd/rt/join_latch.rs b/src/libstd/rt/join_latch.rs
new file mode 100644
index 00000000000..b501699509e
--- /dev/null
+++ b/src/libstd/rt/join_latch.rs
@@ -0,0 +1,645 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+//! The JoinLatch is a concurrent type that establishes the task
+//! tree and propagates failure.
+//!
+//! Each task gets a JoinLatch that is derived from the JoinLatch
+//! of its parent task. Every latch must be released by either calling
+//! the non-blocking `release` method or the task-blocking `wait` method.
+//! Releasing a latch does not complete until all of its child latches
+//! complete.
+//!
+//! Latches carry a `success` flag that is set to `false` during task
+//! failure and is propagated both from children to parents and parents
+//! to children. The status of this flag may be queried for the purposes
+//! of linked failure.
+//!
+//! In addition to failure propagation, the task tree serves to keep the
+//! default task schedulers alive. The runtime only sends the shutdown
+//! message to schedulers once the root task exits.
+//!
+//! Under this scheme, tasks that terminate before their children do become
+//! 'zombies', since they may not exit until their children have. Zombie
+//! tasks are 'tombstoned' as `Tombstone(~JoinLatch)` and the tasks
+//! themselves are allowed to terminate.
+//!
+//! XXX: Propagate flag from parents to children.
+//! XXX: Tombstoning actually doesn't work.
+//! XXX: This could probably be done in a way that doesn't leak tombstones
+//!      longer than the life of the child tasks.
+
+use comm::{GenericPort, Peekable, GenericSmartChan};
+use clone::Clone;
+use container::Container;
+use option::{Option, Some, None};
+use ops::Drop;
+use rt::comm::{SharedChan, Port, stream};
+use rt::local::Local;
+use rt::sched::Scheduler;
+use unstable::atomics::{AtomicUint, SeqCst};
+use util;
+use vec::OwnedVector;
+
+// FIXME #7026: Would prefer this to be an enum
+pub struct JoinLatch {
+    priv parent: Option<ParentLink>,
+    priv child: Option<ChildLink>,
+    closed: bool,
+}
+
+// Shared between parents and all their children.
+struct SharedState {
+    /// Reference count, held by a parent and all children.
+    count: AtomicUint,
+    success: bool
+}
+
+struct ParentLink {
+    shared: *mut SharedState,
+    // For communicating with the parent.
+    chan: SharedChan<Message>
+}
+
+struct ChildLink {
+    shared: ~SharedState,
+    // For receiving from children.
+    port: Port<Message>,
+    chan: SharedChan<Message>,
+    // Prevents dropping the child SharedState reference counts multiple times.
+    dropped_child: bool
+}
+
+// Messages from child latches to parent.
+enum Message {
+    Tombstone(~JoinLatch),
+    ChildrenTerminated
+}
+
+impl JoinLatch {
+    pub fn new_root() -> ~JoinLatch {
+        let this = ~JoinLatch {
+            parent: None,
+            child: None,
+            closed: false
+        };
+        rtdebug!("new root latch %x", this.id());
+        return this;
+    }
+
+    fn id(&self) -> uint {
+        unsafe { ::cast::transmute(&*self) }
+    }
+
+    pub fn new_child(&mut self) -> ~JoinLatch {
+        rtassert!(!self.closed);
+
+        if self.child.is_none() {
+            // This is the first time spawning a child
+            let shared = ~SharedState {
+                count: AtomicUint::new(1),
+                success: true
+            };
+            let (port, chan) = stream();
+            let chan = SharedChan::new(chan);
+            let child = ChildLink {
+                shared: shared,
+                port: port,
+                chan: chan,
+                dropped_child: false
+            };
+            self.child = Some(child);
+        }
+
+        let child_link: &mut ChildLink = self.child.get_mut_ref();
+        let shared_state: *mut SharedState = &mut *child_link.shared;
+
+        child_link.shared.count.fetch_add(1, SeqCst);
+
+        let child = ~JoinLatch {
+            parent: Some(ParentLink {
+                shared: shared_state,
+                chan: child_link.chan.clone()
+            }),
+            child: None,
+            closed: false
+        };
+        rtdebug!("NEW child latch %x", child.id());
+        return child;
+    }
+
+    pub fn release(~self, local_success: bool) {
+        // XXX: This should not block, but there's a bug in the below
+        // code that I can't figure out.
+        self.wait(local_success);
+    }
+
+    // XXX: Should not require ~self
+    fn release_broken(~self, local_success: bool) {
+        rtassert!(!self.closed);
+
+        rtdebug!("releasing %x", self.id());
+
+        let id = self.id();
+        let _ = id; // XXX: `id` is only used in debug statements so appears unused
+        let mut this = self;
+        let mut child_success = true;
+        let mut children_done = false;
+
+        if this.child.is_some() {
+            rtdebug!("releasing children");
+            let child_link: &mut ChildLink = this.child.get_mut_ref();
+            let shared: &mut SharedState = &mut *child_link.shared;
+
+            if !child_link.dropped_child {
+                let last_count = shared.count.fetch_sub(1, SeqCst);
+                rtdebug!("child count before sub %u %x", last_count, id);
+                if last_count == 1 {
+                    assert!(child_link.chan.try_send(ChildrenTerminated));
+                }
+                child_link.dropped_child = true;
+            }
+
+            // Wait for messages from children
+            let mut tombstones = ~[];
+            loop {
+                if child_link.port.peek() {
+                    match child_link.port.recv() {
+                        Tombstone(t) => {
+                            tombstones.push(t);
+                        },
+                        ChildrenTerminated => {
+                            children_done = true;
+                            break;
+                        }
+                    }
+                } else {
+                    break
+                }
+            }
+
+            rtdebug!("releasing %u tombstones %x", tombstones.len(), id);
+
+            // Try to release the tombstones. Those that still have
+            // outstanding children will be re-enqueued. When this task's
+            // parent releases its latch we'll end up back here
+            // trying them again.
+            while !tombstones.is_empty() {
+                tombstones.pop().release(true);
+            }
+
+            if children_done {
+                let count = shared.count.load(SeqCst);
+                assert!(count == 0);
+                // self_count is the acquire-read barrier
+                child_success = shared.success;
+            }
+        } else {
+            children_done = true;
+        }
+
+        let total_success = local_success && child_success;
+
+        rtassert!(this.parent.is_some());
+
+        unsafe {
+            {
+                let parent_link: &mut ParentLink = this.parent.get_mut_ref();
+                let shared: *mut SharedState = parent_link.shared;
+
+                if !total_success {
+                    // parent_count is the write-wait barrier
+                    (*shared).success = false;
+                }
+            }
+
+            if children_done {
+                rtdebug!("children done");
+                do Local::borrow::<Scheduler, ()> |sched| {
+                    sched.metrics.release_tombstone += 1;
+                }
+                {
+                    rtdebug!("RELEASING parent %x", id);
+                    let parent_link: &mut ParentLink = this.parent.get_mut_ref();
+                    let shared: *mut SharedState = parent_link.shared;
+                    let last_count = (*shared).count.fetch_sub(1, SeqCst);
+                    rtdebug!("count before parent sub %u %x", last_count, id);
+                    if last_count == 1 {
+                        assert!(parent_link.chan.try_send(ChildrenTerminated));
+                    }
+                }
+                this.closed = true;
+                util::ignore(this);
+            } else {
+                rtdebug!("children not done");
+                rtdebug!("TOMBSTONING %x", id);
+                do Local::borrow::<Scheduler, ()> |sched| {
+                    sched.metrics.release_no_tombstone += 1;
+                }
+                let chan = {
+                    let parent_link: &mut ParentLink = this.parent.get_mut_ref();
+                    parent_link.chan.clone()
+                };
+                assert!(chan.try_send(Tombstone(this)));
+            }
+        }
+    }
+
+    // XXX: Should not require ~self
+    pub fn wait(~self, local_success: bool) -> bool {
+        rtassert!(!self.closed);
+
+        rtdebug!("WAITING %x", self.id());
+
+        let mut this = self;
+        let mut child_success = true;
+
+        if this.child.is_some() {
+            rtdebug!("waiting for children");
+            let child_link: &mut ChildLink = this.child.get_mut_ref();
+            let shared: &mut SharedState = &mut *child_link.shared;
+
+            if !child_link.dropped_child {
+                let last_count = shared.count.fetch_sub(1, SeqCst);
+                rtdebug!("child count before sub %u", last_count);
+                if last_count == 1 {
+                    assert!(child_link.chan.try_send(ChildrenTerminated));
+                }
+                child_link.dropped_child = true;
+            }
+
+            // Wait for messages from children
+            loop {
+                match child_link.port.recv() {
+                    Tombstone(t) => {
+                        t.wait(true);
+                    }
+                    ChildrenTerminated => break
+                }
+            }
+
+            let count = shared.count.load(SeqCst);
+            if count != 0 { ::io::println(fmt!("%u", count)); }
+            assert!(count == 0);
+            // self_count is the acquire-read barrier
+            child_success = shared.success;
+        }
+
+        let total_success = local_success && child_success;
+
+        if this.parent.is_some() {
+            rtdebug!("releasing parent");
+            unsafe {
+                let parent_link: &mut ParentLink = this.parent.get_mut_ref();
+                let shared: *mut SharedState = parent_link.shared;
+
+                if !total_success {
+                    // parent_count is the write-wait barrier
+                    (*shared).success = false;
+                }
+
+                let last_count = (*shared).count.fetch_sub(1, SeqCst);
+                rtdebug!("count before parent sub %u", last_count);
+                if last_count == 1 {
+                    assert!(parent_link.chan.try_send(ChildrenTerminated));
+                }
+            }
+        }
+
+        this.closed = true;
+        util::ignore(this);
+
+        return total_success;
+    }
+}
+
+impl Drop for JoinLatch {
+    fn finalize(&self) {
+        rtdebug!("DESTROYING %x", self.id());
+        rtassert!(self.closed);
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use cell::Cell;
+    use container::Container;
+    use iter::Times;
+    use old_iter::BaseIter;
+    use rt::test::*;
+    use rand;
+    use rand::RngUtil;
+    use vec::{CopyableVector, ImmutableVector};
+
+    #[test]
+    fn success_immediately() {
+        do run_in_newsched_task {
+            let mut latch = JoinLatch::new_root();
+
+            let child_latch = latch.new_child();
+            let child_latch = Cell(child_latch);
+            do spawntask_immediately {
+                let child_latch = child_latch.take();
+                assert!(child_latch.wait(true));
+            }
+
+            assert!(latch.wait(true));
+        }
+    }
+
+    #[test]
+    fn success_later() {
+        do run_in_newsched_task {
+            let mut latch = JoinLatch::new_root();
+
+            let child_latch = latch.new_child();
+            let child_latch = Cell(child_latch);
+            do spawntask_later {
+                let child_latch = child_latch.take();
+                assert!(child_latch.wait(true));
+            }
+
+            assert!(latch.wait(true));
+        }
+    }
+
+    #[test]
+    fn mt_success() {
+        do run_in_mt_newsched_task {
+            let mut latch = JoinLatch::new_root();
+
+            for 10.times {
+                let child_latch = latch.new_child();
+                let child_latch = Cell(child_latch);
+                do spawntask_random {
+                    let child_latch = child_latch.take();
+                    assert!(child_latch.wait(true));
+                }
+            }
+
+            assert!(latch.wait(true));
+        }
+    }
+
+    #[test]
+    fn mt_failure() {
+        do run_in_mt_newsched_task {
+            let mut latch = JoinLatch::new_root();
+
+            let spawn = |status| {
+                let child_latch = latch.new_child();
+                let child_latch = Cell(child_latch);
+                do spawntask_random {
+                    let child_latch = child_latch.take();
+                    child_latch.wait(status);
+                }
+            };
+
+            for 10.times { spawn(true) }
+            spawn(false);
+            for 10.times { spawn(true) }
+
+            assert!(!latch.wait(true));
+        }
+    }
+
+    #[test]
+    fn mt_multi_level_success() {
+        do run_in_mt_newsched_task {
+            let mut latch = JoinLatch::new_root();
+
+            fn child(latch: &mut JoinLatch, i: int) {
+                let child_latch = latch.new_child();
+                let child_latch = Cell(child_latch);
+                do spawntask_random {
+                    let mut child_latch = child_latch.take();
+                    if i != 0 {
+                        child(&mut *child_latch, i - 1);
+                        child_latch.wait(true);
+                    } else {
+                        child_latch.wait(true);
+                    }
+                }
+            }
+
+            child(&mut *latch, 10);
+
+            assert!(latch.wait(true));
+        }
+    }
+
+    #[test]
+    fn mt_multi_level_failure() {
+        do run_in_mt_newsched_task {
+            let mut latch = JoinLatch::new_root();
+
+            fn child(latch: &mut JoinLatch, i: int) {
+                let child_latch = latch.new_child();
+                let child_latch = Cell(child_latch);
+                do spawntask_random {
+                    let mut child_latch = child_latch.take();
+                    if i != 0 {
+                        child(&mut *child_latch, i - 1);
+                        child_latch.wait(false);
+                    } else {
+                        child_latch.wait(true);
+                    }
+                }
+            }
+
+            child(&mut *latch, 10);
+
+            assert!(!latch.wait(true));
+        }
+    }
+
+    #[test]
+    fn release_child() {
+        do run_in_newsched_task {
+            let mut latch = JoinLatch::new_root();
+            let child_latch = latch.new_child();
+            let child_latch = Cell(child_latch);
+
+            do spawntask_immediately {
+                let latch = child_latch.take();
+                latch.release(false);
+            }
+
+            assert!(!latch.wait(true));
+        }
+    }
+
+    #[test]
+    fn release_child_tombstone() {
+        do run_in_newsched_task {
+            let mut latch = JoinLatch::new_root();
+            let child_latch = latch.new_child();
+            let child_latch = Cell(child_latch);
+
+            do spawntask_immediately {
+                let mut latch = child_latch.take();
+                let child_latch = latch.new_child();
+                let child_latch = Cell(child_latch);
+                do spawntask_later {
+                    let latch = child_latch.take();
+                    latch.release(false);
+                }
+                latch.release(true);
+            }
+
+            assert!(!latch.wait(true));
+        }
+    }
+
+    #[test]
+    fn release_child_no_tombstone() {
+        do run_in_newsched_task {
+            let mut latch = JoinLatch::new_root();
+            let child_latch = latch.new_child();
+            let child_latch = Cell(child_latch);
+
+            do spawntask_later {
+                let mut latch = child_latch.take();
+                let child_latch = latch.new_child();
+                let child_latch = Cell(child_latch);
+                do spawntask_immediately {
+                    let latch = child_latch.take();
+                    latch.release(false);
+                }
+                latch.release(true);
+            }
+
+            assert!(!latch.wait(true));
+        }
+    }
+
+    #[test]
+    fn release_child_tombstone_stress() {
+        fn rand_orders() -> ~[bool] {
+            let mut v = ~[false, ..5];
+            v[0] = true;
+            let mut rng = rand::rng();
+            return rng.shuffle(v);
+        }
+
+        fn split_orders(orders: &[bool]) -> (~[bool], ~[bool]) {
+            if orders.is_empty() {
+                return (~[], ~[]);
+            } else if orders.len() <= 2 {
+                return (orders.to_owned(), ~[]);
+            }
+            let mut rng = rand::rng();
+            let n = rng.gen_uint_range(1, orders.len());
+            let first = orders.slice(0, n).to_owned();
+            let last = orders.slice(n, orders.len()).to_owned();
+            assert!(first.len() + last.len() == orders.len());
+            return (first, last);
+        }
+
+        for stress_factor().times {
+            do run_in_newsched_task {
+                fn doit(latch: &mut JoinLatch, orders: ~[bool], depth: uint) {
+                    let (my_orders, remaining_orders) = split_orders(orders);
+                    rtdebug!("(my_orders, remaining): %?", (&my_orders, &remaining_orders));
+                    rtdebug!("depth: %u", depth);
+                    let mut remaining_orders = remaining_orders;
+                    let mut num = 0;
+                    for my_orders.each |&order| {
+                        let child_latch = latch.new_child();
+                        let child_latch = Cell(child_latch);
+                        let (child_orders, remaining) = split_orders(remaining_orders);
+                        rtdebug!("(child_orders, remaining): %?", (&child_orders, &remaining));
+                        remaining_orders = remaining;
+                        let child_orders = Cell(child_orders);
+                        let child_num = num;
+                        let _ = child_num; // XXX unused except in rtdebug!
+                        do spawntask_random {
+                            rtdebug!("depth %u num %u", depth, child_num);
+                            let mut child_latch = child_latch.take();
+                            let child_orders = child_orders.take();
+                            doit(&mut *child_latch, child_orders, depth + 1);
+                            child_latch.release(order);
+                        }
+
+                        num += 1;
+                    }
+                }
+
+                let mut latch = JoinLatch::new_root();
+                let orders = rand_orders();
+                rtdebug!("orders: %?", orders);
+
+                doit(&mut *latch, orders, 0);
+
+                assert!(!latch.wait(true));
+            }
+        }
+    }
+
+    #[test]
+    fn whateverman() {
+        struct Order {
+            immediate: bool,
+            succeed: bool,
+            orders: ~[Order]
+        }
+        fn next(latch: &mut JoinLatch, orders: ~[Order]) {
+            for orders.each |order| {
+                let suborders = copy order.orders;
+                let child_latch = Cell(latch.new_child());
+                let succeed = order.succeed;
+                if order.immediate {
+                    do spawntask_immediately {
+                        let mut child_latch = child_latch.take();
+                        next(&mut *child_latch, copy suborders);
+                        rtdebug!("immediate releasing");
+                        child_latch.release(succeed);
+                    }
+                } else {
+                    do spawntask_later {
+                        let mut child_latch = child_latch.take();
+                        next(&mut *child_latch, copy suborders);
+                        rtdebug!("later releasing");
+                        child_latch.release(succeed);
+                    }
+                }
+            }
+        }
+
+        do run_in_newsched_task {
+            let mut latch = JoinLatch::new_root();
+            let orders = ~[ Order { // 0 0
+                immediate: true,
+                succeed: true,
+                orders: ~[ Order { // 1 0
+                    immediate: true,
+                    succeed: false,
+                    orders: ~[ Order { // 2 0
+                        immediate: false,
+                        succeed: false,
+                        orders: ~[ Order { // 3 0
+                            immediate: true,
+                            succeed: false,
+                            orders: ~[]
+                        }, Order { // 3 1
+                            immediate: false,
+                            succeed: false,
+                            orders: ~[]
+                        }]
+                    }]
+                }]
+            }];
+
+            next(&mut *latch, orders);
+            assert!(!latch.wait(true));
+        }
+    }
+}
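
A rough modern-Rust analogue of the counting scheme above may help: parent and children share one SharedState whose count starts at 1 for the parent and is bumped once per child; whoever decrements it to zero sends ChildrenTerminated to the parent's port, and a sticky success flag carries failure upward. This is a sketch, not the code in this commit; Arc, AtomicUsize, AtomicBool, and std::sync::mpsc stand in for the runtime's SharedState, SharedChan, and Port.

use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst};
use std::sync::{mpsc, Arc};
use std::thread;

struct Shared {
    count: AtomicUsize,  // held by the parent and every child
    success: AtomicBool, // sticky failure flag, read by the parent
}

fn main() {
    let shared = Arc::new(Shared {
        count: AtomicUsize::new(1), // the parent's own reference
        success: AtomicBool::new(true),
    });
    let (tx, rx) = mpsc::channel();

    for i in 0..4 {
        shared.count.fetch_add(1, SeqCst); // one reference per child
        let (shared, tx) = (shared.clone(), tx.clone());
        thread::spawn(move || {
            if i == 2 {
                shared.success.store(false, SeqCst); // simulate task failure
            }
            // Whoever drops the count to zero reports ChildrenTerminated.
            if shared.count.fetch_sub(1, SeqCst) == 1 {
                tx.send(()).unwrap();
            }
        });
    }

    // The parent gives up its own reference, then waits for the signal.
    if shared.count.fetch_sub(1, SeqCst) == 1 {
        tx.send(()).unwrap();
    }
    rx.recv().unwrap();
    assert!(!shared.success.load(SeqCst)); // the child failure propagated
}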
diff --git a/src/libstd/rt/metrics.rs b/src/libstd/rt/metrics.rs
index 70e347fdfb6..b0c0fa5d708 100644
--- a/src/libstd/rt/metrics.rs
+++ b/src/libstd/rt/metrics.rs
@@ -34,7 +34,11 @@ pub struct SchedMetrics {
     // Message receives that do not block the receiver
     rendezvous_recvs: uint,
     // Message receives that block the receiver
-    non_rendezvous_recvs: uint
+    non_rendezvous_recvs: uint,
+    // JoinLatch releases that create tombstones
+    release_tombstone: uint,
+    // JoinLatch releases that do not create tombstones
+    release_no_tombstone: uint,
 }
 
 impl SchedMetrics {
@@ -51,7 +55,9 @@ impl SchedMetrics {
             rendezvous_sends: 0,
             non_rendezvous_sends: 0,
             rendezvous_recvs: 0,
-            non_rendezvous_recvs: 0
+            non_rendezvous_recvs: 0,
+            release_tombstone: 0,
+            release_no_tombstone: 0
         }
     }
 }
@@ -70,6 +76,8 @@ impl ToStr for SchedMetrics {
               non_rendezvous_sends: %u\n\
               rendezvous_recvs: %u\n\
               non_rendezvous_recvs: %u\n\
+              release_tombstone: %u\n\
+              release_no_tombstone: %u\n\
               ",
              self.turns,
              self.messages_received,
@@ -82,7 +90,9 @@ impl ToStr for SchedMetrics {
              self.rendezvous_sends,
              self.non_rendezvous_sends,
              self.rendezvous_recvs,
-             self.non_rendezvous_recvs
+             self.non_rendezvous_recvs,
+             self.release_tombstone,
+             self.release_no_tombstone
         )
     }
 }
\ No newline at end of file
diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs
index 3198b285876..5f06c1455a4 100644
--- a/src/libstd/rt/mod.rs
+++ b/src/libstd/rt/mod.rs
@@ -133,6 +133,9 @@ pub mod local_ptr;
 /// Bindings to pthread/windows thread-local storage.
 pub mod thread_local_storage;
 
+/// A concurrent data structure with which parent tasks wait on child tasks.
+pub mod join_latch;
+
 pub mod metrics;
 
 
@@ -164,7 +167,7 @@ pub fn start(_argc: int, _argv: **u8, crate_map: *u8, main: ~fn()) -> int {
     let sleepers = SleeperList::new();
     let mut sched = ~Scheduler::new(loop_, work_queue, sleepers);
     sched.no_sleep = true;
-    let main_task = ~Coroutine::new(&mut sched.stack_pool, main);
+    let main_task = ~Coroutine::new_root(&mut sched.stack_pool, main);
 
     sched.enqueue_task(main_task);
     sched.run();
@@ -238,7 +241,7 @@ fn test_context() {
     do run_in_bare_thread {
         assert_eq!(context(), GlobalContext);
         let mut sched = ~new_test_uv_sched();
-        let task = ~do Coroutine::new(&mut sched.stack_pool) {
+        let task = ~do Coroutine::new_root(&mut sched.stack_pool) {
             assert_eq!(context(), TaskContext);
             let sched = Local::take::<Scheduler>();
             do sched.deschedule_running_task_and_then() |sched, task| {
diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs
index 3b8a31d1840..fe553467ebd 100644
--- a/src/libstd/rt/sched.rs
+++ b/src/libstd/rt/sched.rs
@@ -181,8 +181,10 @@ pub impl Scheduler {
         // XXX: Reenable this once we're using a per-task queue. With a shared
         // queue this is not true
         //assert!(sched.work_queue.is_empty());
-//        let out = sched.metrics.to_str();
-//        rtdebug!("scheduler metrics: %s\n", out);
+        rtdebug!("scheduler metrics: %s\n", {
+            use to_str::ToStr;
+            sched.metrics.to_str()
+        });
         return sched;
     }
 
@@ -728,11 +730,11 @@ pub impl Coroutine {
     // using the AnySched parameter.
 
     fn new_homed(stack_pool: &mut StackPool, home: SchedHome, start: ~fn()) -> Coroutine {
-        Coroutine::with_task_homed(stack_pool, ~Task::new(), start, home)
+        Coroutine::with_task_homed(stack_pool, ~Task::new_root(), start, home)
     }
 
-    fn new(stack_pool: &mut StackPool, start: ~fn()) -> Coroutine {
-        Coroutine::with_task(stack_pool, ~Task::new(), start)
+    fn new_root(stack_pool: &mut StackPool, start: ~fn()) -> Coroutine {
+        Coroutine::with_task(stack_pool, ~Task::new_root(), start)
     }
 
     fn with_task_homed(stack_pool: &mut StackPool,
@@ -740,7 +742,7 @@ pub impl Coroutine {
                        start: ~fn(),
                        home: SchedHome) -> Coroutine {
 
-        static MIN_STACK_SIZE: uint = 10000000; // XXX: Too much stack
+        static MIN_STACK_SIZE: uint = 1000000; // XXX: Too much stack
 
         let start = Coroutine::build_start_wrapper(start);
         let mut stack = stack_pool.take_segment(MIN_STACK_SIZE);
@@ -930,14 +932,14 @@ mod test {
             };
             let t1f = Cell(t1f);
 
-            let t2f = ~do Coroutine::new(&mut normal_sched.stack_pool) {
+            let t2f = ~do Coroutine::new_root(&mut normal_sched.stack_pool) {
                 let on_special = Coroutine::on_special();
                 rtdebug!("t2 should not be on special: %b", on_special);
                 assert!(!on_special);
             };
             let t2f = Cell(t2f);
 
-            let t3f = ~do Coroutine::new(&mut normal_sched.stack_pool) {
+            let t3f = ~do Coroutine::new_root(&mut normal_sched.stack_pool) {
                 // not on special
                 let on_special = Coroutine::on_special();
                 rtdebug!("t3 should not be on special: %b", on_special);
@@ -986,7 +988,7 @@ mod test {
             let t4 = Cell(t4);
 
             // build a main task that runs our four tests
-            let main_task = ~do Coroutine::new(&mut normal_sched.stack_pool) {
+            let main_task = ~do Coroutine::new_root(&mut normal_sched.stack_pool) {
                 // the two tasks that require a normal start location
                 t2.take()();
                 t4.take()();
@@ -1141,7 +1143,7 @@ mod test {
             let task_ran_ptr: *mut bool = &mut task_ran;
 
             let mut sched = ~new_test_uv_sched();
-            let task = ~do Coroutine::new(&mut sched.stack_pool) {
+            let task = ~do Coroutine::new_root(&mut sched.stack_pool) {
                 unsafe { *task_ran_ptr = true; }
             };
             sched.enqueue_task(task);
@@ -1159,7 +1161,7 @@ mod test {
 
             let mut sched = ~new_test_uv_sched();
             for int::range(0, total) |_| {
-                let task = ~do Coroutine::new(&mut sched.stack_pool) {
+                let task = ~do Coroutine::new_root(&mut sched.stack_pool) {
                     unsafe { *task_count_ptr = *task_count_ptr + 1; }
                 };
                 sched.enqueue_task(task);
@@ -1176,10 +1178,10 @@ mod test {
             let count_ptr: *mut int = &mut count;
 
             let mut sched = ~new_test_uv_sched();
-            let task1 = ~do Coroutine::new(&mut sched.stack_pool) {
+            let task1 = ~do Coroutine::new_root(&mut sched.stack_pool) {
                 unsafe { *count_ptr = *count_ptr + 1; }
                 let mut sched = Local::take::<Scheduler>();
-                let task2 = ~do Coroutine::new(&mut sched.stack_pool) {
+                let task2 = ~do Coroutine::new_root(&mut sched.stack_pool) {
                     unsafe { *count_ptr = *count_ptr + 1; }
                 };
                 // Context switch directly to the new task
@@ -1204,7 +1206,7 @@ mod test {
 
             let mut sched = ~new_test_uv_sched();
 
-            let start_task = ~do Coroutine::new(&mut sched.stack_pool) {
+            let start_task = ~do Coroutine::new_root(&mut sched.stack_pool) {
                 run_task(count_ptr);
             };
             sched.enqueue_task(start_task);
@@ -1214,7 +1216,7 @@ mod test {
 
             fn run_task(count_ptr: *mut int) {
                 do Local::borrow::<Scheduler, ()> |sched| {
-                    let task = ~do Coroutine::new(&mut sched.stack_pool) {
+                    let task = ~do Coroutine::new_root(&mut sched.stack_pool) {
                         unsafe {
                             *count_ptr = *count_ptr + 1;
                             if *count_ptr != MAX {
@@ -1232,7 +1234,7 @@ mod test {
     fn test_block_task() {
         do run_in_bare_thread {
             let mut sched = ~new_test_uv_sched();
-            let task = ~do Coroutine::new(&mut sched.stack_pool) {
+            let task = ~do Coroutine::new_root(&mut sched.stack_pool) {
                 let sched = Local::take::<Scheduler>();
                 assert!(sched.in_task_context());
                 do sched.deschedule_running_task_and_then() |sched, task| {
@@ -1279,13 +1281,13 @@ mod test {
             let mut sched1 = ~new_test_uv_sched();
             let handle1 = sched1.make_handle();
             let handle1_cell = Cell(handle1);
-            let task1 = ~do Coroutine::new(&mut sched1.stack_pool) {
+            let task1 = ~do Coroutine::new_root(&mut sched1.stack_pool) {
                 chan_cell.take().send(());
             };
             sched1.enqueue_task(task1);
 
             let mut sched2 = ~new_test_uv_sched();
-            let task2 = ~do Coroutine::new(&mut sched2.stack_pool) {
+            let task2 = ~do Coroutine::new_root(&mut sched2.stack_pool) {
                 port_cell.take().recv();
                 // Release the other scheduler's handle so it can exit
                 handle1_cell.take();
diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs
index 06318ac6623..6e4be3c1ef9 100644
--- a/src/libstd/rt/task.rs
+++ b/src/libstd/rt/task.rs
@@ -16,19 +16,23 @@
 use prelude::*;
 use libc::{c_void, uintptr_t};
 use cast::transmute;
+use option::{Option, Some, None};
 use rt::local::Local;
 use super::local_heap::LocalHeap;
 use rt::logging::StdErrLogger;
 use rt::sched::{SchedHome, AnySched};
+use rt::join_latch::JoinLatch;
 
 pub struct Task {
     heap: LocalHeap,
     gc: GarbageCollector,
     storage: LocalStorage,
     logger: StdErrLogger,
-    unwinder: Option<Unwinder>,
-    destroyed: bool,
-    home: Option<SchedHome>
+    unwinder: Unwinder,
+    home: Option<SchedHome>,
+    join_latch: Option<~JoinLatch>,
+    on_exit: Option<~fn(bool)>,
+    destroyed: bool
 }
 
 pub struct GarbageCollector;
@@ -39,27 +43,31 @@ pub struct Unwinder {
 }
 
 impl Task {
-    pub fn new() -> Task {
+    pub fn new_root() -> Task {
         Task {
             heap: LocalHeap::new(),
             gc: GarbageCollector,
             storage: LocalStorage(ptr::null(), None),
             logger: StdErrLogger,
-            unwinder: Some(Unwinder { unwinding: false }),
-            destroyed: false,
-            home: Some(AnySched)
+            unwinder: Unwinder { unwinding: false },
+            home: Some(AnySched),
+            join_latch: Some(JoinLatch::new_root()),
+            on_exit: None,
+            destroyed: false
         }
     }
 
-    pub fn without_unwinding() -> Task {
+    pub fn new_child(&mut self) -> Task {
         Task {
             heap: LocalHeap::new(),
             gc: GarbageCollector,
             storage: LocalStorage(ptr::null(), None),
             logger: StdErrLogger,
-            unwinder: None,
-            destroyed: false,
-            home: Some(AnySched)
+            home: Some(AnySched),
+            unwinder: Unwinder { unwinding: false },
+            join_latch: Some(self.join_latch.get_mut_ref().new_child()),
+            on_exit: None,
+            destroyed: false
         }
     }
 
@@ -74,20 +82,24 @@ impl Task {
             assert!(ptr::ref_eq(task, self));
         }
 
-        match self.unwinder {
-            Some(ref mut unwinder) => {
-                // If there's an unwinder then set up the catch block
-                unwinder.try(f);
+        self.unwinder.try(f);
+        self.destroy();
+
+        // Wait for children. Possibly report the exit status.
+        let local_success = !self.unwinder.unwinding;
+        let join_latch = self.join_latch.swap_unwrap();
+        match self.on_exit {
+            Some(ref on_exit) => {
+                let success = join_latch.wait(local_success);
+                (*on_exit)(success);
             }
             None => {
-                // Otherwise, just run the body
-                f()
+                join_latch.release(local_success);
             }
         }
-        self.destroy();
     }
 
-    /// Must be called manually before finalization to clean up
+    /// must be called manually before finalization to clean up
     /// thread-local resources. Some of the routines here expect
     /// Task to be available recursively so this must be
     /// called unsafely, without removing Task from
@@ -233,5 +245,15 @@ mod test {
             assert!(port.recv() == 10);
         }
     }
+
+    #[test]
+    fn linked_failure() {
+        do run_in_newsched_task() {
+            let res = do spawntask_try {
+                spawntask_random(|| fail!());
+            };
+            assert!(res.is_err());
+        }
+    }
 }
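
In modern terms, the new Task::run control flow is: run the body under the unwinder's catch block, compute local success, then either wait on the join latch and invoke on_exit with the combined status, or release the latch without blocking. Below is a minimal sketch under the assumption that catch_unwind plays the unwinder's role; the JoinLatch type here is a placeholder, not the runtime type above.

use std::panic::{catch_unwind, AssertUnwindSafe};

struct JoinLatch; // placeholder for the runtime type
impl JoinLatch {
    fn wait(self, local: bool) -> bool { local } // blocks for children
    fn release(self, _local: bool) {}            // non-blocking release
}

fn run(body: impl FnOnce(),
       latch: JoinLatch,
       on_exit: Option<Box<dyn FnOnce(bool)>>) {
    // Run the task body under a catch block.
    let local_success = catch_unwind(AssertUnwindSafe(body)).is_ok();
    match on_exit {
        // Tasks with an exit handler wait for their children and
        // report the combined status (spawntask_try relies on this).
        Some(f) => f(latch.wait(local_success)),
        // Otherwise just release the latch with the local status.
        None => latch.release(local_success),
    }
}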
 
diff --git a/src/libstd/rt/test.rs b/src/libstd/rt/test.rs
index bb284c02541..d35d01cf719 100644
--- a/src/libstd/rt/test.rs
+++ b/src/libstd/rt/test.rs
@@ -18,6 +18,7 @@ use vec::OwnedVector;
 use result::{Result, Ok, Err};
 use unstable::run_in_bare_thread;
 use super::io::net::ip::{IpAddr, Ipv4};
+use rt::comm::oneshot;
 use rt::task::Task;
 use rt::thread::Thread;
 use rt::local::Local;
@@ -47,8 +48,11 @@ pub fn run_in_newsched_task(f: ~fn()) {
 
     do run_in_bare_thread {
         let mut sched = ~new_test_uv_sched();
+        let mut new_task = ~Task::new_root();
+        let on_exit: ~fn(bool) = |exit_status| rtassert!(exit_status);
+        new_task.on_exit = Some(on_exit);
         let task = ~Coroutine::with_task(&mut sched.stack_pool,
-                                         ~Task::without_unwinding(),
+                                         new_task,
                                          f.take());
         sched.enqueue_task(task);
         sched.run();
@@ -95,16 +99,20 @@ pub fn run_in_mt_newsched_task(f: ~fn()) {
 
         let f_cell = Cell(f_cell.take());
         let handles = Cell(handles);
-        let main_task = ~do Coroutine::new(&mut scheds[0].stack_pool) {
-            f_cell.take()();
+        let mut new_task = ~Task::new_root();
+        let on_exit: ~fn(bool) = |exit_status| {
 
             let mut handles = handles.take();
             // Tell schedulers to exit
             for handles.each_mut |handle| {
                 handle.send(Shutdown);
             }
-        };
 
+            rtassert!(exit_status);
+        };
+        new_task.on_exit = Some(on_exit);
+        let main_task = ~Coroutine::with_task(&mut scheds[0].stack_pool,
+                                              new_task, f_cell.take());
         scheds[0].enqueue_task(main_task);
 
         let mut threads = ~[];
@@ -201,7 +209,7 @@ pub fn run_in_mt_newsched_task_random_homed() {
 
         rtdebug!("creating main task");
 
-        let main_task = ~do Coroutine::new(&mut scheds[0].stack_pool) {
+        let main_task = ~do Coroutine::new_root(&mut scheds[0].stack_pool) {
             f_cell.take()();
             let mut handles = handles.take();
             // Tell schedulers to exit
@@ -245,10 +253,13 @@ pub fn spawntask(f: ~fn()) {
     use super::sched::*;
 
     rtdebug!("spawntask taking the scheduler from TLS")
+    let task = do Local::borrow::<Task, ~Task>() |running_task| {
+        ~running_task.new_child()
+    };
+
     let mut sched = Local::take::<Scheduler>();
     let task = ~Coroutine::with_task(&mut sched.stack_pool,
-                                     ~Task::without_unwinding(),
-                                     f);
+                                     task, f);
     rtdebug!("spawntask scheduling the new task");
     sched.schedule_task(task);
 }
@@ -257,10 +268,13 @@ pub fn spawntask(f: ~fn()) {
 pub fn spawntask_immediately(f: ~fn()) {
     use super::sched::*;
 
+    let task = do Local::borrow::<Task, ~Task>() |running_task| {
+        ~running_task.new_child()
+    };
+
     let mut sched = Local::take::<Scheduler>();
     let task = ~Coroutine::with_task(&mut sched.stack_pool,
-                                     ~Task::without_unwinding(),
-                                     f);
+                                     task, f);
     do sched.switch_running_tasks_and_then(task) |sched, task| {
         sched.enqueue_task(task);
     }
@@ -270,10 +284,13 @@ pub fn spawntask_immediately(f: ~fn()) {
 pub fn spawntask_later(f: ~fn()) {
     use super::sched::*;
 
+    let task = do Local::borrow::<Task, ~Task>() |running_task| {
+        ~running_task.new_child()
+    };
+
     let mut sched = Local::take::<Scheduler>();
     let task = ~Coroutine::with_task(&mut sched.stack_pool,
-                                     ~Task::without_unwinding(),
-                                     f);
+                                     task, f);
 
     sched.enqueue_task(task);
     Local::put(sched);
@@ -284,13 +301,16 @@ pub fn spawntask_random(f: ~fn()) {
     use super::sched::*;
     use rand::{Rand, rng};
 
-    let mut rng = rng();
-    let run_now: bool = Rand::rand(&mut rng);
+    let task = do Local::borrow::<Task, ~Task>() |running_task| {
+        ~running_task.new_child()
+    };
 
     let mut sched = Local::take::<Scheduler>();
     let task = ~Coroutine::with_task(&mut sched.stack_pool,
-                                     ~Task::without_unwinding(),
-                                     f);
+                                     task, f);
+
+    let mut rng = rng();
+    let run_now: bool = Rand::rand(&mut rng);
 
     if run_now {
         do sched.switch_running_tasks_and_then(task) |sched, task| {
@@ -327,7 +347,7 @@ pub fn spawntask_homed(scheds: &mut ~[~Scheduler], f: ~fn()) {
         };
 
         ~Coroutine::with_task_homed(&mut sched.stack_pool,
-                                    ~Task::without_unwinding(),
+                                    ~Task::new_root(),
                                     af,
                                     Sched(handle))
     };
@@ -340,47 +360,37 @@ pub fn spawntask_homed(scheds: &mut ~[~Scheduler], f: ~fn()) {
 pub fn spawntask_try(f: ~fn()) -> Result<(), ()> {
     use cell::Cell;
     use super::sched::*;
-    use task;
-    use unstable::finally::Finally;
-
-    // Our status variables will be filled in from the scheduler context
-    let mut failed = false;
-    let failed_ptr: *mut bool = &mut failed;
-
-    // Switch to the scheduler
-    let f = Cell(Cell(f));
-    let sched = Local::take::<Scheduler>();
-    do sched.deschedule_running_task_and_then() |sched, old_task| {
-        let old_task = Cell(old_task);
-        let f = f.take();
-        let new_task = ~do Coroutine::new(&mut sched.stack_pool) {
-            do (|| {
-                (f.take())()
-            }).finally {
-                // Check for failure then resume the parent task
-                unsafe { *failed_ptr = task::failing(); }
-                let sched = Local::take::<Scheduler>();
-                do sched.switch_running_tasks_and_then(old_task.take()) |sched, new_task| {
-                    sched.enqueue_task(new_task);
-                }
-            }
-        };
 
-        sched.enqueue_task(new_task);
+    let (port, chan) = oneshot();
+    let chan = Cell(chan);
+    let mut new_task = ~Task::new_root();
+    let on_exit: ~fn(bool) = |exit_status| chan.take().send(exit_status);
+    new_task.on_exit = Some(on_exit);
+    let mut sched = Local::take::<Scheduler>();
+    let new_task = ~Coroutine::with_task(&mut sched.stack_pool,
+                                         new_task, f);
+    do sched.switch_running_tasks_and_then(new_task) |sched, old_task| {
+        sched.enqueue_task(old_task);
     }
 
-    if !failed { Ok(()) } else { Err(()) }
+    let exit_status = port.recv();
+    if exit_status { Ok(()) } else { Err(()) }
 }
 
 // Spawn a new task in a new scheduler and return a thread handle.
 pub fn spawntask_thread(f: ~fn()) -> Thread {
     use rt::sched::*;
 
+    let task = do Local::borrow::<Task, ~Task>() |running_task| {
+        ~running_task.new_child()
+    };
+
+    let task = Cell(task);
     let f = Cell(f);
     let thread = do Thread::start {
         let mut sched = ~new_test_uv_sched();
         let task = ~Coroutine::with_task(&mut sched.stack_pool,
-                                         ~Task::without_unwinding(),
+                                         task.take(),
                                          f.take());
         sched.enqueue_task(task);
         sched.run();
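
The rewritten spawntask_try above shows the new failure-reporting path end to end: the child task is given an on_exit callback that sends its exit status over a oneshot channel, and the caller blocks on the receiving end. A minimal standalone sketch of the same shape, assuming std::thread and std::sync::mpsc in place of the runtime's scheduler and oneshot:

use std::panic::{catch_unwind, AssertUnwindSafe};
use std::sync::mpsc::channel;
use std::thread;

fn spawntask_try(f: impl FnOnce() + Send + 'static) -> Result<(), ()> {
    let (tx, rx) = channel();
    thread::spawn(move || {
        // Stand-in for the on_exit callback in the real code.
        let ok = catch_unwind(AssertUnwindSafe(f)).is_ok();
        tx.send(ok).unwrap();
    });
    if rx.recv().unwrap() { Ok(()) } else { Err(()) }
}

fn main() {
    assert!(spawntask_try(|| ()).is_ok());
    assert!(spawntask_try(|| panic!("boom")).is_err());
}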