about summary refs log tree commit diff
path: root/library
diff options
context:
space:
mode:
Diffstat (limited to 'library')
-rw-r--r--library/std/src/sys/unix/futex.rs20
-rw-r--r--library/std/src/sys/unix/locks/futex_rwlock.rs313
-rw-r--r--library/std/src/sys/unix/locks/mod.rs4
3 files changed, 329 insertions, 8 deletions
diff --git a/library/std/src/sys/unix/futex.rs b/library/std/src/sys/unix/futex.rs
index c61d948fb60..b45d1c0149c 100644
--- a/library/std/src/sys/unix/futex.rs
+++ b/library/std/src/sys/unix/futex.rs
@@ -7,6 +7,11 @@
 use crate::sync::atomic::AtomicI32;
 use crate::time::Duration;
 
+/// Wait for a futex_wake operation to wake us.
+///
+/// Returns directly if the futex doesn't hold the expected value.
+///
+/// Returns false on timeout, and true in all other cases.
 #[cfg(any(target_os = "linux", target_os = "android"))]
 pub fn futex_wait(futex: &AtomicI32, expected: i32, timeout: Option<Duration>) -> bool {
     use super::time::Timespec;
@@ -68,18 +73,23 @@ pub fn futex_wait(futex: &AtomicI32, expected: i32, timeout: Option<Duration>) {
     }
 }
 
+/// Wake up one thread that's blocked on futex_wait on this futex.
+///
+/// Returns true if this actually woke up such a thread,
+/// or false if no thread was waiting on this futex.
 #[cfg(any(target_os = "linux", target_os = "android"))]
-pub fn futex_wake(futex: &AtomicI32) {
+pub fn futex_wake(futex: &AtomicI32) -> bool {
     unsafe {
         libc::syscall(
             libc::SYS_futex,
             futex as *const AtomicI32,
             libc::FUTEX_WAKE | libc::FUTEX_PRIVATE_FLAG,
             1,
-        );
+        ) > 0
     }
 }
 
