about summary refs log tree commit diff
diff options
context:
space:
mode:
author Sean Cross <sean@xobs.io> 2023-12-24 15:05:11 +0800
committer Sean Cross <sean@xobs.io> 2024-01-13 09:13:56 -0800
commit 626926f073997af4c9bd971c827eece6cfa26bb2 (patch)
tree 2f16d9c86460f275190265613deb290745a42963
parent 118e8f78402a584becebf0c96082537fbaa8b9e6 (diff)
download rust-626926f073997af4c9bd971c827eece6cfa26bb2.tar.gz
rust-626926f073997af4c9bd971c827eece6cfa26bb2.zip
std: xous: rework condvar to fix soundness issues
Rework the Condvar implementation on Xous to ensure notifications are
not missed. This involves keeping track of how many times a Condvar
timed out and synchronizing based on that.

Signed-off-by: Sean Cross <sean@xobs.io>
-rw-r--r-- library/std/src/sys/pal/xous/locks/condvar.rs 159
1 files changed, 98 insertions, 61 deletions
diff --git a/library/std/src/sys/pal/xous/locks/condvar.rs b/library/std/src/sys/pal/xous/locks/condvar.rs
index 1bb38dfa341..510235046e1 100644
--- a/library/std/src/sys/pal/xous/locks/condvar.rs
+++ b/library/std/src/sys/pal/xous/locks/condvar.rs
@@ -1,14 +1,17 @@
 use super::mutex::Mutex;
 use crate::os::xous::ffi::{blocking_scalar, scalar};
-use crate::os::xous::services::ticktimer_server;
-use crate::sync::Mutex as StdMutex;
+use crate::os::xous::services::{ticktimer_server, TicktimerScalar};
 use crate::time::Duration;
+use core::sync::atomic::{AtomicUsize, Ordering};
 
 // The implementation is inspired by Andrew D. Birrell's paper
 // "Implementing Condition Variables with Semaphores"
 
+const NOTIFY_TRIES: usize = 3;
+
 pub struct Condvar {
-    counter: StdMutex<usize>,
+    counter: AtomicUsize,
+    timed_out: AtomicUsize,
 }
 
 unsafe impl Send for Condvar {}
