about summary refs log tree commit diff
path: root/library/std/src/thread
diff options
context:
space:
mode:
authorMara Bos <m-ou.se@m-ou.se>2020-09-19 12:42:14 +0200
committerMara Bos <m-ou.se@m-ou.se>2020-09-27 11:56:42 +0200
commit1464fc3a0cb2b8a92c54357fcff7c632b334cb29 (patch)
tree502085e007d9538db4fc9a1004813887d7a532e6 /library/std/src/thread
parentc9e5e6a53aef5cd1b939ecfa18f56bdf5bf0451c (diff)
downloadrust-1464fc3a0cb2b8a92c54357fcff7c632b334cb29.tar.gz
rust-1464fc3a0cb2b8a92c54357fcff7c632b334cb29.zip
Move thread parker to a separate module.
Diffstat (limited to 'library/std/src/thread')
-rw-r--r--library/std/src/thread/mod.rs129
-rw-r--r--library/std/src/thread/parker/mod.rs125
2 files changed, 142 insertions, 112 deletions
diff --git a/library/std/src/thread/mod.rs b/library/std/src/thread/mod.rs
index 8c353e2484e..a5a8d5c9fbb 100644
--- a/library/std/src/thread/mod.rs
+++ b/library/std/src/thread/mod.rs
@@ -149,6 +149,8 @@
 #[cfg(all(test, not(target_os = "emscripten")))]
 mod tests;
 
+mod parker;
+
 use crate::any::Any;
 use crate::cell::UnsafeCell;
 use crate::ffi::{CStr, CString};
@@ -159,15 +161,14 @@ use crate::num::NonZeroU64;
 use crate::panic;
 use crate::panicking;
 use crate::str;
-use crate::sync::atomic::AtomicUsize;
-use crate::sync::atomic::Ordering::SeqCst;
-use crate::sync::{Arc, Condvar, Mutex};
+use crate::sync::Arc;
 use crate::sys::thread as imp;
 use crate::sys_common::mutex;
 use crate::sys_common::thread;
 use crate::sys_common::thread_info;
 use crate::sys_common::{AsInner, IntoInner};
 use crate::time::Duration;
+use parker::Parker;
 
 ////////////////////////////////////////////////////////////////////////////////
 // Thread-local storage
