about summary refs log tree commit diff
path: root/library/std/src/sys_common
diff options
context:
space:
mode:
authorjoboet <jonasboettiger@icloud.com>2022-12-19 15:59:00 +0100
committerjoboet <jonasboettiger@icloud.com>2022-12-29 17:45:07 +0100
commita9e5c1a309df80434ebc4c1f6bfaa5cb119b465d (patch)
treee1022d1e13d77934c51552358e520a362c1380ef /library/std/src/sys_common
parent0c0b403f19fc6febcd1e36a83fc307ecc11de943 (diff)
downloadrust-a9e5c1a309df80434ebc4c1f6bfaa5cb119b465d.tar.gz
rust-a9e5c1a309df80434ebc4c1f6bfaa5cb119b465d.zip
std: unify id-based thread parking implementations
Diffstat (limited to 'library/std/src/sys_common')
-rw-r--r--library/std/src/sys_common/mod.rs2
-rw-r--r--library/std/src/sys_common/thread_parking/futex.rs (renamed from library/std/src/sys_common/thread_parker/futex.rs)0
-rw-r--r--library/std/src/sys_common/thread_parking/generic.rs (renamed from library/std/src/sys_common/thread_parker/generic.rs)0
-rw-r--r--library/std/src/sys_common/thread_parking/id.rs104
-rw-r--r--library/std/src/sys_common/thread_parking/mod.rs (renamed from library/std/src/sys_common/thread_parker/mod.rs)10
-rw-r--r--library/std/src/sys_common/thread_parking/wait_flag.rs (renamed from library/std/src/sys_common/thread_parker/wait_flag.rs)0
6 files changed, 112 insertions, 4 deletions
diff --git a/library/std/src/sys_common/mod.rs b/library/std/src/sys_common/mod.rs
index 73da1ce066c..6b24b0e9aa8 100644
--- a/library/std/src/sys_common/mod.rs
+++ b/library/std/src/sys_common/mod.rs
@@ -30,7 +30,7 @@ pub mod process;
 pub mod thread;
 pub mod thread_info;
 pub mod thread_local_dtor;
-pub mod thread_parker;
+pub mod thread_parking;
 pub mod wstr;
 pub mod wtf8;
 
