diff options
| author | Steven Fackler <sfackler@gmail.com> | 2015-01-13 21:24:26 -0800 |
|---|---|---|
| committer | Steven Fackler <sfackler@gmail.com> | 2015-01-16 09:17:37 -0800 |
| commit | 08f6380a9f0b866796080094f44fe25ea5636547 (patch) | |
| tree | b07f1154c5095e48f96369ce9d37988e33ae3d71 /src/libstd/sync | |
| parent | 3d5fbae33897a8340542f21b6ded913148ca9199 (diff) | |
| download | rust-08f6380a9f0b866796080094f44fe25ea5636547.tar.gz rust-08f6380a9f0b866796080094f44fe25ea5636547.zip | |
Rewrite Condvar::wait_timeout and make it public
**The implementation is a direct adaptation of libcxx's condition_variable implementation.** pthread_cond_timedwait uses the non-monotonic system clock. It's possible to change the clock to a monotonic via pthread_cond_attr, but this is incompatible with static initialization. To deal with this, we calculate the timeout using the system clock, and maintain a separate record of the start and end times with a monotonic clock to be used for calculation of the return value.
Diffstat (limited to 'src/libstd/sync')
| -rw-r--r-- | src/libstd/sync/condvar.rs | 123 | ||||
| -rw-r--r-- | src/libstd/sync/poison.rs | 17 |
2 files changed, 129 insertions, 11 deletions
diff --git a/src/libstd/sync/condvar.rs b/src/libstd/sync/condvar.rs index bcd5f56a353..b8b186f31e0 100644 --- a/src/libstd/sync/condvar.rs +++ b/src/libstd/sync/condvar.rs @@ -12,6 +12,7 @@ use prelude::v1::*; use sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT}; use sync::poison::{self, LockResult}; +use sys::time::SteadyTime; use sys_common::condvar as sys; use sys_common::mutex as sys_mutex; use time::Duration; @@ -153,13 +154,8 @@ impl Condvar { /// /// Like `wait`, the lock specified will be re-acquired when this function /// returns, regardless of whether the timeout elapsed or not. - // Note that this method is *not* public, and this is quite intentional - // because we're not quite sure about the semantics of relative vs absolute - // durations or how the timing guarantees play into what the system APIs - // provide. There are also additional concerns about the unix-specific - // implementation which may need to be addressed. - #[allow(dead_code)] - fn wait_timeout<'a, T>(&self, guard: MutexGuard<'a, T>, dur: Duration) + #[unstable] + pub fn wait_timeout<'a, T>(&self, guard: MutexGuard<'a, T>, dur: Duration) -> LockResult<(MutexGuard<'a, T>, bool)> { unsafe { let me: &'static Condvar = &*(self as *const _); @@ -167,6 +163,25 @@ impl Condvar { } } + /// Wait on this condition variable for a notification, timing out after a + /// specified duration. + /// + /// The semantics of this function are equivalent to `wait_timeout` except + /// that the implementation will repeatedly wait while the duration has not + /// passed and the provided function returns `false`. + #[unstable] + pub fn wait_timeout_with<'a, T, F>(&self, + guard: MutexGuard<'a, T>, + dur: Duration, + f: F) + -> LockResult<(MutexGuard<'a, T>, bool)> + where F: FnMut(LockResult<&mut T>) -> bool { + unsafe { + let me: &'static Condvar = &*(self as *const _); + me.inner.wait_timeout_with(guard, dur, f) + } + } + /// Wake up one blocked thread on this condvar. /// /// If there is a blocked thread on this condition variable, then it will @@ -220,9 +235,9 @@ impl StaticCondvar { /// specified duration. /// /// See `Condvar::wait_timeout`. - #[allow(dead_code)] // may want to stabilize this later, see wait_timeout above - fn wait_timeout<'a, T>(&'static self, guard: MutexGuard<'a, T>, dur: Duration) - -> LockResult<(MutexGuard<'a, T>, bool)> { + #[unstable = "may be merged with Condvar in the future"] + pub fn wait_timeout<'a, T>(&'static self, guard: MutexGuard<'a, T>, dur: Duration) + -> LockResult<(MutexGuard<'a, T>, bool)> { let (poisoned, success) = unsafe { let lock = mutex::guard_lock(&guard); self.verify(lock); @@ -236,6 +251,50 @@ impl StaticCondvar { } } + /// Wait on this condition variable for a notification, timing out after a + /// specified duration. + /// + /// The implementation will repeatedly wait while the duration has not + /// passed and the function returns `false`. + /// + /// See `Condvar::wait_timeout_with`. + #[unstable = "may be merged with Condvar in the future"] + pub fn wait_timeout_with<'a, T, F>(&'static self, + guard: MutexGuard<'a, T>, + dur: Duration, + mut f: F) + -> LockResult<(MutexGuard<'a, T>, bool)> + where F: FnMut(LockResult<&mut T>) -> bool { + // This could be made more efficient by pushing the implementation into sys::condvar + let start = SteadyTime::now(); + let mut guard_result: LockResult<MutexGuard<'a, T>> = Ok(guard); + while !f(guard_result + .as_mut() + .map(|g| &mut **g) + .map_err(|e| poison::new_poison_error(&mut **e.get_mut()))) { + let now = SteadyTime::now(); + let consumed = &now - &start; + let guard = guard_result.unwrap_or_else(|e| e.into_inner()); + let (new_guard_result, no_timeout) = match self.wait_timeout(guard, dur - consumed) { + Ok((new_guard, no_timeout)) => (Ok(new_guard), no_timeout), + Err(err) => { + let (new_guard, no_timeout) = err.into_inner(); + (Err(poison::new_poison_error(new_guard)), no_timeout) + } + }; + guard_result = new_guard_result; + if !no_timeout { + let result = f(guard_result + .as_mut() + .map(|g| &mut **g) + .map_err(|e| poison::new_poison_error(&mut **e.get_mut()))); + return poison::map_result(guard_result, |g| (g, result)); + } + } + + poison::map_result(guard_result, |g| (g, true)) + } + /// Wake up one blocked thread on this condvar. /// /// See `Condvar::notify_one`. @@ -285,6 +344,7 @@ mod tests { use super::{StaticCondvar, CONDVAR_INIT}; use sync::mpsc::channel; use sync::{StaticMutex, MUTEX_INIT, Condvar, Mutex, Arc}; + use sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering}; use thread::Thread; use time::Duration; @@ -373,6 +433,49 @@ mod tests { } #[test] + fn wait_timeout_with() { + static C: StaticCondvar = CONDVAR_INIT; + static M: StaticMutex = MUTEX_INIT; + static S: AtomicUsize = ATOMIC_USIZE_INIT; + + let g = M.lock().unwrap(); + let (g, success) = C.wait_timeout_with(g, Duration::nanoseconds(1000), |_| false).unwrap(); + assert!(!success); + + let (tx, rx) = channel(); + let _t = Thread::scoped(move || { + rx.recv().unwrap(); + let g = M.lock().unwrap(); + S.store(1, Ordering::SeqCst); + C.notify_one(); + drop(g); + + rx.recv().unwrap(); + let g = M.lock().unwrap(); + S.store(2, Ordering::SeqCst); + C.notify_one(); + drop(g); + + rx.recv().unwrap(); + let _g = M.lock().unwrap(); + S.store(3, Ordering::SeqCst); + C.notify_one(); + }); + + let mut state = 0; + let (_g, success) = C.wait_timeout_with(g, Duration::days(1), |_| { + assert_eq!(state, S.load(Ordering::SeqCst)); + tx.send(()).unwrap(); + state += 1; + match state { + 1|2 => false, + _ => true, + } + }).unwrap(); + assert!(success); + } + + #[test] #[should_fail] fn two_mutexes() { static M1: StaticMutex = MUTEX_INIT; diff --git a/src/libstd/sync/poison.rs b/src/libstd/sync/poison.rs index 385df45b400..cc8c331ef39 100644 --- a/src/libstd/sync/poison.rs +++ b/src/libstd/sync/poison.rs @@ -99,8 +99,23 @@ impl<T> fmt::Show for PoisonError<T> { impl<T> PoisonError<T> { /// Consumes this error indicating that a lock is poisoned, returning the /// underlying guard to allow access regardless. - #[stable] + #[deprecated="renamed to into_inner"] pub fn into_guard(self) -> T { self.guard } + + /// Consumes this error indicating that a lock is poisoned, returning the + /// underlying guard to allow access regardless. + #[unstable] + pub fn into_inner(self) -> T { self.guard } + + /// Reaches into this error indicating that a lock is poisoned, returning a + /// reference to the underlying guard to allow access regardless. + #[unstable] + pub fn get_ref(&self) -> &T { &self.guard } + + /// Reaches into this error indicating that a lock is poisoned, returning a + /// mutable reference to the underlying guard to allow access regardless. + #[unstable] + pub fn get_mut(&mut self) -> &mut T { &mut self.guard } } impl<T> FromError<PoisonError<T>> for TryLockError<T> { |
