about summary refs log tree commit diff
path: root/library/std/src/sys/pal/unix
diff options
context:
space:
mode:
authorjoboet <jonasboettiger@icloud.com>2024-04-23 11:49:37 +0200
committerjoboet <jonasboettiger@icloud.com>2024-05-02 12:38:26 +0200
commita56fd370fc3e6b5cfbd9eb0b96f533d0bcca0b1f (patch)
treea1fc69d5c82808c6018e0500d1f77bafdf002b85 /library/std/src/sys/pal/unix
parentfcc06c894b17f4d0c80b8934ea5f27faa894c960 (diff)
downloadrust-a56fd370fc3e6b5cfbd9eb0b96f533d0bcca0b1f.tar.gz
rust-a56fd370fc3e6b5cfbd9eb0b96f533d0bcca0b1f.zip
std: move thread parking to `sys::sync`
Diffstat (limited to 'library/std/src/sys/pal/unix')
-rw-r--r--library/std/src/sys/pal/unix/thread_parking.rs (renamed from library/std/src/sys/pal/unix/thread_parking/netbsd.rs)4
-rw-r--r--library/std/src/sys/pal/unix/thread_parking/darwin.rs130
-rw-r--r--library/std/src/sys/pal/unix/thread_parking/mod.rs24
-rw-r--r--library/std/src/sys/pal/unix/thread_parking/pthread.rs269
4 files changed, 4 insertions, 423 deletions
diff --git a/library/std/src/sys/pal/unix/thread_parking/netbsd.rs b/library/std/src/sys/pal/unix/thread_parking.rs
index 5eeb37f8763..66ffc006057 100644
--- a/library/std/src/sys/pal/unix/thread_parking/netbsd.rs
+++ b/library/std/src/sys/pal/unix/thread_parking.rs
@@ -1,3 +1,7 @@
+// Only used on NetBSD. If other platforms start using id-based parking, use
+// separate modules for each platform.
+#![cfg(target_os = "netbsd")]
+
 use crate::ffi::{c_int, c_void};
 use crate::ptr;
 use crate::time::Duration;
