about summary refs log tree commit diff
path: root/library/std/src/sync/mpsc/blocking.rs
diff options
context:
space:
mode:
authormark <markm@cs.wisc.edu>2020-06-11 21:31:49 -0500
committermark <markm@cs.wisc.edu>2020-07-27 19:51:13 -0500
commit2c31b45ae878b821975c4ebd94cc1e49f6073fd0 (patch)
tree14f64e683e3f64dcbcfb8c2c7cb45ac7592e6e09 /library/std/src/sync/mpsc/blocking.rs
parent9be8ffcb0206fc1558069a7b4766090df7877659 (diff)
downloadrust-2c31b45ae878b821975c4ebd94cc1e49f6073fd0.tar.gz
rust-2c31b45ae878b821975c4ebd94cc1e49f6073fd0.zip
mv std libs to library/
Diffstat (limited to 'library/std/src/sync/mpsc/blocking.rs')
-rw-r--r--library/std/src/sync/mpsc/blocking.rs79
1 files changed, 79 insertions, 0 deletions
diff --git a/library/std/src/sync/mpsc/blocking.rs b/library/std/src/sync/mpsc/blocking.rs
new file mode 100644
index 00000000000..d34de6a4fac
--- /dev/null
+++ b/library/std/src/sync/mpsc/blocking.rs
@@ -0,0 +1,79 @@
+//! Generic support for building blocking abstractions.
+
+use crate::mem;
+use crate::sync::atomic::{AtomicBool, Ordering};
+use crate::sync::Arc;
+use crate::thread::{self, Thread};
+use crate::time::Instant;
+
+struct Inner {
+    thread: Thread,
+    woken: AtomicBool,
+}
+
+unsafe impl Send for Inner {}
+unsafe impl Sync for Inner {}
+
+#[derive(Clone)]
+pub struct SignalToken {
+    inner: Arc<Inner>,
+}
+
+pub struct WaitToken {
+    inner: Arc<Inner>,
+}
+
+impl !Send for WaitToken {}
+
+impl !Sync for WaitToken {}
+
+pub fn tokens() -> (WaitToken, SignalToken) {
+    let inner = Arc::new(Inner { thread: thread::current(), woken: AtomicBool::new(false) });
+    let wait_token = WaitToken { inner: inner.clone() };
+    let signal_token = SignalToken { inner };
+    (wait_token, signal_token)
+}
+
+impl SignalToken {
+    pub fn signal(&self) -> bool {
+        let wake = !self.inner.woken.compare_and_swap(false, true, Ordering::SeqCst);
+        if wake {
+            self.inner.thread.unpark();
+        }
+        wake
+    }
+
+    /// Converts to an unsafe usize value. Useful for storing in a pipe's state
+    /// flag.
+    #[inline]
+    pub unsafe fn cast_to_usize(self) -> usize {
+        mem::transmute(self.inner)
+    }
+
+    /// Converts from an unsafe usize value. Useful for retrieving a pipe's state
+    /// flag.
+    #[inline]
+    pub unsafe fn cast_from_usize(signal_ptr: usize) -> SignalToken {
+        SignalToken { inner: mem::transmute(signal_ptr) }
+    }
+}
+
+impl WaitToken {
+    pub fn wait(self) {
+        while !self.inner.woken.load(Ordering::SeqCst) {
+            thread::park()
+        }
+    }
+
+    /// Returns `true` if we wake up normally.
+    pub fn wait_max_until(self, end: Instant) -> bool {
+        while !self.inner.woken.load(Ordering::SeqCst) {
+            let now = Instant::now();
+            if now >= end {
+                return false;
+            }
+            thread::park_timeout(end - now)
+        }
+        true
+    }
+}