@@ -667,6 +668,8 @@ pub fn current() -> Thread {
 ///
 /// [`channel`]: crate::sync::mpsc
 /// [`join`]: JoinHandle::join
+/// [`Condvar`]: crate::sync::Condvar
+/// [`Mutex`]: crate::sync::Mutex
 #[stable(feature = "rust1", since = "1.0.0")]
 pub fn yield_now() {
     imp::Thread::yield_now()
@@ -712,6 +715,8 @@ pub fn yield_now() {
 ///     panic!()
 /// }
 /// ```
+///
+/// [Mutex]: crate::sync::Mutex
 #[inline]
 #[stable(feature = "rust1", since = "1.0.0")]
 pub fn panicking() -> bool {
@@ -779,11 +784,6 @@ pub fn sleep(dur: Duration) {
     imp::Thread::sleep(dur)
 }
 
-// constants for park/unpark
-const EMPTY: usize = 0;
-const PARKED: usize = 1;
-const NOTIFIED: usize = 2;
-
 /// Blocks unless or until the current thread's token is made available.
 ///
 /// A call to `park` does not guarantee that the thread will remain parked
@@ -870,45 +870,11 @@ const NOTIFIED: usize = 2;
 ///
 /// [`unpark`]: Thread::unpark
 /// [`thread::park_timeout`]: park_timeout
-//
-// The implementation currently uses the trivial strategy of a Mutex+Condvar
-// with wakeup flag, which does not actually allow spurious wakeups. In the
-// future, this will be implemented in a more efficient way, perhaps along the lines of
-//   http://cr.openjdk.java.net/~stefank/6989984.1/raw_files/new/src/os/linux/vm/os_linux.cpp
-// or futuxes, and in either case may allow spurious wakeups.
 #[stable(feature = "rust1", since = "1.0.0")]
 pub fn park() {
-    let thread = current();
-
-    // If we were previously notified then we consume this notification and
-    // return quickly.
-    if thread.inner.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() {
-        return;
-    }
-
-    // Otherwise we need to coordinate going to sleep
-    let mut m = thread.inner.lock.lock().unwrap();
-    match thread.inner.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
-        Ok(_) => {}
-        Err(NOTIFIED) => {
-            // We must read here, even though we know it will be `NOTIFIED`.
-            // This is because `unpark` may have been called again since we read
-            // `NOTIFIED` in the `compare_exchange` above. We must perform an
-            // acquire operation that synchronizes with that `unpark` to observe
-            // any writes it made before the call to unpark. To do that we must
-            // read from the write it made to `state`.
-            let old = thread.inner.state.swap(EMPTY, SeqCst);
-            assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
-            return;
-        } // should consume this notification, so prohibit spurious wakeups in next park.
-        Err(_) => panic!("inconsistent park state"),
-    }
-    loop {
-        m = thread.inner.cvar.wait(m).unwrap();
-        match thread.inner.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) {
-            Ok(_) => return, // got a notification
-            Err(_) => {}     // spurious wakeup, go back to sleep
-        }
+    // SAFETY: park_timeout is called on the parker owned by this thread.
+    unsafe {
+        current().inner.parker.park();
     }
 }
 
@@ -970,35 +936,9 @@ pub fn park_timeout_ms(ms: u32) {
 /// ```
 #[stable(feature = "park_timeout", since = "1.4.0")]
 pub fn park_timeout(dur: Duration) {
-    let thread = current();
-
-    // Like `park` above we have a fast path for an already-notified thread, and
-    // afterwards we start coordinating for a sleep.
-    // return quickly.
-    if thread.inner.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() {
-        return;
-    }
-    let m = thread.inner.lock.lock().unwrap();
-    match thread.inner.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
-        Ok(_) => {}
-        Err(NOTIFIED) => {
-            // We must read again here, see `park`.
-            let old = thread.inner.state.swap(EMPTY, SeqCst);
-            assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
-            return;
-        } // should consume this notification, so prohibit spurious wakeups in next park.
-        Err(_) => panic!("inconsistent park_timeout state"),
-    }
-
-    // Wait with a timeout, and if we spuriously wake up or otherwise wake up
-    // from a notification we just want to unconditionally set the state back to
-    // empty, either consuming a notification or un-flagging ourselves as
-    // parked.
-    let (_m, _result) = thread.inner.cvar.wait_timeout(m, dur).unwrap();
-    match thread.inner.state.swap(EMPTY, SeqCst) {
-        NOTIFIED => {} // got a notification, hurray!
-        PARKED => {}   // no notification, alas
-        n => panic!("inconsistent park_timeout state: {}", n),
+    // SAFETY: park_timeout is called on the parker owned by this thread.
+    unsafe {
+        current().inner.parker.park_timeout(dur);
     }
 }
 
@@ -1077,11 +1017,7 @@ impl ThreadId {
 struct Inner {
     name: Option<CString>, // Guaranteed to be UTF-8
     id: ThreadId,
-
-    // state for thread park/unpark
-    state: AtomicUsize,
-    lock: Mutex<()>,
-    cvar: Condvar,
+    parker: Parker,
 }
 
 #[derive(Clone)]
@@ -1115,13 +1051,7 @@ impl Thread {
         let cname =
             name.map(|n| CString::new(n).expect("thread name may not contain interior null bytes"));
         Thread {
-            inner: Arc::new(Inner {
-                name: cname,
-                id: ThreadId::new(),
-                state: AtomicUsize::new(EMPTY),
-                lock: Mutex::new(()),
-                cvar: Condvar::new(),
-            }),
+            inner: Arc::new(Inner { name: cname, id: ThreadId::new(), parker: Parker::new() }),
         }
     }
 
@@ -1157,32 +1087,7 @@ impl Thread {
     /// ```
     #[stable(feature = "rust1", since = "1.0.0")]
     pub fn unpark(&self) {
-        // To ensure the unparked thread will observe any writes we made
-        // before this call, we must perform a release operation that `park`
-        // can synchronize with. To do that we must write `NOTIFIED` even if
-        // `state` is already `NOTIFIED`. That is why this must be a swap
-        // rather than a compare-and-swap that returns if it reads `NOTIFIED`
-        // on failure.
-        match self.inner.state.swap(NOTIFIED, SeqCst) {
-            EMPTY => return,    // no one was waiting
-            NOTIFIED => return, // already unparked
-            PARKED => {}        // gotta go wake someone up
-            _ => panic!("inconsistent state in unpark"),
-        }
-
-        // There is a period between when the parked thread sets `state` to
-        // `PARKED` (or last checked `state` in the case of a spurious wake
-        // up) and when it actually waits on `cvar`. If we were to notify
-        // during this period it would be ignored and then when the parked
-        // thread went to sleep it would never wake up. Fortunately, it has
-        // `lock` locked at this stage so we can acquire `lock` to wait until
-        // it is ready to receive the notification.
-        //
-        // Releasing `lock` before the call to `notify_one` means that when the
-        // parked thread wakes it doesn't get woken only to have to wait for us
-        // to release `lock`.
-        drop(self.inner.lock.lock().unwrap());
-        self.inner.cvar.notify_one()
+        self.inner.parker.unpark();
     }
 
     /// Gets the thread's unique identifier.
diff --git a/library/std/src/thread/parker/mod.rs b/library/std/src/thread/parker/mod.rs
new file mode 100644
index 00000000000..c8b9b7b1c79
--- /dev/null
+++ b/library/std/src/thread/parker/mod.rs
@@ -0,0 +1,125 @@
+//! Parker implementaiton based on a Mutex and Condvar.
+//!
+//! The implementation currently uses the trivial strategy of a Mutex+Condvar
+//! with wakeup flag, which does not actually allow spurious wakeups. In the
+//! future, this will be implemented in a more efficient way, perhaps along the lines of
+//!   http://cr.openjdk.java.net/~stefank/6989984.1/raw_files/new/src/os/linux/vm/os_linux.cpp
+//! or futuxes, and in either case may allow spurious wakeups.
+
+use crate::sync::atomic::AtomicUsize;
+use crate::sync::atomic::Ordering::SeqCst;
+use crate::sync::{Condvar, Mutex};
+use crate::time::Duration;
+
+const EMPTY: usize = 0;
+const PARKED: usize = 1;
+const NOTIFIED: usize = 2;
+
+pub struct Parker {
+    state: AtomicUsize,
+    lock: Mutex<()>,
+    cvar: Condvar,
+}
+
+impl Parker {
+    pub fn new() -> Self {
+        Parker { state: AtomicUsize::new(EMPTY), lock: Mutex::new(()), cvar: Condvar::new() }
+    }
+
+    // This implementaiton doesn't require `unsafe`, but other implementations
+    // may assume this is only called by the thread that owns the Parker.
+    pub unsafe fn park(&self) {
+        // If we were previously notified then we consume this notification and
+        // return quickly.
+        if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() {
+            return;
+        }
+
+        // Otherwise we need to coordinate going to sleep
+        let mut m = self.lock.lock().unwrap();
+        match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
+            Ok(_) => {}
+            Err(NOTIFIED) => {
+                // We must read here, even though we know it will be `NOTIFIED`.
+                // This is because `unpark` may have been called again since we read
+                // `NOTIFIED` in the `compare_exchange` above. We must perform an
+                // acquire operation that synchronizes with that `unpark` to observe
+                // any writes it made before the call to unpark. To do that we must
+                // read from the write it made to `state`.
+                let old = self.state.swap(EMPTY, SeqCst);
+                assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
+                return;
+            } // should consume this notification, so prohibit spurious wakeups in next park.
+            Err(_) => panic!("inconsistent park state"),
+        }
+        loop {
+            m = self.cvar.wait(m).unwrap();
+            match self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) {
+                Ok(_) => return, // got a notification
+                Err(_) => {}     // spurious wakeup, go back to sleep
+            }
+        }
+    }
+
+    // This implementaiton doesn't require `unsafe`, but other implementations
+    // may assume this is only called by the thread that owns the Parker.
+    pub unsafe fn park_timeout(&self, dur: Duration) {
+        // Like `park` above we have a fast path for an already-notified thread, and
+        // afterwards we start coordinating for a sleep.
+        // return quickly.
+        if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() {
+            return;
+        }
+        let m = self.lock.lock().unwrap();
+        match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
+            Ok(_) => {}
+            Err(NOTIFIED) => {
+                // We must read again here, see `park`.
+                let old = self.state.swap(EMPTY, SeqCst);
+                assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
+                return;
+            } // should consume this notification, so prohibit spurious wakeups in next park.
+            Err(_) => panic!("inconsistent park_timeout state"),
+        }
+
+        // Wait with a timeout, and if we spuriously wake up or otherwise wake up
+        // from a notification we just want to unconditionally set the state back to
+        // empty, either consuming a notification or un-flagging ourselves as
+        // parked.
+        let (_m, _result) = self.cvar.wait_timeout(m, dur).unwrap();
+        match self.state.swap(EMPTY, SeqCst) {
+            NOTIFIED => {} // got a notification, hurray!
+            PARKED => {}   // no notification, alas
+            n => panic!("inconsistent park_timeout state: {}", n),
+        }
+    }
+
+    pub fn unpark(&self) {
+        // To ensure the unparked thread will observe any writes we made
+        // before this call, we must perform a release operation that `park`
+        // can synchronize with. To do that we must write `NOTIFIED` even if
+        // `state` is already `NOTIFIED`. That is why this must be a swap
+        // rather than a compare-and-swap that returns if it reads `NOTIFIED`
+        // on failure.
+        match self.state.swap(NOTIFIED, SeqCst) {
+            EMPTY => return,    // no one was waiting
+            NOTIFIED => return, // already unparked
+            PARKED => {}        // gotta go wake someone up
+            _ => panic!("inconsistent state in unpark"),
+        }
+
+        // There is a period between when the parked thread sets `state` to
+        // `PARKED` (or last checked `state` in the case of a spurious wake
+        // up) and when it actually waits on `cvar`. If we were to notify
+        // during this period it would be ignored and then when the parked
+        // thread went to sleep it would never wake up. Fortunately, it has
+        // `lock` locked at this stage so we can acquire `lock` to wait until
+        // it is ready to receive the notification.
+        //
+        // Releasing `lock` before the call to `notify_one` means that when the
+        // parked thread wakes it doesn't get woken only to have to wait for us
+        // to release `lock`.
+        drop(self.lock.lock().unwrap());
+        self.cvar.notify_one()
+    }
+}