about summary refs log tree commit diff
path: root/src/libstd
diff options
context:
space:
mode:
authorbors <bors@rust-lang.org>2013-08-02 07:31:52 -0700
committerbors <bors@rust-lang.org>2013-08-02 07:31:52 -0700
commit986df44753a7dd2fe9077c6d537f02785cbaccc6 (patch)
tree7e3ea157e53f302f5f46caea676b86a9ec7ece69 /src/libstd
parentaf973397713806fdaf27b19e80637cc6a45d7278 (diff)
parent963d37e821590b470f7a1fc9cfcda5a5ceceeee4 (diff)
downloadrust-986df44753a7dd2fe9077c6d537f02785cbaccc6.tar.gz
rust-986df44753a7dd2fe9077c6d537f02785cbaccc6.zip
auto merge of #8195 : bblum/rust/task-cleanup, r=brson
In the first commit it is obvious why some of the barriers can be changed to ```Relaxed```, but it is not as obvious for the once I changed in ```kill.rs```. The rationale for those is documented as part of the documenting commit.

Also the last commit is a temporary hack to prevent kill signals from being received in taskgroup cleanup code, which could be fixed in a more principled way once the old runtime is gone.
Diffstat (limited to 'src/libstd')
-rw-r--r--src/libstd/rt/comm.rs16
-rw-r--r--src/libstd/rt/kill.rs88
-rw-r--r--src/libstd/rt/task.rs9
-rw-r--r--src/libstd/unstable/sync.rs10
4 files changed, 94 insertions, 29 deletions
diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs
index 5a671d877d2..00e1aaa2193 100644
--- a/src/libstd/rt/comm.rs
+++ b/src/libstd/rt/comm.rs
@@ -18,7 +18,7 @@ use kinds::Send;
 use rt::sched::Scheduler;
 use rt::local::Local;
 use rt::select::{Select, SelectPort};
-use unstable::atomics::{AtomicUint, AtomicOption, Acquire, Release, SeqCst};
+use unstable::atomics::{AtomicUint, AtomicOption, Acquire, Relaxed, SeqCst};
 use unstable::sync::UnsafeAtomicRcBox;
 use util::Void;
 use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable};
