about summary refs log tree commit diff
path: root/library/std/src/sys/sync
diff options
context:
space:
mode:
authorConnor Tsui <connor.tsui20@gmail.com>2024-10-25 15:01:57 -0400
committerConnor Tsui <connor.tsui20@gmail.com>2024-11-16 12:31:13 -0500
commit26b5a1485e60a1ea7fd7e638fd59ec6b25fbc3d0 (patch)
tree9ddadc941e1a4bce2616ca0c57a37ec0d4de1a40 /library/std/src/sys/sync
parent31e35c2131f0b71a7d2cdc3b515d1a22f5f3b61d (diff)
downloadrust-26b5a1485e60a1ea7fd7e638fd59ec6b25fbc3d0.tar.gz
rust-26b5a1485e60a1ea7fd7e638fd59ec6b25fbc3d0.zip
add `downgrade` to `queue` implementation
This commit adds the `downgrade` method onto the inner `RwLock` queue
implementation.

There are also a few other style patches included in this commit.

Co-authored-by: Jonas Böttiger <jonasboettiger@icloud.com>
Diffstat (limited to 'library/std/src/sys/sync')
-rw-r--r--library/std/src/sys/sync/rwlock/queue.rs276
1 files changed, 206 insertions, 70 deletions
diff --git a/library/std/src/sys/sync/rwlock/queue.rs b/library/std/src/sys/sync/rwlock/queue.rs
index 5879d1f8415..77a5ee23309 100644
--- a/library/std/src/sys/sync/rwlock/queue.rs
+++ b/library/std/src/sys/sync/rwlock/queue.rs
@@ -40,16 +40,16 @@
 //!
 //! ## State
 //!
-//! A single [`AtomicPtr`] is used as state variable. The lowest three bits are used to indicate the
+//! A single [`AtomicPtr`] is used as state variable. The lowest four bits are used to indicate the
 //! meaning of the remaining bits:
 //!
-//! | [`LOCKED`] | [`QUEUED`] | [`QUEUE_LOCKED`] | Remaining    |                                                                                                                             |
-//! |:-----------|:-----------|:-----------------|:-------------|:----------------------------------------------------------------------------------------------------------------------------|
-//! | 0          | 0          | 0                | 0            | The lock is unlocked, no threads are waiting                                                                                |
-//! | 1          | 0          | 0                | 0            | The lock is write-locked, no threads waiting                                                                                |
-//! | 1          | 0          | 0                | n > 0        | The lock is read-locked with n readers                                                                                      |
-//! | 0          | 1          | *                | `*mut Node`  | The lock is unlocked, but some threads are waiting. Only writers may lock the lock                                          |
-//! | 1          | 1          | *                | `*mut Node`  | The lock is locked, but some threads are waiting. If the lock is read-locked, the last queue node contains the reader count |
+//! | [`LOCKED`] | [`QUEUED`] | [`QUEUE_LOCKED`] | [`DOWNGRADED`] | Remaining    |                                                                                                                             |
+//! |------------|:-----------|:-----------------|:---------------|:-------------|:----------------------------------------------------------------------------------------------------------------------------|
+//! | 0          | 0          | 0                | 0              | 0            | The lock is unlocked, no threads are waiting                                                                                |
+//! | 1          | 0          | 0                | 0              | 0            | The lock is write-locked, no threads waiting                                                                                |
+//! | 1          | 0          | 0                | 0              | n > 0        | The lock is read-locked with n readers                                                                                      |
+//! | 0          | 1          | *                | 0              | `*mut Node`  | The lock is unlocked, but some threads are waiting. Only writers may lock the lock                                          |
+//! | 1          | 1          | *                | *              | `*mut Node`  | The lock is locked, but some threads are waiting. If the lock is read-locked, the last queue node contains the reader count |
 //!
 //! ## Waiter Queue
 //!
@@ -100,9 +100,9 @@
 //! wake up the next waiter(s).
 //!
 //! `QUEUE_LOCKED` is set atomically at the same time as the enqueuing/unlocking operations. The
