diff options
| author | Brian Anderson <banderson@mozilla.com> | 2013-06-02 01:55:22 -0700 |
|---|---|---|
| committer | Brian Anderson <banderson@mozilla.com> | 2013-06-13 23:18:45 -0700 |
| commit | abc3a8aa1e76f3ecc3930e20453a52681843cec0 (patch) | |
| tree | 4bb0825a5c2df977c6238a7f9eaa64686284f3b3 /src/libstd | |
| parent | d83d38c7fe3408848664de66a9a53587f627a01b (diff) | |
| download | rust-abc3a8aa1e76f3ecc3930e20453a52681843cec0.tar.gz rust-abc3a8aa1e76f3ecc3930e20453a52681843cec0.zip | |
std::rt: Add JoinLatch
This is supposed to be an efficient way to link the lifetimes of tasks into a tree. JoinLatches form a tree and when `release` is called they wait on children then signal the parent. This structure creates zombie tasks which currently keep the entire task allocated. Zombie tasks are supposed to be tombstoned but that code does not work correctly.
Diffstat (limited to 'src/libstd')
| -rw-r--r-- | src/libstd/rt/join_latch.rs | 645 | ||||
| -rw-r--r-- | src/libstd/rt/metrics.rs | 16 | ||||
| -rw-r--r-- | src/libstd/rt/mod.rs | 3 | ||||
| -rw-r--r-- | src/libstd/rt/sched.rs | 5 |
4 files changed, 665 insertions, 4 deletions
diff --git a/src/libstd/rt/join_latch.rs b/src/libstd/rt/join_latch.rs new file mode 100644 index 00000000000..6ffba992fdf --- /dev/null +++ b/src/libstd/rt/join_latch.rs @@ -0,0 +1,645 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! The JoinLatch is a concurrent type that establishes the task +//! tree and propagates failure. +//! +//! Each task gets a JoinLatch that is derived from the JoinLatch +//! of its parent task. Every latch must be released by either calling +//! the non-blocking `release` method or the task-blocking `wait` method. +//! Releasing a latch does not complete until all of its child latches +//! complete. +//! +//! Latches carry a `success` flag that is set to `false` during task +//! failure and is propagated both from children to parents and parents +//! to children. The status af this flag may be queried for the purposes +//! of linked failure. +//! +//! In addition to failure propagation the task tree serves to keep the +//! default task schedulers alive. The runtime only sends the shutdown +//! message to schedulers once the root task exits. +//! +//! Under this scheme tasks that terminate before their children become +//! 'zombies' since they may not exit until their children do. Zombie +//! tasks are 'tombstoned' as `Tombstone(~JoinLatch)` and the tasks +//! themselves allowed to terminate. +//! +//! XXX: Propagate flag from parents to children. +//! XXX: Tombstoning actually doesn't work. +//! XXX: This could probably be done in a way that doesn't leak tombstones +//! longer than the life of the child tasks. + +use comm::{GenericPort, Peekable, GenericSmartChan}; +use clone::Clone; +use container::Container; +use option::{Option, Some, None}; +use ops::Drop; +use rt::comm::{SharedChan, Port, stream}; +use rt::local::Local; +use rt::sched::Scheduler; +use unstable::atomics::{AtomicUint, SeqCst}; +use util; +use vec::OwnedVector; + +// FIXME #7026: Would prefer this to be an enum +pub struct JoinLatch { + priv parent: Option<ParentLink>, + priv child: Option<ChildLink>, + closed: bool, +} + +// Shared between parents and all their children. +struct SharedState { + /// Reference count, held by a parent and all children. + count: AtomicUint, + success: bool +} + +struct ParentLink { + shared: *mut SharedState, + // For communicating with the parent. + chan: SharedChan<Message> +} + +struct ChildLink { + shared: ~SharedState, + // For receiving from children. + port: Port<Message>, + chan: SharedChan<Message>, + // Prevents dropping the child SharedState reference counts multiple times. + dropped_child: bool +} + +// Messages from child latches to parent. +enum Message { + Tombstone(~JoinLatch), + ChildrenTerminated +} + +impl JoinLatch { + pub fn new_root() -> ~JoinLatch { + let this = ~JoinLatch { + parent: None, + child: None, + closed: false + }; + rtdebug!("new root latch %x", this.id()); + return this; + } + + fn id(&self) -> uint { + unsafe { ::cast::transmute(&*self) } + } + + pub fn new_child(&mut self) -> ~JoinLatch { + rtassert!(!self.closed); + + if self.child.is_none() { + // This is the first time spawning a child + let shared = ~SharedState { + count: AtomicUint::new(1), + success: true + }; + let (port, chan) = stream(); + let chan = SharedChan::new(chan); + let child = ChildLink { + shared: shared, + port: port, + chan: chan, + dropped_child: false + }; + self.child = Some(child); + } + + let child_link: &mut ChildLink = self.child.get_mut_ref(); + let shared_state: *mut SharedState = &mut *child_link.shared; + + child_link.shared.count.fetch_add(1, SeqCst); + + let child = ~JoinLatch { + parent: Some(ParentLink { + shared: shared_state, + chan: child_link.chan.clone() + }), + child: None, + closed: false + }; + rtdebug!("NEW child latch %x", child.id()); + return child; + } + + pub fn release(~self, local_success: bool) { + // XXX: This should not block, but there's a bug in the below + // code that I can't figure out. + self.wait(local_success); + } + + // XXX: Should not require ~self + fn release_broken(~self, local_success: bool) { + rtassert!(!self.closed); + + rtdebug!("releasing %x", self.id()); + + let id = self.id(); + let _ = id; // XXX: `id` is only used in debug statements so appears unused + let mut this = self; + let mut child_success = true; + let mut children_done = false; + + if this.child.is_some() { + rtdebug!("releasing children"); + let child_link: &mut ChildLink = this.child.get_mut_ref(); + let shared: &mut SharedState = &mut *child_link.shared; + + if !child_link.dropped_child { + let last_count = shared.count.fetch_sub(1, SeqCst); + rtdebug!("child count before sub %u %x", last_count, id); + if last_count == 1 { + assert!(child_link.chan.try_send(ChildrenTerminated)); + } + child_link.dropped_child = true; + } + + // Wait for messages from children + let mut tombstones = ~[]; + loop { + if child_link.port.peek() { + match child_link.port.recv() { + Tombstone(t) => { + tombstones.push(t); + }, + ChildrenTerminated => { + children_done = true; + break; + } + } + } else { + break + } + } + + rtdebug!("releasing %u tombstones %x", tombstones.len(), id); + + // Try to release the tombstones. Those that still have + // outstanding will be re-enqueued. When this task's + // parents release their latch we'll end up back here + // trying them again. + while !tombstones.is_empty() { + tombstones.pop().release(true); + } + + if children_done { + let count = shared.count.load(SeqCst); + assert!(count == 0); + // self_count is the acquire-read barrier + child_success = shared.success; + } + } else { + children_done = true; + } + + let total_success = local_success && child_success; + + rtassert!(this.parent.is_some()); + + unsafe { + { + let parent_link: &mut ParentLink = this.parent.get_mut_ref(); + let shared: *mut SharedState = parent_link.shared; + + if !total_success { + // parent_count is the write-wait barrier + (*shared).success = false; + } + } + + if children_done { + rtdebug!("children done"); + do Local::borrow::<Scheduler> |sched| { + sched.metrics.release_tombstone += 1; + } + { + rtdebug!("RELEASING parent %x", id); + let parent_link: &mut ParentLink = this.parent.get_mut_ref(); + let shared: *mut SharedState = parent_link.shared; + let last_count = (*shared).count.fetch_sub(1, SeqCst); + rtdebug!("count before parent sub %u %x", last_count, id); + if last_count == 1 { + assert!(parent_link.chan.try_send(ChildrenTerminated)); + } + } + this.closed = true; + util::ignore(this); + } else { + rtdebug!("children not done"); + rtdebug!("TOMBSTONING %x", id); + do Local::borrow::<Scheduler> |sched| { + sched.metrics.release_no_tombstone += 1; + } + let chan = { + let parent_link: &mut ParentLink = this.parent.get_mut_ref(); + parent_link.chan.clone() + }; + assert!(chan.try_send(Tombstone(this))); + } + } + } + + // XXX: Should not require ~self + pub fn wait(~self, local_success: bool) -> bool { + rtassert!(!self.closed); + + rtdebug!("WAITING %x", self.id()); + + let mut this = self; + let mut child_success = true; + + if this.child.is_some() { + rtdebug!("waiting for children"); + let child_link: &mut ChildLink = this.child.get_mut_ref(); + let shared: &mut SharedState = &mut *child_link.shared; + + if !child_link.dropped_child { + let last_count = shared.count.fetch_sub(1, SeqCst); + rtdebug!("child count before sub %u", last_count); + if last_count == 1 { + assert!(child_link.chan.try_send(ChildrenTerminated)); + } + child_link.dropped_child = true; + } + + // Wait for messages from children + loop { + match child_link.port.recv() { + Tombstone(t) => { + t.wait(true); + } + ChildrenTerminated => break + } + } + + let count = shared.count.load(SeqCst); + if count != 0 { ::io::println(fmt!("%u", count)); } + assert!(count == 0); + // self_count is the acquire-read barrier + child_success = shared.success; + } + + let total_success = local_success && child_success; + + if this.parent.is_some() { + rtdebug!("releasing parent"); + unsafe { + let parent_link: &mut ParentLink = this.parent.get_mut_ref(); + let shared: *mut SharedState = parent_link.shared; + + if !total_success { + // parent_count is the write-wait barrier + (*shared).success = false; + } + + let last_count = (*shared).count.fetch_sub(1, SeqCst); + rtdebug!("count before parent sub %u", last_count); + if last_count == 1 { + assert!(parent_link.chan.try_send(ChildrenTerminated)); + } + } + } + + this.closed = true; + util::ignore(this); + + return total_success; + } +} + +impl Drop for JoinLatch { + fn finalize(&self) { + rtdebug!("DESTROYING %x", self.id()); + rtassert!(self.closed); + } +} + +#[cfg(test)] +mod test { + use super::*; + use cell::Cell; + use container::Container; + use iter::Times; + use old_iter::BaseIter; + use rt::test::*; + use rand; + use rand::RngUtil; + use vec::{CopyableVector, ImmutableVector}; + + #[test] + fn success_immediately() { + do run_in_newsched_task { + let mut latch = JoinLatch::new_root(); + + let child_latch = latch.new_child(); + let child_latch = Cell(child_latch); + do spawntask_immediately { + let child_latch = child_latch.take(); + assert!(child_latch.wait(true)); + } + + assert!(latch.wait(true)); + } + } + + #[test] + fn success_later() { + do run_in_newsched_task { + let mut latch = JoinLatch::new_root(); + + let child_latch = latch.new_child(); + let child_latch = Cell(child_latch); + do spawntask_later { + let child_latch = child_latch.take(); + assert!(child_latch.wait(true)); + } + + assert!(latch.wait(true)); + } + } + + #[test] + fn mt_success() { + do run_in_mt_newsched_task { + let mut latch = JoinLatch::new_root(); + + for 10.times { + let child_latch = latch.new_child(); + let child_latch = Cell(child_latch); + do spawntask_random { + let child_latch = child_latch.take(); + assert!(child_latch.wait(true)); + } + } + + assert!(latch.wait(true)); + } + } + + #[test] + fn mt_failure() { + do run_in_mt_newsched_task { + let mut latch = JoinLatch::new_root(); + + let spawn = |status| { + let child_latch = latch.new_child(); + let child_latch = Cell(child_latch); + do spawntask_random { + let child_latch = child_latch.take(); + child_latch.wait(status); + } + }; + + for 10.times { spawn(true) } + spawn(false); + for 10.times { spawn(true) } + + assert!(!latch.wait(true)); + } + } + + #[test] + fn mt_multi_level_success() { + do run_in_mt_newsched_task { + let mut latch = JoinLatch::new_root(); + + fn child(latch: &mut JoinLatch, i: int) { + let child_latch = latch.new_child(); + let child_latch = Cell(child_latch); + do spawntask_random { + let mut child_latch = child_latch.take(); + if i != 0 { + child(&mut *child_latch, i - 1); + child_latch.wait(true); + } else { + child_latch.wait(true); + } + } + } + + child(&mut *latch, 10); + + assert!(latch.wait(true)); + } + } + + #[test] + fn mt_multi_level_failure() { + do run_in_mt_newsched_task { + let mut latch = JoinLatch::new_root(); + + fn child(latch: &mut JoinLatch, i: int) { + let child_latch = latch.new_child(); + let child_latch = Cell(child_latch); + do spawntask_random { + let mut child_latch = child_latch.take(); + if i != 0 { + child(&mut *child_latch, i - 1); + child_latch.wait(false); + } else { + child_latch.wait(true); + } + } + } + + child(&mut *latch, 10); + + assert!(!latch.wait(true)); + } + } + + #[test] + fn release_child() { + do run_in_newsched_task { + let mut latch = JoinLatch::new_root(); + let child_latch = latch.new_child(); + let child_latch = Cell(child_latch); + + do spawntask_immediately { + let latch = child_latch.take(); + latch.release(false); + } + + assert!(!latch.wait(true)); + } + } + + #[test] + fn release_child_tombstone() { + do run_in_newsched_task { + let mut latch = JoinLatch::new_root(); + let child_latch = latch.new_child(); + let child_latch = Cell(child_latch); + + do spawntask_immediately { + let mut latch = child_latch.take(); + let child_latch = latch.new_child(); + let child_latch = Cell(child_latch); + do spawntask_later { + let latch = child_latch.take(); + latch.release(false); + } + latch.release(true); + } + + assert!(!latch.wait(true)); + } + } + + #[test] + fn release_child_no_tombstone() { + do run_in_newsched_task { + let mut latch = JoinLatch::new_root(); + let child_latch = latch.new_child(); + let child_latch = Cell(child_latch); + + do spawntask_later { + let mut latch = child_latch.take(); + let child_latch = latch.new_child(); + let child_latch = Cell(child_latch); + do spawntask_immediately { + let latch = child_latch.take(); + latch.release(false); + } + latch.release(true); + } + + assert!(!latch.wait(true)); + } + } + + #[test] + fn release_child_tombstone_stress() { + fn rand_orders() -> ~[bool] { + let mut v = ~[false,.. 5]; + v[0] = true; + let mut rng = rand::rng(); + return rng.shuffle(v); + } + + fn split_orders(orders: &[bool]) -> (~[bool], ~[bool]) { + if orders.is_empty() { + return (~[], ~[]); + } else if orders.len() <= 2 { + return (orders.to_owned(), ~[]); + } + let mut rng = rand::rng(); + let n = rng.gen_uint_range(1, orders.len()); + let first = orders.slice(0, n).to_owned(); + let last = orders.slice(n, orders.len()).to_owned(); + assert!(first.len() + last.len() == orders.len()); + return (first, last); + } + + for stress_factor().times { + do run_in_newsched_task { + fn doit(latch: &mut JoinLatch, orders: ~[bool], depth: uint) { + let (my_orders, remaining_orders) = split_orders(orders); + rtdebug!("(my_orders, remaining): %?", (&my_orders, &remaining_orders)); + rtdebug!("depth: %u", depth); + let mut remaining_orders = remaining_orders; + let mut num = 0; + for my_orders.each |&order| { + let child_latch = latch.new_child(); + let child_latch = Cell(child_latch); + let (child_orders, remaining) = split_orders(remaining_orders); + rtdebug!("(child_orders, remaining): %?", (&child_orders, &remaining)); + remaining_orders = remaining; + let child_orders = Cell(child_orders); + let child_num = num; + let _ = child_num; // XXX unused except in rtdebug! + do spawntask_random { + rtdebug!("depth %u num %u", depth, child_num); + let mut child_latch = child_latch.take(); + let child_orders = child_orders.take(); + doit(&mut *child_latch, child_orders, depth + 1); + child_latch.release(order); + } + + num += 1; + } + } + + let mut latch = JoinLatch::new_root(); + let orders = rand_orders(); + rtdebug!("orders: %?", orders); + + doit(&mut *latch, orders, 0); + + assert!(!latch.wait(true)); + } + } + } + + #[test] + fn whateverman() { + struct Order { + immediate: bool, + succeed: bool, + orders: ~[Order] + } + fn next(latch: &mut JoinLatch, orders: ~[Order]) { + for orders.each |order| { + let suborders = copy order.orders; + let child_latch = Cell(latch.new_child()); + let succeed = order.succeed; + if order.immediate { + do spawntask_immediately { + let mut child_latch = child_latch.take(); + next(&mut *child_latch, copy suborders); + rtdebug!("immediate releasing"); + child_latch.release(succeed); + } + } else { + do spawntask_later { + let mut child_latch = child_latch.take(); + next(&mut *child_latch, copy suborders); + rtdebug!("later releasing"); + child_latch.release(succeed); + } + } + } + } + + do run_in_newsched_task { + let mut latch = JoinLatch::new_root(); + let orders = ~[ Order { // 0 0 + immediate: true, + succeed: true, + orders: ~[ Order { // 1 0 + immediate: true, + succeed: false, + orders: ~[ Order { // 2 0 + immediate: false, + succeed: false, + orders: ~[ Order { // 3 0 + immediate: true, + succeed: false, + orders: ~[] + }, Order { // 3 1 + immediate: false, + succeed: false, + orders: ~[] + }] + }] + }] + }]; + + next(&mut *latch, orders); + assert!(!latch.wait(true)); + } + } +} diff --git a/src/libstd/rt/metrics.rs b/src/libstd/rt/metrics.rs index 70e347fdfb6..b0c0fa5d708 100644 --- a/src/libstd/rt/metrics.rs +++ b/src/libstd/rt/metrics.rs @@ -34,7 +34,11 @@ pub struct SchedMetrics { // Message receives that do not block the receiver rendezvous_recvs: uint, // Message receives that block the receiver - non_rendezvous_recvs: uint + non_rendezvous_recvs: uint, + // JoinLatch releases that create tombstones + release_tombstone: uint, + // JoinLatch releases that do not create tombstones + release_no_tombstone: uint, } impl SchedMetrics { @@ -51,7 +55,9 @@ impl SchedMetrics { rendezvous_sends: 0, non_rendezvous_sends: 0, rendezvous_recvs: 0, - non_rendezvous_recvs: 0 + non_rendezvous_recvs: 0, + release_tombstone: 0, + release_no_tombstone: 0 } } } @@ -70,6 +76,8 @@ impl ToStr for SchedMetrics { non_rendezvous_sends: %u\n\ rendezvous_recvs: %u\n\ non_rendezvous_recvs: %u\n\ + release_tombstone: %u\n\ + release_no_tombstone: %u\n\ ", self.turns, self.messages_received, @@ -82,7 +90,9 @@ impl ToStr for SchedMetrics { self.rendezvous_sends, self.non_rendezvous_sends, self.rendezvous_recvs, - self.non_rendezvous_recvs + self.non_rendezvous_recvs, + self.release_tombstone, + self.release_no_tombstone ) } } \ No newline at end of file diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs index caf3e15e535..2008c4a180f 100644 --- a/src/libstd/rt/mod.rs +++ b/src/libstd/rt/mod.rs @@ -133,6 +133,9 @@ pub mod local_ptr; /// Bindings to pthread/windows thread-local storage. pub mod thread_local_storage; +/// A concurrent data structure with which parent tasks wait on child tasks. +pub mod join_latch; + pub mod metrics; diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs index 97a1c26ed4d..104eb4b8bae 100644 --- a/src/libstd/rt/sched.rs +++ b/src/libstd/rt/sched.rs @@ -151,7 +151,10 @@ pub impl Scheduler { // XXX: Reenable this once we're using a per-task queue. With a shared // queue this is not true //assert!(sched.work_queue.is_empty()); - rtdebug!("scheduler metrics: %s\n", sched.metrics.to_str()); + rtdebug!("scheduler metrics: %s\n", { + use to_str::ToStr; + sched.metrics.to_str() + }); return sched; } |