+/// Wake up all threads that are waiting on futex_wait on this futex.
 #[cfg(any(target_os = "linux", target_os = "android"))]
 pub fn futex_wake_all(futex: &AtomicI32) {
     unsafe {
@@ -93,12 +103,10 @@ pub fn futex_wake_all(futex: &AtomicI32) {
 }
 
 #[cfg(target_os = "emscripten")]
-pub fn futex_wake(futex: &AtomicI32) {
+pub fn futex_wake(futex: &AtomicI32) -> bool {
     extern "C" {
         fn emscripten_futex_wake(addr: *const AtomicI32, count: libc::c_int) -> libc::c_int;
     }
 
-    unsafe {
-        emscripten_futex_wake(futex as *const AtomicI32, 1);
-    }
+    unsafe { emscripten_futex_wake(futex as *const AtomicI32, 1) > 0 }
 }
diff --git a/library/std/src/sys/unix/locks/futex_rwlock.rs b/library/std/src/sys/unix/locks/futex_rwlock.rs
new file mode 100644
index 00000000000..aa16da97e4c
--- /dev/null
+++ b/library/std/src/sys/unix/locks/futex_rwlock.rs
@@ -0,0 +1,313 @@
+use crate::sync::atomic::{
+    AtomicI32,
+    Ordering::{Acquire, Relaxed, Release},
+};
+use crate::sys::futex::{futex_wait, futex_wake, futex_wake_all};
+
+pub type MovableRwLock = RwLock;
+
+pub struct RwLock {
+    // The state consists of a 30-bit reader counter, a 'readers waiting' flag, and a 'writers waiting' flag.
+    // Bits 0..30:
+    //   0: Unlocked
+    //   1..=0x3FFF_FFFE: Locked by N readers
+    //   0x3FFF_FFFF: Write locked
+    // Bit 30: Readers are waiting on this futex.
+    // Bit 31: Writers are waiting on the writer_notify futex.
+    state: AtomicI32,
+    // The 'condition variable' to notify writers through.
+    // Incremented on every signal.
+    writer_notify: AtomicI32,
+}
+
+const READ_LOCKED: i32 = 1;
+const MASK: i32 = (1 << 30) - 1;
+const WRITE_LOCKED: i32 = MASK;
+const MAX_READERS: i32 = MASK - 1;
+const READERS_WAITING: i32 = 1 << 30;
+const WRITERS_WAITING: i32 = 1 << 31;
+
+fn is_unlocked(state: i32) -> bool {
+    state & MASK == 0
+}
+
+fn is_write_locked(state: i32) -> bool {
+    state & MASK == WRITE_LOCKED
+}
+
+fn has_readers_waiting(state: i32) -> bool {
+    state & READERS_WAITING != 0
+}
+
+fn has_writers_waiting(state: i32) -> bool {
+    state & WRITERS_WAITING != 0
+}
+
+fn is_read_lockable(state: i32) -> bool {
+    // This also returns false if the counter could overflow if we tried to read lock it.
+    //
+    // We don't allow read-locking if there's readers waiting, even if the lock is unlocked
+    // and there's no writers waiting. The only situation when this happens is after unlocking,
+    // at which point the unlocking thread might be waking up writers, which have priority over readers.
+    // The unlocking thread will clear the readers waiting bit and wake up readers, if necssary.
+    state & MASK < MAX_READERS && !has_readers_waiting(state) && !has_writers_waiting(state)
+}
+
+fn has_reached_max_readers(state: i32) -> bool {
+    state & MASK == MAX_READERS
+}
+
+impl RwLock {
+    #[inline]
+    pub const fn new() -> Self {
+        Self { state: AtomicI32::new(0), writer_notify: AtomicI32::new(0) }
+    }
+
+    #[inline]
+    pub unsafe fn destroy(&self) {}
+
+    #[inline]
+    pub unsafe fn try_read(&self) -> bool {
+        self.state
+            .fetch_update(Acquire, Relaxed, |s| is_read_lockable(s).then(|| s + READ_LOCKED))
+            .is_ok()
+    }
+
+    #[inline]
+    pub unsafe fn read(&self) {
+        let state = self.state.load(Relaxed);
+        if !is_read_lockable(state)
+            || self
+                .state
+                .compare_exchange_weak(state, state + READ_LOCKED, Acquire, Relaxed)
+                .is_err()
+        {
+            self.read_contended();
+        }
+    }
+
+    #[inline]
+    pub unsafe fn read_unlock(&self) {
+        let state = self.state.fetch_sub(READ_LOCKED, Release) - READ_LOCKED;
+
+        // It's impossible for a reader to be waiting on a read-locked RwLock,
+        // except if there is also a writer waiting.
+        debug_assert!(!has_readers_waiting(state) || has_writers_waiting(state));
+
+        // Wake up a writer if we were the last reader and there's a writer waiting.
+        if is_unlocked(state) && has_writers_waiting(state) {
+            self.wake_writer_or_readers(state);
+        }
+    }
+
+    #[cold]
+    fn read_contended(&self) {
+        let mut state = self.spin_read();
+
+        loop {
+            // If we can lock it, lock it.
+            if is_read_lockable(state) {
+                match self.state.compare_exchange_weak(state, state + READ_LOCKED, Acquire, Relaxed)
+                {
+                    Ok(_) => return, // Locked!
+                    Err(s) => {
+                        state = s;
+                        continue;
+                    }
+                }
+            }
+
+            // Check for overflow.
+            if has_reached_max_readers(state) {
+                panic!("too many active read locks on RwLock");
+            }
+
+            // Make sure the readers waiting bit is set before we go to sleep.
+            if !has_readers_waiting(state) {
+                if let Err(s) =
+                    self.state.compare_exchange(state, state | READERS_WAITING, Relaxed, Relaxed)
+                {
+                    state = s;
+                    continue;
+                }
+            }
+
+            // Wait for the state to change.
+            futex_wait(&self.state, state | READERS_WAITING, None);
+
+            // Spin again after waking up.
+            state = self.spin_read();
+        }
+    }
+
+    #[inline]
+    pub unsafe fn try_write(&self) -> bool {
+        self.state
+            .fetch_update(Acquire, Relaxed, |s| is_unlocked(s).then(|| s + WRITE_LOCKED))
+            .is_ok()
+    }
+
+    #[inline]
+    pub unsafe fn write(&self) {
+        if self.state.compare_exchange_weak(0, WRITE_LOCKED, Acquire, Relaxed).is_err() {
+            self.write_contended();
+        }
+    }
+
+    #[inline]
+    pub unsafe fn write_unlock(&self) {
+        let state = self.state.fetch_sub(WRITE_LOCKED, Release) - WRITE_LOCKED;
+
+        debug_assert!(is_unlocked(state));
+
+        if has_writers_waiting(state) || has_readers_waiting(state) {
+            self.wake_writer_or_readers(state);
+        }
+    }
+
+    #[cold]
+    fn write_contended(&self) {
+        let mut state = self.spin_write();
+
+        let mut other_writers_waiting = 0;
+
+        loop {
+            // If it's unlocked, we try to lock it.
+            if is_unlocked(state) {
+                match self.state.compare_exchange_weak(
+                    state,
+                    state | WRITE_LOCKED | other_writers_waiting,
+                    Acquire,
+                    Relaxed,
+                ) {
+                    Ok(_) => return, // Locked!
+                    Err(s) => {
+                        state = s;
+                        continue;
+                    }
+                }
+            }
+
+            // Set the waiting bit indicating that we're waiting on it.
+            if !has_writers_waiting(state) {
+                if let Err(s) =
+                    self.state.compare_exchange(state, state | WRITERS_WAITING, Relaxed, Relaxed)
+                {
+                    state = s;
+                    continue;
+                }
+            }
+
+            // Other writers might be waiting now too, so we should make sure
+            // we keep that bit on once we manage lock it.
+            other_writers_waiting = WRITERS_WAITING;
+
+            // Examine the notification counter before we check if `state` has changed,
+            // to make sure we don't miss any notifications.
+            let seq = self.writer_notify.load(Acquire);
+
+            // Don't go to sleep if the lock has become available,
+            // or if the writers waiting bit is no longer set.
+            let s = self.state.load(Relaxed);
+            if is_unlocked(state) || !has_writers_waiting(s) {
+                state = s;
+                continue;
+            }
+
+            // Wait for the state to change.
+            futex_wait(&self.writer_notify, seq, None);
+
+            // Spin again after waking up.
+            state = self.spin_write();
+        }
+    }
+
+    /// Wake up waiting threads after unlocking.
+    ///
+    /// If both are waiting, this will wake up only one writer, but will fall
+    /// back to waking up readers if there was no writer to wake up.
+    #[cold]
+    fn wake_writer_or_readers(&self, mut state: i32) {
+        assert!(is_unlocked(state));
+
+        // The readers waiting bit might be turned on at any point now,
+        // since readers will block when there's anything waiting.
+        // Writers will just lock the lock though, regardless of the waiting bits,
+        // so we don't have to worry about the writer waiting bit.
+        //
+        // If the lock gets locked in the meantime, we don't have to do
+        // anything, because then the thread that locked the lock will take
+        // care of waking up waiters when it unlocks.
+
+        // If only writers are waiting, wake one of them up.
+        if state == WRITERS_WAITING {
+            match self.state.compare_exchange(state, 0, Relaxed, Relaxed) {
+                Ok(_) => {
+                    self.wake_writer();
+                    return;
+                }
+                Err(s) => {
+                    // Maybe some readers are now waiting too. So, continue to the next `if`.
+                    state = s;
+                }
+            }
+        }
+
+        // If both writers and readers are waiting, leave the readers waiting
+        // and only wake up one writer.
+        if state == READERS_WAITING + WRITERS_WAITING {
+            if self.state.compare_exchange(state, READERS_WAITING, Relaxed, Relaxed).is_err() {
+                // The lock got locked. Not our problem anymore.
+                return;
+            }
+            if self.wake_writer() {
+                return;
+            }
+            // No writers were actually blocked on futex_wait, so we continue
+            // to wake up readers instead, since we can't be sure if we notified a writer.
+            state = READERS_WAITING;
+        }
+
+        // If readers are waiting, wake them all up.
+        if state == READERS_WAITING {
+            if self.state.compare_exchange(state, 0, Relaxed, Relaxed).is_ok() {
+                futex_wake_all(&self.state);
+            }
+        }
+    }
+
+    /// This wakes one writer and returns true if we woke up a writer that was
+    /// blocked on futex_wait.
+    ///
+    /// If this returns false, it might still be the case that we notified a
+    /// writer that was about to go to sleep.
+    fn wake_writer(&self) -> bool {
+        self.writer_notify.fetch_add(1, Release);
+        futex_wake(&self.writer_notify)
+    }
+
+    /// Spin for a while, but stop directly at the given condition.
+    fn spin_until(&self, f: impl Fn(i32) -> bool) -> i32 {
+        let mut spin = 100; // Chosen by fair dice roll.
+        loop {
+            let state = self.state.load(Relaxed);
+            if f(state) || spin == 0 {
+                return state;
+            }
+            crate::hint::spin_loop();
+            spin -= 1;
+        }
+    }
+
+    fn spin_write(&self) -> i32 {
+        // Stop spinning when it's unlocked or when there's waiting writers, to keep things somewhat fair.
+        self.spin_until(|state| is_unlocked(state) || has_writers_waiting(state))
+    }
+
+    fn spin_read(&self) -> i32 {
+        // Stop spinning when it's unlocked or read locked, or when there's waiting threads.
+        self.spin_until(|state| {
+            !is_write_locked(state) || has_readers_waiting(state) || has_writers_waiting(state)
+        })
+    }
+}
diff --git a/library/std/src/sys/unix/locks/mod.rs b/library/std/src/sys/unix/locks/mod.rs
index 2b8dd168068..85afc939d2e 100644
--- a/library/std/src/sys/unix/locks/mod.rs
+++ b/library/std/src/sys/unix/locks/mod.rs
@@ -4,13 +4,13 @@ cfg_if::cfg_if! {
         target_os = "android",
     ))] {
         mod futex;
+        mod futex_rwlock;
         #[allow(dead_code)]
         mod pthread_mutex; // Only used for PthreadMutexAttr, needed by pthread_remutex.
         mod pthread_remutex; // FIXME: Implement this using a futex
-        mod pthread_rwlock; // FIXME: Implement this using a futex
         pub use futex::{Mutex, MovableMutex, Condvar, MovableCondvar};
         pub use pthread_remutex::ReentrantMutex;
-        pub use pthread_rwlock::{RwLock, MovableRwLock};
+        pub use futex_rwlock::{RwLock, MovableRwLock};
     } else {
         mod pthread_mutex;
         mod pthread_remutex;