diff options
| author | bors <bors@rust-lang.org> | 2013-08-02 14:55:54 -0700 |
|---|---|---|
| committer | bors <bors@rust-lang.org> | 2013-08-02 14:55:54 -0700 |
| commit | 3ddc72f69be4d0a2027ff598ad262ea2b2ca3812 (patch) | |
| tree | 5942120bc4dba4d4b2da56457b425145030daf29 /src/libextra | |
| parent | f1c1f92d0c555d6e38ad1cac55926d6d9c9b090f (diff) | |
| parent | 43fecf3556b47305320221586f48f89fe2f6c505 (diff) | |
| download | rust-3ddc72f69be4d0a2027ff598ad262ea2b2ca3812.tar.gz rust-3ddc72f69be4d0a2027ff598ad262ea2b2ca3812.zip | |
auto merge of #8234 : bblum/rust/assorted-fixes, r=brson
This fixes 4 bugs that prevented the extra::arc and extra::sync tests from passing on the new runtime. * In ```Add SendDeferred trait``` I add a non-rescheduling ```send_deferred``` method to our various channel types. The ```extra::sync``` concurrency primitives need this guarantee so they can send while inside of an exclusive. (This fixes deterministic deadlocks seen with ```RUST_THREADS=1```.) * In "Fix nasty double-free bug" I make sure that a ```ChanOne``` suppresses_finalize *before* rescheduling away to the receiver, so in case it gets a kill signal upon coming back, the destructor is inhibited as desired. (This is pretty uncommon on multiple CPUs but showed up always with ```RUST_THREADS=1```.) * In ```Fix embarrassing bug where 'unkillable' would unwind improperly``` I make sure the task's unkillable counter stays consistent when a kill signal is received right at the start of an unkillable section. (This is a very uncommon race and can only occur with multiple CPUs.) * In ```Don't fail from kill signals if already unwinding``` I do pretty much what it says on the tin. Surprising that it took the whole suite of sync/arc tests to expose this. The other two commits are cleanup. r @brson
Diffstat (limited to 'src/libextra')
| -rw-r--r-- | src/libextra/sync.rs | 318 |
1 files changed, 107 insertions, 211 deletions
diff --git a/src/libextra/sync.rs b/src/libextra/sync.rs index e539b067edd..276f9cad7c6 100644 --- a/src/libextra/sync.rs +++ b/src/libextra/sync.rs @@ -18,10 +18,13 @@ use std::borrow; use std::comm; +use std::comm::SendDeferred; use std::task; use std::unstable::sync::{Exclusive, UnsafeAtomicRcBox}; use std::unstable::atomics; +use std::unstable::finally::Finally; use std::util; +use std::util::NonCopyable; /**************************************************************************** * Internals @@ -49,7 +52,7 @@ impl WaitQueue { if self.head.peek() { // Pop and send a wakeup signal. If the waiter was killed, its port // will have closed. Keep trying until we get a live task. - if comm::try_send_one(self.head.recv(), ()) { + if self.head.recv().try_send_deferred(()) { true } else { self.signal() @@ -62,7 +65,7 @@ impl WaitQueue { fn broadcast(&self) -> uint { let mut count = 0; while self.head.peek() { - if comm::try_send_one(self.head.recv(), ()) { + if self.head.recv().try_send_deferred(()) { count += 1; } } @@ -83,7 +86,6 @@ struct SemInner<Q> { #[doc(hidden)] struct Sem<Q>(Exclusive<SemInner<Q>>); - #[doc(hidden)] impl<Q:Send> Sem<Q> { fn new(count: int, q: Q) -> Sem<Q> { @@ -102,7 +104,7 @@ impl<Q:Send> Sem<Q> { // Tell outer scope we need to block. waiter_nobe = Some(WaitEnd); // Enqueue ourself. - state.waiters.tail.send(SignalEnd); + state.waiters.tail.send_deferred(SignalEnd); } } // Uncomment if you wish to test for sem races. Not valgrind-friendly. @@ -124,17 +126,18 @@ impl<Q:Send> Sem<Q> { } } } -} -// FIXME(#3154) move both copies of this into Sem<Q>, and unify the 2 structs -#[doc(hidden)] -impl Sem<()> { + pub fn access<U>(&self, blk: &fn() -> U) -> U { - let mut release = None; do task::unkillable { - self.acquire(); - release = Some(SemRelease(self)); + do (|| { + self.acquire(); + unsafe { + do task::rekillable { blk() } + } + }).finally { + self.release(); + } } - blk() } } @@ -148,46 +151,6 @@ impl Sem<~[WaitQueue]> { } Sem::new(count, queues) } - - pub fn access_waitqueue<U>(&self, blk: &fn() -> U) -> U { - let mut release = None; - do task::unkillable { - self.acquire(); - release = Some(SemAndSignalRelease(self)); - } - blk() - } -} - -// FIXME(#3588) should go inside of access() -#[doc(hidden)] -type SemRelease<'self> = SemReleaseGeneric<'self, ()>; -#[doc(hidden)] -type SemAndSignalRelease<'self> = SemReleaseGeneric<'self, ~[WaitQueue]>; -#[doc(hidden)] -struct SemReleaseGeneric<'self, Q> { sem: &'self Sem<Q> } - -#[doc(hidden)] -#[unsafe_destructor] -impl<'self, Q:Send> Drop for SemReleaseGeneric<'self, Q> { - fn drop(&self) { - self.sem.release(); - } -} - -#[doc(hidden)] -fn SemRelease<'r>(sem: &'r Sem<()>) -> SemRelease<'r> { - SemReleaseGeneric { - sem: sem - } -} - -#[doc(hidden)] -fn SemAndSignalRelease<'r>(sem: &'r Sem<~[WaitQueue]>) - -> SemAndSignalRelease<'r> { - SemReleaseGeneric { - sem: sem - } } // FIXME(#3598): Want to use an Option down below, but we need a custom enum @@ -210,11 +173,10 @@ pub struct Condvar<'self> { // writer waking up from a cvar wait can't race with a reader to steal it, // See the comment in write_cond for more detail. priv order: ReacquireOrderLock<'self>, + // Make sure condvars are non-copyable. + priv token: util::NonCopyable, } -#[unsafe_destructor] -impl<'self> Drop for Condvar<'self> { fn drop(&self) {} } - impl<'self> Condvar<'self> { /** * Atomically drop the associated lock, and block until a signal is sent. @@ -242,11 +204,10 @@ impl<'self> Condvar<'self> { let (WaitEnd, SignalEnd) = comm::oneshot(); let mut WaitEnd = Some(WaitEnd); let mut SignalEnd = Some(SignalEnd); - let mut reacquire = None; let mut out_of_bounds = None; - unsafe { - do task::unkillable { - // Release lock, 'atomically' enqueuing ourselves in so doing. + do task::unkillable { + // Release lock, 'atomically' enqueuing ourselves in so doing. + unsafe { do (**self.sem).with |state| { if condvar_id < state.blocked.len() { // Drop the lock. @@ -256,42 +217,30 @@ impl<'self> Condvar<'self> { } // Enqueue ourself to be woken up by a signaller. let SignalEnd = SignalEnd.take_unwrap(); - state.blocked[condvar_id].tail.send(SignalEnd); + state.blocked[condvar_id].tail.send_deferred(SignalEnd); } else { out_of_bounds = Some(state.blocked.len()); } } - - // If yield checks start getting inserted anywhere, we can be - // killed before or after enqueueing. Deciding whether to - // unkillably reacquire the lock needs to happen atomically - // wrt enqueuing. - if out_of_bounds.is_none() { - reacquire = Some(CondvarReacquire { sem: self.sem, - order: self.order }); - } } - } - do check_cvar_bounds(out_of_bounds, condvar_id, "cond.wait_on()") { - // Unconditionally "block". (Might not actually block if a - // signaller already sent -- I mean 'unconditionally' in contrast - // with acquire().) - let _ = comm::recv_one(WaitEnd.take_unwrap()); - } - // This is needed for a failing condition variable to reacquire the - // mutex during unwinding. As long as the wrapper (mutex, etc) is - // bounded in when it gets released, this shouldn't hang forever. - struct CondvarReacquire<'self> { - sem: &'self Sem<~[WaitQueue]>, - order: ReacquireOrderLock<'self>, - } - - #[unsafe_destructor] - impl<'self> Drop for CondvarReacquire<'self> { - fn drop(&self) { - // Needs to succeed, instead of itself dying. - do task::unkillable { + // If yield checks start getting inserted anywhere, we can be + // killed before or after enqueueing. Deciding whether to + // unkillably reacquire the lock needs to happen atomically + // wrt enqueuing. + do check_cvar_bounds(out_of_bounds, condvar_id, "cond.wait_on()") { + // Unconditionally "block". (Might not actually block if a + // signaller already sent -- I mean 'unconditionally' in contrast + // with acquire().) + do (|| { + unsafe { + do task::rekillable { + let _ = comm::recv_one(WaitEnd.take_unwrap()); + } + } + }).finally { + // Reacquire the condvar. Note this is back in the unkillable + // section; it needs to succeed, instead of itself dying. match self.order { Just(lock) => do lock.access { self.sem.acquire(); @@ -373,8 +322,8 @@ impl Sem<~[WaitQueue]> { // The only other places that condvars get built are rwlock.write_cond() // and rwlock_write_mode. pub fn access_cond<U>(&self, blk: &fn(c: &Condvar) -> U) -> U { - do self.access_waitqueue { - blk(&Condvar { sem: self, order: Nothing }) + do self.access { + blk(&Condvar { sem: self, order: Nothing, token: NonCopyable::new() }) } } } @@ -452,7 +401,7 @@ impl Mutex { /// Run a function with ownership of the mutex. pub fn lock<U>(&self, blk: &fn() -> U) -> U { - (&self.sem).access_waitqueue(blk) + (&self.sem).access(blk) } /// Run a function with ownership of the mutex and a handle to a condvar. @@ -531,7 +480,6 @@ impl RWLock { * tasks may run concurrently with this one. */ pub fn read<U>(&self, blk: &fn() -> U) -> U { - let mut release = None; unsafe { do task::unkillable { do (&self.order_lock).access { @@ -542,10 +490,24 @@ impl RWLock { state.read_mode = true; } } - release = Some(RWLockReleaseRead(self)); + do (|| { + do task::rekillable { blk() } + }).finally { + let state = &mut *self.state.get(); + assert!(state.read_mode); + let old_count = state.read_count.fetch_sub(1, atomics::Release); + assert!(old_count > 0); + if old_count == 1 { + state.read_mode = false; + // Note: this release used to be outside of a locked access + // to exclusive-protected state. If this code is ever + // converted back to such (instead of using atomic ops), + // this access MUST NOT go inside the exclusive access. + (&self.access_lock).release(); + } + } } } - blk() } /** @@ -556,7 +518,7 @@ impl RWLock { unsafe { do task::unkillable { (&self.order_lock).acquire(); - do (&self.access_lock).access_waitqueue { + do (&self.access_lock).access { (&self.order_lock).release(); do task::rekillable { blk() @@ -606,7 +568,8 @@ impl RWLock { (&self.order_lock).release(); do task::rekillable { let opt_lock = Just(&self.order_lock); - blk(&Condvar { order: opt_lock, ..*cond }) + blk(&Condvar { sem: cond.sem, order: opt_lock, + token: NonCopyable::new() }) } } } @@ -637,14 +600,43 @@ impl RWLock { pub fn write_downgrade<U>(&self, blk: &fn(v: RWLockWriteMode) -> U) -> U { // Implementation slightly different from the slicker 'write's above. // The exit path is conditional on whether the caller downgrades. - let mut _release = None; do task::unkillable { (&self.order_lock).acquire(); (&self.access_lock).acquire(); (&self.order_lock).release(); + do (|| { + unsafe { + do task::rekillable { + blk(RWLockWriteMode { lock: self, token: NonCopyable::new() }) + } + } + }).finally { + let writer_or_last_reader; + // Check if we're releasing from read mode or from write mode. + let state = unsafe { &mut *self.state.get() }; + if state.read_mode { + // Releasing from read mode. + let old_count = state.read_count.fetch_sub(1, atomics::Release); + assert!(old_count > 0); + // Check if other readers remain. + if old_count == 1 { + // Case 1: Writer downgraded & was the last reader + writer_or_last_reader = true; + state.read_mode = false; + } else { + // Case 2: Writer downgraded & was not the last reader + writer_or_last_reader = false; + } + } else { + // Case 3: Writer did not downgrade + writer_or_last_reader = true; + } + if writer_or_last_reader { + // Nobody left inside; release the "reader cloud" lock. + (&self.access_lock).release(); + } + } } - _release = Some(RWLockReleaseDowngrade(self)); - blk(RWLockWriteMode { lock: self }) } /// To be called inside of the write_downgrade block. @@ -673,105 +665,16 @@ impl RWLock { } } } - RWLockReadMode { lock: token.lock } - } -} - -// FIXME(#3588) should go inside of read() -#[doc(hidden)] -struct RWLockReleaseRead<'self> { - lock: &'self RWLock, -} - -#[doc(hidden)] -#[unsafe_destructor] -impl<'self> Drop for RWLockReleaseRead<'self> { - fn drop(&self) { - unsafe { - do task::unkillable { - let state = &mut *self.lock.state.get(); - assert!(state.read_mode); - let old_count = state.read_count.fetch_sub(1, atomics::Release); - assert!(old_count > 0); - if old_count == 1 { - state.read_mode = false; - // Note: this release used to be outside of a locked access - // to exclusive-protected state. If this code is ever - // converted back to such (instead of using atomic ops), - // this access MUST NOT go inside the exclusive access. - (&self.lock.access_lock).release(); - } - } - } - } -} - -#[doc(hidden)] -fn RWLockReleaseRead<'r>(lock: &'r RWLock) -> RWLockReleaseRead<'r> { - RWLockReleaseRead { - lock: lock - } -} - -// FIXME(#3588) should go inside of downgrade() -#[doc(hidden)] -#[unsafe_destructor] -struct RWLockReleaseDowngrade<'self> { - lock: &'self RWLock, -} - -#[doc(hidden)] -#[unsafe_destructor] -impl<'self> Drop for RWLockReleaseDowngrade<'self> { - fn drop(&self) { - unsafe { - do task::unkillable { - let writer_or_last_reader; - // Check if we're releasing from read mode or from write mode. - let state = &mut *self.lock.state.get(); - if state.read_mode { - // Releasing from read mode. - let old_count = state.read_count.fetch_sub(1, atomics::Release); - assert!(old_count > 0); - // Check if other readers remain. - if old_count == 1 { - // Case 1: Writer downgraded & was the last reader - writer_or_last_reader = true; - state.read_mode = false; - } else { - // Case 2: Writer downgraded & was not the last reader - writer_or_last_reader = false; - } - } else { - // Case 3: Writer did not downgrade - writer_or_last_reader = true; - } - if writer_or_last_reader { - // Nobody left inside; release the "reader cloud" lock. - (&self.lock.access_lock).release(); - } - } - } - } -} - -#[doc(hidden)] -fn RWLockReleaseDowngrade<'r>(lock: &'r RWLock) - -> RWLockReleaseDowngrade<'r> { - RWLockReleaseDowngrade { - lock: lock + RWLockReadMode { lock: token.lock, token: NonCopyable::new() } } } /// The "write permission" token used for rwlock.write_downgrade(). -pub struct RWLockWriteMode<'self> { priv lock: &'self RWLock } -#[unsafe_destructor] -impl<'self> Drop for RWLockWriteMode<'self> { fn drop(&self) {} } +pub struct RWLockWriteMode<'self> { priv lock: &'self RWLock, priv token: NonCopyable } /// The "read permission" token used for rwlock.write_downgrade(). -pub struct RWLockReadMode<'self> { priv lock: &'self RWLock } -#[unsafe_destructor] -impl<'self> Drop for RWLockReadMode<'self> { fn drop(&self) {} } +pub struct RWLockReadMode<'self> { priv lock: &'self RWLock, + priv token: NonCopyable } impl<'self> RWLockWriteMode<'self> { /// Access the pre-downgrade rwlock in write mode. @@ -781,7 +684,8 @@ impl<'self> RWLockWriteMode<'self> { // Need to make the condvar use the order lock when reacquiring the // access lock. See comment in RWLock::write_cond for why. blk(&Condvar { sem: &self.lock.access_lock, - order: Just(&self.lock.order_lock), }) + order: Just(&self.lock.order_lock), + token: NonCopyable::new() }) } } @@ -1059,6 +963,8 @@ mod tests { } #[test] #[ignore(cfg(windows))] fn test_mutex_killed_broadcast() { + use std::unstable::finally::Finally; + let m = ~Mutex::new(); let m2 = ~m.clone(); let (p,c) = comm::stream(); @@ -1075,8 +981,13 @@ mod tests { do mi.lock_cond |cond| { let c = c.take(); c.send(()); // tell sibling to go ahead - let _z = SendOnFailure(c); - cond.wait(); // block forever + do (|| { + cond.wait(); // block forever + }).finally { + error!("task unwinding and sending"); + c.send(()); + error!("task unwinding and done sending"); + } } } } @@ -1095,21 +1006,6 @@ mod tests { let woken = cond.broadcast(); assert_eq!(woken, 0); } - struct SendOnFailure { - c: comm::Chan<()>, - } - - impl Drop for SendOnFailure { - fn drop(&self) { - self.c.send(()); - } - } - - fn SendOnFailure(c: comm::Chan<()>) -> SendOnFailure { - SendOnFailure { - c: c - } - } } #[test] fn test_mutex_cond_signal_on_0() { |
