about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--library/std/src/sys/pal/unix/locks/queue_rwlock.rs339
1 files changed, 195 insertions, 144 deletions
diff --git a/library/std/src/sys/pal/unix/locks/queue_rwlock.rs b/library/std/src/sys/pal/unix/locks/queue_rwlock.rs
index 315c59a02d5..7619adbf839 100644
--- a/library/std/src/sys/pal/unix/locks/queue_rwlock.rs
+++ b/library/std/src/sys/pal/unix/locks/queue_rwlock.rs
@@ -39,16 +39,16 @@
 //!
 //! ## State
 //!
-//! A single [`AtomicPtr`] is used as state variable. The lowest two bits are used
+//! A single [`AtomicPtr`] is used as state variable. The lowest three bits are used
 //! to indicate the meaning of the remaining bits:
 //!
-//! | `LOCKED`  | `QUEUED`  | Remaining    |                                                                                                                             |
-//! |:----------|:----------|:-------------|:----------------------------------------------------------------------------------------------------------------------------|
-//! | 0         | 0         | 0            | The lock is unlocked, no threads are waiting                                                                                |
-//! | 1         | 0         | 0            | The lock is write-locked, no threads waiting                                                                                |
-//! | 1         | 0         | n > 0        | The lock is read-locked with n readers                                                                                      |
-//! | 0         | 1         | `*mut Node`  | The lock is unlocked, but some threads are waiting. Only writers may lock the lock                                          |
-//! | 1         | 1         | `*mut Node`  | The lock is locked, but some threads are waiting. If the lock is read-locked, the last queue node contains the reader count |
+//! | [`LOCKED`] | [`QUEUED`] | [`QUEUE_LOCKED`] | Remaining    |                                                                                                                             |
+//! |:-----------|:-----------|:-----------------|:-------------|:----------------------------------------------------------------------------------------------------------------------------|
+//! | 0          | 0          | 0                | 0            | The lock is unlocked, no threads are waiting                                                                                |
+//! | 1          | 0          | 0                | 0            | The lock is write-locked, no threads waiting                                                                                |
+//! | 1          | 0          | 0                | n > 0        | The lock is read-locked with n readers                                                                                      |
+//! | 0          | 1          | *                | `*mut Node`  | The lock is unlocked, but some threads are waiting. Only writers may lock the lock                                          |
+//! | 1          | 1          | *                | `*mut Node`  | The lock is locked, but some threads are waiting. If the lock is read-locked, the last queue node contains the reader count |
 //!
 //! ## Waiter queue
 //!
@@ -84,28 +84,31 @@
 //! ```
 //!
 //! Invariants:
-//! 1. The `next` field always points to a valid node, except in the tail node.
-//! 2. The `next` field of the tail node must be null while the queue is unlocked.
-//! 3. At least one node must contain a non-null, current `tail` field.
-//! 4. The first non-null `tail` field must be valid and current.
-//! 5. All nodes following this node must have a correct, non-null `prev` field.
+//! 1. At least one node must contain a non-null, current `tail` field.
+//! 2. The first non-null `tail` field must be valid and current.
+//! 3. All nodes preceding this node must have a correct, non-null `next` field.
+//! 4. All nodes following this node must have a correct, non-null `prev` field.
 //!
-//! While adding a new node to the queue may be done by any thread at any time,
-//! removing nodes may only be done by a single thread. Instead of using a
-//! separate lock bit for the queue like usync does, this implementation
-//! only allows the (last) lock owner to modify the queue.
+//! Access to the queue is controlled by the `QUEUE_LOCKED` bit, which threads
+//! try to set both after enqueuing themselves to eagerly add backlinks to the
+//! queue and after unlocking the lock to wake the next waiter(s). This is done
+//! atomically at the same time as the enqueuing/unlocking operation. The thread
+//! releasing the `QUEUE_LOCK` bit will check the state of the lock and wake up
+//! waiters as appropriate. This guarantees forward-progress even if the unlocking
+//! thread could not acquire the queue lock.
 //!
 //! ## Memory orderings
 //!
 //! To properly synchronize changes to the data protected by the lock, the lock
 //! is acquired and released with [`Acquire`] and [`Release`] ordering, respectively.
