about summary refs log tree commit diff
path: root/library
diff options
context:
space:
mode:
Diffstat (limited to 'library')
-rw-r--r--library/std/src/sync/mpmc/array.rs14
-rw-r--r--library/std/src/sync/mpmc/list.rs16
-rw-r--r--library/std/src/sync/mpmc/mod.rs2
-rw-r--r--library/std/src/sync/mpmc/utils.rs31
-rw-r--r--library/std/src/sync/mpmc/zero.rs2
-rw-r--r--library/std/src/sync/mpsc/mod.rs9
-rw-r--r--library/std/src/sync/mpsc/sync_tests.rs8
7 files changed, 49 insertions, 33 deletions
diff --git a/library/std/src/sync/mpmc/array.rs b/library/std/src/sync/mpmc/array.rs
index f71edc6c525..c1e3e48b044 100644
--- a/library/std/src/sync/mpmc/array.rs
+++ b/library/std/src/sync/mpmc/array.rs
@@ -168,7 +168,7 @@ impl<T> Channel<T> {
                         return true;
                     }
                     Err(_) => {
-                        backoff.spin();
+                        backoff.spin_light();
                         tail = self.tail.load(Ordering::Relaxed);
                     }
                 }
@@ -182,11 +182,11 @@ impl<T> Channel<T> {
                     return false;
                 }
 
-                backoff.spin();
+                backoff.spin_light();
                 tail = self.tail.load(Ordering::Relaxed);
             } else {
                 // Snooze because we need to wait for the stamp to get updated.
-                backoff.snooze();
+                backoff.spin_heavy();
                 tail = self.tail.load(Ordering::Relaxed);
             }
         }
@@ -251,7 +251,7 @@ impl<T> Channel<T> {
                         return true;
                     }
                     Err(_) => {
-                        backoff.spin();
+                        backoff.spin_light();
                         head = self.head.load(Ordering::Relaxed);
                     }
                 }
@@ -273,11 +273,11 @@ impl<T> Channel<T> {
                     }
                 }
 
-                backoff.spin();
+                backoff.spin_light();
                 head = self.head.load(Ordering::Relaxed);
             } else {
                 // Snooze because we need to wait for the stamp to get updated.
-                backoff.snooze();
+                backoff.spin_heavy();
                 head = self.head.load(Ordering::Relaxed);
             }
         }
@@ -330,7 +330,7 @@ impl<T> Channel<T> {
                 if backoff.is_completed() {
                     break;
                 } else {
-                    backoff.spin();
+                    backoff.spin_light();
                 }
             }
 
diff --git a/library/std/src/sync/mpmc/list.rs b/library/std/src/sync/mpmc/list.rs
index 2d5b2fb3b23..ec6c0726ac7 100644
--- a/library/std/src/sync/mpmc/list.rs
+++ b/library/std/src/sync/mpmc/list.rs
@@ -46,7 +46,7 @@ impl<T> Slot<T> {
     fn wait_write(&self) {
         let backoff = Backoff::new();
         while self.state.load(Ordering::Acquire) & WRITE == 0 {
-            backoff.snooze();
+            backoff.spin_heavy();
         }
     }
 }
@@ -82,7 +82,7 @@ impl<T> Block<T> {
             if !next.is_null() {
                 return next;
             }
-            backoff.snooze();
+            backoff.spin_heavy();
         }
     }
 
