From b8ae372e483ed194a4aff2c4500bb7f0fd7e2967 Mon Sep 17 00:00:00 2001 From: bjorn3 <17426603+bjorn3@users.noreply.github.com> Date: Fri, 17 Jan 2025 11:04:40 +0000 Subject: Move std::sync unit tests to integration tests This removes two minor OnceLock tests which test private methods. The rest of the tests should be more than enough to catch mistakes in those private methods. Also makes ReentrantLock::try_lock public. And finally it makes the mpmc tests actually run. --- library/std/src/sync/barrier.rs | 3 - library/std/src/sync/barrier/tests.rs | 35 - library/std/src/sync/lazy_lock.rs | 3 - library/std/src/sync/lazy_lock/tests.rs | 167 ---- library/std/src/sync/mpmc/tests.rs | 728 --------------- library/std/src/sync/mpsc.rs | 1242 +++++++++++++++++++++++++ library/std/src/sync/mpsc/mod.rs | 1247 -------------------------- library/std/src/sync/mpsc/sync_tests.rs | 669 -------------- library/std/src/sync/mpsc/tests.rs | 721 --------------- library/std/src/sync/once_lock.rs | 3 - library/std/src/sync/once_lock/tests.rs | 200 ----- library/std/src/sync/poison/condvar.rs | 3 - library/std/src/sync/poison/condvar/tests.rs | 190 ---- library/std/src/sync/poison/mutex.rs | 3 - library/std/src/sync/poison/mutex/tests.rs | 442 --------- library/std/src/sync/poison/once.rs | 3 - library/std/src/sync/poison/once/tests.rs | 162 ---- library/std/src/sync/poison/rwlock.rs | 3 - library/std/src/sync/poison/rwlock/tests.rs | 729 --------------- library/std/src/sync/reentrant_lock.rs | 8 +- library/std/src/sync/reentrant_lock/tests.rs | 53 -- 21 files changed, 1246 insertions(+), 5368 deletions(-) delete mode 100644 library/std/src/sync/barrier/tests.rs delete mode 100644 library/std/src/sync/lazy_lock/tests.rs delete mode 100644 library/std/src/sync/mpmc/tests.rs create mode 100644 library/std/src/sync/mpsc.rs delete mode 100644 library/std/src/sync/mpsc/mod.rs delete mode 100644 library/std/src/sync/mpsc/sync_tests.rs delete mode 100644 library/std/src/sync/mpsc/tests.rs delete mode 100644 library/std/src/sync/once_lock/tests.rs delete mode 100644 library/std/src/sync/poison/condvar/tests.rs delete mode 100644 library/std/src/sync/poison/mutex/tests.rs delete mode 100644 library/std/src/sync/poison/once/tests.rs delete mode 100644 library/std/src/sync/poison/rwlock/tests.rs delete mode 100644 library/std/src/sync/reentrant_lock/tests.rs (limited to 'library/std/src/sync') diff --git a/library/std/src/sync/barrier.rs b/library/std/src/sync/barrier.rs index 862753e4765..067ff66d9af 100644 --- a/library/std/src/sync/barrier.rs +++ b/library/std/src/sync/barrier.rs @@ -1,6 +1,3 @@ -#[cfg(test)] -mod tests; - use crate::fmt; // FIXME(nonpoison_mutex,nonpoison_condvar): switch to nonpoison versions once they are available use crate::sync::{Condvar, Mutex}; diff --git a/library/std/src/sync/barrier/tests.rs b/library/std/src/sync/barrier/tests.rs deleted file mode 100644 index 0fbcd998812..00000000000 --- a/library/std/src/sync/barrier/tests.rs +++ /dev/null @@ -1,35 +0,0 @@ -use crate::sync::mpsc::{TryRecvError, channel}; -use crate::sync::{Arc, Barrier}; -use crate::thread; - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_os = "wasi"), ignore)] // no threads -fn test_barrier() { - const N: usize = 10; - - let barrier = Arc::new(Barrier::new(N)); - let (tx, rx) = channel(); - - for _ in 0..N - 1 { - let c = barrier.clone(); - let tx = tx.clone(); - thread::spawn(move || { - tx.send(c.wait().is_leader()).unwrap(); - }); - } - - // At this point, all spawned threads should be blocked, - // so we shouldn't get anything from the port - assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty))); - - let mut leader_found = barrier.wait().is_leader(); - - // Now, the barrier is cleared and we should get data. - for _ in 0..N - 1 { - if rx.recv().unwrap() { - assert!(!leader_found); - leader_found = true; - } - } - assert!(leader_found); -} diff --git a/library/std/src/sync/lazy_lock.rs b/library/std/src/sync/lazy_lock.rs index 98c83d8d326..78cf8841efe 100644 --- a/library/std/src/sync/lazy_lock.rs +++ b/library/std/src/sync/lazy_lock.rs @@ -350,6 +350,3 @@ unsafe impl Sync for LazyLock {} impl RefUnwindSafe for LazyLock {} #[stable(feature = "lazy_cell", since = "1.80.0")] impl UnwindSafe for LazyLock {} - -#[cfg(test)] -mod tests; diff --git a/library/std/src/sync/lazy_lock/tests.rs b/library/std/src/sync/lazy_lock/tests.rs deleted file mode 100644 index 7d7dde54349..00000000000 --- a/library/std/src/sync/lazy_lock/tests.rs +++ /dev/null @@ -1,167 +0,0 @@ -use crate::cell::LazyCell; -use crate::sync::atomic::AtomicUsize; -use crate::sync::atomic::Ordering::SeqCst; -use crate::sync::{LazyLock, Mutex, OnceLock}; -use crate::{panic, thread}; - -fn spawn_and_wait(f: impl FnOnce() -> R + Send + 'static) -> R { - thread::spawn(f).join().unwrap() -} - -#[test] -fn lazy_default() { - static CALLED: AtomicUsize = AtomicUsize::new(0); - - struct Foo(u8); - impl Default for Foo { - fn default() -> Self { - CALLED.fetch_add(1, SeqCst); - Foo(42) - } - } - - let lazy: LazyCell> = <_>::default(); - - assert_eq!(CALLED.load(SeqCst), 0); - - assert_eq!(lazy.lock().unwrap().0, 42); - assert_eq!(CALLED.load(SeqCst), 1); - - lazy.lock().unwrap().0 = 21; - - assert_eq!(lazy.lock().unwrap().0, 21); - assert_eq!(CALLED.load(SeqCst), 1); -} - -#[test] -#[cfg_attr(not(panic = "unwind"), ignore = "test requires unwinding support")] -fn lazy_poisoning() { - let x: LazyCell = LazyCell::new(|| panic!("kaboom")); - for _ in 0..2 { - let res = panic::catch_unwind(panic::AssertUnwindSafe(|| x.len())); - assert!(res.is_err()); - } -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_os = "wasi"), ignore)] // no threads -fn sync_lazy_new() { - static CALLED: AtomicUsize = AtomicUsize::new(0); - static SYNC_LAZY: LazyLock = LazyLock::new(|| { - CALLED.fetch_add(1, SeqCst); - 92 - }); - - assert_eq!(CALLED.load(SeqCst), 0); - - spawn_and_wait(|| { - let y = *SYNC_LAZY - 30; - assert_eq!(y, 62); - assert_eq!(CALLED.load(SeqCst), 1); - }); - - let y = *SYNC_LAZY - 30; - assert_eq!(y, 62); - assert_eq!(CALLED.load(SeqCst), 1); -} - -#[test] -fn sync_lazy_default() { - static CALLED: AtomicUsize = AtomicUsize::new(0); - - struct Foo(u8); - impl Default for Foo { - fn default() -> Self { - CALLED.fetch_add(1, SeqCst); - Foo(42) - } - } - - let lazy: LazyLock> = <_>::default(); - - assert_eq!(CALLED.load(SeqCst), 0); - - assert_eq!(lazy.lock().unwrap().0, 42); - assert_eq!(CALLED.load(SeqCst), 1); - - lazy.lock().unwrap().0 = 21; - - assert_eq!(lazy.lock().unwrap().0, 21); - assert_eq!(CALLED.load(SeqCst), 1); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_os = "wasi"), ignore)] // no threads -fn static_sync_lazy() { - static XS: LazyLock> = LazyLock::new(|| { - let mut xs = Vec::new(); - xs.push(1); - xs.push(2); - xs.push(3); - xs - }); - - spawn_and_wait(|| { - assert_eq!(&*XS, &vec![1, 2, 3]); - }); - - assert_eq!(&*XS, &vec![1, 2, 3]); -} - -#[test] -fn static_sync_lazy_via_fn() { - fn xs() -> &'static Vec { - static XS: OnceLock> = OnceLock::new(); - XS.get_or_init(|| { - let mut xs = Vec::new(); - xs.push(1); - xs.push(2); - xs.push(3); - xs - }) - } - assert_eq!(xs(), &vec![1, 2, 3]); -} - -#[test] -#[cfg_attr(not(panic = "unwind"), ignore = "test requires unwinding support")] -fn sync_lazy_poisoning() { - let x: LazyLock = LazyLock::new(|| panic!("kaboom")); - for _ in 0..2 { - let res = panic::catch_unwind(|| x.len()); - assert!(res.is_err()); - } -} - -// Check that we can infer `T` from closure's type. -#[test] -fn lazy_type_inference() { - let _ = LazyCell::new(|| ()); -} - -#[test] -fn is_sync_send() { - fn assert_traits() {} - assert_traits::>(); -} - -#[test] -#[should_panic = "has previously been poisoned"] -fn lazy_force_mut_panic() { - let mut lazy = LazyLock::::new(|| panic!()); - crate::panic::catch_unwind(crate::panic::AssertUnwindSafe(|| { - let _ = LazyLock::force_mut(&mut lazy); - })) - .unwrap_err(); - let _ = &*lazy; -} - -#[test] -fn lazy_force_mut() { - let s = "abc".to_owned(); - let mut lazy = LazyLock::new(move || s); - LazyLock::force_mut(&mut lazy); - let p = LazyLock::force_mut(&mut lazy); - p.clear(); - LazyLock::force_mut(&mut lazy); -} diff --git a/library/std/src/sync/mpmc/tests.rs b/library/std/src/sync/mpmc/tests.rs deleted file mode 100644 index ab14050df6c..00000000000 --- a/library/std/src/sync/mpmc/tests.rs +++ /dev/null @@ -1,728 +0,0 @@ -use super::*; -use crate::{env, thread}; - -pub fn stress_factor() -> usize { - match env::var("RUST_TEST_STRESS") { - Ok(val) => val.parse().unwrap(), - Err(..) => 1, - } -} - -#[test] -fn smoke() { - let (tx, rx) = channel::(); - tx.send(1).unwrap(); - assert_eq!(rx.recv().unwrap(), 1); -} - -#[test] -fn drop_full() { - let (tx, _rx) = channel::>(); - tx.send(Box::new(1)).unwrap(); -} - -#[test] -fn drop_full_shared() { - let (tx, _rx) = channel::>(); - drop(tx.clone()); - drop(tx.clone()); - tx.send(Box::new(1)).unwrap(); -} - -#[test] -fn smoke_shared() { - let (tx, rx) = channel::(); - tx.send(1).unwrap(); - assert_eq!(rx.recv().unwrap(), 1); - let tx = tx.clone(); - tx.send(1).unwrap(); - assert_eq!(rx.recv().unwrap(), 1); -} - -#[test] -fn smoke_threads() { - let (tx, rx) = channel::(); - let t1 = thread::spawn(move || { - for i in 0..2 { - tx.send(i).unwrap(); - } - }); - let t2 = thread::spawn(move || { - assert_eq!(rx.recv().unwrap(), 0); - assert_eq!(rx.recv().unwrap(), 1); - }); - t1.join().unwrap(); - t2.join().unwrap(); -} - -#[test] -fn smoke_port_gone() { - let (tx, rx) = channel::(); - drop(rx); - assert!(tx.send(1).is_err()); -} - -#[test] -fn smoke_shared_port_gone() { - let (tx, rx) = channel::(); - drop(rx); - assert!(tx.send(1).is_err()) -} - -#[test] -fn smoke_shared_port_gone2() { - let (tx, rx) = channel::(); - drop(rx); - let tx2 = tx.clone(); - drop(tx); - assert!(tx2.send(1).is_err()); -} - -#[test] -fn port_gone_concurrent() { - let (tx, rx) = channel::(); - let _t = thread::spawn(move || { - rx.recv().unwrap(); - }); - while tx.send(1).is_ok() {} -} - -#[test] -fn port_gone_concurrent_shared() { - let (tx, rx) = channel::(); - let tx2 = tx.clone(); - let _t = thread::spawn(move || { - rx.recv().unwrap(); - }); - while tx.send(1).is_ok() && tx2.send(1).is_ok() {} -} - -#[test] -fn smoke_chan_gone() { - let (tx, rx) = channel::(); - drop(tx); - assert!(rx.recv().is_err()); -} - -#[test] -fn smoke_chan_gone_shared() { - let (tx, rx) = channel::<()>(); - let tx2 = tx.clone(); - drop(tx); - drop(tx2); - assert!(rx.recv().is_err()); -} - -#[test] -fn chan_gone_concurrent() { - let (tx, rx) = channel::(); - let _t = thread::spawn(move || { - tx.send(1).unwrap(); - tx.send(1).unwrap(); - }); - while rx.recv().is_ok() {} -} - -#[test] -fn stress() { - let count = if cfg!(miri) { 100 } else { 10000 }; - let (tx, rx) = channel::(); - let t = thread::spawn(move || { - for _ in 0..count { - tx.send(1).unwrap(); - } - }); - for _ in 0..count { - assert_eq!(rx.recv().unwrap(), 1); - } - t.join().ok().expect("thread panicked"); -} - -#[test] -fn stress_shared() { - const AMT: u32 = if cfg!(miri) { 100 } else { 10000 }; - const NTHREADS: u32 = 8; - let (tx, rx) = channel::(); - - let t = thread::spawn(move || { - for _ in 0..AMT * NTHREADS { - assert_eq!(rx.recv().unwrap(), 1); - } - match rx.try_recv() { - Ok(..) => panic!(), - _ => {} - } - }); - - for _ in 0..NTHREADS { - let tx = tx.clone(); - thread::spawn(move || { - for _ in 0..AMT { - tx.send(1).unwrap(); - } - }); - } - drop(tx); - t.join().ok().expect("thread panicked"); -} - -#[test] -fn send_from_outside_runtime() { - let (tx1, rx1) = channel::<()>(); - let (tx2, rx2) = channel::(); - let t1 = thread::spawn(move || { - tx1.send(()).unwrap(); - for _ in 0..40 { - assert_eq!(rx2.recv().unwrap(), 1); - } - }); - rx1.recv().unwrap(); - let t2 = thread::spawn(move || { - for _ in 0..40 { - tx2.send(1).unwrap(); - } - }); - t1.join().ok().expect("thread panicked"); - t2.join().ok().expect("thread panicked"); -} - -#[test] -fn recv_from_outside_runtime() { - let (tx, rx) = channel::(); - let t = thread::spawn(move || { - for _ in 0..40 { - assert_eq!(rx.recv().unwrap(), 1); - } - }); - for _ in 0..40 { - tx.send(1).unwrap(); - } - t.join().ok().expect("thread panicked"); -} - -#[test] -fn no_runtime() { - let (tx1, rx1) = channel::(); - let (tx2, rx2) = channel::(); - let t1 = thread::spawn(move || { - assert_eq!(rx1.recv().unwrap(), 1); - tx2.send(2).unwrap(); - }); - let t2 = thread::spawn(move || { - tx1.send(1).unwrap(); - assert_eq!(rx2.recv().unwrap(), 2); - }); - t1.join().ok().expect("thread panicked"); - t2.join().ok().expect("thread panicked"); -} - -#[test] -fn oneshot_single_thread_close_port_first() { - // Simple test of closing without sending - let (_tx, rx) = channel::(); - drop(rx); -} - -#[test] -fn oneshot_single_thread_close_chan_first() { - // Simple test of closing without sending - let (tx, _rx) = channel::(); - drop(tx); -} - -#[test] -fn oneshot_single_thread_send_port_close() { - // Testing that the sender cleans up the payload if receiver is closed - let (tx, rx) = channel::>(); - drop(rx); - assert!(tx.send(Box::new(0)).is_err()); -} - -#[test] -fn oneshot_single_thread_recv_chan_close() { - // Receiving on a closed chan will panic - let res = thread::spawn(move || { - let (tx, rx) = channel::(); - drop(tx); - rx.recv().unwrap(); - }) - .join(); - // What is our res? - assert!(res.is_err()); -} - -#[test] -fn oneshot_single_thread_send_then_recv() { - let (tx, rx) = channel::>(); - tx.send(Box::new(10)).unwrap(); - assert!(*rx.recv().unwrap() == 10); -} - -#[test] -fn oneshot_single_thread_try_send_open() { - let (tx, rx) = channel::(); - assert!(tx.send(10).is_ok()); - assert!(rx.recv().unwrap() == 10); -} - -#[test] -fn oneshot_single_thread_try_send_closed() { - let (tx, rx) = channel::(); - drop(rx); - assert!(tx.send(10).is_err()); -} - -#[test] -fn oneshot_single_thread_try_recv_open() { - let (tx, rx) = channel::(); - tx.send(10).unwrap(); - assert!(rx.recv() == Ok(10)); -} - -#[test] -fn oneshot_single_thread_try_recv_closed() { - let (tx, rx) = channel::(); - drop(tx); - assert!(rx.recv().is_err()); -} - -#[test] -fn oneshot_single_thread_peek_data() { - let (tx, rx) = channel::(); - assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); - tx.send(10).unwrap(); - assert_eq!(rx.try_recv(), Ok(10)); -} - -#[test] -fn oneshot_single_thread_peek_close() { - let (tx, rx) = channel::(); - drop(tx); - assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); - assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); -} - -#[test] -fn oneshot_single_thread_peek_open() { - let (_tx, rx) = channel::(); - assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); -} - -#[test] -fn oneshot_multi_task_recv_then_send() { - let (tx, rx) = channel::>(); - let _t = thread::spawn(move || { - assert!(*rx.recv().unwrap() == 10); - }); - - tx.send(Box::new(10)).unwrap(); -} - -#[test] -fn oneshot_multi_task_recv_then_close() { - let (tx, rx) = channel::>(); - let _t = thread::spawn(move || { - drop(tx); - }); - let res = thread::spawn(move || { - assert!(*rx.recv().unwrap() == 10); - }) - .join(); - assert!(res.is_err()); -} - -#[test] -fn oneshot_multi_thread_close_stress() { - for _ in 0..stress_factor() { - let (tx, rx) = channel::(); - let _t = thread::spawn(move || { - drop(rx); - }); - drop(tx); - } -} - -#[test] -fn oneshot_multi_thread_send_close_stress() { - for _ in 0..stress_factor() { - let (tx, rx) = channel::(); - let _t = thread::spawn(move || { - drop(rx); - }); - let _ = thread::spawn(move || { - tx.send(1).unwrap(); - }) - .join(); - } -} - -#[test] -fn oneshot_multi_thread_recv_close_stress() { - for _ in 0..stress_factor() { - let (tx, rx) = channel::(); - thread::spawn(move || { - let res = thread::spawn(move || { - rx.recv().unwrap(); - }) - .join(); - assert!(res.is_err()); - }); - let _t = thread::spawn(move || { - thread::spawn(move || { - drop(tx); - }); - }); - } -} - -#[test] -fn oneshot_multi_thread_send_recv_stress() { - for _ in 0..stress_factor() { - let (tx, rx) = channel::>(); - let _t = thread::spawn(move || { - tx.send(Box::new(10)).unwrap(); - }); - assert!(*rx.recv().unwrap() == 10); - } -} - -#[test] -fn stream_send_recv_stress() { - for _ in 0..stress_factor() { - let (tx, rx) = channel(); - - send(tx, 0); - recv(rx, 0); - - fn send(tx: Sender>, i: i32) { - if i == 10 { - return; - } - - thread::spawn(move || { - tx.send(Box::new(i)).unwrap(); - send(tx, i + 1); - }); - } - - fn recv(rx: Receiver>, i: i32) { - if i == 10 { - return; - } - - thread::spawn(move || { - assert!(*rx.recv().unwrap() == i); - recv(rx, i + 1); - }); - } - } -} - -#[test] -fn oneshot_single_thread_recv_timeout() { - let (tx, rx) = channel(); - tx.send(()).unwrap(); - assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(())); - assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout)); - tx.send(()).unwrap(); - assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(())); -} - -#[test] -fn stress_recv_timeout_two_threads() { - let (tx, rx) = channel(); - let stress = stress_factor() + 100; - let timeout = Duration::from_millis(100); - - thread::spawn(move || { - for i in 0..stress { - if i % 2 == 0 { - thread::sleep(timeout * 2); - } - tx.send(1usize).unwrap(); - } - }); - - let mut recv_count = 0; - loop { - match rx.recv_timeout(timeout) { - Ok(n) => { - assert_eq!(n, 1usize); - recv_count += 1; - } - Err(RecvTimeoutError::Timeout) => continue, - Err(RecvTimeoutError::Disconnected) => break, - } - } - - assert_eq!(recv_count, stress); -} - -#[test] -fn recv_timeout_upgrade() { - let (tx, rx) = channel::<()>(); - let timeout = Duration::from_millis(1); - let _tx_clone = tx.clone(); - - let start = Instant::now(); - assert_eq!(rx.recv_timeout(timeout), Err(RecvTimeoutError::Timeout)); - assert!(Instant::now() >= start + timeout); -} - -#[test] -fn stress_recv_timeout_shared() { - let (tx, rx) = channel(); - let stress = stress_factor() + 100; - - for i in 0..stress { - let tx = tx.clone(); - thread::spawn(move || { - thread::sleep(Duration::from_millis(i as u64 * 10)); - tx.send(1usize).unwrap(); - }); - } - - drop(tx); - - let mut recv_count = 0; - loop { - match rx.recv_timeout(Duration::from_millis(10)) { - Ok(n) => { - assert_eq!(n, 1usize); - recv_count += 1; - } - Err(RecvTimeoutError::Timeout) => continue, - Err(RecvTimeoutError::Disconnected) => break, - } - } - - assert_eq!(recv_count, stress); -} - -#[test] -fn very_long_recv_timeout_wont_panic() { - let (tx, rx) = channel::<()>(); - let join_handle = thread::spawn(move || rx.recv_timeout(Duration::from_secs(u64::MAX))); - thread::sleep(Duration::from_secs(1)); - assert!(tx.send(()).is_ok()); - assert_eq!(join_handle.join().unwrap(), Ok(())); -} - -#[test] -fn recv_a_lot() { - let count = if cfg!(miri) { 1000 } else { 10000 }; - // Regression test that we don't run out of stack in scheduler context - let (tx, rx) = channel(); - for _ in 0..count { - tx.send(()).unwrap(); - } - for _ in 0..count { - rx.recv().unwrap(); - } -} - -#[test] -fn shared_recv_timeout() { - let (tx, rx) = channel(); - let total = 5; - for _ in 0..total { - let tx = tx.clone(); - thread::spawn(move || { - tx.send(()).unwrap(); - }); - } - - for _ in 0..total { - rx.recv().unwrap(); - } - - assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout)); - tx.send(()).unwrap(); - assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(())); -} - -#[test] -fn shared_chan_stress() { - let (tx, rx) = channel(); - let total = stress_factor() + 100; - for _ in 0..total { - let tx = tx.clone(); - thread::spawn(move || { - tx.send(()).unwrap(); - }); - } - - for _ in 0..total { - rx.recv().unwrap(); - } -} - -#[test] -fn test_nested_recv_iter() { - let (tx, rx) = channel::(); - let (total_tx, total_rx) = channel::(); - - let _t = thread::spawn(move || { - let mut acc = 0; - for x in rx.iter() { - acc += x; - } - total_tx.send(acc).unwrap(); - }); - - tx.send(3).unwrap(); - tx.send(1).unwrap(); - tx.send(2).unwrap(); - drop(tx); - assert_eq!(total_rx.recv().unwrap(), 6); -} - -#[test] -fn test_recv_iter_break() { - let (tx, rx) = channel::(); - let (count_tx, count_rx) = channel(); - - let _t = thread::spawn(move || { - let mut count = 0; - for x in rx.iter() { - if count >= 3 { - break; - } else { - count += x; - } - } - count_tx.send(count).unwrap(); - }); - - tx.send(2).unwrap(); - tx.send(2).unwrap(); - tx.send(2).unwrap(); - let _ = tx.send(2); - drop(tx); - assert_eq!(count_rx.recv().unwrap(), 4); -} - -#[test] -fn test_recv_try_iter() { - let (request_tx, request_rx) = channel(); - let (response_tx, response_rx) = channel(); - - // Request `x`s until we have `6`. - let t = thread::spawn(move || { - let mut count = 0; - loop { - for x in response_rx.try_iter() { - count += x; - if count == 6 { - return count; - } - } - request_tx.send(()).unwrap(); - } - }); - - for _ in request_rx.iter() { - if response_tx.send(2).is_err() { - break; - } - } - - assert_eq!(t.join().unwrap(), 6); -} - -#[test] -fn test_recv_into_iter_owned() { - let mut iter = { - let (tx, rx) = channel::(); - tx.send(1).unwrap(); - tx.send(2).unwrap(); - - rx.into_iter() - }; - assert_eq!(iter.next().unwrap(), 1); - assert_eq!(iter.next().unwrap(), 2); - assert_eq!(iter.next().is_none(), true); -} - -#[test] -fn test_recv_into_iter_borrowed() { - let (tx, rx) = channel::(); - tx.send(1).unwrap(); - tx.send(2).unwrap(); - drop(tx); - let mut iter = (&rx).into_iter(); - assert_eq!(iter.next().unwrap(), 1); - assert_eq!(iter.next().unwrap(), 2); - assert_eq!(iter.next().is_none(), true); -} - -#[test] -fn try_recv_states() { - let (tx1, rx1) = channel::(); - let (tx2, rx2) = channel::<()>(); - let (tx3, rx3) = channel::<()>(); - let _t = thread::spawn(move || { - rx2.recv().unwrap(); - tx1.send(1).unwrap(); - tx3.send(()).unwrap(); - rx2.recv().unwrap(); - drop(tx1); - tx3.send(()).unwrap(); - }); - - assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty)); - tx2.send(()).unwrap(); - rx3.recv().unwrap(); - assert_eq!(rx1.try_recv(), Ok(1)); - assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty)); - tx2.send(()).unwrap(); - rx3.recv().unwrap(); - assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected)); -} - -// This bug used to end up in a livelock inside of the Receiver destructor -// because the internal state of the Shared packet was corrupted -#[test] -fn destroy_upgraded_shared_port_when_sender_still_active() { - let (tx, rx) = channel(); - let (tx2, rx2) = channel(); - let _t = thread::spawn(move || { - rx.recv().unwrap(); // wait on a oneshot - drop(rx); // destroy a shared - tx2.send(()).unwrap(); - }); - // make sure the other thread has gone to sleep - for _ in 0..5000 { - thread::yield_now(); - } - - // upgrade to a shared chan and send a message - let t = tx.clone(); - drop(tx); - t.send(()).unwrap(); - - // wait for the child thread to exit before we exit - rx2.recv().unwrap(); -} - -#[test] -fn issue_32114() { - let (tx, _) = channel(); - let _ = tx.send(123); - assert_eq!(tx.send(123), Err(SendError(123))); -} - -#[test] -fn issue_39364() { - let (tx, rx) = channel::<()>(); - let t = thread::spawn(move || { - thread::sleep(Duration::from_millis(300)); - let _ = tx.clone(); - // Don't drop; hand back to caller. - tx - }); - - let _ = rx.recv_timeout(Duration::from_millis(500)); - let _tx = t.join().unwrap(); // delay dropping until end of test - let _ = rx.recv_timeout(Duration::from_millis(500)); -} diff --git a/library/std/src/sync/mpsc.rs b/library/std/src/sync/mpsc.rs new file mode 100644 index 00000000000..f942937c14d --- /dev/null +++ b/library/std/src/sync/mpsc.rs @@ -0,0 +1,1242 @@ +//! Multi-producer, single-consumer FIFO queue communication primitives. +//! +//! This module provides message-based communication over channels, concretely +//! defined among three types: +//! +//! * [`Sender`] +//! * [`SyncSender`] +//! * [`Receiver`] +//! +//! A [`Sender`] or [`SyncSender`] is used to send data to a [`Receiver`]. Both +//! senders are clone-able (multi-producer) such that many threads can send +//! simultaneously to one receiver (single-consumer). +//! +//! These channels come in two flavors: +//! +//! 1. An asynchronous, infinitely buffered channel. The [`channel`] function +//! will return a `(Sender, Receiver)` tuple where all sends will be +//! **asynchronous** (they never block). The channel conceptually has an +//! infinite buffer. +//! +//! 2. A synchronous, bounded channel. The [`sync_channel`] function will +//! return a `(SyncSender, Receiver)` tuple where the storage for pending +//! messages is a pre-allocated buffer of a fixed size. All sends will be +//! **synchronous** by blocking until there is buffer space available. Note +//! that a bound of 0 is allowed, causing the channel to become a "rendezvous" +//! channel where each sender atomically hands off a message to a receiver. +//! +//! [`send`]: Sender::send +//! +//! ## Disconnection +//! +//! The send and receive operations on channels will all return a [`Result`] +//! indicating whether the operation succeeded or not. An unsuccessful operation +//! is normally indicative of the other half of a channel having "hung up" by +//! being dropped in its corresponding thread. +//! +//! Once half of a channel has been deallocated, most operations can no longer +//! continue to make progress, so [`Err`] will be returned. Many applications +//! will continue to [`unwrap`] the results returned from this module, +//! instigating a propagation of failure among threads if one unexpectedly dies. +//! +//! [`unwrap`]: Result::unwrap +//! +//! # Examples +//! +//! Simple usage: +//! +//! ``` +//! use std::thread; +//! use std::sync::mpsc::channel; +//! +//! // Create a simple streaming channel +//! let (tx, rx) = channel(); +//! thread::spawn(move || { +//! tx.send(10).unwrap(); +//! }); +//! assert_eq!(rx.recv().unwrap(), 10); +//! ``` +//! +//! Shared usage: +//! +//! ``` +//! use std::thread; +//! use std::sync::mpsc::channel; +//! +//! // Create a shared channel that can be sent along from many threads +//! // where tx is the sending half (tx for transmission), and rx is the receiving +//! // half (rx for receiving). +//! let (tx, rx) = channel(); +//! for i in 0..10 { +//! let tx = tx.clone(); +//! thread::spawn(move || { +//! tx.send(i).unwrap(); +//! }); +//! } +//! +//! for _ in 0..10 { +//! let j = rx.recv().unwrap(); +//! assert!(0 <= j && j < 10); +//! } +//! ``` +//! +//! Propagating panics: +//! +//! ``` +//! use std::sync::mpsc::channel; +//! +//! // The call to recv() will return an error because the channel has already +//! // hung up (or been deallocated) +//! let (tx, rx) = channel::(); +//! drop(tx); +//! assert!(rx.recv().is_err()); +//! ``` +//! +//! Synchronous channels: +//! +//! ``` +//! use std::thread; +//! use std::sync::mpsc::sync_channel; +//! +//! let (tx, rx) = sync_channel::(0); +//! thread::spawn(move || { +//! // This will wait for the parent thread to start receiving +//! tx.send(53).unwrap(); +//! }); +//! rx.recv().unwrap(); +//! ``` +//! +//! Unbounded receive loop: +//! +//! ``` +//! use std::sync::mpsc::sync_channel; +//! use std::thread; +//! +//! let (tx, rx) = sync_channel(3); +//! +//! for _ in 0..3 { +//! // It would be the same without thread and clone here +//! // since there will still be one `tx` left. +//! let tx = tx.clone(); +//! // cloned tx dropped within thread +//! thread::spawn(move || tx.send("ok").unwrap()); +//! } +//! +//! // Drop the last sender to stop `rx` waiting for message. +//! // The program will not complete if we comment this out. +//! // **All** `tx` needs to be dropped for `rx` to have `Err`. +//! drop(tx); +//! +//! // Unbounded receiver waiting for all senders to complete. +//! while let Ok(msg) = rx.recv() { +//! println!("{msg}"); +//! } +//! +//! println!("completed"); +//! ``` + +#![stable(feature = "rust1", since = "1.0.0")] + +// MPSC channels are built as a wrapper around MPMC channels, which +// were ported from the `crossbeam-channel` crate. MPMC channels are +// not exposed publicly, but if you are curious about the implementation, +// that's where everything is. + +use crate::sync::mpmc; +use crate::time::{Duration, Instant}; +use crate::{error, fmt}; + +/// The receiving half of Rust's [`channel`] (or [`sync_channel`]) type. +/// This half can only be owned by one thread. +/// +/// Messages sent to the channel can be retrieved using [`recv`]. +/// +/// [`recv`]: Receiver::recv +/// +/// # Examples +/// +/// ```rust +/// use std::sync::mpsc::channel; +/// use std::thread; +/// use std::time::Duration; +/// +/// let (send, recv) = channel(); +/// +/// thread::spawn(move || { +/// send.send("Hello world!").unwrap(); +/// thread::sleep(Duration::from_secs(2)); // block for two seconds +/// send.send("Delayed for 2 seconds").unwrap(); +/// }); +/// +/// println!("{}", recv.recv().unwrap()); // Received immediately +/// println!("Waiting..."); +/// println!("{}", recv.recv().unwrap()); // Received after 2 seconds +/// ``` +#[stable(feature = "rust1", since = "1.0.0")] +#[cfg_attr(not(test), rustc_diagnostic_item = "Receiver")] +pub struct Receiver { + inner: mpmc::Receiver, +} + +// The receiver port can be sent from place to place, so long as it +// is not used to receive non-sendable things. +#[stable(feature = "rust1", since = "1.0.0")] +unsafe impl Send for Receiver {} + +#[stable(feature = "rust1", since = "1.0.0")] +impl !Sync for Receiver {} + +/// An iterator over messages on a [`Receiver`], created by [`iter`]. +/// +/// This iterator will block whenever [`next`] is called, +/// waiting for a new message, and [`None`] will be returned +/// when the corresponding channel has hung up. +/// +/// [`iter`]: Receiver::iter +/// [`next`]: Iterator::next +/// +/// # Examples +/// +/// ```rust +/// use std::sync::mpsc::channel; +/// use std::thread; +/// +/// let (send, recv) = channel(); +/// +/// thread::spawn(move || { +/// send.send(1u8).unwrap(); +/// send.send(2u8).unwrap(); +/// send.send(3u8).unwrap(); +/// }); +/// +/// for x in recv.iter() { +/// println!("Got: {x}"); +/// } +/// ``` +#[stable(feature = "rust1", since = "1.0.0")] +#[derive(Debug)] +pub struct Iter<'a, T: 'a> { + rx: &'a Receiver, +} + +/// An iterator that attempts to yield all pending values for a [`Receiver`], +/// created by [`try_iter`]. +/// +/// [`None`] will be returned when there are no pending values remaining or +/// if the corresponding channel has hung up. +/// +/// This iterator will never block the caller in order to wait for data to +/// become available. Instead, it will return [`None`]. +/// +/// [`try_iter`]: Receiver::try_iter +/// +/// # Examples +/// +/// ```rust +/// use std::sync::mpsc::channel; +/// use std::thread; +/// use std::time::Duration; +/// +/// let (sender, receiver) = channel(); +/// +/// // Nothing is in the buffer yet +/// assert!(receiver.try_iter().next().is_none()); +/// println!("Nothing in the buffer..."); +/// +/// thread::spawn(move || { +/// sender.send(1).unwrap(); +/// sender.send(2).unwrap(); +/// sender.send(3).unwrap(); +/// }); +/// +/// println!("Going to sleep..."); +/// thread::sleep(Duration::from_secs(2)); // block for two seconds +/// +/// for x in receiver.try_iter() { +/// println!("Got: {x}"); +/// } +/// ``` +#[stable(feature = "receiver_try_iter", since = "1.15.0")] +#[derive(Debug)] +pub struct TryIter<'a, T: 'a> { + rx: &'a Receiver, +} + +/// An owning iterator over messages on a [`Receiver`], +/// created by [`into_iter`]. +/// +/// This iterator will block whenever [`next`] +/// is called, waiting for a new message, and [`None`] will be +/// returned if the corresponding channel has hung up. +/// +/// [`into_iter`]: Receiver::into_iter +/// [`next`]: Iterator::next +/// +/// # Examples +/// +/// ```rust +/// use std::sync::mpsc::channel; +/// use std::thread; +/// +/// let (send, recv) = channel(); +/// +/// thread::spawn(move || { +/// send.send(1u8).unwrap(); +/// send.send(2u8).unwrap(); +/// send.send(3u8).unwrap(); +/// }); +/// +/// for x in recv.into_iter() { +/// println!("Got: {x}"); +/// } +/// ``` +#[stable(feature = "receiver_into_iter", since = "1.1.0")] +#[derive(Debug)] +pub struct IntoIter { + rx: Receiver, +} + +/// The sending-half of Rust's asynchronous [`channel`] type. +/// +/// Messages can be sent through this channel with [`send`]. +/// +/// Note: all senders (the original and its clones) need to be dropped for the receiver +/// to stop blocking to receive messages with [`Receiver::recv`]. +/// +/// [`send`]: Sender::send +/// +/// # Examples +/// +/// ```rust +/// use std::sync::mpsc::channel; +/// use std::thread; +/// +/// let (sender, receiver) = channel(); +/// let sender2 = sender.clone(); +/// +/// // First thread owns sender +/// thread::spawn(move || { +/// sender.send(1).unwrap(); +/// }); +/// +/// // Second thread owns sender2 +/// thread::spawn(move || { +/// sender2.send(2).unwrap(); +/// }); +/// +/// let msg = receiver.recv().unwrap(); +/// let msg2 = receiver.recv().unwrap(); +/// +/// assert_eq!(3, msg + msg2); +/// ``` +#[stable(feature = "rust1", since = "1.0.0")] +pub struct Sender { + inner: mpmc::Sender, +} + +// The send port can be sent from place to place, so long as it +// is not used to send non-sendable things. +#[stable(feature = "rust1", since = "1.0.0")] +unsafe impl Send for Sender {} + +#[stable(feature = "mpsc_sender_sync", since = "1.72.0")] +unsafe impl Sync for Sender {} + +/// The sending-half of Rust's synchronous [`sync_channel`] type. +/// +/// Messages can be sent through this channel with [`send`] or [`try_send`]. +/// +/// [`send`] will block if there is no space in the internal buffer. +/// +/// [`send`]: SyncSender::send +/// [`try_send`]: SyncSender::try_send +/// +/// # Examples +/// +/// ```rust +/// use std::sync::mpsc::sync_channel; +/// use std::thread; +/// +/// // Create a sync_channel with buffer size 2 +/// let (sync_sender, receiver) = sync_channel(2); +/// let sync_sender2 = sync_sender.clone(); +/// +/// // First thread owns sync_sender +/// thread::spawn(move || { +/// sync_sender.send(1).unwrap(); +/// sync_sender.send(2).unwrap(); +/// }); +/// +/// // Second thread owns sync_sender2 +/// thread::spawn(move || { +/// sync_sender2.send(3).unwrap(); +/// // thread will now block since the buffer is full +/// println!("Thread unblocked!"); +/// }); +/// +/// let mut msg; +/// +/// msg = receiver.recv().unwrap(); +/// println!("message {msg} received"); +/// +/// // "Thread unblocked!" will be printed now +/// +/// msg = receiver.recv().unwrap(); +/// println!("message {msg} received"); +/// +/// msg = receiver.recv().unwrap(); +/// +/// println!("message {msg} received"); +/// ``` +#[stable(feature = "rust1", since = "1.0.0")] +pub struct SyncSender { + inner: mpmc::Sender, +} + +#[stable(feature = "rust1", since = "1.0.0")] +unsafe impl Send for SyncSender {} + +/// An error returned from the [`Sender::send`] or [`SyncSender::send`] +/// function on **channel**s. +/// +/// A **send** operation can only fail if the receiving end of a channel is +/// disconnected, implying that the data could never be received. The error +/// contains the data being sent as a payload so it can be recovered. +#[stable(feature = "rust1", since = "1.0.0")] +#[derive(PartialEq, Eq, Clone, Copy)] +pub struct SendError(#[stable(feature = "rust1", since = "1.0.0")] pub T); + +/// An error returned from the [`recv`] function on a [`Receiver`]. +/// +/// The [`recv`] operation can only fail if the sending half of a +/// [`channel`] (or [`sync_channel`]) is disconnected, implying that no further +/// messages will ever be received. +/// +/// [`recv`]: Receiver::recv +#[derive(PartialEq, Eq, Clone, Copy, Debug)] +#[stable(feature = "rust1", since = "1.0.0")] +pub struct RecvError; + +/// This enumeration is the list of the possible reasons that [`try_recv`] could +/// not return data when called. This can occur with both a [`channel`] and +/// a [`sync_channel`]. +/// +/// [`try_recv`]: Receiver::try_recv +#[derive(PartialEq, Eq, Clone, Copy, Debug)] +#[stable(feature = "rust1", since = "1.0.0")] +pub enum TryRecvError { + /// This **channel** is currently empty, but the **Sender**(s) have not yet + /// disconnected, so data may yet become available. + #[stable(feature = "rust1", since = "1.0.0")] + Empty, + + /// The **channel**'s sending half has become disconnected, and there will + /// never be any more data received on it. + #[stable(feature = "rust1", since = "1.0.0")] + Disconnected, +} + +/// This enumeration is the list of possible errors that made [`recv_timeout`] +/// unable to return data when called. This can occur with both a [`channel`] and +/// a [`sync_channel`]. +/// +/// [`recv_timeout`]: Receiver::recv_timeout +#[derive(PartialEq, Eq, Clone, Copy, Debug)] +#[stable(feature = "mpsc_recv_timeout", since = "1.12.0")] +pub enum RecvTimeoutError { + /// This **channel** is currently empty, but the **Sender**(s) have not yet + /// disconnected, so data may yet become available. + #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")] + Timeout, + /// The **channel**'s sending half has become disconnected, and there will + /// never be any more data received on it. + #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")] + Disconnected, +} + +/// This enumeration is the list of the possible error outcomes for the +/// [`try_send`] method. +/// +/// [`try_send`]: SyncSender::try_send +#[stable(feature = "rust1", since = "1.0.0")] +#[derive(PartialEq, Eq, Clone, Copy)] +pub enum TrySendError { + /// The data could not be sent on the [`sync_channel`] because it would require that + /// the callee block to send the data. + /// + /// If this is a buffered channel, then the buffer is full at this time. If + /// this is not a buffered channel, then there is no [`Receiver`] available to + /// acquire the data. + #[stable(feature = "rust1", since = "1.0.0")] + Full(#[stable(feature = "rust1", since = "1.0.0")] T), + + /// This [`sync_channel`]'s receiving half has disconnected, so the data could not be + /// sent. The data is returned back to the callee in this case. + #[stable(feature = "rust1", since = "1.0.0")] + Disconnected(#[stable(feature = "rust1", since = "1.0.0")] T), +} + +/// Creates a new asynchronous channel, returning the sender/receiver halves. +/// +/// All data sent on the [`Sender`] will become available on the [`Receiver`] in +/// the same order as it was sent, and no [`send`] will block the calling thread +/// (this channel has an "infinite buffer", unlike [`sync_channel`], which will +/// block after its buffer limit is reached). [`recv`] will block until a message +/// is available while there is at least one [`Sender`] alive (including clones). +/// +/// The [`Sender`] can be cloned to [`send`] to the same channel multiple times, but +/// only one [`Receiver`] is supported. +/// +/// If the [`Receiver`] is disconnected while trying to [`send`] with the +/// [`Sender`], the [`send`] method will return a [`SendError`]. Similarly, if the +/// [`Sender`] is disconnected while trying to [`recv`], the [`recv`] method will +/// return a [`RecvError`]. +/// +/// [`send`]: Sender::send +/// [`recv`]: Receiver::recv +/// +/// # Examples +/// +/// ``` +/// use std::sync::mpsc::channel; +/// use std::thread; +/// +/// let (sender, receiver) = channel(); +/// +/// // Spawn off an expensive computation +/// thread::spawn(move || { +/// # fn expensive_computation() {} +/// sender.send(expensive_computation()).unwrap(); +/// }); +/// +/// // Do some useful work for awhile +/// +/// // Let's see what that answer was +/// println!("{:?}", receiver.recv().unwrap()); +/// ``` +#[must_use] +#[stable(feature = "rust1", since = "1.0.0")] +pub fn channel() -> (Sender, Receiver) { + let (tx, rx) = mpmc::channel(); + (Sender { inner: tx }, Receiver { inner: rx }) +} + +/// Creates a new synchronous, bounded channel. +/// +/// All data sent on the [`SyncSender`] will become available on the [`Receiver`] +/// in the same order as it was sent. Like asynchronous [`channel`]s, the +/// [`Receiver`] will block until a message becomes available. `sync_channel` +/// differs greatly in the semantics of the sender, however. +/// +/// This channel has an internal buffer on which messages will be queued. +/// `bound` specifies the buffer size. When the internal buffer becomes full, +/// future sends will *block* waiting for the buffer to open up. Note that a +/// buffer size of 0 is valid, in which case this becomes "rendezvous channel" +/// where each [`send`] will not return until a [`recv`] is paired with it. +/// +/// The [`SyncSender`] can be cloned to [`send`] to the same channel multiple +/// times, but only one [`Receiver`] is supported. +/// +/// Like asynchronous channels, if the [`Receiver`] is disconnected while trying +/// to [`send`] with the [`SyncSender`], the [`send`] method will return a +/// [`SendError`]. Similarly, If the [`SyncSender`] is disconnected while trying +/// to [`recv`], the [`recv`] method will return a [`RecvError`]. +/// +/// [`send`]: SyncSender::send +/// [`recv`]: Receiver::recv +/// +/// # Examples +/// +/// ``` +/// use std::sync::mpsc::sync_channel; +/// use std::thread; +/// +/// let (sender, receiver) = sync_channel(1); +/// +/// // this returns immediately +/// sender.send(1).unwrap(); +/// +/// thread::spawn(move || { +/// // this will block until the previous message has been received +/// sender.send(2).unwrap(); +/// }); +/// +/// assert_eq!(receiver.recv().unwrap(), 1); +/// assert_eq!(receiver.recv().unwrap(), 2); +/// ``` +#[must_use] +#[stable(feature = "rust1", since = "1.0.0")] +pub fn sync_channel(bound: usize) -> (SyncSender, Receiver) { + let (tx, rx) = mpmc::sync_channel(bound); + (SyncSender { inner: tx }, Receiver { inner: rx }) +} + +//////////////////////////////////////////////////////////////////////////////// +// Sender +//////////////////////////////////////////////////////////////////////////////// + +impl Sender { + /// Attempts to send a value on this channel, returning it back if it could + /// not be sent. + /// + /// A successful send occurs when it is determined that the other end of + /// the channel has not hung up already. An unsuccessful send would be one + /// where the corresponding receiver has already been deallocated. Note + /// that a return value of [`Err`] means that the data will never be + /// received, but a return value of [`Ok`] does *not* mean that the data + /// will be received. It is possible for the corresponding receiver to + /// hang up immediately after this function returns [`Ok`]. + /// + /// This method will never block the current thread. + /// + /// # Examples + /// + /// ``` + /// use std::sync::mpsc::channel; + /// + /// let (tx, rx) = channel(); + /// + /// // This send is always successful + /// tx.send(1).unwrap(); + /// + /// // This send will fail because the receiver is gone + /// drop(rx); + /// assert_eq!(tx.send(1).unwrap_err().0, 1); + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + pub fn send(&self, t: T) -> Result<(), SendError> { + self.inner.send(t) + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl Clone for Sender { + /// Clone a sender to send to other threads. + /// + /// Note, be aware of the lifetime of the sender because all senders + /// (including the original) need to be dropped in order for + /// [`Receiver::recv`] to stop blocking. + fn clone(&self) -> Sender { + Sender { inner: self.inner.clone() } + } +} + +#[stable(feature = "mpsc_debug", since = "1.8.0")] +impl fmt::Debug for Sender { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Sender").finish_non_exhaustive() + } +} + +//////////////////////////////////////////////////////////////////////////////// +// SyncSender +//////////////////////////////////////////////////////////////////////////////// + +impl SyncSender { + /// Sends a value on this synchronous channel. + /// + /// This function will *block* until space in the internal buffer becomes + /// available or a receiver is available to hand off the message to. + /// + /// Note that a successful send does *not* guarantee that the receiver will + /// ever see the data if there is a buffer on this channel. Items may be + /// enqueued in the internal buffer for the receiver to receive at a later + /// time. If the buffer size is 0, however, the channel becomes a rendezvous + /// channel and it guarantees that the receiver has indeed received + /// the data if this function returns success. + /// + /// This function will never panic, but it may return [`Err`] if the + /// [`Receiver`] has disconnected and is no longer able to receive + /// information. + /// + /// # Examples + /// + /// ```rust + /// use std::sync::mpsc::sync_channel; + /// use std::thread; + /// + /// // Create a rendezvous sync_channel with buffer size 0 + /// let (sync_sender, receiver) = sync_channel(0); + /// + /// thread::spawn(move || { + /// println!("sending message..."); + /// sync_sender.send(1).unwrap(); + /// // Thread is now blocked until the message is received + /// + /// println!("...message received!"); + /// }); + /// + /// let msg = receiver.recv().unwrap(); + /// assert_eq!(1, msg); + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + pub fn send(&self, t: T) -> Result<(), SendError> { + self.inner.send(t) + } + + /// Attempts to send a value on this channel without blocking. + /// + /// This method differs from [`send`] by returning immediately if the + /// channel's buffer is full or no receiver is waiting to acquire some + /// data. Compared with [`send`], this function has two failure cases + /// instead of one (one for disconnection, one for a full buffer). + /// + /// See [`send`] for notes about guarantees of whether the + /// receiver has received the data or not if this function is successful. + /// + /// [`send`]: Self::send + /// + /// # Examples + /// + /// ```rust + /// use std::sync::mpsc::sync_channel; + /// use std::thread; + /// + /// // Create a sync_channel with buffer size 1 + /// let (sync_sender, receiver) = sync_channel(1); + /// let sync_sender2 = sync_sender.clone(); + /// + /// // First thread owns sync_sender + /// thread::spawn(move || { + /// sync_sender.send(1).unwrap(); + /// sync_sender.send(2).unwrap(); + /// // Thread blocked + /// }); + /// + /// // Second thread owns sync_sender2 + /// thread::spawn(move || { + /// // This will return an error and send + /// // no message if the buffer is full + /// let _ = sync_sender2.try_send(3); + /// }); + /// + /// let mut msg; + /// msg = receiver.recv().unwrap(); + /// println!("message {msg} received"); + /// + /// msg = receiver.recv().unwrap(); + /// println!("message {msg} received"); + /// + /// // Third message may have never been sent + /// match receiver.try_recv() { + /// Ok(msg) => println!("message {msg} received"), + /// Err(_) => println!("the third message was never sent"), + /// } + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + pub fn try_send(&self, t: T) -> Result<(), TrySendError> { + self.inner.try_send(t) + } + + // Attempts to send for a value on this receiver, returning an error if the + // corresponding channel has hung up, or if it waits more than `timeout`. + // + // This method is currently only used for tests. + #[unstable(issue = "none", feature = "std_internals")] + #[doc(hidden)] + pub fn send_timeout(&self, t: T, timeout: Duration) -> Result<(), mpmc::SendTimeoutError> { + self.inner.send_timeout(t, timeout) + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl Clone for SyncSender { + fn clone(&self) -> SyncSender { + SyncSender { inner: self.inner.clone() } + } +} + +#[stable(feature = "mpsc_debug", since = "1.8.0")] +impl fmt::Debug for SyncSender { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("SyncSender").finish_non_exhaustive() + } +} + +//////////////////////////////////////////////////////////////////////////////// +// Receiver +//////////////////////////////////////////////////////////////////////////////// + +impl Receiver { + /// Attempts to return a pending value on this receiver without blocking. + /// + /// This method will never block the caller in order to wait for data to + /// become available. Instead, this will always return immediately with a + /// possible option of pending data on the channel. + /// + /// This is useful for a flavor of "optimistic check" before deciding to + /// block on a receiver. + /// + /// Compared with [`recv`], this function has two failure cases instead of one + /// (one for disconnection, one for an empty buffer). + /// + /// [`recv`]: Self::recv + /// + /// # Examples + /// + /// ```rust + /// use std::sync::mpsc::{Receiver, channel}; + /// + /// let (_, receiver): (_, Receiver) = channel(); + /// + /// assert!(receiver.try_recv().is_err()); + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + pub fn try_recv(&self) -> Result { + self.inner.try_recv() + } + + /// Attempts to wait for a value on this receiver, returning an error if the + /// corresponding channel has hung up. + /// + /// This function will always block the current thread if there is no data + /// available and it's possible for more data to be sent (at least one sender + /// still exists). Once a message is sent to the corresponding [`Sender`] + /// (or [`SyncSender`]), this receiver will wake up and return that + /// message. + /// + /// If the corresponding [`Sender`] has disconnected, or it disconnects while + /// this call is blocking, this call will wake up and return [`Err`] to + /// indicate that no more messages can ever be received on this channel. + /// However, since channels are buffered, messages sent before the disconnect + /// will still be properly received. + /// + /// # Examples + /// + /// ``` + /// use std::sync::mpsc; + /// use std::thread; + /// + /// let (send, recv) = mpsc::channel(); + /// let handle = thread::spawn(move || { + /// send.send(1u8).unwrap(); + /// }); + /// + /// handle.join().unwrap(); + /// + /// assert_eq!(Ok(1), recv.recv()); + /// ``` + /// + /// Buffering behavior: + /// + /// ``` + /// use std::sync::mpsc; + /// use std::thread; + /// use std::sync::mpsc::RecvError; + /// + /// let (send, recv) = mpsc::channel(); + /// let handle = thread::spawn(move || { + /// send.send(1u8).unwrap(); + /// send.send(2).unwrap(); + /// send.send(3).unwrap(); + /// drop(send); + /// }); + /// + /// // wait for the thread to join so we ensure the sender is dropped + /// handle.join().unwrap(); + /// + /// assert_eq!(Ok(1), recv.recv()); + /// assert_eq!(Ok(2), recv.recv()); + /// assert_eq!(Ok(3), recv.recv()); + /// assert_eq!(Err(RecvError), recv.recv()); + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + pub fn recv(&self) -> Result { + self.inner.recv() + } + + /// Attempts to wait for a value on this receiver, returning an error if the + /// corresponding channel has hung up, or if it waits more than `timeout`. + /// + /// This function will always block the current thread if there is no data + /// available and it's possible for more data to be sent (at least one sender + /// still exists). Once a message is sent to the corresponding [`Sender`] + /// (or [`SyncSender`]), this receiver will wake up and return that + /// message. + /// + /// If the corresponding [`Sender`] has disconnected, or it disconnects while + /// this call is blocking, this call will wake up and return [`Err`] to + /// indicate that no more messages can ever be received on this channel. + /// However, since channels are buffered, messages sent before the disconnect + /// will still be properly received. + /// + /// # Examples + /// + /// Successfully receiving value before encountering timeout: + /// + /// ```no_run + /// use std::thread; + /// use std::time::Duration; + /// use std::sync::mpsc; + /// + /// let (send, recv) = mpsc::channel(); + /// + /// thread::spawn(move || { + /// send.send('a').unwrap(); + /// }); + /// + /// assert_eq!( + /// recv.recv_timeout(Duration::from_millis(400)), + /// Ok('a') + /// ); + /// ``` + /// + /// Receiving an error upon reaching timeout: + /// + /// ```no_run + /// use std::thread; + /// use std::time::Duration; + /// use std::sync::mpsc; + /// + /// let (send, recv) = mpsc::channel(); + /// + /// thread::spawn(move || { + /// thread::sleep(Duration::from_millis(800)); + /// send.send('a').unwrap(); + /// }); + /// + /// assert_eq!( + /// recv.recv_timeout(Duration::from_millis(400)), + /// Err(mpsc::RecvTimeoutError::Timeout) + /// ); + /// ``` + #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")] + pub fn recv_timeout(&self, timeout: Duration) -> Result { + self.inner.recv_timeout(timeout) + } + + /// Attempts to wait for a value on this receiver, returning an error if the + /// corresponding channel has hung up, or if `deadline` is reached. + /// + /// This function will always block the current thread if there is no data + /// available and it's possible for more data to be sent. Once a message is + /// sent to the corresponding [`Sender`] (or [`SyncSender`]), then this + /// receiver will wake up and return that message. + /// + /// If the corresponding [`Sender`] has disconnected, or it disconnects while + /// this call is blocking, this call will wake up and return [`Err`] to + /// indicate that no more messages can ever be received on this channel. + /// However, since channels are buffered, messages sent before the disconnect + /// will still be properly received. + /// + /// # Examples + /// + /// Successfully receiving value before reaching deadline: + /// + /// ```no_run + /// #![feature(deadline_api)] + /// use std::thread; + /// use std::time::{Duration, Instant}; + /// use std::sync::mpsc; + /// + /// let (send, recv) = mpsc::channel(); + /// + /// thread::spawn(move || { + /// send.send('a').unwrap(); + /// }); + /// + /// assert_eq!( + /// recv.recv_deadline(Instant::now() + Duration::from_millis(400)), + /// Ok('a') + /// ); + /// ``` + /// + /// Receiving an error upon reaching deadline: + /// + /// ```no_run + /// #![feature(deadline_api)] + /// use std::thread; + /// use std::time::{Duration, Instant}; + /// use std::sync::mpsc; + /// + /// let (send, recv) = mpsc::channel(); + /// + /// thread::spawn(move || { + /// thread::sleep(Duration::from_millis(800)); + /// send.send('a').unwrap(); + /// }); + /// + /// assert_eq!( + /// recv.recv_deadline(Instant::now() + Duration::from_millis(400)), + /// Err(mpsc::RecvTimeoutError::Timeout) + /// ); + /// ``` + #[unstable(feature = "deadline_api", issue = "46316")] + pub fn recv_deadline(&self, deadline: Instant) -> Result { + self.inner.recv_deadline(deadline) + } + + /// Returns an iterator that will block waiting for messages, but never + /// [`panic!`]. It will return [`None`] when the channel has hung up. + /// + /// # Examples + /// + /// ```rust + /// use std::sync::mpsc::channel; + /// use std::thread; + /// + /// let (send, recv) = channel(); + /// + /// thread::spawn(move || { + /// send.send(1).unwrap(); + /// send.send(2).unwrap(); + /// send.send(3).unwrap(); + /// }); + /// + /// let mut iter = recv.iter(); + /// assert_eq!(iter.next(), Some(1)); + /// assert_eq!(iter.next(), Some(2)); + /// assert_eq!(iter.next(), Some(3)); + /// assert_eq!(iter.next(), None); + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + pub fn iter(&self) -> Iter<'_, T> { + Iter { rx: self } + } + + /// Returns an iterator that will attempt to yield all pending values. + /// It will return `None` if there are no more pending values or if the + /// channel has hung up. The iterator will never [`panic!`] or block the + /// user by waiting for values. + /// + /// # Examples + /// + /// ```no_run + /// use std::sync::mpsc::channel; + /// use std::thread; + /// use std::time::Duration; + /// + /// let (sender, receiver) = channel(); + /// + /// // nothing is in the buffer yet + /// assert!(receiver.try_iter().next().is_none()); + /// + /// thread::spawn(move || { + /// thread::sleep(Duration::from_secs(1)); + /// sender.send(1).unwrap(); + /// sender.send(2).unwrap(); + /// sender.send(3).unwrap(); + /// }); + /// + /// // nothing is in the buffer yet + /// assert!(receiver.try_iter().next().is_none()); + /// + /// // block for two seconds + /// thread::sleep(Duration::from_secs(2)); + /// + /// let mut iter = receiver.try_iter(); + /// assert_eq!(iter.next(), Some(1)); + /// assert_eq!(iter.next(), Some(2)); + /// assert_eq!(iter.next(), Some(3)); + /// assert_eq!(iter.next(), None); + /// ``` + #[stable(feature = "receiver_try_iter", since = "1.15.0")] + pub fn try_iter(&self) -> TryIter<'_, T> { + TryIter { rx: self } + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl<'a, T> Iterator for Iter<'a, T> { + type Item = T; + + fn next(&mut self) -> Option { + self.rx.recv().ok() + } +} + +#[stable(feature = "receiver_try_iter", since = "1.15.0")] +impl<'a, T> Iterator for TryIter<'a, T> { + type Item = T; + + fn next(&mut self) -> Option { + self.rx.try_recv().ok() + } +} + +#[stable(feature = "receiver_into_iter", since = "1.1.0")] +impl<'a, T> IntoIterator for &'a Receiver { + type Item = T; + type IntoIter = Iter<'a, T>; + + fn into_iter(self) -> Iter<'a, T> { + self.iter() + } +} + +#[stable(feature = "receiver_into_iter", since = "1.1.0")] +impl Iterator for IntoIter { + type Item = T; + fn next(&mut self) -> Option { + self.rx.recv().ok() + } +} + +#[stable(feature = "receiver_into_iter", since = "1.1.0")] +impl IntoIterator for Receiver { + type Item = T; + type IntoIter = IntoIter; + + fn into_iter(self) -> IntoIter { + IntoIter { rx: self } + } +} + +#[stable(feature = "mpsc_debug", since = "1.8.0")] +impl fmt::Debug for Receiver { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Receiver").finish_non_exhaustive() + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl fmt::Debug for SendError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("SendError").finish_non_exhaustive() + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl fmt::Display for SendError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + "sending on a closed channel".fmt(f) + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl error::Error for SendError { + #[allow(deprecated)] + fn description(&self) -> &str { + "sending on a closed channel" + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl fmt::Debug for TrySendError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match *self { + TrySendError::Full(..) => "Full(..)".fmt(f), + TrySendError::Disconnected(..) => "Disconnected(..)".fmt(f), + } + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl fmt::Display for TrySendError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match *self { + TrySendError::Full(..) => "sending on a full channel".fmt(f), + TrySendError::Disconnected(..) => "sending on a closed channel".fmt(f), + } + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl error::Error for TrySendError { + #[allow(deprecated)] + fn description(&self) -> &str { + match *self { + TrySendError::Full(..) => "sending on a full channel", + TrySendError::Disconnected(..) => "sending on a closed channel", + } + } +} + +#[stable(feature = "mpsc_error_conversions", since = "1.24.0")] +impl From> for TrySendError { + /// Converts a `SendError` into a `TrySendError`. + /// + /// This conversion always returns a `TrySendError::Disconnected` containing the data in the `SendError`. + /// + /// No data is allocated on the heap. + fn from(err: SendError) -> TrySendError { + match err { + SendError(t) => TrySendError::Disconnected(t), + } + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl fmt::Display for RecvError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + "receiving on a closed channel".fmt(f) + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl error::Error for RecvError { + #[allow(deprecated)] + fn description(&self) -> &str { + "receiving on a closed channel" + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl fmt::Display for TryRecvError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match *self { + TryRecvError::Empty => "receiving on an empty channel".fmt(f), + TryRecvError::Disconnected => "receiving on a closed channel".fmt(f), + } + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl error::Error for TryRecvError { + #[allow(deprecated)] + fn description(&self) -> &str { + match *self { + TryRecvError::Empty => "receiving on an empty channel", + TryRecvError::Disconnected => "receiving on a closed channel", + } + } +} + +#[stable(feature = "mpsc_error_conversions", since = "1.24.0")] +impl From for TryRecvError { + /// Converts a `RecvError` into a `TryRecvError`. + /// + /// This conversion always returns `TryRecvError::Disconnected`. + /// + /// No data is allocated on the heap. + fn from(err: RecvError) -> TryRecvError { + match err { + RecvError => TryRecvError::Disconnected, + } + } +} + +#[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")] +impl fmt::Display for RecvTimeoutError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match *self { + RecvTimeoutError::Timeout => "timed out waiting on channel".fmt(f), + RecvTimeoutError::Disconnected => "channel is empty and sending half is closed".fmt(f), + } + } +} + +#[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")] +impl error::Error for RecvTimeoutError { + #[allow(deprecated)] + fn description(&self) -> &str { + match *self { + RecvTimeoutError::Timeout => "timed out waiting on channel", + RecvTimeoutError::Disconnected => "channel is empty and sending half is closed", + } + } +} + +#[stable(feature = "mpsc_error_conversions", since = "1.24.0")] +impl From for RecvTimeoutError { + /// Converts a `RecvError` into a `RecvTimeoutError`. + /// + /// This conversion always returns `RecvTimeoutError::Disconnected`. + /// + /// No data is allocated on the heap. + fn from(err: RecvError) -> RecvTimeoutError { + match err { + RecvError => RecvTimeoutError::Disconnected, + } + } +} diff --git a/library/std/src/sync/mpsc/mod.rs b/library/std/src/sync/mpsc/mod.rs deleted file mode 100644 index c86b546e011..00000000000 --- a/library/std/src/sync/mpsc/mod.rs +++ /dev/null @@ -1,1247 +0,0 @@ -//! Multi-producer, single-consumer FIFO queue communication primitives. -//! -//! This module provides message-based communication over channels, concretely -//! defined among three types: -//! -//! * [`Sender`] -//! * [`SyncSender`] -//! * [`Receiver`] -//! -//! A [`Sender`] or [`SyncSender`] is used to send data to a [`Receiver`]. Both -//! senders are clone-able (multi-producer) such that many threads can send -//! simultaneously to one receiver (single-consumer). -//! -//! These channels come in two flavors: -//! -//! 1. An asynchronous, infinitely buffered channel. The [`channel`] function -//! will return a `(Sender, Receiver)` tuple where all sends will be -//! **asynchronous** (they never block). The channel conceptually has an -//! infinite buffer. -//! -//! 2. A synchronous, bounded channel. The [`sync_channel`] function will -//! return a `(SyncSender, Receiver)` tuple where the storage for pending -//! messages is a pre-allocated buffer of a fixed size. All sends will be -//! **synchronous** by blocking until there is buffer space available. Note -//! that a bound of 0 is allowed, causing the channel to become a "rendezvous" -//! channel where each sender atomically hands off a message to a receiver. -//! -//! [`send`]: Sender::send -//! -//! ## Disconnection -//! -//! The send and receive operations on channels will all return a [`Result`] -//! indicating whether the operation succeeded or not. An unsuccessful operation -//! is normally indicative of the other half of a channel having "hung up" by -//! being dropped in its corresponding thread. -//! -//! Once half of a channel has been deallocated, most operations can no longer -//! continue to make progress, so [`Err`] will be returned. Many applications -//! will continue to [`unwrap`] the results returned from this module, -//! instigating a propagation of failure among threads if one unexpectedly dies. -//! -//! [`unwrap`]: Result::unwrap -//! -//! # Examples -//! -//! Simple usage: -//! -//! ``` -//! use std::thread; -//! use std::sync::mpsc::channel; -//! -//! // Create a simple streaming channel -//! let (tx, rx) = channel(); -//! thread::spawn(move || { -//! tx.send(10).unwrap(); -//! }); -//! assert_eq!(rx.recv().unwrap(), 10); -//! ``` -//! -//! Shared usage: -//! -//! ``` -//! use std::thread; -//! use std::sync::mpsc::channel; -//! -//! // Create a shared channel that can be sent along from many threads -//! // where tx is the sending half (tx for transmission), and rx is the receiving -//! // half (rx for receiving). -//! let (tx, rx) = channel(); -//! for i in 0..10 { -//! let tx = tx.clone(); -//! thread::spawn(move || { -//! tx.send(i).unwrap(); -//! }); -//! } -//! -//! for _ in 0..10 { -//! let j = rx.recv().unwrap(); -//! assert!(0 <= j && j < 10); -//! } -//! ``` -//! -//! Propagating panics: -//! -//! ``` -//! use std::sync::mpsc::channel; -//! -//! // The call to recv() will return an error because the channel has already -//! // hung up (or been deallocated) -//! let (tx, rx) = channel::(); -//! drop(tx); -//! assert!(rx.recv().is_err()); -//! ``` -//! -//! Synchronous channels: -//! -//! ``` -//! use std::thread; -//! use std::sync::mpsc::sync_channel; -//! -//! let (tx, rx) = sync_channel::(0); -//! thread::spawn(move || { -//! // This will wait for the parent thread to start receiving -//! tx.send(53).unwrap(); -//! }); -//! rx.recv().unwrap(); -//! ``` -//! -//! Unbounded receive loop: -//! -//! ``` -//! use std::sync::mpsc::sync_channel; -//! use std::thread; -//! -//! let (tx, rx) = sync_channel(3); -//! -//! for _ in 0..3 { -//! // It would be the same without thread and clone here -//! // since there will still be one `tx` left. -//! let tx = tx.clone(); -//! // cloned tx dropped within thread -//! thread::spawn(move || tx.send("ok").unwrap()); -//! } -//! -//! // Drop the last sender to stop `rx` waiting for message. -//! // The program will not complete if we comment this out. -//! // **All** `tx` needs to be dropped for `rx` to have `Err`. -//! drop(tx); -//! -//! // Unbounded receiver waiting for all senders to complete. -//! while let Ok(msg) = rx.recv() { -//! println!("{msg}"); -//! } -//! -//! println!("completed"); -//! ``` - -#![stable(feature = "rust1", since = "1.0.0")] - -#[cfg(all(test, not(any(target_os = "emscripten", target_os = "wasi"))))] -mod tests; - -#[cfg(all(test, not(any(target_os = "emscripten", target_os = "wasi"))))] -mod sync_tests; - -// MPSC channels are built as a wrapper around MPMC channels, which -// were ported from the `crossbeam-channel` crate. MPMC channels are -// not exposed publicly, but if you are curious about the implementation, -// that's where everything is. - -use crate::sync::mpmc; -use crate::time::{Duration, Instant}; -use crate::{error, fmt}; - -/// The receiving half of Rust's [`channel`] (or [`sync_channel`]) type. -/// This half can only be owned by one thread. -/// -/// Messages sent to the channel can be retrieved using [`recv`]. -/// -/// [`recv`]: Receiver::recv -/// -/// # Examples -/// -/// ```rust -/// use std::sync::mpsc::channel; -/// use std::thread; -/// use std::time::Duration; -/// -/// let (send, recv) = channel(); -/// -/// thread::spawn(move || { -/// send.send("Hello world!").unwrap(); -/// thread::sleep(Duration::from_secs(2)); // block for two seconds -/// send.send("Delayed for 2 seconds").unwrap(); -/// }); -/// -/// println!("{}", recv.recv().unwrap()); // Received immediately -/// println!("Waiting..."); -/// println!("{}", recv.recv().unwrap()); // Received after 2 seconds -/// ``` -#[stable(feature = "rust1", since = "1.0.0")] -#[cfg_attr(not(test), rustc_diagnostic_item = "Receiver")] -pub struct Receiver { - inner: mpmc::Receiver, -} - -// The receiver port can be sent from place to place, so long as it -// is not used to receive non-sendable things. -#[stable(feature = "rust1", since = "1.0.0")] -unsafe impl Send for Receiver {} - -#[stable(feature = "rust1", since = "1.0.0")] -impl !Sync for Receiver {} - -/// An iterator over messages on a [`Receiver`], created by [`iter`]. -/// -/// This iterator will block whenever [`next`] is called, -/// waiting for a new message, and [`None`] will be returned -/// when the corresponding channel has hung up. -/// -/// [`iter`]: Receiver::iter -/// [`next`]: Iterator::next -/// -/// # Examples -/// -/// ```rust -/// use std::sync::mpsc::channel; -/// use std::thread; -/// -/// let (send, recv) = channel(); -/// -/// thread::spawn(move || { -/// send.send(1u8).unwrap(); -/// send.send(2u8).unwrap(); -/// send.send(3u8).unwrap(); -/// }); -/// -/// for x in recv.iter() { -/// println!("Got: {x}"); -/// } -/// ``` -#[stable(feature = "rust1", since = "1.0.0")] -#[derive(Debug)] -pub struct Iter<'a, T: 'a> { - rx: &'a Receiver, -} - -/// An iterator that attempts to yield all pending values for a [`Receiver`], -/// created by [`try_iter`]. -/// -/// [`None`] will be returned when there are no pending values remaining or -/// if the corresponding channel has hung up. -/// -/// This iterator will never block the caller in order to wait for data to -/// become available. Instead, it will return [`None`]. -/// -/// [`try_iter`]: Receiver::try_iter -/// -/// # Examples -/// -/// ```rust -/// use std::sync::mpsc::channel; -/// use std::thread; -/// use std::time::Duration; -/// -/// let (sender, receiver) = channel(); -/// -/// // Nothing is in the buffer yet -/// assert!(receiver.try_iter().next().is_none()); -/// println!("Nothing in the buffer..."); -/// -/// thread::spawn(move || { -/// sender.send(1).unwrap(); -/// sender.send(2).unwrap(); -/// sender.send(3).unwrap(); -/// }); -/// -/// println!("Going to sleep..."); -/// thread::sleep(Duration::from_secs(2)); // block for two seconds -/// -/// for x in receiver.try_iter() { -/// println!("Got: {x}"); -/// } -/// ``` -#[stable(feature = "receiver_try_iter", since = "1.15.0")] -#[derive(Debug)] -pub struct TryIter<'a, T: 'a> { - rx: &'a Receiver, -} - -/// An owning iterator over messages on a [`Receiver`], -/// created by [`into_iter`]. -/// -/// This iterator will block whenever [`next`] -/// is called, waiting for a new message, and [`None`] will be -/// returned if the corresponding channel has hung up. -/// -/// [`into_iter`]: Receiver::into_iter -/// [`next`]: Iterator::next -/// -/// # Examples -/// -/// ```rust -/// use std::sync::mpsc::channel; -/// use std::thread; -/// -/// let (send, recv) = channel(); -/// -/// thread::spawn(move || { -/// send.send(1u8).unwrap(); -/// send.send(2u8).unwrap(); -/// send.send(3u8).unwrap(); -/// }); -/// -/// for x in recv.into_iter() { -/// println!("Got: {x}"); -/// } -/// ``` -#[stable(feature = "receiver_into_iter", since = "1.1.0")] -#[derive(Debug)] -pub struct IntoIter { - rx: Receiver, -} - -/// The sending-half of Rust's asynchronous [`channel`] type. -/// -/// Messages can be sent through this channel with [`send`]. -/// -/// Note: all senders (the original and its clones) need to be dropped for the receiver -/// to stop blocking to receive messages with [`Receiver::recv`]. -/// -/// [`send`]: Sender::send -/// -/// # Examples -/// -/// ```rust -/// use std::sync::mpsc::channel; -/// use std::thread; -/// -/// let (sender, receiver) = channel(); -/// let sender2 = sender.clone(); -/// -/// // First thread owns sender -/// thread::spawn(move || { -/// sender.send(1).unwrap(); -/// }); -/// -/// // Second thread owns sender2 -/// thread::spawn(move || { -/// sender2.send(2).unwrap(); -/// }); -/// -/// let msg = receiver.recv().unwrap(); -/// let msg2 = receiver.recv().unwrap(); -/// -/// assert_eq!(3, msg + msg2); -/// ``` -#[stable(feature = "rust1", since = "1.0.0")] -pub struct Sender { - inner: mpmc::Sender, -} - -// The send port can be sent from place to place, so long as it -// is not used to send non-sendable things. -#[stable(feature = "rust1", since = "1.0.0")] -unsafe impl Send for Sender {} - -#[stable(feature = "mpsc_sender_sync", since = "1.72.0")] -unsafe impl Sync for Sender {} - -/// The sending-half of Rust's synchronous [`sync_channel`] type. -/// -/// Messages can be sent through this channel with [`send`] or [`try_send`]. -/// -/// [`send`] will block if there is no space in the internal buffer. -/// -/// [`send`]: SyncSender::send -/// [`try_send`]: SyncSender::try_send -/// -/// # Examples -/// -/// ```rust -/// use std::sync::mpsc::sync_channel; -/// use std::thread; -/// -/// // Create a sync_channel with buffer size 2 -/// let (sync_sender, receiver) = sync_channel(2); -/// let sync_sender2 = sync_sender.clone(); -/// -/// // First thread owns sync_sender -/// thread::spawn(move || { -/// sync_sender.send(1).unwrap(); -/// sync_sender.send(2).unwrap(); -/// }); -/// -/// // Second thread owns sync_sender2 -/// thread::spawn(move || { -/// sync_sender2.send(3).unwrap(); -/// // thread will now block since the buffer is full -/// println!("Thread unblocked!"); -/// }); -/// -/// let mut msg; -/// -/// msg = receiver.recv().unwrap(); -/// println!("message {msg} received"); -/// -/// // "Thread unblocked!" will be printed now -/// -/// msg = receiver.recv().unwrap(); -/// println!("message {msg} received"); -/// -/// msg = receiver.recv().unwrap(); -/// -/// println!("message {msg} received"); -/// ``` -#[stable(feature = "rust1", since = "1.0.0")] -pub struct SyncSender { - inner: mpmc::Sender, -} - -#[stable(feature = "rust1", since = "1.0.0")] -unsafe impl Send for SyncSender {} - -/// An error returned from the [`Sender::send`] or [`SyncSender::send`] -/// function on **channel**s. -/// -/// A **send** operation can only fail if the receiving end of a channel is -/// disconnected, implying that the data could never be received. The error -/// contains the data being sent as a payload so it can be recovered. -#[stable(feature = "rust1", since = "1.0.0")] -#[derive(PartialEq, Eq, Clone, Copy)] -pub struct SendError(#[stable(feature = "rust1", since = "1.0.0")] pub T); - -/// An error returned from the [`recv`] function on a [`Receiver`]. -/// -/// The [`recv`] operation can only fail if the sending half of a -/// [`channel`] (or [`sync_channel`]) is disconnected, implying that no further -/// messages will ever be received. -/// -/// [`recv`]: Receiver::recv -#[derive(PartialEq, Eq, Clone, Copy, Debug)] -#[stable(feature = "rust1", since = "1.0.0")] -pub struct RecvError; - -/// This enumeration is the list of the possible reasons that [`try_recv`] could -/// not return data when called. This can occur with both a [`channel`] and -/// a [`sync_channel`]. -/// -/// [`try_recv`]: Receiver::try_recv -#[derive(PartialEq, Eq, Clone, Copy, Debug)] -#[stable(feature = "rust1", since = "1.0.0")] -pub enum TryRecvError { - /// This **channel** is currently empty, but the **Sender**(s) have not yet - /// disconnected, so data may yet become available. - #[stable(feature = "rust1", since = "1.0.0")] - Empty, - - /// The **channel**'s sending half has become disconnected, and there will - /// never be any more data received on it. - #[stable(feature = "rust1", since = "1.0.0")] - Disconnected, -} - -/// This enumeration is the list of possible errors that made [`recv_timeout`] -/// unable to return data when called. This can occur with both a [`channel`] and -/// a [`sync_channel`]. -/// -/// [`recv_timeout`]: Receiver::recv_timeout -#[derive(PartialEq, Eq, Clone, Copy, Debug)] -#[stable(feature = "mpsc_recv_timeout", since = "1.12.0")] -pub enum RecvTimeoutError { - /// This **channel** is currently empty, but the **Sender**(s) have not yet - /// disconnected, so data may yet become available. - #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")] - Timeout, - /// The **channel**'s sending half has become disconnected, and there will - /// never be any more data received on it. - #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")] - Disconnected, -} - -/// This enumeration is the list of the possible error outcomes for the -/// [`try_send`] method. -/// -/// [`try_send`]: SyncSender::try_send -#[stable(feature = "rust1", since = "1.0.0")] -#[derive(PartialEq, Eq, Clone, Copy)] -pub enum TrySendError { - /// The data could not be sent on the [`sync_channel`] because it would require that - /// the callee block to send the data. - /// - /// If this is a buffered channel, then the buffer is full at this time. If - /// this is not a buffered channel, then there is no [`Receiver`] available to - /// acquire the data. - #[stable(feature = "rust1", since = "1.0.0")] - Full(#[stable(feature = "rust1", since = "1.0.0")] T), - - /// This [`sync_channel`]'s receiving half has disconnected, so the data could not be - /// sent. The data is returned back to the callee in this case. - #[stable(feature = "rust1", since = "1.0.0")] - Disconnected(#[stable(feature = "rust1", since = "1.0.0")] T), -} - -/// Creates a new asynchronous channel, returning the sender/receiver halves. -/// -/// All data sent on the [`Sender`] will become available on the [`Receiver`] in -/// the same order as it was sent, and no [`send`] will block the calling thread -/// (this channel has an "infinite buffer", unlike [`sync_channel`], which will -/// block after its buffer limit is reached). [`recv`] will block until a message -/// is available while there is at least one [`Sender`] alive (including clones). -/// -/// The [`Sender`] can be cloned to [`send`] to the same channel multiple times, but -/// only one [`Receiver`] is supported. -/// -/// If the [`Receiver`] is disconnected while trying to [`send`] with the -/// [`Sender`], the [`send`] method will return a [`SendError`]. Similarly, if the -/// [`Sender`] is disconnected while trying to [`recv`], the [`recv`] method will -/// return a [`RecvError`]. -/// -/// [`send`]: Sender::send -/// [`recv`]: Receiver::recv -/// -/// # Examples -/// -/// ``` -/// use std::sync::mpsc::channel; -/// use std::thread; -/// -/// let (sender, receiver) = channel(); -/// -/// // Spawn off an expensive computation -/// thread::spawn(move || { -/// # fn expensive_computation() {} -/// sender.send(expensive_computation()).unwrap(); -/// }); -/// -/// // Do some useful work for awhile -/// -/// // Let's see what that answer was -/// println!("{:?}", receiver.recv().unwrap()); -/// ``` -#[must_use] -#[stable(feature = "rust1", since = "1.0.0")] -pub fn channel() -> (Sender, Receiver) { - let (tx, rx) = mpmc::channel(); - (Sender { inner: tx }, Receiver { inner: rx }) -} - -/// Creates a new synchronous, bounded channel. -/// -/// All data sent on the [`SyncSender`] will become available on the [`Receiver`] -/// in the same order as it was sent. Like asynchronous [`channel`]s, the -/// [`Receiver`] will block until a message becomes available. `sync_channel` -/// differs greatly in the semantics of the sender, however. -/// -/// This channel has an internal buffer on which messages will be queued. -/// `bound` specifies the buffer size. When the internal buffer becomes full, -/// future sends will *block* waiting for the buffer to open up. Note that a -/// buffer size of 0 is valid, in which case this becomes "rendezvous channel" -/// where each [`send`] will not return until a [`recv`] is paired with it. -/// -/// The [`SyncSender`] can be cloned to [`send`] to the same channel multiple -/// times, but only one [`Receiver`] is supported. -/// -/// Like asynchronous channels, if the [`Receiver`] is disconnected while trying -/// to [`send`] with the [`SyncSender`], the [`send`] method will return a -/// [`SendError`]. Similarly, If the [`SyncSender`] is disconnected while trying -/// to [`recv`], the [`recv`] method will return a [`RecvError`]. -/// -/// [`send`]: SyncSender::send -/// [`recv`]: Receiver::recv -/// -/// # Examples -/// -/// ``` -/// use std::sync::mpsc::sync_channel; -/// use std::thread; -/// -/// let (sender, receiver) = sync_channel(1); -/// -/// // this returns immediately -/// sender.send(1).unwrap(); -/// -/// thread::spawn(move || { -/// // this will block until the previous message has been received -/// sender.send(2).unwrap(); -/// }); -/// -/// assert_eq!(receiver.recv().unwrap(), 1); -/// assert_eq!(receiver.recv().unwrap(), 2); -/// ``` -#[must_use] -#[stable(feature = "rust1", since = "1.0.0")] -pub fn sync_channel(bound: usize) -> (SyncSender, Receiver) { - let (tx, rx) = mpmc::sync_channel(bound); - (SyncSender { inner: tx }, Receiver { inner: rx }) -} - -//////////////////////////////////////////////////////////////////////////////// -// Sender -//////////////////////////////////////////////////////////////////////////////// - -impl Sender { - /// Attempts to send a value on this channel, returning it back if it could - /// not be sent. - /// - /// A successful send occurs when it is determined that the other end of - /// the channel has not hung up already. An unsuccessful send would be one - /// where the corresponding receiver has already been deallocated. Note - /// that a return value of [`Err`] means that the data will never be - /// received, but a return value of [`Ok`] does *not* mean that the data - /// will be received. It is possible for the corresponding receiver to - /// hang up immediately after this function returns [`Ok`]. - /// - /// This method will never block the current thread. - /// - /// # Examples - /// - /// ``` - /// use std::sync::mpsc::channel; - /// - /// let (tx, rx) = channel(); - /// - /// // This send is always successful - /// tx.send(1).unwrap(); - /// - /// // This send will fail because the receiver is gone - /// drop(rx); - /// assert_eq!(tx.send(1).unwrap_err().0, 1); - /// ``` - #[stable(feature = "rust1", since = "1.0.0")] - pub fn send(&self, t: T) -> Result<(), SendError> { - self.inner.send(t) - } -} - -#[stable(feature = "rust1", since = "1.0.0")] -impl Clone for Sender { - /// Clone a sender to send to other threads. - /// - /// Note, be aware of the lifetime of the sender because all senders - /// (including the original) need to be dropped in order for - /// [`Receiver::recv`] to stop blocking. - fn clone(&self) -> Sender { - Sender { inner: self.inner.clone() } - } -} - -#[stable(feature = "mpsc_debug", since = "1.8.0")] -impl fmt::Debug for Sender { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Sender").finish_non_exhaustive() - } -} - -//////////////////////////////////////////////////////////////////////////////// -// SyncSender -//////////////////////////////////////////////////////////////////////////////// - -impl SyncSender { - /// Sends a value on this synchronous channel. - /// - /// This function will *block* until space in the internal buffer becomes - /// available or a receiver is available to hand off the message to. - /// - /// Note that a successful send does *not* guarantee that the receiver will - /// ever see the data if there is a buffer on this channel. Items may be - /// enqueued in the internal buffer for the receiver to receive at a later - /// time. If the buffer size is 0, however, the channel becomes a rendezvous - /// channel and it guarantees that the receiver has indeed received - /// the data if this function returns success. - /// - /// This function will never panic, but it may return [`Err`] if the - /// [`Receiver`] has disconnected and is no longer able to receive - /// information. - /// - /// # Examples - /// - /// ```rust - /// use std::sync::mpsc::sync_channel; - /// use std::thread; - /// - /// // Create a rendezvous sync_channel with buffer size 0 - /// let (sync_sender, receiver) = sync_channel(0); - /// - /// thread::spawn(move || { - /// println!("sending message..."); - /// sync_sender.send(1).unwrap(); - /// // Thread is now blocked until the message is received - /// - /// println!("...message received!"); - /// }); - /// - /// let msg = receiver.recv().unwrap(); - /// assert_eq!(1, msg); - /// ``` - #[stable(feature = "rust1", since = "1.0.0")] - pub fn send(&self, t: T) -> Result<(), SendError> { - self.inner.send(t) - } - - /// Attempts to send a value on this channel without blocking. - /// - /// This method differs from [`send`] by returning immediately if the - /// channel's buffer is full or no receiver is waiting to acquire some - /// data. Compared with [`send`], this function has two failure cases - /// instead of one (one for disconnection, one for a full buffer). - /// - /// See [`send`] for notes about guarantees of whether the - /// receiver has received the data or not if this function is successful. - /// - /// [`send`]: Self::send - /// - /// # Examples - /// - /// ```rust - /// use std::sync::mpsc::sync_channel; - /// use std::thread; - /// - /// // Create a sync_channel with buffer size 1 - /// let (sync_sender, receiver) = sync_channel(1); - /// let sync_sender2 = sync_sender.clone(); - /// - /// // First thread owns sync_sender - /// thread::spawn(move || { - /// sync_sender.send(1).unwrap(); - /// sync_sender.send(2).unwrap(); - /// // Thread blocked - /// }); - /// - /// // Second thread owns sync_sender2 - /// thread::spawn(move || { - /// // This will return an error and send - /// // no message if the buffer is full - /// let _ = sync_sender2.try_send(3); - /// }); - /// - /// let mut msg; - /// msg = receiver.recv().unwrap(); - /// println!("message {msg} received"); - /// - /// msg = receiver.recv().unwrap(); - /// println!("message {msg} received"); - /// - /// // Third message may have never been sent - /// match receiver.try_recv() { - /// Ok(msg) => println!("message {msg} received"), - /// Err(_) => println!("the third message was never sent"), - /// } - /// ``` - #[stable(feature = "rust1", since = "1.0.0")] - pub fn try_send(&self, t: T) -> Result<(), TrySendError> { - self.inner.try_send(t) - } - - // Attempts to send for a value on this receiver, returning an error if the - // corresponding channel has hung up, or if it waits more than `timeout`. - // - // This method is currently private and only used for tests. - #[allow(unused)] - fn send_timeout(&self, t: T, timeout: Duration) -> Result<(), mpmc::SendTimeoutError> { - self.inner.send_timeout(t, timeout) - } -} - -#[stable(feature = "rust1", since = "1.0.0")] -impl Clone for SyncSender { - fn clone(&self) -> SyncSender { - SyncSender { inner: self.inner.clone() } - } -} - -#[stable(feature = "mpsc_debug", since = "1.8.0")] -impl fmt::Debug for SyncSender { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("SyncSender").finish_non_exhaustive() - } -} - -//////////////////////////////////////////////////////////////////////////////// -// Receiver -//////////////////////////////////////////////////////////////////////////////// - -impl Receiver { - /// Attempts to return a pending value on this receiver without blocking. - /// - /// This method will never block the caller in order to wait for data to - /// become available. Instead, this will always return immediately with a - /// possible option of pending data on the channel. - /// - /// This is useful for a flavor of "optimistic check" before deciding to - /// block on a receiver. - /// - /// Compared with [`recv`], this function has two failure cases instead of one - /// (one for disconnection, one for an empty buffer). - /// - /// [`recv`]: Self::recv - /// - /// # Examples - /// - /// ```rust - /// use std::sync::mpsc::{Receiver, channel}; - /// - /// let (_, receiver): (_, Receiver) = channel(); - /// - /// assert!(receiver.try_recv().is_err()); - /// ``` - #[stable(feature = "rust1", since = "1.0.0")] - pub fn try_recv(&self) -> Result { - self.inner.try_recv() - } - - /// Attempts to wait for a value on this receiver, returning an error if the - /// corresponding channel has hung up. - /// - /// This function will always block the current thread if there is no data - /// available and it's possible for more data to be sent (at least one sender - /// still exists). Once a message is sent to the corresponding [`Sender`] - /// (or [`SyncSender`]), this receiver will wake up and return that - /// message. - /// - /// If the corresponding [`Sender`] has disconnected, or it disconnects while - /// this call is blocking, this call will wake up and return [`Err`] to - /// indicate that no more messages can ever be received on this channel. - /// However, since channels are buffered, messages sent before the disconnect - /// will still be properly received. - /// - /// # Examples - /// - /// ``` - /// use std::sync::mpsc; - /// use std::thread; - /// - /// let (send, recv) = mpsc::channel(); - /// let handle = thread::spawn(move || { - /// send.send(1u8).unwrap(); - /// }); - /// - /// handle.join().unwrap(); - /// - /// assert_eq!(Ok(1), recv.recv()); - /// ``` - /// - /// Buffering behavior: - /// - /// ``` - /// use std::sync::mpsc; - /// use std::thread; - /// use std::sync::mpsc::RecvError; - /// - /// let (send, recv) = mpsc::channel(); - /// let handle = thread::spawn(move || { - /// send.send(1u8).unwrap(); - /// send.send(2).unwrap(); - /// send.send(3).unwrap(); - /// drop(send); - /// }); - /// - /// // wait for the thread to join so we ensure the sender is dropped - /// handle.join().unwrap(); - /// - /// assert_eq!(Ok(1), recv.recv()); - /// assert_eq!(Ok(2), recv.recv()); - /// assert_eq!(Ok(3), recv.recv()); - /// assert_eq!(Err(RecvError), recv.recv()); - /// ``` - #[stable(feature = "rust1", since = "1.0.0")] - pub fn recv(&self) -> Result { - self.inner.recv() - } - - /// Attempts to wait for a value on this receiver, returning an error if the - /// corresponding channel has hung up, or if it waits more than `timeout`. - /// - /// This function will always block the current thread if there is no data - /// available and it's possible for more data to be sent (at least one sender - /// still exists). Once a message is sent to the corresponding [`Sender`] - /// (or [`SyncSender`]), this receiver will wake up and return that - /// message. - /// - /// If the corresponding [`Sender`] has disconnected, or it disconnects while - /// this call is blocking, this call will wake up and return [`Err`] to - /// indicate that no more messages can ever be received on this channel. - /// However, since channels are buffered, messages sent before the disconnect - /// will still be properly received. - /// - /// # Examples - /// - /// Successfully receiving value before encountering timeout: - /// - /// ```no_run - /// use std::thread; - /// use std::time::Duration; - /// use std::sync::mpsc; - /// - /// let (send, recv) = mpsc::channel(); - /// - /// thread::spawn(move || { - /// send.send('a').unwrap(); - /// }); - /// - /// assert_eq!( - /// recv.recv_timeout(Duration::from_millis(400)), - /// Ok('a') - /// ); - /// ``` - /// - /// Receiving an error upon reaching timeout: - /// - /// ```no_run - /// use std::thread; - /// use std::time::Duration; - /// use std::sync::mpsc; - /// - /// let (send, recv) = mpsc::channel(); - /// - /// thread::spawn(move || { - /// thread::sleep(Duration::from_millis(800)); - /// send.send('a').unwrap(); - /// }); - /// - /// assert_eq!( - /// recv.recv_timeout(Duration::from_millis(400)), - /// Err(mpsc::RecvTimeoutError::Timeout) - /// ); - /// ``` - #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")] - pub fn recv_timeout(&self, timeout: Duration) -> Result { - self.inner.recv_timeout(timeout) - } - - /// Attempts to wait for a value on this receiver, returning an error if the - /// corresponding channel has hung up, or if `deadline` is reached. - /// - /// This function will always block the current thread if there is no data - /// available and it's possible for more data to be sent. Once a message is - /// sent to the corresponding [`Sender`] (or [`SyncSender`]), then this - /// receiver will wake up and return that message. - /// - /// If the corresponding [`Sender`] has disconnected, or it disconnects while - /// this call is blocking, this call will wake up and return [`Err`] to - /// indicate that no more messages can ever be received on this channel. - /// However, since channels are buffered, messages sent before the disconnect - /// will still be properly received. - /// - /// # Examples - /// - /// Successfully receiving value before reaching deadline: - /// - /// ```no_run - /// #![feature(deadline_api)] - /// use std::thread; - /// use std::time::{Duration, Instant}; - /// use std::sync::mpsc; - /// - /// let (send, recv) = mpsc::channel(); - /// - /// thread::spawn(move || { - /// send.send('a').unwrap(); - /// }); - /// - /// assert_eq!( - /// recv.recv_deadline(Instant::now() + Duration::from_millis(400)), - /// Ok('a') - /// ); - /// ``` - /// - /// Receiving an error upon reaching deadline: - /// - /// ```no_run - /// #![feature(deadline_api)] - /// use std::thread; - /// use std::time::{Duration, Instant}; - /// use std::sync::mpsc; - /// - /// let (send, recv) = mpsc::channel(); - /// - /// thread::spawn(move || { - /// thread::sleep(Duration::from_millis(800)); - /// send.send('a').unwrap(); - /// }); - /// - /// assert_eq!( - /// recv.recv_deadline(Instant::now() + Duration::from_millis(400)), - /// Err(mpsc::RecvTimeoutError::Timeout) - /// ); - /// ``` - #[unstable(feature = "deadline_api", issue = "46316")] - pub fn recv_deadline(&self, deadline: Instant) -> Result { - self.inner.recv_deadline(deadline) - } - - /// Returns an iterator that will block waiting for messages, but never - /// [`panic!`]. It will return [`None`] when the channel has hung up. - /// - /// # Examples - /// - /// ```rust - /// use std::sync::mpsc::channel; - /// use std::thread; - /// - /// let (send, recv) = channel(); - /// - /// thread::spawn(move || { - /// send.send(1).unwrap(); - /// send.send(2).unwrap(); - /// send.send(3).unwrap(); - /// }); - /// - /// let mut iter = recv.iter(); - /// assert_eq!(iter.next(), Some(1)); - /// assert_eq!(iter.next(), Some(2)); - /// assert_eq!(iter.next(), Some(3)); - /// assert_eq!(iter.next(), None); - /// ``` - #[stable(feature = "rust1", since = "1.0.0")] - pub fn iter(&self) -> Iter<'_, T> { - Iter { rx: self } - } - - /// Returns an iterator that will attempt to yield all pending values. - /// It will return `None` if there are no more pending values or if the - /// channel has hung up. The iterator will never [`panic!`] or block the - /// user by waiting for values. - /// - /// # Examples - /// - /// ```no_run - /// use std::sync::mpsc::channel; - /// use std::thread; - /// use std::time::Duration; - /// - /// let (sender, receiver) = channel(); - /// - /// // nothing is in the buffer yet - /// assert!(receiver.try_iter().next().is_none()); - /// - /// thread::spawn(move || { - /// thread::sleep(Duration::from_secs(1)); - /// sender.send(1).unwrap(); - /// sender.send(2).unwrap(); - /// sender.send(3).unwrap(); - /// }); - /// - /// // nothing is in the buffer yet - /// assert!(receiver.try_iter().next().is_none()); - /// - /// // block for two seconds - /// thread::sleep(Duration::from_secs(2)); - /// - /// let mut iter = receiver.try_iter(); - /// assert_eq!(iter.next(), Some(1)); - /// assert_eq!(iter.next(), Some(2)); - /// assert_eq!(iter.next(), Some(3)); - /// assert_eq!(iter.next(), None); - /// ``` - #[stable(feature = "receiver_try_iter", since = "1.15.0")] - pub fn try_iter(&self) -> TryIter<'_, T> { - TryIter { rx: self } - } -} - -#[stable(feature = "rust1", since = "1.0.0")] -impl<'a, T> Iterator for Iter<'a, T> { - type Item = T; - - fn next(&mut self) -> Option { - self.rx.recv().ok() - } -} - -#[stable(feature = "receiver_try_iter", since = "1.15.0")] -impl<'a, T> Iterator for TryIter<'a, T> { - type Item = T; - - fn next(&mut self) -> Option { - self.rx.try_recv().ok() - } -} - -#[stable(feature = "receiver_into_iter", since = "1.1.0")] -impl<'a, T> IntoIterator for &'a Receiver { - type Item = T; - type IntoIter = Iter<'a, T>; - - fn into_iter(self) -> Iter<'a, T> { - self.iter() - } -} - -#[stable(feature = "receiver_into_iter", since = "1.1.0")] -impl Iterator for IntoIter { - type Item = T; - fn next(&mut self) -> Option { - self.rx.recv().ok() - } -} - -#[stable(feature = "receiver_into_iter", since = "1.1.0")] -impl IntoIterator for Receiver { - type Item = T; - type IntoIter = IntoIter; - - fn into_iter(self) -> IntoIter { - IntoIter { rx: self } - } -} - -#[stable(feature = "mpsc_debug", since = "1.8.0")] -impl fmt::Debug for Receiver { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Receiver").finish_non_exhaustive() - } -} - -#[stable(feature = "rust1", since = "1.0.0")] -impl fmt::Debug for SendError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("SendError").finish_non_exhaustive() - } -} - -#[stable(feature = "rust1", since = "1.0.0")] -impl fmt::Display for SendError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - "sending on a closed channel".fmt(f) - } -} - -#[stable(feature = "rust1", since = "1.0.0")] -impl error::Error for SendError { - #[allow(deprecated)] - fn description(&self) -> &str { - "sending on a closed channel" - } -} - -#[stable(feature = "rust1", since = "1.0.0")] -impl fmt::Debug for TrySendError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match *self { - TrySendError::Full(..) => "Full(..)".fmt(f), - TrySendError::Disconnected(..) => "Disconnected(..)".fmt(f), - } - } -} - -#[stable(feature = "rust1", since = "1.0.0")] -impl fmt::Display for TrySendError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match *self { - TrySendError::Full(..) => "sending on a full channel".fmt(f), - TrySendError::Disconnected(..) => "sending on a closed channel".fmt(f), - } - } -} - -#[stable(feature = "rust1", since = "1.0.0")] -impl error::Error for TrySendError { - #[allow(deprecated)] - fn description(&self) -> &str { - match *self { - TrySendError::Full(..) => "sending on a full channel", - TrySendError::Disconnected(..) => "sending on a closed channel", - } - } -} - -#[stable(feature = "mpsc_error_conversions", since = "1.24.0")] -impl From> for TrySendError { - /// Converts a `SendError` into a `TrySendError`. - /// - /// This conversion always returns a `TrySendError::Disconnected` containing the data in the `SendError`. - /// - /// No data is allocated on the heap. - fn from(err: SendError) -> TrySendError { - match err { - SendError(t) => TrySendError::Disconnected(t), - } - } -} - -#[stable(feature = "rust1", since = "1.0.0")] -impl fmt::Display for RecvError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - "receiving on a closed channel".fmt(f) - } -} - -#[stable(feature = "rust1", since = "1.0.0")] -impl error::Error for RecvError { - #[allow(deprecated)] - fn description(&self) -> &str { - "receiving on a closed channel" - } -} - -#[stable(feature = "rust1", since = "1.0.0")] -impl fmt::Display for TryRecvError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match *self { - TryRecvError::Empty => "receiving on an empty channel".fmt(f), - TryRecvError::Disconnected => "receiving on a closed channel".fmt(f), - } - } -} - -#[stable(feature = "rust1", since = "1.0.0")] -impl error::Error for TryRecvError { - #[allow(deprecated)] - fn description(&self) -> &str { - match *self { - TryRecvError::Empty => "receiving on an empty channel", - TryRecvError::Disconnected => "receiving on a closed channel", - } - } -} - -#[stable(feature = "mpsc_error_conversions", since = "1.24.0")] -impl From for TryRecvError { - /// Converts a `RecvError` into a `TryRecvError`. - /// - /// This conversion always returns `TryRecvError::Disconnected`. - /// - /// No data is allocated on the heap. - fn from(err: RecvError) -> TryRecvError { - match err { - RecvError => TryRecvError::Disconnected, - } - } -} - -#[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")] -impl fmt::Display for RecvTimeoutError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match *self { - RecvTimeoutError::Timeout => "timed out waiting on channel".fmt(f), - RecvTimeoutError::Disconnected => "channel is empty and sending half is closed".fmt(f), - } - } -} - -#[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")] -impl error::Error for RecvTimeoutError { - #[allow(deprecated)] - fn description(&self) -> &str { - match *self { - RecvTimeoutError::Timeout => "timed out waiting on channel", - RecvTimeoutError::Disconnected => "channel is empty and sending half is closed", - } - } -} - -#[stable(feature = "mpsc_error_conversions", since = "1.24.0")] -impl From for RecvTimeoutError { - /// Converts a `RecvError` into a `RecvTimeoutError`. - /// - /// This conversion always returns `RecvTimeoutError::Disconnected`. - /// - /// No data is allocated on the heap. - fn from(err: RecvError) -> RecvTimeoutError { - match err { - RecvError => RecvTimeoutError::Disconnected, - } - } -} diff --git a/library/std/src/sync/mpsc/sync_tests.rs b/library/std/src/sync/mpsc/sync_tests.rs deleted file mode 100644 index 49b65c8efe6..00000000000 --- a/library/std/src/sync/mpsc/sync_tests.rs +++ /dev/null @@ -1,669 +0,0 @@ -use super::*; -use crate::rc::Rc; -use crate::sync::mpmc::SendTimeoutError; -use crate::{env, thread}; - -pub fn stress_factor() -> usize { - match env::var("RUST_TEST_STRESS") { - Ok(val) => val.parse().unwrap(), - Err(..) => 1, - } -} - -#[test] -fn smoke() { - let (tx, rx) = sync_channel::(1); - tx.send(1).unwrap(); - assert_eq!(rx.recv().unwrap(), 1); -} - -#[test] -fn drop_full() { - let (tx, _rx) = sync_channel::>(1); - tx.send(Box::new(1)).unwrap(); -} - -#[test] -fn smoke_shared() { - let (tx, rx) = sync_channel::(1); - tx.send(1).unwrap(); - assert_eq!(rx.recv().unwrap(), 1); - let tx = tx.clone(); - tx.send(1).unwrap(); - assert_eq!(rx.recv().unwrap(), 1); -} - -#[test] -fn recv_timeout() { - let (tx, rx) = sync_channel::(1); - assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout)); - tx.send(1).unwrap(); - assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(1)); -} - -#[test] -fn send_timeout() { - let (tx, _rx) = sync_channel::(1); - assert_eq!(tx.send_timeout(1, Duration::from_millis(1)), Ok(())); - assert_eq!(tx.send_timeout(1, Duration::from_millis(1)), Err(SendTimeoutError::Timeout(1))); -} - -#[test] -fn smoke_threads() { - let (tx, rx) = sync_channel::(0); - let _t = thread::spawn(move || { - tx.send(1).unwrap(); - }); - assert_eq!(rx.recv().unwrap(), 1); -} - -#[test] -fn smoke_port_gone() { - let (tx, rx) = sync_channel::(0); - drop(rx); - assert!(tx.send(1).is_err()); -} - -#[test] -fn smoke_shared_port_gone2() { - let (tx, rx) = sync_channel::(0); - drop(rx); - let tx2 = tx.clone(); - drop(tx); - assert!(tx2.send(1).is_err()); -} - -#[test] -fn port_gone_concurrent() { - let (tx, rx) = sync_channel::(0); - let _t = thread::spawn(move || { - rx.recv().unwrap(); - }); - while tx.send(1).is_ok() {} -} - -#[test] -fn port_gone_concurrent_shared() { - let (tx, rx) = sync_channel::(0); - let tx2 = tx.clone(); - let _t = thread::spawn(move || { - rx.recv().unwrap(); - }); - while tx.send(1).is_ok() && tx2.send(1).is_ok() {} -} - -#[test] -fn smoke_chan_gone() { - let (tx, rx) = sync_channel::(0); - drop(tx); - assert!(rx.recv().is_err()); -} - -#[test] -fn smoke_chan_gone_shared() { - let (tx, rx) = sync_channel::<()>(0); - let tx2 = tx.clone(); - drop(tx); - drop(tx2); - assert!(rx.recv().is_err()); -} - -#[test] -fn chan_gone_concurrent() { - let (tx, rx) = sync_channel::(0); - thread::spawn(move || { - tx.send(1).unwrap(); - tx.send(1).unwrap(); - }); - while rx.recv().is_ok() {} -} - -#[test] -fn stress() { - let count = if cfg!(miri) { 100 } else { 10000 }; - let (tx, rx) = sync_channel::(0); - thread::spawn(move || { - for _ in 0..count { - tx.send(1).unwrap(); - } - }); - for _ in 0..count { - assert_eq!(rx.recv().unwrap(), 1); - } -} - -#[test] -fn stress_recv_timeout_two_threads() { - let count = if cfg!(miri) { 100 } else { 10000 }; - let (tx, rx) = sync_channel::(0); - - thread::spawn(move || { - for _ in 0..count { - tx.send(1).unwrap(); - } - }); - - let mut recv_count = 0; - loop { - match rx.recv_timeout(Duration::from_millis(1)) { - Ok(v) => { - assert_eq!(v, 1); - recv_count += 1; - } - Err(RecvTimeoutError::Timeout) => continue, - Err(RecvTimeoutError::Disconnected) => break, - } - } - - assert_eq!(recv_count, count); -} - -#[test] -fn stress_recv_timeout_shared() { - const AMT: u32 = if cfg!(miri) { 100 } else { 1000 }; - const NTHREADS: u32 = 8; - let (tx, rx) = sync_channel::(0); - let (dtx, drx) = sync_channel::<()>(0); - - thread::spawn(move || { - let mut recv_count = 0; - loop { - match rx.recv_timeout(Duration::from_millis(10)) { - Ok(v) => { - assert_eq!(v, 1); - recv_count += 1; - } - Err(RecvTimeoutError::Timeout) => continue, - Err(RecvTimeoutError::Disconnected) => break, - } - } - - assert_eq!(recv_count, AMT * NTHREADS); - assert!(rx.try_recv().is_err()); - - dtx.send(()).unwrap(); - }); - - for _ in 0..NTHREADS { - let tx = tx.clone(); - thread::spawn(move || { - for _ in 0..AMT { - tx.send(1).unwrap(); - } - }); - } - - drop(tx); - - drx.recv().unwrap(); -} - -#[test] -fn stress_shared() { - const AMT: u32 = if cfg!(miri) { 100 } else { 1000 }; - const NTHREADS: u32 = 8; - let (tx, rx) = sync_channel::(0); - let (dtx, drx) = sync_channel::<()>(0); - - thread::spawn(move || { - for _ in 0..AMT * NTHREADS { - assert_eq!(rx.recv().unwrap(), 1); - } - match rx.try_recv() { - Ok(..) => panic!(), - _ => {} - } - dtx.send(()).unwrap(); - }); - - for _ in 0..NTHREADS { - let tx = tx.clone(); - thread::spawn(move || { - for _ in 0..AMT { - tx.send(1).unwrap(); - } - }); - } - drop(tx); - drx.recv().unwrap(); -} - -#[test] -fn oneshot_single_thread_close_port_first() { - // Simple test of closing without sending - let (_tx, rx) = sync_channel::(0); - drop(rx); -} - -#[test] -fn oneshot_single_thread_close_chan_first() { - // Simple test of closing without sending - let (tx, _rx) = sync_channel::(0); - drop(tx); -} - -#[test] -fn oneshot_single_thread_send_port_close() { - // Testing that the sender cleans up the payload if receiver is closed - let (tx, rx) = sync_channel::>(0); - drop(rx); - assert!(tx.send(Box::new(0)).is_err()); -} - -#[test] -fn oneshot_single_thread_recv_chan_close() { - // Receiving on a closed chan will panic - let res = thread::spawn(move || { - let (tx, rx) = sync_channel::(0); - drop(tx); - rx.recv().unwrap(); - }) - .join(); - // What is our res? - assert!(res.is_err()); -} - -#[test] -fn oneshot_single_thread_send_then_recv() { - let (tx, rx) = sync_channel::>(1); - tx.send(Box::new(10)).unwrap(); - assert!(*rx.recv().unwrap() == 10); -} - -#[test] -fn oneshot_single_thread_try_send_open() { - let (tx, rx) = sync_channel::(1); - assert_eq!(tx.try_send(10), Ok(())); - assert!(rx.recv().unwrap() == 10); -} - -#[test] -fn oneshot_single_thread_try_send_closed() { - let (tx, rx) = sync_channel::(0); - drop(rx); - assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10))); -} - -#[test] -fn oneshot_single_thread_try_send_closed2() { - let (tx, _rx) = sync_channel::(0); - assert_eq!(tx.try_send(10), Err(TrySendError::Full(10))); -} - -#[test] -fn oneshot_single_thread_try_recv_open() { - let (tx, rx) = sync_channel::(1); - tx.send(10).unwrap(); - assert!(rx.recv() == Ok(10)); -} - -#[test] -fn oneshot_single_thread_try_recv_closed() { - let (tx, rx) = sync_channel::(0); - drop(tx); - assert!(rx.recv().is_err()); -} - -#[test] -fn oneshot_single_thread_try_recv_closed_with_data() { - let (tx, rx) = sync_channel::(1); - tx.send(10).unwrap(); - drop(tx); - assert_eq!(rx.try_recv(), Ok(10)); - assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); -} - -#[test] -fn oneshot_single_thread_peek_data() { - let (tx, rx) = sync_channel::(1); - assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); - tx.send(10).unwrap(); - assert_eq!(rx.try_recv(), Ok(10)); -} - -#[test] -fn oneshot_single_thread_peek_close() { - let (tx, rx) = sync_channel::(0); - drop(tx); - assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); - assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); -} - -#[test] -fn oneshot_single_thread_peek_open() { - let (_tx, rx) = sync_channel::(0); - assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); -} - -#[test] -fn oneshot_multi_task_recv_then_send() { - let (tx, rx) = sync_channel::>(0); - let _t = thread::spawn(move || { - assert!(*rx.recv().unwrap() == 10); - }); - - tx.send(Box::new(10)).unwrap(); -} - -#[test] -fn oneshot_multi_task_recv_then_close() { - let (tx, rx) = sync_channel::>(0); - let _t = thread::spawn(move || { - drop(tx); - }); - let res = thread::spawn(move || { - assert!(*rx.recv().unwrap() == 10); - }) - .join(); - assert!(res.is_err()); -} - -#[test] -fn oneshot_multi_thread_close_stress() { - for _ in 0..stress_factor() { - let (tx, rx) = sync_channel::(0); - let _t = thread::spawn(move || { - drop(rx); - }); - drop(tx); - } -} - -#[test] -fn oneshot_multi_thread_send_close_stress() { - for _ in 0..stress_factor() { - let (tx, rx) = sync_channel::(0); - let _t = thread::spawn(move || { - drop(rx); - }); - let _ = thread::spawn(move || { - tx.send(1).unwrap(); - }) - .join(); - } -} - -#[test] -fn oneshot_multi_thread_recv_close_stress() { - for _ in 0..stress_factor() { - let (tx, rx) = sync_channel::(0); - let _t = thread::spawn(move || { - let res = thread::spawn(move || { - rx.recv().unwrap(); - }) - .join(); - assert!(res.is_err()); - }); - let _t = thread::spawn(move || { - thread::spawn(move || { - drop(tx); - }); - }); - } -} - -#[test] -fn oneshot_multi_thread_send_recv_stress() { - for _ in 0..stress_factor() { - let (tx, rx) = sync_channel::>(0); - let _t = thread::spawn(move || { - tx.send(Box::new(10)).unwrap(); - }); - assert!(*rx.recv().unwrap() == 10); - } -} - -#[test] -fn stream_send_recv_stress() { - for _ in 0..stress_factor() { - let (tx, rx) = sync_channel::>(0); - - send(tx, 0); - recv(rx, 0); - - fn send(tx: SyncSender>, i: i32) { - if i == 10 { - return; - } - - thread::spawn(move || { - tx.send(Box::new(i)).unwrap(); - send(tx, i + 1); - }); - } - - fn recv(rx: Receiver>, i: i32) { - if i == 10 { - return; - } - - thread::spawn(move || { - assert!(*rx.recv().unwrap() == i); - recv(rx, i + 1); - }); - } - } -} - -#[test] -fn recv_a_lot() { - let count = if cfg!(miri) { 1000 } else { 10000 }; - // Regression test that we don't run out of stack in scheduler context - let (tx, rx) = sync_channel(count); - for _ in 0..count { - tx.send(()).unwrap(); - } - for _ in 0..count { - rx.recv().unwrap(); - } -} - -#[test] -fn shared_chan_stress() { - let (tx, rx) = sync_channel(0); - let total = stress_factor() + 100; - for _ in 0..total { - let tx = tx.clone(); - thread::spawn(move || { - tx.send(()).unwrap(); - }); - } - - for _ in 0..total { - rx.recv().unwrap(); - } -} - -#[test] -fn test_nested_recv_iter() { - let (tx, rx) = sync_channel::(0); - let (total_tx, total_rx) = sync_channel::(0); - - let _t = thread::spawn(move || { - let mut acc = 0; - for x in rx.iter() { - acc += x; - } - total_tx.send(acc).unwrap(); - }); - - tx.send(3).unwrap(); - tx.send(1).unwrap(); - tx.send(2).unwrap(); - drop(tx); - assert_eq!(total_rx.recv().unwrap(), 6); -} - -#[test] -fn test_recv_iter_break() { - let (tx, rx) = sync_channel::(0); - let (count_tx, count_rx) = sync_channel(0); - - let _t = thread::spawn(move || { - let mut count = 0; - for x in rx.iter() { - if count >= 3 { - break; - } else { - count += x; - } - } - count_tx.send(count).unwrap(); - }); - - tx.send(2).unwrap(); - tx.send(2).unwrap(); - tx.send(2).unwrap(); - let _ = tx.try_send(2); - drop(tx); - assert_eq!(count_rx.recv().unwrap(), 4); -} - -#[test] -fn try_recv_states() { - let (tx1, rx1) = sync_channel::(1); - let (tx2, rx2) = sync_channel::<()>(1); - let (tx3, rx3) = sync_channel::<()>(1); - let _t = thread::spawn(move || { - rx2.recv().unwrap(); - tx1.send(1).unwrap(); - tx3.send(()).unwrap(); - rx2.recv().unwrap(); - drop(tx1); - tx3.send(()).unwrap(); - }); - - assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty)); - tx2.send(()).unwrap(); - rx3.recv().unwrap(); - assert_eq!(rx1.try_recv(), Ok(1)); - assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty)); - tx2.send(()).unwrap(); - rx3.recv().unwrap(); - assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected)); -} - -// This bug used to end up in a livelock inside of the Receiver destructor -// because the internal state of the Shared packet was corrupted -#[test] -fn destroy_upgraded_shared_port_when_sender_still_active() { - let (tx, rx) = sync_channel::<()>(0); - let (tx2, rx2) = sync_channel::<()>(0); - let _t = thread::spawn(move || { - rx.recv().unwrap(); // wait on a oneshot - drop(rx); // destroy a shared - tx2.send(()).unwrap(); - }); - // make sure the other thread has gone to sleep - for _ in 0..5000 { - thread::yield_now(); - } - - // upgrade to a shared chan and send a message - let t = tx.clone(); - drop(tx); - t.send(()).unwrap(); - - // wait for the child thread to exit before we exit - rx2.recv().unwrap(); -} - -#[test] -fn send1() { - let (tx, rx) = sync_channel::(0); - let _t = thread::spawn(move || { - rx.recv().unwrap(); - }); - assert_eq!(tx.send(1), Ok(())); -} - -#[test] -fn send2() { - let (tx, rx) = sync_channel::(0); - let _t = thread::spawn(move || { - drop(rx); - }); - assert!(tx.send(1).is_err()); -} - -#[test] -fn send3() { - let (tx, rx) = sync_channel::(1); - assert_eq!(tx.send(1), Ok(())); - let _t = thread::spawn(move || { - drop(rx); - }); - assert!(tx.send(1).is_err()); -} - -#[test] -fn send4() { - let (tx, rx) = sync_channel::(0); - let tx2 = tx.clone(); - let (done, donerx) = channel(); - let done2 = done.clone(); - let _t = thread::spawn(move || { - assert!(tx.send(1).is_err()); - done.send(()).unwrap(); - }); - let _t = thread::spawn(move || { - assert!(tx2.send(2).is_err()); - done2.send(()).unwrap(); - }); - drop(rx); - donerx.recv().unwrap(); - donerx.recv().unwrap(); -} - -#[test] -fn try_send1() { - let (tx, _rx) = sync_channel::(0); - assert_eq!(tx.try_send(1), Err(TrySendError::Full(1))); -} - -#[test] -fn try_send2() { - let (tx, _rx) = sync_channel::(1); - assert_eq!(tx.try_send(1), Ok(())); - assert_eq!(tx.try_send(1), Err(TrySendError::Full(1))); -} - -#[test] -fn try_send3() { - let (tx, rx) = sync_channel::(1); - assert_eq!(tx.try_send(1), Ok(())); - drop(rx); - assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1))); -} - -#[test] -fn issue_15761() { - fn repro() { - let (tx1, rx1) = sync_channel::<()>(3); - let (tx2, rx2) = sync_channel::<()>(3); - - let _t = thread::spawn(move || { - rx1.recv().unwrap(); - tx2.try_send(()).unwrap(); - }); - - tx1.try_send(()).unwrap(); - rx2.recv().unwrap(); - } - - for _ in 0..100 { - repro() - } -} - -#[test] -fn drop_unreceived() { - let (tx, rx) = sync_channel::>(1); - let msg = Rc::new(()); - let weak = Rc::downgrade(&msg); - assert!(tx.send(msg).is_ok()); - drop(rx); - // Messages should be dropped immediately when the last receiver is destroyed. - assert!(weak.upgrade().is_none()); - drop(tx); -} diff --git a/library/std/src/sync/mpsc/tests.rs b/library/std/src/sync/mpsc/tests.rs deleted file mode 100644 index 13892fa0d18..00000000000 --- a/library/std/src/sync/mpsc/tests.rs +++ /dev/null @@ -1,721 +0,0 @@ -use super::*; -use crate::{env, thread}; - -pub fn stress_factor() -> usize { - match env::var("RUST_TEST_STRESS") { - Ok(val) => val.parse().unwrap(), - Err(..) => 1, - } -} - -#[test] -fn smoke() { - let (tx, rx) = channel::(); - tx.send(1).unwrap(); - assert_eq!(rx.recv().unwrap(), 1); -} - -#[test] -fn drop_full() { - let (tx, _rx) = channel::>(); - tx.send(Box::new(1)).unwrap(); -} - -#[test] -fn drop_full_shared() { - let (tx, _rx) = channel::>(); - drop(tx.clone()); - drop(tx.clone()); - tx.send(Box::new(1)).unwrap(); -} - -#[test] -fn smoke_shared() { - let (tx, rx) = channel::(); - tx.send(1).unwrap(); - assert_eq!(rx.recv().unwrap(), 1); - let tx = tx.clone(); - tx.send(1).unwrap(); - assert_eq!(rx.recv().unwrap(), 1); -} - -#[test] -fn smoke_threads() { - let (tx, rx) = channel::(); - let _t = thread::spawn(move || { - tx.send(1).unwrap(); - }); - assert_eq!(rx.recv().unwrap(), 1); -} - -#[test] -fn smoke_port_gone() { - let (tx, rx) = channel::(); - drop(rx); - assert!(tx.send(1).is_err()); -} - -#[test] -fn smoke_shared_port_gone() { - let (tx, rx) = channel::(); - drop(rx); - assert!(tx.send(1).is_err()) -} - -#[test] -fn smoke_shared_port_gone2() { - let (tx, rx) = channel::(); - drop(rx); - let tx2 = tx.clone(); - drop(tx); - assert!(tx2.send(1).is_err()); -} - -#[test] -fn port_gone_concurrent() { - let (tx, rx) = channel::(); - let _t = thread::spawn(move || { - rx.recv().unwrap(); - }); - while tx.send(1).is_ok() {} -} - -#[test] -fn port_gone_concurrent_shared() { - let (tx, rx) = channel::(); - let tx2 = tx.clone(); - let _t = thread::spawn(move || { - rx.recv().unwrap(); - }); - while tx.send(1).is_ok() && tx2.send(1).is_ok() {} -} - -#[test] -fn smoke_chan_gone() { - let (tx, rx) = channel::(); - drop(tx); - assert!(rx.recv().is_err()); -} - -#[test] -fn smoke_chan_gone_shared() { - let (tx, rx) = channel::<()>(); - let tx2 = tx.clone(); - drop(tx); - drop(tx2); - assert!(rx.recv().is_err()); -} - -#[test] -fn chan_gone_concurrent() { - let (tx, rx) = channel::(); - let _t = thread::spawn(move || { - tx.send(1).unwrap(); - tx.send(1).unwrap(); - }); - while rx.recv().is_ok() {} -} - -#[test] -fn stress() { - let count = if cfg!(miri) { 100 } else { 10000 }; - let (tx, rx) = channel::(); - let t = thread::spawn(move || { - for _ in 0..count { - tx.send(1).unwrap(); - } - }); - for _ in 0..count { - assert_eq!(rx.recv().unwrap(), 1); - } - t.join().ok().expect("thread panicked"); -} - -#[test] -fn stress_shared() { - const AMT: u32 = if cfg!(miri) { 100 } else { 10000 }; - const NTHREADS: u32 = 8; - let (tx, rx) = channel::(); - - let t = thread::spawn(move || { - for _ in 0..AMT * NTHREADS { - assert_eq!(rx.recv().unwrap(), 1); - } - match rx.try_recv() { - Ok(..) => panic!(), - _ => {} - } - }); - - for _ in 0..NTHREADS { - let tx = tx.clone(); - thread::spawn(move || { - for _ in 0..AMT { - tx.send(1).unwrap(); - } - }); - } - drop(tx); - t.join().ok().expect("thread panicked"); -} - -#[test] -fn send_from_outside_runtime() { - let (tx1, rx1) = channel::<()>(); - let (tx2, rx2) = channel::(); - let t1 = thread::spawn(move || { - tx1.send(()).unwrap(); - for _ in 0..40 { - assert_eq!(rx2.recv().unwrap(), 1); - } - }); - rx1.recv().unwrap(); - let t2 = thread::spawn(move || { - for _ in 0..40 { - tx2.send(1).unwrap(); - } - }); - t1.join().ok().expect("thread panicked"); - t2.join().ok().expect("thread panicked"); -} - -#[test] -fn recv_from_outside_runtime() { - let (tx, rx) = channel::(); - let t = thread::spawn(move || { - for _ in 0..40 { - assert_eq!(rx.recv().unwrap(), 1); - } - }); - for _ in 0..40 { - tx.send(1).unwrap(); - } - t.join().ok().expect("thread panicked"); -} - -#[test] -fn no_runtime() { - let (tx1, rx1) = channel::(); - let (tx2, rx2) = channel::(); - let t1 = thread::spawn(move || { - assert_eq!(rx1.recv().unwrap(), 1); - tx2.send(2).unwrap(); - }); - let t2 = thread::spawn(move || { - tx1.send(1).unwrap(); - assert_eq!(rx2.recv().unwrap(), 2); - }); - t1.join().ok().expect("thread panicked"); - t2.join().ok().expect("thread panicked"); -} - -#[test] -fn oneshot_single_thread_close_port_first() { - // Simple test of closing without sending - let (_tx, rx) = channel::(); - drop(rx); -} - -#[test] -fn oneshot_single_thread_close_chan_first() { - // Simple test of closing without sending - let (tx, _rx) = channel::(); - drop(tx); -} - -#[test] -fn oneshot_single_thread_send_port_close() { - // Testing that the sender cleans up the payload if receiver is closed - let (tx, rx) = channel::>(); - drop(rx); - assert!(tx.send(Box::new(0)).is_err()); -} - -#[test] -fn oneshot_single_thread_recv_chan_close() { - // Receiving on a closed chan will panic - let res = thread::spawn(move || { - let (tx, rx) = channel::(); - drop(tx); - rx.recv().unwrap(); - }) - .join(); - // What is our res? - assert!(res.is_err()); -} - -#[test] -fn oneshot_single_thread_send_then_recv() { - let (tx, rx) = channel::>(); - tx.send(Box::new(10)).unwrap(); - assert!(*rx.recv().unwrap() == 10); -} - -#[test] -fn oneshot_single_thread_try_send_open() { - let (tx, rx) = channel::(); - assert!(tx.send(10).is_ok()); - assert!(rx.recv().unwrap() == 10); -} - -#[test] -fn oneshot_single_thread_try_send_closed() { - let (tx, rx) = channel::(); - drop(rx); - assert!(tx.send(10).is_err()); -} - -#[test] -fn oneshot_single_thread_try_recv_open() { - let (tx, rx) = channel::(); - tx.send(10).unwrap(); - assert!(rx.recv() == Ok(10)); -} - -#[test] -fn oneshot_single_thread_try_recv_closed() { - let (tx, rx) = channel::(); - drop(tx); - assert!(rx.recv().is_err()); -} - -#[test] -fn oneshot_single_thread_peek_data() { - let (tx, rx) = channel::(); - assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); - tx.send(10).unwrap(); - assert_eq!(rx.try_recv(), Ok(10)); -} - -#[test] -fn oneshot_single_thread_peek_close() { - let (tx, rx) = channel::(); - drop(tx); - assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); - assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); -} - -#[test] -fn oneshot_single_thread_peek_open() { - let (_tx, rx) = channel::(); - assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); -} - -#[test] -fn oneshot_multi_task_recv_then_send() { - let (tx, rx) = channel::>(); - let _t = thread::spawn(move || { - assert!(*rx.recv().unwrap() == 10); - }); - - tx.send(Box::new(10)).unwrap(); -} - -#[test] -fn oneshot_multi_task_recv_then_close() { - let (tx, rx) = channel::>(); - let _t = thread::spawn(move || { - drop(tx); - }); - let res = thread::spawn(move || { - assert!(*rx.recv().unwrap() == 10); - }) - .join(); - assert!(res.is_err()); -} - -#[test] -fn oneshot_multi_thread_close_stress() { - for _ in 0..stress_factor() { - let (tx, rx) = channel::(); - let _t = thread::spawn(move || { - drop(rx); - }); - drop(tx); - } -} - -#[test] -fn oneshot_multi_thread_send_close_stress() { - for _ in 0..stress_factor() { - let (tx, rx) = channel::(); - let _t = thread::spawn(move || { - drop(rx); - }); - let _ = thread::spawn(move || { - tx.send(1).unwrap(); - }) - .join(); - } -} - -#[test] -fn oneshot_multi_thread_recv_close_stress() { - for _ in 0..stress_factor() { - let (tx, rx) = channel::(); - thread::spawn(move || { - let res = thread::spawn(move || { - rx.recv().unwrap(); - }) - .join(); - assert!(res.is_err()); - }); - let _t = thread::spawn(move || { - thread::spawn(move || { - drop(tx); - }); - }); - } -} - -#[test] -fn oneshot_multi_thread_send_recv_stress() { - for _ in 0..stress_factor() { - let (tx, rx) = channel::>(); - let _t = thread::spawn(move || { - tx.send(Box::new(10)).unwrap(); - }); - assert!(*rx.recv().unwrap() == 10); - } -} - -#[test] -fn stream_send_recv_stress() { - for _ in 0..stress_factor() { - let (tx, rx) = channel(); - - send(tx, 0); - recv(rx, 0); - - fn send(tx: Sender>, i: i32) { - if i == 10 { - return; - } - - thread::spawn(move || { - tx.send(Box::new(i)).unwrap(); - send(tx, i + 1); - }); - } - - fn recv(rx: Receiver>, i: i32) { - if i == 10 { - return; - } - - thread::spawn(move || { - assert!(*rx.recv().unwrap() == i); - recv(rx, i + 1); - }); - } - } -} - -#[test] -fn oneshot_single_thread_recv_timeout() { - let (tx, rx) = channel(); - tx.send(()).unwrap(); - assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(())); - assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout)); - tx.send(()).unwrap(); - assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(())); -} - -#[test] -fn stress_recv_timeout_two_threads() { - let (tx, rx) = channel(); - let stress = stress_factor() + 100; - let timeout = Duration::from_millis(100); - - thread::spawn(move || { - for i in 0..stress { - if i % 2 == 0 { - thread::sleep(timeout * 2); - } - tx.send(1usize).unwrap(); - } - }); - - let mut recv_count = 0; - loop { - match rx.recv_timeout(timeout) { - Ok(n) => { - assert_eq!(n, 1usize); - recv_count += 1; - } - Err(RecvTimeoutError::Timeout) => continue, - Err(RecvTimeoutError::Disconnected) => break, - } - } - - assert_eq!(recv_count, stress); -} - -#[test] -fn recv_timeout_upgrade() { - let (tx, rx) = channel::<()>(); - let timeout = Duration::from_millis(1); - let _tx_clone = tx.clone(); - - let start = Instant::now(); - assert_eq!(rx.recv_timeout(timeout), Err(RecvTimeoutError::Timeout)); - assert!(Instant::now() >= start + timeout); -} - -#[test] -fn stress_recv_timeout_shared() { - let (tx, rx) = channel(); - let stress = stress_factor() + 100; - - for i in 0..stress { - let tx = tx.clone(); - thread::spawn(move || { - thread::sleep(Duration::from_millis(i as u64 * 10)); - tx.send(1usize).unwrap(); - }); - } - - drop(tx); - - let mut recv_count = 0; - loop { - match rx.recv_timeout(Duration::from_millis(10)) { - Ok(n) => { - assert_eq!(n, 1usize); - recv_count += 1; - } - Err(RecvTimeoutError::Timeout) => continue, - Err(RecvTimeoutError::Disconnected) => break, - } - } - - assert_eq!(recv_count, stress); -} - -#[test] -fn very_long_recv_timeout_wont_panic() { - let (tx, rx) = channel::<()>(); - let join_handle = thread::spawn(move || rx.recv_timeout(Duration::from_secs(u64::MAX))); - thread::sleep(Duration::from_secs(1)); - assert!(tx.send(()).is_ok()); - assert_eq!(join_handle.join().unwrap(), Ok(())); -} - -#[test] -fn recv_a_lot() { - let count = if cfg!(miri) { 1000 } else { 10000 }; - // Regression test that we don't run out of stack in scheduler context - let (tx, rx) = channel(); - for _ in 0..count { - tx.send(()).unwrap(); - } - for _ in 0..count { - rx.recv().unwrap(); - } -} - -#[test] -fn shared_recv_timeout() { - let (tx, rx) = channel(); - let total = 5; - for _ in 0..total { - let tx = tx.clone(); - thread::spawn(move || { - tx.send(()).unwrap(); - }); - } - - for _ in 0..total { - rx.recv().unwrap(); - } - - assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout)); - tx.send(()).unwrap(); - assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(())); -} - -#[test] -fn shared_chan_stress() { - let (tx, rx) = channel(); - let total = stress_factor() + 100; - for _ in 0..total { - let tx = tx.clone(); - thread::spawn(move || { - tx.send(()).unwrap(); - }); - } - - for _ in 0..total { - rx.recv().unwrap(); - } -} - -#[test] -fn test_nested_recv_iter() { - let (tx, rx) = channel::(); - let (total_tx, total_rx) = channel::(); - - let _t = thread::spawn(move || { - let mut acc = 0; - for x in rx.iter() { - acc += x; - } - total_tx.send(acc).unwrap(); - }); - - tx.send(3).unwrap(); - tx.send(1).unwrap(); - tx.send(2).unwrap(); - drop(tx); - assert_eq!(total_rx.recv().unwrap(), 6); -} - -#[test] -fn test_recv_iter_break() { - let (tx, rx) = channel::(); - let (count_tx, count_rx) = channel(); - - let _t = thread::spawn(move || { - let mut count = 0; - for x in rx.iter() { - if count >= 3 { - break; - } else { - count += x; - } - } - count_tx.send(count).unwrap(); - }); - - tx.send(2).unwrap(); - tx.send(2).unwrap(); - tx.send(2).unwrap(); - let _ = tx.send(2); - drop(tx); - assert_eq!(count_rx.recv().unwrap(), 4); -} - -#[test] -fn test_recv_try_iter() { - let (request_tx, request_rx) = channel(); - let (response_tx, response_rx) = channel(); - - // Request `x`s until we have `6`. - let t = thread::spawn(move || { - let mut count = 0; - loop { - for x in response_rx.try_iter() { - count += x; - if count == 6 { - return count; - } - } - request_tx.send(()).unwrap(); - } - }); - - for _ in request_rx.iter() { - if response_tx.send(2).is_err() { - break; - } - } - - assert_eq!(t.join().unwrap(), 6); -} - -#[test] -fn test_recv_into_iter_owned() { - let mut iter = { - let (tx, rx) = channel::(); - tx.send(1).unwrap(); - tx.send(2).unwrap(); - - rx.into_iter() - }; - assert_eq!(iter.next().unwrap(), 1); - assert_eq!(iter.next().unwrap(), 2); - assert_eq!(iter.next().is_none(), true); -} - -#[test] -fn test_recv_into_iter_borrowed() { - let (tx, rx) = channel::(); - tx.send(1).unwrap(); - tx.send(2).unwrap(); - drop(tx); - let mut iter = (&rx).into_iter(); - assert_eq!(iter.next().unwrap(), 1); - assert_eq!(iter.next().unwrap(), 2); - assert_eq!(iter.next().is_none(), true); -} - -#[test] -fn try_recv_states() { - let (tx1, rx1) = channel::(); - let (tx2, rx2) = channel::<()>(); - let (tx3, rx3) = channel::<()>(); - let _t = thread::spawn(move || { - rx2.recv().unwrap(); - tx1.send(1).unwrap(); - tx3.send(()).unwrap(); - rx2.recv().unwrap(); - drop(tx1); - tx3.send(()).unwrap(); - }); - - assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty)); - tx2.send(()).unwrap(); - rx3.recv().unwrap(); - assert_eq!(rx1.try_recv(), Ok(1)); - assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty)); - tx2.send(()).unwrap(); - rx3.recv().unwrap(); - assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected)); -} - -// This bug used to end up in a livelock inside of the Receiver destructor -// because the internal state of the Shared packet was corrupted -#[test] -fn destroy_upgraded_shared_port_when_sender_still_active() { - let (tx, rx) = channel(); - let (tx2, rx2) = channel(); - let _t = thread::spawn(move || { - rx.recv().unwrap(); // wait on a oneshot - drop(rx); // destroy a shared - tx2.send(()).unwrap(); - }); - // make sure the other thread has gone to sleep - for _ in 0..5000 { - thread::yield_now(); - } - - // upgrade to a shared chan and send a message - let t = tx.clone(); - drop(tx); - t.send(()).unwrap(); - - // wait for the child thread to exit before we exit - rx2.recv().unwrap(); -} - -#[test] -fn issue_32114() { - let (tx, _) = channel(); - let _ = tx.send(123); - assert_eq!(tx.send(123), Err(SendError(123))); -} - -#[test] -fn issue_39364() { - let (tx, rx) = channel::<()>(); - let t = thread::spawn(move || { - thread::sleep(Duration::from_millis(300)); - let _ = tx.clone(); - // Don't drop; hand back to caller. - tx - }); - - let _ = rx.recv_timeout(Duration::from_millis(500)); - let _tx = t.join().unwrap(); // delay dropping until end of test - let _ = rx.recv_timeout(Duration::from_millis(500)); -} diff --git a/library/std/src/sync/once_lock.rs b/library/std/src/sync/once_lock.rs index 49f2dafd8fd..f6e4b34a13a 100644 --- a/library/std/src/sync/once_lock.rs +++ b/library/std/src/sync/once_lock.rs @@ -676,6 +676,3 @@ unsafe impl<#[may_dangle] T> Drop for OnceLock { } } } - -#[cfg(test)] -mod tests; diff --git a/library/std/src/sync/once_lock/tests.rs b/library/std/src/sync/once_lock/tests.rs deleted file mode 100644 index 5113d436c3c..00000000000 --- a/library/std/src/sync/once_lock/tests.rs +++ /dev/null @@ -1,200 +0,0 @@ -use crate::sync::OnceLock; -use crate::sync::atomic::AtomicUsize; -use crate::sync::atomic::Ordering::SeqCst; -use crate::sync::mpsc::channel; -use crate::{panic, thread}; - -fn spawn_and_wait(f: impl FnOnce() -> R + Send + 'static) -> R { - thread::spawn(f).join().unwrap() -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_os = "wasi"), ignore)] // no threads -fn sync_once_cell() { - static ONCE_CELL: OnceLock = OnceLock::new(); - - assert!(ONCE_CELL.get().is_none()); - - spawn_and_wait(|| { - ONCE_CELL.get_or_init(|| 92); - assert_eq!(ONCE_CELL.get(), Some(&92)); - }); - - ONCE_CELL.get_or_init(|| panic!("Kaboom!")); - assert_eq!(ONCE_CELL.get(), Some(&92)); -} - -#[test] -fn sync_once_cell_get_mut() { - let mut c = OnceLock::new(); - assert!(c.get_mut().is_none()); - c.set(90).unwrap(); - *c.get_mut().unwrap() += 2; - assert_eq!(c.get_mut(), Some(&mut 92)); -} - -#[test] -fn sync_once_cell_get_unchecked() { - let c = OnceLock::new(); - c.set(92).unwrap(); - unsafe { - assert_eq!(c.get_unchecked(), &92); - } -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_os = "wasi"), ignore)] // no threads -fn sync_once_cell_drop() { - static DROP_CNT: AtomicUsize = AtomicUsize::new(0); - struct Dropper; - impl Drop for Dropper { - fn drop(&mut self) { - DROP_CNT.fetch_add(1, SeqCst); - } - } - - let x = OnceLock::new(); - spawn_and_wait(move || { - x.get_or_init(|| Dropper); - assert_eq!(DROP_CNT.load(SeqCst), 0); - drop(x); - }); - - assert_eq!(DROP_CNT.load(SeqCst), 1); -} - -#[test] -fn sync_once_cell_drop_empty() { - let x = OnceLock::::new(); - drop(x); -} - -#[test] -fn clone() { - let s = OnceLock::new(); - let c = s.clone(); - assert!(c.get().is_none()); - - s.set("hello".to_string()).unwrap(); - let c = s.clone(); - assert_eq!(c.get().map(String::as_str), Some("hello")); -} - -#[test] -#[cfg_attr(not(panic = "unwind"), ignore = "test requires unwinding support")] -fn get_or_try_init() { - let cell: OnceLock = OnceLock::new(); - assert!(cell.get().is_none()); - - let res = panic::catch_unwind(|| cell.get_or_try_init(|| -> Result<_, ()> { panic!() })); - assert!(res.is_err()); - assert!(!cell.is_initialized()); - assert!(cell.get().is_none()); - - assert_eq!(cell.get_or_try_init(|| Err(())), Err(())); - - assert_eq!(cell.get_or_try_init(|| Ok::<_, ()>("hello".to_string())), Ok(&"hello".to_string())); - assert_eq!(cell.get(), Some(&"hello".to_string())); -} - -#[test] -fn from_impl() { - assert_eq!(OnceLock::from("value").get(), Some(&"value")); - assert_ne!(OnceLock::from("foo").get(), Some(&"bar")); -} - -#[test] -fn partialeq_impl() { - assert!(OnceLock::from("value") == OnceLock::from("value")); - assert!(OnceLock::from("foo") != OnceLock::from("bar")); - - assert!(OnceLock::::new() == OnceLock::new()); - assert!(OnceLock::::new() != OnceLock::from("value".to_owned())); -} - -#[test] -fn into_inner() { - let cell: OnceLock = OnceLock::new(); - assert_eq!(cell.into_inner(), None); - let cell = OnceLock::new(); - cell.set("hello".to_string()).unwrap(); - assert_eq!(cell.into_inner(), Some("hello".to_string())); -} - -#[test] -fn is_sync_send() { - fn assert_traits() {} - assert_traits::>(); -} - -#[test] -fn eval_once_macro() { - macro_rules! eval_once { - (|| -> $ty:ty { - $($body:tt)* - }) => {{ - static ONCE_CELL: OnceLock<$ty> = OnceLock::new(); - fn init() -> $ty { - $($body)* - } - ONCE_CELL.get_or_init(init) - }}; - } - - let fib: &'static Vec = eval_once! { - || -> Vec { - let mut res = vec![1, 1]; - for i in 0..10 { - let next = res[i] + res[i + 1]; - res.push(next); - } - res - } - }; - assert_eq!(fib[5], 8) -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_os = "wasi"), ignore)] // no threads -fn sync_once_cell_does_not_leak_partially_constructed_boxes() { - static ONCE_CELL: OnceLock = OnceLock::new(); - - let n_readers = 10; - let n_writers = 3; - const MSG: &str = "Hello, World"; - - let (tx, rx) = channel(); - - for _ in 0..n_readers { - let tx = tx.clone(); - thread::spawn(move || { - loop { - if let Some(msg) = ONCE_CELL.get() { - tx.send(msg).unwrap(); - break; - } - #[cfg(target_env = "sgx")] - crate::thread::yield_now(); - } - }); - } - for _ in 0..n_writers { - thread::spawn(move || { - let _ = ONCE_CELL.set(MSG.to_owned()); - }); - } - - for _ in 0..n_readers { - let msg = rx.recv().unwrap(); - assert_eq!(msg, MSG); - } -} - -#[test] -fn dropck() { - let cell = OnceLock::new(); - { - let s = String::new(); - cell.set(&s).unwrap(); - } -} diff --git a/library/std/src/sync/poison/condvar.rs b/library/std/src/sync/poison/condvar.rs index a6e2389c93b..7f0f3f652bc 100644 --- a/library/std/src/sync/poison/condvar.rs +++ b/library/std/src/sync/poison/condvar.rs @@ -1,6 +1,3 @@ -#[cfg(test)] -mod tests; - use crate::fmt; use crate::sync::poison::{self, LockResult, MutexGuard, PoisonError, mutex}; use crate::sys::sync as sys; diff --git a/library/std/src/sync/poison/condvar/tests.rs b/library/std/src/sync/poison/condvar/tests.rs deleted file mode 100644 index f9e9066bc92..00000000000 --- a/library/std/src/sync/poison/condvar/tests.rs +++ /dev/null @@ -1,190 +0,0 @@ -use crate::sync::atomic::{AtomicBool, Ordering}; -use crate::sync::mpsc::channel; -use crate::sync::{Arc, Condvar, Mutex}; -use crate::thread; -use crate::time::Duration; - -#[test] -fn smoke() { - let c = Condvar::new(); - c.notify_one(); - c.notify_all(); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_os = "wasi"), ignore)] // no threads -fn notify_one() { - let m = Arc::new(Mutex::new(())); - let m2 = m.clone(); - let c = Arc::new(Condvar::new()); - let c2 = c.clone(); - - let g = m.lock().unwrap(); - let _t = thread::spawn(move || { - let _g = m2.lock().unwrap(); - c2.notify_one(); - }); - let g = c.wait(g).unwrap(); - drop(g); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_os = "wasi"), ignore)] // no threads -fn notify_all() { - const N: usize = 10; - - let data = Arc::new((Mutex::new(0), Condvar::new())); - let (tx, rx) = channel(); - for _ in 0..N { - let data = data.clone(); - let tx = tx.clone(); - thread::spawn(move || { - let &(ref lock, ref cond) = &*data; - let mut cnt = lock.lock().unwrap(); - *cnt += 1; - if *cnt == N { - tx.send(()).unwrap(); - } - while *cnt != 0 { - cnt = cond.wait(cnt).unwrap(); - } - tx.send(()).unwrap(); - }); - } - drop(tx); - - let &(ref lock, ref cond) = &*data; - rx.recv().unwrap(); - let mut cnt = lock.lock().unwrap(); - *cnt = 0; - cond.notify_all(); - drop(cnt); - - for _ in 0..N { - rx.recv().unwrap(); - } -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_os = "wasi"), ignore)] // no threads -fn wait_while() { - let pair = Arc::new((Mutex::new(false), Condvar::new())); - let pair2 = pair.clone(); - - // Inside of our lock, spawn a new thread, and then wait for it to start. - thread::spawn(move || { - let &(ref lock, ref cvar) = &*pair2; - let mut started = lock.lock().unwrap(); - *started = true; - // We notify the condvar that the value has changed. - cvar.notify_one(); - }); - - // Wait for the thread to start up. - let &(ref lock, ref cvar) = &*pair; - let guard = cvar.wait_while(lock.lock().unwrap(), |started| !*started); - assert!(*guard.unwrap()); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_os = "wasi"), ignore)] // condvar wait not supported -fn wait_timeout_wait() { - let m = Arc::new(Mutex::new(())); - let c = Arc::new(Condvar::new()); - - loop { - let g = m.lock().unwrap(); - let (_g, no_timeout) = c.wait_timeout(g, Duration::from_millis(1)).unwrap(); - // spurious wakeups mean this isn't necessarily true - // so execute test again, if not timeout - if !no_timeout.timed_out() { - continue; - } - - break; - } -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_os = "wasi"), ignore)] // condvar wait not supported -fn wait_timeout_while_wait() { - let m = Arc::new(Mutex::new(())); - let c = Arc::new(Condvar::new()); - - let g = m.lock().unwrap(); - let (_g, wait) = c.wait_timeout_while(g, Duration::from_millis(1), |_| true).unwrap(); - // no spurious wakeups. ensure it timed-out - assert!(wait.timed_out()); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_os = "wasi"), ignore)] // condvar wait not supported -fn wait_timeout_while_instant_satisfy() { - let m = Arc::new(Mutex::new(())); - let c = Arc::new(Condvar::new()); - - let g = m.lock().unwrap(); - let (_g, wait) = c.wait_timeout_while(g, Duration::from_millis(0), |_| false).unwrap(); - // ensure it didn't time-out even if we were not given any time. - assert!(!wait.timed_out()); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_os = "wasi"), ignore)] // no threads -fn wait_timeout_while_wake() { - let pair = Arc::new((Mutex::new(false), Condvar::new())); - let pair_copy = pair.clone(); - - let &(ref m, ref c) = &*pair; - let g = m.lock().unwrap(); - let _t = thread::spawn(move || { - let &(ref lock, ref cvar) = &*pair_copy; - let mut started = lock.lock().unwrap(); - thread::sleep(Duration::from_millis(1)); - *started = true; - cvar.notify_one(); - }); - let (g2, wait) = c - .wait_timeout_while(g, Duration::from_millis(u64::MAX), |&mut notified| !notified) - .unwrap(); - // ensure it didn't time-out even if we were not given any time. - assert!(!wait.timed_out()); - assert!(*g2); -} - -#[test] -#[cfg_attr(any(target_os = "emscripten", target_os = "wasi"), ignore)] // no threads -fn wait_timeout_wake() { - let m = Arc::new(Mutex::new(())); - let c = Arc::new(Condvar::new()); - - loop { - let g = m.lock().unwrap(); - - let c2 = c.clone(); - let m2 = m.clone(); - - let notified = Arc::new(AtomicBool::new(false)); - let notified_copy = notified.clone(); - - let t = thread::spawn(move || { - let _g = m2.lock().unwrap(); - thread::sleep(Duration::from_millis(1)); - notified_copy.store(true, Ordering::Relaxed); - c2.notify_one(); - }); - let (g, timeout_res) = c.wait_timeout(g, Duration::from_millis(u64::MAX)).unwrap(); - assert!(!timeout_res.timed_out()); - // spurious wakeups mean this isn't necessarily true - // so execute test again, if not notified - if !notified.load(Ordering::Relaxed) { - t.join().unwrap(); - continue; - } - drop(g); - - t.join().unwrap(); - - break; - } -} diff --git a/library/std/src/sync/poison/mutex.rs b/library/std/src/sync/poison/mutex.rs index 01ef71a187f..0a5ad561926 100644 --- a/library/std/src/sync/poison/mutex.rs +++ b/library/std/src/sync/poison/mutex.rs @@ -1,6 +1,3 @@ -#[cfg(all(test, not(any(target_os = "emscripten", target_os = "wasi"))))] -mod tests; - use crate::cell::UnsafeCell; use crate::fmt; use crate::marker::PhantomData; diff --git a/library/std/src/sync/poison/mutex/tests.rs b/library/std/src/sync/poison/mutex/tests.rs deleted file mode 100644 index 395c8aada08..00000000000 --- a/library/std/src/sync/poison/mutex/tests.rs +++ /dev/null @@ -1,442 +0,0 @@ -use crate::fmt::Debug; -use crate::ops::FnMut; -use crate::panic::{self, AssertUnwindSafe}; -use crate::sync::atomic::{AtomicUsize, Ordering}; -use crate::sync::mpsc::channel; -use crate::sync::{Arc, Condvar, MappedMutexGuard, Mutex, MutexGuard, TryLockError}; -use crate::{hint, mem, thread}; - -struct Packet(Arc<(Mutex, Condvar)>); - -#[derive(Eq, PartialEq, Debug)] -struct NonCopy(i32); - -#[derive(Eq, PartialEq, Debug)] -struct NonCopyNeedsDrop(i32); - -impl Drop for NonCopyNeedsDrop { - fn drop(&mut self) { - hint::black_box(()); - } -} - -#[test] -fn test_needs_drop() { - assert!(!mem::needs_drop::()); - assert!(mem::needs_drop::()); -} - -#[derive(Clone, Eq, PartialEq, Debug)] -struct Cloneable(i32); - -#[test] -fn smoke() { - let m = Mutex::new(()); - drop(m.lock().unwrap()); - drop(m.lock().unwrap()); -} - -#[test] -fn lots_and_lots() { - const J: u32 = 1000; - const K: u32 = 3; - - let m = Arc::new(Mutex::new(0)); - - fn inc(m: &Mutex) { - for _ in 0..J { - *m.lock().unwrap() += 1; - } - } - - let (tx, rx) = channel(); - for _ in 0..K { - let tx2 = tx.clone(); - let m2 = m.clone(); - thread::spawn(move || { - inc(&m2); - tx2.send(()).unwrap(); - }); - let tx2 = tx.clone(); - let m2 = m.clone(); - thread::spawn(move || { - inc(&m2); - tx2.send(()).unwrap(); - }); - } - - drop(tx); - for _ in 0..2 * K { - rx.recv().unwrap(); - } - assert_eq!(*m.lock().unwrap(), J * K * 2); -} - -#[test] -fn try_lock() { - let m = Mutex::new(()); - *m.try_lock().unwrap() = (); -} - -fn new_poisoned_mutex(value: T) -> Mutex { - let mutex = Mutex::new(value); - - let catch_unwind_result = panic::catch_unwind(AssertUnwindSafe(|| { - let _guard = mutex.lock().unwrap(); - - panic!("test panic to poison mutex"); - })); - - assert!(catch_unwind_result.is_err()); - assert!(mutex.is_poisoned()); - - mutex -} - -#[test] -fn test_into_inner() { - let m = Mutex::new(NonCopy(10)); - assert_eq!(m.into_inner().unwrap(), NonCopy(10)); -} - -#[test] -fn test_into_inner_drop() { - struct Foo(Arc); - impl Drop for Foo { - fn drop(&mut self) { - self.0.fetch_add(1, Ordering::SeqCst); - } - } - let num_drops = Arc::new(AtomicUsize::new(0)); - let m = Mutex::new(Foo(num_drops.clone())); - assert_eq!(num_drops.load(Ordering::SeqCst), 0); - { - let _inner = m.into_inner().unwrap(); - assert_eq!(num_drops.load(Ordering::SeqCst), 0); - } - assert_eq!(num_drops.load(Ordering::SeqCst), 1); -} - -#[test] -fn test_into_inner_poison() { - let m = new_poisoned_mutex(NonCopy(10)); - - match m.into_inner() { - Err(e) => assert_eq!(e.into_inner(), NonCopy(10)), - Ok(x) => panic!("into_inner of poisoned Mutex is Ok: {x:?}"), - } -} - -#[test] -fn test_get_cloned() { - let m = Mutex::new(Cloneable(10)); - - assert_eq!(m.get_cloned().unwrap(), Cloneable(10)); -} - -#[test] -fn test_get_cloned_poison() { - let m = new_poisoned_mutex(Cloneable(10)); - - match m.get_cloned() { - Err(e) => assert_eq!(e.into_inner(), ()), - Ok(x) => panic!("get of poisoned Mutex is Ok: {x:?}"), - } -} - -#[test] -fn test_get_mut() { - let mut m = Mutex::new(NonCopy(10)); - *m.get_mut().unwrap() = NonCopy(20); - assert_eq!(m.into_inner().unwrap(), NonCopy(20)); -} - -#[test] -fn test_get_mut_poison() { - let mut m = new_poisoned_mutex(NonCopy(10)); - - match m.get_mut() { - Err(e) => assert_eq!(*e.into_inner(), NonCopy(10)), - Ok(x) => panic!("get_mut of poisoned Mutex is Ok: {x:?}"), - } -} - -#[test] -fn test_set() { - fn inner(mut init: impl FnMut() -> T, mut value: impl FnMut() -> T) - where - T: Debug + Eq, - { - let m = Mutex::new(init()); - - assert_eq!(*m.lock().unwrap(), init()); - m.set(value()).unwrap(); - assert_eq!(*m.lock().unwrap(), value()); - } - - inner(|| NonCopy(10), || NonCopy(20)); - inner(|| NonCopyNeedsDrop(10), || NonCopyNeedsDrop(20)); -} - -#[test] -fn test_set_poison() { - fn inner(mut init: impl FnMut() -> T, mut value: impl FnMut() -> T) - where - T: Debug + Eq, - { - let m = new_poisoned_mutex(init()); - - match m.set(value()) { - Err(e) => { - assert_eq!(e.into_inner(), value()); - assert_eq!(m.into_inner().unwrap_err().into_inner(), init()); - } - Ok(x) => panic!("set of poisoned Mutex is Ok: {x:?}"), - } - } - - inner(|| NonCopy(10), || NonCopy(20)); - inner(|| NonCopyNeedsDrop(10), || NonCopyNeedsDrop(20)); -} - -#[test] -fn test_replace() { - fn inner(mut init: impl FnMut() -> T, mut value: impl FnMut() -> T) - where - T: Debug + Eq, - { - let m = Mutex::new(init()); - - assert_eq!(*m.lock().unwrap(), init()); - assert_eq!(m.replace(value()).unwrap(), init()); - assert_eq!(*m.lock().unwrap(), value()); - } - - inner(|| NonCopy(10), || NonCopy(20)); - inner(|| NonCopyNeedsDrop(10), || NonCopyNeedsDrop(20)); -} - -#[test] -fn test_replace_poison() { - fn inner(mut init: impl FnMut() -> T, mut value: impl FnMut() -> T) - where - T: Debug + Eq, - { - let m = new_poisoned_mutex(init()); - - match m.replace(value()) { - Err(e) => { - assert_eq!(e.into_inner(), value()); - assert_eq!(m.into_inner().unwrap_err().into_inner(), init()); - } - Ok(x) => panic!("replace of poisoned Mutex is Ok: {x:?}"), - } - } - - inner(|| NonCopy(10), || NonCopy(20)); - inner(|| NonCopyNeedsDrop(10), || NonCopyNeedsDrop(20)); -} - -#[test] -fn test_mutex_arc_condvar() { - let packet = Packet(Arc::new((Mutex::new(false), Condvar::new()))); - let packet2 = Packet(packet.0.clone()); - let (tx, rx) = channel(); - let _t = thread::spawn(move || { - // wait until parent gets in - rx.recv().unwrap(); - let &(ref lock, ref cvar) = &*packet2.0; - let mut lock = lock.lock().unwrap(); - *lock = true; - cvar.notify_one(); - }); - - let &(ref lock, ref cvar) = &*packet.0; - let mut lock = lock.lock().unwrap(); - tx.send(()).unwrap(); - assert!(!*lock); - while !*lock { - lock = cvar.wait(lock).unwrap(); - } -} - -#[test] -fn test_arc_condvar_poison() { - let packet = Packet(Arc::new((Mutex::new(1), Condvar::new()))); - let packet2 = Packet(packet.0.clone()); - let (tx, rx) = channel(); - - let _t = thread::spawn(move || -> () { - rx.recv().unwrap(); - let &(ref lock, ref cvar) = &*packet2.0; - let _g = lock.lock().unwrap(); - cvar.notify_one(); - // Parent should fail when it wakes up. - panic!(); - }); - - let &(ref lock, ref cvar) = &*packet.0; - let mut lock = lock.lock().unwrap(); - tx.send(()).unwrap(); - while *lock == 1 { - match cvar.wait(lock) { - Ok(l) => { - lock = l; - assert_eq!(*lock, 1); - } - Err(..) => break, - } - } -} - -#[test] -fn test_mutex_arc_poison() { - let arc = Arc::new(Mutex::new(1)); - assert!(!arc.is_poisoned()); - let arc2 = arc.clone(); - let _ = thread::spawn(move || { - let lock = arc2.lock().unwrap(); - assert_eq!(*lock, 2); // deliberate assertion failure to poison the mutex - }) - .join(); - assert!(arc.lock().is_err()); - assert!(arc.is_poisoned()); -} - -#[test] -fn test_mutex_arc_poison_mapped() { - let arc = Arc::new(Mutex::new(1)); - assert!(!arc.is_poisoned()); - let arc2 = arc.clone(); - let _ = thread::spawn(move || { - let lock = arc2.lock().unwrap(); - let lock = MutexGuard::map(lock, |val| val); - assert_eq!(*lock, 2); // deliberate assertion failure to poison the mutex - }) - .join(); - assert!(arc.lock().is_err()); - assert!(arc.is_poisoned()); -} - -#[test] -fn test_mutex_arc_nested() { - // Tests nested mutexes and access - // to underlying data. - let arc = Arc::new(Mutex::new(1)); - let arc2 = Arc::new(Mutex::new(arc)); - let (tx, rx) = channel(); - let _t = thread::spawn(move || { - let lock = arc2.lock().unwrap(); - let lock2 = lock.lock().unwrap(); - assert_eq!(*lock2, 1); - tx.send(()).unwrap(); - }); - rx.recv().unwrap(); -} - -#[test] -fn test_mutex_arc_access_in_unwind() { - let arc = Arc::new(Mutex::new(1)); - let arc2 = arc.clone(); - let _ = thread::spawn(move || -> () { - struct Unwinder { - i: Arc>, - } - impl Drop for Unwinder { - fn drop(&mut self) { - *self.i.lock().unwrap() += 1; - } - } - let _u = Unwinder { i: arc2 }; - panic!(); - }) - .join(); - let lock = arc.lock().unwrap(); - assert_eq!(*lock, 2); -} - -#[test] -fn test_mutex_unsized() { - let mutex: &Mutex<[i32]> = &Mutex::new([1, 2, 3]); - { - let b = &mut *mutex.lock().unwrap(); - b[0] = 4; - b[2] = 5; - } - let comp: &[i32] = &[4, 2, 5]; - assert_eq!(&*mutex.lock().unwrap(), comp); -} - -#[test] -fn test_mapping_mapped_guard() { - let arr = [0; 4]; - let mut lock = Mutex::new(arr); - let guard = lock.lock().unwrap(); - let guard = MutexGuard::map(guard, |arr| &mut arr[..2]); - let mut guard = MappedMutexGuard::map(guard, |slice| &mut slice[1..]); - assert_eq!(guard.len(), 1); - guard[0] = 42; - drop(guard); - assert_eq!(*lock.get_mut().unwrap(), [0, 42, 0, 0]); -} - -#[test] -fn panic_while_mapping_unlocked_poison() { - let lock = Mutex::new(()); - - let _ = panic::catch_unwind(|| { - let guard = lock.lock().unwrap(); - let _guard = MutexGuard::map::<(), _>(guard, |_| panic!()); - }); - - match lock.try_lock() { - Ok(_) => panic!("panicking in a MutexGuard::map closure should poison the Mutex"), - Err(TryLockError::WouldBlock) => { - panic!("panicking in a MutexGuard::map closure should unlock the mutex") - } - Err(TryLockError::Poisoned(_)) => {} - } - - let _ = panic::catch_unwind(|| { - let guard = lock.lock().unwrap(); - let _guard = MutexGuard::try_map::<(), _>(guard, |_| panic!()); - }); - - match lock.try_lock() { - Ok(_) => panic!("panicking in a MutexGuard::try_map closure should poison the Mutex"), - Err(TryLockError::WouldBlock) => { - panic!("panicking in a MutexGuard::try_map closure should unlock the mutex") - } - Err(TryLockError::Poisoned(_)) => {} - } - - let _ = panic::catch_unwind(|| { - let guard = lock.lock().unwrap(); - let guard = MutexGuard::map::<(), _>(guard, |val| val); - let _guard = MappedMutexGuard::map::<(), _>(guard, |_| panic!()); - }); - - match lock.try_lock() { - Ok(_) => panic!("panicking in a MappedMutexGuard::map closure should poison the Mutex"), - Err(TryLockError::WouldBlock) => { - panic!("panicking in a MappedMutexGuard::map closure should unlock the mutex") - } - Err(TryLockError::Poisoned(_)) => {} - } - - let _ = panic::catch_unwind(|| { - let guard = lock.lock().unwrap(); - let guard = MutexGuard::map::<(), _>(guard, |val| val); - let _guard = MappedMutexGuard::try_map::<(), _>(guard, |_| panic!()); - }); - - match lock.try_lock() { - Ok(_) => panic!("panicking in a MappedMutexGuard::try_map closure should poison the Mutex"), - Err(TryLockError::WouldBlock) => { - panic!("panicking in a MappedMutexGuard::try_map closure should unlock the mutex") - } - Err(TryLockError::Poisoned(_)) => {} - } - - drop(lock); -} diff --git a/library/std/src/sync/poison/once.rs b/library/std/src/sync/poison/once.rs index 27db4b634fb..f78d88ac6d8 100644 --- a/library/std/src/sync/poison/once.rs +++ b/library/std/src/sync/poison/once.rs @@ -3,9 +3,6 @@ //! This primitive is meant to be used to run one-time initialization. An //! example use case would be for initializing an FFI library. -#[cfg(all(test, not(any(target_os = "emscripten", target_os = "wasi"))))] -mod tests; - use crate::fmt; use crate::panic::{RefUnwindSafe, UnwindSafe}; use crate::sys::sync as sys; diff --git a/library/std/src/sync/poison/once/tests.rs b/library/std/src/sync/poison/once/tests.rs deleted file mode 100644 index ce96468aeb6..00000000000 --- a/library/std/src/sync/poison/once/tests.rs +++ /dev/null @@ -1,162 +0,0 @@ -use super::Once; -use crate::sync::atomic::AtomicBool; -use crate::sync::atomic::Ordering::Relaxed; -use crate::sync::mpsc::channel; -use crate::time::Duration; -use crate::{panic, thread}; - -#[test] -fn smoke_once() { - static O: Once = Once::new(); - let mut a = 0; - O.call_once(|| a += 1); - assert_eq!(a, 1); - O.call_once(|| a += 1); - assert_eq!(a, 1); -} - -#[test] -fn stampede_once() { - static O: Once = Once::new(); - static mut RUN: bool = false; - - let (tx, rx) = channel(); - for _ in 0..10 { - let tx = tx.clone(); - thread::spawn(move || { - for _ in 0..4 { - thread::yield_now() - } - unsafe { - O.call_once(|| { - assert!(!RUN); - RUN = true; - }); - assert!(RUN); - } - tx.send(()).unwrap(); - }); - } - - unsafe { - O.call_once(|| { - assert!(!RUN); - RUN = true; - }); - assert!(RUN); - } - - for _ in 0..10 { - rx.recv().unwrap(); - } -} - -#[test] -fn poison_bad() { - static O: Once = Once::new(); - - // poison the once - let t = panic::catch_unwind(|| { - O.call_once(|| panic!()); - }); - assert!(t.is_err()); - - // poisoning propagates - let t = panic::catch_unwind(|| { - O.call_once(|| {}); - }); - assert!(t.is_err()); - - // we can subvert poisoning, however - let mut called = false; - O.call_once_force(|p| { - called = true; - assert!(p.is_poisoned()) - }); - assert!(called); - - // once any success happens, we stop propagating the poison - O.call_once(|| {}); -} - -#[test] -fn wait_for_force_to_finish() { - static O: Once = Once::new(); - - // poison the once - let t = panic::catch_unwind(|| { - O.call_once(|| panic!()); - }); - assert!(t.is_err()); - - // make sure someone's waiting inside the once via a force - let (tx1, rx1) = channel(); - let (tx2, rx2) = channel(); - let t1 = thread::spawn(move || { - O.call_once_force(|p| { - assert!(p.is_poisoned()); - tx1.send(()).unwrap(); - rx2.recv().unwrap(); - }); - }); - - rx1.recv().unwrap(); - - // put another waiter on the once - let t2 = thread::spawn(|| { - let mut called = false; - O.call_once(|| { - called = true; - }); - assert!(!called); - }); - - tx2.send(()).unwrap(); - - assert!(t1.join().is_ok()); - assert!(t2.join().is_ok()); -} - -#[test] -fn wait() { - for _ in 0..50 { - let val = AtomicBool::new(false); - let once = Once::new(); - - thread::scope(|s| { - for _ in 0..4 { - s.spawn(|| { - once.wait(); - assert!(val.load(Relaxed)); - }); - } - - once.call_once(|| val.store(true, Relaxed)); - }); - } -} - -#[test] -fn wait_on_poisoned() { - let once = Once::new(); - - panic::catch_unwind(|| once.call_once(|| panic!())).unwrap_err(); - panic::catch_unwind(|| once.wait()).unwrap_err(); -} - -#[test] -fn wait_force_on_poisoned() { - let once = Once::new(); - - thread::scope(|s| { - panic::catch_unwind(|| once.call_once(|| panic!())).unwrap_err(); - - s.spawn(|| { - thread::sleep(Duration::from_millis(100)); - - once.call_once_force(|_| {}); - }); - - once.wait_force(); - }) -} diff --git a/library/std/src/sync/poison/rwlock.rs b/library/std/src/sync/poison/rwlock.rs index 1519baf99a8..f9d9321f5f2 100644 --- a/library/std/src/sync/poison/rwlock.rs +++ b/library/std/src/sync/poison/rwlock.rs @@ -1,6 +1,3 @@ -#[cfg(all(test, not(any(target_os = "emscripten", target_os = "wasi"))))] -mod tests; - use crate::cell::UnsafeCell; use crate::fmt; use crate::marker::PhantomData; diff --git a/library/std/src/sync/poison/rwlock/tests.rs b/library/std/src/sync/poison/rwlock/tests.rs deleted file mode 100644 index 057c2f1a5d7..00000000000 --- a/library/std/src/sync/poison/rwlock/tests.rs +++ /dev/null @@ -1,729 +0,0 @@ -use rand::Rng; - -use crate::fmt::Debug; -use crate::ops::FnMut; -use crate::panic::{self, AssertUnwindSafe}; -use crate::sync::atomic::{AtomicUsize, Ordering}; -use crate::sync::mpsc::channel; -use crate::sync::{ - Arc, MappedRwLockReadGuard, MappedRwLockWriteGuard, RwLock, RwLockReadGuard, RwLockWriteGuard, - TryLockError, -}; -use crate::{hint, mem, thread}; - -#[derive(Eq, PartialEq, Debug)] -struct NonCopy(i32); - -#[derive(Eq, PartialEq, Debug)] -struct NonCopyNeedsDrop(i32); - -impl Drop for NonCopyNeedsDrop { - fn drop(&mut self) { - hint::black_box(()); - } -} - -#[test] -fn test_needs_drop() { - assert!(!mem::needs_drop::()); - assert!(mem::needs_drop::()); -} - -#[derive(Clone, Eq, PartialEq, Debug)] -struct Cloneable(i32); - -#[test] -fn smoke() { - let l = RwLock::new(()); - drop(l.read().unwrap()); - drop(l.write().unwrap()); - drop((l.read().unwrap(), l.read().unwrap())); - drop(l.write().unwrap()); -} - -#[test] -// FIXME: On macOS we use a provenance-incorrect implementation and Miri -// catches that issue with a chance of around 1/1000. -// See for details. -#[cfg_attr(all(miri, target_os = "macos"), ignore)] -fn frob() { - const N: u32 = 10; - const M: usize = if cfg!(miri) { 100 } else { 1000 }; - - let r = Arc::new(RwLock::new(())); - - let (tx, rx) = channel::<()>(); - for _ in 0..N { - let tx = tx.clone(); - let r = r.clone(); - thread::spawn(move || { - let mut rng = crate::test_helpers::test_rng(); - for _ in 0..M { - if rng.gen_bool(1.0 / (N as f64)) { - drop(r.write().unwrap()); - } else { - drop(r.read().unwrap()); - } - } - drop(tx); - }); - } - drop(tx); - let _ = rx.recv(); -} - -#[test] -fn test_rw_arc_poison_wr() { - let arc = Arc::new(RwLock::new(1)); - let arc2 = arc.clone(); - let _: Result<(), _> = thread::spawn(move || { - let _lock = arc2.write().unwrap(); - panic!(); - }) - .join(); - assert!(arc.read().is_err()); -} - -#[test] -fn test_rw_arc_poison_mapped_w_r() { - let arc = Arc::new(RwLock::new(1)); - let arc2 = arc.clone(); - let _: Result<(), _> = thread::spawn(move || { - let lock = arc2.write().unwrap(); - let _lock = RwLockWriteGuard::map(lock, |val| val); - panic!(); - }) - .join(); - assert!(arc.read().is_err()); -} - -#[test] -fn test_rw_arc_poison_ww() { - let arc = Arc::new(RwLock::new(1)); - assert!(!arc.is_poisoned()); - let arc2 = arc.clone(); - let _: Result<(), _> = thread::spawn(move || { - let _lock = arc2.write().unwrap(); - panic!(); - }) - .join(); - assert!(arc.write().is_err()); - assert!(arc.is_poisoned()); -} - -#[test] -fn test_rw_arc_poison_mapped_w_w() { - let arc = Arc::new(RwLock::new(1)); - let arc2 = arc.clone(); - let _: Result<(), _> = thread::spawn(move || { - let lock = arc2.write().unwrap(); - let _lock = RwLockWriteGuard::map(lock, |val| val); - panic!(); - }) - .join(); - assert!(arc.write().is_err()); - assert!(arc.is_poisoned()); -} - -#[test] -fn test_rw_arc_no_poison_rr() { - let arc = Arc::new(RwLock::new(1)); - let arc2 = arc.clone(); - let _: Result<(), _> = thread::spawn(move || { - let _lock = arc2.read().unwrap(); - panic!(); - }) - .join(); - let lock = arc.read().unwrap(); - assert_eq!(*lock, 1); -} - -#[test] -fn test_rw_arc_no_poison_mapped_r_r() { - let arc = Arc::new(RwLock::new(1)); - let arc2 = arc.clone(); - let _: Result<(), _> = thread::spawn(move || { - let lock = arc2.read().unwrap(); - let _lock = RwLockReadGuard::map(lock, |val| val); - panic!(); - }) - .join(); - let lock = arc.read().unwrap(); - assert_eq!(*lock, 1); -} - -#[test] -fn test_rw_arc_no_poison_rw() { - let arc = Arc::new(RwLock::new(1)); - let arc2 = arc.clone(); - let _: Result<(), _> = thread::spawn(move || { - let _lock = arc2.read().unwrap(); - panic!() - }) - .join(); - let lock = arc.write().unwrap(); - assert_eq!(*lock, 1); -} - -#[test] -fn test_rw_arc_no_poison_mapped_r_w() { - let arc = Arc::new(RwLock::new(1)); - let arc2 = arc.clone(); - let _: Result<(), _> = thread::spawn(move || { - let lock = arc2.read().unwrap(); - let _lock = RwLockReadGuard::map(lock, |val| val); - panic!(); - }) - .join(); - let lock = arc.write().unwrap(); - assert_eq!(*lock, 1); -} - -#[test] -fn test_rw_arc() { - let arc = Arc::new(RwLock::new(0)); - let arc2 = arc.clone(); - let (tx, rx) = channel(); - - thread::spawn(move || { - let mut lock = arc2.write().unwrap(); - for _ in 0..10 { - let tmp = *lock; - *lock = -1; - thread::yield_now(); - *lock = tmp + 1; - } - tx.send(()).unwrap(); - }); - - // Readers try to catch the writer in the act - let mut children = Vec::new(); - for _ in 0..5 { - let arc3 = arc.clone(); - children.push(thread::spawn(move || { - let lock = arc3.read().unwrap(); - assert!(*lock >= 0); - })); - } - - // Wait for children to pass their asserts - for r in children { - assert!(r.join().is_ok()); - } - - // Wait for writer to finish - rx.recv().unwrap(); - let lock = arc.read().unwrap(); - assert_eq!(*lock, 10); -} - -#[test] -fn test_rw_arc_access_in_unwind() { - let arc = Arc::new(RwLock::new(1)); - let arc2 = arc.clone(); - let _ = thread::spawn(move || -> () { - struct Unwinder { - i: Arc>, - } - impl Drop for Unwinder { - fn drop(&mut self) { - let mut lock = self.i.write().unwrap(); - *lock += 1; - } - } - let _u = Unwinder { i: arc2 }; - panic!(); - }) - .join(); - let lock = arc.read().unwrap(); - assert_eq!(*lock, 2); -} - -#[test] -fn test_rwlock_unsized() { - let rw: &RwLock<[i32]> = &RwLock::new([1, 2, 3]); - { - let b = &mut *rw.write().unwrap(); - b[0] = 4; - b[2] = 5; - } - let comp: &[i32] = &[4, 2, 5]; - assert_eq!(&*rw.read().unwrap(), comp); -} - -#[test] -fn test_rwlock_try_write() { - let lock = RwLock::new(0isize); - let read_guard = lock.read().unwrap(); - - let write_result = lock.try_write(); - match write_result { - Err(TryLockError::WouldBlock) => (), - Ok(_) => assert!(false, "try_write should not succeed while read_guard is in scope"), - Err(_) => assert!(false, "unexpected error"), - } - - drop(read_guard); - let mapped_read_guard = RwLockReadGuard::map(lock.read().unwrap(), |_| &()); - - let write_result = lock.try_write(); - match write_result { - Err(TryLockError::WouldBlock) => (), - Ok(_) => assert!(false, "try_write should not succeed while mapped_read_guard is in scope"), - Err(_) => assert!(false, "unexpected error"), - } - - drop(mapped_read_guard); -} - -fn new_poisoned_rwlock(value: T) -> RwLock { - let lock = RwLock::new(value); - - let catch_unwind_result = panic::catch_unwind(AssertUnwindSafe(|| { - let _guard = lock.write().unwrap(); - - panic!("test panic to poison RwLock"); - })); - - assert!(catch_unwind_result.is_err()); - assert!(lock.is_poisoned()); - - lock -} - -#[test] -fn test_into_inner() { - let m = RwLock::new(NonCopy(10)); - assert_eq!(m.into_inner().unwrap(), NonCopy(10)); -} - -#[test] -fn test_into_inner_drop() { - struct Foo(Arc); - impl Drop for Foo { - fn drop(&mut self) { - self.0.fetch_add(1, Ordering::SeqCst); - } - } - let num_drops = Arc::new(AtomicUsize::new(0)); - let m = RwLock::new(Foo(num_drops.clone())); - assert_eq!(num_drops.load(Ordering::SeqCst), 0); - { - let _inner = m.into_inner().unwrap(); - assert_eq!(num_drops.load(Ordering::SeqCst), 0); - } - assert_eq!(num_drops.load(Ordering::SeqCst), 1); -} - -#[test] -fn test_into_inner_poison() { - let m = new_poisoned_rwlock(NonCopy(10)); - - match m.into_inner() { - Err(e) => assert_eq!(e.into_inner(), NonCopy(10)), - Ok(x) => panic!("into_inner of poisoned RwLock is Ok: {x:?}"), - } -} - -#[test] -fn test_get_cloned() { - let m = RwLock::new(Cloneable(10)); - - assert_eq!(m.get_cloned().unwrap(), Cloneable(10)); -} - -#[test] -fn test_get_cloned_poison() { - let m = new_poisoned_rwlock(Cloneable(10)); - - match m.get_cloned() { - Err(e) => assert_eq!(e.into_inner(), ()), - Ok(x) => panic!("get of poisoned RwLock is Ok: {x:?}"), - } -} - -#[test] -fn test_get_mut() { - let mut m = RwLock::new(NonCopy(10)); - *m.get_mut().unwrap() = NonCopy(20); - assert_eq!(m.into_inner().unwrap(), NonCopy(20)); -} - -#[test] -fn test_get_mut_poison() { - let mut m = new_poisoned_rwlock(NonCopy(10)); - - match m.get_mut() { - Err(e) => assert_eq!(*e.into_inner(), NonCopy(10)), - Ok(x) => panic!("get_mut of poisoned RwLock is Ok: {x:?}"), - } -} - -#[test] -fn test_set() { - fn inner(mut init: impl FnMut() -> T, mut value: impl FnMut() -> T) - where - T: Debug + Eq, - { - let m = RwLock::new(init()); - - assert_eq!(*m.read().unwrap(), init()); - m.set(value()).unwrap(); - assert_eq!(*m.read().unwrap(), value()); - } - - inner(|| NonCopy(10), || NonCopy(20)); - inner(|| NonCopyNeedsDrop(10), || NonCopyNeedsDrop(20)); -} - -#[test] -fn test_set_poison() { - fn inner(mut init: impl FnMut() -> T, mut value: impl FnMut() -> T) - where - T: Debug + Eq, - { - let m = new_poisoned_rwlock(init()); - - match m.set(value()) { - Err(e) => { - assert_eq!(e.into_inner(), value()); - assert_eq!(m.into_inner().unwrap_err().into_inner(), init()); - } - Ok(x) => panic!("set of poisoned RwLock is Ok: {x:?}"), - } - } - - inner(|| NonCopy(10), || NonCopy(20)); - inner(|| NonCopyNeedsDrop(10), || NonCopyNeedsDrop(20)); -} - -#[test] -fn test_replace() { - fn inner(mut init: impl FnMut() -> T, mut value: impl FnMut() -> T) - where - T: Debug + Eq, - { - let m = RwLock::new(init()); - - assert_eq!(*m.read().unwrap(), init()); - assert_eq!(m.replace(value()).unwrap(), init()); - assert_eq!(*m.read().unwrap(), value()); - } - - inner(|| NonCopy(10), || NonCopy(20)); - inner(|| NonCopyNeedsDrop(10), || NonCopyNeedsDrop(20)); -} - -#[test] -fn test_replace_poison() { - fn inner(mut init: impl FnMut() -> T, mut value: impl FnMut() -> T) - where - T: Debug + Eq, - { - let m = new_poisoned_rwlock(init()); - - match m.replace(value()) { - Err(e) => { - assert_eq!(e.into_inner(), value()); - assert_eq!(m.into_inner().unwrap_err().into_inner(), init()); - } - Ok(x) => panic!("replace of poisoned RwLock is Ok: {x:?}"), - } - } - - inner(|| NonCopy(10), || NonCopy(20)); - inner(|| NonCopyNeedsDrop(10), || NonCopyNeedsDrop(20)); -} - -#[test] -fn test_read_guard_covariance() { - fn do_stuff<'a>(_: RwLockReadGuard<'_, &'a i32>, _: &'a i32) {} - let j: i32 = 5; - let lock = RwLock::new(&j); - { - let i = 6; - do_stuff(lock.read().unwrap(), &i); - } - drop(lock); -} - -#[test] -fn test_mapped_read_guard_covariance() { - fn do_stuff<'a>(_: MappedRwLockReadGuard<'_, &'a i32>, _: &'a i32) {} - let j: i32 = 5; - let lock = RwLock::new((&j, &j)); - { - let i = 6; - let guard = lock.read().unwrap(); - let guard = RwLockReadGuard::map(guard, |(val, _val)| val); - do_stuff(guard, &i); - } - drop(lock); -} - -#[test] -fn test_mapping_mapped_guard() { - let arr = [0; 4]; - let mut lock = RwLock::new(arr); - let guard = lock.write().unwrap(); - let guard = RwLockWriteGuard::map(guard, |arr| &mut arr[..2]); - let mut guard = MappedRwLockWriteGuard::map(guard, |slice| &mut slice[1..]); - assert_eq!(guard.len(), 1); - guard[0] = 42; - drop(guard); - assert_eq!(*lock.get_mut().unwrap(), [0, 42, 0, 0]); - - let guard = lock.read().unwrap(); - let guard = RwLockReadGuard::map(guard, |arr| &arr[..2]); - let guard = MappedRwLockReadGuard::map(guard, |slice| &slice[1..]); - assert_eq!(*guard, [42]); - drop(guard); - assert_eq!(*lock.get_mut().unwrap(), [0, 42, 0, 0]); -} - -#[test] -fn panic_while_mapping_read_unlocked_no_poison() { - let lock = RwLock::new(()); - - let _ = panic::catch_unwind(|| { - let guard = lock.read().unwrap(); - let _guard = RwLockReadGuard::map::<(), _>(guard, |_| panic!()); - }); - - match lock.try_write() { - Ok(_) => {} - Err(TryLockError::WouldBlock) => { - panic!("panicking in a RwLockReadGuard::map closure should release the read lock") - } - Err(TryLockError::Poisoned(_)) => { - panic!("panicking in a RwLockReadGuard::map closure should not poison the RwLock") - } - } - - let _ = panic::catch_unwind(|| { - let guard = lock.read().unwrap(); - let _guard = RwLockReadGuard::try_map::<(), _>(guard, |_| panic!()); - }); - - match lock.try_write() { - Ok(_) => {} - Err(TryLockError::WouldBlock) => { - panic!("panicking in a RwLockReadGuard::try_map closure should release the read lock") - } - Err(TryLockError::Poisoned(_)) => { - panic!("panicking in a RwLockReadGuard::try_map closure should not poison the RwLock") - } - } - - let _ = panic::catch_unwind(|| { - let guard = lock.read().unwrap(); - let guard = RwLockReadGuard::map::<(), _>(guard, |val| val); - let _guard = MappedRwLockReadGuard::map::<(), _>(guard, |_| panic!()); - }); - - match lock.try_write() { - Ok(_) => {} - Err(TryLockError::WouldBlock) => { - panic!("panicking in a MappedRwLockReadGuard::map closure should release the read lock") - } - Err(TryLockError::Poisoned(_)) => { - panic!("panicking in a MappedRwLockReadGuard::map closure should not poison the RwLock") - } - } - - let _ = panic::catch_unwind(|| { - let guard = lock.read().unwrap(); - let guard = RwLockReadGuard::map::<(), _>(guard, |val| val); - let _guard = MappedRwLockReadGuard::try_map::<(), _>(guard, |_| panic!()); - }); - - match lock.try_write() { - Ok(_) => {} - Err(TryLockError::WouldBlock) => panic!( - "panicking in a MappedRwLockReadGuard::try_map closure should release the read lock" - ), - Err(TryLockError::Poisoned(_)) => panic!( - "panicking in a MappedRwLockReadGuard::try_map closure should not poison the RwLock" - ), - } - - drop(lock); -} - -#[test] -fn panic_while_mapping_write_unlocked_poison() { - let lock = RwLock::new(()); - - let _ = panic::catch_unwind(|| { - let guard = lock.write().unwrap(); - let _guard = RwLockWriteGuard::map::<(), _>(guard, |_| panic!()); - }); - - match lock.try_write() { - Ok(_) => panic!("panicking in a RwLockWriteGuard::map closure should poison the RwLock"), - Err(TryLockError::WouldBlock) => { - panic!("panicking in a RwLockWriteGuard::map closure should release the write lock") - } - Err(TryLockError::Poisoned(_)) => {} - } - - let _ = panic::catch_unwind(|| { - let guard = lock.write().unwrap(); - let _guard = RwLockWriteGuard::try_map::<(), _>(guard, |_| panic!()); - }); - - match lock.try_write() { - Ok(_) => { - panic!("panicking in a RwLockWriteGuard::try_map closure should poison the RwLock") - } - Err(TryLockError::WouldBlock) => { - panic!("panicking in a RwLockWriteGuard::try_map closure should release the write lock") - } - Err(TryLockError::Poisoned(_)) => {} - } - - let _ = panic::catch_unwind(|| { - let guard = lock.write().unwrap(); - let guard = RwLockWriteGuard::map::<(), _>(guard, |val| val); - let _guard = MappedRwLockWriteGuard::map::<(), _>(guard, |_| panic!()); - }); - - match lock.try_write() { - Ok(_) => { - panic!("panicking in a MappedRwLockWriteGuard::map closure should poison the RwLock") - } - Err(TryLockError::WouldBlock) => panic!( - "panicking in a MappedRwLockWriteGuard::map closure should release the write lock" - ), - Err(TryLockError::Poisoned(_)) => {} - } - - let _ = panic::catch_unwind(|| { - let guard = lock.write().unwrap(); - let guard = RwLockWriteGuard::map::<(), _>(guard, |val| val); - let _guard = MappedRwLockWriteGuard::try_map::<(), _>(guard, |_| panic!()); - }); - - match lock.try_write() { - Ok(_) => panic!( - "panicking in a MappedRwLockWriteGuard::try_map closure should poison the RwLock" - ), - Err(TryLockError::WouldBlock) => panic!( - "panicking in a MappedRwLockWriteGuard::try_map closure should release the write lock" - ), - Err(TryLockError::Poisoned(_)) => {} - } - - drop(lock); -} - -#[test] -fn test_downgrade_basic() { - let r = RwLock::new(()); - - let write_guard = r.write().unwrap(); - let _read_guard = RwLockWriteGuard::downgrade(write_guard); -} - -#[test] -// FIXME: On macOS we use a provenance-incorrect implementation and Miri catches that issue. -// See for details. -#[cfg_attr(all(miri, target_os = "macos"), ignore)] -fn test_downgrade_observe() { - // Taken from the test `test_rwlock_downgrade` from: - // https://github.com/Amanieu/parking_lot/blob/master/src/rwlock.rs - - const W: usize = 20; - const N: usize = if cfg!(miri) { 40 } else { 100 }; - - // This test spawns `W` writer threads, where each will increment a counter `N` times, ensuring - // that the value they wrote has not changed after downgrading. - - let rw = Arc::new(RwLock::new(0)); - - // Spawn the writers that will do `W * N` operations and checks. - let handles: Vec<_> = (0..W) - .map(|_| { - let rw = rw.clone(); - thread::spawn(move || { - for _ in 0..N { - // Increment the counter. - let mut write_guard = rw.write().unwrap(); - *write_guard += 1; - let cur_val = *write_guard; - - // Downgrade the lock to read mode, where the value protected cannot be modified. - let read_guard = RwLockWriteGuard::downgrade(write_guard); - assert_eq!(cur_val, *read_guard); - } - }) - }) - .collect(); - - for handle in handles { - handle.join().unwrap(); - } - - assert_eq!(*rw.read().unwrap(), W * N); -} - -#[test] -// FIXME: On macOS we use a provenance-incorrect implementation and Miri catches that issue. -// See for details. -#[cfg_attr(all(miri, target_os = "macos"), ignore)] -fn test_downgrade_atomic() { - const NEW_VALUE: i32 = -1; - - // This test checks that `downgrade` is atomic, meaning as soon as a write lock has been - // downgraded, the lock must be in read mode and no other threads can take the write lock to - // modify the protected value. - - // `W` is the number of evil writer threads. - const W: usize = 20; - let rwlock = Arc::new(RwLock::new(0)); - - // Spawns many evil writer threads that will try and write to the locked value before the - // initial writer (who has the exclusive lock) can read after it downgrades. - // If the `RwLock` behaves correctly, then the initial writer should read the value it wrote - // itself as no other thread should be able to mutate the protected value. - - // Put the lock in write mode, causing all future threads trying to access this go to sleep. - let mut main_write_guard = rwlock.write().unwrap(); - - // Spawn all of the evil writer threads. They will each increment the protected value by 1. - let handles: Vec<_> = (0..W) - .map(|_| { - let rwlock = rwlock.clone(); - thread::spawn(move || { - // Will go to sleep since the main thread initially has the write lock. - let mut evil_guard = rwlock.write().unwrap(); - *evil_guard += 1; - }) - }) - .collect(); - - // Wait for a good amount of time so that evil threads go to sleep. - // Note: this is not strictly necessary... - let eternity = crate::time::Duration::from_millis(42); - thread::sleep(eternity); - - // Once everyone is asleep, set the value to `NEW_VALUE`. - *main_write_guard = NEW_VALUE; - - // Atomically downgrade the write guard into a read guard. - let main_read_guard = RwLockWriteGuard::downgrade(main_write_guard); - - // If the above is not atomic, then it would be possible for an evil thread to get in front of - // this read and change the value to be non-negative. - assert_eq!(*main_read_guard, NEW_VALUE, "`downgrade` was not atomic"); - - // Drop the main read guard and allow the evil writer threads to start incrementing. - drop(main_read_guard); - - for handle in handles { - handle.join().unwrap(); - } - - let final_check = rwlock.read().unwrap(); - assert_eq!(*final_check, W as i32 + NEW_VALUE); -} diff --git a/library/std/src/sync/reentrant_lock.rs b/library/std/src/sync/reentrant_lock.rs index 0140e0d2129..e009eb410ef 100644 --- a/library/std/src/sync/reentrant_lock.rs +++ b/library/std/src/sync/reentrant_lock.rs @@ -1,6 +1,3 @@ -#[cfg(all(test, not(any(target_os = "emscripten", target_os = "wasi"))))] -mod tests; - use cfg_if::cfg_if; use crate::cell::UnsafeCell; @@ -324,7 +321,10 @@ impl ReentrantLock { /// Otherwise, an RAII guard is returned. /// /// This function does not block. - pub(crate) fn try_lock(&self) -> Option> { + // FIXME maybe make it a public part of the API? + #[unstable(issue = "none", feature = "std_internals")] + #[doc(hidden)] + pub fn try_lock(&self) -> Option> { let this_thread = current_id(); // Safety: We only touch lock_count when we own the inner mutex. // Additionally, we only call `self.owner.set()` while holding diff --git a/library/std/src/sync/reentrant_lock/tests.rs b/library/std/src/sync/reentrant_lock/tests.rs deleted file mode 100644 index aeef0289d28..00000000000 --- a/library/std/src/sync/reentrant_lock/tests.rs +++ /dev/null @@ -1,53 +0,0 @@ -use super::ReentrantLock; -use crate::cell::RefCell; -use crate::sync::Arc; -use crate::thread; - -#[test] -fn smoke() { - let l = ReentrantLock::new(()); - { - let a = l.lock(); - { - let b = l.lock(); - { - let c = l.lock(); - assert_eq!(*c, ()); - } - assert_eq!(*b, ()); - } - assert_eq!(*a, ()); - } -} - -#[test] -fn is_mutex() { - let l = Arc::new(ReentrantLock::new(RefCell::new(0))); - let l2 = l.clone(); - let lock = l.lock(); - let child = thread::spawn(move || { - let lock = l2.lock(); - assert_eq!(*lock.borrow(), 4950); - }); - for i in 0..100 { - let lock = l.lock(); - *lock.borrow_mut() += i; - } - drop(lock); - child.join().unwrap(); -} - -#[test] -fn trylock_works() { - let l = Arc::new(ReentrantLock::new(())); - let l2 = l.clone(); - let _lock = l.try_lock(); - let _lock2 = l.try_lock(); - thread::spawn(move || { - let lock = l2.try_lock(); - assert!(lock.is_none()); - }) - .join() - .unwrap(); - let _lock3 = l.try_lock(); -} -- cgit 1.4.1-3-g733a5