-//! To propagate the initialization of nodes, changes to the list are also propagated
-//! using these orderings.
+//! To propagate the initialization of nodes, changes to the queue lock are also
+//! performed using these orderings.
 
 #![forbid(unsafe_op_in_unsafe_fn)]
 
 use crate::cell::OnceCell;
 use crate::hint::spin_loop;
+use crate::mem;
 use crate::ptr::{self, invalid_mut, null_mut, NonNull};
 use crate::sync::atomic::{
     AtomicBool, AtomicPtr,
@@ -114,7 +117,10 @@ use crate::sync::atomic::{
 use crate::sys_common::thread_info;
 use crate::thread::Thread;
 
-const SPIN_COUNT: usize = 6;
+// Locking uses exponential backoff. `SPIN_COUNT` indicates how many times the
+// locking operation will be retried.
+// `spin_loop` will be called `2.pow(SPIN_COUNT) - 1` times.
+const SPIN_COUNT: usize = 7;
 
 type State = *mut ();
 type AtomicState = AtomicPtr<()>;
@@ -122,23 +128,24 @@ type AtomicState = AtomicPtr<()>;
 const UNLOCKED: State = invalid_mut(0);
 const LOCKED: usize = 1;
 const QUEUED: usize = 2;
-const SINGLE: usize = 4;
-const MASK: usize = !(LOCKED | QUEUED);
+const QUEUE_LOCKED: usize = 4;
+const SINGLE: usize = 8;
+const MASK: usize = !(QUEUE_LOCKED | QUEUED | LOCKED);
 
 /// Returns a closure that changes the state to the lock state corresponding to
-/// the lock mode indicated in `read`.
+/// the lock mode indicated in `write`.
 #[inline]
-fn lock(read: bool) -> impl Fn(State) -> Option<State> {
+fn lock(write: bool) -> impl Fn(State) -> Option<State> {
     move |state| {
-        if read {
+        if write {
+            let state = state.wrapping_byte_add(LOCKED);
+            if state.addr() & LOCKED == LOCKED { Some(state) } else { None }
+        } else {
             if state.addr() & QUEUED == 0 && state.addr() != LOCKED {
                 Some(invalid_mut(state.addr().checked_add(SINGLE)? | LOCKED))
             } else {
                 None
             }
-        } else {
-            let state = state.wrapping_byte_add(LOCKED);
-            if state.addr() & LOCKED == LOCKED { Some(state) } else { None }
         }
     }
 }
@@ -169,24 +176,24 @@ impl AtomicLink {
     }
 }
 
-#[repr(align(4))]
+#[repr(align(8))]
 struct Node {
     next: AtomicLink,
     prev: AtomicLink,
     tail: AtomicLink,
-    read: bool,
+    write: bool,
     thread: OnceCell<Thread>,
     completed: AtomicBool,
 }
 
 impl Node {
     /// Create a new queue node.
-    fn new(read: bool) -> Node {
+    fn new(write: bool) -> Node {
         Node {
             next: AtomicLink::new(None),
             prev: AtomicLink::new(None),
             tail: AtomicLink::new(None),
-            read,
+            write,
             thread: OnceCell::new(),
             completed: AtomicBool::new(false),
         }
@@ -201,9 +208,9 @@ impl Node {
     }
 
     /// Assuming the node contains a reader lock count, decrement that count.
-    /// Returns `true` if there are other lock owners.
+    /// Returns `true` if this thread was the last lock owner.
     fn decrement_count(&self) -> bool {
-        self.next.0.fetch_byte_sub(SINGLE, AcqRel).addr() > SINGLE
+        self.next.0.fetch_byte_sub(SINGLE, AcqRel).addr() - SINGLE == 0
     }
 
     /// Prepare this node for waiting.
@@ -239,6 +246,14 @@ impl Node {
     }
 }
 
+struct PanicGuard;
+
+impl Drop for PanicGuard {
+    fn drop(&mut self) {
+        rtabort!("tried to drop node in intrusive list.");
+    }
+}
+
 /// Find the tail of the queue beginning with `head`, caching the result in `head`.
 ///
 /// May be called from multiple threads at the same time, while the queue is not
@@ -257,9 +272,8 @@ unsafe fn find_tail(head: NonNull<Node>) -> NonNull<Node> {
         match c.tail.get() {
             Some(tail) => break tail,
             // SAFETY:
-            // Only the `next` field of the tail is null (invariants 1. and 2.)
-            // Since at least one element in the queue has a non-null tail (invariant 3.),
-            // this code will never be run for `current == tail`.
+            // All `next` fields before the first node with a `set` tail are
+            // non-null and valid (invariant 3).
             None => unsafe {
                 let next = c.next.get().unwrap_unchecked();
                 next.as_ref().prev.set(Some(current));
@@ -286,13 +300,13 @@ impl RwLock {
 
     #[inline]
     pub fn try_read(&self) -> bool {
-        self.state.fetch_update(Acquire, Relaxed, lock(true)).is_ok()
+        self.state.fetch_update(Acquire, Relaxed, lock(false)).is_ok()
     }
 
     #[inline]
     pub fn read(&self) {
         if !self.try_read() {
-            self.lock_contended(true)
+            self.lock_contended(false)
         }
     }
 
@@ -300,22 +314,22 @@ impl RwLock {
     pub fn try_write(&self) -> bool {
         // This is lowered to a single atomic instruction on most modern processors
         // (e.g. "lock bts" on x86 and "ldseta" on modern AArch64), and therefore
-        // is more efficient than `fetch_update(lock(false))`, which can spuriously
+        // is more efficient than `fetch_update(lock(true))`, which can spuriously
         // fail if a new node is appended to the queue.
-        self.state.fetch_or(LOCKED, Acquire).addr() & LOCKED != LOCKED
+        self.state.fetch_or(LOCKED, Acquire).addr() & LOCKED == 0
     }
 
     #[inline]
     pub fn write(&self) {
         if !self.try_write() {
-            self.lock_contended(false)
+            self.lock_contended(true)
         }
     }
 
     #[cold]
-    fn lock_contended(&self, read: bool) {
-        let update = lock(read);
-        let mut node = Node::new(read);
+    fn lock_contended(&self, write: bool) {
+        let update = lock(write);
+        let mut node = Node::new(write);
         let mut state = self.state.load(Relaxed);
         let mut count = 0;
         loop {
@@ -326,8 +340,9 @@ impl RwLock {
                     Err(new) => state = new,
                 }
             } else if state.addr() & QUEUED == 0 && count < SPIN_COUNT {
-                // If the lock is not available but no threads are queued, spin
-                // for a while.
+                // If the lock is not available and no threads are queued, spin
+                // for a while, using exponential backoff to decrease cache
+                // contention.
                 for _ in 0..(1 << count) {
                     spin_loop();
                 }
@@ -338,18 +353,26 @@ impl RwLock {
                 node.prepare();
                 node.set_state(state);
                 node.prev = AtomicLink::new(None);
-                // If this is the first node in the queue, set the tail field to
-                // the node itself to ensure there is a current `tail` field in
-                // the queue (invariants 3. and 4.). This needs to use `set` to
-                // avoid invalidating the new pointer.
-                node.tail.set((state.addr() & QUEUED == 0).then_some(NonNull::from(&node)));
-
-                let next = ptr::from_ref(&node)
+                let mut next = ptr::from_ref(&node)
                     .map_addr(|addr| addr | QUEUED | (state.addr() & LOCKED))
                     as State;
+
+                if state.addr() & QUEUED == 0 {
+                    // If this is the first node in the queue, set the tail field to
+                    // the node itself to ensure there is a current `tail` field in
+                    // the queue (invariants 1 and 2). This needs to use `set` to
+                    // avoid invalidating the new pointer.
+                    node.tail.set(Some(NonNull::from(&node)));
+                } else {
+                    // Otherwise, the tail of the queue is not known.
+                    node.tail.set(None);
+                    // Try locking the queue to fully link it.
+                    next = next.map_addr(|addr| addr | QUEUE_LOCKED);
+                }
+
                 // Use release ordering to propagate our changes to the waking
                 // thread.
-                if let Err(new) = self.state.compare_exchange_weak(state, next, Release, Relaxed) {
+                if let Err(new) = self.state.compare_exchange_weak(state, next, AcqRel, Relaxed) {
                     // The state has changed, just try again.
                     state = new;
                     continue;
@@ -357,13 +380,27 @@ impl RwLock {
 
                 // The node is registered, so the structure must not be
                 // mutably accessed or destroyed while other threads may
-                // be accessing it. Just wait until it is completed.
+                // be accessing it. Guard against unwinds using a panic
+                // guard that aborts when dropped.
+                let guard = PanicGuard;
+
+                // If the current thread locked the queue, unlock it again,
+                // linking it in the process.
+                if state.addr() & (QUEUE_LOCKED | QUEUED) == QUEUED {
+                    unsafe {
+                        self.unlock_queue(next);
+                    }
+                }
 
+                // Wait until the node is removed from the queue.
                 // SAFETY: the node was created by the current thread.
                 unsafe {
                     node.wait();
                 }
 
+                // The node was removed from the queue, disarm the guard.
+                mem::forget(guard);
+
                 // Reload the state and try again.
                 state = self.state.load(Relaxed);
                 count = 0;
@@ -382,114 +419,128 @@ impl RwLock {
             }
         }) {
             Ok(_) => {}
-            Err(state) => unsafe { self.unlock_contended(state, true) },
+            // There are waiters queued and the lock count was moved to the
+            // tail of the queue.
+            Err(state) => unsafe { self.read_unlock_contended(state) },
+        }
+    }
+
+    #[cold]
+    unsafe fn read_unlock_contended(&self, state: State) {
+        // The state was observed with acquire ordering above, so the current
+        // thread will observe all node initializations.
+
+        let tail = unsafe { find_tail(to_node(state)) };
+        let was_last = unsafe { tail.as_ref().decrement_count() };
+        if was_last {
+            // SAFETY:
+            // Other threads cannot read-lock while threads are queued. Also,
+            // the `LOCKED` bit is still set, so there are no writers. Therefore,
+            // the current thread exclusively owns the lock.
+            unsafe { self.unlock_contended(state) }
         }
     }
 
     #[inline]
     pub unsafe fn write_unlock(&self) {
-        match self.state.compare_exchange(invalid_mut(LOCKED), UNLOCKED, Release, Acquire) {
-            Ok(_) => {}
+        if let Err(state) =
+            self.state.compare_exchange(invalid_mut(LOCKED), UNLOCKED, Release, Relaxed)
+        {
+            // SAFETY:
             // Since other threads cannot acquire the lock, the state can only
             // have changed because there are threads queued on the lock.
-            Err(state) => unsafe { self.unlock_contended(state, false) },
+            unsafe { self.unlock_contended(state) }
         }
     }
 
     /// # Safety
-    /// The lock must be locked by the current thread and threads must be queued on it.
+    /// * The lock must be exclusively owned by this thread.
+    /// * There must be threads queued on the lock.
     #[cold]
-    unsafe fn unlock_contended(&self, mut state: State, read: bool) {
-        // Find the last node in the linked queue.
-        let tail = unsafe { find_tail(to_node(state)) };
-        let not_last = unsafe { read && tail.as_ref().decrement_count() };
-        if not_last {
-            // There are other lock owners, leave waking up the next waiters to them.
-            return;
+    unsafe fn unlock_contended(&self, mut state: State) {
+        loop {
+            // Atomically release the lock and try to acquire the queue lock.
+            let next = state.map_addr(|a| (a & !LOCKED) | QUEUE_LOCKED);
+            match self.state.compare_exchange_weak(state, next, AcqRel, Relaxed) {
+                // The queue lock was acquired. Release it, waking up the next
+                // waiter in the process.
+                Ok(_) if state.addr() & QUEUE_LOCKED == 0 => unsafe {
+                    return self.unlock_queue(next);
+                },
+                // Another thread already holds the queue lock, leave waking up
+                // waiters to it.
+                Ok(_) => return,
+                Err(new) => state = new,
+            }
         }
+    }
 
-        // At this point, the `next` field on `tail` will always be null
-        // (invariant 2).
+    /// # Safety
+    /// The queue lock must be held by the current thread.
+    unsafe fn unlock_queue(&self, mut state: State) {
+        debug_assert_eq!(state.addr() & (QUEUED | QUEUE_LOCKED), QUEUED | QUEUE_LOCKED);
 
-        let next_read = unsafe { tail.as_ref().read };
-        if next_read {
-            // The next waiter is a reader. Just wake all threads.
-            //
-            // SAFETY:
-            // `current` is the head of a valid queue, which no thread except the
-            // the current can observe.
-            unsafe {
-                let mut current = to_node(self.state.swap(UNLOCKED, AcqRel));
-                loop {
-                    let next = current.as_ref().next.get();
-                    Node::complete(current);
-                    match next {
-                        Some(next) => current = next,
-                        None => break,
-                    }
-                }
-            }
-        } else {
-            // The next waiter is a writer. Remove it from the queue and wake it.
-            let prev = match unsafe { tail.as_ref().prev.get() } {
-                // If the lock was read-locked, multiple threads have invoked
-                // `find_tail` above. Therefore, it is possible that one of
-                // them observed a newer state than this thread did, meaning
-                // there is a set `tail` field in a node before `state`. To
-                // make sure that the queue is valid after the link update
-                // below, reload the state and relink the queue.
-                //
-                // SAFETY: since the current thread holds the lock, the queue
-                // was not removed from since the last time and therefore is
-                // still valid.
-                Some(prev) if read => unsafe {
-                    let new = self.state.load(Acquire);
-                    if new != state {
+        loop {
+            // Find the last node in the linked list.
+            let tail = unsafe { find_tail(to_node(state)) };
+
+            if state.addr() & LOCKED == LOCKED {
+                // Another thread has locked the lock. Leave waking up waiters
+                // to them by releasing the queue lock.
+                match self.state.compare_exchange_weak(
+                    state,
+                    state.mask(!QUEUE_LOCKED),
+                    Release,
+                    Acquire,
+                ) {
+                    Ok(_) => return,
+                    Err(new) => {
                         state = new;
-                        find_tail(to_node(state));
+                        continue;
                     }
-                    Some(prev)
-                },
-                Some(prev) => Some(prev),
-                // The current node is the only one in the queue that we observed.
-                // Try setting the state to UNLOCKED.
-                None => self.state.compare_exchange(state, UNLOCKED, Release, Acquire).err().map(
-                    |new| {
-                        state = new;
-                        // Since the state was locked, it can only have changed
-                        // because a new node was added since `state` was loaded.
-                        // Relink the queue and get a pointer to the node before
-                        // `tail`.
-                        unsafe {
-                            find_tail(to_node(state));
-                            tail.as_ref().prev.get().unwrap()
-                        }
-                    },
-                ),
-            };
-
-            if let Some(prev) = prev {
-                unsafe {
-                    // The `next` field of the tail field must be zero when
-                    // releasing the lock (queue invariant 2).
-                    prev.as_ref().next.set(None);
-                    // There are no set `tail` links before the node pointed to by
-                    // `state`, so the first non-null tail field will be current
-                    // (queue invariant 4).
-                    to_node(state).as_ref().tail.set(Some(prev));
                 }
-
-                // Release the lock. Doing this by subtraction is more efficient
-                // on modern processors since it is a single instruction instead
-                // of an update loop, which will fail if new threads are added
-                // to the queue.
-                self.state.fetch_byte_sub(LOCKED, Release);
             }
 
-            // The tail was split off and the lock released. Mark the node as
-            // completed.
-            unsafe {
-                Node::complete(tail);
+            let is_writer = unsafe { tail.as_ref().write };
+            if is_writer && let Some(prev) = unsafe { tail.as_ref().prev.get() } {
+                // `tail` is a writer and there is a node before `tail`.
+                // Split off `tail`.
+
+                // There are no set `tail` links before the node pointed to by
+                // `state`, so the first non-null tail field will be current
+                // (invariant 2). Invariant 4 is fullfilled since `find_tail`
+                // was called on this node, which ensures all backlinks are set.
+                unsafe { to_node(state).as_ref().tail.set(Some(prev)); }
+
+                // Release the queue lock. Doing this by subtraction is more
+                // efficient on modern processors since it is a single instruction
+                // instead of an update loop, which will fail if new threads are
+                // added to the list.
+                self.state.fetch_byte_sub(QUEUE_LOCKED, Release);
+
+                // The tail was split off and the lock released. Mark the node as
+                // completed.
+                unsafe { return Node::complete(tail); }
+            } else {
+                // The next waiter is a reader or the queue only consists of one
+                // waiter. Just wake all threads.
+
+                // The lock cannot be locked (checked above), so mark it as
+                // unlocked to reset the queue.
+                if let Err(new) = self.state.compare_exchange_weak(state, UNLOCKED, Release, Acquire) {
+                    state = new;
+                    continue;
+                }
+
+                let mut current = tail;
+                loop {
+                    let prev = unsafe { current.as_ref().prev.get() };
+                    unsafe { Node::complete(current); }
+                    match prev {
+                        Some(prev) => current = prev,
+                        None => return,
+                    }
+                }
             }
         }
     }