| -rw-r--r-- | library/std/src/sys/pal/unix/locks/queue_rwlock.rs | 339 |
1 file changed, 195 insertions(+), 144 deletions(-)
diff --git a/library/std/src/sys/pal/unix/locks/queue_rwlock.rs b/library/std/src/sys/pal/unix/locks/queue_rwlock.rs
index 315c59a02d5..7619adbf839 100644
--- a/library/std/src/sys/pal/unix/locks/queue_rwlock.rs
+++ b/library/std/src/sys/pal/unix/locks/queue_rwlock.rs
@@ -39,16 +39,16 @@
 //!
 //! ## State
 //!
-//! A single [`AtomicPtr`] is used as state variable. The lowest two bits are used
+//! A single [`AtomicPtr`] is used as state variable. The lowest three bits are used
 //! to indicate the meaning of the remaining bits:
 //!
-//! | `LOCKED` | `QUEUED` | Remaining   |                                                                              |
-//! |:----------|:----------|:-------------|:----------------------------------------------------------------------------------------------------------------------------|
-//! | 0 | 0 | 0 | The lock is unlocked, no threads are waiting |
-//! | 1 | 0 | 0 | The lock is write-locked, no threads waiting |
-//! | 1 | 0 | n > 0 | The lock is read-locked with n readers |
-//! | 0 | 1 | `*mut Node` | The lock is unlocked, but some threads are waiting. Only writers may lock the lock |
-//! | 1 | 1 | `*mut Node` | The lock is locked, but some threads are waiting. If the lock is read-locked, the last queue node contains the reader count |
+//! | [`LOCKED`] | [`QUEUED`] | [`QUEUE_LOCKED`] | Remaining   |                                                                              |
+//! |:-----------|:-----------|:-----------------|:-------------|:----------------------------------------------------------------------------------------------------------------------------|
+//! | 0 | 0 | 0 | 0 | The lock is unlocked, no threads are waiting |
+//! | 1 | 0 | 0 | 0 | The lock is write-locked, no threads waiting |
+//! | 1 | 0 | 0 | n > 0 | The lock is read-locked with n readers |
+//! | 0 | 1 | * | `*mut Node` | The lock is unlocked, but some threads are waiting. Only writers may lock the lock |
+//! | 1 | 1 | * | `*mut Node` | The lock is locked, but some threads are waiting. If the lock is read-locked, the last queue node contains the reader count |
 //!
 //! ## Waiter queue
 //!
@@ -84,28 +84,31 @@
 //! ```
 //!
 //! Invariants:
-//! 1. The `next` field always points to a valid node, except in the tail node.
-//! 2. The `next` field of the tail node must be null while the queue is unlocked.
-//! 3. At least one node must contain a non-null, current `tail` field.
-//! 4. The first non-null `tail` field must be valid and current.
-//! 5. All nodes following this node must have a correct, non-null `prev` field.
+//! 1. At least one node must contain a non-null, current `tail` field.
+//! 2. The first non-null `tail` field must be valid and current.
+//! 3. All nodes preceding this node must have a correct, non-null `next` field.
+//! 4. All nodes following this node must have a correct, non-null `prev` field.
 //!
-//! While adding a new node to the queue may be done by any thread at any time,
-//! removing nodes may only be done by a single thread. Instead of using a
-//! separate lock bit for the queue like usync does, this implementation
-//! only allows the (last) lock owner to modify the queue.
+//! Access to the queue is controlled by the `QUEUE_LOCKED` bit, which threads
+//! try to set both after enqueuing themselves to eagerly add backlinks to the
+//! queue and after unlocking the lock to wake the next waiter(s). This is done
+//! atomically at the same time as the enqueuing/unlocking operation. The thread
+//! releasing the `QUEUE_LOCKED` bit will check the state of the lock and wake up
+//! waiters as appropriate. This guarantees forward progress even if the unlocking
+//! thread could not acquire the queue lock.
 //!
 //! ## Memory orderings
 //!
 //! To properly synchronize changes to the data protected by the lock, the lock
 //! is acquired and released with [`Acquire`] and [`Release`] ordering, respectively.
-//! To propagate the initialization of nodes, changes to the list are also propagated
-//! using these orderings.
+//! To propagate the initialization of nodes, changes to the queue lock are also
+//! performed using these orderings.
 
 #![forbid(unsafe_op_in_unsafe_fn)]
 
 use crate::cell::OnceCell;
 use crate::hint::spin_loop;
+use crate::mem;
 use crate::ptr::{self, invalid_mut, null_mut, NonNull};
 use crate::sync::atomic::{
     AtomicBool, AtomicPtr,
@@ -114,7 +117,10 @@ use crate::sync::atomic::{
 use crate::sys_common::thread_info;
 use crate::thread::Thread;
 
-const SPIN_COUNT: usize = 6;
+// Locking uses exponential backoff. `SPIN_COUNT` indicates how many times the
+// locking operation will be retried.
+// `spin_loop` will be called `2.pow(SPIN_COUNT) - 1` times.
+const SPIN_COUNT: usize = 7;
 
 type State = *mut ();
 type AtomicState = AtomicPtr<()>;
@@ -122,23 +128,24 @@ type AtomicState = AtomicPtr<()>;
 const UNLOCKED: State = invalid_mut(0);
 const LOCKED: usize = 1;
 const QUEUED: usize = 2;
-const SINGLE: usize = 4;
-const MASK: usize = !(LOCKED | QUEUED);
+const QUEUE_LOCKED: usize = 4;
+const SINGLE: usize = 8;
+const MASK: usize = !(QUEUE_LOCKED | QUEUED | LOCKED);
 
 /// Returns a closure that changes the state to the lock state corresponding to
-/// the lock mode indicated in `read`.
+/// the lock mode indicated in `write`.
 #[inline]
-fn lock(read: bool) -> impl Fn(State) -> Option<State> {
+fn lock(write: bool) -> impl Fn(State) -> Option<State> {
     move |state| {
-        if read {
+        if write {
+            let state = state.wrapping_byte_add(LOCKED);
+            if state.addr() & LOCKED == LOCKED { Some(state) } else { None }
+        } else {
             if state.addr() & QUEUED == 0 && state.addr() != LOCKED {
                 Some(invalid_mut(state.addr().checked_add(SINGLE)? | LOCKED))
             } else {
                 None
             }
-        } else {
-            let state = state.wrapping_byte_add(LOCKED);
-            if state.addr() & LOCKED == LOCKED { Some(state) } else { None }
         }
     }
 }
@@ -169,24 +176,24 @@ impl AtomicLink {
     }
 }
 
-#[repr(align(4))]
+#[repr(align(8))]
 struct Node {
     next: AtomicLink,
     prev: AtomicLink,
     tail: AtomicLink,
-    read: bool,
+    write: bool,
     thread: OnceCell<Thread>,
     completed: AtomicBool,
 }
 
 impl Node {
     /// Create a new queue node.
-    fn new(read: bool) -> Node {
+    fn new(write: bool) -> Node {
         Node {
             next: AtomicLink::new(None),
             prev: AtomicLink::new(None),
             tail: AtomicLink::new(None),
-            read,
+            write,
             thread: OnceCell::new(),
             completed: AtomicBool::new(false),
         }
@@ -201,9 +208,9 @@ impl Node {
     }
 
     /// Assuming the node contains a reader lock count, decrement that count.
-    /// Returns `true` if there are other lock owners.
+    /// Returns `true` if this thread was the last lock owner.
     fn decrement_count(&self) -> bool {
-        self.next.0.fetch_byte_sub(SINGLE, AcqRel).addr() > SINGLE
+        self.next.0.fetch_byte_sub(SINGLE, AcqRel).addr() - SINGLE == 0
     }
 
     /// Prepare this node for waiting.
@@ -239,6 +246,14 @@ impl Node {
     }
 }
 
+struct PanicGuard;
+
+impl Drop for PanicGuard {
+    fn drop(&mut self) {
+        rtabort!("tried to drop node in intrusive list.");
+    }
+}
+
 /// Find the tail of the queue beginning with `head`, caching the result in `head`.
 ///
 /// May be called from multiple threads at the same time, while the queue is not
@@ -257,9 +272,8 @@ unsafe fn find_tail(head: NonNull<Node>) -> NonNull<Node> {
         match c.tail.get() {
             Some(tail) => break tail,
             // SAFETY:
-            // Only the `next` field of the tail is null (invariants 1. and 2.)
-            // Since at least one element in the queue has a non-null tail (invariant 3.),
-            // this code will never be run for `current == tail`.
+            // All `next` fields before the first node with a `set` tail are
+            // non-null and valid (invariant 3).
             None => unsafe {
                 let next = c.next.get().unwrap_unchecked();
                 next.as_ref().prev.set(Some(current));
@@ -286,13 +300,13 @@ impl RwLock {
 
     #[inline]
     pub fn try_read(&self) -> bool {
-        self.state.fetch_update(Acquire, Relaxed, lock(true)).is_ok()
+        self.state.fetch_update(Acquire, Relaxed, lock(false)).is_ok()
     }
 
     #[inline]
     pub fn read(&self) {
         if !self.try_read() {
-            self.lock_contended(true)
+            self.lock_contended(false)
         }
     }
 
@@ -300,22 +314,22 @@ impl RwLock {
     pub fn try_write(&self) -> bool {
         // This is lowered to a single atomic instruction on most modern processors
        // (e.g. "lock bts" on x86 and "ldseta" on modern AArch64), and therefore
-        // is more efficient than `fetch_update(lock(false))`, which can spuriously
+        // is more efficient than `fetch_update(lock(true))`, which can spuriously
         // fail if a new node is appended to the queue.
-        self.state.fetch_or(LOCKED, Acquire).addr() & LOCKED != LOCKED
+        self.state.fetch_or(LOCKED, Acquire).addr() & LOCKED == 0
     }
 
     #[inline]
     pub fn write(&self) {
         if !self.try_write() {
-            self.lock_contended(false)
+            self.lock_contended(true)
         }
     }
 
     #[cold]
-    fn lock_contended(&self, read: bool) {
-        let update = lock(read);
-        let mut node = Node::new(read);
+    fn lock_contended(&self, write: bool) {
+        let update = lock(write);
+        let mut node = Node::new(write);
         let mut state = self.state.load(Relaxed);
         let mut count = 0;
         loop {
@@ -326,8 +340,9 @@ impl RwLock {
                     Err(new) => state = new,
                 }
             } else if state.addr() & QUEUED == 0 && count < SPIN_COUNT {
-                // If the lock is not available but no threads are queued, spin
-                // for a while.
+                // If the lock is not available and no threads are queued, spin
+                // for a while, using exponential backoff to decrease cache
+                // contention.
                 for _ in 0..(1 << count) {
                     spin_loop();
                 }
@@ -338,18 +353,26 @@ impl RwLock {
                 node.prepare();
                 node.set_state(state);
                 node.prev = AtomicLink::new(None);
-                // If this is the first node in the queue, set the tail field to
-                // the node itself to ensure there is a current `tail` field in
-                // the queue (invariants 3. and 4.). This needs to use `set` to
-                // avoid invalidating the new pointer.
-                node.tail.set((state.addr() & QUEUED == 0).then_some(NonNull::from(&node)));
-
-                let next = ptr::from_ref(&node)
+                let mut next = ptr::from_ref(&node)
                     .map_addr(|addr| addr | QUEUED | (state.addr() & LOCKED))
                     as State;
+
+                if state.addr() & QUEUED == 0 {
+                    // If this is the first node in the queue, set the tail field to
+                    // the node itself to ensure there is a current `tail` field in
+                    // the queue (invariants 1 and 2). This needs to use `set` to
+                    // avoid invalidating the new pointer.
+                    node.tail.set(Some(NonNull::from(&node)));
+                } else {
+                    // Otherwise, the tail of the queue is not known.
+                    node.tail.set(None);
+                    // Try locking the queue to fully link it.
+                    next = next.map_addr(|addr| addr | QUEUE_LOCKED);
+                }
+
                 // Use release ordering to propagate our changes to the waking
                 // thread.
-                if let Err(new) = self.state.compare_exchange_weak(state, next, Release, Relaxed) {
+                if let Err(new) = self.state.compare_exchange_weak(state, next, AcqRel, Relaxed) {
                     // The state has changed, just try again.
                     state = new;
                     continue;
@@ -357,13 +380,27 @@
 
                 // The node is registered, so the structure must not be
                 // mutably accessed or destroyed while other threads may
-                // be accessing it. Just wait until it is completed.
+                // be accessing it. Guard against unwinds using a panic
+                // guard that aborts when dropped.
+                let guard = PanicGuard;
+
+                // If the current thread locked the queue, unlock it again,
+                // linking it in the process.
+                if state.addr() & (QUEUE_LOCKED | QUEUED) == QUEUED {
+                    unsafe {
+                        self.unlock_queue(next);
+                    }
+                }
+
+                // Wait until the node is removed from the queue.
                 // SAFETY: the node was created by the current thread.
                 unsafe {
                     node.wait();
                 }
 
+                // The node was removed from the queue, disarm the guard.
+                mem::forget(guard);
+
                 // Reload the state and try again.
                 state = self.state.load(Relaxed);
                 count = 0;
@@ -382,114 +419,128 @@
             }
         }) {
             Ok(_) => {}
-            Err(state) => unsafe { self.unlock_contended(state, true) },
+            // There are waiters queued and the lock count was moved to the
+            // tail of the queue.
+            Err(state) => unsafe { self.read_unlock_contended(state) },
+        }
+    }
+
+    #[cold]
+    unsafe fn read_unlock_contended(&self, state: State) {
+        // The state was observed with acquire ordering above, so the current
+        // thread will observe all node initializations.
+
+        let tail = unsafe { find_tail(to_node(state)) };
+        let was_last = unsafe { tail.as_ref().decrement_count() };
+        if was_last {
+            // SAFETY:
+            // Other threads cannot read-lock while threads are queued. Also,
+            // the `LOCKED` bit is still set, so there are no writers. Therefore,
+            // the current thread exclusively owns the lock.
+            unsafe { self.unlock_contended(state) }
         }
     }
 
     #[inline]
     pub unsafe fn write_unlock(&self) {
-        match self.state.compare_exchange(invalid_mut(LOCKED), UNLOCKED, Release, Acquire) {
-            Ok(_) => {}
+        if let Err(state) =
+            self.state.compare_exchange(invalid_mut(LOCKED), UNLOCKED, Release, Relaxed)
+        {
+            // SAFETY:
             // Since other threads cannot acquire the lock, the state can only
             // have changed because there are threads queued on the lock.
-            Err(state) => unsafe { self.unlock_contended(state, false) },
+            unsafe { self.unlock_contended(state) }
         }
     }
 
     /// # Safety
-    /// The lock must be locked by the current thread and threads must be queued on it.
+    /// * The lock must be exclusively owned by this thread.
+    /// * There must be threads queued on the lock.
     #[cold]
-    unsafe fn unlock_contended(&self, mut state: State, read: bool) {
-        // Find the last node in the linked queue.
-        let tail = unsafe { find_tail(to_node(state)) };
-        let not_last = unsafe { read && tail.as_ref().decrement_count() };
-        if not_last {
-            // There are other lock owners, leave waking up the next waiters to them.
-            return;
+    unsafe fn unlock_contended(&self, mut state: State) {
+        loop {
+            // Atomically release the lock and try to acquire the queue lock.
+            let next = state.map_addr(|a| (a & !LOCKED) | QUEUE_LOCKED);
+            match self.state.compare_exchange_weak(state, next, AcqRel, Relaxed) {
+                // The queue lock was acquired. Release it, waking up the next
+                // waiter in the process.
+                Ok(_) if state.addr() & QUEUE_LOCKED == 0 => unsafe {
+                    return self.unlock_queue(next);
+                },
+                // Another thread already holds the queue lock, leave waking up
+                // waiters to it.
+                Ok(_) => return,
+                Err(new) => state = new,
+            }
         }
+    }
 
-        // At this point, the `next` field on `tail` will always be null
-        // (invariant 2).
+    /// # Safety
+    /// The queue lock must be held by the current thread.
+    unsafe fn unlock_queue(&self, mut state: State) {
+        debug_assert_eq!(state.addr() & (QUEUED | QUEUE_LOCKED), QUEUED | QUEUE_LOCKED);
 
-        let next_read = unsafe { tail.as_ref().read };
-        if next_read {
-            // The next waiter is a reader. Just wake all threads.
-            //
-            // SAFETY:
-            // `current` is the head of a valid queue, which no thread except the
-            // the current can observe.
-            unsafe {
-                let mut current = to_node(self.state.swap(UNLOCKED, AcqRel));
-                loop {
-                    let next = current.as_ref().next.get();
-                    Node::complete(current);
-                    match next {
-                        Some(next) => current = next,
-                        None => break,
-                    }
-                }
-            }
-        } else {
-            // The next waiter is a writer. Remove it from the queue and wake it.
-            let prev = match unsafe { tail.as_ref().prev.get() } {
-                // If the lock was read-locked, multiple threads have invoked
-                // `find_tail` above. Therefore, it is possible that one of
-                // them observed a newer state than this thread did, meaning
-                // there is a set `tail` field in a node before `state`. To
-                // make sure that the queue is valid after the link update
-                // below, reload the state and relink the queue.
-                //
-                // SAFETY: since the current thread holds the lock, the queue
-                // was not removed from since the last time and therefore is
-                // still valid.
-                Some(prev) if read => unsafe {
-                    let new = self.state.load(Acquire);
-                    if new != state {
+        loop {
+            // Find the last node in the linked list.
+            let tail = unsafe { find_tail(to_node(state)) };
+
+            if state.addr() & LOCKED == LOCKED {
+                // Another thread has locked the lock. Leave waking up waiters
+                // to them by releasing the queue lock.
+                match self.state.compare_exchange_weak(
+                    state,
+                    state.mask(!QUEUE_LOCKED),
+                    Release,
+                    Acquire,
+                ) {
+                    Ok(_) => return,
+                    Err(new) => {
                         state = new;
-                        find_tail(to_node(state));
+                        continue;
                     }
-                    Some(prev)
-                },
-                Some(prev) => Some(prev),
-                // The current node is the only one in the queue that we observed.
-                // Try setting the state to UNLOCKED.
-                None => self.state.compare_exchange(state, UNLOCKED, Release, Acquire).err().map(
-                    |new| {
-                        state = new;
-                        // Since the state was locked, it can only have changed
-                        // because a new node was added since `state` was loaded.
-                        // Relink the queue and get a pointer to the node before
-                        // `tail`.
-                        unsafe {
-                            find_tail(to_node(state));
-                            tail.as_ref().prev.get().unwrap()
-                        }
-                    },
-                ),
-            };
-
-            if let Some(prev) = prev {
-                unsafe {
-                    // The `next` field of the tail field must be zero when
-                    // releasing the lock (queue invariant 2).
-                    prev.as_ref().next.set(None);
-                    // There are no set `tail` links before the node pointed to by
-                    // `state`, so the first non-null tail field will be current
-                    // (queue invariant 4).
-                    to_node(state).as_ref().tail.set(Some(prev));
                 }
-
-                // Release the lock. Doing this by subtraction is more efficient
-                // on modern processors since it is a single instruction instead
-                // of an update loop, which will fail if new threads are added
-                // to the queue.
-                self.state.fetch_byte_sub(LOCKED, Release);
             }
 
-            // The tail was split off and the lock released. Mark the node as
-            // completed.
-            unsafe {
-                Node::complete(tail);
+            let is_writer = unsafe { tail.as_ref().write };
+            if is_writer && let Some(prev) = unsafe { tail.as_ref().prev.get() } {
+                // `tail` is a writer and there is a node before `tail`.
+                // Split off `tail`.
+
+                // There are no set `tail` links before the node pointed to by
+                // `state`, so the first non-null tail field will be current
+                // (invariant 2). Invariant 4 is fulfilled since `find_tail`
+                // was called on this node, which ensures all backlinks are set.
+                unsafe { to_node(state).as_ref().tail.set(Some(prev)); }
+
+                // Release the queue lock. Doing this by subtraction is more
+                // efficient on modern processors since it is a single instruction
+                // instead of an update loop, which will fail if new threads are
+                // added to the list.
+                self.state.fetch_byte_sub(QUEUE_LOCKED, Release);
+
+                // The tail was split off and the lock released. Mark the node as
+                // completed.
+                unsafe { return Node::complete(tail); }
+            } else {
+                // The next waiter is a reader or the queue only consists of one
+                // waiter. Just wake all threads.
+
+                // The lock cannot be locked (checked above), so mark it as
+                // unlocked to reset the queue.
+                if let Err(new) = self.state.compare_exchange_weak(state, UNLOCKED, Release, Acquire) {
+                    state = new;
+                    continue;
+                }
+
+                let mut current = tail;
+                loop {
+                    let prev = unsafe { current.as_ref().prev.get() };
+                    unsafe { Node::complete(current); }
+                    match prev {
+                        Some(prev) => current = prev,
+                        None => return,
+                    }
+                }
+            }
         }
     }
 }
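A note on the encoding: the state table in this patch packs a queue-head pointer and three flag bits into one pointer-sized word, which only works because `Node` is now aligned to 8 bytes. The following standalone sketch illustrates the idea under simplifying assumptions — it uses a plain `AtomicUsize` instead of the strict-provenance `AtomicPtr` arithmetic of the patch, and `queue_head`/`try_write` are illustrative helpers, not the std API.

```rust
use std::sync::atomic::{AtomicUsize, Ordering::Acquire};

const LOCKED: usize = 1; // bit 0: the lock is held
const QUEUED: usize = 2; // bit 1: the remaining bits hold a queue-head pointer
const QUEUE_LOCKED: usize = 4; // bit 2: some thread is currently editing the queue
const SINGLE: usize = 8; // one reader, when the remaining bits hold a counter instead
const MASK: usize = !(QUEUE_LOCKED | QUEUED | LOCKED);

// Stand-in for the real queue node; `align(8)` keeps the low three bits of its
// address zero, so they can carry the flags above.
#[repr(align(8))]
struct Node {
    _prev: usize,
}

// Recover the queue-head pointer from a state word (only meaningful if QUEUED is set).
fn queue_head(state: usize) -> Option<*const Node> {
    (state & QUEUED != 0).then(|| (state & MASK) as *const Node)
}

// Mirrors the shape of the patch's `try_write`: a single `fetch_or` succeeds
// exactly when the LOCKED bit was previously clear.
fn try_write(state: &AtomicUsize) -> bool {
    state.fetch_or(LOCKED, Acquire) & LOCKED == 0
}

fn main() {
    let state = AtomicUsize::new(0);
    assert!(try_write(&state)); // uncontended: write lock acquired
    assert!(!try_write(&state)); // second attempt observes LOCKED

    let node = Node { _prev: 0 };
    let queued = (&node as *const Node as usize) | QUEUED | LOCKED;
    assert_eq!(queue_head(queued), Some(&node as *const Node));
    let _ = SINGLE; // reader counts are added in units of SINGLE, above the flag bits
}
```

Raising the node alignment from `align(4)` to `align(8)` is what frees bit 2 for `QUEUE_LOCKED` in this patch.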
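The new `SPIN_COUNT` comment describes bounded exponential backoff before a thread queues itself. A minimal sketch of that retry shape, with `lock_with_backoff` and its `try_lock` closure as illustrative stand-ins rather than anything from the patch:

```rust
use std::hint::spin_loop;

const SPIN_COUNT: usize = 7;

// Retry `try_lock` up to SPIN_COUNT times, spinning 2^attempt times between
// attempts, so `spin_loop` runs 2^SPIN_COUNT - 1 times in total before giving up.
fn lock_with_backoff(mut try_lock: impl FnMut() -> bool) -> bool {
    for attempt in 0..SPIN_COUNT {
        if try_lock() {
            return true;
        }
        for _ in 0..(1usize << attempt) {
            spin_loop();
        }
    }
    // At this point the real implementation stops spinning and enqueues a node.
    try_lock()
}

fn main() {
    // Toy usage: an uncontended "lock" succeeds on the first attempt.
    assert!(lock_with_backoff(|| true));
}
```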
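The split into `unlock_contended` and `unlock_queue` means the waking work is done by whichever thread manages to take the `QUEUE_LOCKED` bit and skipped by everyone else. A simplified sketch of that hand-off, again over a plain `AtomicUsize`, with the `wake_waiters` closure standing in for the patch's `unlock_queue`:

```rust
use std::sync::atomic::{AtomicUsize, Ordering::{AcqRel, Relaxed}};

const LOCKED: usize = 1;
const QUEUED: usize = 2;
const QUEUE_LOCKED: usize = 4;

// Release the lock and try to take the queue lock in the same atomic exchange.
fn unlock_contended(state_word: &AtomicUsize, mut wake_waiters: impl FnMut(usize)) {
    let mut state = state_word.load(Relaxed);
    loop {
        debug_assert!(state & QUEUED != 0);
        let next = (state & !LOCKED) | QUEUE_LOCKED;
        match state_word.compare_exchange_weak(state, next, AcqRel, Relaxed) {
            // We took the queue lock: wake the next waiter(s) and release it.
            Ok(_) if state & QUEUE_LOCKED == 0 => return wake_waiters(next),
            // Another thread already holds the queue lock; it will observe the
            // now-unlocked state when it releases the bit and do the wake-up.
            Ok(_) => return,
            Err(new) => state = new,
        }
    }
}

fn main() {
    // Toy usage: LOCKED | QUEUED with a null queue pointer. No one else holds
    // the queue lock, so this call "wakes" the (imaginary) waiters itself.
    let state = AtomicUsize::new(LOCKED | QUEUED);
    unlock_contended(&state, |s| println!("would wake waiters, state = {s:#x}"));
}
```

Because the bit is only released after the queue has been re-inspected, a lock owner that fails to take it can return immediately without risking a lost wake-up, which is the forward-progress guarantee the module docs describe.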