-//! thread releasing the `QUEUE_LOCK` bit will check the state of the lock and wake up waiters as
-//! appropriate. This guarantees forward progress even if the unlocking thread could not acquire the
-//! queue lock.
+//! thread releasing the `QUEUE_LOCKED` bit will check the state of the lock (in particular, whether
+//! a downgrade was requested using the [`DOWNGRADED`] bit) and wake up waiters as appropriate. This
+//! guarantees forward progress even if the unlocking thread could not acquire the queue lock.
 //!
 //! ## Memory Orderings
 //!
@@ -129,8 +129,10 @@ const UNLOCKED: State = without_provenance_mut(0);
 const LOCKED: usize = 1 << 0;
 const QUEUED: usize = 1 << 1;
 const QUEUE_LOCKED: usize = 1 << 2;
-const SINGLE: usize = 1 << 3;
-const NODE_MASK: usize = !(QUEUE_LOCKED | QUEUED | LOCKED);
+const DOWNGRADED: usize = 1 << 3;
+const SINGLE: usize = 1 << 4;
+const STATE: usize = DOWNGRADED | QUEUE_LOCKED | QUEUED | LOCKED;
+const NODE_MASK: usize = !STATE;
 
 /// Locking uses exponential backoff. `SPIN_COUNT` indicates how many times the locking operation
 /// will be retried.
@@ -141,8 +143,7 @@ const SPIN_COUNT: usize = 7;
 /// Marks the state as write-locked, if possible.
 #[inline]
 fn write_lock(state: State) -> Option<State> {
-    let state = state.wrapping_byte_add(LOCKED);
-    if state.addr() & LOCKED == LOCKED { Some(state) } else { None }
+    if state.addr() & LOCKED == 0 { Some(state.map_addr(|addr| addr | LOCKED)) } else { None }
 }
 
 /// Marks the state as read-locked, if possible.
