diff options
| author | Brian Anderson <banderson@mozilla.com> | 2013-06-15 19:31:46 -0700 |
|---|---|---|
| committer | Brian Anderson <banderson@mozilla.com> | 2013-06-15 19:31:46 -0700 |
| commit | 3208fc36bf2c7e99451e21171f82dafef2ea51dc (patch) | |
| tree | 0ac94c64e1601824d3e326964580a3b84295737a /src | |
| parent | b08c4467980bc712995d421dd50c1cca2948b67b (diff) | |
| parent | 505ef7e710ff890c0027fadad54997041b7ee93b (diff) | |
| download | rust-3208fc36bf2c7e99451e21171f82dafef2ea51dc.tar.gz rust-3208fc36bf2c7e99451e21171f82dafef2ea51dc.zip | |
Merge remote-tracking branch 'brson/io-wip' into io
Conflicts: src/libstd/rt/sched.rs src/libstd/rt/task.rs src/libstd/rt/test.rs src/libstd/task/mod.rs src/libstd/task/spawn.rs
Diffstat (limited to 'src')
| -rw-r--r-- | src/libstd/rt/join_latch.rs | 645 | ||||
| -rw-r--r-- | src/libstd/rt/metrics.rs | 16 | ||||
| -rw-r--r-- | src/libstd/rt/mod.rs | 7 | ||||
| -rw-r--r-- | src/libstd/rt/sched.rs | 38 | ||||
| -rw-r--r-- | src/libstd/rt/task.rs | 60 | ||||
| -rw-r--r-- | src/libstd/rt/test.rs | 98 | ||||
| -rw-r--r-- | src/libstd/sys.rs | 6 | ||||
| -rw-r--r-- | src/libstd/task/mod.rs | 15 | ||||
| -rw-r--r-- | src/libstd/task/spawn.rs | 8 |
9 files changed, 788 insertions, 105 deletions
diff --git a/src/libstd/rt/join_latch.rs b/src/libstd/rt/join_latch.rs new file mode 100644 index 00000000000..b501699509e --- /dev/null +++ b/src/libstd/rt/join_latch.rs @@ -0,0 +1,645 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! The JoinLatch is a concurrent type that establishes the task +//! tree and propagates failure. +//! +//! Each task gets a JoinLatch that is derived from the JoinLatch +//! of its parent task. Every latch must be released by either calling +//! the non-blocking `release` method or the task-blocking `wait` method. +//! Releasing a latch does not complete until all of its child latches +//! complete. +//! +//! Latches carry a `success` flag that is set to `false` during task +//! failure and is propagated both from children to parents and parents +//! to children. The status af this flag may be queried for the purposes +//! of linked failure. +//! +//! In addition to failure propagation the task tree serves to keep the +//! default task schedulers alive. The runtime only sends the shutdown +//! message to schedulers once the root task exits. +//! +//! Under this scheme tasks that terminate before their children become +//! 'zombies' since they may not exit until their children do. Zombie +//! tasks are 'tombstoned' as `Tombstone(~JoinLatch)` and the tasks +//! themselves allowed to terminate. +//! +//! XXX: Propagate flag from parents to children. +//! XXX: Tombstoning actually doesn't work. +//! XXX: This could probably be done in a way that doesn't leak tombstones +//! longer than the life of the child tasks. + +use comm::{GenericPort, Peekable, GenericSmartChan}; +use clone::Clone; +use container::Container; +use option::{Option, Some, None}; +use ops::Drop; +use rt::comm::{SharedChan, Port, stream}; +use rt::local::Local; +use rt::sched::Scheduler; +use unstable::atomics::{AtomicUint, SeqCst}; +use util; +use vec::OwnedVector; + +// FIXME #7026: Would prefer this to be an enum +pub struct JoinLatch { + priv parent: Option<ParentLink>, + priv child: Option<ChildLink>, + closed: bool, +} + +// Shared between parents and all their children. +struct SharedState { + /// Reference count, held by a parent and all children. + count: AtomicUint, + success: bool +} + +struct ParentLink { + shared: *mut SharedState, + // For communicating with the parent. + chan: SharedChan<Message> +} + +struct ChildLink { + shared: ~SharedState, + // For receiving from children. + port: Port<Message>, + chan: SharedChan<Message>, + // Prevents dropping the child SharedState reference counts multiple times. + dropped_child: bool +} + +// Messages from child latches to parent. +enum Message { + Tombstone(~JoinLatch), + ChildrenTerminated +} + +impl JoinLatch { + pub fn new_root() -> ~JoinLatch { + let this = ~JoinLatch { + parent: None, + child: None, + closed: false + }; + rtdebug!("new root latch %x", this.id()); + return this; + } + + fn id(&self) -> uint { + unsafe { ::cast::transmute(&*self) } + } + + pub fn new_child(&mut self) -> ~JoinLatch { + rtassert!(!self.closed); + + if self.child.is_none() { + // This is the first time spawning a child + let shared = ~SharedState { + count: AtomicUint::new(1), + success: true + }; + let (port, chan) = stream(); + let chan = SharedChan::new(chan); + let child = ChildLink { + shared: shared, + port: port, + chan: chan, + dropped_child: false + }; + self.child = Some(child); + } + + let child_link: &mut ChildLink = self.child.get_mut_ref(); + let shared_state: *mut SharedState = &mut *child_link.shared; + + child_link.shared.count.fetch_add(1, SeqCst); + + let child = ~JoinLatch { + parent: Some(ParentLink { + shared: shared_state, + chan: child_link.chan.clone() + }), + child: None, + closed: false + }; + rtdebug!("NEW child latch %x", child.id()); + return child; + } + + pub fn release(~self, local_success: bool) { + // XXX: This should not block, but there's a bug in the below + // code that I can't figure out. + self.wait(local_success); + } + + // XXX: Should not require ~self + fn release_broken(~self, local_success: bool) { + rtassert!(!self.closed); + + rtdebug!("releasing %x", self.id()); + + let id = self.id(); + let _ = id; // XXX: `id` is only used in debug statements so appears unused + let mut this = self; + let mut child_success = true; + let mut children_done = false; + + if this.child.is_some() { + rtdebug!("releasing children"); + let child_link: &mut ChildLink = this.child.get_mut_ref(); + let shared: &mut SharedState = &mut *child_link.shared; + + if !child_link.dropped_child { + let last_count = shared.count.fetch_sub(1, SeqCst); + rtdebug!("child count before sub %u %x", last_count, id); + if last_count == 1 { + assert!(child_link.chan.try_send(ChildrenTerminated)); + } + child_link.dropped_child = true; + } + + // Wait for messages from children + let mut tombstones = ~[]; + loop { + if child_link.port.peek() { + match child_link.port.recv() { + Tombstone(t) => { + tombstones.push(t); + }, + ChildrenTerminated => { + children_done = true; + break; + } + } + } else { + break + } + } + + rtdebug!("releasing %u tombstones %x", tombstones.len(), id); + + // Try to release the tombstones. Those that still have + // outstanding will be re-enqueued. When this task's + // parents release their latch we'll end up back here + // trying them again. + while !tombstones.is_empty() { + tombstones.pop().release(true); + } + + if children_done { + let count = shared.count.load(SeqCst); + assert!(count == 0); + // self_count is the acquire-read barrier + child_success = shared.success; + } + } else { + children_done = true; + } + + let total_success = local_success && child_success; + + rtassert!(this.parent.is_some()); + + unsafe { + { + let parent_link: &mut ParentLink = this.parent.get_mut_ref(); + let shared: *mut SharedState = parent_link.shared; + + if !total_success { + // parent_count is the write-wait barrier + (*shared).success = false; + } + } + + if children_done { + rtdebug!("children done"); + do Local::borrow::<Scheduler, ()> |sched| { + sched.metrics.release_tombstone += 1; + } + { + rtdebug!("RELEASING parent %x", id); + let parent_link: &mut ParentLink = this.parent.get_mut_ref(); + let shared: *mut SharedState = parent_link.shared; + let last_count = (*shared).count.fetch_sub(1, SeqCst); + rtdebug!("count before parent sub %u %x", last_count, id); + if last_count == 1 { + assert!(parent_link.chan.try_send(ChildrenTerminated)); + } + } + this.closed = true; + util::ignore(this); + } else { + rtdebug!("children not done"); + rtdebug!("TOMBSTONING %x", id); + do Local::borrow::<Scheduler, ()> |sched| { + sched.metrics.release_no_tombstone += 1; + } + let chan = { + let parent_link: &mut ParentLink = this.parent.get_mut_ref(); + parent_link.chan.clone() + }; + assert!(chan.try_send(Tombstone(this))); + } + } + } + + // XXX: Should not require ~self + pub fn wait(~self, local_success: bool) -> bool { + rtassert!(!self.closed); + + rtdebug!("WAITING %x", self.id()); + + let mut this = self; + let mut child_success = true; + + if this.child.is_some() { + rtdebug!("waiting for children"); + let child_link: &mut ChildLink = this.child.get_mut_ref(); + let shared: &mut SharedState = &mut *child_link.shared; + + if !child_link.dropped_child { + let last_count = shared.count.fetch_sub(1, SeqCst); + rtdebug!("child count before sub %u", last_count); + if last_count == 1 { + assert!(child_link.chan.try_send(ChildrenTerminated)); + } + child_link.dropped_child = true; + } + + // Wait for messages from children + loop { + match child_link.port.recv() { + Tombstone(t) => { + t.wait(true); + } + ChildrenTerminated => break + } + } + + let count = shared.count.load(SeqCst); + if count != 0 { ::io::println(fmt!("%u", count)); } + assert!(count == 0); + // self_count is the acquire-read barrier + child_success = shared.success; + } + + let total_success = local_success && child_success; + + if this.parent.is_some() { + rtdebug!("releasing parent"); + unsafe { + let parent_link: &mut ParentLink = this.parent.get_mut_ref(); + let shared: *mut SharedState = parent_link.shared; + + if !total_success { + // parent_count is the write-wait barrier + (*shared).success = false; + } + + let last_count = (*shared).count.fetch_sub(1, SeqCst); + rtdebug!("count before parent sub %u", last_count); + if last_count == 1 { + assert!(parent_link.chan.try_send(ChildrenTerminated)); + } + } + } + + this.closed = true; + util::ignore(this); + + return total_success; + } +} + +impl Drop for JoinLatch { + fn finalize(&self) { + rtdebug!("DESTROYING %x", self.id()); + rtassert!(self.closed); + } +} + +#[cfg(test)] +mod test { + use super::*; + use cell::Cell; + use container::Container; + use iter::Times; + use old_iter::BaseIter; + use rt::test::*; + use rand; + use rand::RngUtil; + use vec::{CopyableVector, ImmutableVector}; + + #[test] + fn success_immediately() { + do run_in_newsched_task { + let mut latch = JoinLatch::new_root(); + + let child_latch = latch.new_child(); + let child_latch = Cell(child_latch); + do spawntask_immediately { + let child_latch = child_latch.take(); + assert!(child_latch.wait(true)); + } + + assert!(latch.wait(true)); + } + } + + #[test] + fn success_later() { + do run_in_newsched_task { + let mut latch = JoinLatch::new_root(); + + let child_latch = latch.new_child(); + let child_latch = Cell(child_latch); + do spawntask_later { + let child_latch = child_latch.take(); + assert!(child_latch.wait(true)); + } + + assert!(latch.wait(true)); + } + } + + #[test] + fn mt_success() { + do run_in_mt_newsched_task { + let mut latch = JoinLatch::new_root(); + + for 10.times { + let child_latch = latch.new_child(); + let child_latch = Cell(child_latch); + do spawntask_random { + let child_latch = child_latch.take(); + assert!(child_latch.wait(true)); + } + } + + assert!(latch.wait(true)); + } + } + + #[test] + fn mt_failure() { + do run_in_mt_newsched_task { + let mut latch = JoinLatch::new_root(); + + let spawn = |status| { + let child_latch = latch.new_child(); + let child_latch = Cell(child_latch); + do spawntask_random { + let child_latch = child_latch.take(); + child_latch.wait(status); + } + }; + + for 10.times { spawn(true) } + spawn(false); + for 10.times { spawn(true) } + + assert!(!latch.wait(true)); + } + } + + #[test] + fn mt_multi_level_success() { + do run_in_mt_newsched_task { + let mut latch = JoinLatch::new_root(); + + fn child(latch: &mut JoinLatch, i: int) { + let child_latch = latch.new_child(); + let child_latch = Cell(child_latch); + do spawntask_random { + let mut child_latch = child_latch.take(); + if i != 0 { + child(&mut *child_latch, i - 1); + child_latch.wait(true); + } else { + child_latch.wait(true); + } + } + } + + child(&mut *latch, 10); + + assert!(latch.wait(true)); + } + } + + #[test] + fn mt_multi_level_failure() { + do run_in_mt_newsched_task { + let mut latch = JoinLatch::new_root(); + + fn child(latch: &mut JoinLatch, i: int) { + let child_latch = latch.new_child(); + let child_latch = Cell(child_latch); + do spawntask_random { + let mut child_latch = child_latch.take(); + if i != 0 { + child(&mut *child_latch, i - 1); + child_latch.wait(false); + } else { + child_latch.wait(true); + } + } + } + + child(&mut *latch, 10); + + assert!(!latch.wait(true)); + } + } + + #[test] + fn release_child() { + do run_in_newsched_task { + let mut latch = JoinLatch::new_root(); + let child_latch = latch.new_child(); + let child_latch = Cell(child_latch); + + do spawntask_immediately { + let latch = child_latch.take(); + latch.release(false); + } + + assert!(!latch.wait(true)); + } + } + + #[test] + fn release_child_tombstone() { + do run_in_newsched_task { + let mut latch = JoinLatch::new_root(); + let child_latch = latch.new_child(); + let child_latch = Cell(child_latch); + + do spawntask_immediately { + let mut latch = child_latch.take(); + let child_latch = latch.new_child(); + let child_latch = Cell(child_latch); + do spawntask_later { + let latch = child_latch.take(); + latch.release(false); + } + latch.release(true); + } + + assert!(!latch.wait(true)); + } + } + + #[test] + fn release_child_no_tombstone() { + do run_in_newsched_task { + let mut latch = JoinLatch::new_root(); + let child_latch = latch.new_child(); + let child_latch = Cell(child_latch); + + do spawntask_later { + let mut latch = child_latch.take(); + let child_latch = latch.new_child(); + let child_latch = Cell(child_latch); + do spawntask_immediately { + let latch = child_latch.take(); + latch.release(false); + } + latch.release(true); + } + + assert!(!latch.wait(true)); + } + } + + #[test] + fn release_child_tombstone_stress() { + fn rand_orders() -> ~[bool] { + let mut v = ~[false,.. 5]; + v[0] = true; + let mut rng = rand::rng(); + return rng.shuffle(v); + } + + fn split_orders(orders: &[bool]) -> (~[bool], ~[bool]) { + if orders.is_empty() { + return (~[], ~[]); + } else if orders.len() <= 2 { + return (orders.to_owned(), ~[]); + } + let mut rng = rand::rng(); + let n = rng.gen_uint_range(1, orders.len()); + let first = orders.slice(0, n).to_owned(); + let last = orders.slice(n, orders.len()).to_owned(); + assert!(first.len() + last.len() == orders.len()); + return (first, last); + } + + for stress_factor().times { + do run_in_newsched_task { + fn doit(latch: &mut JoinLatch, orders: ~[bool], depth: uint) { + let (my_orders, remaining_orders) = split_orders(orders); + rtdebug!("(my_orders, remaining): %?", (&my_orders, &remaining_orders)); + rtdebug!("depth: %u", depth); + let mut remaining_orders = remaining_orders; + let mut num = 0; + for my_orders.each |&order| { + let child_latch = latch.new_child(); + let child_latch = Cell(child_latch); + let (child_orders, remaining) = split_orders(remaining_orders); + rtdebug!("(child_orders, remaining): %?", (&child_orders, &remaining)); + remaining_orders = remaining; + let child_orders = Cell(child_orders); + let child_num = num; + let _ = child_num; // XXX unused except in rtdebug! + do spawntask_random { + rtdebug!("depth %u num %u", depth, child_num); + let mut child_latch = child_latch.take(); + let child_orders = child_orders.take(); + doit(&mut *child_latch, child_orders, depth + 1); + child_latch.release(order); + } + + num += 1; + } + } + + let mut latch = JoinLatch::new_root(); + let orders = rand_orders(); + rtdebug!("orders: %?", orders); + + doit(&mut *latch, orders, 0); + + assert!(!latch.wait(true)); + } + } + } + + #[test] + fn whateverman() { + struct Order { + immediate: bool, + succeed: bool, + orders: ~[Order] + } + fn next(latch: &mut JoinLatch, orders: ~[Order]) { + for orders.each |order| { + let suborders = copy order.orders; + let child_latch = Cell(latch.new_child()); + let succeed = order.succeed; + if order.immediate { + do spawntask_immediately { + let mut child_latch = child_latch.take(); + next(&mut *child_latch, copy suborders); + rtdebug!("immediate releasing"); + child_latch.release(succeed); + } + } else { + do spawntask_later { + let mut child_latch = child_latch.take(); + next(&mut *child_latch, copy suborders); + rtdebug!("later releasing"); + child_latch.release(succeed); + } + } + } + } + + do run_in_newsched_task { + let mut latch = JoinLatch::new_root(); + let orders = ~[ Order { // 0 0 + immediate: true, + succeed: true, + orders: ~[ Order { // 1 0 + immediate: true, + succeed: false, + orders: ~[ Order { // 2 0 + immediate: false, + succeed: false, + orders: ~[ Order { // 3 0 + immediate: true, + succeed: false, + orders: ~[] + }, Order { // 3 1 + immediate: false, + succeed: false, + orders: ~[] + }] + }] + }] + }]; + + next(&mut *latch, orders); + assert!(!latch.wait(true)); + } + } +} diff --git a/src/libstd/rt/metrics.rs b/src/libstd/rt/metrics.rs index 70e347fdfb6..b0c0fa5d708 100644 --- a/src/libstd/rt/metrics.rs +++ b/src/libstd/rt/metrics.rs @@ -34,7 +34,11 @@ pub struct SchedMetrics { // Message receives that do not block the receiver rendezvous_recvs: uint, // Message receives that block the receiver - non_rendezvous_recvs: uint + non_rendezvous_recvs: uint, + // JoinLatch releases that create tombstones + release_tombstone: uint, + // JoinLatch releases that do not create tombstones + release_no_tombstone: uint, } impl SchedMetrics { @@ -51,7 +55,9 @@ impl SchedMetrics { rendezvous_sends: 0, non_rendezvous_sends: 0, rendezvous_recvs: 0, - non_rendezvous_recvs: 0 + non_rendezvous_recvs: 0, + release_tombstone: 0, + release_no_tombstone: 0 } } } @@ -70,6 +76,8 @@ impl ToStr for SchedMetrics { non_rendezvous_sends: %u\n\ rendezvous_recvs: %u\n\ non_rendezvous_recvs: %u\n\ + release_tombstone: %u\n\ + release_no_tombstone: %u\n\ ", self.turns, self.messages_received, @@ -82,7 +90,9 @@ impl ToStr for SchedMetrics { self.rendezvous_sends, self.non_rendezvous_sends, self.rendezvous_recvs, - self.non_rendezvous_recvs + self.non_rendezvous_recvs, + self.release_tombstone, + self.release_no_tombstone ) } } \ No newline at end of file diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs index 3198b285876..5f06c1455a4 100644 --- a/src/libstd/rt/mod.rs +++ b/src/libstd/rt/mod.rs @@ -133,6 +133,9 @@ pub mod local_ptr; /// Bindings to pthread/windows thread-local storage. pub mod thread_local_storage; +/// A concurrent data structure with which parent tasks wait on child tasks. +pub mod join_latch; + pub mod metrics; @@ -164,7 +167,7 @@ pub fn start(_argc: int, _argv: **u8, crate_map: *u8, main: ~fn()) -> int { let sleepers = SleeperList::new(); let mut sched = ~Scheduler::new(loop_, work_queue, sleepers); sched.no_sleep = true; - let main_task = ~Coroutine::new(&mut sched.stack_pool, main); + let main_task = ~Coroutine::new_root(&mut sched.stack_pool, main); sched.enqueue_task(main_task); sched.run(); @@ -238,7 +241,7 @@ fn test_context() { do run_in_bare_thread { assert_eq!(context(), GlobalContext); let mut sched = ~new_test_uv_sched(); - let task = ~do Coroutine::new(&mut sched.stack_pool) { + let task = ~do Coroutine::new_root(&mut sched.stack_pool) { assert_eq!(context(), TaskContext); let sched = Local::take::<Scheduler>(); do sched.deschedule_running_task_and_then() |sched, task| { diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs index 3b8a31d1840..fe553467ebd 100644 --- a/src/libstd/rt/sched.rs +++ b/src/libstd/rt/sched.rs @@ -181,8 +181,10 @@ pub impl Scheduler { // XXX: Reenable this once we're using a per-task queue. With a shared // queue this is not true //assert!(sched.work_queue.is_empty()); -// let out = sched.metrics.to_str(); -// rtdebug!("scheduler metrics: %s\n", out); + rtdebug!("scheduler metrics: %s\n", { + use to_str::ToStr; + sched.metrics.to_str() + }); return sched; } @@ -728,11 +730,11 @@ pub impl Coroutine { // using the AnySched paramter. fn new_homed(stack_pool: &mut StackPool, home: SchedHome, start: ~fn()) -> Coroutine { - Coroutine::with_task_homed(stack_pool, ~Task::new(), start, home) + Coroutine::with_task_homed(stack_pool, ~Task::new_root(), start, home) } - fn new(stack_pool: &mut StackPool, start: ~fn()) -> Coroutine { - Coroutine::with_task(stack_pool, ~Task::new(), start) + fn new_root(stack_pool: &mut StackPool, start: ~fn()) -> Coroutine { + Coroutine::with_task(stack_pool, ~Task::new_root(), start) } fn with_task_homed(stack_pool: &mut StackPool, @@ -740,7 +742,7 @@ pub impl Coroutine { start: ~fn(), home: SchedHome) -> Coroutine { - static MIN_STACK_SIZE: uint = 10000000; // XXX: Too much stack + static MIN_STACK_SIZE: uint = 1000000; // XXX: Too much stack let start = Coroutine::build_start_wrapper(start); let mut stack = stack_pool.take_segment(MIN_STACK_SIZE); @@ -930,14 +932,14 @@ mod test { }; let t1f = Cell(t1f); - let t2f = ~do Coroutine::new(&mut normal_sched.stack_pool) { + let t2f = ~do Coroutine::new_root(&mut normal_sched.stack_pool) { let on_special = Coroutine::on_special(); rtdebug!("t2 should not be on special: %b", on_special); assert!(!on_special); }; let t2f = Cell(t2f); - let t3f = ~do Coroutine::new(&mut normal_sched.stack_pool) { + let t3f = ~do Coroutine::new_root(&mut normal_sched.stack_pool) { // not on special let on_special = Coroutine::on_special(); rtdebug!("t3 should not be on special: %b", on_special); @@ -986,7 +988,7 @@ mod test { let t4 = Cell(t4); // build a main task that runs our four tests - let main_task = ~do Coroutine::new(&mut normal_sched.stack_pool) { + let main_task = ~do Coroutine::new_root(&mut normal_sched.stack_pool) { // the two tasks that require a normal start location t2.take()(); t4.take()(); @@ -1141,7 +1143,7 @@ mod test { let task_ran_ptr: *mut bool = &mut task_ran; let mut sched = ~new_test_uv_sched(); - let task = ~do Coroutine::new(&mut sched.stack_pool) { + let task = ~do Coroutine::new_root(&mut sched.stack_pool) { unsafe { *task_ran_ptr = true; } }; sched.enqueue_task(task); @@ -1159,7 +1161,7 @@ mod test { let mut sched = ~new_test_uv_sched(); for int::range(0, total) |_| { - let task = ~do Coroutine::new(&mut sched.stack_pool) { + let task = ~do Coroutine::new_root(&mut sched.stack_pool) { unsafe { *task_count_ptr = *task_count_ptr + 1; } }; sched.enqueue_task(task); @@ -1176,10 +1178,10 @@ mod test { let count_ptr: *mut int = &mut count; let mut sched = ~new_test_uv_sched(); - let task1 = ~do Coroutine::new(&mut sched.stack_pool) { + let task1 = ~do Coroutine::new_root(&mut sched.stack_pool) { unsafe { *count_ptr = *count_ptr + 1; } let mut sched = Local::take::<Scheduler>(); - let task2 = ~do Coroutine::new(&mut sched.stack_pool) { + let task2 = ~do Coroutine::new_root(&mut sched.stack_pool) { unsafe { *count_ptr = *count_ptr + 1; } }; // Context switch directly to the new task @@ -1204,7 +1206,7 @@ mod test { let mut sched = ~new_test_uv_sched(); - let start_task = ~do Coroutine::new(&mut sched.stack_pool) { + let start_task = ~do Coroutine::new_root(&mut sched.stack_pool) { run_task(count_ptr); }; sched.enqueue_task(start_task); @@ -1214,7 +1216,7 @@ mod test { fn run_task(count_ptr: *mut int) { do Local::borrow::<Scheduler, ()> |sched| { - let task = ~do Coroutine::new(&mut sched.stack_pool) { + let task = ~do Coroutine::new_root(&mut sched.stack_pool) { unsafe { *count_ptr = *count_ptr + 1; if *count_ptr != MAX { @@ -1232,7 +1234,7 @@ mod test { fn test_block_task() { do run_in_bare_thread { let mut sched = ~new_test_uv_sched(); - let task = ~do Coroutine::new(&mut sched.stack_pool) { + let task = ~do Coroutine::new_root(&mut sched.stack_pool) { let sched = Local::take::<Scheduler>(); assert!(sched.in_task_context()); do sched.deschedule_running_task_and_then() |sched, task| { @@ -1279,13 +1281,13 @@ mod test { let mut sched1 = ~new_test_uv_sched(); let handle1 = sched1.make_handle(); let handle1_cell = Cell(handle1); - let task1 = ~do Coroutine::new(&mut sched1.stack_pool) { + let task1 = ~do Coroutine::new_root(&mut sched1.stack_pool) { chan_cell.take().send(()); }; sched1.enqueue_task(task1); let mut sched2 = ~new_test_uv_sched(); - let task2 = ~do Coroutine::new(&mut sched2.stack_pool) { + let task2 = ~do Coroutine::new_root(&mut sched2.stack_pool) { port_cell.take().recv(); // Release the other scheduler's handle so it can exit handle1_cell.take(); diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs index 06318ac6623..6e4be3c1ef9 100644 --- a/src/libstd/rt/task.rs +++ b/src/libstd/rt/task.rs @@ -16,19 +16,23 @@ use prelude::*; use libc::{c_void, uintptr_t}; use cast::transmute; +use option::{Option, Some, None}; use rt::local::Local; use super::local_heap::LocalHeap; use rt::logging::StdErrLogger; use rt::sched::{SchedHome, AnySched}; +use rt::join_latch::JoinLatch; pub struct Task { heap: LocalHeap, gc: GarbageCollector, storage: LocalStorage, logger: StdErrLogger, - unwinder: Option<Unwinder>, - destroyed: bool, - home: Option<SchedHome> + unwinder: Unwinder, + home: Option<SchedHome>, + join_latch: Option<~JoinLatch>, + on_exit: Option<~fn(bool)>, + destroyed: bool } pub struct GarbageCollector; @@ -39,27 +43,31 @@ pub struct Unwinder { } impl Task { - pub fn new() -> Task { + pub fn new_root() -> Task { Task { heap: LocalHeap::new(), gc: GarbageCollector, storage: LocalStorage(ptr::null(), None), logger: StdErrLogger, - unwinder: Some(Unwinder { unwinding: false }), - destroyed: false, - home: Some(AnySched) + unwinder: Unwinder { unwinding: false }, + home: Some(AnySched), + join_latch: Some(JoinLatch::new_root()), + on_exit: None, + destroyed: false } } - pub fn without_unwinding() -> Task { + pub fn new_child(&mut self) -> Task { Task { heap: LocalHeap::new(), gc: GarbageCollector, storage: LocalStorage(ptr::null(), None), logger: StdErrLogger, - unwinder: None, - destroyed: false, - home: Some(AnySched) + home: Some(AnySched), + unwinder: Unwinder { unwinding: false }, + join_latch: Some(self.join_latch.get_mut_ref().new_child()), + on_exit: None, + destroyed: false } } @@ -74,20 +82,24 @@ impl Task { assert!(ptr::ref_eq(task, self)); } - match self.unwinder { - Some(ref mut unwinder) => { - // If there's an unwinder then set up the catch block - unwinder.try(f); + self.unwinder.try(f); + self.destroy(); + + // Wait for children. Possibly report the exit status. + let local_success = !self.unwinder.unwinding; + let join_latch = self.join_latch.swap_unwrap(); + match self.on_exit { + Some(ref on_exit) => { + let success = join_latch.wait(local_success); + (*on_exit)(success); } None => { - // Otherwise, just run the body - f() + join_latch.release(local_success); } } - self.destroy(); } - /// Must be called manually before finalization to clean up + /// must be called manually before finalization to clean up /// thread-local resources. Some of the routines here expect /// Task to be available recursively so this must be /// called unsafely, without removing Task from @@ -233,5 +245,15 @@ mod test { assert!(port.recv() == 10); } } + + #[test] + fn linked_failure() { + do run_in_newsched_task() { + let res = do spawntask_try { + spawntask_random(|| fail!()); + }; + assert!(res.is_err()); + } + } } diff --git a/src/libstd/rt/test.rs b/src/libstd/rt/test.rs index bb284c02541..d35d01cf719 100644 --- a/src/libstd/rt/test.rs +++ b/src/libstd/rt/test.rs @@ -18,6 +18,7 @@ use vec::OwnedVector; use result::{Result, Ok, Err}; use unstable::run_in_bare_thread; use super::io::net::ip::{IpAddr, Ipv4}; +use rt::comm::oneshot; use rt::task::Task; use rt::thread::Thread; use rt::local::Local; @@ -47,8 +48,11 @@ pub fn run_in_newsched_task(f: ~fn()) { do run_in_bare_thread { let mut sched = ~new_test_uv_sched(); + let mut new_task = ~Task::new_root(); + let on_exit: ~fn(bool) = |exit_status| rtassert!(exit_status); + new_task.on_exit = Some(on_exit); let task = ~Coroutine::with_task(&mut sched.stack_pool, - ~Task::without_unwinding(), + new_task, f.take()); sched.enqueue_task(task); sched.run(); @@ -95,16 +99,20 @@ pub fn run_in_mt_newsched_task(f: ~fn()) { let f_cell = Cell(f_cell.take()); let handles = Cell(handles); - let main_task = ~do Coroutine::new(&mut scheds[0].stack_pool) { - f_cell.take()(); + let mut new_task = ~Task::new_root(); + let on_exit: ~fn(bool) = |exit_status| { let mut handles = handles.take(); // Tell schedulers to exit for handles.each_mut |handle| { handle.send(Shutdown); } - }; + rtassert!(exit_status); + }; + new_task.on_exit = Some(on_exit); + let main_task = ~Coroutine::with_task(&mut scheds[0].stack_pool, + new_task, f_cell.take()); scheds[0].enqueue_task(main_task); let mut threads = ~[]; @@ -201,7 +209,7 @@ pub fn run_in_mt_newsched_task_random_homed() { rtdebug!("creating main task"); - let main_task = ~do Coroutine::new(&mut scheds[0].stack_pool) { + let main_task = ~do Coroutine::new_root(&mut scheds[0].stack_pool) { f_cell.take()(); let mut handles = handles.take(); // Tell schedulers to exit @@ -245,10 +253,13 @@ pub fn spawntask(f: ~fn()) { use super::sched::*; rtdebug!("spawntask taking the scheduler from TLS") + let task = do Local::borrow::<Task, ~Task>() |running_task| { + ~running_task.new_child() + }; + let mut sched = Local::take::<Scheduler>(); let task = ~Coroutine::with_task(&mut sched.stack_pool, - ~Task::without_unwinding(), - f); + task, f); rtdebug!("spawntask scheduling the new task"); sched.schedule_task(task); } @@ -257,10 +268,13 @@ pub fn spawntask(f: ~fn()) { pub fn spawntask_immediately(f: ~fn()) { use super::sched::*; + let task = do Local::borrow::<Task, ~Task>() |running_task| { + ~running_task.new_child() + }; + let mut sched = Local::take::<Scheduler>(); let task = ~Coroutine::with_task(&mut sched.stack_pool, - ~Task::without_unwinding(), - f); + task, f); do sched.switch_running_tasks_and_then(task) |sched, task| { sched.enqueue_task(task); } @@ -270,10 +284,13 @@ pub fn spawntask_immediately(f: ~fn()) { pub fn spawntask_later(f: ~fn()) { use super::sched::*; + let task = do Local::borrow::<Task, ~Task>() |running_task| { + ~running_task.new_child() + }; + let mut sched = Local::take::<Scheduler>(); let task = ~Coroutine::with_task(&mut sched.stack_pool, - ~Task::without_unwinding(), - f); + task, f); sched.enqueue_task(task); Local::put(sched); @@ -284,13 +301,16 @@ pub fn spawntask_random(f: ~fn()) { use super::sched::*; use rand::{Rand, rng}; - let mut rng = rng(); - let run_now: bool = Rand::rand(&mut rng); + let task = do Local::borrow::<Task, ~Task>() |running_task| { + ~running_task.new_child() + }; let mut sched = Local::take::<Scheduler>(); let task = ~Coroutine::with_task(&mut sched.stack_pool, - ~Task::without_unwinding(), - f); + task, f); + + let mut rng = rng(); + let run_now: bool = Rand::rand(&mut rng); if run_now { do sched.switch_running_tasks_and_then(task) |sched, task| { @@ -327,7 +347,7 @@ pub fn spawntask_homed(scheds: &mut ~[~Scheduler], f: ~fn()) { }; ~Coroutine::with_task_homed(&mut sched.stack_pool, - ~Task::without_unwinding(), + ~Task::new_root(), af, Sched(handle)) }; @@ -340,47 +360,37 @@ pub fn spawntask_homed(scheds: &mut ~[~Scheduler], f: ~fn()) { pub fn spawntask_try(f: ~fn()) -> Result<(), ()> { use cell::Cell; use super::sched::*; - use task; - use unstable::finally::Finally; - - // Our status variables will be filled in from the scheduler context - let mut failed = false; - let failed_ptr: *mut bool = &mut failed; - - // Switch to the scheduler - let f = Cell(Cell(f)); - let sched = Local::take::<Scheduler>(); - do sched.deschedule_running_task_and_then() |sched, old_task| { - let old_task = Cell(old_task); - let f = f.take(); - let new_task = ~do Coroutine::new(&mut sched.stack_pool) { - do (|| { - (f.take())() - }).finally { - // Check for failure then resume the parent task - unsafe { *failed_ptr = task::failing(); } - let sched = Local::take::<Scheduler>(); - do sched.switch_running_tasks_and_then(old_task.take()) |sched, new_task| { - sched.enqueue_task(new_task); - } - } - }; - sched.enqueue_task(new_task); + let (port, chan) = oneshot(); + let chan = Cell(chan); + let mut new_task = ~Task::new_root(); + let on_exit: ~fn(bool) = |exit_status| chan.take().send(exit_status); + new_task.on_exit = Some(on_exit); + let mut sched = Local::take::<Scheduler>(); + let new_task = ~Coroutine::with_task(&mut sched.stack_pool, + new_task, f); + do sched.switch_running_tasks_and_then(new_task) |sched, old_task| { + sched.enqueue_task(old_task); } - if !failed { Ok(()) } else { Err(()) } + let exit_status = port.recv(); + if exit_status { Ok(()) } else { Err(()) } } // Spawn a new task in a new scheduler and return a thread handle. pub fn spawntask_thread(f: ~fn()) -> Thread { use rt::sched::*; + let task = do Local::borrow::<Task, ~Task>() |running_task| { + ~running_task.new_child() + }; + + let task = Cell(task); let f = Cell(f); let thread = do Thread::start { let mut sched = ~new_test_uv_sched(); let task = ~Coroutine::with_task(&mut sched.stack_pool, - ~Task::without_unwinding(), + task.take(), f.take()); sched.enqueue_task(task); sched.run(); diff --git a/src/libstd/sys.rs b/src/libstd/sys.rs index 137070ce202..77085d19567 100644 --- a/src/libstd/sys.rs +++ b/src/libstd/sys.rs @@ -226,11 +226,7 @@ pub fn begin_unwind_(msg: *c_char, file: *c_char, line: size_t) -> ! { gc::cleanup_stack_for_failure(); let task = Local::unsafe_borrow::<Task>(); - let unwinder: &mut Option<Unwinder> = &mut (*task).unwinder; - match *unwinder { - Some(ref mut unwinder) => unwinder.begin_unwind(), - None => abort!("failure without unwinder. aborting process") - } + (*task).unwinder.begin_unwind(); } } } diff --git a/src/libstd/task/mod.rs b/src/libstd/task/mod.rs index df5b88207ec..dbfb9baefa7 100644 --- a/src/libstd/task/mod.rs +++ b/src/libstd/task/mod.rs @@ -513,20 +513,9 @@ pub fn failing() -> bool { } } _ => { - let mut unwinding = false; - do Local::borrow::<Task, ()> |local| { - unwinding = match local.unwinder { - Some(unwinder) => { - unwinder.unwinding - } - None => { - // Because there is no unwinder we can't be unwinding. - // (The process will abort on failure) - false - } - } + do Local::borrow::<Task, bool> |local| { + local.unwinder.unwinding } - return unwinding; } } } diff --git a/src/libstd/task/spawn.rs b/src/libstd/task/spawn.rs index 5e507238f67..bff4aa1498d 100644 --- a/src/libstd/task/spawn.rs +++ b/src/libstd/task/spawn.rs @@ -91,6 +91,7 @@ use uint; use util; use unstable::sync::{Exclusive, exclusive}; use rt::local::Local; +use rt::task::Task; #[cfg(test)] use task::default_task_opts; @@ -576,8 +577,13 @@ pub fn spawn_raw(opts: TaskOpts, f: ~fn()) { fn spawn_raw_newsched(_opts: TaskOpts, f: ~fn()) { use rt::sched::*; + let task = do Local::borrow::<Task, ~Task>() |running_task| { + ~running_task.new_child() + }; + let mut sched = Local::take::<Scheduler>(); - let task = ~Coroutine::new(&mut sched.stack_pool, f); + let task = ~Coroutine::with_task(&mut sched.stack_pool, + task, f); sched.schedule_task(task); } |
