diff options
| author | joboet <jonasboettiger@icloud.com> | 2024-03-12 14:55:06 +0100 |
|---|---|---|
| committer | joboet <jonasboettiger@icloud.com> | 2024-03-12 15:41:06 +0100 |
| commit | 22a5267c83a3e17f2b763279eb24bb632c45dc6b (patch) | |
| tree | 81eca4925b77ec92b4f2fd66962af948fe59d094 /library/std/src/sys_common | |
| parent | 3b85d2c7fc6d1698e68b94f7bc1a5c9633f2554d (diff) | |
| download | rust-22a5267c83a3e17f2b763279eb24bb632c45dc6b.tar.gz rust-22a5267c83a3e17f2b763279eb24bb632c45dc6b.zip | |
std: move `Once` implementations to `sys`
Diffstat (limited to 'library/std/src/sys_common')
| -rw-r--r-- | library/std/src/sys_common/mod.rs | 1 | ||||
| -rw-r--r-- | library/std/src/sys_common/once/futex.rs | 146 | ||||
| -rw-r--r-- | library/std/src/sys_common/once/mod.rs | 35 | ||||
| -rw-r--r-- | library/std/src/sys_common/once/queue.rs | 294 |
4 files changed, 0 insertions, 476 deletions
diff --git a/library/std/src/sys_common/mod.rs b/library/std/src/sys_common/mod.rs index c9025a81bf3..5410f135a73 100644 --- a/library/std/src/sys_common/mod.rs +++ b/library/std/src/sys_common/mod.rs @@ -24,7 +24,6 @@ pub mod backtrace; pub mod fs; pub mod io; pub mod lazy_box; -pub mod once; pub mod process; pub mod thread; pub mod thread_info; diff --git a/library/std/src/sys_common/once/futex.rs b/library/std/src/sys_common/once/futex.rs deleted file mode 100644 index 609085dcd47..00000000000 --- a/library/std/src/sys_common/once/futex.rs +++ /dev/null @@ -1,146 +0,0 @@ -use crate::cell::Cell; -use crate::sync as public; -use crate::sync::atomic::{ - AtomicU32, - Ordering::{Acquire, Relaxed, Release}, -}; -use crate::sync::once::ExclusiveState; -use crate::sys::futex::{futex_wait, futex_wake_all}; - -// On some platforms, the OS is very nice and handles the waiter queue for us. -// This means we only need one atomic value with 5 states: - -/// No initialization has run yet, and no thread is currently using the Once. -const INCOMPLETE: u32 = 0; -/// Some thread has previously attempted to initialize the Once, but it panicked, -/// so the Once is now poisoned. There are no other threads currently accessing -/// this Once. -const POISONED: u32 = 1; -/// Some thread is currently attempting to run initialization. It may succeed, -/// so all future threads need to wait for it to finish. -const RUNNING: u32 = 2; -/// Some thread is currently attempting to run initialization and there are threads -/// waiting for it to finish. -const QUEUED: u32 = 3; -/// Initialization has completed and all future calls should finish immediately. -const COMPLETE: u32 = 4; - -// Threads wait by setting the state to QUEUED and calling `futex_wait` on the state -// variable. When the running thread finishes, it will wake all waiting threads using -// `futex_wake_all`. - -pub struct OnceState { - poisoned: bool, - set_state_to: Cell<u32>, -} - -impl OnceState { - #[inline] - pub fn is_poisoned(&self) -> bool { - self.poisoned - } - - #[inline] - pub fn poison(&self) { - self.set_state_to.set(POISONED); - } -} - -struct CompletionGuard<'a> { - state: &'a AtomicU32, - set_state_on_drop_to: u32, -} - -impl<'a> Drop for CompletionGuard<'a> { - fn drop(&mut self) { - // Use release ordering to propagate changes to all threads checking - // up on the Once. `futex_wake_all` does its own synchronization, hence - // we do not need `AcqRel`. - if self.state.swap(self.set_state_on_drop_to, Release) == QUEUED { - futex_wake_all(&self.state); - } - } -} - -pub struct Once { - state: AtomicU32, -} - -impl Once { - #[inline] - pub const fn new() -> Once { - Once { state: AtomicU32::new(INCOMPLETE) } - } - - #[inline] - pub fn is_completed(&self) -> bool { - // Use acquire ordering to make all initialization changes visible to the - // current thread. - self.state.load(Acquire) == COMPLETE - } - - #[inline] - pub(crate) fn state(&mut self) -> ExclusiveState { - match *self.state.get_mut() { - INCOMPLETE => ExclusiveState::Incomplete, - POISONED => ExclusiveState::Poisoned, - COMPLETE => ExclusiveState::Complete, - _ => unreachable!("invalid Once state"), - } - } - - // This uses FnMut to match the API of the generic implementation. As this - // implementation is quite light-weight, it is generic over the closure and - // so avoids the cost of dynamic dispatch. - #[cold] - #[track_caller] - pub fn call(&self, ignore_poisoning: bool, f: &mut impl FnMut(&public::OnceState)) { - let mut state = self.state.load(Acquire); - loop { - match state { - POISONED if !ignore_poisoning => { - // Panic to propagate the poison. - panic!("Once instance has previously been poisoned"); - } - INCOMPLETE | POISONED => { - // Try to register the current thread as the one running. - if let Err(new) = - self.state.compare_exchange_weak(state, RUNNING, Acquire, Acquire) - { - state = new; - continue; - } - // `waiter_queue` will manage other waiting threads, and - // wake them up on drop. - let mut waiter_queue = - CompletionGuard { state: &self.state, set_state_on_drop_to: POISONED }; - // Run the function, letting it know if we're poisoned or not. - let f_state = public::OnceState { - inner: OnceState { - poisoned: state == POISONED, - set_state_to: Cell::new(COMPLETE), - }, - }; - f(&f_state); - waiter_queue.set_state_on_drop_to = f_state.inner.set_state_to.get(); - return; - } - RUNNING | QUEUED => { - // Set the state to QUEUED if it is not already. - if state == RUNNING - && let Err(new) = - self.state.compare_exchange_weak(RUNNING, QUEUED, Relaxed, Acquire) - { - state = new; - continue; - } - - futex_wait(&self.state, QUEUED, None); - state = self.state.load(Acquire); - } - COMPLETE => return, - _ => unreachable!("state is never set to invalid values"), - } - } - } -} diff --git a/library/std/src/sys_common/once/mod.rs b/library/std/src/sys_common/once/mod.rs deleted file mode 100644 index ec57568c54c..00000000000 --- a/library/std/src/sys_common/once/mod.rs +++ /dev/null @@ -1,35 +0,0 @@ -// A "once" is a relatively simple primitive, and it's also typically provided -// by the OS as well (see `pthread_once` or `InitOnceExecuteOnce`). The OS -// primitives, however, tend to have surprising restrictions, such as the Unix -// one doesn't allow an argument to be passed to the function. -// -// As a result, we end up implementing it ourselves in the standard library. -// This also gives us the opportunity to optimize the implementation a bit which -// should help the fast path on call sites. - -cfg_if::cfg_if! { - if #[cfg(any( - target_os = "linux", - target_os = "android", - all(target_arch = "wasm32", target_feature = "atomics"), - target_os = "freebsd", - target_os = "openbsd", - target_os = "dragonfly", - target_os = "fuchsia", - target_os = "hermit", - ))] { - mod futex; - pub use futex::{Once, OnceState}; - } else if #[cfg(any( - windows, - target_family = "unix", - all(target_vendor = "fortanix", target_env = "sgx"), - target_os = "solid_asp3", - target_os = "xous", - ))] { - mod queue; - pub use queue::{Once, OnceState}; - } else { - pub use crate::sys::once::{Once, OnceState}; - } -} diff --git a/library/std/src/sys_common/once/queue.rs b/library/std/src/sys_common/once/queue.rs deleted file mode 100644 index 730cdb768bd..00000000000 --- a/library/std/src/sys_common/once/queue.rs +++ /dev/null @@ -1,294 +0,0 @@ -// Each `Once` has one word of atomic state, and this state is CAS'd on to -// determine what to do. There are four possible state of a `Once`: -// -// * Incomplete - no initialization has run yet, and no thread is currently -// using the Once. -// * Poisoned - some thread has previously attempted to initialize the Once, but -// it panicked, so the Once is now poisoned. There are no other -// threads currently accessing this Once. -// * Running - some thread is currently attempting to run initialization. It may -// succeed, so all future threads need to wait for it to finish. -// Note that this state is accompanied with a payload, described -// below. -// * Complete - initialization has completed and all future calls should finish -// immediately. -// -// With 4 states we need 2 bits to encode this, and we use the remaining bits -// in the word we have allocated as a queue of threads waiting for the thread -// responsible for entering the RUNNING state. This queue is just a linked list -// of Waiter nodes which is monotonically increasing in size. Each node is -// allocated on the stack, and whenever the running closure finishes it will -// consume the entire queue and notify all waiters they should try again. -// -// You'll find a few more details in the implementation, but that's the gist of -// it! -// -// Atomic orderings: -// When running `Once` we deal with multiple atomics: -// `Once.state_and_queue` and an unknown number of `Waiter.signaled`. -// * `state_and_queue` is used (1) as a state flag, (2) for synchronizing the -// result of the `Once`, and (3) for synchronizing `Waiter` nodes. -// - At the end of the `call` function we have to make sure the result -// of the `Once` is acquired. So every load which can be the only one to -// load COMPLETED must have at least acquire ordering, which means all -// three of them. -// - `WaiterQueue::drop` is the only place that may store COMPLETED, and -// must do so with release ordering to make the result available. -// - `wait` inserts `Waiter` nodes as a pointer in `state_and_queue`, and -// needs to make the nodes available with release ordering. The load in -// its `compare_exchange` can be relaxed because it only has to compare -// the atomic, not to read other data. -// - `WaiterQueue::drop` must see the `Waiter` nodes, so it must load -// `state_and_queue` with acquire ordering. -// - There is just one store where `state_and_queue` is used only as a -// state flag, without having to synchronize data: switching the state -// from INCOMPLETE to RUNNING in `call`. This store can be Relaxed, -// but the read has to be Acquire because of the requirements mentioned -// above. -// * `Waiter.signaled` is both used as a flag, and to protect a field with -// interior mutability in `Waiter`. `Waiter.thread` is changed in -// `WaiterQueue::drop` which then sets `signaled` with release ordering. -// After `wait` loads `signaled` with acquire ordering and sees it is true, -// it needs to see the changes to drop the `Waiter` struct correctly. -// * There is one place where the two atomics `Once.state_and_queue` and -// `Waiter.signaled` come together, and might be reordered by the compiler or -// processor. Because both use acquire ordering such a reordering is not -// allowed, so no need for `SeqCst`. - -use crate::cell::Cell; -use crate::fmt; -use crate::ptr; -use crate::sync as public; -use crate::sync::atomic::{AtomicBool, AtomicPtr, Ordering}; -use crate::sync::once::ExclusiveState; -use crate::thread::{self, Thread}; - -type Masked = (); - -pub struct Once { - state_and_queue: AtomicPtr<Masked>, -} - -pub struct OnceState { - poisoned: bool, - set_state_on_drop_to: Cell<*mut Masked>, -} - -// Four states that a Once can be in, encoded into the lower bits of -// `state_and_queue` in the Once structure. -const INCOMPLETE: usize = 0x0; -const POISONED: usize = 0x1; -const RUNNING: usize = 0x2; -const COMPLETE: usize = 0x3; - -// Mask to learn about the state. All other bits are the queue of waiters if -// this is in the RUNNING state. -const STATE_MASK: usize = 0x3; - -// Representation of a node in the linked list of waiters, used while in the -// RUNNING state. -// Note: `Waiter` can't hold a mutable pointer to the next thread, because then -// `wait` would both hand out a mutable reference to its `Waiter` node, and keep -// a shared reference to check `signaled`. Instead we hold shared references and -// use interior mutability. -#[repr(align(4))] // Ensure the two lower bits are free to use as state bits. -struct Waiter { - thread: Cell<Option<Thread>>, - signaled: AtomicBool, - next: *const Waiter, -} - -// Head of a linked list of waiters. -// Every node is a struct on the stack of a waiting thread. -// Will wake up the waiters when it gets dropped, i.e. also on panic. -struct WaiterQueue<'a> { - state_and_queue: &'a AtomicPtr<Masked>, - set_state_on_drop_to: *mut Masked, -} - -impl Once { - #[inline] - #[rustc_const_stable(feature = "const_once_new", since = "1.32.0")] - pub const fn new() -> Once { - Once { state_and_queue: AtomicPtr::new(ptr::without_provenance_mut(INCOMPLETE)) } - } - - #[inline] - pub fn is_completed(&self) -> bool { - // An `Acquire` load is enough because that makes all the initialization - // operations visible to us, and, this being a fast path, weaker - // ordering helps with performance. This `Acquire` synchronizes with - // `Release` operations on the slow path. - self.state_and_queue.load(Ordering::Acquire).addr() == COMPLETE - } - - #[inline] - pub(crate) fn state(&mut self) -> ExclusiveState { - match self.state_and_queue.get_mut().addr() { - INCOMPLETE => ExclusiveState::Incomplete, - POISONED => ExclusiveState::Poisoned, - COMPLETE => ExclusiveState::Complete, - _ => unreachable!("invalid Once state"), - } - } - - // This is a non-generic function to reduce the monomorphization cost of - // using `call_once` (this isn't exactly a trivial or small implementation). - // - // Additionally, this is tagged with `#[cold]` as it should indeed be cold - // and it helps let LLVM know that calls to this function should be off the - // fast path. Essentially, this should help generate more straight line code - // in LLVM. - // - // Finally, this takes an `FnMut` instead of a `FnOnce` because there's - // currently no way to take an `FnOnce` and call it via virtual dispatch - // without some allocation overhead. - #[cold] - #[track_caller] - pub fn call(&self, ignore_poisoning: bool, init: &mut dyn FnMut(&public::OnceState)) { - let mut state_and_queue = self.state_and_queue.load(Ordering::Acquire); - loop { - match state_and_queue.addr() { - COMPLETE => break, - POISONED if !ignore_poisoning => { - // Panic to propagate the poison. - panic!("Once instance has previously been poisoned"); - } - POISONED | INCOMPLETE => { - // Try to register this thread as the one RUNNING. - let exchange_result = self.state_and_queue.compare_exchange( - state_and_queue, - ptr::without_provenance_mut(RUNNING), - Ordering::Acquire, - Ordering::Acquire, - ); - if let Err(old) = exchange_result { - state_and_queue = old; - continue; - } - // `waiter_queue` will manage other waiting threads, and - // wake them up on drop. - let mut waiter_queue = WaiterQueue { - state_and_queue: &self.state_and_queue, - set_state_on_drop_to: ptr::without_provenance_mut(POISONED), - }; - // Run the initialization function, letting it know if we're - // poisoned or not. - let init_state = public::OnceState { - inner: OnceState { - poisoned: state_and_queue.addr() == POISONED, - set_state_on_drop_to: Cell::new(ptr::without_provenance_mut(COMPLETE)), - }, - }; - init(&init_state); - waiter_queue.set_state_on_drop_to = init_state.inner.set_state_on_drop_to.get(); - break; - } - _ => { - // All other values must be RUNNING with possibly a - // pointer to the waiter queue in the more significant bits. - assert!(state_and_queue.addr() & STATE_MASK == RUNNING); - wait(&self.state_and_queue, state_and_queue); - state_and_queue = self.state_and_queue.load(Ordering::Acquire); - } - } - } - } -} - -fn wait(state_and_queue: &AtomicPtr<Masked>, mut current_state: *mut Masked) { - // Note: the following code was carefully written to avoid creating a - // mutable reference to `node` that gets aliased. - loop { - // Don't queue this thread if the status is no longer running, - // otherwise we will not be woken up. - if current_state.addr() & STATE_MASK != RUNNING { - return; - } - - // Create the node for our current thread. - let node = Waiter { - thread: Cell::new(Some(thread::current())), - signaled: AtomicBool::new(false), - next: current_state.with_addr(current_state.addr() & !STATE_MASK) as *const Waiter, - }; - let me = core::ptr::addr_of!(node) as *const Masked as *mut Masked; - - // Try to slide in the node at the head of the linked list, making sure - // that another thread didn't just replace the head of the linked list. - let exchange_result = state_and_queue.compare_exchange( - current_state, - me.with_addr(me.addr() | RUNNING), - Ordering::Release, - Ordering::Relaxed, - ); - if let Err(old) = exchange_result { - current_state = old; - continue; - } - - // We have enqueued ourselves, now lets wait. - // It is important not to return before being signaled, otherwise we - // would drop our `Waiter` node and leave a hole in the linked list - // (and a dangling reference). Guard against spurious wakeups by - // reparking ourselves until we are signaled. - while !node.signaled.load(Ordering::Acquire) { - // If the managing thread happens to signal and unpark us before we - // can park ourselves, the result could be this thread never gets - // unparked. Luckily `park` comes with the guarantee that if it got - // an `unpark` just before on an unparked thread it does not park. - thread::park(); - } - break; - } -} - -#[stable(feature = "std_debug", since = "1.16.0")] -impl fmt::Debug for Once { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Once").finish_non_exhaustive() - } -} - -impl Drop for WaiterQueue<'_> { - fn drop(&mut self) { - // Swap out our state with however we finished. - let state_and_queue = - self.state_and_queue.swap(self.set_state_on_drop_to, Ordering::AcqRel); - - // We should only ever see an old state which was RUNNING. - assert_eq!(state_and_queue.addr() & STATE_MASK, RUNNING); - - // Walk the entire linked list of waiters and wake them up (in lifo - // order, last to register is first to wake up). - unsafe { - // Right after setting `node.signaled = true` the other thread may - // free `node` if there happens to be has a spurious wakeup. - // So we have to take out the `thread` field and copy the pointer to - // `next` first. - let mut queue = - state_and_queue.with_addr(state_and_queue.addr() & !STATE_MASK) as *const Waiter; - while !queue.is_null() { - let next = (*queue).next; - let thread = (*queue).thread.take().unwrap(); - (*queue).signaled.store(true, Ordering::Release); - // ^- FIXME (maybe): This is another case of issue #55005 - // `store()` has a potentially dangling ref to `signaled`. - queue = next; - thread.unpark(); - } - } - } -} - -impl OnceState { - #[inline] - pub fn is_poisoned(&self) -> bool { - self.poisoned - } - - #[inline] - pub fn poison(&self) { - self.set_state_on_drop_to.set(ptr::without_provenance_mut(POISONED)); - } -} |
