diff options
| author | bors <bors@rust-lang.org> | 2014-12-30 08:02:39 +0000 |
|---|---|---|
| committer | bors <bors@rust-lang.org> | 2014-12-30 08:02:39 +0000 |
| commit | d2368c3c11ddab9d812c4ddec2e44579326ad347 (patch) | |
| tree | b976bc0eb040da67646a9d99bb9b901cb9f55abd /src/libstd/sync | |
| parent | fea5aa656ff4349f4d3e1fea1447d26986762ae1 (diff) | |
| parent | 470ae101d6e26a6ce07292b7fca6eaed527451c7 (diff) | |
| download | rust-d2368c3c11ddab9d812c4ddec2e44579326ad347.tar.gz rust-d2368c3c11ddab9d812c4ddec2e44579326ad347.zip | |
auto merge of #20320 : alexcrichton/rust/rollup, r=alexcrichton
Diffstat (limited to 'src/libstd/sync')
| -rw-r--r-- | src/libstd/sync/atomic.rs | 2 | ||||
| -rw-r--r-- | src/libstd/sync/barrier.rs | 4 | ||||
| -rw-r--r-- | src/libstd/sync/condvar.rs | 121 | ||||
| -rw-r--r-- | src/libstd/sync/mod.rs | 7 | ||||
| -rw-r--r-- | src/libstd/sync/mutex.rs | 236 | ||||
| -rw-r--r-- | src/libstd/sync/poison.rs | 122 | ||||
| -rw-r--r-- | src/libstd/sync/rwlock.rs | 312 | ||||
| -rw-r--r-- | src/libstd/sync/semaphore.rs | 6 | ||||
| -rw-r--r-- | src/libstd/sync/task_pool.rs | 34 |
9 files changed, 498 insertions, 346 deletions
diff --git a/src/libstd/sync/atomic.rs b/src/libstd/sync/atomic.rs index 26778ef70b3..bdf947438f3 100644 --- a/src/libstd/sync/atomic.rs +++ b/src/libstd/sync/atomic.rs @@ -180,7 +180,7 @@ impl<T: Send> Drop for AtomicOption<T> { #[cfg(test)] mod test { - use prelude::*; + use prelude::{Some, None}; use super::*; #[test] diff --git a/src/libstd/sync/barrier.rs b/src/libstd/sync/barrier.rs index 6cdb199819a..4091f0df395 100644 --- a/src/libstd/sync/barrier.rs +++ b/src/libstd/sync/barrier.rs @@ -69,7 +69,7 @@ impl Barrier { /// Barriers are re-usable after all threads have rendezvoused once, and can /// be used continuously. pub fn wait(&self) { - let mut lock = self.lock.lock(); + let mut lock = self.lock.lock().unwrap(); let local_gen = lock.generation_id; lock.count += 1; if lock.count < self.num_threads { @@ -77,7 +77,7 @@ impl Barrier { // http://en.wikipedia.org/wiki/Spurious_wakeup while local_gen == lock.generation_id && lock.count < self.num_threads { - self.cvar.wait(&lock); + lock = self.cvar.wait(lock).unwrap(); } } else { lock.count = 0; diff --git a/src/libstd/sync/condvar.rs b/src/libstd/sync/condvar.rs index f1940bfd829..15faf5be258 100644 --- a/src/libstd/sync/condvar.rs +++ b/src/libstd/sync/condvar.rs @@ -11,10 +11,11 @@ use prelude::*; use sync::atomic::{mod, AtomicUint}; -use sync::{mutex, StaticMutexGuard}; +use sync::poison::{mod, LockResult}; use sys_common::condvar as sys; use sys_common::mutex as sys_mutex; use time::Duration; +use sync::{mutex, MutexGuard}; /// A Condition Variable /// @@ -44,18 +45,19 @@ use time::Duration; /// // Inside of our lock, spawn a new thread, and then wait for it to start /// Thread::spawn(move|| { /// let &(ref lock, ref cvar) = &*pair2; -/// let mut started = lock.lock(); +/// let mut started = lock.lock().unwrap(); /// *started = true; /// cvar.notify_one(); /// }).detach(); /// /// // wait for the thread to start up /// let &(ref lock, ref cvar) = &*pair; -/// let started = lock.lock(); +/// let mut started = lock.lock().unwrap(); /// while !*started { -/// cvar.wait(&started); +/// started = cvar.wait(started).unwrap(); /// } /// ``` +#[stable] pub struct Condvar { inner: Box<StaticCondvar> } unsafe impl Send for Condvar {} @@ -73,6 +75,7 @@ unsafe impl Sync for Condvar {} /// /// static CVAR: StaticCondvar = CONDVAR_INIT; /// ``` +#[unstable = "may be merged with Condvar in the future"] pub struct StaticCondvar { inner: sys::Condvar, mutex: AtomicUint, @@ -82,24 +85,16 @@ unsafe impl Send for StaticCondvar {} unsafe impl Sync for StaticCondvar {} /// Constant initializer for a statically allocated condition variable. +#[unstable = "may be merged with Condvar in the future"] pub const CONDVAR_INIT: StaticCondvar = StaticCondvar { inner: sys::CONDVAR_INIT, mutex: atomic::INIT_ATOMIC_UINT, }; -/// A trait for vaules which can be passed to the waiting methods of condition -/// variables. This is implemented by the mutex guards in this module. -/// -/// Note that this trait should likely not be implemented manually unless you -/// really know what you're doing. -pub trait AsMutexGuard { - #[allow(missing_docs)] - unsafe fn as_mutex_guard(&self) -> &StaticMutexGuard; -} - impl Condvar { /// Creates a new condition variable which is ready to be waited on and /// notified. + #[stable] pub fn new() -> Condvar { Condvar { inner: box StaticCondvar { @@ -113,8 +108,8 @@ impl Condvar { /// notification. /// /// This function will atomically unlock the mutex specified (represented by - /// `guard`) and block the current thread. This means that any calls to - /// `notify_*()` which happen logically after the mutex is unlocked are + /// `mutex_guard`) and block the current thread. This means that any calls + /// to `notify_*()` which happen logically after the mutex is unlocked are /// candidates to wake this thread up. When this function call returns, the /// lock specified will have been re-acquired. /// @@ -123,16 +118,24 @@ impl Condvar { /// the predicate must always be checked each time this function returns to /// protect against spurious wakeups. /// + /// # Failure + /// + /// This function will return an error if the mutex being waited on is + /// poisoned when this thread re-acquires the lock. For more information, + /// see information about poisoning on the Mutex type. + /// /// # Panics /// /// This function will `panic!()` if it is used with more than one mutex /// over time. Each condition variable is dynamically bound to exactly one /// mutex to ensure defined behavior across platforms. If this functionality /// is not desired, then unsafe primitives in `sys` are provided. - pub fn wait<T: AsMutexGuard>(&self, mutex_guard: &T) { + #[stable] + pub fn wait<'a, T>(&self, guard: MutexGuard<'a, T>) + -> LockResult<MutexGuard<'a, T>> { unsafe { let me: &'static Condvar = &*(self as *const _); - me.inner.wait(mutex_guard) + me.inner.wait(guard) } } @@ -156,11 +159,11 @@ impl Condvar { // provide. There are also additional concerns about the unix-specific // implementation which may need to be addressed. #[allow(dead_code)] - fn wait_timeout<T: AsMutexGuard>(&self, mutex_guard: &T, - dur: Duration) -> bool { + fn wait_timeout<'a, T>(&self, guard: MutexGuard<'a, T>, dur: Duration) + -> LockResult<(MutexGuard<'a, T>, bool)> { unsafe { let me: &'static Condvar = &*(self as *const _); - me.inner.wait_timeout(mutex_guard, dur) + me.inner.wait_timeout(guard, dur) } } @@ -171,6 +174,7 @@ impl Condvar { /// `notify_one` are not buffered in any way. /// /// To wake up all threads, see `notify_one()`. + #[stable] pub fn notify_one(&self) { unsafe { self.inner.inner.notify_one() } } /// Wake up all blocked threads on this condvar. @@ -180,6 +184,7 @@ impl Condvar { /// way. /// /// To wake up only one thread, see `notify_one()`. + #[stable] pub fn notify_all(&self) { unsafe { self.inner.inner.notify_all() } } } @@ -194,13 +199,19 @@ impl StaticCondvar { /// notification. /// /// See `Condvar::wait`. - pub fn wait<T: AsMutexGuard>(&'static self, mutex_guard: &T) { - unsafe { - let lock = mutex_guard.as_mutex_guard(); - let sys = mutex::guard_lock(lock); - self.verify(sys); - self.inner.wait(sys); - (*mutex::guard_poison(lock)).check("mutex"); + #[unstable = "may be merged with Condvar in the future"] + pub fn wait<'a, T>(&'static self, guard: MutexGuard<'a, T>) + -> LockResult<MutexGuard<'a, T>> { + let poisoned = unsafe { + let lock = mutex::guard_lock(&guard); + self.verify(lock); + self.inner.wait(lock); + mutex::guard_poison(&guard).get() + }; + if poisoned { + Err(poison::new_poison_error(guard)) + } else { + Ok(guard) } } @@ -209,26 +220,31 @@ impl StaticCondvar { /// /// See `Condvar::wait_timeout`. #[allow(dead_code)] // may want to stabilize this later, see wait_timeout above - fn wait_timeout<T: AsMutexGuard>(&'static self, mutex_guard: &T, - dur: Duration) -> bool { - unsafe { - let lock = mutex_guard.as_mutex_guard(); - let sys = mutex::guard_lock(lock); - self.verify(sys); - let ret = self.inner.wait_timeout(sys, dur); - (*mutex::guard_poison(lock)).check("mutex"); - return ret; + fn wait_timeout<'a, T>(&'static self, guard: MutexGuard<'a, T>, dur: Duration) + -> LockResult<(MutexGuard<'a, T>, bool)> { + let (poisoned, success) = unsafe { + let lock = mutex::guard_lock(&guard); + self.verify(lock); + let success = self.inner.wait_timeout(lock, dur); + (mutex::guard_poison(&guard).get(), success) + }; + if poisoned { + Err(poison::new_poison_error((guard, success))) + } else { + Ok((guard, success)) } } /// Wake up one blocked thread on this condvar. /// /// See `Condvar::notify_one`. + #[unstable = "may be merged with Condvar in the future"] pub fn notify_one(&'static self) { unsafe { self.inner.notify_one() } } /// Wake up all blocked threads on this condvar. /// /// See `Condvar::notify_all`. + #[unstable = "may be merged with Condvar in the future"] pub fn notify_all(&'static self) { unsafe { self.inner.notify_all() } } /// Deallocate all resources associated with this static condvar. @@ -237,6 +253,7 @@ impl StaticCondvar { /// active users of the condvar, and this also doesn't prevent any future /// users of the condvar. This method is required to be called to not leak /// memory on all platforms. + #[unstable = "may be merged with Condvar in the future"] pub unsafe fn destroy(&'static self) { self.inner.destroy() } @@ -288,12 +305,12 @@ mod tests { static C: StaticCondvar = CONDVAR_INIT; static M: StaticMutex = MUTEX_INIT; - let g = M.lock(); + let g = M.lock().unwrap(); spawn(move|| { - let _g = M.lock(); + let _g = M.lock().unwrap(); C.notify_one(); }); - C.wait(&g); + let g = C.wait(g).unwrap(); drop(g); unsafe { C.destroy(); M.destroy(); } } @@ -309,13 +326,13 @@ mod tests { let tx = tx.clone(); spawn(move|| { let &(ref lock, ref cond) = &*data; - let mut cnt = lock.lock(); + let mut cnt = lock.lock().unwrap(); *cnt += 1; if *cnt == N { tx.send(()); } while *cnt != 0 { - cond.wait(&cnt); + cnt = cond.wait(cnt).unwrap(); } tx.send(()); }); @@ -324,7 +341,7 @@ mod tests { let &(ref lock, ref cond) = &*data; rx.recv(); - let mut cnt = lock.lock(); + let mut cnt = lock.lock().unwrap(); *cnt = 0; cond.notify_all(); drop(cnt); @@ -339,13 +356,15 @@ mod tests { static C: StaticCondvar = CONDVAR_INIT; static M: StaticMutex = MUTEX_INIT; - let g = M.lock(); - assert!(!C.wait_timeout(&g, Duration::nanoseconds(1000))); + let g = M.lock().unwrap(); + let (g, success) = C.wait_timeout(g, Duration::nanoseconds(1000)).unwrap(); + assert!(!success); spawn(move|| { - let _g = M.lock(); + let _g = M.lock().unwrap(); C.notify_one(); }); - assert!(C.wait_timeout(&g, Duration::days(1))); + let (g, success) = C.wait_timeout(g, Duration::days(1)).unwrap(); + assert!(success); drop(g); unsafe { C.destroy(); M.destroy(); } } @@ -357,15 +376,15 @@ mod tests { static M2: StaticMutex = MUTEX_INIT; static C: StaticCondvar = CONDVAR_INIT; - let g = M1.lock(); + let mut g = M1.lock().unwrap(); spawn(move|| { - let _g = M1.lock(); + let _g = M1.lock().unwrap(); C.notify_one(); }); - C.wait(&g); + g = C.wait(g).unwrap(); drop(g); - C.wait(&M2.lock()); + C.wait(M2.lock().unwrap()).unwrap(); } } diff --git a/src/libstd/sync/mod.rs b/src/libstd/sync/mod.rs index 7605a6a96a0..092acc7ff25 100644 --- a/src/libstd/sync/mod.rs +++ b/src/libstd/sync/mod.rs @@ -19,14 +19,15 @@ pub use alloc::arc::{Arc, Weak}; -pub use self::mutex::{Mutex, MutexGuard, StaticMutex, StaticMutexGuard, MUTEX_INIT}; +pub use self::mutex::{Mutex, MutexGuard, StaticMutex}; +pub use self::mutex::MUTEX_INIT; pub use self::rwlock::{RWLock, StaticRWLock, RWLOCK_INIT}; pub use self::rwlock::{RWLockReadGuard, RWLockWriteGuard}; -pub use self::rwlock::{StaticRWLockReadGuard, StaticRWLockWriteGuard}; -pub use self::condvar::{Condvar, StaticCondvar, CONDVAR_INIT, AsMutexGuard}; +pub use self::condvar::{Condvar, StaticCondvar, CONDVAR_INIT}; pub use self::once::{Once, ONCE_INIT}; pub use self::semaphore::{Semaphore, SemaphoreGuard}; pub use self::barrier::Barrier; +pub use self::poison::{PoisonError, TryLockError, TryLockResult, LockResult}; pub use self::future::Future; pub use self::task_pool::TaskPool; diff --git a/src/libstd/sync/mutex.rs b/src/libstd/sync/mutex.rs index 4d2fbfc4055..52004bb4a8f 100644 --- a/src/libstd/sync/mutex.rs +++ b/src/libstd/sync/mutex.rs @@ -12,7 +12,7 @@ use prelude::*; use cell::UnsafeCell; use kinds::marker; -use sync::{poison, AsMutexGuard}; +use sync::poison::{mod, TryLockError, TryLockResult, LockResult}; use sys_common::mutex as sys; /// A mutual exclusion primitive useful for protecting shared data @@ -26,12 +26,23 @@ use sys_common::mutex as sys; /// /// # Poisoning /// -/// In order to prevent access to otherwise invalid data, each mutex will -/// propagate any panics which occur while the lock is held. Once a thread has -/// panicked while holding the lock, then all other threads will immediately -/// panic as well once they hold the lock. +/// The mutexes in this module implement a strategy called "poisoning" where a +/// mutex is considered poisoned whenever a thread panics while holding the +/// lock. Once a mutex is poisoned, all other tasks are unable to access the +/// data by default as it is likely tainted (some invariant is not being +/// upheld). /// -/// # Example +/// For a mutex, this means that the `lock` and `try_lock` methods return a +/// `Result` which indicates whether a mutex has been poisoned or not. Most +/// usage of a mutex will simply `unwrap()` these results, propagating panics +/// among threads to ensure that a possibly invalid invariant is not witnessed. +/// +/// A poisoned mutex, however, does not prevent all access to the underlying +/// data. The `PoisonError` type has an `into_guard` method which will return +/// the guard that would have otherwise been returned on a successful lock. This +/// allows access to the data, despite the lock being poisoned. +/// +/// # Examples /// /// ```rust /// use std::sync::{Arc, Mutex}; @@ -48,11 +59,14 @@ use sys_common::mutex as sys; /// let (tx, rx) = channel(); /// for _ in range(0u, 10) { /// let (data, tx) = (data.clone(), tx.clone()); -/// Thread::spawn(move|| { +/// Thread::spawn(move || { /// // The shared static can only be accessed once the lock is held. /// // Our non-atomic increment is safe because we're the only thread /// // which can access the shared state when the lock is held. -/// let mut data = data.lock(); +/// // +/// // We unwrap() the return value to assert that we are not expecting +/// // tasks to ever fail while holding the lock. +/// let mut data = data.lock().unwrap(); /// *data += 1; /// if *data == N { /// tx.send(()); @@ -63,6 +77,36 @@ use sys_common::mutex as sys; /// /// rx.recv(); /// ``` +/// +/// To recover from a poisoned mutex: +/// +/// ```rust +/// use std::sync::{Arc, Mutex}; +/// use std::thread::Thread; +/// +/// let lock = Arc::new(Mutex::new(0u)); +/// let lock2 = lock.clone(); +/// +/// let _ = Thread::spawn(move || -> () { +/// // This thread will acquire the mutex first, unwrapping the result of +/// // `lock` because the lock has not been poisoned. +/// let _lock = lock2.lock().unwrap(); +/// +/// // This panic while holding the lock (`_guard` is in scope) will poison +/// // the mutex. +/// panic!(); +/// }).join(); +/// +/// // The lock is poisoned by this point, but the returned result can be +/// // pattern matched on to return the underlying guard on both branches. +/// let mut guard = match lock.lock() { +/// Ok(guard) => guard, +/// Err(poisoned) => poisoned.into_guard(), +/// }; +/// +/// *guard += 1; +/// ``` +#[stable] pub struct Mutex<T> { // Note that this static mutex is in a *box*, not inlined into the struct // itself. Once a native mutex has been used once, its address can never @@ -93,14 +137,15 @@ unsafe impl<T:Send> Sync for Mutex<T> { } /// static LOCK: StaticMutex = MUTEX_INIT; /// /// { -/// let _g = LOCK.lock(); +/// let _g = LOCK.lock().unwrap(); /// // do some productive work /// } /// // lock is unlocked here. /// ``` +#[unstable = "may be merged with Mutex in the future"] pub struct StaticMutex { lock: sys::Mutex, - poison: UnsafeCell<poison::Flag>, + poison: poison::Flag, } unsafe impl Sync for StaticMutex {} @@ -111,31 +156,27 @@ unsafe impl Sync for StaticMutex {} /// The data protected by the mutex can be access through this guard via its /// Deref and DerefMut implementations #[must_use] +#[stable] pub struct MutexGuard<'a, T: 'a> { // funny underscores due to how Deref/DerefMut currently work (they // disregard field privacy). - __lock: &'a Mutex<T>, - __guard: StaticMutexGuard, -} - -/// An RAII implementation of a "scoped lock" of a static mutex. When this -/// structure is dropped (falls out of scope), the lock will be unlocked. -#[must_use] -pub struct StaticMutexGuard { - lock: &'static sys::Mutex, - marker: marker::NoSend, - poison: poison::Guard<'static>, + __lock: &'a StaticMutex, + __data: &'a UnsafeCell<T>, + __poison: poison::Guard, + __marker: marker::NoSend, } /// Static initialization of a mutex. This constant can be used to initialize /// other mutex constants. +#[unstable = "may be merged with Mutex in the future"] pub const MUTEX_INIT: StaticMutex = StaticMutex { lock: sys::MUTEX_INIT, - poison: UnsafeCell { value: poison::Flag { failed: false } }, + poison: poison::FLAG_INIT, }; impl<T: Send> Mutex<T> { /// Creates a new mutex in an unlocked state ready for use. + #[stable] pub fn new(t: T) -> Mutex<T> { Mutex { inner: box MUTEX_INIT, @@ -150,15 +191,14 @@ impl<T: Send> Mutex<T> { /// held. An RAII guard is returned to allow scoped unlock of the lock. When /// the guard goes out of scope, the mutex will be unlocked. /// - /// # Panics + /// # Failure /// /// If another user of this mutex panicked while holding the mutex, then - /// this call will immediately panic once the mutex is acquired. - pub fn lock(&self) -> MutexGuard<T> { - unsafe { - let lock: &'static StaticMutex = &*(&*self.inner as *const _); - MutexGuard::new(self, lock.lock()) - } + /// this call will return an error once the mutex is acquired. + #[stable] + pub fn lock(&self) -> LockResult<MutexGuard<T>> { + unsafe { self.inner.lock.lock() } + MutexGuard::new(&*self.inner, &self.data) } /// Attempts to acquire this lock. @@ -169,17 +209,17 @@ impl<T: Send> Mutex<T> { /// /// This function does not block. /// - /// # Panics + /// # Failure /// /// If another user of this mutex panicked while holding the mutex, then - /// this call will immediately panic if the mutex would otherwise be + /// this call will return failure if the mutex would otherwise be /// acquired. - pub fn try_lock(&self) -> Option<MutexGuard<T>> { - unsafe { - let lock: &'static StaticMutex = &*(&*self.inner as *const _); - lock.try_lock().map(|guard| { - MutexGuard::new(self, guard) - }) + #[stable] + pub fn try_lock(&self) -> TryLockResult<MutexGuard<T>> { + if unsafe { self.inner.lock.try_lock() } { + Ok(try!(MutexGuard::new(&*self.inner, &self.data))) + } else { + Err(TryLockError::WouldBlock) } } } @@ -194,19 +234,27 @@ impl<T: Send> Drop for Mutex<T> { } } +struct Dummy(UnsafeCell<()>); +unsafe impl Sync for Dummy {} +static DUMMY: Dummy = Dummy(UnsafeCell { value: () }); + impl StaticMutex { /// Acquires this lock, see `Mutex::lock` - pub fn lock(&'static self) -> StaticMutexGuard { + #[inline] + #[unstable = "may be merged with Mutex in the future"] + pub fn lock(&'static self) -> LockResult<MutexGuard<()>> { unsafe { self.lock.lock() } - StaticMutexGuard::new(self) + MutexGuard::new(self, &DUMMY.0) } /// Attempts to grab this lock, see `Mutex::try_lock` - pub fn try_lock(&'static self) -> Option<StaticMutexGuard> { + #[inline] + #[unstable = "may be merged with Mutex in the future"] + pub fn try_lock(&'static self) -> TryLockResult<MutexGuard<()>> { if unsafe { self.lock.try_lock() } { - Some(StaticMutexGuard::new(self)) + Ok(try!(MutexGuard::new(self, &DUMMY.0))) } else { - None + Err(TryLockError::WouldBlock) } } @@ -220,61 +268,54 @@ impl StaticMutex { /// *all* platforms. It may be the case that some platforms do not leak /// memory if this method is not called, but this is not guaranteed to be /// true on all platforms. + #[unstable = "may be merged with Mutex in the future"] pub unsafe fn destroy(&'static self) { self.lock.destroy() } } impl<'mutex, T> MutexGuard<'mutex, T> { - fn new(lock: &Mutex<T>, guard: StaticMutexGuard) -> MutexGuard<T> { - MutexGuard { __lock: lock, __guard: guard } + fn new(lock: &'mutex StaticMutex, data: &'mutex UnsafeCell<T>) + -> LockResult<MutexGuard<'mutex, T>> { + poison::map_result(lock.poison.borrow(), |guard| { + MutexGuard { + __lock: lock, + __data: data, + __poison: guard, + __marker: marker::NoSend, + } + }) } } -impl<'mutex, T> AsMutexGuard for MutexGuard<'mutex, T> { - unsafe fn as_mutex_guard(&self) -> &StaticMutexGuard { &self.__guard } -} - impl<'mutex, T> Deref<T> for MutexGuard<'mutex, T> { - fn deref<'a>(&'a self) -> &'a T { unsafe { &*self.__lock.data.get() } } + fn deref<'a>(&'a self) -> &'a T { + unsafe { &*self.__data.get() } + } } impl<'mutex, T> DerefMut<T> for MutexGuard<'mutex, T> { fn deref_mut<'a>(&'a mut self) -> &'a mut T { - unsafe { &mut *self.__lock.data.get() } + unsafe { &mut *self.__data.get() } } } -impl StaticMutexGuard { - fn new(lock: &'static StaticMutex) -> StaticMutexGuard { +#[unsafe_destructor] +impl<'a, T> Drop for MutexGuard<'a, T> { + #[inline] + fn drop(&mut self) { unsafe { - let guard = StaticMutexGuard { - lock: &lock.lock, - marker: marker::NoSend, - poison: (*lock.poison.get()).borrow(), - }; - guard.poison.check("mutex"); - return guard; + self.__lock.poison.done(&self.__poison); + self.__lock.lock.unlock(); } } } -pub fn guard_lock(guard: &StaticMutexGuard) -> &sys::Mutex { guard.lock } -pub fn guard_poison(guard: &StaticMutexGuard) -> &poison::Guard { - &guard.poison +pub fn guard_lock<'a, T>(guard: &MutexGuard<'a, T>) -> &'a sys::Mutex { + &guard.__lock.lock } -impl AsMutexGuard for StaticMutexGuard { - unsafe fn as_mutex_guard(&self) -> &StaticMutexGuard { self } -} - -#[unsafe_destructor] -impl Drop for StaticMutexGuard { - fn drop(&mut self) { - unsafe { - self.poison.done(); - self.lock.unlock(); - } - } +pub fn guard_poison<'a, T>(guard: &MutexGuard<'a, T>) -> &'a poison::Flag { + &guard.__lock.poison } #[cfg(test)] @@ -292,16 +333,16 @@ mod test { #[test] fn smoke() { let m = Mutex::new(()); - drop(m.lock()); - drop(m.lock()); + drop(m.lock().unwrap()); + drop(m.lock().unwrap()); } #[test] fn smoke_static() { static M: StaticMutex = MUTEX_INIT; unsafe { - drop(M.lock()); - drop(M.lock()); + drop(M.lock().unwrap()); + drop(M.lock().unwrap()); M.destroy(); } } @@ -316,7 +357,7 @@ mod test { fn inc() { for _ in range(0, J) { unsafe { - let _g = M.lock(); + let _g = M.lock().unwrap(); CNT += 1; } } @@ -343,7 +384,7 @@ mod test { #[test] fn try_lock() { let m = Mutex::new(()); - assert!(m.try_lock().is_some()); + *m.try_lock().unwrap() = (); } #[test] @@ -355,22 +396,21 @@ mod test { // wait until parent gets in rx.recv(); let &(ref lock, ref cvar) = &*packet2.0; - let mut lock = lock.lock(); + let mut lock = lock.lock().unwrap(); *lock = true; cvar.notify_one(); }); let &(ref lock, ref cvar) = &*packet.0; - let lock = lock.lock(); + let mut lock = lock.lock().unwrap(); tx.send(()); assert!(!*lock); while !*lock { - cvar.wait(&lock); + lock = cvar.wait(lock).unwrap(); } } #[test] - #[should_fail] fn test_arc_condvar_poison() { let packet = Packet(Arc::new((Mutex::new(1i), Condvar::new()))); let packet2 = Packet(packet.0.clone()); @@ -379,31 +419,35 @@ mod test { spawn(move|| { rx.recv(); let &(ref lock, ref cvar) = &*packet2.0; - let _g = lock.lock(); + let _g = lock.lock().unwrap(); cvar.notify_one(); // Parent should fail when it wakes up. panic!(); }); let &(ref lock, ref cvar) = &*packet.0; - let lock = lock.lock(); + let mut lock = lock.lock().unwrap(); tx.send(()); while *lock == 1 { - cvar.wait(&lock); + match cvar.wait(lock) { + Ok(l) => { + lock = l; + assert_eq!(*lock, 1); + } + Err(..) => break, + } } } #[test] - #[should_fail] fn test_mutex_arc_poison() { let arc = Arc::new(Mutex::new(1i)); let arc2 = arc.clone(); - let _ = Thread::spawn(move|| { - let lock = arc2.lock(); + Thread::spawn(move|| { + let lock = arc2.lock().unwrap(); assert_eq!(*lock, 2); }).join(); - let lock = arc.lock(); - assert_eq!(*lock, 1); + assert!(arc.lock().is_err()); } #[test] @@ -414,8 +458,8 @@ mod test { let arc2 = Arc::new(Mutex::new(arc)); let (tx, rx) = channel(); spawn(move|| { - let lock = arc2.lock(); - let lock2 = lock.deref().lock(); + let lock = arc2.lock().unwrap(); + let lock2 = lock.deref().lock().unwrap(); assert_eq!(*lock2, 1); tx.send(()); }); @@ -432,13 +476,13 @@ mod test { } impl Drop for Unwinder { fn drop(&mut self) { - *self.i.lock() += 1; + *self.i.lock().unwrap() += 1; } } let _u = Unwinder { i: arc2 }; panic!(); }).join(); - let lock = arc.lock(); + let lock = arc.lock().unwrap(); assert_eq!(*lock, 2); } } diff --git a/src/libstd/sync/poison.rs b/src/libstd/sync/poison.rs index ad08e9873fa..edf16d99f49 100644 --- a/src/libstd/sync/poison.rs +++ b/src/libstd/sync/poison.rs @@ -8,31 +8,127 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +use prelude::*; + +use cell::UnsafeCell; +use error::FromError; +use fmt; use thread::Thread; -pub struct Flag { pub failed: bool } +pub struct Flag { failed: UnsafeCell<bool> } +pub const FLAG_INIT: Flag = Flag { failed: UnsafeCell { value: false } }; impl Flag { - pub fn borrow(&mut self) -> Guard { - Guard { flag: &mut self.failed, panicking: Thread::panicking() } + #[inline] + pub fn borrow(&self) -> LockResult<Guard> { + let ret = Guard { panicking: Thread::panicking() }; + if unsafe { *self.failed.get() } { + Err(new_poison_error(ret)) + } else { + Ok(ret) + } + } + + #[inline] + pub fn done(&self, guard: &Guard) { + if !guard.panicking && Thread::panicking() { + unsafe { *self.failed.get() = true; } + } + } + + #[inline] + pub fn get(&self) -> bool { + unsafe { *self.failed.get() } } } -pub struct Guard<'a> { - flag: &'a mut bool, +#[allow(missing_copy_implementations)] +pub struct Guard { panicking: bool, } -impl<'a> Guard<'a> { - pub fn check(&self, name: &str) { - if *self.flag { - panic!("poisoned {} - another task failed inside", name); - } +/// A type of error which can be returned whenever a lock is acquired. +/// +/// Both Mutexes and RWLocks are poisoned whenever a task fails while the lock +/// is held. The precise semantics for when a lock is poisoned is documented on +/// each lock, but once a lock is poisoned then all future acquisitions will +/// return this error. +#[stable] +pub struct PoisonError<T> { + guard: T, +} + +/// An enumeration of possible errors which can occur while calling the +/// `try_lock` method. +#[stable] +pub enum TryLockError<T> { + /// The lock could not be acquired because another task failed while holding + /// the lock. + #[stable] + Poisoned(PoisonError<T>), + /// The lock could not be acquired at this time because the operation would + /// otherwise block. + #[stable] + WouldBlock, +} + +/// A type alias for the result of a lock method which can be poisoned. +/// +/// The `Ok` variant of this result indicates that the primitive was not +/// poisoned, and the `Guard` is contained within. The `Err` variant indicates +/// that the primitive was poisoned. Note that the `Err` variant *also* carries +/// the associated guard, and it can be acquired through the `into_inner` +/// method. +#[stable] +pub type LockResult<Guard> = Result<Guard, PoisonError<Guard>>; + +/// A type alias for the result of a nonblocking locking method. +/// +/// For more information, see `LockResult`. A `TryLockResult` doesn't +/// necessarily hold the associated guard in the `Err` type as the lock may not +/// have been acquired for other reasons. +#[stable] +pub type TryLockResult<Guard> = Result<Guard, TryLockError<Guard>>; + +impl<T> fmt::Show for PoisonError<T> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + "poisoned lock: another task failed inside".fmt(f) + } +} + +impl<T> PoisonError<T> { + /// Consumes this error indicating that a lock is poisoned, returning the + /// underlying guard to allow access regardless. + #[stable] + pub fn into_guard(self) -> T { self.guard } +} + +impl<T> FromError<PoisonError<T>> for TryLockError<T> { + fn from_error(err: PoisonError<T>) -> TryLockError<T> { + TryLockError::Poisoned(err) } +} - pub fn done(&mut self) { - if !self.panicking && Thread::panicking() { - *self.flag = true; +impl<T> fmt::Show for TryLockError<T> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + TryLockError::Poisoned(ref p) => p.fmt(f), + TryLockError::WouldBlock => { + "try_lock failed because the operation would block".fmt(f) + } } } } + +pub fn new_poison_error<T>(guard: T) -> PoisonError<T> { + PoisonError { guard: guard } +} + +pub fn map_result<T, U, F>(result: LockResult<T>, f: F) + -> LockResult<U> + where F: FnOnce(T) -> U { + match result { + Ok(t) => Ok(f(t)), + Err(PoisonError { guard }) => Err(new_poison_error(f(guard))) + } +} diff --git a/src/libstd/sync/rwlock.rs b/src/libstd/sync/rwlock.rs index 76d05d9bfd4..7f3c77c97ad 100644 --- a/src/libstd/sync/rwlock.rs +++ b/src/libstd/sync/rwlock.rs @@ -10,10 +10,10 @@ use prelude::*; -use kinds::marker; use cell::UnsafeCell; +use kinds::marker; +use sync::poison::{mod, LockResult, TryLockError, TryLockResult}; use sys_common::rwlock as sys; -use sync::poison; /// A reader-writer lock /// @@ -28,12 +28,14 @@ use sync::poison; /// locking methods implement `Deref` (and `DerefMut` for the `write` methods) /// to allow access to the contained of the lock. /// +/// # Poisoning +/// /// RWLocks, like Mutexes, will become poisoned on panics. Note, however, that /// an RWLock may only be poisoned if a panic occurs while it is locked /// exclusively (write mode). If a panic occurs in any reader, then the lock /// will not be poisoned. /// -/// # Example +/// # Examples /// /// ``` /// use std::sync::RWLock; @@ -42,19 +44,20 @@ use sync::poison; /// /// // many reader locks can be held at once /// { -/// let r1 = lock.read(); -/// let r2 = lock.read(); +/// let r1 = lock.read().unwrap(); +/// let r2 = lock.read().unwrap(); /// assert_eq!(*r1, 5); /// assert_eq!(*r2, 5); /// } // read locks are dropped at this point /// /// // only one write lock may be held, however /// { -/// let mut w = lock.write(); +/// let mut w = lock.write().unwrap(); /// *w += 1; /// assert_eq!(*w, 6); /// } // write lock is dropped here /// ``` +#[stable] pub struct RWLock<T> { inner: Box<StaticRWLock>, data: UnsafeCell<T>, @@ -77,64 +80,55 @@ unsafe impl<T> Sync for RWLock<T> {} /// static LOCK: StaticRWLock = RWLOCK_INIT; /// /// { -/// let _g = LOCK.read(); +/// let _g = LOCK.read().unwrap(); /// // ... shared read access /// } /// { -/// let _g = LOCK.write(); +/// let _g = LOCK.write().unwrap(); /// // ... exclusive write access /// } /// unsafe { LOCK.destroy() } // free all resources /// ``` +#[unstable = "may be merged with RWLock in the future"] pub struct StaticRWLock { - inner: sys::RWLock, - poison: UnsafeCell<poison::Flag>, + lock: sys::RWLock, + poison: poison::Flag, } unsafe impl Send for StaticRWLock {} unsafe impl Sync for StaticRWLock {} /// Constant initialization for a statically-initialized rwlock. +#[unstable = "may be merged with RWLock in the future"] pub const RWLOCK_INIT: StaticRWLock = StaticRWLock { - inner: sys::RWLOCK_INIT, - poison: UnsafeCell { value: poison::Flag { failed: false } }, + lock: sys::RWLOCK_INIT, + poison: poison::FLAG_INIT, }; /// RAII structure used to release the shared read access of a lock when /// dropped. #[must_use] +#[stable] pub struct RWLockReadGuard<'a, T: 'a> { - __lock: &'a RWLock<T>, - __guard: StaticRWLockReadGuard, + __lock: &'a StaticRWLock, + __data: &'a UnsafeCell<T>, + __marker: marker::NoSend, } /// RAII structure used to release the exclusive write access of a lock when /// dropped. #[must_use] +#[stable] pub struct RWLockWriteGuard<'a, T: 'a> { - __lock: &'a RWLock<T>, - __guard: StaticRWLockWriteGuard, -} - -/// RAII structure used to release the shared read access of a lock when -/// dropped. -#[must_use] -pub struct StaticRWLockReadGuard { - lock: &'static sys::RWLock, - marker: marker::NoSend, -} - -/// RAII structure used to release the exclusive write access of a lock when -/// dropped. -#[must_use] -pub struct StaticRWLockWriteGuard { - lock: &'static sys::RWLock, - marker: marker::NoSend, - poison: poison::Guard<'static>, + __lock: &'a StaticRWLock, + __data: &'a UnsafeCell<T>, + __poison: poison::Guard, + __marker: marker::NoSend, } impl<T: Send + Sync> RWLock<T> { /// Creates a new instance of an RWLock which is unlocked and read to go. + #[stable] pub fn new(t: T) -> RWLock<T> { RWLock { inner: box RWLOCK_INIT, data: UnsafeCell::new(t) } } @@ -151,17 +145,16 @@ impl<T: Send + Sync> RWLock<T> { /// Returns an RAII guard which will release this thread's shared access /// once it is dropped. /// - /// # Panics + /// # Failure /// - /// This function will panic if the RWLock is poisoned. An RWLock is - /// poisoned whenever a writer panics while holding an exclusive lock. The - /// panic will occur immediately after the lock has been acquired. + /// This function will return an error if the RWLock is poisoned. An RWLock + /// is poisoned whenever a writer panics while holding an exclusive lock. + /// The failure will occur immediately after the lock has been acquired. #[inline] - pub fn read(&self) -> RWLockReadGuard<T> { - unsafe { - let lock: &'static StaticRWLock = &*(&*self.inner as *const _); - RWLockReadGuard::new(self, lock.read()) - } + #[stable] + pub fn read(&self) -> LockResult<RWLockReadGuard<T>> { + unsafe { self.inner.lock.read() } + RWLockReadGuard::new(&*self.inner, &self.data) } /// Attempt to acquire this lock with shared read access. @@ -173,18 +166,19 @@ impl<T: Send + Sync> RWLock<T> { /// guarantees with respect to the ordering of whether contentious readers /// or writers will acquire the lock first. /// - /// # Panics + /// # Failure /// - /// This function will panic if the RWLock is poisoned. An RWLock is - /// poisoned whenever a writer panics while holding an exclusive lock. A - /// panic will only occur if the lock is acquired. + /// This function will return an error if the RWLock is poisoned. An RWLock + /// is poisoned whenever a writer panics while holding an exclusive lock. An + /// error will only be returned if the lock would have otherwise been + /// acquired. #[inline] - pub fn try_read(&self) -> Option<RWLockReadGuard<T>> { - unsafe { - let lock: &'static StaticRWLock = &*(&*self.inner as *const _); - lock.try_read().map(|guard| { - RWLockReadGuard::new(self, guard) - }) + #[stable] + pub fn try_read(&self) -> TryLockResult<RWLockReadGuard<T>> { + if unsafe { self.inner.lock.try_read() } { + Ok(try!(RWLockReadGuard::new(&*self.inner, &self.data))) + } else { + Err(TryLockError::WouldBlock) } } @@ -197,17 +191,16 @@ impl<T: Send + Sync> RWLock<T> { /// Returns an RAII guard which will drop the write access of this rwlock /// when dropped. /// - /// # Panics + /// # Failure /// - /// This function will panic if the RWLock is poisoned. An RWLock is - /// poisoned whenever a writer panics while holding an exclusive lock. The - /// panic will occur when the lock is acquired. + /// This function will return an error if the RWLock is poisoned. An RWLock + /// is poisoned whenever a writer panics while holding an exclusive lock. + /// An error will be returned when the lock is acquired. #[inline] - pub fn write(&self) -> RWLockWriteGuard<T> { - unsafe { - let lock: &'static StaticRWLock = &*(&*self.inner as *const _); - RWLockWriteGuard::new(self, lock.write()) - } + #[stable] + pub fn write(&self) -> LockResult<RWLockWriteGuard<T>> { + unsafe { self.inner.lock.write() } + RWLockWriteGuard::new(&*self.inner, &self.data) } /// Attempt to lock this rwlock with exclusive write access. @@ -216,18 +209,19 @@ impl<T: Send + Sync> RWLock<T> { /// to `write` would otherwise block. If successful, an RAII guard is /// returned. /// - /// # Panics + /// # Failure /// - /// This function will panic if the RWLock is poisoned. An RWLock is - /// poisoned whenever a writer panics while holding an exclusive lock. A - /// panic will only occur if the lock is acquired. + /// This function will return an error if the RWLock is poisoned. An RWLock + /// is poisoned whenever a writer panics while holding an exclusive lock. An + /// error will only be returned if the lock would have otherwise been + /// acquired. #[inline] - pub fn try_write(&self) -> Option<RWLockWriteGuard<T>> { - unsafe { - let lock: &'static StaticRWLock = &*(&*self.inner as *const _); - lock.try_write().map(|guard| { - RWLockWriteGuard::new(self, guard) - }) + #[stable] + pub fn try_write(&self) -> TryLockResult<RWLockWriteGuard<T>> { + if unsafe { self.inner.lock.try_read() } { + Ok(try!(RWLockWriteGuard::new(&*self.inner, &self.data))) + } else { + Err(TryLockError::WouldBlock) } } } @@ -235,30 +229,37 @@ impl<T: Send + Sync> RWLock<T> { #[unsafe_destructor] impl<T> Drop for RWLock<T> { fn drop(&mut self) { - unsafe { self.inner.inner.destroy() } + unsafe { self.inner.lock.destroy() } } } +struct Dummy(UnsafeCell<()>); +unsafe impl Sync for Dummy {} +static DUMMY: Dummy = Dummy(UnsafeCell { value: () }); + impl StaticRWLock { /// Locks this rwlock with shared read access, blocking the current thread /// until it can be acquired. /// /// See `RWLock::read`. #[inline] - pub fn read(&'static self) -> StaticRWLockReadGuard { - unsafe { self.inner.read() } - StaticRWLockReadGuard::new(self) + #[unstable = "may be merged with RWLock in the future"] + pub fn read(&'static self) -> LockResult<RWLockReadGuard<'static, ()>> { + unsafe { self.lock.read() } + RWLockReadGuard::new(self, &DUMMY.0) } /// Attempt to acquire this lock with shared read access. /// /// See `RWLock::try_read`. #[inline] - pub fn try_read(&'static self) -> Option<StaticRWLockReadGuard> { - if unsafe { self.inner.try_read() } { - Some(StaticRWLockReadGuard::new(self)) + #[unstable = "may be merged with RWLock in the future"] + pub fn try_read(&'static self) + -> TryLockResult<RWLockReadGuard<'static, ()>> { + if unsafe { self.lock.try_read() } { + Ok(try!(RWLockReadGuard::new(self, &DUMMY.0))) } else { - None + Err(TryLockError::WouldBlock) } } @@ -267,20 +268,23 @@ impl StaticRWLock { /// /// See `RWLock::write`. #[inline] - pub fn write(&'static self) -> StaticRWLockWriteGuard { - unsafe { self.inner.write() } - StaticRWLockWriteGuard::new(self) + #[unstable = "may be merged with RWLock in the future"] + pub fn write(&'static self) -> LockResult<RWLockWriteGuard<'static, ()>> { + unsafe { self.lock.write() } + RWLockWriteGuard::new(self, &DUMMY.0) } /// Attempt to lock this rwlock with exclusive write access. /// /// See `RWLock::try_write`. #[inline] - pub fn try_write(&'static self) -> Option<StaticRWLockWriteGuard> { - if unsafe { self.inner.try_write() } { - Some(StaticRWLockWriteGuard::new(self)) + #[unstable = "may be merged with RWLock in the future"] + pub fn try_write(&'static self) + -> TryLockResult<RWLockWriteGuard<'static, ()>> { + if unsafe { self.lock.try_write() } { + Ok(try!(RWLockWriteGuard::new(self, &DUMMY.0))) } else { - None + Err(TryLockError::WouldBlock) } } @@ -290,70 +294,62 @@ impl StaticRWLock { /// active users of the lock, and this also doesn't prevent any future users /// of this lock. This method is required to be called to not leak memory on /// all platforms. + #[unstable = "may be merged with RWLock in the future"] pub unsafe fn destroy(&'static self) { - self.inner.destroy() + self.lock.destroy() } } impl<'rwlock, T> RWLockReadGuard<'rwlock, T> { - fn new(lock: &RWLock<T>, guard: StaticRWLockReadGuard) - -> RWLockReadGuard<T> { - RWLockReadGuard { __lock: lock, __guard: guard } + fn new(lock: &'rwlock StaticRWLock, data: &'rwlock UnsafeCell<T>) + -> LockResult<RWLockReadGuard<'rwlock, T>> { + poison::map_result(lock.poison.borrow(), |_| { + RWLockReadGuard { + __lock: lock, + __data: data, + __marker: marker::NoSend, + } + }) } } impl<'rwlock, T> RWLockWriteGuard<'rwlock, T> { - fn new(lock: &RWLock<T>, guard: StaticRWLockWriteGuard) - -> RWLockWriteGuard<T> { - RWLockWriteGuard { __lock: lock, __guard: guard } + fn new(lock: &'rwlock StaticRWLock, data: &'rwlock UnsafeCell<T>) + -> LockResult<RWLockWriteGuard<'rwlock, T>> { + poison::map_result(lock.poison.borrow(), |guard| { + RWLockWriteGuard { + __lock: lock, + __data: data, + __poison: guard, + __marker: marker::NoSend, + } + }) } } impl<'rwlock, T> Deref<T> for RWLockReadGuard<'rwlock, T> { - fn deref(&self) -> &T { unsafe { &*self.__lock.data.get() } } + fn deref(&self) -> &T { unsafe { &*self.__data.get() } } } impl<'rwlock, T> Deref<T> for RWLockWriteGuard<'rwlock, T> { - fn deref(&self) -> &T { unsafe { &*self.__lock.data.get() } } + fn deref(&self) -> &T { unsafe { &*self.__data.get() } } } impl<'rwlock, T> DerefMut<T> for RWLockWriteGuard<'rwlock, T> { - fn deref_mut(&mut self) -> &mut T { unsafe { &mut *self.__lock.data.get() } } -} - -impl StaticRWLockReadGuard { - fn new(lock: &'static StaticRWLock) -> StaticRWLockReadGuard { - let guard = StaticRWLockReadGuard { - lock: &lock.inner, - marker: marker::NoSend, - }; - unsafe { (*lock.poison.get()).borrow().check("rwlock"); } - return guard; - } -} -impl StaticRWLockWriteGuard { - fn new(lock: &'static StaticRWLock) -> StaticRWLockWriteGuard { - unsafe { - let guard = StaticRWLockWriteGuard { - lock: &lock.inner, - marker: marker::NoSend, - poison: (*lock.poison.get()).borrow(), - }; - guard.poison.check("rwlock"); - return guard; - } + fn deref_mut(&mut self) -> &mut T { + unsafe { &mut *self.__data.get() } } } #[unsafe_destructor] -impl Drop for StaticRWLockReadGuard { +impl<'a, T> Drop for RWLockReadGuard<'a, T> { fn drop(&mut self) { - unsafe { self.lock.read_unlock(); } + unsafe { self.__lock.lock.read_unlock(); } } } #[unsafe_destructor] -impl Drop for StaticRWLockWriteGuard { +impl<'a, T> Drop for RWLockWriteGuard<'a, T> { fn drop(&mut self) { - self.poison.done(); - unsafe { self.lock.write_unlock(); } + self.__lock.poison.done(&self.__poison); + unsafe { self.__lock.lock.write_unlock(); } } } @@ -368,19 +364,19 @@ mod tests { #[test] fn smoke() { let l = RWLock::new(()); - drop(l.read()); - drop(l.write()); - drop((l.read(), l.read())); - drop(l.write()); + drop(l.read().unwrap()); + drop(l.write().unwrap()); + drop((l.read().unwrap(), l.read().unwrap())); + drop(l.write().unwrap()); } #[test] fn static_smoke() { static R: StaticRWLock = RWLOCK_INIT; - drop(R.read()); - drop(R.write()); - drop((R.read(), R.read())); - drop(R.write()); + drop(R.read().unwrap()); + drop(R.write().unwrap()); + drop((R.read().unwrap(), R.read().unwrap())); + drop(R.write().unwrap()); unsafe { R.destroy(); } } @@ -394,12 +390,12 @@ mod tests { for _ in range(0, N) { let tx = tx.clone(); spawn(move|| { - let mut rng = rand::task_rng(); + let mut rng = rand::thread_rng(); for _ in range(0, M) { if rng.gen_weighted_bool(N) { - drop(R.write()); + drop(R.write().unwrap()); } else { - drop(R.read()); + drop(R.read().unwrap()); } } drop(tx); @@ -411,51 +407,47 @@ mod tests { } #[test] - #[should_fail] fn test_rw_arc_poison_wr() { let arc = Arc::new(RWLock::new(1i)); let arc2 = arc.clone(); - let _ = Thread::spawn(move|| { - let lock = arc2.write(); - assert_eq!(*lock, 2); + let _: Result<uint, _> = Thread::spawn(move|| { + let _lock = arc2.write().unwrap(); + panic!(); }).join(); - let lock = arc.read(); - assert_eq!(*lock, 1); + assert!(arc.read().is_err()); } #[test] - #[should_fail] fn test_rw_arc_poison_ww() { let arc = Arc::new(RWLock::new(1i)); let arc2 = arc.clone(); - let _ = Thread::spawn(move|| { - let lock = arc2.write(); - assert_eq!(*lock, 2); + let _: Result<uint, _> = Thread::spawn(move|| { + let _lock = arc2.write().unwrap(); + panic!(); }).join(); - let lock = arc.write(); - assert_eq!(*lock, 1); + assert!(arc.write().is_err()); } #[test] fn test_rw_arc_no_poison_rr() { let arc = Arc::new(RWLock::new(1i)); let arc2 = arc.clone(); - let _ = Thread::spawn(move|| { - let lock = arc2.read(); - assert_eq!(*lock, 2); + let _: Result<uint, _> = Thread::spawn(move|| { + let _lock = arc2.read().unwrap(); + panic!(); }).join(); - let lock = arc.read(); + let lock = arc.read().unwrap(); assert_eq!(*lock, 1); } #[test] fn test_rw_arc_no_poison_rw() { let arc = Arc::new(RWLock::new(1i)); let arc2 = arc.clone(); - let _ = Thread::spawn(move|| { - let lock = arc2.read(); - assert_eq!(*lock, 2); + let _: Result<uint, _> = Thread::spawn(move|| { + let _lock = arc2.read().unwrap(); + panic!() }).join(); - let lock = arc.write(); + let lock = arc.write().unwrap(); assert_eq!(*lock, 1); } @@ -466,7 +458,7 @@ mod tests { let (tx, rx) = channel(); Thread::spawn(move|| { - let mut lock = arc2.write(); + let mut lock = arc2.write().unwrap(); for _ in range(0u, 10) { let tmp = *lock; *lock = -1; @@ -481,7 +473,7 @@ mod tests { for _ in range(0u, 5) { let arc3 = arc.clone(); children.push(Thread::spawn(move|| { - let lock = arc3.read(); + let lock = arc3.read().unwrap(); assert!(*lock >= 0); })); } @@ -493,7 +485,7 @@ mod tests { // Wait for writer to finish rx.recv(); - let lock = arc.read(); + let lock = arc.read().unwrap(); assert_eq!(*lock, 10); } @@ -507,14 +499,14 @@ mod tests { } impl Drop for Unwinder { fn drop(&mut self) { - let mut lock = self.i.write(); + let mut lock = self.i.write().unwrap(); *lock += 1; } } let _u = Unwinder { i: arc2 }; panic!(); }).join(); - let lock = arc.read(); + let lock = arc.read().unwrap(); assert_eq!(*lock, 2); } } diff --git a/src/libstd/sync/semaphore.rs b/src/libstd/sync/semaphore.rs index 574b0f22bee..e3b683a6ccb 100644 --- a/src/libstd/sync/semaphore.rs +++ b/src/libstd/sync/semaphore.rs @@ -68,9 +68,9 @@ impl Semaphore { /// This method will block until the internal count of the semaphore is at /// least 1. pub fn acquire(&self) { - let mut count = self.lock.lock(); + let mut count = self.lock.lock().unwrap(); while *count <= 0 { - self.cvar.wait(&count); + count = self.cvar.wait(count).unwrap(); } *count -= 1; } @@ -80,7 +80,7 @@ impl Semaphore { /// This will increment the number of resources in this semaphore by 1 and /// will notify any pending waiters in `acquire` or `access` if necessary. pub fn release(&self) { - *self.lock.lock() += 1; + *self.lock.lock().unwrap() += 1; self.cvar.notify_one(); } diff --git a/src/libstd/sync/task_pool.rs b/src/libstd/sync/task_pool.rs index 366e4b7d35b..ee534f6cdde 100644 --- a/src/libstd/sync/task_pool.rs +++ b/src/libstd/sync/task_pool.rs @@ -8,7 +8,7 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -//! Abstraction of a task pool for basic parallelism. +//! Abstraction of a thread pool for basic parallelism. use core::prelude::*; @@ -45,9 +45,9 @@ impl<'a> Drop for Sentinel<'a> { } } -/// A task pool used to execute functions in parallel. +/// A thread pool used to execute functions in parallel. /// -/// Spawns `n` worker tasks and replenishes the pool if any worker tasks +/// Spawns `n` worker threads and replenishes the pool if any worker threads /// panic. /// /// # Example @@ -69,34 +69,34 @@ impl<'a> Drop for Sentinel<'a> { /// assert_eq!(rx.iter().take(8u).sum(), 8u); /// ``` pub struct TaskPool { - // How the taskpool communicates with subtasks. + // How the threadpool communicates with subthreads. // - // This is the only such Sender, so when it is dropped all subtasks will + // This is the only such Sender, so when it is dropped all subthreads will // quit. jobs: Sender<Thunk> } impl TaskPool { - /// Spawns a new task pool with `tasks` tasks. + /// Spawns a new thread pool with `threads` threads. /// /// # Panics /// - /// This function will panic if `tasks` is 0. - pub fn new(tasks: uint) -> TaskPool { - assert!(tasks >= 1); + /// This function will panic if `threads` is 0. + pub fn new(threads: uint) -> TaskPool { + assert!(threads >= 1); let (tx, rx) = channel::<Thunk>(); let rx = Arc::new(Mutex::new(rx)); - // Taskpool tasks. - for _ in range(0, tasks) { + // Threadpool threads + for _ in range(0, threads) { spawn_in_pool(rx.clone()); } TaskPool { jobs: tx } } - /// Executes the function `job` on a task in the pool. + /// Executes the function `job` on a thread in the pool. pub fn execute<F>(&self, job: F) where F : FnOnce(), F : Send { @@ -106,14 +106,14 @@ impl TaskPool { fn spawn_in_pool(jobs: Arc<Mutex<Receiver<Thunk>>>) { Thread::spawn(move |:| { - // Will spawn a new task on panic unless it is cancelled. + // Will spawn a new thread on panic unless it is cancelled. let sentinel = Sentinel::new(&jobs); loop { let message = { // Only lock jobs for the time it takes // to get a job, not run it. - let lock = jobs.lock(); + let lock = jobs.lock().unwrap(); lock.recv_opt() }; @@ -165,12 +165,12 @@ mod test { let pool = TaskPool::new(TEST_TASKS); - // Panic all the existing tasks. + // Panic all the existing threads. for _ in range(0, TEST_TASKS) { pool.execute(move|| -> () { panic!() }); } - // Ensure new tasks were spawned to compensate. + // Ensure new threads were spawned to compensate. let (tx, rx) = channel(); for _ in range(0, TEST_TASKS) { let tx = tx.clone(); @@ -189,7 +189,7 @@ mod test { let pool = TaskPool::new(TEST_TASKS); let waiter = Arc::new(Barrier::new(TEST_TASKS + 1)); - // Panic all the existing tasks in a bit. + // Panic all the existing threads in a bit. for _ in range(0, TEST_TASKS) { let waiter = waiter.clone(); pool.execute(move|| { |
