diff options
| -rw-r--r-- | src/libextra/sync.rs | 309 | ||||
| -rw-r--r-- | src/libstd/util.rs | 6 |
2 files changed, 108 insertions, 207 deletions
diff --git a/src/libextra/sync.rs b/src/libextra/sync.rs index 045aeb0feda..276f9cad7c6 100644 --- a/src/libextra/sync.rs +++ b/src/libextra/sync.rs @@ -22,7 +22,9 @@ use std::comm::SendDeferred; use std::task; use std::unstable::sync::{Exclusive, UnsafeAtomicRcBox}; use std::unstable::atomics; +use std::unstable::finally::Finally; use std::util; +use std::util::NonCopyable; /**************************************************************************** * Internals @@ -84,7 +86,6 @@ struct SemInner<Q> { #[doc(hidden)] struct Sem<Q>(Exclusive<SemInner<Q>>); - #[doc(hidden)] impl<Q:Send> Sem<Q> { fn new(count: int, q: Q) -> Sem<Q> { @@ -125,17 +126,18 @@ impl<Q:Send> Sem<Q> { } } } -} -// FIXME(#3154) move both copies of this into Sem<Q>, and unify the 2 structs -#[doc(hidden)] -impl Sem<()> { + pub fn access<U>(&self, blk: &fn() -> U) -> U { - let mut release = None; do task::unkillable { - self.acquire(); - release = Some(SemRelease(self)); + do (|| { + self.acquire(); + unsafe { + do task::rekillable { blk() } + } + }).finally { + self.release(); + } } - blk() } } @@ -149,46 +151,6 @@ impl Sem<~[WaitQueue]> { } Sem::new(count, queues) } - - pub fn access_waitqueue<U>(&self, blk: &fn() -> U) -> U { - let mut release = None; - do task::unkillable { - self.acquire(); - release = Some(SemAndSignalRelease(self)); - } - blk() - } -} - -// FIXME(#3588) should go inside of access() -#[doc(hidden)] -type SemRelease<'self> = SemReleaseGeneric<'self, ()>; -#[doc(hidden)] -type SemAndSignalRelease<'self> = SemReleaseGeneric<'self, ~[WaitQueue]>; -#[doc(hidden)] -struct SemReleaseGeneric<'self, Q> { sem: &'self Sem<Q> } - -#[doc(hidden)] -#[unsafe_destructor] -impl<'self, Q:Send> Drop for SemReleaseGeneric<'self, Q> { - fn drop(&self) { - self.sem.release(); - } -} - -#[doc(hidden)] -fn SemRelease<'r>(sem: &'r Sem<()>) -> SemRelease<'r> { - SemReleaseGeneric { - sem: sem - } -} - -#[doc(hidden)] -fn SemAndSignalRelease<'r>(sem: &'r Sem<~[WaitQueue]>) - -> SemAndSignalRelease<'r> { - SemReleaseGeneric { - sem: sem - } } // FIXME(#3598): Want to use an Option down below, but we need a custom enum @@ -211,11 +173,10 @@ pub struct Condvar<'self> { // writer waking up from a cvar wait can't race with a reader to steal it, // See the comment in write_cond for more detail. priv order: ReacquireOrderLock<'self>, + // Make sure condvars are non-copyable. + priv token: util::NonCopyable, } -#[unsafe_destructor] -impl<'self> Drop for Condvar<'self> { fn drop(&self) {} } - impl<'self> Condvar<'self> { /** * Atomically drop the associated lock, and block until a signal is sent. @@ -243,11 +204,10 @@ impl<'self> Condvar<'self> { let (WaitEnd, SignalEnd) = comm::oneshot(); let mut WaitEnd = Some(WaitEnd); let mut SignalEnd = Some(SignalEnd); - let mut reacquire = None; let mut out_of_bounds = None; - unsafe { - do task::unkillable { - // Release lock, 'atomically' enqueuing ourselves in so doing. + do task::unkillable { + // Release lock, 'atomically' enqueuing ourselves in so doing. + unsafe { do (**self.sem).with |state| { if condvar_id < state.blocked.len() { // Drop the lock. @@ -262,37 +222,25 @@ impl<'self> Condvar<'self> { out_of_bounds = Some(state.blocked.len()); } } - - // If yield checks start getting inserted anywhere, we can be - // killed before or after enqueueing. Deciding whether to - // unkillably reacquire the lock needs to happen atomically - // wrt enqueuing. - if out_of_bounds.is_none() { - reacquire = Some(CondvarReacquire { sem: self.sem, - order: self.order }); - } } - } - do check_cvar_bounds(out_of_bounds, condvar_id, "cond.wait_on()") { - // Unconditionally "block". (Might not actually block if a - // signaller already sent -- I mean 'unconditionally' in contrast - // with acquire().) - let _ = comm::recv_one(WaitEnd.take_unwrap()); - } - // This is needed for a failing condition variable to reacquire the - // mutex during unwinding. As long as the wrapper (mutex, etc) is - // bounded in when it gets released, this shouldn't hang forever. - struct CondvarReacquire<'self> { - sem: &'self Sem<~[WaitQueue]>, - order: ReacquireOrderLock<'self>, - } - - #[unsafe_destructor] - impl<'self> Drop for CondvarReacquire<'self> { - fn drop(&self) { - // Needs to succeed, instead of itself dying. - do task::unkillable { + // If yield checks start getting inserted anywhere, we can be + // killed before or after enqueueing. Deciding whether to + // unkillably reacquire the lock needs to happen atomically + // wrt enqueuing. + do check_cvar_bounds(out_of_bounds, condvar_id, "cond.wait_on()") { + // Unconditionally "block". (Might not actually block if a + // signaller already sent -- I mean 'unconditionally' in contrast + // with acquire().) + do (|| { + unsafe { + do task::rekillable { + let _ = comm::recv_one(WaitEnd.take_unwrap()); + } + } + }).finally { + // Reacquire the condvar. Note this is back in the unkillable + // section; it needs to succeed, instead of itself dying. match self.order { Just(lock) => do lock.access { self.sem.acquire(); @@ -374,8 +322,8 @@ impl Sem<~[WaitQueue]> { // The only other places that condvars get built are rwlock.write_cond() // and rwlock_write_mode. pub fn access_cond<U>(&self, blk: &fn(c: &Condvar) -> U) -> U { - do self.access_waitqueue { - blk(&Condvar { sem: self, order: Nothing }) + do self.access { + blk(&Condvar { sem: self, order: Nothing, token: NonCopyable::new() }) } } } @@ -453,7 +401,7 @@ impl Mutex { /// Run a function with ownership of the mutex. pub fn lock<U>(&self, blk: &fn() -> U) -> U { - (&self.sem).access_waitqueue(blk) + (&self.sem).access(blk) } /// Run a function with ownership of the mutex and a handle to a condvar. @@ -532,7 +480,6 @@ impl RWLock { * tasks may run concurrently with this one. */ pub fn read<U>(&self, blk: &fn() -> U) -> U { - let mut release = None; unsafe { do task::unkillable { do (&self.order_lock).access { @@ -543,10 +490,24 @@ impl RWLock { state.read_mode = true; } } - release = Some(RWLockReleaseRead(self)); + do (|| { + do task::rekillable { blk() } + }).finally { + let state = &mut *self.state.get(); + assert!(state.read_mode); + let old_count = state.read_count.fetch_sub(1, atomics::Release); + assert!(old_count > 0); + if old_count == 1 { + state.read_mode = false; + // Note: this release used to be outside of a locked access + // to exclusive-protected state. If this code is ever + // converted back to such (instead of using atomic ops), + // this access MUST NOT go inside the exclusive access. + (&self.access_lock).release(); + } + } } } - blk() } /** @@ -557,7 +518,7 @@ impl RWLock { unsafe { do task::unkillable { (&self.order_lock).acquire(); - do (&self.access_lock).access_waitqueue { + do (&self.access_lock).access { (&self.order_lock).release(); do task::rekillable { blk() @@ -607,7 +568,8 @@ impl RWLock { (&self.order_lock).release(); do task::rekillable { let opt_lock = Just(&self.order_lock); - blk(&Condvar { order: opt_lock, ..*cond }) + blk(&Condvar { sem: cond.sem, order: opt_lock, + token: NonCopyable::new() }) } } } @@ -638,14 +600,43 @@ impl RWLock { pub fn write_downgrade<U>(&self, blk: &fn(v: RWLockWriteMode) -> U) -> U { // Implementation slightly different from the slicker 'write's above. // The exit path is conditional on whether the caller downgrades. - let mut _release = None; do task::unkillable { (&self.order_lock).acquire(); (&self.access_lock).acquire(); (&self.order_lock).release(); + do (|| { + unsafe { + do task::rekillable { + blk(RWLockWriteMode { lock: self, token: NonCopyable::new() }) + } + } + }).finally { + let writer_or_last_reader; + // Check if we're releasing from read mode or from write mode. + let state = unsafe { &mut *self.state.get() }; + if state.read_mode { + // Releasing from read mode. + let old_count = state.read_count.fetch_sub(1, atomics::Release); + assert!(old_count > 0); + // Check if other readers remain. + if old_count == 1 { + // Case 1: Writer downgraded & was the last reader + writer_or_last_reader = true; + state.read_mode = false; + } else { + // Case 2: Writer downgraded & was not the last reader + writer_or_last_reader = false; + } + } else { + // Case 3: Writer did not downgrade + writer_or_last_reader = true; + } + if writer_or_last_reader { + // Nobody left inside; release the "reader cloud" lock. + (&self.access_lock).release(); + } + } } - _release = Some(RWLockReleaseDowngrade(self)); - blk(RWLockWriteMode { lock: self }) } /// To be called inside of the write_downgrade block. @@ -674,105 +665,16 @@ impl RWLock { } } } - RWLockReadMode { lock: token.lock } - } -} - -// FIXME(#3588) should go inside of read() -#[doc(hidden)] -struct RWLockReleaseRead<'self> { - lock: &'self RWLock, -} - -#[doc(hidden)] -#[unsafe_destructor] -impl<'self> Drop for RWLockReleaseRead<'self> { - fn drop(&self) { - unsafe { - do task::unkillable { - let state = &mut *self.lock.state.get(); - assert!(state.read_mode); - let old_count = state.read_count.fetch_sub(1, atomics::Release); - assert!(old_count > 0); - if old_count == 1 { - state.read_mode = false; - // Note: this release used to be outside of a locked access - // to exclusive-protected state. If this code is ever - // converted back to such (instead of using atomic ops), - // this access MUST NOT go inside the exclusive access. - (&self.lock.access_lock).release(); - } - } - } - } -} - -#[doc(hidden)] -fn RWLockReleaseRead<'r>(lock: &'r RWLock) -> RWLockReleaseRead<'r> { - RWLockReleaseRead { - lock: lock - } -} - -// FIXME(#3588) should go inside of downgrade() -#[doc(hidden)] -#[unsafe_destructor] -struct RWLockReleaseDowngrade<'self> { - lock: &'self RWLock, -} - -#[doc(hidden)] -#[unsafe_destructor] -impl<'self> Drop for RWLockReleaseDowngrade<'self> { - fn drop(&self) { - unsafe { - do task::unkillable { - let writer_or_last_reader; - // Check if we're releasing from read mode or from write mode. - let state = &mut *self.lock.state.get(); - if state.read_mode { - // Releasing from read mode. - let old_count = state.read_count.fetch_sub(1, atomics::Release); - assert!(old_count > 0); - // Check if other readers remain. - if old_count == 1 { - // Case 1: Writer downgraded & was the last reader - writer_or_last_reader = true; - state.read_mode = false; - } else { - // Case 2: Writer downgraded & was not the last reader - writer_or_last_reader = false; - } - } else { - // Case 3: Writer did not downgrade - writer_or_last_reader = true; - } - if writer_or_last_reader { - // Nobody left inside; release the "reader cloud" lock. - (&self.lock.access_lock).release(); - } - } - } - } -} - -#[doc(hidden)] -fn RWLockReleaseDowngrade<'r>(lock: &'r RWLock) - -> RWLockReleaseDowngrade<'r> { - RWLockReleaseDowngrade { - lock: lock + RWLockReadMode { lock: token.lock, token: NonCopyable::new() } } } /// The "write permission" token used for rwlock.write_downgrade(). -pub struct RWLockWriteMode<'self> { priv lock: &'self RWLock } -#[unsafe_destructor] -impl<'self> Drop for RWLockWriteMode<'self> { fn drop(&self) {} } +pub struct RWLockWriteMode<'self> { priv lock: &'self RWLock, priv token: NonCopyable } /// The "read permission" token used for rwlock.write_downgrade(). -pub struct RWLockReadMode<'self> { priv lock: &'self RWLock } -#[unsafe_destructor] -impl<'self> Drop for RWLockReadMode<'self> { fn drop(&self) {} } +pub struct RWLockReadMode<'self> { priv lock: &'self RWLock, + priv token: NonCopyable } impl<'self> RWLockWriteMode<'self> { /// Access the pre-downgrade rwlock in write mode. @@ -782,7 +684,8 @@ impl<'self> RWLockWriteMode<'self> { // Need to make the condvar use the order lock when reacquiring the // access lock. See comment in RWLock::write_cond for why. blk(&Condvar { sem: &self.lock.access_lock, - order: Just(&self.lock.order_lock), }) + order: Just(&self.lock.order_lock), + token: NonCopyable::new() }) } } @@ -1060,6 +963,8 @@ mod tests { } #[test] #[ignore(cfg(windows))] fn test_mutex_killed_broadcast() { + use std::unstable::finally::Finally; + let m = ~Mutex::new(); let m2 = ~m.clone(); let (p,c) = comm::stream(); @@ -1076,8 +981,13 @@ mod tests { do mi.lock_cond |cond| { let c = c.take(); c.send(()); // tell sibling to go ahead - let _z = SendOnFailure(c); - cond.wait(); // block forever + do (|| { + cond.wait(); // block forever + }).finally { + error!("task unwinding and sending"); + c.send(()); + error!("task unwinding and done sending"); + } } } } @@ -1096,21 +1006,6 @@ mod tests { let woken = cond.broadcast(); assert_eq!(woken, 0); } - struct SendOnFailure { - c: comm::Chan<()>, - } - - impl Drop for SendOnFailure { - fn drop(&self) { - self.c.send(()); - } - } - - fn SendOnFailure(c: comm::Chan<()>) -> SendOnFailure { - SendOnFailure { - c: c - } - } } #[test] fn test_mutex_cond_signal_on_0() { diff --git a/src/libstd/util.rs b/src/libstd/util.rs index 8fcfa083cb6..b46876ad3fe 100644 --- a/src/libstd/util.rs +++ b/src/libstd/util.rs @@ -79,6 +79,12 @@ pub fn replace<T>(dest: &mut T, mut src: T) -> T { #[unsafe_no_drop_flag] pub struct NonCopyable; +impl NonCopyable { + // FIXME(#8233) should not be necessary + /// Create a new noncopyable token. + pub fn new() -> NonCopyable { NonCopyable } +} + impl Drop for NonCopyable { fn drop(&self) { } } |
