about summary refs log tree commit diff
path: root/src/libstd/sync
diff options
context:
space:
mode:
authorSteven Fackler <sfackler@gmail.com>2015-01-13 21:24:26 -0800
committerSteven Fackler <sfackler@gmail.com>2015-01-16 09:17:37 -0800
commit08f6380a9f0b866796080094f44fe25ea5636547 (patch)
treeb07f1154c5095e48f96369ce9d37988e33ae3d71 /src/libstd/sync
parent3d5fbae33897a8340542f21b6ded913148ca9199 (diff)
downloadrust-08f6380a9f0b866796080094f44fe25ea5636547.tar.gz
rust-08f6380a9f0b866796080094f44fe25ea5636547.zip
Rewrite Condvar::wait_timeout and make it public
**The implementation is a direct adaptation of libcxx's
condition_variable implementation.**

pthread_cond_timedwait uses the non-monotonic system clock. It's
possible to change the clock to a monotonic via pthread_cond_attr, but
this is incompatible with static initialization. To deal with this, we
calculate the timeout using the system clock, and maintain a separate
record of the start and end times with a monotonic clock to be used for
calculation of the return value.
Diffstat (limited to 'src/libstd/sync')
-rw-r--r--src/libstd/sync/condvar.rs123
-rw-r--r--src/libstd/sync/poison.rs17
2 files changed, 129 insertions, 11 deletions
diff --git a/src/libstd/sync/condvar.rs b/src/libstd/sync/condvar.rs
index bcd5f56a353..b8b186f31e0 100644
--- a/src/libstd/sync/condvar.rs
+++ b/src/libstd/sync/condvar.rs
@@ -12,6 +12,7 @@ use prelude::v1::*;
 
 use sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
 use sync::poison::{self, LockResult};
+use sys::time::SteadyTime;
 use sys_common::condvar as sys;
 use sys_common::mutex as sys_mutex;
 use time::Duration;