diff --git a/library/std/src/sys/pal/unix/thread_parking/darwin.rs b/library/std/src/sys/pal/unix/thread_parking/darwin.rs
deleted file mode 100644
index 8231f3cba2d..00000000000
--- a/library/std/src/sys/pal/unix/thread_parking/darwin.rs
+++ /dev/null
@@ -1,130 +0,0 @@
-//! Thread parking for Darwin-based systems.
-//!
-//! Darwin actually has futex syscalls (`__ulock_wait`/`__ulock_wake`), but they
-//! cannot be used in `std` because they are non-public (their use will lead to
-//! rejection from the App Store).
-//!
-//! Therefore, we need to look for other synchronization primitives. Luckily, Darwin
-//! supports semaphores, which allow us to implement the behaviour we need with
-//! only one primitive (as opposed to a mutex-condvar pair). We use the semaphore
-//! provided by libdispatch, as the underlying Mach semaphore is only dubiously
-//! public.
-
-use crate::pin::Pin;
-use crate::sync::atomic::{
-    AtomicI8,
-    Ordering::{Acquire, Release},
-};
-use crate::time::Duration;
-
-type dispatch_semaphore_t = *mut crate::ffi::c_void;
-type dispatch_time_t = u64;
-
-const DISPATCH_TIME_NOW: dispatch_time_t = 0;
-const DISPATCH_TIME_FOREVER: dispatch_time_t = !0;
-
-// Contained in libSystem.dylib, which is linked by default.
-extern "C" {
-    fn dispatch_time(when: dispatch_time_t, delta: i64) -> dispatch_time_t;
-    fn dispatch_semaphore_create(val: isize) -> dispatch_semaphore_t;
-    fn dispatch_semaphore_wait(dsema: dispatch_semaphore_t, timeout: dispatch_time_t) -> isize;
-    fn dispatch_semaphore_signal(dsema: dispatch_semaphore_t) -> isize;
-    fn dispatch_release(object: *mut crate::ffi::c_void);
-}
-
-const EMPTY: i8 = 0;
-const NOTIFIED: i8 = 1;
-const PARKED: i8 = -1;
-
-pub struct Parker {
-    semaphore: dispatch_semaphore_t,
-    state: AtomicI8,
-}
-
-unsafe impl Sync for Parker {}
-unsafe impl Send for Parker {}
-
-impl Parker {
-    pub unsafe fn new_in_place(parker: *mut Parker) {
-        let semaphore = dispatch_semaphore_create(0);
-        assert!(
-            !semaphore.is_null(),
-            "failed to create dispatch semaphore for thread synchronization"
-        );
-        parker.write(Parker { semaphore, state: AtomicI8::new(EMPTY) })
-    }
-
-    // Does not need `Pin`, but other implementation do.
-    pub unsafe fn park(self: Pin<&Self>) {
-        // The semaphore counter must be zero at this point, because unparking
-        // threads will not actually increase it until we signalled that we
-        // are waiting.
-
-        // Change NOTIFIED to EMPTY and EMPTY to PARKED.
-        if self.state.fetch_sub(1, Acquire) == NOTIFIED {
-            return;
-        }
-
-        // Another thread may increase the semaphore counter from this point on.
-        // If it is faster than us, we will decrement it again immediately below.
-        // If we are faster, we wait.
-
-        // Ensure that the semaphore counter has actually been decremented, even
-        // if the call timed out for some reason.
-        while dispatch_semaphore_wait(self.semaphore, DISPATCH_TIME_FOREVER) != 0 {}
-
-        // At this point, the semaphore counter is zero again.
-
-        // We were definitely woken up, so we don't need to check the state.
-        // Still, we need to reset the state using a swap to observe the state
-        // change with acquire ordering.
-        self.state.swap(EMPTY, Acquire);
-    }
-
-    // Does not need `Pin`, but other implementation do.
-    pub unsafe fn park_timeout(self: Pin<&Self>, dur: Duration) {
-        if self.state.fetch_sub(1, Acquire) == NOTIFIED {
-            return;
-        }
-
-        let nanos = dur.as_nanos().try_into().unwrap_or(i64::MAX);
-        let timeout = dispatch_time(DISPATCH_TIME_NOW, nanos);
-
-        let timeout = dispatch_semaphore_wait(self.semaphore, timeout) != 0;
-
-        let state = self.state.swap(EMPTY, Acquire);
-        if state == NOTIFIED && timeout {
-            // If the state was NOTIFIED but semaphore_wait returned without
-            // decrementing the count because of a timeout, it means another
-            // thread is about to call semaphore_signal. We must wait for that
-            // to happen to ensure the semaphore count is reset.
-            while dispatch_semaphore_wait(self.semaphore, DISPATCH_TIME_FOREVER) != 0 {}
-        } else {
-            // Either a timeout occurred and we reset the state before any thread
-            // tried to wake us up, or we were woken up and reset the state,
-            // making sure to observe the state change with acquire ordering.
-            // Either way, the semaphore counter is now zero again.
-        }
-    }
-
-    // Does not need `Pin`, but other implementation do.
-    pub fn unpark(self: Pin<&Self>) {
-        let state = self.state.swap(NOTIFIED, Release);
-        if state == PARKED {
-            unsafe {
-                dispatch_semaphore_signal(self.semaphore);
-            }
-        }
-    }
-}
-
-impl Drop for Parker {
-    fn drop(&mut self) {
-        // SAFETY:
-        // We always ensure that the semaphore count is reset, so this will
-        // never cause an exception.
-        unsafe {
-            dispatch_release(self.semaphore);
-        }
-    }
-}
diff --git a/library/std/src/sys/pal/unix/thread_parking/mod.rs b/library/std/src/sys/pal/unix/thread_parking/mod.rs
deleted file mode 100644
index c7fa39f07b6..00000000000
--- a/library/std/src/sys/pal/unix/thread_parking/mod.rs
+++ /dev/null
@@ -1,24 +0,0 @@
-//! Thread parking on systems without futex support.
-
-#![cfg(not(any(
-    target_os = "linux",
-    target_os = "android",
-    all(target_os = "emscripten", target_feature = "atomics"),
-    target_os = "freebsd",
-    target_os = "openbsd",
-    target_os = "dragonfly",
-    target_os = "fuchsia",
-)))]
-
-cfg_if::cfg_if! {
-    if #[cfg(all(target_vendor = "apple", not(miri)))] {
-        mod darwin;
-        pub use darwin::Parker;
-    } else if #[cfg(target_os = "netbsd")] {
-        mod netbsd;
-        pub use netbsd::{current, park, park_timeout, unpark, ThreadId};
-    } else {
-        mod pthread;
-        pub use pthread::Parker;
-    }
-}
diff --git a/library/std/src/sys/pal/unix/thread_parking/pthread.rs b/library/std/src/sys/pal/unix/thread_parking/pthread.rs
deleted file mode 100644
index 8e295453d76..00000000000
--- a/library/std/src/sys/pal/unix/thread_parking/pthread.rs
+++ /dev/null
@@ -1,269 +0,0 @@
-//! Thread parking without `futex` using the `pthread` synchronization primitives.
-
-use crate::cell::UnsafeCell;
-use crate::marker::PhantomPinned;
-use crate::pin::Pin;
-use crate::ptr::addr_of_mut;
-use crate::sync::atomic::AtomicUsize;
-use crate::sync::atomic::Ordering::{Acquire, Relaxed, Release};
-#[cfg(not(target_os = "nto"))]
-use crate::sys::time::TIMESPEC_MAX;
-#[cfg(target_os = "nto")]
-use crate::sys::time::TIMESPEC_MAX_CAPPED;
-use crate::time::Duration;
-
-const EMPTY: usize = 0;
-const PARKED: usize = 1;
-const NOTIFIED: usize = 2;
-
-unsafe fn lock(lock: *mut libc::pthread_mutex_t) {
-    let r = libc::pthread_mutex_lock(lock);
-    debug_assert_eq!(r, 0);
-}
-
-unsafe fn unlock(lock: *mut libc::pthread_mutex_t) {
-    let r = libc::pthread_mutex_unlock(lock);
-    debug_assert_eq!(r, 0);
-}
-
-unsafe fn notify_one(cond: *mut libc::pthread_cond_t) {
-    let r = libc::pthread_cond_signal(cond);
-    debug_assert_eq!(r, 0);
-}
-
-unsafe fn wait(cond: *mut libc::pthread_cond_t, lock: *mut libc::pthread_mutex_t) {
-    let r = libc::pthread_cond_wait(cond, lock);
-    debug_assert_eq!(r, 0);
-}
-
-unsafe fn wait_timeout(
-    cond: *mut libc::pthread_cond_t,
-    lock: *mut libc::pthread_mutex_t,
-    dur: Duration,
-) {
-    // Use the system clock on systems that do not support pthread_condattr_setclock.
-    // This unfortunately results in problems when the system time changes.
-    #[cfg(any(target_os = "espidf", target_os = "horizon", target_vendor = "apple"))]
-    let (now, dur) = {
-        use crate::cmp::min;
-        use crate::sys::time::SystemTime;
-
-        // OSX implementation of `pthread_cond_timedwait` is buggy
-        // with super long durations. When duration is greater than
-        // 0x100_0000_0000_0000 seconds, `pthread_cond_timedwait`
-        // in macOS Sierra return error 316.
-        //
-        // This program demonstrates the issue:
-        // https://gist.github.com/stepancheg/198db4623a20aad2ad7cddb8fda4a63c
-        //
-        // To work around this issue, and possible bugs of other OSes, timeout
-        // is clamped to 1000 years, which is allowable per the API of `park_timeout`
-        // because of spurious wakeups.
-        let dur = min(dur, Duration::from_secs(1000 * 365 * 86400));
-        let now = SystemTime::now().t;
-        (now, dur)
-    };
-    // Use the monotonic clock on other systems.
-    #[cfg(not(any(target_os = "espidf", target_os = "horizon", target_vendor = "apple")))]
-    let (now, dur) = {
-        use crate::sys::time::Timespec;
-
-        (Timespec::now(libc::CLOCK_MONOTONIC), dur)
-    };
-
-    #[cfg(not(target_os = "nto"))]
-    let timeout =
-        now.checked_add_duration(&dur).and_then(|t| t.to_timespec()).unwrap_or(TIMESPEC_MAX);
-    #[cfg(target_os = "nto")]
-    let timeout = now
-        .checked_add_duration(&dur)
-        .and_then(|t| t.to_timespec_capped())
-        .unwrap_or(TIMESPEC_MAX_CAPPED);
-    let r = libc::pthread_cond_timedwait(cond, lock, &timeout);
-    debug_assert!(r == libc::ETIMEDOUT || r == 0);
-}
-
-pub struct Parker {
-    state: AtomicUsize,
-    lock: UnsafeCell<libc::pthread_mutex_t>,
-    cvar: UnsafeCell<libc::pthread_cond_t>,
-    // The `pthread` primitives require a stable address, so make this struct `!Unpin`.
-    _pinned: PhantomPinned,
-}
-
-impl Parker {
-    /// Construct the UNIX parker in-place.
-    ///
-    /// # Safety
-    /// The constructed parker must never be moved.
-    pub unsafe fn new_in_place(parker: *mut Parker) {
-        // Use the default mutex implementation to allow for simpler initialization.
-        // This could lead to undefined behaviour when deadlocking. This is avoided
-        // by not deadlocking. Note in particular the unlocking operation before any
-        // panic, as code after the panic could try to park again.
-        addr_of_mut!((*parker).state).write(AtomicUsize::new(EMPTY));
-        addr_of_mut!((*parker).lock).write(UnsafeCell::new(libc::PTHREAD_MUTEX_INITIALIZER));
-
-        cfg_if::cfg_if! {
-            if #[cfg(any(
-                target_os = "l4re",
-                target_os = "android",
-                target_os = "redox",
-                target_os = "vita",
-                target_vendor = "apple",
-            ))] {
-                addr_of_mut!((*parker).cvar).write(UnsafeCell::new(libc::PTHREAD_COND_INITIALIZER));
-            } else if #[cfg(any(target_os = "espidf", target_os = "horizon"))] {
-                let r = libc::pthread_cond_init(addr_of_mut!((*parker).cvar).cast(), crate::ptr::null());
-                assert_eq!(r, 0);
-            } else {
-                use crate::mem::MaybeUninit;
-                let mut attr = MaybeUninit::<libc::pthread_condattr_t>::uninit();
-                let r = libc::pthread_condattr_init(attr.as_mut_ptr());
-                assert_eq!(r, 0);
-                let r = libc::pthread_condattr_setclock(attr.as_mut_ptr(), libc::CLOCK_MONOTONIC);
-                assert_eq!(r, 0);
-                let r = libc::pthread_cond_init(addr_of_mut!((*parker).cvar).cast(), attr.as_ptr());
-                assert_eq!(r, 0);
-                let r = libc::pthread_condattr_destroy(attr.as_mut_ptr());
-                assert_eq!(r, 0);
-            }
-        }
-    }
-
-    // This implementation doesn't require `unsafe`, but other implementations
-    // may assume this is only called by the thread that owns the Parker.
-    //
-    // For memory ordering, see std/src/sys_common/thread_parking/futex.rs
-    pub unsafe fn park(self: Pin<&Self>) {
-        // If we were previously notified then we consume this notification and
-        // return quickly.
-        if self.state.compare_exchange(NOTIFIED, EMPTY, Acquire, Relaxed).is_ok() {
-            return;
-        }
-
-        // Otherwise we need to coordinate going to sleep
-        lock(self.lock.get());
-        match self.state.compare_exchange(EMPTY, PARKED, Relaxed, Relaxed) {
-            Ok(_) => {}
-            Err(NOTIFIED) => {
-                // We must read here, even though we know it will be `NOTIFIED`.
-                // This is because `unpark` may have been called again since we read
-                // `NOTIFIED` in the `compare_exchange` above. We must perform an
-                // acquire operation that synchronizes with that `unpark` to observe
-                // any writes it made before the call to unpark. To do that we must
-                // read from the write it made to `state`.
-                let old = self.state.swap(EMPTY, Acquire);
-
-                unlock(self.lock.get());
-
-                assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
-                return;
-            } // should consume this notification, so prohibit spurious wakeups in next park.
-            Err(_) => {
-                unlock(self.lock.get());
-
-                panic!("inconsistent park state")
-            }
-        }
-
-        loop {
-            wait(self.cvar.get(), self.lock.get());
-
-            match self.state.compare_exchange(NOTIFIED, EMPTY, Acquire, Relaxed) {
-                Ok(_) => break, // got a notification
-                Err(_) => {}    // spurious wakeup, go back to sleep
-            }
-        }
-
-        unlock(self.lock.get());
-    }
-
-    // This implementation doesn't require `unsafe`, but other implementations
-    // may assume this is only called by the thread that owns the Parker. Use
-    // `Pin` to guarantee a stable address for the mutex and condition variable.
-    pub unsafe fn park_timeout(self: Pin<&Self>, dur: Duration) {
-        // Like `park` above we have a fast path for an already-notified thread, and
-        // afterwards we start coordinating for a sleep.
-        // return quickly.
-        if self.state.compare_exchange(NOTIFIED, EMPTY, Acquire, Relaxed).is_ok() {
-            return;
-        }
-
-        lock(self.lock.get());
-        match self.state.compare_exchange(EMPTY, PARKED, Relaxed, Relaxed) {
-            Ok(_) => {}
-            Err(NOTIFIED) => {
-                // We must read again here, see `park`.
-                let old = self.state.swap(EMPTY, Acquire);
-                unlock(self.lock.get());
-
-                assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
-                return;
-            } // should consume this notification, so prohibit spurious wakeups in next park.
-            Err(_) => {
-                unlock(self.lock.get());
-                panic!("inconsistent park_timeout state")
-            }
-        }
-
-        // Wait with a timeout, and if we spuriously wake up or otherwise wake up
-        // from a notification we just want to unconditionally set the state back to
-        // empty, either consuming a notification or un-flagging ourselves as
-        // parked.
-        wait_timeout(self.cvar.get(), self.lock.get(), dur);
-
-        match self.state.swap(EMPTY, Acquire) {
-            NOTIFIED => unlock(self.lock.get()), // got a notification, hurray!
-            PARKED => unlock(self.lock.get()),   // no notification, alas
-            n => {
-                unlock(self.lock.get());
-                panic!("inconsistent park_timeout state: {n}")
-            }
-        }
-    }
-
-    pub fn unpark(self: Pin<&Self>) {
-        // To ensure the unparked thread will observe any writes we made
-        // before this call, we must perform a release operation that `park`
-        // can synchronize with. To do that we must write `NOTIFIED` even if
-        // `state` is already `NOTIFIED`. That is why this must be a swap
-        // rather than a compare-and-swap that returns if it reads `NOTIFIED`
-        // on failure.
-        match self.state.swap(NOTIFIED, Release) {
-            EMPTY => return,    // no one was waiting
-            NOTIFIED => return, // already unparked
-            PARKED => {}        // gotta go wake someone up
-            _ => panic!("inconsistent state in unpark"),
-        }
-
-        // There is a period between when the parked thread sets `state` to
-        // `PARKED` (or last checked `state` in the case of a spurious wake
-        // up) and when it actually waits on `cvar`. If we were to notify
-        // during this period it would be ignored and then when the parked
-        // thread went to sleep it would never wake up. Fortunately, it has
-        // `lock` locked at this stage so we can acquire `lock` to wait until
-        // it is ready to receive the notification.
-        //
-        // Releasing `lock` before the call to `notify_one` means that when the
-        // parked thread wakes it doesn't get woken only to have to wait for us
-        // to release `lock`.
-        unsafe {
-            lock(self.lock.get());
-            unlock(self.lock.get());
-            notify_one(self.cvar.get());
-        }
-    }
-}
-
-impl Drop for Parker {
-    fn drop(&mut self) {
-        unsafe {
-            libc::pthread_cond_destroy(self.cvar.get_mut());
-            libc::pthread_mutex_destroy(self.lock.get_mut());
-        }
-    }
-}
-
-unsafe impl Sync for Parker {}
-unsafe impl Send for Parker {}