@@ -191,7 +191,7 @@ impl<T> Channel<T> {
 
             // If we reached the end of the block, wait until the next one is installed.
             if offset == BLOCK_CAP {
-                backoff.snooze();
+                backoff.spin_heavy();
                 tail = self.tail.index.load(Ordering::Acquire);
                 block = self.tail.block.load(Ordering::Acquire);
                 continue;
@@ -247,7 +247,7 @@ impl<T> Channel<T> {
                     return true;
                 },
                 Err(_) => {
-                    backoff.spin();
+                    backoff.spin_light();
                     tail = self.tail.index.load(Ordering::Acquire);
                     block = self.tail.block.load(Ordering::Acquire);
                 }
@@ -286,7 +286,7 @@ impl<T> Channel<T> {
 
             // If we reached the end of the block, wait until the next one is installed.
             if offset == BLOCK_CAP {
-                backoff.snooze();
+                backoff.spin_heavy();
                 head = self.head.index.load(Ordering::Acquire);
                 block = self.head.block.load(Ordering::Acquire);
                 continue;
@@ -320,7 +320,7 @@ impl<T> Channel<T> {
             // The block can be null here only if the first message is being sent into the channel.
             // In that case, just wait until it gets initialized.
             if block.is_null() {
-                backoff.snooze();
+                backoff.spin_heavy();
                 head = self.head.index.load(Ordering::Acquire);
                 block = self.head.block.load(Ordering::Acquire);
                 continue;
@@ -351,7 +351,7 @@ impl<T> Channel<T> {
                     return true;
                 },
                 Err(_) => {
-                    backoff.spin();
+                    backoff.spin_light();
                     head = self.head.index.load(Ordering::Acquire);
                     block = self.head.block.load(Ordering::Acquire);
                 }
@@ -542,7 +542,7 @@ impl<T> Channel<T> {
             // New updates to tail will be rejected by MARK_BIT and aborted unless it's
             // at boundary. We need to wait for the updates take affect otherwise there
             // can be memory leaks.
-            backoff.snooze();
+            backoff.spin_heavy();
             tail = self.tail.index.load(Ordering::Acquire);
         }
 
diff --git a/library/std/src/sync/mpmc/mod.rs b/library/std/src/sync/mpmc/mod.rs
index cef99c58843..7a602cecd3b 100644
--- a/library/std/src/sync/mpmc/mod.rs
+++ b/library/std/src/sync/mpmc/mod.rs
@@ -43,7 +43,7 @@ mod zero;
 use crate::fmt;
 use crate::panic::{RefUnwindSafe, UnwindSafe};
 use crate::time::{Duration, Instant};
-use error::*;
+pub use error::*;
 
 /// Creates a channel of unbounded capacity.
 ///
diff --git a/library/std/src/sync/mpmc/utils.rs b/library/std/src/sync/mpmc/utils.rs
index e030c55ce8f..cfe42750d52 100644
--- a/library/std/src/sync/mpmc/utils.rs
+++ b/library/std/src/sync/mpmc/utils.rs
@@ -91,9 +91,8 @@ impl<T> DerefMut for CachePadded<T> {
 }
 
 const SPIN_LIMIT: u32 = 6;
-const YIELD_LIMIT: u32 = 10;
 
-/// Performs exponential backoff in spin loops.
+/// Performs quadratic backoff in spin loops.
 pub struct Backoff {
     step: Cell<u32>,
 }
@@ -104,25 +103,27 @@ impl Backoff {
         Backoff { step: Cell::new(0) }
     }
 
-    /// Backs off in a lock-free loop.
+    /// Backs off using lightweight spinning.
     ///
-    /// This method should be used when we need to retry an operation because another thread made
-    /// progress.
+    /// This method should be used for:
+    ///     - Retrying an operation because another thread made progress. i.e. on CAS failure.
+    ///     - Waiting for an operation to complete by spinning optimistically for a few iterations
+    ///     before falling back to parking the thread (see `Backoff::is_completed`).
     #[inline]
-    pub fn spin(&self) {
+    pub fn spin_light(&self) {
         let step = self.step.get().min(SPIN_LIMIT);
         for _ in 0..step.pow(2) {
             crate::hint::spin_loop();
         }
 
-        if self.step.get() <= SPIN_LIMIT {
-            self.step.set(self.step.get() + 1);
-        }
+        self.step.set(self.step.get() + 1);
     }
 
-    /// Backs off in a blocking loop.
+    /// Backs off using heavyweight spinning.
+    ///
+    /// This method should be used in blocking loops where parking the thread is not an option.
     #[inline]
-    pub fn snooze(&self) {
+    pub fn spin_heavy(&self) {
         if self.step.get() <= SPIN_LIMIT {
             for _ in 0..self.step.get().pow(2) {
                 crate::hint::spin_loop()
@@ -131,14 +132,12 @@ impl Backoff {
             crate::thread::yield_now();
         }
 
-        if self.step.get() <= YIELD_LIMIT {
-            self.step.set(self.step.get() + 1);
-        }
+        self.step.set(self.step.get() + 1);
     }
 
-    /// Returns `true` if quadratic backoff has completed and blocking the thread is advised.
+    /// Returns `true` if quadratic backoff has completed and parking the thread is advised.
     #[inline]
     pub fn is_completed(&self) -> bool {
-        self.step.get() > YIELD_LIMIT
+        self.step.get() > SPIN_LIMIT
     }
 }
diff --git a/library/std/src/sync/mpmc/zero.rs b/library/std/src/sync/mpmc/zero.rs
index fccd6c29a7e..33f768dcbe9 100644
--- a/library/std/src/sync/mpmc/zero.rs
+++ b/library/std/src/sync/mpmc/zero.rs
@@ -57,7 +57,7 @@ impl<T> Packet<T> {
     fn wait_ready(&self) {
         let backoff = Backoff::new();
         while !self.ready.load(Ordering::Acquire) {
-            backoff.snooze();
+            backoff.spin_heavy();
         }
     }
 }
diff --git a/library/std/src/sync/mpsc/mod.rs b/library/std/src/sync/mpsc/mod.rs
index adb488d4378..6e3c28f10bb 100644
--- a/library/std/src/sync/mpsc/mod.rs
+++ b/library/std/src/sync/mpsc/mod.rs
@@ -738,6 +738,15 @@ impl<T> SyncSender<T> {
     pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
         self.inner.try_send(t)
     }
+
+    // Attempts to send for a value on this receiver, returning an error if the
+    // corresponding channel has hung up, or if it waits more than `timeout`.
+    //
+    // This method is currently private and only used for tests.
+    #[allow(unused)]
+    fn send_timeout(&self, t: T, timeout: Duration) -> Result<(), mpmc::SendTimeoutError<T>> {
+        self.inner.send_timeout(t, timeout)
+    }
 }
 
 #[stable(feature = "rust1", since = "1.0.0")]
diff --git a/library/std/src/sync/mpsc/sync_tests.rs b/library/std/src/sync/mpsc/sync_tests.rs
index 63c79436974..9d2f92ffc9b 100644
--- a/library/std/src/sync/mpsc/sync_tests.rs
+++ b/library/std/src/sync/mpsc/sync_tests.rs
@@ -1,5 +1,6 @@
 use super::*;
 use crate::env;
+use crate::sync::mpmc::SendTimeoutError;
 use crate::thread;
 use crate::time::Duration;
 
@@ -42,6 +43,13 @@ fn recv_timeout() {
 }
 
 #[test]
+fn send_timeout() {
+    let (tx, _rx) = sync_channel::<i32>(1);
+    assert_eq!(tx.send_timeout(1, Duration::from_millis(1)), Ok(()));
+    assert_eq!(tx.send_timeout(1, Duration::from_millis(1)), Err(SendTimeoutError::Timeout(1)));
+}
+
+#[test]
 fn smoke_threads() {
     let (tx, rx) = sync_channel::<i32>(0);
     let _t = thread::spawn(move || {