about summary refs log tree commit diff
path: root/library/std/src/sys/sync/condvar/xous.rs
blob: 21a1587214a119be6a39c0d5bb40d4bbf18a8b3c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
use core::sync::atomic::{Atomic, AtomicUsize, Ordering};

use crate::os::xous::ffi::{blocking_scalar, scalar};
use crate::os::xous::services::{TicktimerScalar, ticktimer_server};
use crate::sys::sync::Mutex;
use crate::time::Duration;

// The implementation is inspired by Andrew D. Birrell's paper
// "Implementing Condition Variables with Semaphores"

/// Maximum number of `NotifyCondition` requests that `notify_some` sends to the
/// ticktimer server before it stops trying to wake the remaining waiters.
const NOTIFY_TRIES: usize = 3;

/// Condition variable built on the ticktimer server's `WaitForCondition` and
/// `NotifyCondition` scalar messages.
pub struct Condvar {
    /// Number of threads registered as waiting. Incremented by `wait_ms` before
    /// the mutex is released and decreased by `notify_some`.
    counter: Atomic<usize>,
    /// Number of waits that ended with a timeout and have not yet been consumed
    /// by `notify_some`.
    timed_out: Atomic<usize>,
}

// NOTE(review): both fields of `Condvar` are atomics; these impls presumably
// just make the cross-thread guarantee explicit — confirm they are still needed.
unsafe impl Send for Condvar {}
unsafe impl Sync for Condvar {}

impl Condvar {
    /// Creates a condition variable with no registered waiters.
    #[inline]
    pub const fn new() -> Condvar {
        Condvar { counter: AtomicUsize::new(0), timed_out: AtomicUsize::new(0) }
    }

    /// Wakes at most `to_notify` of the threads currently registered as waiting.
    ///
    /// # Panics
    ///
    /// Panics if more timeouts than waiters have been recorded, or if the
    /// `NotifyCondition` message cannot be sent to the ticktimer server.
    fn notify_some(&self, to_notify: usize) {
        // Assumption: The Mutex protecting this condvar is locked throughout the
        // entirety of this call, preventing calls to `wait` and `wait_timeout`.

        // Logic check: Ensure that there aren't any missing waiters. Remove any that
        // timed-out, ensuring the counter doesn't underflow.
        assert!(self.timed_out.load(Ordering::Relaxed) <= self.counter.load(Ordering::Relaxed));
        self.counter.fetch_sub(self.timed_out.swap(0, Ordering::Relaxed), Ordering::Relaxed);

        // Figure out how many threads to notify. Note that it is impossible for `counter`
        // to increase during this operation because Mutex is locked. However, it is
        // possible for `counter` to decrease due to a condvar timing out, in which
        // case the corresponding `timed_out` will increase accordingly.
        //
        // `fetch_update` yields the value of `counter` from *before* the subtraction.
        let Ok(waiter_count) =
            self.counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |counter| {
                if counter == 0 { None } else { Some(counter - counter.min(to_notify)) }
            })
        else {
            // No threads are waiting on this condvar
            return;
        };

        let mut remaining_to_wake = waiter_count.min(to_notify);
        if remaining_to_wake == 0 {
            return;
        }

