diff options
| author | joboet <jonasboettiger@icloud.com> | 2024-04-23 11:49:37 +0200 |
|---|---|---|
| committer | joboet <jonasboettiger@icloud.com> | 2024-05-02 12:38:26 +0200 |
| commit | a56fd370fc3e6b5cfbd9eb0b96f533d0bcca0b1f (patch) | |
| tree | a1fc69d5c82808c6018e0500d1f77bafdf002b85 /library/std/src/sys/pal/unix | |
| parent | fcc06c894b17f4d0c80b8934ea5f27faa894c960 (diff) | |
| download | rust-a56fd370fc3e6b5cfbd9eb0b96f533d0bcca0b1f.tar.gz rust-a56fd370fc3e6b5cfbd9eb0b96f533d0bcca0b1f.zip | |
std: move thread parking to `sys::sync`
Diffstat (limited to 'library/std/src/sys/pal/unix')
| -rw-r--r-- | library/std/src/sys/pal/unix/thread_parking.rs (renamed from library/std/src/sys/pal/unix/thread_parking/netbsd.rs) | 4 | ||||
| -rw-r--r-- | library/std/src/sys/pal/unix/thread_parking/darwin.rs | 130 | ||||
| -rw-r--r-- | library/std/src/sys/pal/unix/thread_parking/mod.rs | 24 | ||||
| -rw-r--r-- | library/std/src/sys/pal/unix/thread_parking/pthread.rs | 269 |
4 files changed, 4 insertions, 423 deletions
diff --git a/library/std/src/sys/pal/unix/thread_parking/netbsd.rs b/library/std/src/sys/pal/unix/thread_parking.rs index 5eeb37f8763..66ffc006057 100644 --- a/library/std/src/sys/pal/unix/thread_parking/netbsd.rs +++ b/library/std/src/sys/pal/unix/thread_parking.rs @@ -1,3 +1,7 @@ +// Only used on NetBSD. If other platforms start using id-based parking, use +// separate modules for each platform. +#![cfg(target_os = "netbsd")] + use crate::ffi::{c_int, c_void}; use crate::ptr; use crate::time::Duration; diff --git a/library/std/src/sys/pal/unix/thread_parking/darwin.rs b/library/std/src/sys/pal/unix/thread_parking/darwin.rs deleted file mode 100644 index 8231f3cba2d..00000000000 --- a/library/std/src/sys/pal/unix/thread_parking/darwin.rs +++ /dev/null @@ -1,130 +0,0 @@ -//! Thread parking for Darwin-based systems. -//! -//! Darwin actually has futex syscalls (`__ulock_wait`/`__ulock_wake`), but they -//! cannot be used in `std` because they are non-public (their use will lead to -//! rejection from the App Store). -//! -//! Therefore, we need to look for other synchronization primitives. Luckily, Darwin -//! supports semaphores, which allow us to implement the behaviour we need with -//! only one primitive (as opposed to a mutex-condvar pair). We use the semaphore -//! provided by libdispatch, as the underlying Mach semaphore is only dubiously -//! public. - -use crate::pin::Pin; -use crate::sync::atomic::{ - AtomicI8, - Ordering::{Acquire, Release}, -}; -use crate::time::Duration; - -type dispatch_semaphore_t = *mut crate::ffi::c_void; -type dispatch_time_t = u64; - -const DISPATCH_TIME_NOW: dispatch_time_t = 0; -const DISPATCH_TIME_FOREVER: dispatch_time_t = !0; - -// Contained in libSystem.dylib, which is linked by default. -extern "C" { - fn dispatch_time(when: dispatch_time_t, delta: i64) -> dispatch_time_t; - fn dispatch_semaphore_create(val: isize) -> dispatch_semaphore_t; - fn dispatch_semaphore_wait(dsema: dispatch_semaphore_t, timeout: dispatch_time_t) -> isize; - fn dispatch_semaphore_signal(dsema: dispatch_semaphore_t) -> isize; - fn dispatch_release(object: *mut crate::ffi::c_void); -} - -const EMPTY: i8 = 0; -const NOTIFIED: i8 = 1; -const PARKED: i8 = -1; - -pub struct Parker { - semaphore: dispatch_semaphore_t, - state: AtomicI8, -} - -unsafe impl Sync for Parker {} -unsafe impl Send for Parker {} - -impl Parker { - pub unsafe fn new_in_place(parker: *mut Parker) { - let semaphore = dispatch_semaphore_create(0); - assert!( - !semaphore.is_null(), - "failed to create dispatch semaphore for thread synchronization" - ); - parker.write(Parker { semaphore, state: AtomicI8::new(EMPTY) }) - } - - // Does not need `Pin`, but other implementation do. - pub unsafe fn park(self: Pin<&Self>) { - // The semaphore counter must be zero at this point, because unparking - // threads will not actually increase it until we signalled that we - // are waiting. - - // Change NOTIFIED to EMPTY and EMPTY to PARKED. - if self.state.fetch_sub(1, Acquire) == NOTIFIED { - return; - } - - // Another thread may increase the semaphore counter from this point on. - // If it is faster than us, we will decrement it again immediately below. - // If we are faster, we wait. - - // Ensure that the semaphore counter has actually been decremented, even - // if the call timed out for some reason. - while dispatch_semaphore_wait(self.semaphore, DISPATCH_TIME_FOREVER) != 0 {} - - // At this point, the semaphore counter is zero again. - - // We were definitely woken up, so we don't need to check the state. - // Still, we need to reset the state using a swap to observe the state - // change with acquire ordering. - self.state.swap(EMPTY, Acquire); - } - - // Does not need `Pin`, but other implementation do. - pub unsafe fn park_timeout(self: Pin<&Self>, dur: Duration) { - if self.state.fetch_sub(1, Acquire) == NOTIFIED { - return; - } - - let nanos = dur.as_nanos().try_into().unwrap_or(i64::MAX); - let timeout = dispatch_time(DISPATCH_TIME_NOW, nanos); - - let timeout = dispatch_semaphore_wait(self.semaphore, timeout) != 0; - - let state = self.state.swap(EMPTY, Acquire); - if state == NOTIFIED && timeout { - // If the state was NOTIFIED but semaphore_wait returned without - // decrementing the count because of a timeout, it means another - // thread is about to call semaphore_signal. We must wait for that - // to happen to ensure the semaphore count is reset. - while dispatch_semaphore_wait(self.semaphore, DISPATCH_TIME_FOREVER) != 0 {} - } else { - // Either a timeout occurred and we reset the state before any thread - // tried to wake us up, or we were woken up and reset the state, - // making sure to observe the state change with acquire ordering. - // Either way, the semaphore counter is now zero again. - } - } - - // Does not need `Pin`, but other implementation do. - pub fn unpark(self: Pin<&Self>) { - let state = self.state.swap(NOTIFIED, Release); - if state == PARKED { - unsafe { - dispatch_semaphore_signal(self.semaphore); - } - } - } -} - -impl Drop for Parker { - fn drop(&mut self) { - // SAFETY: - // We always ensure that the semaphore count is reset, so this will - // never cause an exception. - unsafe { - dispatch_release(self.semaphore); - } - } -} diff --git a/library/std/src/sys/pal/unix/thread_parking/mod.rs b/library/std/src/sys/pal/unix/thread_parking/mod.rs deleted file mode 100644 index c7fa39f07b6..00000000000 --- a/library/std/src/sys/pal/unix/thread_parking/mod.rs +++ /dev/null @@ -1,24 +0,0 @@ -//! Thread parking on systems without futex support. - -#![cfg(not(any( - target_os = "linux", - target_os = "android", - all(target_os = "emscripten", target_feature = "atomics"), - target_os = "freebsd", - target_os = "openbsd", - target_os = "dragonfly", - target_os = "fuchsia", -)))] - -cfg_if::cfg_if! { - if #[cfg(all(target_vendor = "apple", not(miri)))] { - mod darwin; - pub use darwin::Parker; - } else if #[cfg(target_os = "netbsd")] { - mod netbsd; - pub use netbsd::{current, park, park_timeout, unpark, ThreadId}; - } else { - mod pthread; - pub use pthread::Parker; - } -} diff --git a/library/std/src/sys/pal/unix/thread_parking/pthread.rs b/library/std/src/sys/pal/unix/thread_parking/pthread.rs deleted file mode 100644 index 8e295453d76..00000000000 --- a/library/std/src/sys/pal/unix/thread_parking/pthread.rs +++ /dev/null @@ -1,269 +0,0 @@ -//! Thread parking without `futex` using the `pthread` synchronization primitives. - -use crate::cell::UnsafeCell; -use crate::marker::PhantomPinned; -use crate::pin::Pin; -use crate::ptr::addr_of_mut; -use crate::sync::atomic::AtomicUsize; -use crate::sync::atomic::Ordering::{Acquire, Relaxed, Release}; -#[cfg(not(target_os = "nto"))] -use crate::sys::time::TIMESPEC_MAX; -#[cfg(target_os = "nto")] -use crate::sys::time::TIMESPEC_MAX_CAPPED; -use crate::time::Duration; - -const EMPTY: usize = 0; -const PARKED: usize = 1; -const NOTIFIED: usize = 2; - -unsafe fn lock(lock: *mut libc::pthread_mutex_t) { - let r = libc::pthread_mutex_lock(lock); - debug_assert_eq!(r, 0); -} - -unsafe fn unlock(lock: *mut libc::pthread_mutex_t) { - let r = libc::pthread_mutex_unlock(lock); - debug_assert_eq!(r, 0); -} - -unsafe fn notify_one(cond: *mut libc::pthread_cond_t) { - let r = libc::pthread_cond_signal(cond); - debug_assert_eq!(r, 0); -} - -unsafe fn wait(cond: *mut libc::pthread_cond_t, lock: *mut libc::pthread_mutex_t) { - let r = libc::pthread_cond_wait(cond, lock); - debug_assert_eq!(r, 0); -} - -unsafe fn wait_timeout( - cond: *mut libc::pthread_cond_t, - lock: *mut libc::pthread_mutex_t, - dur: Duration, -) { - // Use the system clock on systems that do not support pthread_condattr_setclock. - // This unfortunately results in problems when the system time changes. - #[cfg(any(target_os = "espidf", target_os = "horizon", target_vendor = "apple"))] - let (now, dur) = { - use crate::cmp::min; - use crate::sys::time::SystemTime; - - // OSX implementation of `pthread_cond_timedwait` is buggy - // with super long durations. When duration is greater than - // 0x100_0000_0000_0000 seconds, `pthread_cond_timedwait` - // in macOS Sierra return error 316. - // - // This program demonstrates the issue: - // https://gist.github.com/stepancheg/198db4623a20aad2ad7cddb8fda4a63c - // - // To work around this issue, and possible bugs of other OSes, timeout - // is clamped to 1000 years, which is allowable per the API of `park_timeout` - // because of spurious wakeups. - let dur = min(dur, Duration::from_secs(1000 * 365 * 86400)); - let now = SystemTime::now().t; - (now, dur) - }; - // Use the monotonic clock on other systems. - #[cfg(not(any(target_os = "espidf", target_os = "horizon", target_vendor = "apple")))] - let (now, dur) = { - use crate::sys::time::Timespec; - - (Timespec::now(libc::CLOCK_MONOTONIC), dur) - }; - - #[cfg(not(target_os = "nto"))] - let timeout = - now.checked_add_duration(&dur).and_then(|t| t.to_timespec()).unwrap_or(TIMESPEC_MAX); - #[cfg(target_os = "nto")] - let timeout = now - .checked_add_duration(&dur) - .and_then(|t| t.to_timespec_capped()) - .unwrap_or(TIMESPEC_MAX_CAPPED); - let r = libc::pthread_cond_timedwait(cond, lock, &timeout); - debug_assert!(r == libc::ETIMEDOUT || r == 0); -} - -pub struct Parker { - state: AtomicUsize, - lock: UnsafeCell<libc::pthread_mutex_t>, - cvar: UnsafeCell<libc::pthread_cond_t>, - // The `pthread` primitives require a stable address, so make this struct `!Unpin`. - _pinned: PhantomPinned, -} - -impl Parker { - /// Construct the UNIX parker in-place. - /// - /// # Safety - /// The constructed parker must never be moved. - pub unsafe fn new_in_place(parker: *mut Parker) { - // Use the default mutex implementation to allow for simpler initialization. - // This could lead to undefined behaviour when deadlocking. This is avoided - // by not deadlocking. Note in particular the unlocking operation before any - // panic, as code after the panic could try to park again. - addr_of_mut!((*parker).state).write(AtomicUsize::new(EMPTY)); - addr_of_mut!((*parker).lock).write(UnsafeCell::new(libc::PTHREAD_MUTEX_INITIALIZER)); - - cfg_if::cfg_if! { - if #[cfg(any( - target_os = "l4re", - target_os = "android", - target_os = "redox", - target_os = "vita", - target_vendor = "apple", - ))] { - addr_of_mut!((*parker).cvar).write(UnsafeCell::new(libc::PTHREAD_COND_INITIALIZER)); - } else if #[cfg(any(target_os = "espidf", target_os = "horizon"))] { - let r = libc::pthread_cond_init(addr_of_mut!((*parker).cvar).cast(), crate::ptr::null()); - assert_eq!(r, 0); - } else { - use crate::mem::MaybeUninit; - let mut attr = MaybeUninit::<libc::pthread_condattr_t>::uninit(); - let r = libc::pthread_condattr_init(attr.as_mut_ptr()); - assert_eq!(r, 0); - let r = libc::pthread_condattr_setclock(attr.as_mut_ptr(), libc::CLOCK_MONOTONIC); - assert_eq!(r, 0); - let r = libc::pthread_cond_init(addr_of_mut!((*parker).cvar).cast(), attr.as_ptr()); - assert_eq!(r, 0); - let r = libc::pthread_condattr_destroy(attr.as_mut_ptr()); - assert_eq!(r, 0); - } - } - } - - // This implementation doesn't require `unsafe`, but other implementations - // may assume this is only called by the thread that owns the Parker. - // - // For memory ordering, see std/src/sys_common/thread_parking/futex.rs - pub unsafe fn park(self: Pin<&Self>) { - // If we were previously notified then we consume this notification and - // return quickly. - if self.state.compare_exchange(NOTIFIED, EMPTY, Acquire, Relaxed).is_ok() { - return; - } - - // Otherwise we need to coordinate going to sleep - lock(self.lock.get()); - match self.state.compare_exchange(EMPTY, PARKED, Relaxed, Relaxed) { - Ok(_) => {} - Err(NOTIFIED) => { - // We must read here, even though we know it will be `NOTIFIED`. - // This is because `unpark` may have been called again since we read - // `NOTIFIED` in the `compare_exchange` above. We must perform an - // acquire operation that synchronizes with that `unpark` to observe - // any writes it made before the call to unpark. To do that we must - // read from the write it made to `state`. - let old = self.state.swap(EMPTY, Acquire); - - unlock(self.lock.get()); - - assert_eq!(old, NOTIFIED, "park state changed unexpectedly"); - return; - } // should consume this notification, so prohibit spurious wakeups in next park. - Err(_) => { - unlock(self.lock.get()); - - panic!("inconsistent park state") - } - } - - loop { - wait(self.cvar.get(), self.lock.get()); - - match self.state.compare_exchange(NOTIFIED, EMPTY, Acquire, Relaxed) { - Ok(_) => break, // got a notification - Err(_) => {} // spurious wakeup, go back to sleep - } - } - - unlock(self.lock.get()); - } - - // This implementation doesn't require `unsafe`, but other implementations - // may assume this is only called by the thread that owns the Parker. Use - // `Pin` to guarantee a stable address for the mutex and condition variable. - pub unsafe fn park_timeout(self: Pin<&Self>, dur: Duration) { - // Like `park` above we have a fast path for an already-notified thread, and - // afterwards we start coordinating for a sleep. - // return quickly. - if self.state.compare_exchange(NOTIFIED, EMPTY, Acquire, Relaxed).is_ok() { - return; - } - - lock(self.lock.get()); - match self.state.compare_exchange(EMPTY, PARKED, Relaxed, Relaxed) { - Ok(_) => {} - Err(NOTIFIED) => { - // We must read again here, see `park`. - let old = self.state.swap(EMPTY, Acquire); - unlock(self.lock.get()); - - assert_eq!(old, NOTIFIED, "park state changed unexpectedly"); - return; - } // should consume this notification, so prohibit spurious wakeups in next park. - Err(_) => { - unlock(self.lock.get()); - panic!("inconsistent park_timeout state") - } - } - - // Wait with a timeout, and if we spuriously wake up or otherwise wake up - // from a notification we just want to unconditionally set the state back to - // empty, either consuming a notification or un-flagging ourselves as - // parked. - wait_timeout(self.cvar.get(), self.lock.get(), dur); - - match self.state.swap(EMPTY, Acquire) { - NOTIFIED => unlock(self.lock.get()), // got a notification, hurray! - PARKED => unlock(self.lock.get()), // no notification, alas - n => { - unlock(self.lock.get()); - panic!("inconsistent park_timeout state: {n}") - } - } - } - - pub fn unpark(self: Pin<&Self>) { - // To ensure the unparked thread will observe any writes we made - // before this call, we must perform a release operation that `park` - // can synchronize with. To do that we must write `NOTIFIED` even if - // `state` is already `NOTIFIED`. That is why this must be a swap - // rather than a compare-and-swap that returns if it reads `NOTIFIED` - // on failure. - match self.state.swap(NOTIFIED, Release) { - EMPTY => return, // no one was waiting - NOTIFIED => return, // already unparked - PARKED => {} // gotta go wake someone up - _ => panic!("inconsistent state in unpark"), - } - - // There is a period between when the parked thread sets `state` to - // `PARKED` (or last checked `state` in the case of a spurious wake - // up) and when it actually waits on `cvar`. If we were to notify - // during this period it would be ignored and then when the parked - // thread went to sleep it would never wake up. Fortunately, it has - // `lock` locked at this stage so we can acquire `lock` to wait until - // it is ready to receive the notification. - // - // Releasing `lock` before the call to `notify_one` means that when the - // parked thread wakes it doesn't get woken only to have to wait for us - // to release `lock`. - unsafe { - lock(self.lock.get()); - unlock(self.lock.get()); - notify_one(self.cvar.get()); - } - } -} - -impl Drop for Parker { - fn drop(&mut self) { - unsafe { - libc::pthread_cond_destroy(self.cvar.get_mut()); - libc::pthread_mutex_destroy(self.lock.get_mut()); - } - } -} - -unsafe impl Sync for Parker {} -unsafe impl Send for Parker {} |