@@ -18,94 +21,128 @@ impl Condvar {
     #[inline]
     #[rustc_const_stable(feature = "const_locks", since = "1.63.0")]
     pub const fn new() -> Condvar {
-        Condvar { counter: StdMutex::new(0) }
+        Condvar { counter: AtomicUsize::new(0), timed_out: AtomicUsize::new(0) }
     }
 
-    pub fn notify_one(&self) {
-        let mut counter = self.counter.lock().unwrap();
-        if *counter <= 0 {
+    fn notify_some(&self, to_notify: usize) {
+        // Assumption: The Mutex protecting this condvar is locked throughout the
+        // entirety of this call, preventing calls to `wait` and `wait_timeout`.
+
+        // Logic check: Ensure that there aren't any missing waiters. Remove any that
+        // timed-out, ensuring the counter doesn't underflow.
+        assert!(self.timed_out.load(Ordering::Relaxed) <= self.counter.load(Ordering::Relaxed));
+        self.counter.fetch_sub(self.timed_out.swap(0, Ordering::Relaxed), Ordering::Relaxed);
+
+        // Figure out how many threads to notify. Note that it is impossible for `counter`
+        // to increase during this operation because Mutex is locked. However, it is
+        // possible for `counter` to decrease due to a condvar timing out, in which
+        // case the corresponding `timed_out` will increase accordingly.
+        let Ok(waiter_count) =
+            self.counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |counter| {
+                if counter == 0 {
+                    return None;
+                } else {
+                    Some(counter - counter.min(to_notify))
+                }
+            })
+        else {
+            // No threads are waiting on this condvar
             return;
-        } else {
-            *counter -= 1;
-        }
-        let result = blocking_scalar(
-            ticktimer_server(),
-            crate::os::xous::services::TicktimerScalar::NotifyCondition(self.index(), 1).into(),
-        );
-        drop(counter);
-        result.expect("failure to send NotifyCondition command");
-    }
+        };
 
-    pub fn notify_all(&self) {
-        let mut counter = self.counter.lock().unwrap();
-        if *counter <= 0 {
+        let mut remaining_to_wake = waiter_count.min(to_notify);
+        if remaining_to_wake == 0 {
             return;
         }
-        let result = blocking_scalar(
-            ticktimer_server(),
-            crate::os::xous::services::TicktimerScalar::NotifyCondition(self.index(), *counter)
-                .into(),
-        );
-        *counter = 0;
-        drop(counter);
+        for _wake_tries in 0..NOTIFY_TRIES {
+            let result = blocking_scalar(
+                ticktimer_server(),
+                TicktimerScalar::NotifyCondition(self.index(), remaining_to_wake).into(),
+            )
+            .expect("failure to send NotifyCondition command");
+
+            // Remove the list of waiters that were notified
+            remaining_to_wake -= result[0];
+
+            // Also remove the number of waiters that timed out. Clamp it to 0 in order to
+            // ensure we don't wait forever in case the waiter woke up between the time
+            // we counted the remaining waiters and now.
+            remaining_to_wake =
+                remaining_to_wake.saturating_sub(self.timed_out.swap(0, Ordering::Relaxed));
+            if remaining_to_wake == 0 {
+                return;
+            }
+            crate::thread::yield_now();
+        }
+    }
 
-        result.expect("failure to send NotifyCondition command");
+    pub fn notify_one(&self) {
+        self.notify_some(1)
+    }
+
+    pub fn notify_all(&self) {
+        self.notify_some(self.counter.load(Ordering::Relaxed))
     }
 
     fn index(&self) -> usize {
-        self as *const Condvar as usize
+        core::ptr::from_ref(self).addr()
     }
 
-    pub unsafe fn wait(&self, mutex: &Mutex) {
-        let mut counter = self.counter.lock().unwrap();
-        *counter += 1;
+    /// Unlock the given Mutex and wait for the notification. Wait at most
+    /// `ms` milliseconds, or pass `0` to wait forever.
+    ///
+    /// Returns `true` if the condition was received, `false` if it timed out
+    fn wait_ms(&self, mutex: &Mutex, ms: usize) -> bool {
+        self.counter.fetch_add(1, Ordering::Relaxed);
         unsafe { mutex.unlock() };
-        drop(counter);
 
+        // Threading concern: There is a chance that the `notify` thread wakes up here before
+        // we have a chance to wait for the condition. This is fine because we've recorded
+        // the fact that we're waiting by incrementing the counter.
         let result = blocking_scalar(
             ticktimer_server(),
-            crate::os::xous::services::TicktimerScalar::WaitForCondition(self.index(), 0).into(),
+            TicktimerScalar::WaitForCondition(self.index(), ms).into(),
         );
+        let awoken = result.expect("Ticktimer: failure to send WaitForCondition command")[0] == 0;
+
+        // If we awoke due to a timeout, increment the `timed_out` counter so that the
+        // main loop of `notify` knows there's a timeout.
+        //
+        // This is done with the Mutex still unlocked, because the Mutex might still
+        // be locked by the `notify` process above.
+        if !awoken {
+            self.timed_out.fetch_add(1, Ordering::Relaxed);
+        }
+
         unsafe { mutex.lock() };
+        awoken
+    }
 
-        result.expect("Ticktimer: failure to send WaitForCondition command");
+    pub unsafe fn wait(&self, mutex: &Mutex) {
+        // Wait for 0 ms, which is a special case to "wait forever"
+        self.wait_ms(mutex, 0);
     }
 
     pub unsafe fn wait_timeout(&self, mutex: &Mutex, dur: Duration) -> bool {
-        let mut counter = self.counter.lock().unwrap();
-        *counter += 1;
-        unsafe { mutex.unlock() };
-        drop(counter);
-
         let mut millis = dur.as_millis() as usize;
+        // Ensure we don't wait for 0 ms, which would cause us to wait forever
         if millis == 0 {
             millis = 1;
         }
-
-        let result = blocking_scalar(
-            ticktimer_server(),
-            crate::os::xous::services::TicktimerScalar::WaitForCondition(self.index(), millis)
-                .into(),
-        );
-        unsafe { mutex.lock() };
-
-        let result = result.expect("Ticktimer: failure to send WaitForCondition command")[0] == 0;
-
-        // If we awoke due to a timeout, decrement the wake count, as that would not have
-        // been done in the `notify()` call.
-        if !result {
-            *self.counter.lock().unwrap() -= 1;
-        }
-        result
+        self.wait_ms(mutex, millis)
     }
 }
 
 impl Drop for Condvar {
     fn drop(&mut self) {
-        scalar(
-            ticktimer_server(),
-            crate::os::xous::services::TicktimerScalar::FreeCondition(self.index()).into(),
-        )
-        .ok();
+        let remaining_count = self.counter.load(Ordering::Relaxed);
+        let timed_out = self.timed_out.load(Ordering::Relaxed);
+        assert!(
+            remaining_count - timed_out == 0,
+            "counter was {} and timed_out was {} not 0",
+            remaining_count,
+            timed_out
+        );
+        scalar(ticktimer_server(), TicktimerScalar::FreeCondition(self.index()).into()).ok();
     }
 }