From d83d38c7fe3408848664de66a9a53587f627a01b Mon Sep 17 00:00:00 2001
From: Brian Anderson
Date: Thu, 6 Jun 2013 20:27:27 -0700
Subject: std::rt: Reduce task stack size to 1MB

---
 src/libstd/rt/sched.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

(limited to 'src/libstd/rt')

diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs
index df231f6d88a..97a1c26ed4d 100644
--- a/src/libstd/rt/sched.rs
+++ b/src/libstd/rt/sched.rs
@@ -523,7 +523,7 @@ pub impl Coroutine {
                  task: ~Task,
                  start: ~fn()) -> Coroutine {
 
-        static MIN_STACK_SIZE: uint = 10000000; // XXX: Too much stack
+        static MIN_STACK_SIZE: uint = 1000000; // XXX: Too much stack
 
         let start = Coroutine::build_start_wrapper(start);
         let mut stack = stack_pool.take_segment(MIN_STACK_SIZE);
-- cgit 1.4.1-3-g733a5

From abc3a8aa1e76f3ecc3930e20453a52681843cec0 Mon Sep 17 00:00:00 2001
From: Brian Anderson
Date: Sun, 2 Jun 2013 01:55:22 -0700
Subject: std::rt: Add JoinLatch

This is supposed to be an efficient way to link the lifetimes of tasks
into a tree. JoinLatches form a tree and when `release` is called they
wait on children then signal the parent.

This structure creates zombie tasks which currently keep the entire
task allocated. Zombie tasks are supposed to be tombstoned but that
code does not work correctly.
---
 src/libstd/rt/join_latch.rs | 645 ++++++++++++++++++++++++++++++++++++++++++++
 src/libstd/rt/metrics.rs    |  16 +-
 src/libstd/rt/mod.rs        |   3 +
 src/libstd/rt/sched.rs      |   5 +-
 4 files changed, 665 insertions(+), 4 deletions(-)
 create mode 100644 src/libstd/rt/join_latch.rs

(limited to 'src/libstd/rt')

diff --git a/src/libstd/rt/join_latch.rs b/src/libstd/rt/join_latch.rs
new file mode 100644
index 00000000000..6ffba992fdf
--- /dev/null
+++ b/src/libstd/rt/join_latch.rs
@@ -0,0 +1,645 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+//! The JoinLatch is a concurrent type that establishes the task
+//! tree and propagates failure.
+//!
+//! Each task gets a JoinLatch that is derived from the JoinLatch
+//! of its parent task. Every latch must be released by either calling
+//! the non-blocking `release` method or the task-blocking `wait` method.
+//! Releasing a latch does not complete until all of its child latches
+//! complete.
+//!
+//! Latches carry a `success` flag that is set to `false` during task
+//! failure and is propagated both from children to parents and parents
+//! to children. The status of this flag may be queried for the purposes
+//! of linked failure.
+//!
+//! In addition to failure propagation the task tree serves to keep the
+//! default task schedulers alive. The runtime only sends the shutdown
+//! message to schedulers once the root task exits.
+//!
+//! Under this scheme tasks that terminate before their children become
+//! 'zombies' since they may not exit until their children do. Zombie
+//! tasks are 'tombstoned' as `Tombstone(~JoinLatch)` and the tasks
+//! themselves are allowed to terminate.
+//!
+//! XXX: Propagate flag from parents to children.
+//! XXX: Tombstoning actually doesn't work.
+//! XXX: This could probably be done in a way that doesn't leak tombstones
+//! longer than the life of the child tasks.
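// Since the pre-1.0 runtime types in this patch (~JoinLatch, SharedChan,
// Port, rt::comm) no longer exist, here is a minimal sketch, in modern Rust,
// of the design the doc comment above describes: one latch per task, child
// latches derived from the parent, a shared count plus a success flag, and
// releases that complete only once the whole subtree has released. Every
// name below (Latch, Shared, live) is invented for the sketch, and it
// simplifies the patch's per-parent SharedState and message channels into a
// single state for the whole tree, so `wait` here waits on the entire tree
// rather than only on the latch's own children as JoinLatch does.

use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};

struct Shared {
    live: AtomicUsize,   // this latch plus every not-yet-released descendant
    success: AtomicBool, // cleared when any task in the tree fails
    done: Mutex<bool>,   // set when `live` reaches zero
    cv: Condvar,
}

pub struct Latch {
    shared: Arc<Shared>,
}

impl Latch {
    pub fn new_root() -> Latch {
        Latch {
            shared: Arc::new(Shared {
                live: AtomicUsize::new(1),
                success: AtomicBool::new(true),
                done: Mutex::new(false),
                cv: Condvar::new(),
            }),
        }
    }

    // Derive a latch for a child task; the count keeps the tree alive.
    pub fn new_child(&self) -> Latch {
        self.shared.live.fetch_add(1, Ordering::SeqCst);
        Latch { shared: self.shared.clone() }
    }

    // Non-blocking release: record local status, drop this latch's count,
    // and wake any waiters if it was the last one standing.
    pub fn release(self, local_success: bool) {
        if !local_success {
            self.shared.success.store(false, Ordering::SeqCst);
        }
        if self.shared.live.fetch_sub(1, Ordering::SeqCst) == 1 {
            *self.shared.done.lock().unwrap() = true;
            self.shared.cv.notify_all();
        }
    }

    // Blocking wait: release, then block until every latch has released.
    // Returns the combined success status, as JoinLatch::wait does.
    pub fn wait(self, local_success: bool) -> bool {
        let shared = self.shared.clone();
        self.release(local_success);
        let mut done = shared.done.lock().unwrap();
        while !*done {
            done = shared.cv.wait(done).unwrap();
        }
        shared.success.load(Ordering::SeqCst)
    }
}

// A parent holds the root latch, mints one child latch per spawned task,
// and collects the combined status once all of them have released.
fn main() {
    let root = Latch::new_root();
    let child = root.new_child();
    let t = std::thread::spawn(move || child.release(true));
    assert!(root.wait(true));
    t.join().unwrap();
}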
+ +use comm::{GenericPort, Peekable, GenericSmartChan}; +use clone::Clone; +use container::Container; +use option::{Option, Some, None}; +use ops::Drop; +use rt::comm::{SharedChan, Port, stream}; +use rt::local::Local; +use rt::sched::Scheduler; +use unstable::atomics::{AtomicUint, SeqCst}; +use util; +use vec::OwnedVector; + +// FIXME #7026: Would prefer this to be an enum +pub struct JoinLatch { + priv parent: Option, + priv child: Option, + closed: bool, +} + +// Shared between parents and all their children. +struct SharedState { + /// Reference count, held by a parent and all children. + count: AtomicUint, + success: bool +} + +struct ParentLink { + shared: *mut SharedState, + // For communicating with the parent. + chan: SharedChan +} + +struct ChildLink { + shared: ~SharedState, + // For receiving from children. + port: Port, + chan: SharedChan, + // Prevents dropping the child SharedState reference counts multiple times. + dropped_child: bool +} + +// Messages from child latches to parent. +enum Message { + Tombstone(~JoinLatch), + ChildrenTerminated +} + +impl JoinLatch { + pub fn new_root() -> ~JoinLatch { + let this = ~JoinLatch { + parent: None, + child: None, + closed: false + }; + rtdebug!("new root latch %x", this.id()); + return this; + } + + fn id(&self) -> uint { + unsafe { ::cast::transmute(&*self) } + } + + pub fn new_child(&mut self) -> ~JoinLatch { + rtassert!(!self.closed); + + if self.child.is_none() { + // This is the first time spawning a child + let shared = ~SharedState { + count: AtomicUint::new(1), + success: true + }; + let (port, chan) = stream(); + let chan = SharedChan::new(chan); + let child = ChildLink { + shared: shared, + port: port, + chan: chan, + dropped_child: false + }; + self.child = Some(child); + } + + let child_link: &mut ChildLink = self.child.get_mut_ref(); + let shared_state: *mut SharedState = &mut *child_link.shared; + + child_link.shared.count.fetch_add(1, SeqCst); + + let child = ~JoinLatch { + parent: Some(ParentLink { + shared: shared_state, + chan: child_link.chan.clone() + }), + child: None, + closed: false + }; + rtdebug!("NEW child latch %x", child.id()); + return child; + } + + pub fn release(~self, local_success: bool) { + // XXX: This should not block, but there's a bug in the below + // code that I can't figure out. + self.wait(local_success); + } + + // XXX: Should not require ~self + fn release_broken(~self, local_success: bool) { + rtassert!(!self.closed); + + rtdebug!("releasing %x", self.id()); + + let id = self.id(); + let _ = id; // XXX: `id` is only used in debug statements so appears unused + let mut this = self; + let mut child_success = true; + let mut children_done = false; + + if this.child.is_some() { + rtdebug!("releasing children"); + let child_link: &mut ChildLink = this.child.get_mut_ref(); + let shared: &mut SharedState = &mut *child_link.shared; + + if !child_link.dropped_child { + let last_count = shared.count.fetch_sub(1, SeqCst); + rtdebug!("child count before sub %u %x", last_count, id); + if last_count == 1 { + assert!(child_link.chan.try_send(ChildrenTerminated)); + } + child_link.dropped_child = true; + } + + // Wait for messages from children + let mut tombstones = ~[]; + loop { + if child_link.port.peek() { + match child_link.port.recv() { + Tombstone(t) => { + tombstones.push(t); + }, + ChildrenTerminated => { + children_done = true; + break; + } + } + } else { + break + } + } + + rtdebug!("releasing %u tombstones %x", tombstones.len(), id); + + // Try to release the tombstones. 
Those that still have + // outstanding will be re-enqueued. When this task's + // parents release their latch we'll end up back here + // trying them again. + while !tombstones.is_empty() { + tombstones.pop().release(true); + } + + if children_done { + let count = shared.count.load(SeqCst); + assert!(count == 0); + // self_count is the acquire-read barrier + child_success = shared.success; + } + } else { + children_done = true; + } + + let total_success = local_success && child_success; + + rtassert!(this.parent.is_some()); + + unsafe { + { + let parent_link: &mut ParentLink = this.parent.get_mut_ref(); + let shared: *mut SharedState = parent_link.shared; + + if !total_success { + // parent_count is the write-wait barrier + (*shared).success = false; + } + } + + if children_done { + rtdebug!("children done"); + do Local::borrow:: |sched| { + sched.metrics.release_tombstone += 1; + } + { + rtdebug!("RELEASING parent %x", id); + let parent_link: &mut ParentLink = this.parent.get_mut_ref(); + let shared: *mut SharedState = parent_link.shared; + let last_count = (*shared).count.fetch_sub(1, SeqCst); + rtdebug!("count before parent sub %u %x", last_count, id); + if last_count == 1 { + assert!(parent_link.chan.try_send(ChildrenTerminated)); + } + } + this.closed = true; + util::ignore(this); + } else { + rtdebug!("children not done"); + rtdebug!("TOMBSTONING %x", id); + do Local::borrow:: |sched| { + sched.metrics.release_no_tombstone += 1; + } + let chan = { + let parent_link: &mut ParentLink = this.parent.get_mut_ref(); + parent_link.chan.clone() + }; + assert!(chan.try_send(Tombstone(this))); + } + } + } + + // XXX: Should not require ~self + pub fn wait(~self, local_success: bool) -> bool { + rtassert!(!self.closed); + + rtdebug!("WAITING %x", self.id()); + + let mut this = self; + let mut child_success = true; + + if this.child.is_some() { + rtdebug!("waiting for children"); + let child_link: &mut ChildLink = this.child.get_mut_ref(); + let shared: &mut SharedState = &mut *child_link.shared; + + if !child_link.dropped_child { + let last_count = shared.count.fetch_sub(1, SeqCst); + rtdebug!("child count before sub %u", last_count); + if last_count == 1 { + assert!(child_link.chan.try_send(ChildrenTerminated)); + } + child_link.dropped_child = true; + } + + // Wait for messages from children + loop { + match child_link.port.recv() { + Tombstone(t) => { + t.wait(true); + } + ChildrenTerminated => break + } + } + + let count = shared.count.load(SeqCst); + if count != 0 { ::io::println(fmt!("%u", count)); } + assert!(count == 0); + // self_count is the acquire-read barrier + child_success = shared.success; + } + + let total_success = local_success && child_success; + + if this.parent.is_some() { + rtdebug!("releasing parent"); + unsafe { + let parent_link: &mut ParentLink = this.parent.get_mut_ref(); + let shared: *mut SharedState = parent_link.shared; + + if !total_success { + // parent_count is the write-wait barrier + (*shared).success = false; + } + + let last_count = (*shared).count.fetch_sub(1, SeqCst); + rtdebug!("count before parent sub %u", last_count); + if last_count == 1 { + assert!(parent_link.chan.try_send(ChildrenTerminated)); + } + } + } + + this.closed = true; + util::ignore(this); + + return total_success; + } +} + +impl Drop for JoinLatch { + fn finalize(&self) { + rtdebug!("DESTROYING %x", self.id()); + rtassert!(self.closed); + } +} + +#[cfg(test)] +mod test { + use super::*; + use cell::Cell; + use container::Container; + use iter::Times; + use old_iter::BaseIter; + use 
rt::test::*; + use rand; + use rand::RngUtil; + use vec::{CopyableVector, ImmutableVector}; + + #[test] + fn success_immediately() { + do run_in_newsched_task { + let mut latch = JoinLatch::new_root(); + + let child_latch = latch.new_child(); + let child_latch = Cell(child_latch); + do spawntask_immediately { + let child_latch = child_latch.take(); + assert!(child_latch.wait(true)); + } + + assert!(latch.wait(true)); + } + } + + #[test] + fn success_later() { + do run_in_newsched_task { + let mut latch = JoinLatch::new_root(); + + let child_latch = latch.new_child(); + let child_latch = Cell(child_latch); + do spawntask_later { + let child_latch = child_latch.take(); + assert!(child_latch.wait(true)); + } + + assert!(latch.wait(true)); + } + } + + #[test] + fn mt_success() { + do run_in_mt_newsched_task { + let mut latch = JoinLatch::new_root(); + + for 10.times { + let child_latch = latch.new_child(); + let child_latch = Cell(child_latch); + do spawntask_random { + let child_latch = child_latch.take(); + assert!(child_latch.wait(true)); + } + } + + assert!(latch.wait(true)); + } + } + + #[test] + fn mt_failure() { + do run_in_mt_newsched_task { + let mut latch = JoinLatch::new_root(); + + let spawn = |status| { + let child_latch = latch.new_child(); + let child_latch = Cell(child_latch); + do spawntask_random { + let child_latch = child_latch.take(); + child_latch.wait(status); + } + }; + + for 10.times { spawn(true) } + spawn(false); + for 10.times { spawn(true) } + + assert!(!latch.wait(true)); + } + } + + #[test] + fn mt_multi_level_success() { + do run_in_mt_newsched_task { + let mut latch = JoinLatch::new_root(); + + fn child(latch: &mut JoinLatch, i: int) { + let child_latch = latch.new_child(); + let child_latch = Cell(child_latch); + do spawntask_random { + let mut child_latch = child_latch.take(); + if i != 0 { + child(&mut *child_latch, i - 1); + child_latch.wait(true); + } else { + child_latch.wait(true); + } + } + } + + child(&mut *latch, 10); + + assert!(latch.wait(true)); + } + } + + #[test] + fn mt_multi_level_failure() { + do run_in_mt_newsched_task { + let mut latch = JoinLatch::new_root(); + + fn child(latch: &mut JoinLatch, i: int) { + let child_latch = latch.new_child(); + let child_latch = Cell(child_latch); + do spawntask_random { + let mut child_latch = child_latch.take(); + if i != 0 { + child(&mut *child_latch, i - 1); + child_latch.wait(false); + } else { + child_latch.wait(true); + } + } + } + + child(&mut *latch, 10); + + assert!(!latch.wait(true)); + } + } + + #[test] + fn release_child() { + do run_in_newsched_task { + let mut latch = JoinLatch::new_root(); + let child_latch = latch.new_child(); + let child_latch = Cell(child_latch); + + do spawntask_immediately { + let latch = child_latch.take(); + latch.release(false); + } + + assert!(!latch.wait(true)); + } + } + + #[test] + fn release_child_tombstone() { + do run_in_newsched_task { + let mut latch = JoinLatch::new_root(); + let child_latch = latch.new_child(); + let child_latch = Cell(child_latch); + + do spawntask_immediately { + let mut latch = child_latch.take(); + let child_latch = latch.new_child(); + let child_latch = Cell(child_latch); + do spawntask_later { + let latch = child_latch.take(); + latch.release(false); + } + latch.release(true); + } + + assert!(!latch.wait(true)); + } + } + + #[test] + fn release_child_no_tombstone() { + do run_in_newsched_task { + let mut latch = JoinLatch::new_root(); + let child_latch = latch.new_child(); + let child_latch = Cell(child_latch); + + do spawntask_later 
{ + let mut latch = child_latch.take(); + let child_latch = latch.new_child(); + let child_latch = Cell(child_latch); + do spawntask_immediately { + let latch = child_latch.take(); + latch.release(false); + } + latch.release(true); + } + + assert!(!latch.wait(true)); + } + } + + #[test] + fn release_child_tombstone_stress() { + fn rand_orders() -> ~[bool] { + let mut v = ~[false,.. 5]; + v[0] = true; + let mut rng = rand::rng(); + return rng.shuffle(v); + } + + fn split_orders(orders: &[bool]) -> (~[bool], ~[bool]) { + if orders.is_empty() { + return (~[], ~[]); + } else if orders.len() <= 2 { + return (orders.to_owned(), ~[]); + } + let mut rng = rand::rng(); + let n = rng.gen_uint_range(1, orders.len()); + let first = orders.slice(0, n).to_owned(); + let last = orders.slice(n, orders.len()).to_owned(); + assert!(first.len() + last.len() == orders.len()); + return (first, last); + } + + for stress_factor().times { + do run_in_newsched_task { + fn doit(latch: &mut JoinLatch, orders: ~[bool], depth: uint) { + let (my_orders, remaining_orders) = split_orders(orders); + rtdebug!("(my_orders, remaining): %?", (&my_orders, &remaining_orders)); + rtdebug!("depth: %u", depth); + let mut remaining_orders = remaining_orders; + let mut num = 0; + for my_orders.each |&order| { + let child_latch = latch.new_child(); + let child_latch = Cell(child_latch); + let (child_orders, remaining) = split_orders(remaining_orders); + rtdebug!("(child_orders, remaining): %?", (&child_orders, &remaining)); + remaining_orders = remaining; + let child_orders = Cell(child_orders); + let child_num = num; + let _ = child_num; // XXX unused except in rtdebug! + do spawntask_random { + rtdebug!("depth %u num %u", depth, child_num); + let mut child_latch = child_latch.take(); + let child_orders = child_orders.take(); + doit(&mut *child_latch, child_orders, depth + 1); + child_latch.release(order); + } + + num += 1; + } + } + + let mut latch = JoinLatch::new_root(); + let orders = rand_orders(); + rtdebug!("orders: %?", orders); + + doit(&mut *latch, orders, 0); + + assert!(!latch.wait(true)); + } + } + } + + #[test] + fn whateverman() { + struct Order { + immediate: bool, + succeed: bool, + orders: ~[Order] + } + fn next(latch: &mut JoinLatch, orders: ~[Order]) { + for orders.each |order| { + let suborders = copy order.orders; + let child_latch = Cell(latch.new_child()); + let succeed = order.succeed; + if order.immediate { + do spawntask_immediately { + let mut child_latch = child_latch.take(); + next(&mut *child_latch, copy suborders); + rtdebug!("immediate releasing"); + child_latch.release(succeed); + } + } else { + do spawntask_later { + let mut child_latch = child_latch.take(); + next(&mut *child_latch, copy suborders); + rtdebug!("later releasing"); + child_latch.release(succeed); + } + } + } + } + + do run_in_newsched_task { + let mut latch = JoinLatch::new_root(); + let orders = ~[ Order { // 0 0 + immediate: true, + succeed: true, + orders: ~[ Order { // 1 0 + immediate: true, + succeed: false, + orders: ~[ Order { // 2 0 + immediate: false, + succeed: false, + orders: ~[ Order { // 3 0 + immediate: true, + succeed: false, + orders: ~[] + }, Order { // 3 1 + immediate: false, + succeed: false, + orders: ~[] + }] + }] + }] + }]; + + next(&mut *latch, orders); + assert!(!latch.wait(true)); + } + } +} diff --git a/src/libstd/rt/metrics.rs b/src/libstd/rt/metrics.rs index 70e347fdfb6..b0c0fa5d708 100644 --- a/src/libstd/rt/metrics.rs +++ b/src/libstd/rt/metrics.rs @@ -34,7 +34,11 @@ pub struct SchedMetrics { // 
Message receives that do not block the receiver rendezvous_recvs: uint, // Message receives that block the receiver - non_rendezvous_recvs: uint + non_rendezvous_recvs: uint, + // JoinLatch releases that create tombstones + release_tombstone: uint, + // JoinLatch releases that do not create tombstones + release_no_tombstone: uint, } impl SchedMetrics { @@ -51,7 +55,9 @@ impl SchedMetrics { rendezvous_sends: 0, non_rendezvous_sends: 0, rendezvous_recvs: 0, - non_rendezvous_recvs: 0 + non_rendezvous_recvs: 0, + release_tombstone: 0, + release_no_tombstone: 0 } } } @@ -70,6 +76,8 @@ impl ToStr for SchedMetrics { non_rendezvous_sends: %u\n\ rendezvous_recvs: %u\n\ non_rendezvous_recvs: %u\n\ + release_tombstone: %u\n\ + release_no_tombstone: %u\n\ ", self.turns, self.messages_received, @@ -82,7 +90,9 @@ impl ToStr for SchedMetrics { self.rendezvous_sends, self.non_rendezvous_sends, self.rendezvous_recvs, - self.non_rendezvous_recvs + self.non_rendezvous_recvs, + self.release_tombstone, + self.release_no_tombstone ) } } \ No newline at end of file diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs index caf3e15e535..2008c4a180f 100644 --- a/src/libstd/rt/mod.rs +++ b/src/libstd/rt/mod.rs @@ -133,6 +133,9 @@ pub mod local_ptr; /// Bindings to pthread/windows thread-local storage. pub mod thread_local_storage; +/// A concurrent data structure with which parent tasks wait on child tasks. +pub mod join_latch; + pub mod metrics; diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs index 97a1c26ed4d..104eb4b8bae 100644 --- a/src/libstd/rt/sched.rs +++ b/src/libstd/rt/sched.rs @@ -151,7 +151,10 @@ pub impl Scheduler { // XXX: Reenable this once we're using a per-task queue. With a shared // queue this is not true //assert!(sched.work_queue.is_empty()); - rtdebug!("scheduler metrics: %s\n", sched.metrics.to_str()); + rtdebug!("scheduler metrics: %s\n", { + use to_str::ToStr; + sched.metrics.to_str() + }); return sched; } -- cgit 1.4.1-3-g733a5 From fd148cd3e2d08ce15272f0690f6e41d2e85ee721 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Thu, 13 Jun 2013 22:43:20 -0700 Subject: std::rt: Change the Task constructors to reflect a tree --- src/libstd/rt/mod.rs | 4 ++-- src/libstd/rt/sched.rs | 22 +++++++++++----------- src/libstd/rt/task.rs | 26 ++++++++++++++++++++++++-- src/libstd/rt/test.rs | 46 ++++++++++++++++++++++++++++++++++++---------- src/libstd/task/spawn.rs | 9 ++++++++- 5 files changed, 81 insertions(+), 26 deletions(-) (limited to 'src/libstd/rt') diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs index 2008c4a180f..a65b07fdbcf 100644 --- a/src/libstd/rt/mod.rs +++ b/src/libstd/rt/mod.rs @@ -167,7 +167,7 @@ pub fn start(_argc: int, _argv: **u8, crate_map: *u8, main: ~fn()) -> int { let sleepers = SleeperList::new(); let mut sched = ~Scheduler::new(loop_, work_queue, sleepers); sched.no_sleep = true; - let main_task = ~Coroutine::new(&mut sched.stack_pool, main); + let main_task = ~Coroutine::new_root(&mut sched.stack_pool, main); sched.enqueue_task(main_task); sched.run(); @@ -241,7 +241,7 @@ fn test_context() { do run_in_bare_thread { assert_eq!(context(), GlobalContext); let mut sched = ~new_test_uv_sched(); - let task = ~do Coroutine::new(&mut sched.stack_pool) { + let task = ~do Coroutine::new_root(&mut sched.stack_pool) { assert_eq!(context(), TaskContext); let sched = Local::take::(); do sched.deschedule_running_task_and_then() |sched, task| { diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs index 104eb4b8bae..9abcc2ec3cc 100644 --- 
a/src/libstd/rt/sched.rs +++ b/src/libstd/rt/sched.rs @@ -518,8 +518,8 @@ impl SchedHandle { } pub impl Coroutine { - fn new(stack_pool: &mut StackPool, start: ~fn()) -> Coroutine { - Coroutine::with_task(stack_pool, ~Task::new(), start) + fn new_root(stack_pool: &mut StackPool, start: ~fn()) -> Coroutine { + Coroutine::with_task(stack_pool, ~Task::new_root(), start) } fn with_task(stack_pool: &mut StackPool, @@ -614,7 +614,7 @@ mod test { let task_ran_ptr: *mut bool = &mut task_ran; let mut sched = ~new_test_uv_sched(); - let task = ~do Coroutine::new(&mut sched.stack_pool) { + let task = ~do Coroutine::new_root(&mut sched.stack_pool) { unsafe { *task_ran_ptr = true; } }; sched.enqueue_task(task); @@ -632,7 +632,7 @@ mod test { let mut sched = ~new_test_uv_sched(); for int::range(0, total) |_| { - let task = ~do Coroutine::new(&mut sched.stack_pool) { + let task = ~do Coroutine::new_root(&mut sched.stack_pool) { unsafe { *task_count_ptr = *task_count_ptr + 1; } }; sched.enqueue_task(task); @@ -649,10 +649,10 @@ mod test { let count_ptr: *mut int = &mut count; let mut sched = ~new_test_uv_sched(); - let task1 = ~do Coroutine::new(&mut sched.stack_pool) { + let task1 = ~do Coroutine::new_root(&mut sched.stack_pool) { unsafe { *count_ptr = *count_ptr + 1; } let mut sched = Local::take::(); - let task2 = ~do Coroutine::new(&mut sched.stack_pool) { + let task2 = ~do Coroutine::new_root(&mut sched.stack_pool) { unsafe { *count_ptr = *count_ptr + 1; } }; // Context switch directly to the new task @@ -677,7 +677,7 @@ mod test { let mut sched = ~new_test_uv_sched(); - let start_task = ~do Coroutine::new(&mut sched.stack_pool) { + let start_task = ~do Coroutine::new_root(&mut sched.stack_pool) { run_task(count_ptr); }; sched.enqueue_task(start_task); @@ -687,7 +687,7 @@ mod test { fn run_task(count_ptr: *mut int) { do Local::borrow:: |sched| { - let task = ~do Coroutine::new(&mut sched.stack_pool) { + let task = ~do Coroutine::new_root(&mut sched.stack_pool) { unsafe { *count_ptr = *count_ptr + 1; if *count_ptr != MAX { @@ -705,7 +705,7 @@ mod test { fn test_block_task() { do run_in_bare_thread { let mut sched = ~new_test_uv_sched(); - let task = ~do Coroutine::new(&mut sched.stack_pool) { + let task = ~do Coroutine::new_root(&mut sched.stack_pool) { let sched = Local::take::(); assert!(sched.in_task_context()); do sched.deschedule_running_task_and_then() |sched, task| { @@ -752,13 +752,13 @@ mod test { let mut sched1 = ~new_test_uv_sched(); let handle1 = sched1.make_handle(); let handle1_cell = Cell(handle1); - let task1 = ~do Coroutine::new(&mut sched1.stack_pool) { + let task1 = ~do Coroutine::new_root(&mut sched1.stack_pool) { chan_cell.take().send(()); }; sched1.enqueue_task(task1); let mut sched2 = ~new_test_uv_sched(); - let task2 = ~do Coroutine::new(&mut sched2.stack_pool) { + let task2 = ~do Coroutine::new_root(&mut sched2.stack_pool) { port_cell.take().recv(); // Release the other scheduler's handle so it can exit handle1_cell.take(); diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs index cf4967b12b3..10b4672df05 100644 --- a/src/libstd/rt/task.rs +++ b/src/libstd/rt/task.rs @@ -37,7 +37,7 @@ pub struct Unwinder { } impl Task { - pub fn new() -> Task { + pub fn new_root() -> Task { Task { heap: LocalHeap::new(), gc: GarbageCollector, @@ -48,7 +48,29 @@ impl Task { } } - pub fn without_unwinding() -> Task { + pub fn new_root_without_unwinding() -> Task { + Task { + heap: LocalHeap::new(), + gc: GarbageCollector, + storage: LocalStorage(ptr::null(), None), + logger: StdErrLogger, 
+ unwinder: None, + destroyed: false + } + } + + pub fn new_child(&mut self) -> Task { + Task { + heap: LocalHeap::new(), + gc: GarbageCollector, + storage: LocalStorage(ptr::null(), None), + logger: StdErrLogger, + unwinder: Some(Unwinder { unwinding: false }), + destroyed: false + } + } + + pub fn new_child_without_unwinding(&mut self) -> Task { Task { heap: LocalHeap::new(), gc: GarbageCollector, diff --git a/src/libstd/rt/test.rs b/src/libstd/rt/test.rs index c8df3a61203..4a4d498a26e 100644 --- a/src/libstd/rt/test.rs +++ b/src/libstd/rt/test.rs @@ -48,7 +48,7 @@ pub fn run_in_newsched_task(f: ~fn()) { do run_in_bare_thread { let mut sched = ~new_test_uv_sched(); let task = ~Coroutine::with_task(&mut sched.stack_pool, - ~Task::without_unwinding(), + ~Task::new_root_without_unwinding(), f.take()); sched.enqueue_task(task); sched.run(); @@ -94,7 +94,7 @@ pub fn run_in_mt_newsched_task(f: ~fn()) { let f_cell = Cell(f_cell.take()); let handles = Cell(handles); - let main_task = ~do Coroutine::new(&mut scheds[0].stack_pool) { + let main_task = ~do Coroutine::new_root(&mut scheds[0].stack_pool) { f_cell.take()(); let mut handles = handles.take(); @@ -132,9 +132,14 @@ pub fn run_in_mt_newsched_task(f: ~fn()) { pub fn spawntask(f: ~fn()) { use super::sched::*; + let mut task = None; + do Local::borrow::() |running_task| { + task = Some(~running_task.new_child_without_unwinding()); + } + let mut sched = Local::take::(); let task = ~Coroutine::with_task(&mut sched.stack_pool, - ~Task::without_unwinding(), + task.swap_unwrap(), f); sched.schedule_new_task(task); } @@ -143,9 +148,14 @@ pub fn spawntask(f: ~fn()) { pub fn spawntask_immediately(f: ~fn()) { use super::sched::*; + let mut task = None; + do Local::borrow::() |running_task| { + task = Some(~running_task.new_child_without_unwinding()); + } + let mut sched = Local::take::(); let task = ~Coroutine::with_task(&mut sched.stack_pool, - ~Task::without_unwinding(), + task.swap_unwrap(), f); do sched.switch_running_tasks_and_then(task) |sched, task| { sched.enqueue_task(task); @@ -156,9 +166,14 @@ pub fn spawntask_immediately(f: ~fn()) { pub fn spawntask_later(f: ~fn()) { use super::sched::*; + let mut task = None; + do Local::borrow::() |running_task| { + task = Some(~running_task.new_child_without_unwinding()); + } + let mut sched = Local::take::(); let task = ~Coroutine::with_task(&mut sched.stack_pool, - ~Task::without_unwinding(), + task.swap_unwrap(), f); sched.enqueue_task(task); @@ -170,14 +185,19 @@ pub fn spawntask_random(f: ~fn()) { use super::sched::*; use rand::{Rand, rng}; - let mut rng = rng(); - let run_now: bool = Rand::rand(&mut rng); + let mut task = None; + do Local::borrow::() |running_task| { + task = Some(~running_task.new_child_without_unwinding()); + } let mut sched = Local::take::(); let task = ~Coroutine::with_task(&mut sched.stack_pool, - ~Task::without_unwinding(), + task.swap_unwrap(), f); + let mut rng = rng(); + let run_now: bool = Rand::rand(&mut rng); + if run_now { do sched.switch_running_tasks_and_then(task) |sched, task| { sched.enqueue_task(task); @@ -206,7 +226,7 @@ pub fn spawntask_try(f: ~fn()) -> Result<(), ()> { do sched.deschedule_running_task_and_then() |sched, old_task| { let old_task = Cell(old_task); let f = f.take(); - let new_task = ~do Coroutine::new(&mut sched.stack_pool) { + let new_task = ~do Coroutine::new_root(&mut sched.stack_pool) { do (|| { (f.take())() }).finally { @@ -229,11 +249,17 @@ pub fn spawntask_try(f: ~fn()) -> Result<(), ()> { pub fn spawntask_thread(f: ~fn()) -> Thread { use 
rt::sched::*; + let mut task = None; + do Local::borrow::() |running_task| { + task = Some(~running_task.new_child_without_unwinding()); + } + + let task = Cell(task.swap_unwrap()); let f = Cell(f); let thread = do Thread::start { let mut sched = ~new_test_uv_sched(); let task = ~Coroutine::with_task(&mut sched.stack_pool, - ~Task::without_unwinding(), + task.take(), f.take()); sched.enqueue_task(task); sched.run(); diff --git a/src/libstd/task/spawn.rs b/src/libstd/task/spawn.rs index 5941221821a..a4fbec11d72 100644 --- a/src/libstd/task/spawn.rs +++ b/src/libstd/task/spawn.rs @@ -91,6 +91,7 @@ use uint; use util; use unstable::sync::{Exclusive, exclusive}; use rt::local::Local; +use rt::task::Task; #[cfg(test)] use task::default_task_opts; @@ -576,8 +577,14 @@ pub fn spawn_raw(opts: TaskOpts, f: ~fn()) { fn spawn_raw_newsched(_opts: TaskOpts, f: ~fn()) { use rt::sched::*; + let mut task = None; + do Local::borrow::() |running_task| { + task = Some(~running_task.new_child_without_unwinding()); + } + let mut sched = Local::take::(); - let task = ~Coroutine::new(&mut sched.stack_pool, f); + let task = ~Coroutine::with_task(&mut sched.stack_pool, + task.swap_unwrap(), f); sched.schedule_new_task(task); } -- cgit 1.4.1-3-g733a5 From 90fbe38f0064836fd5e169c520d3fd19953e5604 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Thu, 13 Jun 2013 23:16:27 -0700 Subject: std::rt: Tasks must have an unwinder. Simpler --- src/libstd/rt/task.rs | 39 ++++----------------------------------- src/libstd/rt/test.rs | 12 ++++++------ src/libstd/sys.rs | 6 +----- src/libstd/task/mod.rs | 11 +---------- src/libstd/task/spawn.rs | 2 +- 5 files changed, 13 insertions(+), 57 deletions(-) (limited to 'src/libstd/rt') diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs index 10b4672df05..7c08dabf0bd 100644 --- a/src/libstd/rt/task.rs +++ b/src/libstd/rt/task.rs @@ -25,7 +25,7 @@ pub struct Task { gc: GarbageCollector, storage: LocalStorage, logger: StdErrLogger, - unwinder: Option, + unwinder: Unwinder, destroyed: bool } @@ -43,18 +43,7 @@ impl Task { gc: GarbageCollector, storage: LocalStorage(ptr::null(), None), logger: StdErrLogger, - unwinder: Some(Unwinder { unwinding: false }), - destroyed: false - } - } - - pub fn new_root_without_unwinding() -> Task { - Task { - heap: LocalHeap::new(), - gc: GarbageCollector, - storage: LocalStorage(ptr::null(), None), - logger: StdErrLogger, - unwinder: None, + unwinder: Unwinder { unwinding: false }, destroyed: false } } @@ -65,18 +54,7 @@ impl Task { gc: GarbageCollector, storage: LocalStorage(ptr::null(), None), logger: StdErrLogger, - unwinder: Some(Unwinder { unwinding: false }), - destroyed: false - } - } - - pub fn new_child_without_unwinding(&mut self) -> Task { - Task { - heap: LocalHeap::new(), - gc: GarbageCollector, - storage: LocalStorage(ptr::null(), None), - logger: StdErrLogger, - unwinder: None, + unwinder: Unwinder { unwinding: false }, destroyed: false } } @@ -88,16 +66,7 @@ impl Task { assert!(ptr::ref_eq(task, self)); } - match self.unwinder { - Some(ref mut unwinder) => { - // If there's an unwinder then set up the catch block - unwinder.try(f); - } - None => { - // Otherwise, just run the body - f() - } - } + self.unwinder.try(f); self.destroy(); } diff --git a/src/libstd/rt/test.rs b/src/libstd/rt/test.rs index 4a4d498a26e..ecfe93560b4 100644 --- a/src/libstd/rt/test.rs +++ b/src/libstd/rt/test.rs @@ -48,7 +48,7 @@ pub fn run_in_newsched_task(f: ~fn()) { do run_in_bare_thread { let mut sched = ~new_test_uv_sched(); let task = 
~Coroutine::with_task(&mut sched.stack_pool, - ~Task::new_root_without_unwinding(), + ~Task::new_root(), f.take()); sched.enqueue_task(task); sched.run(); @@ -134,7 +134,7 @@ pub fn spawntask(f: ~fn()) { let mut task = None; do Local::borrow::() |running_task| { - task = Some(~running_task.new_child_without_unwinding()); + task = Some(~running_task.new_child()); } let mut sched = Local::take::(); @@ -150,7 +150,7 @@ pub fn spawntask_immediately(f: ~fn()) { let mut task = None; do Local::borrow::() |running_task| { - task = Some(~running_task.new_child_without_unwinding()); + task = Some(~running_task.new_child()); } let mut sched = Local::take::(); @@ -168,7 +168,7 @@ pub fn spawntask_later(f: ~fn()) { let mut task = None; do Local::borrow::() |running_task| { - task = Some(~running_task.new_child_without_unwinding()); + task = Some(~running_task.new_child()); } let mut sched = Local::take::(); @@ -187,7 +187,7 @@ pub fn spawntask_random(f: ~fn()) { let mut task = None; do Local::borrow::() |running_task| { - task = Some(~running_task.new_child_without_unwinding()); + task = Some(~running_task.new_child()); } let mut sched = Local::take::(); @@ -251,7 +251,7 @@ pub fn spawntask_thread(f: ~fn()) -> Thread { let mut task = None; do Local::borrow::() |running_task| { - task = Some(~running_task.new_child_without_unwinding()); + task = Some(~running_task.new_child()); } let task = Cell(task.swap_unwrap()); diff --git a/src/libstd/sys.rs b/src/libstd/sys.rs index 137070ce202..77085d19567 100644 --- a/src/libstd/sys.rs +++ b/src/libstd/sys.rs @@ -226,11 +226,7 @@ pub fn begin_unwind_(msg: *c_char, file: *c_char, line: size_t) -> ! { gc::cleanup_stack_for_failure(); let task = Local::unsafe_borrow::(); - let unwinder: &mut Option = &mut (*task).unwinder; - match *unwinder { - Some(ref mut unwinder) => unwinder.begin_unwind(), - None => abort!("failure without unwinder. aborting process") - } + (*task).unwinder.begin_unwind(); } } } diff --git a/src/libstd/task/mod.rs b/src/libstd/task/mod.rs index f24d2327358..faa505c1995 100644 --- a/src/libstd/task/mod.rs +++ b/src/libstd/task/mod.rs @@ -515,16 +515,7 @@ pub fn failing() -> bool { _ => { let mut unwinding = false; do Local::borrow:: |local| { - unwinding = match local.unwinder { - Some(unwinder) => { - unwinder.unwinding - } - None => { - // Because there is no unwinder we can't be unwinding. 
- // (The process will abort on failure) - false - } - } + unwinding = local.unwinder.unwinding } return unwinding; } diff --git a/src/libstd/task/spawn.rs b/src/libstd/task/spawn.rs index a4fbec11d72..a17a6777a98 100644 --- a/src/libstd/task/spawn.rs +++ b/src/libstd/task/spawn.rs @@ -579,7 +579,7 @@ fn spawn_raw_newsched(_opts: TaskOpts, f: ~fn()) { let mut task = None; do Local::borrow::() |running_task| { - task = Some(~running_task.new_child_without_unwinding()); + task = Some(~running_task.new_child()); } let mut sched = Local::take::(); -- cgit 1.4.1-3-g733a5 From 505ef7e710ff890c0027fadad54997041b7ee93b Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Thu, 13 Jun 2013 23:31:19 -0700 Subject: std::rt: Tasks contain a JoinLatch --- src/libstd/rt/task.rs | 33 ++++++++++++++++++++++++++++++- src/libstd/rt/test.rs | 55 ++++++++++++++++++++++----------------------------- 2 files changed, 56 insertions(+), 32 deletions(-) (limited to 'src/libstd/rt') diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs index 7c08dabf0bd..75ca4c941c5 100644 --- a/src/libstd/rt/task.rs +++ b/src/libstd/rt/task.rs @@ -16,9 +16,11 @@ use prelude::*; use libc::{c_void, uintptr_t}; use cast::transmute; +use option::{Option, Some, None}; use rt::local::Local; use super::local_heap::LocalHeap; use rt::logging::StdErrLogger; +use rt::join_latch::JoinLatch; pub struct Task { heap: LocalHeap, @@ -26,6 +28,8 @@ pub struct Task { storage: LocalStorage, logger: StdErrLogger, unwinder: Unwinder, + join_latch: Option<~JoinLatch>, + on_exit: Option<~fn(bool)>, destroyed: bool } @@ -44,6 +48,8 @@ impl Task { storage: LocalStorage(ptr::null(), None), logger: StdErrLogger, unwinder: Unwinder { unwinding: false }, + join_latch: Some(JoinLatch::new_root()), + on_exit: None, destroyed: false } } @@ -55,6 +61,8 @@ impl Task { storage: LocalStorage(ptr::null(), None), logger: StdErrLogger, unwinder: Unwinder { unwinding: false }, + join_latch: Some(self.join_latch.get_mut_ref().new_child()), + on_exit: None, destroyed: false } } @@ -68,9 +76,22 @@ impl Task { self.unwinder.try(f); self.destroy(); + + // Wait for children. Possibly report the exit status. + let local_success = !self.unwinder.unwinding; + let join_latch = self.join_latch.swap_unwrap(); + match self.on_exit { + Some(ref on_exit) => { + let success = join_latch.wait(local_success); + (*on_exit)(success); + } + None => { + join_latch.release(local_success); + } + } } - /// Must be called manually before finalization to clean up + /// must be called manually before finalization to clean up /// thread-local resources. 
Some of the routines here expect /// Task to be available recursively so this must be /// called unsafely, without removing Task from @@ -216,5 +237,15 @@ mod test { assert!(port.recv() == 10); } } + + #[test] + fn linked_failure() { + do run_in_newsched_task() { + let res = do spawntask_try { + spawntask_random(|| fail!()); + }; + assert!(res.is_err()); + } + } } diff --git a/src/libstd/rt/test.rs b/src/libstd/rt/test.rs index ecfe93560b4..36e394e5c5b 100644 --- a/src/libstd/rt/test.rs +++ b/src/libstd/rt/test.rs @@ -18,6 +18,7 @@ use vec::OwnedVector; use result::{Result, Ok, Err}; use unstable::run_in_bare_thread; use super::io::net::ip::{IpAddr, Ipv4}; +use rt::comm::oneshot; use rt::task::Task; use rt::thread::Thread; use rt::local::Local; @@ -47,8 +48,11 @@ pub fn run_in_newsched_task(f: ~fn()) { do run_in_bare_thread { let mut sched = ~new_test_uv_sched(); + let mut new_task = ~Task::new_root(); + let on_exit: ~fn(bool) = |exit_status| rtassert!(exit_status); + new_task.on_exit = Some(on_exit); let task = ~Coroutine::with_task(&mut sched.stack_pool, - ~Task::new_root(), + new_task, f.take()); sched.enqueue_task(task); sched.run(); @@ -94,16 +98,20 @@ pub fn run_in_mt_newsched_task(f: ~fn()) { let f_cell = Cell(f_cell.take()); let handles = Cell(handles); - let main_task = ~do Coroutine::new_root(&mut scheds[0].stack_pool) { - f_cell.take()(); + let mut new_task = ~Task::new_root(); + let on_exit: ~fn(bool) = |exit_status| { let mut handles = handles.take(); // Tell schedulers to exit for handles.each_mut |handle| { handle.send(Shutdown); } - }; + rtassert!(exit_status); + }; + new_task.on_exit = Some(on_exit); + let main_task = ~Coroutine::with_task(&mut scheds[0].stack_pool, + new_task, f_cell.take()); scheds[0].enqueue_task(main_task); let mut threads = ~[]; @@ -213,36 +221,21 @@ pub fn spawntask_random(f: ~fn()) { pub fn spawntask_try(f: ~fn()) -> Result<(), ()> { use cell::Cell; use super::sched::*; - use task; - use unstable::finally::Finally; - - // Our status variables will be filled in from the scheduler context - let mut failed = false; - let failed_ptr: *mut bool = &mut failed; - - // Switch to the scheduler - let f = Cell(Cell(f)); - let sched = Local::take::(); - do sched.deschedule_running_task_and_then() |sched, old_task| { - let old_task = Cell(old_task); - let f = f.take(); - let new_task = ~do Coroutine::new_root(&mut sched.stack_pool) { - do (|| { - (f.take())() - }).finally { - // Check for failure then resume the parent task - unsafe { *failed_ptr = task::failing(); } - let sched = Local::take::(); - do sched.switch_running_tasks_and_then(old_task.take()) |sched, new_task| { - sched.enqueue_task(new_task); - } - } - }; - sched.enqueue_task(new_task); + let (port, chan) = oneshot(); + let chan = Cell(chan); + let mut new_task = ~Task::new_root(); + let on_exit: ~fn(bool) = |exit_status| chan.take().send(exit_status); + new_task.on_exit = Some(on_exit); + let mut sched = Local::take::(); + let new_task = ~Coroutine::with_task(&mut sched.stack_pool, + new_task, f); + do sched.switch_running_tasks_and_then(new_task) |sched, old_task| { + sched.enqueue_task(old_task); } - if !failed { Ok(()) } else { Err(()) } + let exit_status = port.recv(); + if exit_status { Ok(()) } else { Err(()) } } // Spawn a new task in a new scheduler and return a thread handle. -- cgit 1.4.1-3-g733a5
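The rewritten `spawntask_try` in this last commit is the clearest view of the new exit-status plumbing: the child task carries an `on_exit` callback that sends its status over a oneshot channel, and the parent blocks on the port to turn that status into a `Result`, replacing the old shared failure flag. Since the old scheduler and `rt::comm` APIs are gone, the following sketch reproduces only the shape of that pattern in modern Rust; the helper name is kept for legibility, OS threads stand in for coroutines, and `catch_unwind` stands in for the task's `Unwinder`:

    use std::panic;
    use std::sync::mpsc::channel;
    use std::thread;

    // Run `f` as a child "task" and report whether it finished without
    // unwinding, the way Task::run consults its Unwinder and fires on_exit.
    fn spawntask_try<F>(f: F) -> Result<(), ()>
    where
        F: FnOnce() + Send + panic::UnwindSafe + 'static,
    {
        let (chan, port) = channel(); // stands in for rt::comm::oneshot
        thread::spawn(move || {
            let exit_status = panic::catch_unwind(f).is_ok();
            let _ = chan.send(exit_status); // the on_exit callback
        });
        // The parent blocks on the port, just as the rewritten version does.
        if port.recv().unwrap_or(false) { Ok(()) } else { Err(()) }
    }

    fn main() {
        assert!(spawntask_try(|| ()).is_ok());
        assert!(spawntask_try(|| panic!("oops")).is_err());
    }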