diff options
| author | bors <bors@rust-lang.org> | 2013-08-02 07:31:52 -0700 |
|---|---|---|
| committer | bors <bors@rust-lang.org> | 2013-08-02 07:31:52 -0700 |
| commit | 986df44753a7dd2fe9077c6d537f02785cbaccc6 (patch) | |
| tree | 7e3ea157e53f302f5f46caea676b86a9ec7ece69 /src/libstd | |
| parent | af973397713806fdaf27b19e80637cc6a45d7278 (diff) | |
| parent | 963d37e821590b470f7a1fc9cfcda5a5ceceeee4 (diff) | |
| download | rust-986df44753a7dd2fe9077c6d537f02785cbaccc6.tar.gz rust-986df44753a7dd2fe9077c6d537f02785cbaccc6.zip | |
auto merge of #8195 : bblum/rust/task-cleanup, r=brson
In the first commit it is obvious why some of the barriers can be changed to ```Relaxed```, but it is not as obvious for the once I changed in ```kill.rs```. The rationale for those is documented as part of the documenting commit. Also the last commit is a temporary hack to prevent kill signals from being received in taskgroup cleanup code, which could be fixed in a more principled way once the old runtime is gone.
Diffstat (limited to 'src/libstd')
| -rw-r--r-- | src/libstd/rt/comm.rs | 16 | ||||
| -rw-r--r-- | src/libstd/rt/kill.rs | 88 | ||||
| -rw-r--r-- | src/libstd/rt/task.rs | 9 | ||||
| -rw-r--r-- | src/libstd/unstable/sync.rs | 10 |
4 files changed, 94 insertions, 29 deletions
diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs index 5a671d877d2..00e1aaa2193 100644 --- a/src/libstd/rt/comm.rs +++ b/src/libstd/rt/comm.rs @@ -18,7 +18,7 @@ use kinds::Send; use rt::sched::Scheduler; use rt::local::Local; use rt::select::{Select, SelectPort}; -use unstable::atomics::{AtomicUint, AtomicOption, Acquire, Release, SeqCst}; +use unstable::atomics::{AtomicUint, AtomicOption, Acquire, Relaxed, SeqCst}; use unstable::sync::UnsafeAtomicRcBox; use util::Void; use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable}; @@ -217,15 +217,15 @@ impl<T> Select for PortOne<T> { } STATE_ONE => { // Re-record that we are the only owner of the packet. - // Release barrier needed in case the task gets reawoken - // on a different core (this is analogous to writing a - // payload; a barrier in enqueueing the task protects it). + // No barrier needed, even if the task gets reawoken + // on a different core -- this is analogous to writing a + // payload; a barrier in enqueueing the task protects it. // NB(#8132). This *must* occur before the enqueue below. // FIXME(#6842, #8130) This is usually only needed for the // assertion in recv_ready, except in the case of select(). // This won't actually ever have cacheline contention, but // maybe should be optimized out with a cfg(test) anyway? - (*self.packet()).state.store(STATE_ONE, Release); + (*self.packet()).state.store(STATE_ONE, Relaxed); rtdebug!("rendezvous recv"); sched.metrics.rendezvous_recvs += 1; @@ -300,7 +300,7 @@ impl<T> SelectPort<T> for PortOne<T> { unsafe { // See corresponding store() above in block_on for rationale. // FIXME(#8130) This can happen only in test builds. - assert!((*packet).state.load(Acquire) == STATE_ONE); + assert!((*packet).state.load(Relaxed) == STATE_ONE); let payload = (*packet).payload.take(); @@ -375,9 +375,7 @@ impl<T> Drop for PortOne<T> { // receiver was killed awake. The task can't still be // blocked (we are it), but we need to free the handle. let recvr = BlockedTask::cast_from_uint(task_as_state); - // FIXME(#7554)(bblum): Make this cfg(test) dependent. - // in a later commit. - assert!(recvr.wake().is_none()); + recvr.assert_already_awake(); } } } diff --git a/src/libstd/rt/kill.rs b/src/libstd/rt/kill.rs index c2571f171a1..696f4a8c355 100644 --- a/src/libstd/rt/kill.rs +++ b/src/libstd/rt/kill.rs @@ -8,7 +8,63 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -//! Task death: asynchronous killing, linked failure, exit code propagation. +/*! + +Task death: asynchronous killing, linked failure, exit code propagation. + +This file implements two orthogonal building-blocks for communicating failure +between tasks. One is 'linked failure' or 'task killing', that is, a failing +task causing other tasks to fail promptly (even those that are blocked on +pipes or I/O). The other is 'exit code propagation', which affects the result +observed by the parent of a task::try task that itself spawns child tasks +(such as any #[test] function). In both cases the data structures live in +KillHandle. + +I. Task killing. + +The model for killing involves two atomic flags, the "kill flag" and the +"unkillable flag". Operations on the kill flag include: + +- In the taskgroup code (task/spawn.rs), tasks store a clone of their + KillHandle in their shared taskgroup. Another task in the group that fails + will use that handle to call kill(). +- When a task blocks, it turns its ~Task into a BlockedTask by storing a + the transmuted ~Task pointer inside the KillHandle's kill flag. A task + trying to block and a task trying to kill it can simultaneously access the + kill flag, after which the task will get scheduled and fail (no matter who + wins the race). Likewise, a task trying to wake a blocked task normally and + a task trying to kill it can simultaneously access the flag; only one will + get the task to reschedule it. + +Operations on the unkillable flag include: + +- When a task becomes unkillable, it swaps on the flag to forbid any killer + from waking it up while it's blocked inside the unkillable section. If a + kill was already pending, the task fails instead of becoming unkillable. +- When a task is done being unkillable, it restores the flag to the normal + running state. If a kill was received-but-blocked during the unkillable + section, the task fails at this later point. +- When a task tries to kill another task, before swapping on the kill flag, it + first swaps on the unkillable flag, to see if it's "allowed" to wake up the + task. If it isn't, the killed task will receive the signal when it becomes + killable again. (Of course, a task trying to wake the task normally (e.g. + sending on a channel) does not access the unkillable flag at all.) + +Why do we not need acquire/release barriers on any of the kill flag swaps? +This is because barriers establish orderings between accesses on different +memory locations, but each kill-related operation is only a swap on a single +location, so atomicity is all that matters. The exception is kill(), which +does a swap on both flags in sequence. kill() needs no barriers because it +does not matter if its two accesses are seen reordered on another CPU: if a +killer does perform both writes, it means it saw a KILL_RUNNING in the +unkillable flag, which means an unkillable task will see KILL_KILLED and fail +immediately (rendering the subsequent write to the kill flag unnecessary). + +II. Exit code propagation. + +FIXME(#7544): Decide on the ultimate model for this and document it. + +*/ use cast; use cell::Cell; @@ -16,8 +72,9 @@ use either::{Either, Left, Right}; use option::{Option, Some, None}; use prelude::*; use rt::task::Task; +use task::spawn::Taskgroup; use to_bytes::IterBytes; -use unstable::atomics::{AtomicUint, Acquire, SeqCst}; +use unstable::atomics::{AtomicUint, Relaxed}; use unstable::sync::{UnsafeAtomicRcBox, LittleLock}; use util; @@ -95,7 +152,7 @@ impl Drop for KillFlag { // Letting a KillFlag with a task inside get dropped would leak the task. // We could free it here, but the task should get awoken by hand somehow. fn drop(&self) { - match self.load(Acquire) { + match self.load(Relaxed) { KILL_RUNNING | KILL_KILLED => { }, _ => rtabort!("can't drop kill flag with a blocked task inside!"), } @@ -124,7 +181,7 @@ impl BlockedTask { Unkillable(task) => Some(task), Killable(flag_arc) => { let flag = unsafe { &mut **flag_arc.get() }; - match flag.swap(KILL_RUNNING, SeqCst) { + match flag.swap(KILL_RUNNING, Relaxed) { KILL_RUNNING => None, // woken from select(), perhaps KILL_KILLED => None, // a killer stole it already task_ptr => @@ -159,7 +216,7 @@ impl BlockedTask { let flag = &mut **flag_arc.get(); let task_ptr = cast::transmute(task); // Expect flag to contain RUNNING. If KILLED, it should stay KILLED. - match flag.compare_and_swap(KILL_RUNNING, task_ptr, SeqCst) { + match flag.compare_and_swap(KILL_RUNNING, task_ptr, Relaxed) { KILL_RUNNING => Right(Killable(flag_arc)), KILL_KILLED => Left(revive_task_ptr(task_ptr, Some(flag_arc))), x => rtabort!("can't block task! kill flag = %?", x), @@ -257,7 +314,7 @@ impl KillHandle { let inner = unsafe { &mut *self.get() }; // Expect flag to contain RUNNING. If KILLED, it should stay KILLED. // FIXME(#7544)(bblum): is it really necessary to prohibit double kill? - match inner.unkillable.compare_and_swap(KILL_RUNNING, KILL_UNKILLABLE, SeqCst) { + match inner.unkillable.compare_and_swap(KILL_RUNNING, KILL_UNKILLABLE, Relaxed) { KILL_RUNNING => { }, // normal case KILL_KILLED => if !already_failing { fail!(KILLED_MSG) }, _ => rtabort!("inhibit_kill: task already unkillable"), @@ -270,7 +327,7 @@ impl KillHandle { let inner = unsafe { &mut *self.get() }; // Expect flag to contain UNKILLABLE. If KILLED, it should stay KILLED. // FIXME(#7544)(bblum): is it really necessary to prohibit double kill? - match inner.unkillable.compare_and_swap(KILL_UNKILLABLE, KILL_RUNNING, SeqCst) { + match inner.unkillable.compare_and_swap(KILL_UNKILLABLE, KILL_RUNNING, Relaxed) { KILL_UNKILLABLE => { }, // normal case KILL_KILLED => if !already_failing { fail!(KILLED_MSG) }, _ => rtabort!("allow_kill: task already killable"), @@ -281,10 +338,10 @@ impl KillHandle { // if it was blocked and needs punted awake. To be called by other tasks. pub fn kill(&mut self) -> Option<~Task> { let inner = unsafe { &mut *self.get() }; - if inner.unkillable.swap(KILL_KILLED, SeqCst) == KILL_RUNNING { + if inner.unkillable.swap(KILL_KILLED, Relaxed) == KILL_RUNNING { // Got in. Allowed to try to punt the task awake. let flag = unsafe { &mut *inner.killed.get() }; - match flag.swap(KILL_KILLED, SeqCst) { + match flag.swap(KILL_KILLED, Relaxed) { // Task either not blocked or already taken care of. KILL_RUNNING | KILL_KILLED => None, // Got ownership of the blocked task. @@ -306,8 +363,11 @@ impl KillHandle { // is unkillable with a kill signal pending. let inner = unsafe { &*self.get() }; let flag = unsafe { &*inner.killed.get() }; - // FIXME(#6598): can use relaxed ordering (i think) - flag.load(Acquire) == KILL_KILLED + // A barrier-related concern here is that a task that gets killed + // awake needs to see the killer's write of KILLED to this flag. This + // is analogous to receiving a pipe payload; the appropriate barrier + // should happen when enqueueing the task. + flag.load(Relaxed) == KILL_KILLED } pub fn notify_immediate_failure(&mut self) { @@ -415,7 +475,7 @@ impl Death { } /// Collect failure exit codes from children and propagate them to a parent. - pub fn collect_failure(&mut self, mut success: bool) { + pub fn collect_failure(&mut self, mut success: bool, group: Option<Taskgroup>) { // This may run after the task has already failed, so even though the // task appears to need to be killed, the scheduler should not fail us // when we block to unwrap. @@ -425,6 +485,10 @@ impl Death { rtassert!(self.unkillable == 0); self.unkillable = 1; + // FIXME(#7544): See corresponding fixme at the callsite in task.rs. + // NB(#8192): Doesn't work with "let _ = ..." + { use util; util::ignore(group); } + // Step 1. Decide if we need to collect child failures synchronously. do self.on_exit.take_map |on_exit| { if success { diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs index 23a0d28e457..b242ee13fa6 100644 --- a/src/libstd/rt/task.rs +++ b/src/libstd/rt/task.rs @@ -212,8 +212,13 @@ impl Task { pub fn run(&mut self, f: &fn()) { rtdebug!("run called on task: %u", borrow::to_uint(self)); self.unwinder.try(f); - { let _ = self.taskgroup.take(); } - self.death.collect_failure(!self.unwinder.unwinding); + // FIXME(#7544): We pass the taskgroup into death so that it can be + // dropped while the unkillable counter is set. This should not be + // necessary except for an extraneous clone() in task/spawn.rs that + // causes a killhandle to get dropped, which mustn't receive a kill + // signal since we're outside of the unwinder's try() scope. + // { let _ = self.taskgroup.take(); } + self.death.collect_failure(!self.unwinder.unwinding, self.taskgroup.take()); self.destroy(); } diff --git a/src/libstd/unstable/sync.rs b/src/libstd/unstable/sync.rs index 4b5c72e5a86..88c9c6ccb3a 100644 --- a/src/libstd/unstable/sync.rs +++ b/src/libstd/unstable/sync.rs @@ -16,7 +16,7 @@ use ptr; use option::*; use either::{Either, Left, Right}; use task; -use unstable::atomics::{AtomicOption,AtomicUint,Acquire,Release,SeqCst}; +use unstable::atomics::{AtomicOption,AtomicUint,Acquire,Release,Relaxed,SeqCst}; use unstable::finally::Finally; use ops::Drop; use clone::Clone; @@ -95,8 +95,7 @@ impl<T: Send> UnsafeAtomicRcBox<T> { pub fn get(&self) -> *mut T { unsafe { let mut data: ~AtomicRcBoxData<T> = cast::transmute(self.data); - // FIXME(#6598) Change Acquire to Relaxed. - assert!(data.count.load(Acquire) > 0); + assert!(data.count.load(Relaxed) > 0); let r: *mut T = data.data.get_mut_ref(); cast::forget(data); return r; @@ -107,7 +106,7 @@ impl<T: Send> UnsafeAtomicRcBox<T> { pub fn get_immut(&self) -> *T { unsafe { let data: ~AtomicRcBoxData<T> = cast::transmute(self.data); - assert!(data.count.load(Acquire) > 0); // no barrier is really needed + assert!(data.count.load(Relaxed) > 0); let r: *T = data.data.get_ref(); cast::forget(data); return r; @@ -130,8 +129,7 @@ impl<T: Send> UnsafeAtomicRcBox<T> { // Try to put our server end in the unwrapper slot. // This needs no barrier -- it's protected by the release barrier on // the xadd, and the acquire+release barrier in the destructor's xadd. - // FIXME(#6598) Change Acquire to Relaxed. - if data.unwrapper.fill(~(c1,p2), Acquire).is_none() { + if data.unwrapper.fill(~(c1,p2), Relaxed).is_none() { // Got in. Tell this handle's destructor not to run (we are now it). this.data = ptr::mut_null(); // Drop our own reference. |
