diff options
| author | Sean Cross <sean@xobs.io> | 2023-12-24 15:05:11 +0800 |
|---|---|---|
| committer | Sean Cross <sean@xobs.io> | 2024-01-13 09:13:56 -0800 |
| commit | 626926f073997af4c9bd971c827eece6cfa26bb2 (patch) | |
| tree | 2f16d9c86460f275190265613deb290745a42963 | |
| parent | 118e8f78402a584becebf0c96082537fbaa8b9e6 (diff) | |
| download | rust-626926f073997af4c9bd971c827eece6cfa26bb2.tar.gz rust-626926f073997af4c9bd971c827eece6cfa26bb2.zip | |
std: xous: rework condvar to fix soundness issues
Rework the Condvar implementation on Xous to ensure notifications are not missed. This involves keeping track of how many times a Condvar timed out and synchronizing based on that. Signed-off-by: Sean Cross <sean@xobs.io>
| -rw-r--r-- | library/std/src/sys/pal/xous/locks/condvar.rs | 159 |
1 files changed, 98 insertions, 61 deletions
diff --git a/library/std/src/sys/pal/xous/locks/condvar.rs b/library/std/src/sys/pal/xous/locks/condvar.rs index 1bb38dfa341..510235046e1 100644 --- a/library/std/src/sys/pal/xous/locks/condvar.rs +++ b/library/std/src/sys/pal/xous/locks/condvar.rs @@ -1,14 +1,17 @@ use super::mutex::Mutex; use crate::os::xous::ffi::{blocking_scalar, scalar}; -use crate::os::xous::services::ticktimer_server; -use crate::sync::Mutex as StdMutex; +use crate::os::xous::services::{ticktimer_server, TicktimerScalar}; use crate::time::Duration; +use core::sync::atomic::{AtomicUsize, Ordering}; // The implementation is inspired by Andrew D. Birrell's paper // "Implementing Condition Variables with Semaphores" +const NOTIFY_TRIES: usize = 3; + pub struct Condvar { - counter: StdMutex<usize>, + counter: AtomicUsize, + timed_out: AtomicUsize, } unsafe impl Send for Condvar {} @@ -18,94 +21,128 @@ impl Condvar { #[inline] #[rustc_const_stable(feature = "const_locks", since = "1.63.0")] pub const fn new() -> Condvar { - Condvar { counter: StdMutex::new(0) } + Condvar { counter: AtomicUsize::new(0), timed_out: AtomicUsize::new(0) } } - pub fn notify_one(&self) { - let mut counter = self.counter.lock().unwrap(); - if *counter <= 0 { + fn notify_some(&self, to_notify: usize) { + // Assumption: The Mutex protecting this condvar is locked throughout the + // entirety of this call, preventing calls to `wait` and `wait_timeout`. + + // Logic check: Ensure that there aren't any missing waiters. Remove any that + // timed-out, ensuring the counter doesn't underflow. + assert!(self.timed_out.load(Ordering::Relaxed) <= self.counter.load(Ordering::Relaxed)); + self.counter.fetch_sub(self.timed_out.swap(0, Ordering::Relaxed), Ordering::Relaxed); + + // Figure out how many threads to notify. Note that it is impossible for `counter` + // to increase during this operation because Mutex is locked. However, it is + // possible for `counter` to decrease due to a condvar timing out, in which + // case the corresponding `timed_out` will increase accordingly. + let Ok(waiter_count) = + self.counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |counter| { + if counter == 0 { + return None; + } else { + Some(counter - counter.min(to_notify)) + } + }) + else { + // No threads are waiting on this condvar return; - } else { - *counter -= 1; - } - let result = blocking_scalar( - ticktimer_server(), - crate::os::xous::services::TicktimerScalar::NotifyCondition(self.index(), 1).into(), - ); - drop(counter); - result.expect("failure to send NotifyCondition command"); - } + }; - pub fn notify_all(&self) { - let mut counter = self.counter.lock().unwrap(); - if *counter <= 0 { + let mut remaining_to_wake = waiter_count.min(to_notify); + if remaining_to_wake == 0 { return; } - let result = blocking_scalar( - ticktimer_server(), - crate::os::xous::services::TicktimerScalar::NotifyCondition(self.index(), *counter) - .into(), - ); - *counter = 0; - drop(counter); + for _wake_tries in 0..NOTIFY_TRIES { + let result = blocking_scalar( + ticktimer_server(), + TicktimerScalar::NotifyCondition(self.index(), remaining_to_wake).into(), + ) + .expect("failure to send NotifyCondition command"); + + // Remove the list of waiters that were notified + remaining_to_wake -= result[0]; + + // Also remove the number of waiters that timed out. Clamp it to 0 in order to + // ensure we don't wait forever in case the waiter woke up between the time + // we counted the remaining waiters and now. + remaining_to_wake = + remaining_to_wake.saturating_sub(self.timed_out.swap(0, Ordering::Relaxed)); + if remaining_to_wake == 0 { + return; + } + crate::thread::yield_now(); + } + } - result.expect("failure to send NotifyCondition command"); + pub fn notify_one(&self) { + self.notify_some(1) + } + + pub fn notify_all(&self) { + self.notify_some(self.counter.load(Ordering::Relaxed)) } fn index(&self) -> usize { - self as *const Condvar as usize + core::ptr::from_ref(self).addr() } - pub unsafe fn wait(&self, mutex: &Mutex) { - let mut counter = self.counter.lock().unwrap(); - *counter += 1; + /// Unlock the given Mutex and wait for the notification. Wait at most + /// `ms` milliseconds, or pass `0` to wait forever. + /// + /// Returns `true` if the condition was received, `false` if it timed out + fn wait_ms(&self, mutex: &Mutex, ms: usize) -> bool { + self.counter.fetch_add(1, Ordering::Relaxed); unsafe { mutex.unlock() }; - drop(counter); + // Threading concern: There is a chance that the `notify` thread wakes up here before + // we have a chance to wait for the condition. This is fine because we've recorded + // the fact that we're waiting by incrementing the counter. let result = blocking_scalar( ticktimer_server(), - crate::os::xous::services::TicktimerScalar::WaitForCondition(self.index(), 0).into(), + TicktimerScalar::WaitForCondition(self.index(), ms).into(), ); + let awoken = result.expect("Ticktimer: failure to send WaitForCondition command")[0] == 0; + + // If we awoke due to a timeout, increment the `timed_out` counter so that the + // main loop of `notify` knows there's a timeout. + // + // This is done with the Mutex still unlocked, because the Mutex might still + // be locked by the `notify` process above. + if !awoken { + self.timed_out.fetch_add(1, Ordering::Relaxed); + } + unsafe { mutex.lock() }; + awoken + } - result.expect("Ticktimer: failure to send WaitForCondition command"); + pub unsafe fn wait(&self, mutex: &Mutex) { + // Wait for 0 ms, which is a special case to "wait forever" + self.wait_ms(mutex, 0); } pub unsafe fn wait_timeout(&self, mutex: &Mutex, dur: Duration) -> bool { - let mut counter = self.counter.lock().unwrap(); - *counter += 1; - unsafe { mutex.unlock() }; - drop(counter); - let mut millis = dur.as_millis() as usize; + // Ensure we don't wait for 0 ms, which would cause us to wait forever if millis == 0 { millis = 1; } - - let result = blocking_scalar( - ticktimer_server(), - crate::os::xous::services::TicktimerScalar::WaitForCondition(self.index(), millis) - .into(), - ); - unsafe { mutex.lock() }; - - let result = result.expect("Ticktimer: failure to send WaitForCondition command")[0] == 0; - - // If we awoke due to a timeout, decrement the wake count, as that would not have - // been done in the `notify()` call. - if !result { - *self.counter.lock().unwrap() -= 1; - } - result + self.wait_ms(mutex, millis) } } impl Drop for Condvar { fn drop(&mut self) { - scalar( - ticktimer_server(), - crate::os::xous::services::TicktimerScalar::FreeCondition(self.index()).into(), - ) - .ok(); + let remaining_count = self.counter.load(Ordering::Relaxed); + let timed_out = self.timed_out.load(Ordering::Relaxed); + assert!( + remaining_count - timed_out == 0, + "counter was {} and timed_out was {} not 0", + remaining_count, + timed_out + ); + scalar(ticktimer_server(), TicktimerScalar::FreeCondition(self.index()).into()).ok(); } } |