        // NOTE(review): `counter` has already been decremented above. If every try below
        // is used up while `remaining_to_wake` is still non-zero, those waiters are no
        // longer counted even though they were not woken — confirm this is acceptable.
        for _wake_tries in 0..NOTIFY_TRIES {
            let result = blocking_scalar(
                ticktimer_server(),
                TicktimerScalar::NotifyCondition(self.index(), remaining_to_wake).into(),
            )
            .expect("failure to send NotifyCondition command");

            // Remove the list of waiters that were notified
            remaining_to_wake -= result[0];

            // Also remove the number of waiters that timed out. Clamp it to 0 in order to
            // ensure we don't wait forever in case the waiter woke up between the time
            // we counted the remaining waiters and now.
            remaining_to_wake =
                remaining_to_wake.saturating_sub(self.timed_out.swap(0, Ordering::Relaxed));
            if remaining_to_wake == 0 {
                return;
            }

            // Give waiters that have not reached the ticktimer server yet a chance to run
            // before asking again.
            crate::thread::yield_now();
        }
    }

    /// Wakes at most one waiting thread.
    pub fn notify_one(&self) {
        self.notify_some(1)
    }

    /// Wakes every thread that is registered as waiting at the time of the call.
    pub fn notify_all(&self) {
        self.notify_some(self.counter.load(Ordering::Relaxed))
    }

    /// Identifier passed to the ticktimer server for this condvar: the address of `self`.
    fn index(&self) -> usize {
        core::ptr::from_ref(self).addr()
    }

    /// Unlock the given Mutex and wait for the notification. Wait at most
    /// `ms` milliseconds, or pass `0` to wait forever.
    ///
    /// The mutex is locked again before returning.
    ///
    /// Returns `true` if the condition was received, `false` if it timed out
    ///
    /// # Panics
    ///
    /// Panics if the `WaitForCondition` message cannot be sent to the ticktimer server.
    fn wait_ms(&self, mutex: &Mutex, ms: usize) -> bool {
        // Register as a waiter while the mutex is still held, so that a notifier that
        // grabs the mutex right after the unlock already sees this thread in `counter`.
        self.counter.fetch_add(1, Ordering::Relaxed);
        // SAFETY: relies on the contract of `wait`/`wait_timeout` that the caller
        // currently holds `mutex`.
        unsafe { mutex.unlock() };

        // Threading concern: There is a chance that the `notify` thread wakes up here before
        // we have a chance to wait for the condition. This is fine because we've recorded
        // the fact that we're waiting by incrementing the counter.
        let result = blocking_scalar(
            ticktimer_server(),
            TicktimerScalar::WaitForCondition(self.index(), ms).into(),
        );
        // A first result word of 0 is treated as "notified"; anything else as a timeout.
        let awoken = result.expect("Ticktimer: failure to send WaitForCondition command")[0] == 0;

        // If we awoke due to a timeout, increment the `timed_out` counter so that the
        // main loop of `notify` knows there's a timeout.
        //
        // This is done with the Mutex still unlocked, because the Mutex might still
        // be locked by the `notify` process above.
        if !awoken {
            self.timed_out.fetch_add(1, Ordering::Relaxed);
        }

        // SAFETY: NOTE(review): presumably `Mutex::lock` only requires a valid mutex,
        // which the `&Mutex` borrow provides — confirm against `Mutex`'s documentation.
        unsafe { mutex.lock() };
        awoken
    }

    /// Releases `mutex`, blocks until this condvar is notified, then re-acquires `mutex`.
    ///
    /// # Safety
    ///
    /// `mutex` is unlocked unconditionally, so the caller must currently hold it.
    pub unsafe fn wait(&self, mutex: &Mutex) {
        // Wait for 0 ms, which is a special case to "wait forever"
        self.wait_ms(mutex, 0);
    }

    /// Like `wait`, but gives up after roughly `dur`.
    ///
    /// The timeout has millisecond resolution: it is rounded down to whole
    /// milliseconds, raised to at least 1 ms, and capped at `usize::MAX` ms.
    ///
    /// Returns `true` if the condvar was notified, `false` if the wait timed out.
    ///
    /// # Safety
    ///
    /// `mutex` is unlocked unconditionally, so the caller must currently hold it.
    pub unsafe fn wait_timeout(&self, mutex: &Mutex, dur: Duration) -> bool {
        // `as_millis` yields a `u128`. Saturate rather than truncate: a plain `as usize`
        // cast would wrap a very long timeout into an arbitrarily short one.
        let millis = usize::try_from(dur.as_millis()).unwrap_or(usize::MAX);
        // Ensure we don't wait for 0 ms, which would cause us to wait forever
        self.wait_ms(mutex, millis.max(1))
    }
}

impl Drop for Condvar {
    /// Checks that no thread is still waiting, then asks the ticktimer server to
    /// free the condition associated with this condvar.
    ///
    /// # Panics
    ///
    /// Panics if the number of registered waiters differs from the number of
    /// recorded timeouts, i.e. if some waiter is still unaccounted for.
    fn drop(&mut self) {
        let remaining_count = self.counter.load(Ordering::Relaxed);
        let timed_out = self.timed_out.load(Ordering::Relaxed);
        // Compare directly instead of subtracting: `remaining_count - timed_out` would
        // underflow when more timeouts than waiters are recorded, hiding the message below
        // behind an arithmetic-overflow panic when overflow checks are enabled.
        assert!(
            remaining_count == timed_out,
            "counter was {} and timed_out was {} not 0",
            remaining_count,
            timed_out
        );
        // Best effort: a failure to deliver `FreeCondition` is deliberately ignored.
        scalar(ticktimer_server(), TicktimerScalar::FreeCondition(self.index()).into()).ok();
    }
}