@@ -153,13 +154,8 @@ impl Condvar {
     ///
     /// Like `wait`, the lock specified will be re-acquired when this function
     /// returns, regardless of whether the timeout elapsed or not.
-    // Note that this method is *not* public, and this is quite intentional
-    // because we're not quite sure about the semantics of relative vs absolute
-    // durations or how the timing guarantees play into what the system APIs
-    // provide. There are also additional concerns about the unix-specific
-    // implementation which may need to be addressed.
-    #[allow(dead_code)]
-    fn wait_timeout<'a, T>(&self, guard: MutexGuard<'a, T>, dur: Duration)
+    #[unstable]
+    pub fn wait_timeout<'a, T>(&self, guard: MutexGuard<'a, T>, dur: Duration)
                            -> LockResult<(MutexGuard<'a, T>, bool)> {
         unsafe {
             let me: &'static Condvar = &*(self as *const _);
@@ -167,6 +163,25 @@ impl Condvar {
         }
     }
 
+    /// Wait on this condition variable for a notification, timing out after a
+    /// specified duration.
+    ///
+    /// The semantics of this function are equivalent to `wait_timeout` except
+    /// that the implementation will repeatedly wait while the duration has not
+    /// passed and the provided function returns `false`.
+    #[unstable]
+    pub fn wait_timeout_with<'a, T, F>(&self,
+                                       guard: MutexGuard<'a, T>,
+                                       dur: Duration,
+                                       f: F)
+                                       -> LockResult<(MutexGuard<'a, T>, bool)>
+            where F: FnMut(LockResult<&mut T>) -> bool {
+        unsafe {
+            let me: &'static Condvar = &*(self as *const _);
+            me.inner.wait_timeout_with(guard, dur, f)
+        }
+    }
+
     /// Wake up one blocked thread on this condvar.
     ///
     /// If there is a blocked thread on this condition variable, then it will
@@ -220,9 +235,9 @@ impl StaticCondvar {
     /// specified duration.
     ///
     /// See `Condvar::wait_timeout`.
-    #[allow(dead_code)] // may want to stabilize this later, see wait_timeout above
-    fn wait_timeout<'a, T>(&'static self, guard: MutexGuard<'a, T>, dur: Duration)
-                           -> LockResult<(MutexGuard<'a, T>, bool)> {
+    #[unstable = "may be merged with Condvar in the future"]
+    pub fn wait_timeout<'a, T>(&'static self, guard: MutexGuard<'a, T>, dur: Duration)
+                               -> LockResult<(MutexGuard<'a, T>, bool)> {
         let (poisoned, success) = unsafe {
             let lock = mutex::guard_lock(&guard);
             self.verify(lock);
@@ -236,6 +251,50 @@ impl StaticCondvar {
         }
     }
 
+    /// Wait on this condition variable for a notification, timing out after a
+    /// specified duration.
+    ///
+    /// The implementation will repeatedly wait while the duration has not
+    /// passed and the function returns `false`.
+    ///
+    /// See `Condvar::wait_timeout_with`.
+    #[unstable = "may be merged with Condvar in the future"]
+    pub fn wait_timeout_with<'a, T, F>(&'static self,
+                                       guard: MutexGuard<'a, T>,
+                                       dur: Duration,
+                                       mut f: F)
+                                       -> LockResult<(MutexGuard<'a, T>, bool)>
+            where F: FnMut(LockResult<&mut T>) -> bool {
+        // This could be made more efficient by pushing the implementation into sys::condvar
+        let start = SteadyTime::now();
+        let mut guard_result: LockResult<MutexGuard<'a, T>> = Ok(guard);
+        while !f(guard_result
+                    .as_mut()
+                    .map(|g| &mut **g)
+                    .map_err(|e| poison::new_poison_error(&mut **e.get_mut()))) {
+            let now = SteadyTime::now();
+            let consumed = &now - &start;
+            let guard = guard_result.unwrap_or_else(|e| e.into_inner());
+            let (new_guard_result, no_timeout) = match self.wait_timeout(guard, dur - consumed) {
+                Ok((new_guard, no_timeout)) => (Ok(new_guard), no_timeout),
+                Err(err) => {
+                    let (new_guard, no_timeout) = err.into_inner();
+                    (Err(poison::new_poison_error(new_guard)), no_timeout)
+                }
+            };
+            guard_result = new_guard_result;
+            if !no_timeout {
+                let result = f(guard_result
+                                    .as_mut()
+                                    .map(|g| &mut **g)
+                                    .map_err(|e| poison::new_poison_error(&mut **e.get_mut())));
+                return poison::map_result(guard_result, |g| (g, result));
+            }
+        }
+
+        poison::map_result(guard_result, |g| (g, true))
+    }
+
     /// Wake up one blocked thread on this condvar.
     ///
     /// See `Condvar::notify_one`.
@@ -285,6 +344,7 @@ mod tests {
     use super::{StaticCondvar, CONDVAR_INIT};
     use sync::mpsc::channel;
     use sync::{StaticMutex, MUTEX_INIT, Condvar, Mutex, Arc};
+    use sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering};
     use thread::Thread;
     use time::Duration;
 
@@ -373,6 +433,49 @@ mod tests {
     }
 
     #[test]
+    fn wait_timeout_with() {
+        static C: StaticCondvar = CONDVAR_INIT;
+        static M: StaticMutex = MUTEX_INIT;
+        static S: AtomicUsize = ATOMIC_USIZE_INIT;
+
+        let g = M.lock().unwrap();
+        let (g, success) = C.wait_timeout_with(g, Duration::nanoseconds(1000), |_| false).unwrap();
+        assert!(!success);
+
+        let (tx, rx) = channel();
+        let _t = Thread::scoped(move || {
+            rx.recv().unwrap();
+            let g = M.lock().unwrap();
+            S.store(1, Ordering::SeqCst);
+            C.notify_one();
+            drop(g);
+
+            rx.recv().unwrap();
+            let g = M.lock().unwrap();
+            S.store(2, Ordering::SeqCst);
+            C.notify_one();
+            drop(g);
+
+            rx.recv().unwrap();
+            let _g = M.lock().unwrap();
+            S.store(3, Ordering::SeqCst);
+            C.notify_one();
+        });
+
+        let mut state = 0;
+        let (_g, success) = C.wait_timeout_with(g, Duration::days(1), |_| {
+            assert_eq!(state, S.load(Ordering::SeqCst));
+            tx.send(()).unwrap();
+            state += 1;
+            match state {
+                1|2 => false,
+                _ => true,
+            }
+        }).unwrap();
+        assert!(success);
+    }
+
+    #[test]
     #[should_fail]
     fn two_mutexes() {
         static M1: StaticMutex = MUTEX_INIT;
diff --git a/src/libstd/sync/poison.rs b/src/libstd/sync/poison.rs
index 385df45b400..cc8c331ef39 100644
--- a/src/libstd/sync/poison.rs
+++ b/src/libstd/sync/poison.rs
@@ -99,8 +99,23 @@ impl<T> fmt::Show for PoisonError<T> {
 impl<T> PoisonError<T> {
     /// Consumes this error indicating that a lock is poisoned, returning the
     /// underlying guard to allow access regardless.
-    #[stable]
+    #[deprecated="renamed to into_inner"]
     pub fn into_guard(self) -> T { self.guard }
+
+    /// Consumes this error indicating that a lock is poisoned, returning the
+    /// underlying guard to allow access regardless.
+    #[unstable]
+    pub fn into_inner(self) -> T { self.guard }
+
+    /// Reaches into this error indicating that a lock is poisoned, returning a
+    /// reference to the underlying guard to allow access regardless.
+    #[unstable]
+    pub fn get_ref(&self) -> &T { &self.guard }
+
+    /// Reaches into this error indicating that a lock is poisoned, returning a
+    /// mutable reference to the underlying guard to allow access regardless.
+    #[unstable]
+    pub fn get_mut(&mut self) -> &mut T { &mut self.guard }
 }
 
 impl<T> FromError<PoisonError<T>> for TryLockError<T> {