about summary refs log tree commit diff
path: root/library/std/src
diff options
context:
space:
mode:
authorMara Bos <m-ou.se@m-ou.se>2020-09-19 14:54:21 +0200
committerMara Bos <m-ou.se@m-ou.se>2020-09-27 11:56:42 +0200
commitec13df4ec4780b679dde227db2185a82cf31c93b (patch)
treed979638c4aea0d94a7fd817af7496d2ba1f3d220 /library/std/src
parent1464fc3a0cb2b8a92c54357fcff7c632b334cb29 (diff)
downloadrust-ec13df4ec4780b679dde227db2185a82cf31c93b.tar.gz
rust-ec13df4ec4780b679dde227db2185a82cf31c93b.zip
Add fast futex-based thread parker for Linux.
Diffstat (limited to 'library/std/src')
-rw-r--r--library/std/src/thread/parker/generic.rs119
-rw-r--r--library/std/src/thread/parker/linux.rs105
-rw-r--r--library/std/src/thread/parker/mod.rs130
3 files changed, 231 insertions, 123 deletions
diff --git a/library/std/src/thread/parker/generic.rs b/library/std/src/thread/parker/generic.rs
new file mode 100644
index 00000000000..14cfa958e5e
--- /dev/null
+++ b/library/std/src/thread/parker/generic.rs
@@ -0,0 +1,119 @@
+//! Parker implementaiton based on a Mutex and Condvar.
+
+use crate::sync::atomic::AtomicUsize;
+use crate::sync::atomic::Ordering::SeqCst;
+use crate::sync::{Condvar, Mutex};
+use crate::time::Duration;
+
+const EMPTY: usize = 0;
+const PARKED: usize = 1;
+const NOTIFIED: usize = 2;
+
+pub struct Parker {
+    state: AtomicUsize,
+    lock: Mutex<()>,
+    cvar: Condvar,
+}
+
+impl Parker {
+    pub fn new() -> Self {
+        Parker { state: AtomicUsize::new(EMPTY), lock: Mutex::new(()), cvar: Condvar::new() }
+    }
+
+    // This implementaiton doesn't require `unsafe`, but other implementations
+    // may assume this is only called by the thread that owns the Parker.
+    pub unsafe fn park(&self) {
+        // If we were previously notified then we consume this notification and
+        // return quickly.
+        if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() {
+            return;
+        }
+
+        // Otherwise we need to coordinate going to sleep
+        let mut m = self.lock.lock().unwrap();
+        match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
+            Ok(_) => {}
+            Err(NOTIFIED) => {
+                // We must read here, even though we know it will be `NOTIFIED`.
+                // This is because `unpark` may have been called again since we read
+                // `NOTIFIED` in the `compare_exchange` above. We must perform an
+                // acquire operation that synchronizes with that `unpark` to observe
+                // any writes it made before the call to unpark. To do that we must
+                // read from the write it made to `state`.
+                let old = self.state.swap(EMPTY, SeqCst);
+                assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
+                return;
+            } // should consume this notification, so prohibit spurious wakeups in next park.
+            Err(_) => panic!("inconsistent park state"),
+        }
+        loop {
+            m = self.cvar.wait(m).unwrap();
+            match self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) {
+                Ok(_) => return, // got a notification
+                Err(_) => {}     // spurious wakeup, go back to sleep
+            }
+        }
+    }
+
+    // This implementaiton doesn't require `unsafe`, but other implementations
+    // may assume this is only called by the thread that owns the Parker.
+    pub unsafe fn park_timeout(&self, dur: Duration) {
+        // Like `park` above we have a fast path for an already-notified thread, and
+        // afterwards we start coordinating for a sleep.
+        // return quickly.
+        if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() {
+            return;
+        }
+        let m = self.lock.lock().unwrap();
+        match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
+            Ok(_) => {}
+            Err(NOTIFIED) => {
+                // We must read again here, see `park`.
+                let old = self.state.swap(EMPTY, SeqCst);
+                assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
+                return;
+            } // should consume this notification, so prohibit spurious wakeups in next park.
+            Err(_) => panic!("inconsistent park_timeout state"),
+        }
+
+        // Wait with a timeout, and if we spuriously wake up or otherwise wake up
+        // from a notification we just want to unconditionally set the state back to
+        // empty, either consuming a notification or un-flagging ourselves as
+        // parked.
+        let (_m, _result) = self.cvar.wait_timeout(m, dur).unwrap();
+        match self.state.swap(EMPTY, SeqCst) {
+            NOTIFIED => {} // got a notification, hurray!
+            PARKED => {}   // no notification, alas
+            n => panic!("inconsistent park_timeout state: {}", n),
+        }
+    }
+
+    pub fn unpark(&self) {
+        // To ensure the unparked thread will observe any writes we made
+        // before this call, we must perform a release operation that `park`
+        // can synchronize with. To do that we must write `NOTIFIED` even if
+        // `state` is already `NOTIFIED`. That is why this must be a swap
+        // rather than a compare-and-swap that returns if it reads `NOTIFIED`
+        // on failure.
+        match self.state.swap(NOTIFIED, SeqCst) {
+            EMPTY => return,    // no one was waiting
+            NOTIFIED => return, // already unparked
+            PARKED => {}        // gotta go wake someone up
+            _ => panic!("inconsistent state in unpark"),
+        }
+
+        // There is a period between when the parked thread sets `state` to
+        // `PARKED` (or last checked `state` in the case of a spurious wake
+        // up) and when it actually waits on `cvar`. If we were to notify
+        // during this period it would be ignored and then when the parked
+        // thread went to sleep it would never wake up. Fortunately, it has
+        // `lock` locked at this stage so we can acquire `lock` to wait until
+        // it is ready to receive the notification.
+        //
+        // Releasing `lock` before the call to `notify_one` means that when the
+        // parked thread wakes it doesn't get woken only to have to wait for us
+        // to release `lock`.
+        drop(self.lock.lock().unwrap());
+        self.cvar.notify_one()
+    }
+}
diff --git a/library/std/src/thread/parker/linux.rs b/library/std/src/thread/parker/linux.rs
new file mode 100644
index 00000000000..05142b88c73
--- /dev/null
+++ b/library/std/src/thread/parker/linux.rs
@@ -0,0 +1,105 @@
+use crate::sync::atomic::AtomicI32;
+use crate::sync::atomic::Ordering::{Acquire, Relaxed, Release};
+use crate::time::Duration;
+
+const PARKED: i32 = -1;
+const EMPTY: i32 = 0;
+const NOTIFIED: i32 = 1;
+
+pub struct Parker {
+    state: AtomicI32,
+}
+
+impl Parker {
+    #[inline]
+    pub const fn new() -> Self {
+        Parker { state: AtomicI32::new(EMPTY) }
+    }
+
+    // Assumes this is only called by the thread that owns the Parker,
+    // which means that `self.state != PARKED`.
+    pub unsafe fn park(&self) {
+        // Change NOTIFIED=>EMPTY or EMPTY=>PARKED, and directly return in the
+        // first case.
+        if self.state.fetch_sub(1, Acquire) == NOTIFIED {
+            return;
+        }
+        loop {
+            // Wait for something to happen, assuming it's still set to PARKED.
+            futex_wait(&self.state, PARKED, None);
+            // Change NOTIFIED=>EMPTY and return in that case.
+            if self.state.compare_and_swap(NOTIFIED, EMPTY, Acquire) == NOTIFIED {
+                return;
+            } else {
+                // Spurious wake up. We loop to try again.
+            }
+        }
+    }
+
+    // Assumes this is only called by the thread that owns the Parker,
+    // which means that `self.state != PARKED`.
+    pub unsafe fn park_timeout(&self, timeout: Duration) {
+        // Change NOTIFIED=>EMPTY or EMPTY=>PARKED, and directly return in the
+        // first case.
+        if self.state.fetch_sub(1, Acquire) == NOTIFIED {
+            return;
+        }
+        // Wait for something to happen, assuming it's still set to PARKED.
+        futex_wait(&self.state, PARKED, Some(timeout));
+        // This is not just a store, because we need to establish a
+        // release-acquire ordering with unpark().
+        if self.state.swap(EMPTY, Acquire) == NOTIFIED {
+            // Woke up because of unpark().
+        } else {
+            // Timeout or spurious wake up.
+            // We return either way, because we can't easily tell if it was the
+            // timeout or not.
+        }
+    }
+
+    pub fn unpark(&self) {
+        // Change PARKED=>NOTIFIED, EMPTY=>NOTIFIED, or NOTIFIED=>NOTIFIED, and
+        // wake the thread in the first case.
+        //
+        // Note that even NOTIFIED=>NOTIFIED results in a write. This is on
+        // purpose, to make sure every unpark() has a release-acquire ordering
+        // with park().
+        if self.state.swap(NOTIFIED, Release) == PARKED {
+            futex_wake(&self.state);
+        }
+    }
+}
+
+fn futex_wait(futex: &AtomicI32, expected: i32, timeout: Option<Duration>) {
+    let timespec;
+    let timespec_ptr = match timeout {
+        Some(timeout) => {
+            timespec = libc::timespec {
+                tv_sec: timeout.as_secs() as _,
+                tv_nsec: timeout.subsec_nanos() as _,
+            };
+            &timespec as *const libc::timespec
+        }
+        None => crate::ptr::null(),
+    };
+    unsafe {
+        libc::syscall(
+            libc::SYS_futex,
+            futex as *const AtomicI32,
+            libc::FUTEX_WAIT | libc::FUTEX_PRIVATE_FLAG,
+            expected,
+            timespec_ptr,
+        );
+    }
+}
+
+fn futex_wake(futex: &AtomicI32) {
+    unsafe {
+        libc::syscall(
+            libc::SYS_futex,
+            futex as *const AtomicI32,
+            libc::FUTEX_WAKE | libc::FUTEX_PRIVATE_FLAG,
+            1,
+        );
+    }
+}
diff --git a/library/std/src/thread/parker/mod.rs b/library/std/src/thread/parker/mod.rs
index c8b9b7b1c79..4dc5e1aa271 100644
--- a/library/std/src/thread/parker/mod.rs
+++ b/library/std/src/thread/parker/mod.rs
@@ -1,125 +1,9 @@
-//! Parker implementaiton based on a Mutex and Condvar.
-//!
-//! The implementation currently uses the trivial strategy of a Mutex+Condvar
-//! with wakeup flag, which does not actually allow spurious wakeups. In the
-//! future, this will be implemented in a more efficient way, perhaps along the lines of
-//!   http://cr.openjdk.java.net/~stefank/6989984.1/raw_files/new/src/os/linux/vm/os_linux.cpp
-//! or futuxes, and in either case may allow spurious wakeups.
-
-use crate::sync::atomic::AtomicUsize;
-use crate::sync::atomic::Ordering::SeqCst;
-use crate::sync::{Condvar, Mutex};
-use crate::time::Duration;
-
-const EMPTY: usize = 0;
-const PARKED: usize = 1;
-const NOTIFIED: usize = 2;
-
-pub struct Parker {
-    state: AtomicUsize,
-    lock: Mutex<()>,
-    cvar: Condvar,
-}
-
-impl Parker {
-    pub fn new() -> Self {
-        Parker { state: AtomicUsize::new(EMPTY), lock: Mutex::new(()), cvar: Condvar::new() }
-    }
-
-    // This implementaiton doesn't require `unsafe`, but other implementations
-    // may assume this is only called by the thread that owns the Parker.
-    pub unsafe fn park(&self) {
-        // If we were previously notified then we consume this notification and
-        // return quickly.
-        if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() {
-            return;
-        }
-
-        // Otherwise we need to coordinate going to sleep
-        let mut m = self.lock.lock().unwrap();
-        match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
-            Ok(_) => {}
-            Err(NOTIFIED) => {
-                // We must read here, even though we know it will be `NOTIFIED`.
-                // This is because `unpark` may have been called again since we read
-                // `NOTIFIED` in the `compare_exchange` above. We must perform an
-                // acquire operation that synchronizes with that `unpark` to observe
-                // any writes it made before the call to unpark. To do that we must
-                // read from the write it made to `state`.
-                let old = self.state.swap(EMPTY, SeqCst);
-                assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
-                return;
-            } // should consume this notification, so prohibit spurious wakeups in next park.
-            Err(_) => panic!("inconsistent park state"),
-        }
-        loop {
-            m = self.cvar.wait(m).unwrap();
-            match self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) {
-                Ok(_) => return, // got a notification
-                Err(_) => {}     // spurious wakeup, go back to sleep
-            }
-        }
-    }
-
-    // This implementaiton doesn't require `unsafe`, but other implementations
-    // may assume this is only called by the thread that owns the Parker.
-    pub unsafe fn park_timeout(&self, dur: Duration) {
-        // Like `park` above we have a fast path for an already-notified thread, and
-        // afterwards we start coordinating for a sleep.
-        // return quickly.
-        if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() {
-            return;
-        }
-        let m = self.lock.lock().unwrap();
-        match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
-            Ok(_) => {}
-            Err(NOTIFIED) => {
-                // We must read again here, see `park`.
-                let old = self.state.swap(EMPTY, SeqCst);
-                assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
-                return;
-            } // should consume this notification, so prohibit spurious wakeups in next park.
-            Err(_) => panic!("inconsistent park_timeout state"),
-        }
-
-        // Wait with a timeout, and if we spuriously wake up or otherwise wake up
-        // from a notification we just want to unconditionally set the state back to
-        // empty, either consuming a notification or un-flagging ourselves as
-        // parked.
-        let (_m, _result) = self.cvar.wait_timeout(m, dur).unwrap();
-        match self.state.swap(EMPTY, SeqCst) {
-            NOTIFIED => {} // got a notification, hurray!
-            PARKED => {}   // no notification, alas
-            n => panic!("inconsistent park_timeout state: {}", n),
-        }
-    }
-
-    pub fn unpark(&self) {
-        // To ensure the unparked thread will observe any writes we made
-        // before this call, we must perform a release operation that `park`
-        // can synchronize with. To do that we must write `NOTIFIED` even if
-        // `state` is already `NOTIFIED`. That is why this must be a swap
-        // rather than a compare-and-swap that returns if it reads `NOTIFIED`
-        // on failure.
-        match self.state.swap(NOTIFIED, SeqCst) {
-            EMPTY => return,    // no one was waiting
-            NOTIFIED => return, // already unparked
-            PARKED => {}        // gotta go wake someone up
-            _ => panic!("inconsistent state in unpark"),
-        }
-
-        // There is a period between when the parked thread sets `state` to
-        // `PARKED` (or last checked `state` in the case of a spurious wake
-        // up) and when it actually waits on `cvar`. If we were to notify
-        // during this period it would be ignored and then when the parked
-        // thread went to sleep it would never wake up. Fortunately, it has
-        // `lock` locked at this stage so we can acquire `lock` to wait until
-        // it is ready to receive the notification.
-        //
-        // Releasing `lock` before the call to `notify_one` means that when the
-        // parked thread wakes it doesn't get woken only to have to wait for us
-        // to release `lock`.
-        drop(self.lock.lock().unwrap());
-        self.cvar.notify_one()
+cfg_if::cfg_if! {
+    if #[cfg(any(target_os = "linux", target_os = "android"))] {
+        mod linux;
+        pub use linux::Parker;
+    } else {
+        mod generic;
+        pub use generic::Parker;
     }
 }