about summary refs log tree commit diff
path: root/src/libstd
diff options
context:
space:
mode:
authorBrian Anderson <banderson@mozilla.com>2013-06-13 23:31:19 -0700
committerBrian Anderson <banderson@mozilla.com>2013-06-15 19:00:44 -0700
commit505ef7e710ff890c0027fadad54997041b7ee93b (patch)
tree80721bd6eb072d90ed6b3881c89a8f9a3fff75c9 /src/libstd
parent90fbe38f0064836fd5e169c520d3fd19953e5604 (diff)
downloadrust-505ef7e710ff890c0027fadad54997041b7ee93b.tar.gz
rust-505ef7e710ff890c0027fadad54997041b7ee93b.zip
std::rt: Tasks contain a JoinLatch
Diffstat (limited to 'src/libstd')
-rw-r--r--src/libstd/rt/task.rs33
-rw-r--r--src/libstd/rt/test.rs55
2 files changed, 56 insertions, 32 deletions
diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs
index 7c08dabf0bd..75ca4c941c5 100644
--- a/src/libstd/rt/task.rs
+++ b/src/libstd/rt/task.rs
@@ -16,9 +16,11 @@
 use prelude::*;
 use libc::{c_void, uintptr_t};
 use cast::transmute;
+use option::{Option, Some, None};
 use rt::local::Local;
 use super::local_heap::LocalHeap;
 use rt::logging::StdErrLogger;
+use rt::join_latch::JoinLatch;
 
 pub struct Task {
     heap: LocalHeap,
@@ -26,6 +28,8 @@ pub struct Task {
     storage: LocalStorage,
     logger: StdErrLogger,
     unwinder: Unwinder,
+    join_latch: Option<~JoinLatch>,
+    on_exit: Option<~fn(bool)>,
     destroyed: bool
 }
 
@@ -44,6 +48,8 @@ impl Task {
             storage: LocalStorage(ptr::null(), None),
             logger: StdErrLogger,
             unwinder: Unwinder { unwinding: false },
+            join_latch: Some(JoinLatch::new_root()),
+            on_exit: None,
             destroyed: false
         }
     }
@@ -55,6 +61,8 @@ impl Task {
             storage: LocalStorage(ptr::null(), None),
             logger: StdErrLogger,
             unwinder: Unwinder { unwinding: false },
+            join_latch: Some(self.join_latch.get_mut_ref().new_child()),
+            on_exit: None,
             destroyed: false
         }
     }
@@ -68,9 +76,22 @@ impl Task {
 
         self.unwinder.try(f);
         self.destroy();
+
+        // Wait for children. Possibly report the exit status.
+        let local_success = !self.unwinder.unwinding;
+        let join_latch = self.join_latch.swap_unwrap();
+        match self.on_exit {
+            Some(ref on_exit) => {
+                let success = join_latch.wait(local_success);
+                (*on_exit)(success);
+            }
+            None => {
+                join_latch.release(local_success);
+            }
+        }
     }
 
-    /// Must be called manually before finalization to clean up
+    /// must be called manually before finalization to clean up
     /// thread-local resources. Some of the routines here expect
     /// Task to be available recursively so this must be
     /// called unsafely, without removing Task from
@@ -216,5 +237,15 @@ mod test {
             assert!(port.recv() == 10);
         }
     }
+
+    #[test]
+    fn linked_failure() {
+        do run_in_newsched_task() {
+            let res = do spawntask_try {
+                spawntask_random(|| fail!());
+            };
+            assert!(res.is_err());
+        }
+    }
 }
 
diff --git a/src/libstd/rt/test.rs b/src/libstd/rt/test.rs
index ecfe93560b4..36e394e5c5b 100644
--- a/src/libstd/rt/test.rs
+++ b/src/libstd/rt/test.rs
@@ -18,6 +18,7 @@ use vec::OwnedVector;
 use result::{Result, Ok, Err};
 use unstable::run_in_bare_thread;
 use super::io::net::ip::{IpAddr, Ipv4};
+use rt::comm::oneshot;
 use rt::task::Task;
 use rt::thread::Thread;
 use rt::local::Local;
@@ -47,8 +48,11 @@ pub fn run_in_newsched_task(f: ~fn()) {
 
     do run_in_bare_thread {
         let mut sched = ~new_test_uv_sched();
+        let mut new_task = ~Task::new_root();
+        let on_exit: ~fn(bool) = |exit_status| rtassert!(exit_status);
+        new_task.on_exit = Some(on_exit);
         let task = ~Coroutine::with_task(&mut sched.stack_pool,
-                                         ~Task::new_root(),
+                                         new_task,
                                          f.take());
         sched.enqueue_task(task);
         sched.run();
@@ -94,16 +98,20 @@ pub fn run_in_mt_newsched_task(f: ~fn()) {
 
         let f_cell = Cell(f_cell.take());
         let handles = Cell(handles);
-        let main_task = ~do Coroutine::new_root(&mut scheds[0].stack_pool) {
-            f_cell.take()();
+        let mut new_task = ~Task::new_root();
+        let on_exit: ~fn(bool) = |exit_status| {
 
             let mut handles = handles.take();
             // Tell schedulers to exit
             for handles.each_mut |handle| {
                 handle.send(Shutdown);
             }
-        };
 
+            rtassert!(exit_status);
+        };
+        new_task.on_exit = Some(on_exit);
+        let main_task = ~Coroutine::with_task(&mut scheds[0].stack_pool,
+                                              new_task, f_cell.take());
         scheds[0].enqueue_task(main_task);
 
         let mut threads = ~[];
@@ -213,36 +221,21 @@ pub fn spawntask_random(f: ~fn()) {
 pub fn spawntask_try(f: ~fn()) -> Result<(), ()> {
     use cell::Cell;
     use super::sched::*;
-    use task;
-    use unstable::finally::Finally;
-
-    // Our status variables will be filled in from the scheduler context
-    let mut failed = false;
-    let failed_ptr: *mut bool = &mut failed;
-
-    // Switch to the scheduler
-    let f = Cell(Cell(f));
-    let sched = Local::take::<Scheduler>();
-    do sched.deschedule_running_task_and_then() |sched, old_task| {
-        let old_task = Cell(old_task);
-        let f = f.take();
-        let new_task = ~do Coroutine::new_root(&mut sched.stack_pool) {
-            do (|| {
-                (f.take())()
-            }).finally {
-                // Check for failure then resume the parent task
-                unsafe { *failed_ptr = task::failing(); }
-                let sched = Local::take::<Scheduler>();
-                do sched.switch_running_tasks_and_then(old_task.take()) |sched, new_task| {
-                    sched.enqueue_task(new_task);
-                }
-            }
-        };
 
-        sched.enqueue_task(new_task);
+    let (port, chan) = oneshot();
+    let chan = Cell(chan);
+    let mut new_task = ~Task::new_root();
+    let on_exit: ~fn(bool) = |exit_status| chan.take().send(exit_status);
+    new_task.on_exit = Some(on_exit);
+    let mut sched = Local::take::<Scheduler>();
+    let new_task = ~Coroutine::with_task(&mut sched.stack_pool,
+                                         new_task, f);
+    do sched.switch_running_tasks_and_then(new_task) |sched, old_task| {
+        sched.enqueue_task(old_task);
     }
 
-    if !failed { Ok(()) } else { Err(()) }
+    let exit_status = port.recv();
+    if exit_status { Ok(()) } else { Err(()) }
 }
 
 // Spawn a new task in a new scheduler and return a thread handle.