diff --git a/library/std/src/sys_common/thread_parker/futex.rs b/library/std/src/sys_common/thread_parking/futex.rs
index d9e2f39e345..d9e2f39e345 100644
--- a/library/std/src/sys_common/thread_parker/futex.rs
+++ b/library/std/src/sys_common/thread_parking/futex.rs
diff --git a/library/std/src/sys_common/thread_parker/generic.rs b/library/std/src/sys_common/thread_parking/generic.rs
index f3d8b34d3fd..f3d8b34d3fd 100644
--- a/library/std/src/sys_common/thread_parker/generic.rs
+++ b/library/std/src/sys_common/thread_parking/generic.rs
diff --git a/library/std/src/sys_common/thread_parking/id.rs b/library/std/src/sys_common/thread_parking/id.rs
new file mode 100644
index 00000000000..9525340b75f
--- /dev/null
+++ b/library/std/src/sys_common/thread_parking/id.rs
@@ -0,0 +1,104 @@
+//! Thread parking using thread ids.
+//!
+//! Some platforms (notably NetBSD) have thread parking primitives whose semantics
+//! match those offered by `thread::park`, with the difference that the thread to
+//! be unparked is referenced by a platform-specific thread id. Since the thread
+//! parker is constructed before that id is known, an atomic state variable is used
+//! to manage the park state and propagate the thread id. This also avoids platform
+//! calls in the case where `unpark` is called before `park`.
+
+use crate::cell::UnsafeCell;
+use crate::pin::Pin;
+use crate::sync::atomic::{
+    fence, AtomicI8,
+    Ordering::{Acquire, Relaxed, Release},
+};
+use crate::sys::thread_parking::{current, park, park_timeout, unpark, ThreadId};
+use crate::time::Duration;
+
+pub struct Parker {
+    state: AtomicI8,
+    tid: UnsafeCell<Option<ThreadId>>,
+}
+
+const PARKED: i8 = -1;
+const EMPTY: i8 = 0;
+const NOTIFIED: i8 = 1;
+
+impl Parker {
+    /// Create a new thread parker. UNIX requires this to happen in-place.
+    pub unsafe fn new(parker: *mut Parker) {
+        parker.write(Parker { state: AtomicI8::new(EMPTY), tid: UnsafeCell::new(None) })
+    }
+
+    /// # Safety
+    /// * must always be called from the same thread
+    /// * must be called before the state is set to PARKED
+    unsafe fn init_tid(&self) {
+        // The field is only ever written to from this thread, so we don't need
+        // synchronization to read it here.
+        if self.tid.get().read().is_none() {
+            // Because this point is only reached once, before the state is set
+            // to PARKED for the first time, the non-atomic write here can not
+            // conflict with reads by other threads.
+            self.tid.get().write(Some(current()));
+            // Ensure that the write can be observed by all threads reading the
+            // state. Synchronizes with the acquire barrier in `unpark`.
+            fence(Release);
+        }
+    }
+
+    pub unsafe fn park(self: Pin<&Self>) {
+        self.init_tid();
+
+        // Changes NOTIFIED to EMPTY and EMPTY to PARKED.
+        let mut state = self.state.fetch_sub(1, Acquire).wrapping_sub(1);
+        if state == PARKED {
+            // Loop to guard against spurious wakeups.
+            while state == PARKED {
+                park();
+                state = self.state.load(Acquire);
+            }
+
+            // Since the state change has already been observed with acquire
+            // ordering, the state can be reset with a relaxed store instead
+            // of a swap.
+            self.state.store(EMPTY, Relaxed);
+        }
+    }
+
+    pub unsafe fn park_timeout(self: Pin<&Self>, dur: Duration) {
+        self.init_tid();
+
+        let state = self.state.fetch_sub(1, Acquire).wrapping_sub(1);
+        if state == PARKED {
+            park_timeout(dur);
+            // Swap to ensure that we observe all state changes with acquire
+            // ordering, even if the state has been changed after the timeout
+            // occured.
+            self.state.swap(EMPTY, Acquire);
+        }
+    }
+
+    pub fn unpark(self: Pin<&Self>) {
+        let state = self.state.swap(NOTIFIED, Release);
+        if state == PARKED {
+            // Synchronize with the release fence in `init_tid` to observe the
+            // write to `tid`.
+            fence(Acquire);
+            // # Safety
+            // The thread id is initialized before the state is set to `PARKED`
+            // for the first time and is not written to from that point on
+            // (negating the need for an atomic read).
+            let tid = unsafe { self.tid.get().read().unwrap_unchecked() };
+            // It is possible that the waiting thread woke up because of a timeout
+            // and terminated before this call is made. This call then returns an
+            // error or wakes up an unrelated thread. The platform API and
+            // environment does allow this, however.
+            unpark(tid);
+        }
+    }
+}
+
+unsafe impl Send for Parker {}
+unsafe impl Sync for Parker {}
diff --git a/library/std/src/sys_common/thread_parker/mod.rs b/library/std/src/sys_common/thread_parking/mod.rs
index 08a2bdd8229..0ead6633c35 100644
--- a/library/std/src/sys_common/thread_parker/mod.rs
+++ b/library/std/src/sys_common/thread_parking/mod.rs
@@ -11,13 +11,17 @@ cfg_if::cfg_if! {
     ))] {
         mod futex;
         pub use futex::Parker;
+    } else if #[cfg(any(
+        target_os = "netbsd",
+        all(target_vendor = "fortanix", target_env = "sgx"),
+    ))] {
+        mod id;
+        pub use id::Parker;
     } else if #[cfg(target_os = "solid_asp3")] {
         mod wait_flag;
         pub use wait_flag::Parker;
     } else if #[cfg(any(windows, target_family = "unix"))] {
-        pub use crate::sys::thread_parker::Parker;
-    } else if #[cfg(all(target_vendor = "fortanix", target_env = "sgx"))] {
-        pub use crate::sys::thread_parker::Parker;
+        pub use crate::sys::thread_parking::Parker;
     } else {
         mod generic;
         pub use generic::Parker;
diff --git a/library/std/src/sys_common/thread_parker/wait_flag.rs b/library/std/src/sys_common/thread_parking/wait_flag.rs
index 6561c186655..6561c186655 100644
--- a/library/std/src/sys_common/thread_parker/wait_flag.rs
+++ b/library/std/src/sys_common/thread_parking/wait_flag.rs