@@ -217,15 +217,15 @@ impl<T> Select for PortOne<T> {
                 }
                 STATE_ONE => {
                     // Re-record that we are the only owner of the packet.
-                    // Release barrier needed in case the task gets reawoken
-                    // on a different core (this is analogous to writing a
-                    // payload; a barrier in enqueueing the task protects it).
+                    // No barrier needed, even if the task gets reawoken
+                    // on a different core -- this is analogous to writing a
+                    // payload; a barrier in enqueueing the task protects it.
                     // NB(#8132). This *must* occur before the enqueue below.
                     // FIXME(#6842, #8130) This is usually only needed for the
                     // assertion in recv_ready, except in the case of select().
                     // This won't actually ever have cacheline contention, but
                     // maybe should be optimized out with a cfg(test) anyway?
-                    (*self.packet()).state.store(STATE_ONE, Release);
+                    (*self.packet()).state.store(STATE_ONE, Relaxed);
 
                     rtdebug!("rendezvous recv");
                     sched.metrics.rendezvous_recvs += 1;
@@ -300,7 +300,7 @@ impl<T> SelectPort<T> for PortOne<T> {
         unsafe {
             // See corresponding store() above in block_on for rationale.
             // FIXME(#8130) This can happen only in test builds.
-            assert!((*packet).state.load(Acquire) == STATE_ONE);
+            assert!((*packet).state.load(Relaxed) == STATE_ONE);
 
             let payload = (*packet).payload.take();
 
@@ -375,9 +375,7 @@ impl<T> Drop for PortOne<T> {
                     // receiver was killed awake. The task can't still be
                     // blocked (we are it), but we need to free the handle.
                     let recvr = BlockedTask::cast_from_uint(task_as_state);
-                    // FIXME(#7554)(bblum): Make this cfg(test) dependent.
-                    // in a later commit.
-                    assert!(recvr.wake().is_none());
+                    recvr.assert_already_awake();
                 }
             }
         }
diff --git a/src/libstd/rt/kill.rs b/src/libstd/rt/kill.rs
index c2571f171a1..696f4a8c355 100644
--- a/src/libstd/rt/kill.rs
+++ b/src/libstd/rt/kill.rs
@@ -8,7 +8,63 @@
 // option. This file may not be copied, modified, or distributed
 // except according to those terms.
 
-//! Task death: asynchronous killing, linked failure, exit code propagation.
+/*!
+
+Task death: asynchronous killing, linked failure, exit code propagation.
+
+This file implements two orthogonal building-blocks for communicating failure
+between tasks. One is 'linked failure' or 'task killing', that is, a failing
+task causing other tasks to fail promptly (even those that are blocked on
+pipes or I/O). The other is 'exit code propagation', which affects the result
+observed by the parent of a task::try task that itself spawns child tasks
+(such as any #[test] function). In both cases the data structures live in
+KillHandle.
+
+I. Task killing.
+
+The model for killing involves two atomic flags, the "kill flag" and the
+"unkillable flag". Operations on the kill flag include:
+
+- In the taskgroup code (task/spawn.rs), tasks store a clone of their
+  KillHandle in their shared taskgroup. Another task in the group that fails
+  will use that handle to call kill().
+- When a task blocks, it turns its ~Task into a BlockedTask by storing a
+  the transmuted ~Task pointer inside the KillHandle's kill flag. A task
+  trying to block and a task trying to kill it can simultaneously access the
+  kill flag, after which the task will get scheduled and fail (no matter who
+  wins the race). Likewise, a task trying to wake a blocked task normally and
+  a task trying to kill it can simultaneously access the flag; only one will
+  get the task to reschedule it.
+
+Operations on the unkillable flag include:
+
+- When a task becomes unkillable, it swaps on the flag to forbid any killer
+  from waking it up while it's blocked inside the unkillable section. If a
+  kill was already pending, the task fails instead of becoming unkillable.
+- When a task is done being unkillable, it restores the flag to the normal
+  running state. If a kill was received-but-blocked during the unkillable
+  section, the task fails at this later point.
+- When a task tries to kill another task, before swapping on the kill flag, it
+  first swaps on the unkillable flag, to see if it's "allowed" to wake up the
+  task. If it isn't, the killed task will receive the signal when it becomes
+  killable again. (Of course, a task trying to wake the task normally (e.g.
+  sending on a channel) does not access the unkillable flag at all.)
+
+Why do we not need acquire/release barriers on any of the kill flag swaps?
+This is because barriers establish orderings between accesses on different
+memory locations, but each kill-related operation is only a swap on a single
+location, so atomicity is all that matters. The exception is kill(), which
+does a swap on both flags in sequence. kill() needs no barriers because it
+does not matter if its two accesses are seen reordered on another CPU: if a
+killer does perform both writes, it means it saw a KILL_RUNNING in the
+unkillable flag, which means an unkillable task will see KILL_KILLED and fail
+immediately (rendering the subsequent write to the kill flag unnecessary).
+
+II. Exit code propagation.
+
+FIXME(#7544): Decide on the ultimate model for this and document it.
+
+*/
 
 use cast;
 use cell::Cell;
@@ -16,8 +72,9 @@ use either::{Either, Left, Right};
 use option::{Option, Some, None};
 use prelude::*;
 use rt::task::Task;
+use task::spawn::Taskgroup;
 use to_bytes::IterBytes;
-use unstable::atomics::{AtomicUint, Acquire, SeqCst};
+use unstable::atomics::{AtomicUint, Relaxed};
 use unstable::sync::{UnsafeAtomicRcBox, LittleLock};
 use util;
 
@@ -95,7 +152,7 @@ impl Drop for KillFlag {
     // Letting a KillFlag with a task inside get dropped would leak the task.
     // We could free it here, but the task should get awoken by hand somehow.
     fn drop(&self) {
-        match self.load(Acquire) {
+        match self.load(Relaxed) {
             KILL_RUNNING | KILL_KILLED => { },
             _ => rtabort!("can't drop kill flag with a blocked task inside!"),
         }
@@ -124,7 +181,7 @@ impl BlockedTask {
             Unkillable(task) => Some(task),
             Killable(flag_arc) => {
                 let flag = unsafe { &mut **flag_arc.get() };
-                match flag.swap(KILL_RUNNING, SeqCst) {
+                match flag.swap(KILL_RUNNING, Relaxed) {
                     KILL_RUNNING => None, // woken from select(), perhaps
                     KILL_KILLED  => None, // a killer stole it already
                     task_ptr     =>
@@ -159,7 +216,7 @@ impl BlockedTask {
                 let flag     = &mut **flag_arc.get();
                 let task_ptr = cast::transmute(task);
                 // Expect flag to contain RUNNING. If KILLED, it should stay KILLED.
-                match flag.compare_and_swap(KILL_RUNNING, task_ptr, SeqCst) {
+                match flag.compare_and_swap(KILL_RUNNING, task_ptr, Relaxed) {
                     KILL_RUNNING => Right(Killable(flag_arc)),
                     KILL_KILLED  => Left(revive_task_ptr(task_ptr, Some(flag_arc))),
                     x            => rtabort!("can't block task! kill flag = %?", x),
@@ -257,7 +314,7 @@ impl KillHandle {
         let inner = unsafe { &mut *self.get() };
         // Expect flag to contain RUNNING. If KILLED, it should stay KILLED.
         // FIXME(#7544)(bblum): is it really necessary to prohibit double kill?
-        match inner.unkillable.compare_and_swap(KILL_RUNNING, KILL_UNKILLABLE, SeqCst) {
+        match inner.unkillable.compare_and_swap(KILL_RUNNING, KILL_UNKILLABLE, Relaxed) {
             KILL_RUNNING    => { }, // normal case
             KILL_KILLED     => if !already_failing { fail!(KILLED_MSG) },
             _               => rtabort!("inhibit_kill: task already unkillable"),
@@ -270,7 +327,7 @@ impl KillHandle {
         let inner = unsafe { &mut *self.get() };
         // Expect flag to contain UNKILLABLE. If KILLED, it should stay KILLED.
         // FIXME(#7544)(bblum): is it really necessary to prohibit double kill?
-        match inner.unkillable.compare_and_swap(KILL_UNKILLABLE, KILL_RUNNING, SeqCst) {
+        match inner.unkillable.compare_and_swap(KILL_UNKILLABLE, KILL_RUNNING, Relaxed) {
             KILL_UNKILLABLE => { }, // normal case
             KILL_KILLED     => if !already_failing { fail!(KILLED_MSG) },
             _               => rtabort!("allow_kill: task already killable"),
@@ -281,10 +338,10 @@ impl KillHandle {
     // if it was blocked and needs punted awake. To be called by other tasks.
     pub fn kill(&mut self) -> Option<~Task> {
         let inner = unsafe { &mut *self.get() };
-        if inner.unkillable.swap(KILL_KILLED, SeqCst) == KILL_RUNNING {
+        if inner.unkillable.swap(KILL_KILLED, Relaxed) == KILL_RUNNING {
             // Got in. Allowed to try to punt the task awake.
             let flag = unsafe { &mut *inner.killed.get() };
-            match flag.swap(KILL_KILLED, SeqCst) {
+            match flag.swap(KILL_KILLED, Relaxed) {
                 // Task either not blocked or already taken care of.
                 KILL_RUNNING | KILL_KILLED => None,
                 // Got ownership of the blocked task.
@@ -306,8 +363,11 @@ impl KillHandle {
         // is unkillable with a kill signal pending.
         let inner = unsafe { &*self.get() };
         let flag  = unsafe { &*inner.killed.get() };
-        // FIXME(#6598): can use relaxed ordering (i think)
-        flag.load(Acquire) == KILL_KILLED
+        // A barrier-related concern here is that a task that gets killed
+        // awake needs to see the killer's write of KILLED to this flag. This
+        // is analogous to receiving a pipe payload; the appropriate barrier
+        // should happen when enqueueing the task.
+        flag.load(Relaxed) == KILL_KILLED
     }
 
     pub fn notify_immediate_failure(&mut self) {
@@ -415,7 +475,7 @@ impl Death {
     }
 
     /// Collect failure exit codes from children and propagate them to a parent.
-    pub fn collect_failure(&mut self, mut success: bool) {
+    pub fn collect_failure(&mut self, mut success: bool, group: Option<Taskgroup>) {
         // This may run after the task has already failed, so even though the
         // task appears to need to be killed, the scheduler should not fail us
         // when we block to unwrap.
@@ -425,6 +485,10 @@ impl Death {
         rtassert!(self.unkillable == 0);
         self.unkillable = 1;
 
+        // FIXME(#7544): See corresponding fixme at the callsite in task.rs.
+        // NB(#8192): Doesn't work with "let _ = ..."
+        { use util; util::ignore(group); }
+
         // Step 1. Decide if we need to collect child failures synchronously.
         do self.on_exit.take_map |on_exit| {
             if success {
diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs
index 23a0d28e457..b242ee13fa6 100644
--- a/src/libstd/rt/task.rs
+++ b/src/libstd/rt/task.rs
@@ -212,8 +212,13 @@ impl Task {
     pub fn run(&mut self, f: &fn()) {
         rtdebug!("run called on task: %u", borrow::to_uint(self));
         self.unwinder.try(f);
-        { let _ = self.taskgroup.take(); }
-        self.death.collect_failure(!self.unwinder.unwinding);
+        // FIXME(#7544): We pass the taskgroup into death so that it can be
+        // dropped while the unkillable counter is set. This should not be
+        // necessary except for an extraneous clone() in task/spawn.rs that
+        // causes a killhandle to get dropped, which mustn't receive a kill
+        // signal since we're outside of the unwinder's try() scope.
+        // { let _ = self.taskgroup.take(); }
+        self.death.collect_failure(!self.unwinder.unwinding, self.taskgroup.take());
         self.destroy();
     }
 
diff --git a/src/libstd/unstable/sync.rs b/src/libstd/unstable/sync.rs
index 4b5c72e5a86..88c9c6ccb3a 100644
--- a/src/libstd/unstable/sync.rs
+++ b/src/libstd/unstable/sync.rs
@@ -16,7 +16,7 @@ use ptr;
 use option::*;
 use either::{Either, Left, Right};
 use task;
-use unstable::atomics::{AtomicOption,AtomicUint,Acquire,Release,SeqCst};
+use unstable::atomics::{AtomicOption,AtomicUint,Acquire,Release,Relaxed,SeqCst};
 use unstable::finally::Finally;
 use ops::Drop;
 use clone::Clone;
@@ -95,8 +95,7 @@ impl<T: Send> UnsafeAtomicRcBox<T> {
     pub fn get(&self) -> *mut T {
         unsafe {
             let mut data: ~AtomicRcBoxData<T> = cast::transmute(self.data);
-            // FIXME(#6598) Change Acquire to Relaxed.
-            assert!(data.count.load(Acquire) > 0);
+            assert!(data.count.load(Relaxed) > 0);
             let r: *mut T = data.data.get_mut_ref();
             cast::forget(data);
             return r;
@@ -107,7 +106,7 @@ impl<T: Send> UnsafeAtomicRcBox<T> {
     pub fn get_immut(&self) -> *T {
         unsafe {
             let data: ~AtomicRcBoxData<T> = cast::transmute(self.data);
-            assert!(data.count.load(Acquire) > 0); // no barrier is really needed
+            assert!(data.count.load(Relaxed) > 0);
             let r: *T = data.data.get_ref();
             cast::forget(data);
             return r;
@@ -130,8 +129,7 @@ impl<T: Send> UnsafeAtomicRcBox<T> {
                 // Try to put our server end in the unwrapper slot.
                 // This needs no barrier -- it's protected by the release barrier on
                 // the xadd, and the acquire+release barrier in the destructor's xadd.
-                // FIXME(#6598) Change Acquire to Relaxed.
-                if data.unwrapper.fill(~(c1,p2), Acquire).is_none() {
+                if data.unwrapper.fill(~(c1,p2), Relaxed).is_none() {
                     // Got in. Tell this handle's destructor not to run (we are now it).
                     this.data = ptr::mut_null();
                     // Drop our own reference.