@@ -169,7 +170,11 @@ unsafe fn to_node(state: State) -> NonNull<Node> {
 /// The representation of a thread waiting on the lock queue.
 ///
 /// We initialize these `Node`s on thread execution stacks to avoid allocation.
-#[repr(align(8))]
+///
+/// Note that we need an alignment of 16 to ensure that the last 4 bits of any
+/// pointers to `Node`s are always zeroed (for the bit flags described in the
+/// module-level documentation).
+#[repr(align(16))]
 struct Node {
     next: AtomicLink,
     prev: AtomicLink,
@@ -255,7 +260,7 @@ impl Node {
 /// # Safety
 ///
 /// * `head` must point to a node in a valid queue.
-/// * `head` must be in front of the head of the queue at the time of the last removal.
+/// * `head` must be in front of the previous head node that was used to perform the last removal.
 /// * The part of the queue starting with `head` must not be modified during this call.
 unsafe fn find_tail_and_add_backlinks(head: NonNull<Node>) -> NonNull<Node> {
     let mut current = head;
@@ -282,6 +287,28 @@ unsafe fn find_tail_and_add_backlinks(head: NonNull<Node>) -> NonNull<Node> {
     }
 }
 
+/// [`complete`](Node::complete)s all threads in the queue ending with `tail`.
+///
+/// # Safety
+///
+/// * `tail` must be a valid tail of a fully linked queue.
+/// * The current thread must have exclusive access to that queue.
+unsafe fn complete_all(tail: NonNull<Node>) {
+    let mut current = tail;
+
+    // Traverse backwards through the queue (FIFO) and `complete` all of the nodes.
+    loop {
+        let prev = unsafe { current.as_ref().prev.get() };
+        unsafe {
+            Node::complete(current);
+        }
+        match prev {
+            Some(prev) => current = prev,
+            None => return,
+        }
+    }
+}
+
 /// A type to guard against the unwinds of stacks that nodes are located on due to panics.
 struct PanicGuard;
 
@@ -332,10 +359,11 @@ impl RwLock {
 
     #[cold]
     fn lock_contended(&self, write: bool) {
-        let update_fn = if write { write_lock } else { read_lock };
         let mut node = Node::new(write);
         let mut state = self.state.load(Relaxed);
         let mut count = 0;
+        let update_fn = if write { write_lock } else { read_lock };
+
         loop {
             // Optimistically update the state.
             if let Some(next) = update_fn(state) {
@@ -372,6 +400,7 @@ impl RwLock {
                 .map_addr(|addr| addr | QUEUED | (state.addr() & LOCKED))
                 as State;
 
+            let mut is_queue_locked = false;
             if state.addr() & QUEUED == 0 {
                 // If this is the first node in the queue, set the `tail` field to the node itself
                 // to ensure there is a valid `tail` field in the queue (Invariants 1 & 2).
@@ -383,6 +412,9 @@ impl RwLock {
 
                 // Try locking the queue to eagerly add backlinks.
                 next = next.map_addr(|addr| addr | QUEUE_LOCKED);
+
+                // Track if we changed the `QUEUE_LOCKED` bit from off to on.
+                is_queue_locked = state.addr() & QUEUE_LOCKED == 0;
             }
 
             // Register the node, using release ordering to propagate our changes to the waking
@@ -398,8 +430,9 @@ impl RwLock {
             // Guard against unwinds using a `PanicGuard` that aborts when dropped.
             let guard = PanicGuard;
 
-            // If the current thread locked the queue, unlock it to eagerly add backlinks.
-            if state.addr() & (QUEUE_LOCKED | QUEUED) == QUEUED {
+            // If the current thread locked the queue, unlock it to eagerly adding backlinks.
+            if is_queue_locked {
+                // SAFETY: This thread set the `QUEUE_LOCKED` bit above.
                 unsafe {
                     self.unlock_queue(next);
                 }
@@ -427,6 +460,12 @@ impl RwLock {
                 // If there are no threads queued, simply decrement the reader count.
                 let count = state.addr() - (SINGLE | LOCKED);
                 Some(if count > 0 { without_provenance_mut(count | LOCKED) } else { UNLOCKED })
+            } else if state.addr() & DOWNGRADED != 0 {
+                // This thread used to have exclusive access, but requested a downgrade. This has
+                // not been completed yet, so we still have exclusive access.
+                // Retract the downgrade request and unlock, but leave waking up new threads to the
+                // thread that already holds the queue lock.
+                Some(state.mask(!(DOWNGRADED | LOCKED)))
             } else {
                 None
             }
@@ -476,40 +515,127 @@ impl RwLock {
     ///
     /// * The lock must be exclusively owned by this thread.
     /// * There must be threads queued on the lock.
+    /// * There cannot be a `downgrade` in progress.
     #[cold]
-    unsafe fn unlock_contended(&self, mut state: State) {
+    unsafe fn unlock_contended(&self, state: State) {
+        debug_assert!(state.addr() & STATE == (QUEUED | LOCKED));
+
+        let mut current = state;
+
+        // We want to atomically release the lock and try to acquire the queue lock.
         loop {
+            // First check if the queue lock is already held.
+            if current.addr() & QUEUE_LOCKED != 0 {
+                // Another thread holds the queue lock, so let them wake up waiters for us.
+                let next = current.mask(!LOCKED);
+                match self.state.compare_exchange_weak(current, next, Release, Relaxed) {
+                    Ok(_) => return,
+                    Err(new) => {
+                        current = new;
+                        continue;
+                    }
+                }
+            }
+
             // Atomically release the lock and try to acquire the queue lock.
-            let next = state.map_addr(|a| (a & !LOCKED) | QUEUE_LOCKED);
-            match self.state.compare_exchange_weak(state, next, AcqRel, Relaxed) {
-                // The queue lock was acquired. Release it, waking up the next
-                // waiter in the process.
-                Ok(_) if state.addr() & QUEUE_LOCKED == 0 => unsafe {
-                    return self.unlock_queue(next);
-                },
-                // Another thread already holds the queue lock, leave waking up
-                // waiters to it.
-                Ok(_) => return,
-                Err(new) => state = new,
+            let next = current.map_addr(|addr| (addr & !LOCKED) | QUEUE_LOCKED);
+            match self.state.compare_exchange_weak(current, next, AcqRel, Relaxed) {
+                Ok(_) => {
+                    // Now that we have the queue lock, we can wake up the next waiter.
+                    // SAFETY: This thread is exclusively owned by this thread.
+                    unsafe { self.unlock_queue(next) };
+                    return;
+                }
+                Err(new) => current = new,
+            }
+        }
+    }
+
+    /// # Safety
+    ///
+    /// * The lock must be write-locked by this thread.
+    #[inline]
+    pub unsafe fn downgrade(&self) {
+        // Optimistically change the state from write-locked with a single writer and no waiters to
+        // read-locked with a single reader and no waiters.
+        if let Err(state) = self.state.compare_exchange(
+            without_provenance_mut(LOCKED),
+            without_provenance_mut(SINGLE | LOCKED),
+            Release,
+            Relaxed,
+        ) {
+            // SAFETY: The only way the state can have changed is if there are threads queued.
+            // Wake all of them up.
+            unsafe { self.downgrade_slow(state) }
+        }
+    }
+
+    /// Downgrades the lock from write-locked to read-locked in the case that there are threads
+    /// waiting on the wait queue.
+    ///
+    /// This function will either wake up all of the waiters on the wait queue or designate the
+    /// current holder of the queue lock to wake up all of the waiters instead. Once the waiters
+    /// wake up, they will continue in the execution loop of `lock_contended`.
+    ///
+    /// # Safety
+    ///
+    /// * The lock must be write-locked by this thread.
+    /// * There must be threads queued on the lock.
+    #[cold]
+    unsafe fn downgrade_slow(&self, mut state: State) {
+        debug_assert!(state.addr() & (DOWNGRADED | QUEUED | LOCKED) == (QUEUED | LOCKED));
+
+        // Attempt to wake up all waiters by taking ownership of the entire waiter queue.
+        loop {
+            if state.addr() & QUEUE_LOCKED != 0 {
+                // Another thread already holds the queue lock. Tell it to wake up all waiters.
+                // If the other thread succeeds in waking up waiters before we release our lock, the
+                // effect will be just the same as if we had changed the state below.
+                // Otherwise, the `DOWNGRADED` bit will still be set, meaning that when this thread
+                // calls `read_unlock` later (because it holds a read lock and must unlock
+                // eventually), it will realize that the lock is still exclusively locked and act
+                // accordingly.
+                let next = state.map_addr(|addr| addr | DOWNGRADED);
+                match self.state.compare_exchange_weak(state, next, Release, Relaxed) {
+                    Ok(_) => return,
+                    Err(new) => state = new,
+                }
+            } else {
+                // Grab the entire queue by swapping the `state` with a single reader.
+                let next = ptr::without_provenance_mut(SINGLE | LOCKED);
+                if let Err(new) = self.state.compare_exchange_weak(state, next, AcqRel, Relaxed) {
+                    state = new;
+                    continue;
+                }
+
+                // SAFETY: We have full ownership of this queue now, so nobody else can modify it.
+                let tail = unsafe { find_tail_and_add_backlinks(to_node(state)) };
+
+                // Wake up all waiters.
+                // SAFETY: `tail` was just computed, meaning the whole queue is linked.
+                unsafe { complete_all(tail) };
+
+                return;
             }
         }
     }
 
-    /// Unlocks the queue. If the lock is unlocked, wakes up the next eligible
-    /// thread(s).
+    /// Unlocks the queue. Wakes up all threads if a downgrade was requested, otherwise wakes up the
+    /// next eligible thread(s) if the lock is unlocked.
     ///
     /// # Safety
     ///
-    /// The queue lock must be held by the current thread.
+    /// * The queue lock must be held by the current thread.
+    /// * There must be threads queued on the lock.
     unsafe fn unlock_queue(&self, mut state: State) {
         debug_assert_eq!(state.addr() & (QUEUED | QUEUE_LOCKED), QUEUED | QUEUE_LOCKED);
 
         loop {
             let tail = unsafe { find_tail_and_add_backlinks(to_node(state)) };
 
-            if state.addr() & LOCKED == LOCKED {
-                // Another thread has locked the lock. Leave waking up waiters
-                // to them by releasing the queue lock.
+            if state.addr() & (DOWNGRADED | LOCKED) == LOCKED {
+                // Another thread has locked the lock and no downgrade was requested.
+                // Leave waking up waiters to them by releasing the queue lock.
                 match self.state.compare_exchange_weak(
                     state,
                     state.mask(!QUEUE_LOCKED),
@@ -524,53 +650,63 @@ impl RwLock {
                 }
             }
 
+            // Since we hold the queue lock and downgrades cannot be requested if the lock is
+            // already read-locked, we have exclusive control over the queue here and can make
+            // modifications.
+
+            let downgrade = state.addr() & DOWNGRADED != 0;
             let is_writer = unsafe { tail.as_ref().write };
-            if is_writer && let Some(prev) = unsafe { tail.as_ref().prev.get() } {
-                // `tail` is a writer and there is a node before `tail`.
-                // Split off `tail`.
+            if !downgrade
+                && is_writer
+                && let Some(prev) = unsafe { tail.as_ref().prev.get() }
+            {
+                // If we are not downgrading and the next thread is a writer, only wake up that
+                // writing thread.
 
-                // There are no set `tail` links before the node pointed to by
-                // `state`, so the first non-null tail field will be current
-                // (invariant 2). Invariant 4 is fullfilled since `find_tail`
-                // was called on this node, which ensures all backlinks are set.
+                // Split off `tail`.
+                // There are no set `tail` links before the node pointed to by `state`, so the first
+                // non-null tail field will be current (Invariant 2).
+                // We also fulfill Invariant 4 since `find_tail` was called on this node, which
+                // ensures all backlinks are set.
                 unsafe {
                     to_node(state).as_ref().tail.set(Some(prev));
                 }
 
-                // Release the queue lock. Doing this by subtraction is more
-                // efficient on modern processors since it is a single instruction
-                // instead of an update loop, which will fail if new threads are
-                // added to the list.
-                self.state.fetch_byte_sub(QUEUE_LOCKED, Release);
+                // Try to release the queue lock. We need to check the state again since another
+                // thread might have acquired the lock and requested a downgrade.
+                let next = state.mask(!QUEUE_LOCKED);
+                if let Err(new) = self.state.compare_exchange_weak(state, next, Release, Acquire) {
+                    // Undo the tail modification above, so that we can find the tail again above.
+                    // As mentioned above, we have exclusive control over the queue, so no other
+                    // thread could have noticed the change.
+                    unsafe {
+                        to_node(state).as_ref().tail.set(Some(tail));
+                    }
+                    state = new;
+                    continue;
+                }
 
-                // The tail was split off and the lock released. Mark the node as
-                // completed.
+                // The tail was split off and the lock was released. Mark the node as completed.
                 unsafe {
                     return Node::complete(tail);
                 }
             } else {
-                // The next waiter is a reader or the queue only consists of one
-                // waiter. Just wake all threads.
-
-                // The lock cannot be locked (checked above), so mark it as
-                // unlocked to reset the queue.
-                if let Err(new) =
-                    self.state.compare_exchange_weak(state, UNLOCKED, Release, Acquire)
-                {
+                // We are either downgrading, the next waiter is a reader, or the queue only
+                // consists of one waiter. In any case, just wake all threads.
+
+                // Clear the queue.
+                let next =
+                    if downgrade { ptr::without_provenance_mut(SINGLE | LOCKED) } else { UNLOCKED };
+                if let Err(new) = self.state.compare_exchange_weak(state, next, Release, Acquire) {
                     state = new;
                     continue;
                 }
 
-                let mut current = tail;
-                loop {
-                    let prev = unsafe { current.as_ref().prev.get() };
-                    unsafe {
-                        Node::complete(current);
-                    }
-                    match prev {
-                        Some(prev) => current = prev,
-                        None => return,
-                    }
+                // SAFETY: we computed `tail` above, and no new nodes can have been added since
+                // (otherwise the CAS above would have failed).
+                // Thus we have complete control over the whole queue.
+                unsafe {
+                    return complete_all(tail);
                 }
             }
         }