diff options
| author | bors <bors@rust-lang.org> | 2013-08-02 14:55:54 -0700 |
|---|---|---|
| committer | bors <bors@rust-lang.org> | 2013-08-02 14:55:54 -0700 |
| commit | 3ddc72f69be4d0a2027ff598ad262ea2b2ca3812 (patch) | |
| tree | 5942120bc4dba4d4b2da56457b425145030daf29 /src | |
| parent | f1c1f92d0c555d6e38ad1cac55926d6d9c9b090f (diff) | |
| parent | 43fecf3556b47305320221586f48f89fe2f6c505 (diff) | |
| download | rust-3ddc72f69be4d0a2027ff598ad262ea2b2ca3812.tar.gz rust-3ddc72f69be4d0a2027ff598ad262ea2b2ca3812.zip | |
auto merge of #8234 : bblum/rust/assorted-fixes, r=brson
This fixes 4 bugs that prevented the extra::arc and extra::sync tests from passing on the new runtime. * In ```Add SendDeferred trait``` I add a non-rescheduling ```send_deferred``` method to our various channel types. The ```extra::sync``` concurrency primitives need this guarantee so they can send while inside of an exclusive. (This fixes deterministic deadlocks seen with ```RUST_THREADS=1```.) * In "Fix nasty double-free bug" I make sure that a ```ChanOne``` suppresses_finalize *before* rescheduling away to the receiver, so in case it gets a kill signal upon coming back, the destructor is inhibited as desired. (This is pretty uncommon on multiple CPUs but showed up always with ```RUST_THREADS=1```.) * In ```Fix embarrassing bug where 'unkillable' would unwind improperly``` I make sure the task's unkillable counter stays consistent when a kill signal is received right at the start of an unkillable section. (This is a very uncommon race and can only occur with multiple CPUs.) * In ```Don't fail from kill signals if already unwinding``` I do pretty much what it says on the tin. Surprising that it took the whole suite of sync/arc tests to expose this. The other two commits are cleanup. r @brson
Diffstat (limited to 'src')
| -rw-r--r-- | src/libextra/sync.rs | 318 | ||||
| -rw-r--r-- | src/libstd/comm.rs | 30 | ||||
| -rw-r--r-- | src/libstd/rt/comm.rs | 160 | ||||
| -rw-r--r-- | src/libstd/rt/kill.rs | 9 | ||||
| -rw-r--r-- | src/libstd/rt/sched.rs | 6 | ||||
| -rw-r--r-- | src/libstd/task/mod.rs | 41 | ||||
| -rw-r--r-- | src/libstd/util.rs | 6 |
7 files changed, 328 insertions, 242 deletions
diff --git a/src/libextra/sync.rs b/src/libextra/sync.rs index e539b067edd..276f9cad7c6 100644 --- a/src/libextra/sync.rs +++ b/src/libextra/sync.rs @@ -18,10 +18,13 @@ use std::borrow; use std::comm; +use std::comm::SendDeferred; use std::task; use std::unstable::sync::{Exclusive, UnsafeAtomicRcBox}; use std::unstable::atomics; +use std::unstable::finally::Finally; use std::util; +use std::util::NonCopyable; /**************************************************************************** * Internals @@ -49,7 +52,7 @@ impl WaitQueue { if self.head.peek() { // Pop and send a wakeup signal. If the waiter was killed, its port // will have closed. Keep trying until we get a live task. - if comm::try_send_one(self.head.recv(), ()) { + if self.head.recv().try_send_deferred(()) { true } else { self.signal() @@ -62,7 +65,7 @@ impl WaitQueue { fn broadcast(&self) -> uint { let mut count = 0; while self.head.peek() { - if comm::try_send_one(self.head.recv(), ()) { + if self.head.recv().try_send_deferred(()) { count += 1; } } @@ -83,7 +86,6 @@ struct SemInner<Q> { #[doc(hidden)] struct Sem<Q>(Exclusive<SemInner<Q>>); - #[doc(hidden)] impl<Q:Send> Sem<Q> { fn new(count: int, q: Q) -> Sem<Q> { @@ -102,7 +104,7 @@ impl<Q:Send> Sem<Q> { // Tell outer scope we need to block. waiter_nobe = Some(WaitEnd); // Enqueue ourself. - state.waiters.tail.send(SignalEnd); + state.waiters.tail.send_deferred(SignalEnd); } } // Uncomment if you wish to test for sem races. Not valgrind-friendly. @@ -124,17 +126,18 @@ impl<Q:Send> Sem<Q> { } } } -} -// FIXME(#3154) move both copies of this into Sem<Q>, and unify the 2 structs -#[doc(hidden)] -impl Sem<()> { + pub fn access<U>(&self, blk: &fn() -> U) -> U { - let mut release = None; do task::unkillable { - self.acquire(); - release = Some(SemRelease(self)); + do (|| { + self.acquire(); + unsafe { + do task::rekillable { blk() } + } + }).finally { + self.release(); + } } - blk() } } @@ -148,46 +151,6 @@ impl Sem<~[WaitQueue]> { } Sem::new(count, queues) } - - pub fn access_waitqueue<U>(&self, blk: &fn() -> U) -> U { - let mut release = None; - do task::unkillable { - self.acquire(); - release = Some(SemAndSignalRelease(self)); - } - blk() - } -} - -// FIXME(#3588) should go inside of access() -#[doc(hidden)] -type SemRelease<'self> = SemReleaseGeneric<'self, ()>; -#[doc(hidden)] -type SemAndSignalRelease<'self> = SemReleaseGeneric<'self, ~[WaitQueue]>; -#[doc(hidden)] -struct SemReleaseGeneric<'self, Q> { sem: &'self Sem<Q> } - -#[doc(hidden)] -#[unsafe_destructor] -impl<'self, Q:Send> Drop for SemReleaseGeneric<'self, Q> { - fn drop(&self) { - self.sem.release(); - } -} - -#[doc(hidden)] -fn SemRelease<'r>(sem: &'r Sem<()>) -> SemRelease<'r> { - SemReleaseGeneric { - sem: sem - } -} - -#[doc(hidden)] -fn SemAndSignalRelease<'r>(sem: &'r Sem<~[WaitQueue]>) - -> SemAndSignalRelease<'r> { - SemReleaseGeneric { - sem: sem - } } // FIXME(#3598): Want to use an Option down below, but we need a custom enum @@ -210,11 +173,10 @@ pub struct Condvar<'self> { // writer waking up from a cvar wait can't race with a reader to steal it, // See the comment in write_cond for more detail. priv order: ReacquireOrderLock<'self>, + // Make sure condvars are non-copyable. + priv token: util::NonCopyable, } -#[unsafe_destructor] -impl<'self> Drop for Condvar<'self> { fn drop(&self) {} } - impl<'self> Condvar<'self> { /** * Atomically drop the associated lock, and block until a signal is sent. @@ -242,11 +204,10 @@ impl<'self> Condvar<'self> { let (WaitEnd, SignalEnd) = comm::oneshot(); let mut WaitEnd = Some(WaitEnd); let mut SignalEnd = Some(SignalEnd); - let mut reacquire = None; let mut out_of_bounds = None; - unsafe { - do task::unkillable { - // Release lock, 'atomically' enqueuing ourselves in so doing. + do task::unkillable { + // Release lock, 'atomically' enqueuing ourselves in so doing. + unsafe { do (**self.sem).with |state| { if condvar_id < state.blocked.len() { // Drop the lock. @@ -256,42 +217,30 @@ impl<'self> Condvar<'self> { } // Enqueue ourself to be woken up by a signaller. let SignalEnd = SignalEnd.take_unwrap(); - state.blocked[condvar_id].tail.send(SignalEnd); + state.blocked[condvar_id].tail.send_deferred(SignalEnd); } else { out_of_bounds = Some(state.blocked.len()); } } - - // If yield checks start getting inserted anywhere, we can be - // killed before or after enqueueing. Deciding whether to - // unkillably reacquire the lock needs to happen atomically - // wrt enqueuing. - if out_of_bounds.is_none() { - reacquire = Some(CondvarReacquire { sem: self.sem, - order: self.order }); - } } - } - do check_cvar_bounds(out_of_bounds, condvar_id, "cond.wait_on()") { - // Unconditionally "block". (Might not actually block if a - // signaller already sent -- I mean 'unconditionally' in contrast - // with acquire().) - let _ = comm::recv_one(WaitEnd.take_unwrap()); - } - // This is needed for a failing condition variable to reacquire the - // mutex during unwinding. As long as the wrapper (mutex, etc) is - // bounded in when it gets released, this shouldn't hang forever. - struct CondvarReacquire<'self> { - sem: &'self Sem<~[WaitQueue]>, - order: ReacquireOrderLock<'self>, - } - - #[unsafe_destructor] - impl<'self> Drop for CondvarReacquire<'self> { - fn drop(&self) { - // Needs to succeed, instead of itself dying. - do task::unkillable { + // If yield checks start getting inserted anywhere, we can be + // killed before or after enqueueing. Deciding whether to + // unkillably reacquire the lock needs to happen atomically + // wrt enqueuing. + do check_cvar_bounds(out_of_bounds, condvar_id, "cond.wait_on()") { + // Unconditionally "block". (Might not actually block if a + // signaller already sent -- I mean 'unconditionally' in contrast + // with acquire().) + do (|| { + unsafe { + do task::rekillable { + let _ = comm::recv_one(WaitEnd.take_unwrap()); + } + } + }).finally { + // Reacquire the condvar. Note this is back in the unkillable + // section; it needs to succeed, instead of itself dying. match self.order { Just(lock) => do lock.access { self.sem.acquire(); @@ -373,8 +322,8 @@ impl Sem<~[WaitQueue]> { // The only other places that condvars get built are rwlock.write_cond() // and rwlock_write_mode. pub fn access_cond<U>(&self, blk: &fn(c: &Condvar) -> U) -> U { - do self.access_waitqueue { - blk(&Condvar { sem: self, order: Nothing }) + do self.access { + blk(&Condvar { sem: self, order: Nothing, token: NonCopyable::new() }) } } } @@ -452,7 +401,7 @@ impl Mutex { /// Run a function with ownership of the mutex. pub fn lock<U>(&self, blk: &fn() -> U) -> U { - (&self.sem).access_waitqueue(blk) + (&self.sem).access(blk) } /// Run a function with ownership of the mutex and a handle to a condvar. @@ -531,7 +480,6 @@ impl RWLock { * tasks may run concurrently with this one. */ pub fn read<U>(&self, blk: &fn() -> U) -> U { - let mut release = None; unsafe { do task::unkillable { do (&self.order_lock).access { @@ -542,10 +490,24 @@ impl RWLock { state.read_mode = true; } } - release = Some(RWLockReleaseRead(self)); + do (|| { + do task::rekillable { blk() } + }).finally { + let state = &mut *self.state.get(); + assert!(state.read_mode); + let old_count = state.read_count.fetch_sub(1, atomics::Release); + assert!(old_count > 0); + if old_count == 1 { + state.read_mode = false; + // Note: this release used to be outside of a locked access + // to exclusive-protected state. If this code is ever + // converted back to such (instead of using atomic ops), + // this access MUST NOT go inside the exclusive access. + (&self.access_lock).release(); + } + } } } - blk() } /** @@ -556,7 +518,7 @@ impl RWLock { unsafe { do task::unkillable { (&self.order_lock).acquire(); - do (&self.access_lock).access_waitqueue { + do (&self.access_lock).access { (&self.order_lock).release(); do task::rekillable { blk() @@ -606,7 +568,8 @@ impl RWLock { (&self.order_lock).release(); do task::rekillable { let opt_lock = Just(&self.order_lock); - blk(&Condvar { order: opt_lock, ..*cond }) + blk(&Condvar { sem: cond.sem, order: opt_lock, + token: NonCopyable::new() }) } } } @@ -637,14 +600,43 @@ impl RWLock { pub fn write_downgrade<U>(&self, blk: &fn(v: RWLockWriteMode) -> U) -> U { // Implementation slightly different from the slicker 'write's above. // The exit path is conditional on whether the caller downgrades. - let mut _release = None; do task::unkillable { (&self.order_lock).acquire(); (&self.access_lock).acquire(); (&self.order_lock).release(); + do (|| { + unsafe { + do task::rekillable { + blk(RWLockWriteMode { lock: self, token: NonCopyable::new() }) + } + } + }).finally { + let writer_or_last_reader; + // Check if we're releasing from read mode or from write mode. + let state = unsafe { &mut *self.state.get() }; + if state.read_mode { + // Releasing from read mode. + let old_count = state.read_count.fetch_sub(1, atomics::Release); + assert!(old_count > 0); + // Check if other readers remain. + if old_count == 1 { + // Case 1: Writer downgraded & was the last reader + writer_or_last_reader = true; + state.read_mode = false; + } else { + // Case 2: Writer downgraded & was not the last reader + writer_or_last_reader = false; + } + } else { + // Case 3: Writer did not downgrade + writer_or_last_reader = true; + } + if writer_or_last_reader { + // Nobody left inside; release the "reader cloud" lock. + (&self.access_lock).release(); + } + } } - _release = Some(RWLockReleaseDowngrade(self)); - blk(RWLockWriteMode { lock: self }) } /// To be called inside of the write_downgrade block. @@ -673,105 +665,16 @@ impl RWLock { } } } - RWLockReadMode { lock: token.lock } - } -} - -// FIXME(#3588) should go inside of read() -#[doc(hidden)] -struct RWLockReleaseRead<'self> { - lock: &'self RWLock, -} - -#[doc(hidden)] -#[unsafe_destructor] -impl<'self> Drop for RWLockReleaseRead<'self> { - fn drop(&self) { - unsafe { - do task::unkillable { - let state = &mut *self.lock.state.get(); - assert!(state.read_mode); - let old_count = state.read_count.fetch_sub(1, atomics::Release); - assert!(old_count > 0); - if old_count == 1 { - state.read_mode = false; - // Note: this release used to be outside of a locked access - // to exclusive-protected state. If this code is ever - // converted back to such (instead of using atomic ops), - // this access MUST NOT go inside the exclusive access. - (&self.lock.access_lock).release(); - } - } - } - } -} - -#[doc(hidden)] -fn RWLockReleaseRead<'r>(lock: &'r RWLock) -> RWLockReleaseRead<'r> { - RWLockReleaseRead { - lock: lock - } -} - -// FIXME(#3588) should go inside of downgrade() -#[doc(hidden)] -#[unsafe_destructor] -struct RWLockReleaseDowngrade<'self> { - lock: &'self RWLock, -} - -#[doc(hidden)] -#[unsafe_destructor] -impl<'self> Drop for RWLockReleaseDowngrade<'self> { - fn drop(&self) { - unsafe { - do task::unkillable { - let writer_or_last_reader; - // Check if we're releasing from read mode or from write mode. - let state = &mut *self.lock.state.get(); - if state.read_mode { - // Releasing from read mode. - let old_count = state.read_count.fetch_sub(1, atomics::Release); - assert!(old_count > 0); - // Check if other readers remain. - if old_count == 1 { - // Case 1: Writer downgraded & was the last reader - writer_or_last_reader = true; - state.read_mode = false; - } else { - // Case 2: Writer downgraded & was not the last reader - writer_or_last_reader = false; - } - } else { - // Case 3: Writer did not downgrade - writer_or_last_reader = true; - } - if writer_or_last_reader { - // Nobody left inside; release the "reader cloud" lock. - (&self.lock.access_lock).release(); - } - } - } - } -} - -#[doc(hidden)] -fn RWLockReleaseDowngrade<'r>(lock: &'r RWLock) - -> RWLockReleaseDowngrade<'r> { - RWLockReleaseDowngrade { - lock: lock + RWLockReadMode { lock: token.lock, token: NonCopyable::new() } } } /// The "write permission" token used for rwlock.write_downgrade(). -pub struct RWLockWriteMode<'self> { priv lock: &'self RWLock } -#[unsafe_destructor] -impl<'self> Drop for RWLockWriteMode<'self> { fn drop(&self) {} } +pub struct RWLockWriteMode<'self> { priv lock: &'self RWLock, priv token: NonCopyable } /// The "read permission" token used for rwlock.write_downgrade(). -pub struct RWLockReadMode<'self> { priv lock: &'self RWLock } -#[unsafe_destructor] -impl<'self> Drop for RWLockReadMode<'self> { fn drop(&self) {} } +pub struct RWLockReadMode<'self> { priv lock: &'self RWLock, + priv token: NonCopyable } impl<'self> RWLockWriteMode<'self> { /// Access the pre-downgrade rwlock in write mode. @@ -781,7 +684,8 @@ impl<'self> RWLockWriteMode<'self> { // Need to make the condvar use the order lock when reacquiring the // access lock. See comment in RWLock::write_cond for why. blk(&Condvar { sem: &self.lock.access_lock, - order: Just(&self.lock.order_lock), }) + order: Just(&self.lock.order_lock), + token: NonCopyable::new() }) } } @@ -1059,6 +963,8 @@ mod tests { } #[test] #[ignore(cfg(windows))] fn test_mutex_killed_broadcast() { + use std::unstable::finally::Finally; + let m = ~Mutex::new(); let m2 = ~m.clone(); let (p,c) = comm::stream(); @@ -1075,8 +981,13 @@ mod tests { do mi.lock_cond |cond| { let c = c.take(); c.send(()); // tell sibling to go ahead - let _z = SendOnFailure(c); - cond.wait(); // block forever + do (|| { + cond.wait(); // block forever + }).finally { + error!("task unwinding and sending"); + c.send(()); + error!("task unwinding and done sending"); + } } } } @@ -1095,21 +1006,6 @@ mod tests { let woken = cond.broadcast(); assert_eq!(woken, 0); } - struct SendOnFailure { - c: comm::Chan<()>, - } - - impl Drop for SendOnFailure { - fn drop(&self) { - self.c.send(()); - } - } - - fn SendOnFailure(c: comm::Chan<()>) -> SendOnFailure { - SendOnFailure { - c: c - } - } } #[test] fn test_mutex_cond_signal_on_0() { diff --git a/src/libstd/comm.rs b/src/libstd/comm.rs index acdf2cee841..a0731dc3494 100644 --- a/src/libstd/comm.rs +++ b/src/libstd/comm.rs @@ -19,6 +19,7 @@ use either::{Either, Left, Right}; use kinds::Send; use option::{Option, Some}; use unstable::sync::Exclusive; +pub use rt::comm::SendDeferred; use rtcomm = rt::comm; use rt; @@ -105,6 +106,21 @@ impl<T: Send> GenericSmartChan<T> for Chan<T> { } } +impl<T: Send> SendDeferred<T> for Chan<T> { + fn send_deferred(&self, x: T) { + match self.inner { + Left(ref chan) => chan.send(x), + Right(ref chan) => chan.send_deferred(x) + } + } + fn try_send_deferred(&self, x: T) -> bool { + match self.inner { + Left(ref chan) => chan.try_send(x), + Right(ref chan) => chan.try_send_deferred(x) + } + } +} + impl<T: Send> GenericPort<T> for Port<T> { fn recv(&self) -> T { match self.inner { @@ -250,6 +266,20 @@ impl<T: Send> ChanOne<T> { Right(p) => p.try_send(data) } } + pub fn send_deferred(self, data: T) { + let ChanOne { inner } = self; + match inner { + Left(p) => p.send(data), + Right(p) => p.send_deferred(data) + } + } + pub fn try_send_deferred(self, data: T) -> bool { + let ChanOne { inner } = self; + match inner { + Left(p) => p.try_send(data), + Right(p) => p.try_send_deferred(data) + } + } } pub fn recv_one<T: Send>(port: PortOne<T>) -> T { diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs index 00e1aaa2193..a060059f5fc 100644 --- a/src/libstd/rt/comm.rs +++ b/src/libstd/rt/comm.rs @@ -25,6 +25,7 @@ use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable}; use cell::Cell; use clone::Clone; use rt::{context, SchedulerContext}; +use tuple::ImmutableTuple; /// A combined refcount / BlockedTask-as-uint pointer. /// @@ -86,12 +87,32 @@ impl<T> ChanOne<T> { } } + /// Send a message on the one-shot channel. If a receiver task is blocked + /// waiting for the message, will wake it up and reschedule to it. pub fn send(self, val: T) { self.try_send(val); } + /// As `send`, but also returns whether or not the receiver endpoint is still open. pub fn try_send(self, val: T) -> bool { + self.try_send_inner(val, true) + } + + /// Send a message without immediately rescheduling to a blocked receiver. + /// This can be useful in contexts where rescheduling is forbidden, or to + /// optimize for when the sender expects to still have useful work to do. + pub fn send_deferred(self, val: T) { + self.try_send_deferred(val); + } + + /// As `send_deferred` and `try_send` together. + pub fn try_send_deferred(self, val: T) -> bool { + self.try_send_inner(val, false) + } + // 'do_resched' configures whether the scheduler immediately switches to + // the receiving task, or leaves the sending task still running. + fn try_send_inner(self, val: T, do_resched: bool) -> bool { rtassert!(context() != SchedulerContext); let mut this = self; @@ -110,6 +131,13 @@ impl<T> ChanOne<T> { // acquire barrier that keeps the subsequent access of the // ~Task pointer from being reordered. let oldstate = (*packet).state.swap(STATE_ONE, SeqCst); + + // Suppress the synchronizing actions in the finalizer. We're + // done with the packet. NB: In case of do_resched, this *must* + // happen before waking up a blocked task (or be unkillable), + // because we might get a kill signal during the reschedule. + this.suppress_finalize = true; + match oldstate { STATE_BOTH => { // Port is not waiting yet. Nothing to do @@ -130,15 +158,20 @@ impl<T> ChanOne<T> { task_as_state => { // Port is blocked. Wake it up. let recvr = BlockedTask::cast_from_uint(task_as_state); - do recvr.wake().map_consume |woken_task| { - Scheduler::run_task(woken_task); - }; + if do_resched { + do recvr.wake().map_consume |woken_task| { + Scheduler::run_task(woken_task); + }; + } else { + let recvr = Cell::new(recvr); + do Local::borrow::<Scheduler, ()> |sched| { + sched.enqueue_blocked_task(recvr.take()); + } + } } } } - // Suppress the synchronizing actions in the finalizer. We're done with the packet. - this.suppress_finalize = true; return recvr_active; } } @@ -152,6 +185,7 @@ impl<T> PortOne<T> { } } + /// Wait for a message on the one-shot port. Fails if the send end is closed. pub fn recv(self) -> T { match self.try_recv() { Some(val) => val, @@ -161,6 +195,7 @@ impl<T> PortOne<T> { } } + /// As `recv`, but returns `None` if the send end is closed rather than failing. pub fn try_recv(self) -> Option<T> { let mut this = self; @@ -382,6 +417,12 @@ impl<T> Drop for PortOne<T> { } } +/// Trait for non-rescheduling send operations, similar to `send_deferred` on ChanOne. +pub trait SendDeferred<T> { + fn send_deferred(&self, val: T); + fn try_send_deferred(&self, val: T) -> bool; +} + struct StreamPayload<T> { val: T, next: PortOne<StreamPayload<T>> @@ -409,6 +450,15 @@ pub fn stream<T: Send>() -> (Port<T>, Chan<T>) { return (port, chan); } +impl<T: Send> Chan<T> { + fn try_send_inner(&self, val: T, do_resched: bool) -> bool { + let (next_pone, next_cone) = oneshot(); + let cone = self.next.take(); + self.next.put_back(next_cone); + cone.try_send_inner(StreamPayload { val: val, next: next_pone }, do_resched) + } +} + impl<T: Send> GenericChan<T> for Chan<T> { fn send(&self, val: T) { self.try_send(val); @@ -417,10 +467,16 @@ impl<T: Send> GenericChan<T> for Chan<T> { impl<T: Send> GenericSmartChan<T> for Chan<T> { fn try_send(&self, val: T) -> bool { - let (next_pone, next_cone) = oneshot(); - let cone = self.next.take(); - self.next.put_back(next_cone); - cone.try_send(StreamPayload { val: val, next: next_pone }) + self.try_send_inner(val, true) + } +} + +impl<T: Send> SendDeferred<T> for Chan<T> { + fn send_deferred(&self, val: T) { + self.try_send_deferred(val); + } + fn try_send_deferred(&self, val: T) -> bool { + self.try_send_inner(val, false) } } @@ -495,6 +551,17 @@ impl<T> SharedChan<T> { } } +impl<T: Send> SharedChan<T> { + fn try_send_inner(&self, val: T, do_resched: bool) -> bool { + unsafe { + let (next_pone, next_cone) = oneshot(); + let cone = (*self.next.get()).swap(~next_cone, SeqCst); + cone.unwrap().try_send_inner(StreamPayload { val: val, next: next_pone }, + do_resched) + } + } +} + impl<T: Send> GenericChan<T> for SharedChan<T> { fn send(&self, val: T) { self.try_send(val); @@ -503,11 +570,16 @@ impl<T: Send> GenericChan<T> for SharedChan<T> { impl<T: Send> GenericSmartChan<T> for SharedChan<T> { fn try_send(&self, val: T) -> bool { - unsafe { - let (next_pone, next_cone) = oneshot(); - let cone = (*self.next.get()).swap(~next_cone, SeqCst); - cone.unwrap().try_send(StreamPayload { val: val, next: next_pone }) - } + self.try_send_inner(val, true) + } +} + +impl<T: Send> SendDeferred<T> for SharedChan<T> { + fn send_deferred(&self, val: T) { + self.try_send_deferred(val); + } + fn try_send_deferred(&self, val: T) -> bool { + self.try_send_inner(val, false) } } @@ -584,31 +656,32 @@ pub fn megapipe<T: Send>() -> MegaPipe<T> { impl<T: Send> GenericChan<T> for MegaPipe<T> { fn send(&self, val: T) { - match *self { - (_, ref c) => c.send(val) - } + self.second_ref().send(val) } } impl<T: Send> GenericSmartChan<T> for MegaPipe<T> { fn try_send(&self, val: T) -> bool { - match *self { - (_, ref c) => c.try_send(val) - } + self.second_ref().try_send(val) } } impl<T: Send> GenericPort<T> for MegaPipe<T> { fn recv(&self) -> T { - match *self { - (ref p, _) => p.recv() - } + self.first_ref().recv() } fn try_recv(&self) -> Option<T> { - match *self { - (ref p, _) => p.try_recv() - } + self.first_ref().try_recv() + } +} + +impl<T: Send> SendDeferred<T> for MegaPipe<T> { + fn send_deferred(&self, val: T) { + self.second_ref().send_deferred(val) + } + fn try_send_deferred(&self, val: T) -> bool { + self.second_ref().try_send_deferred(val) } } @@ -1017,4 +1090,39 @@ mod test { } } } + + #[test] + fn send_deferred() { + use unstable::sync::atomically; + + // Tests no-rescheduling of send_deferred on all types of channels. + do run_in_newsched_task { + let (pone, cone) = oneshot(); + let (pstream, cstream) = stream(); + let (pshared, cshared) = stream(); + let cshared = SharedChan::new(cshared); + let mp = megapipe(); + + let pone = Cell::new(pone); + do spawntask { pone.take().recv(); } + let pstream = Cell::new(pstream); + do spawntask { pstream.take().recv(); } + let pshared = Cell::new(pshared); + do spawntask { pshared.take().recv(); } + let p_mp = Cell::new(mp.clone()); + do spawntask { p_mp.take().recv(); } + + let cs = Cell::new((cone, cstream, cshared, mp)); + unsafe { + do atomically { + let (cone, cstream, cshared, mp) = cs.take(); + cone.send_deferred(()); + cstream.send_deferred(()); + cshared.send_deferred(()); + mp.send_deferred(()); + } + } + } + } + } diff --git a/src/libstd/rt/kill.rs b/src/libstd/rt/kill.rs index 696f4a8c355..deec8dd37a6 100644 --- a/src/libstd/rt/kill.rs +++ b/src/libstd/rt/kill.rs @@ -530,13 +530,13 @@ impl Death { /// Fails if a kill signal was received. #[inline] - pub fn check_killed(&self) { + pub fn check_killed(&self, already_failing: bool) { match self.kill_handle { Some(ref kill_handle) => // The task may be both unkillable and killed if it does some // synchronization during unwinding or cleanup (for example, // sending on a notify port). In that case failing won't help. - if self.unkillable == 0 && kill_handle.killed() { + if self.unkillable == 0 && (!already_failing) && kill_handle.killed() { fail!(KILLED_MSG); }, // This may happen during task death (see comments in collect_failure). @@ -548,11 +548,12 @@ impl Death { /// All calls must be paired with a subsequent call to allow_kill. #[inline] pub fn inhibit_kill(&mut self, already_failing: bool) { - if self.unkillable == 0 { + self.unkillable += 1; + // May fail, hence must happen *after* incrementing the counter + if self.unkillable == 1 { rtassert!(self.kill_handle.is_some()); self.kill_handle.get_mut_ref().inhibit_kill(already_failing); } - self.unkillable += 1; } /// Exit a possibly-nested unkillable section of code. diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs index 5c9b142c052..dfe003253c2 100644 --- a/src/libstd/rt/sched.rs +++ b/src/libstd/rt/sched.rs @@ -540,6 +540,10 @@ impl Scheduler { // The current task is grabbed from TLS, not taken as an input. let current_task: ~Task = Local::take::<Task>(); + // Check that the task is not in an atomically() section (e.g., + // holding a pthread mutex, which could deadlock the scheduler). + current_task.death.assert_may_sleep(); + // These transmutes do something fishy with a closure. let f_fake_region = unsafe { transmute::<&fn(&mut Scheduler, ~Task), @@ -600,7 +604,7 @@ impl Scheduler { // Must happen after running the cleanup job (of course). let task = Local::unsafe_borrow::<Task>(); - (*task).death.check_killed(); + (*task).death.check_killed((*task).unwinder.unwinding); } } diff --git a/src/libstd/task/mod.rs b/src/libstd/task/mod.rs index 19acedb56dd..e08297a1425 100644 --- a/src/libstd/task/mod.rs +++ b/src/libstd/task/mod.rs @@ -655,6 +655,47 @@ pub unsafe fn rekillable<U>(f: &fn() -> U) -> U { } } +#[test] #[ignore(cfg(windows))] +fn test_kill_unkillable_task() { + use rt::test::*; + + // Attempt to test that when a kill signal is received at the start of an + // unkillable section, 'unkillable' unwinds correctly. This is actually + // quite a difficult race to expose, as the kill has to happen on a second + // CPU, *after* the spawner is already switched-back-to (and passes the + // killed check at the start of its timeslice). As far as I know, it's not + // possible to make this race deterministic, or even more likely to happen. + do run_in_newsched_task { + do task::try { + do task::spawn { + fail!(); + } + do task::unkillable { } + }; + } +} + +#[test] #[ignore(cfg(windows))] +fn test_kill_rekillable_task() { + use rt::test::*; + + // Tests that when a kill signal is received, 'rekillable' and + // 'unkillable' unwind correctly in conjunction with each other. + do run_in_newsched_task { + do task::try { + do task::unkillable { + unsafe { + do task::rekillable { + do task::spawn { + fail!(); + } + } + } + } + }; + } +} + #[test] #[should_fail] #[ignore(cfg(windows))] fn test_cant_dup_task_builder() { let mut builder = task(); diff --git a/src/libstd/util.rs b/src/libstd/util.rs index 8fcfa083cb6..b46876ad3fe 100644 --- a/src/libstd/util.rs +++ b/src/libstd/util.rs @@ -79,6 +79,12 @@ pub fn replace<T>(dest: &mut T, mut src: T) -> T { #[unsafe_no_drop_flag] pub struct NonCopyable; +impl NonCopyable { + // FIXME(#8233) should not be necessary + /// Create a new noncopyable token. + pub fn new() -> NonCopyable { NonCopyable } +} + impl Drop for NonCopyable { fn drop(&self) { } } |
