diff options
Diffstat (limited to 'library/std/src/sync')
| -rw-r--r-- | library/std/src/sync/barrier.rs | 42 | ||||
| -rw-r--r-- | library/std/src/sync/barrier/tests.rs | 35 | ||||
| -rw-r--r-- | library/std/src/sync/condvar.rs | 218 | ||||
| -rw-r--r-- | library/std/src/sync/condvar/tests.rs | 211 | ||||
| -rw-r--r-- | library/std/src/sync/mpsc/mod.rs | 1367 | ||||
| -rw-r--r-- | library/std/src/sync/mpsc/mpsc_queue.rs | 54 | ||||
| -rw-r--r-- | library/std/src/sync/mpsc/mpsc_queue/tests.rs | 47 | ||||
| -rw-r--r-- | library/std/src/sync/mpsc/spsc_queue.rs | 108 | ||||
| -rw-r--r-- | library/std/src/sync/mpsc/spsc_queue/tests.rs | 101 | ||||
| -rw-r--r-- | library/std/src/sync/mpsc/sync_tests.rs | 647 | ||||
| -rw-r--r-- | library/std/src/sync/mpsc/tests.rs | 706 | ||||
| -rw-r--r-- | library/std/src/sync/mutex.rs | 245 | ||||
| -rw-r--r-- | library/std/src/sync/mutex/tests.rs | 238 | ||||
| -rw-r--r-- | library/std/src/sync/once.rs | 123 | ||||
| -rw-r--r-- | library/std/src/sync/once/tests.rs | 116 | ||||
| -rw-r--r-- | library/std/src/sync/rwlock.rs | 254 | ||||
| -rw-r--r-- | library/std/src/sync/rwlock/tests.rs | 247 |
17 files changed, 2375 insertions, 2384 deletions
diff --git a/library/std/src/sync/barrier.rs b/library/std/src/sync/barrier.rs index 23c989fd2fd..5d434791e9a 100644 --- a/library/std/src/sync/barrier.rs +++ b/library/std/src/sync/barrier.rs @@ -1,3 +1,6 @@ +#[cfg(test)] +mod tests; + use crate::fmt; use crate::sync::{Condvar, Mutex}; @@ -174,42 +177,3 @@ impl BarrierWaitResult { self.0 } } - -#[cfg(test)] -mod tests { - use crate::sync::mpsc::{channel, TryRecvError}; - use crate::sync::{Arc, Barrier}; - use crate::thread; - - #[test] - #[cfg_attr(target_os = "emscripten", ignore)] - fn test_barrier() { - const N: usize = 10; - - let barrier = Arc::new(Barrier::new(N)); - let (tx, rx) = channel(); - - for _ in 0..N - 1 { - let c = barrier.clone(); - let tx = tx.clone(); - thread::spawn(move || { - tx.send(c.wait().is_leader()).unwrap(); - }); - } - - // At this point, all spawned threads should be blocked, - // so we shouldn't get anything from the port - assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty))); - - let mut leader_found = barrier.wait().is_leader(); - - // Now, the barrier is cleared and we should get data. - for _ in 0..N - 1 { - if rx.recv().unwrap() { - assert!(!leader_found); - leader_found = true; - } - } - assert!(leader_found); - } -} diff --git a/library/std/src/sync/barrier/tests.rs b/library/std/src/sync/barrier/tests.rs new file mode 100644 index 00000000000..834a3e75158 --- /dev/null +++ b/library/std/src/sync/barrier/tests.rs @@ -0,0 +1,35 @@ +use crate::sync::mpsc::{channel, TryRecvError}; +use crate::sync::{Arc, Barrier}; +use crate::thread; + +#[test] +#[cfg_attr(target_os = "emscripten", ignore)] +fn test_barrier() { + const N: usize = 10; + + let barrier = Arc::new(Barrier::new(N)); + let (tx, rx) = channel(); + + for _ in 0..N - 1 { + let c = barrier.clone(); + let tx = tx.clone(); + thread::spawn(move || { + tx.send(c.wait().is_leader()).unwrap(); + }); + } + + // At this point, all spawned threads should be blocked, + // so we shouldn't get anything from the port + assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty))); + + let mut leader_found = barrier.wait().is_leader(); + + // Now, the barrier is cleared and we should get data. + for _ in 0..N - 1 { + if rx.recv().unwrap() { + assert!(!leader_found); + leader_found = true; + } + } + assert!(leader_found); +} diff --git a/library/std/src/sync/condvar.rs b/library/std/src/sync/condvar.rs index 4efd86aa3ed..651f813b3e2 100644 --- a/library/std/src/sync/condvar.rs +++ b/library/std/src/sync/condvar.rs @@ -1,3 +1,6 @@ +#[cfg(test)] +mod tests; + use crate::fmt; use crate::sync::atomic::{AtomicUsize, Ordering}; use crate::sync::{mutex, MutexGuard, PoisonError}; @@ -598,218 +601,3 @@ impl Drop for Condvar { unsafe { self.inner.destroy() } } } - -#[cfg(test)] -mod tests { - use crate::sync::atomic::{AtomicBool, Ordering}; - use crate::sync::mpsc::channel; - use crate::sync::{Arc, Condvar, Mutex}; - use crate::thread; - use crate::time::Duration; - - #[test] - fn smoke() { - let c = Condvar::new(); - c.notify_one(); - c.notify_all(); - } - - #[test] - #[cfg_attr(target_os = "emscripten", ignore)] - fn notify_one() { - let m = Arc::new(Mutex::new(())); - let m2 = m.clone(); - let c = Arc::new(Condvar::new()); - let c2 = c.clone(); - - let g = m.lock().unwrap(); - let _t = thread::spawn(move || { - let _g = m2.lock().unwrap(); - c2.notify_one(); - }); - let g = c.wait(g).unwrap(); - drop(g); - } - - #[test] - #[cfg_attr(target_os = "emscripten", ignore)] - fn notify_all() { - const N: usize = 10; - - let data = Arc::new((Mutex::new(0), Condvar::new())); - let (tx, rx) = channel(); - for _ in 0..N { - let data = data.clone(); - let tx = tx.clone(); - thread::spawn(move || { - let &(ref lock, ref cond) = &*data; - let mut cnt = lock.lock().unwrap(); - *cnt += 1; - if *cnt == N { - tx.send(()).unwrap(); - } - while *cnt != 0 { - cnt = cond.wait(cnt).unwrap(); - } - tx.send(()).unwrap(); - }); - } - drop(tx); - - let &(ref lock, ref cond) = &*data; - rx.recv().unwrap(); - let mut cnt = lock.lock().unwrap(); - *cnt = 0; - cond.notify_all(); - drop(cnt); - - for _ in 0..N { - rx.recv().unwrap(); - } - } - - #[test] - #[cfg_attr(target_os = "emscripten", ignore)] - fn wait_while() { - let pair = Arc::new((Mutex::new(false), Condvar::new())); - let pair2 = pair.clone(); - - // Inside of our lock, spawn a new thread, and then wait for it to start. - thread::spawn(move || { - let &(ref lock, ref cvar) = &*pair2; - let mut started = lock.lock().unwrap(); - *started = true; - // We notify the condvar that the value has changed. - cvar.notify_one(); - }); - - // Wait for the thread to start up. - let &(ref lock, ref cvar) = &*pair; - let guard = cvar.wait_while(lock.lock().unwrap(), |started| !*started); - assert!(*guard.unwrap()); - } - - #[test] - #[cfg_attr(target_os = "emscripten", ignore)] - fn wait_timeout_wait() { - let m = Arc::new(Mutex::new(())); - let c = Arc::new(Condvar::new()); - - loop { - let g = m.lock().unwrap(); - let (_g, no_timeout) = c.wait_timeout(g, Duration::from_millis(1)).unwrap(); - // spurious wakeups mean this isn't necessarily true - // so execute test again, if not timeout - if !no_timeout.timed_out() { - continue; - } - - break; - } - } - - #[test] - #[cfg_attr(target_os = "emscripten", ignore)] - fn wait_timeout_while_wait() { - let m = Arc::new(Mutex::new(())); - let c = Arc::new(Condvar::new()); - - let g = m.lock().unwrap(); - let (_g, wait) = c.wait_timeout_while(g, Duration::from_millis(1), |_| true).unwrap(); - // no spurious wakeups. ensure it timed-out - assert!(wait.timed_out()); - } - - #[test] - #[cfg_attr(target_os = "emscripten", ignore)] - fn wait_timeout_while_instant_satisfy() { - let m = Arc::new(Mutex::new(())); - let c = Arc::new(Condvar::new()); - - let g = m.lock().unwrap(); - let (_g, wait) = c.wait_timeout_while(g, Duration::from_millis(0), |_| false).unwrap(); - // ensure it didn't time-out even if we were not given any time. - assert!(!wait.timed_out()); - } - - #[test] - #[cfg_attr(target_os = "emscripten", ignore)] - fn wait_timeout_while_wake() { - let pair = Arc::new((Mutex::new(false), Condvar::new())); - let pair_copy = pair.clone(); - - let &(ref m, ref c) = &*pair; - let g = m.lock().unwrap(); - let _t = thread::spawn(move || { - let &(ref lock, ref cvar) = &*pair_copy; - let mut started = lock.lock().unwrap(); - thread::sleep(Duration::from_millis(1)); - *started = true; - cvar.notify_one(); - }); - let (g2, wait) = c - .wait_timeout_while(g, Duration::from_millis(u64::MAX), |&mut notified| !notified) - .unwrap(); - // ensure it didn't time-out even if we were not given any time. - assert!(!wait.timed_out()); - assert!(*g2); - } - - #[test] - #[cfg_attr(target_os = "emscripten", ignore)] - fn wait_timeout_wake() { - let m = Arc::new(Mutex::new(())); - let c = Arc::new(Condvar::new()); - - loop { - let g = m.lock().unwrap(); - - let c2 = c.clone(); - let m2 = m.clone(); - - let notified = Arc::new(AtomicBool::new(false)); - let notified_copy = notified.clone(); - - let t = thread::spawn(move || { - let _g = m2.lock().unwrap(); - thread::sleep(Duration::from_millis(1)); - notified_copy.store(true, Ordering::SeqCst); - c2.notify_one(); - }); - let (g, timeout_res) = c.wait_timeout(g, Duration::from_millis(u64::MAX)).unwrap(); - assert!(!timeout_res.timed_out()); - // spurious wakeups mean this isn't necessarily true - // so execute test again, if not notified - if !notified.load(Ordering::SeqCst) { - t.join().unwrap(); - continue; - } - drop(g); - - t.join().unwrap(); - - break; - } - } - - #[test] - #[should_panic] - #[cfg_attr(target_os = "emscripten", ignore)] - fn two_mutexes() { - let m = Arc::new(Mutex::new(())); - let m2 = m.clone(); - let c = Arc::new(Condvar::new()); - let c2 = c.clone(); - - let mut g = m.lock().unwrap(); - let _t = thread::spawn(move || { - let _g = m2.lock().unwrap(); - c2.notify_one(); - }); - g = c.wait(g).unwrap(); - drop(g); - - let m = Mutex::new(()); - let _ = c.wait(m.lock().unwrap()).unwrap(); - } -} diff --git a/library/std/src/sync/condvar/tests.rs b/library/std/src/sync/condvar/tests.rs new file mode 100644 index 00000000000..86d099ee3a1 --- /dev/null +++ b/library/std/src/sync/condvar/tests.rs @@ -0,0 +1,211 @@ +use crate::sync::atomic::{AtomicBool, Ordering}; +use crate::sync::mpsc::channel; +use crate::sync::{Arc, Condvar, Mutex}; +use crate::thread; +use crate::time::Duration; + +#[test] +fn smoke() { + let c = Condvar::new(); + c.notify_one(); + c.notify_all(); +} + +#[test] +#[cfg_attr(target_os = "emscripten", ignore)] +fn notify_one() { + let m = Arc::new(Mutex::new(())); + let m2 = m.clone(); + let c = Arc::new(Condvar::new()); + let c2 = c.clone(); + + let g = m.lock().unwrap(); + let _t = thread::spawn(move || { + let _g = m2.lock().unwrap(); + c2.notify_one(); + }); + let g = c.wait(g).unwrap(); + drop(g); +} + +#[test] +#[cfg_attr(target_os = "emscripten", ignore)] +fn notify_all() { + const N: usize = 10; + + let data = Arc::new((Mutex::new(0), Condvar::new())); + let (tx, rx) = channel(); + for _ in 0..N { + let data = data.clone(); + let tx = tx.clone(); + thread::spawn(move || { + let &(ref lock, ref cond) = &*data; + let mut cnt = lock.lock().unwrap(); + *cnt += 1; + if *cnt == N { + tx.send(()).unwrap(); + } + while *cnt != 0 { + cnt = cond.wait(cnt).unwrap(); + } + tx.send(()).unwrap(); + }); + } + drop(tx); + + let &(ref lock, ref cond) = &*data; + rx.recv().unwrap(); + let mut cnt = lock.lock().unwrap(); + *cnt = 0; + cond.notify_all(); + drop(cnt); + + for _ in 0..N { + rx.recv().unwrap(); + } +} + +#[test] +#[cfg_attr(target_os = "emscripten", ignore)] +fn wait_while() { + let pair = Arc::new((Mutex::new(false), Condvar::new())); + let pair2 = pair.clone(); + + // Inside of our lock, spawn a new thread, and then wait for it to start. + thread::spawn(move || { + let &(ref lock, ref cvar) = &*pair2; + let mut started = lock.lock().unwrap(); + *started = true; + // We notify the condvar that the value has changed. + cvar.notify_one(); + }); + + // Wait for the thread to start up. + let &(ref lock, ref cvar) = &*pair; + let guard = cvar.wait_while(lock.lock().unwrap(), |started| !*started); + assert!(*guard.unwrap()); +} + +#[test] +#[cfg_attr(target_os = "emscripten", ignore)] +fn wait_timeout_wait() { + let m = Arc::new(Mutex::new(())); + let c = Arc::new(Condvar::new()); + + loop { + let g = m.lock().unwrap(); + let (_g, no_timeout) = c.wait_timeout(g, Duration::from_millis(1)).unwrap(); + // spurious wakeups mean this isn't necessarily true + // so execute test again, if not timeout + if !no_timeout.timed_out() { + continue; + } + + break; + } +} + +#[test] +#[cfg_attr(target_os = "emscripten", ignore)] +fn wait_timeout_while_wait() { + let m = Arc::new(Mutex::new(())); + let c = Arc::new(Condvar::new()); + + let g = m.lock().unwrap(); + let (_g, wait) = c.wait_timeout_while(g, Duration::from_millis(1), |_| true).unwrap(); + // no spurious wakeups. ensure it timed-out + assert!(wait.timed_out()); +} + +#[test] +#[cfg_attr(target_os = "emscripten", ignore)] +fn wait_timeout_while_instant_satisfy() { + let m = Arc::new(Mutex::new(())); + let c = Arc::new(Condvar::new()); + + let g = m.lock().unwrap(); + let (_g, wait) = c.wait_timeout_while(g, Duration::from_millis(0), |_| false).unwrap(); + // ensure it didn't time-out even if we were not given any time. + assert!(!wait.timed_out()); +} + +#[test] +#[cfg_attr(target_os = "emscripten", ignore)] +fn wait_timeout_while_wake() { + let pair = Arc::new((Mutex::new(false), Condvar::new())); + let pair_copy = pair.clone(); + + let &(ref m, ref c) = &*pair; + let g = m.lock().unwrap(); + let _t = thread::spawn(move || { + let &(ref lock, ref cvar) = &*pair_copy; + let mut started = lock.lock().unwrap(); + thread::sleep(Duration::from_millis(1)); + *started = true; + cvar.notify_one(); + }); + let (g2, wait) = c + .wait_timeout_while(g, Duration::from_millis(u64::MAX), |&mut notified| !notified) + .unwrap(); + // ensure it didn't time-out even if we were not given any time. + assert!(!wait.timed_out()); + assert!(*g2); +} + +#[test] +#[cfg_attr(target_os = "emscripten", ignore)] +fn wait_timeout_wake() { + let m = Arc::new(Mutex::new(())); + let c = Arc::new(Condvar::new()); + + loop { + let g = m.lock().unwrap(); + + let c2 = c.clone(); + let m2 = m.clone(); + + let notified = Arc::new(AtomicBool::new(false)); + let notified_copy = notified.clone(); + + let t = thread::spawn(move || { + let _g = m2.lock().unwrap(); + thread::sleep(Duration::from_millis(1)); + notified_copy.store(true, Ordering::SeqCst); + c2.notify_one(); + }); + let (g, timeout_res) = c.wait_timeout(g, Duration::from_millis(u64::MAX)).unwrap(); + assert!(!timeout_res.timed_out()); + // spurious wakeups mean this isn't necessarily true + // so execute test again, if not notified + if !notified.load(Ordering::SeqCst) { + t.join().unwrap(); + continue; + } + drop(g); + + t.join().unwrap(); + + break; + } +} + +#[test] +#[should_panic] +#[cfg_attr(target_os = "emscripten", ignore)] +fn two_mutexes() { + let m = Arc::new(Mutex::new(())); + let m2 = m.clone(); + let c = Arc::new(Condvar::new()); + let c2 = c.clone(); + + let mut g = m.lock().unwrap(); + let _t = thread::spawn(move || { + let _g = m2.lock().unwrap(); + c2.notify_one(); + }); + g = c.wait(g).unwrap(); + drop(g); + + let m = Mutex::new(()); + let _ = c.wait(m.lock().unwrap()).unwrap(); +} diff --git a/library/std/src/sync/mpsc/mod.rs b/library/std/src/sync/mpsc/mod.rs index ac83017d9e1..073f969bbe2 100644 --- a/library/std/src/sync/mpsc/mod.rs +++ b/library/std/src/sync/mpsc/mod.rs @@ -108,6 +108,12 @@ #![stable(feature = "rust1", since = "1.0.0")] +#[cfg(all(test, not(target_os = "emscripten")))] +mod tests; + +#[cfg(all(test, not(target_os = "emscripten")))] +mod sync_tests; + // A description of how Rust's channel implementation works // // Channels are supposed to be the basic building block for all other @@ -1606,1364 +1612,3 @@ impl From<RecvError> for RecvTimeoutError { } } } - -#[cfg(all(test, not(target_os = "emscripten")))] -mod tests { - use super::*; - use crate::env; - use crate::thread; - use crate::time::{Duration, Instant}; - - pub fn stress_factor() -> usize { - match env::var("RUST_TEST_STRESS") { - Ok(val) => val.parse().unwrap(), - Err(..) => 1, - } - } - - #[test] - fn smoke() { - let (tx, rx) = channel::<i32>(); - tx.send(1).unwrap(); - assert_eq!(rx.recv().unwrap(), 1); - } - - #[test] - fn drop_full() { - let (tx, _rx) = channel::<Box<isize>>(); - tx.send(box 1).unwrap(); - } - - #[test] - fn drop_full_shared() { - let (tx, _rx) = channel::<Box<isize>>(); - drop(tx.clone()); - drop(tx.clone()); - tx.send(box 1).unwrap(); - } - - #[test] - fn smoke_shared() { - let (tx, rx) = channel::<i32>(); - tx.send(1).unwrap(); - assert_eq!(rx.recv().unwrap(), 1); - let tx = tx.clone(); - tx.send(1).unwrap(); - assert_eq!(rx.recv().unwrap(), 1); - } - - #[test] - fn smoke_threads() { - let (tx, rx) = channel::<i32>(); - let _t = thread::spawn(move || { - tx.send(1).unwrap(); - }); - assert_eq!(rx.recv().unwrap(), 1); - } - - #[test] - fn smoke_port_gone() { - let (tx, rx) = channel::<i32>(); - drop(rx); - assert!(tx.send(1).is_err()); - } - - #[test] - fn smoke_shared_port_gone() { - let (tx, rx) = channel::<i32>(); - drop(rx); - assert!(tx.send(1).is_err()) - } - - #[test] - fn smoke_shared_port_gone2() { - let (tx, rx) = channel::<i32>(); - drop(rx); - let tx2 = tx.clone(); - drop(tx); - assert!(tx2.send(1).is_err()); - } - - #[test] - fn port_gone_concurrent() { - let (tx, rx) = channel::<i32>(); - let _t = thread::spawn(move || { - rx.recv().unwrap(); - }); - while tx.send(1).is_ok() {} - } - - #[test] - fn port_gone_concurrent_shared() { - let (tx, rx) = channel::<i32>(); - let tx2 = tx.clone(); - let _t = thread::spawn(move || { - rx.recv().unwrap(); - }); - while tx.send(1).is_ok() && tx2.send(1).is_ok() {} - } - - #[test] - fn smoke_chan_gone() { - let (tx, rx) = channel::<i32>(); - drop(tx); - assert!(rx.recv().is_err()); - } - - #[test] - fn smoke_chan_gone_shared() { - let (tx, rx) = channel::<()>(); - let tx2 = tx.clone(); - drop(tx); - drop(tx2); - assert!(rx.recv().is_err()); - } - - #[test] - fn chan_gone_concurrent() { - let (tx, rx) = channel::<i32>(); - let _t = thread::spawn(move || { - tx.send(1).unwrap(); - tx.send(1).unwrap(); - }); - while rx.recv().is_ok() {} - } - - #[test] - fn stress() { - let (tx, rx) = channel::<i32>(); - let t = thread::spawn(move || { - for _ in 0..10000 { - tx.send(1).unwrap(); - } - }); - for _ in 0..10000 { - assert_eq!(rx.recv().unwrap(), 1); - } - t.join().ok().expect("thread panicked"); - } - - #[test] - fn stress_shared() { - const AMT: u32 = 10000; - const NTHREADS: u32 = 8; - let (tx, rx) = channel::<i32>(); - - let t = thread::spawn(move || { - for _ in 0..AMT * NTHREADS { - assert_eq!(rx.recv().unwrap(), 1); - } - match rx.try_recv() { - Ok(..) => panic!(), - _ => {} - } - }); - - for _ in 0..NTHREADS { - let tx = tx.clone(); - thread::spawn(move || { - for _ in 0..AMT { - tx.send(1).unwrap(); - } - }); - } - drop(tx); - t.join().ok().expect("thread panicked"); - } - - #[test] - fn send_from_outside_runtime() { - let (tx1, rx1) = channel::<()>(); - let (tx2, rx2) = channel::<i32>(); - let t1 = thread::spawn(move || { - tx1.send(()).unwrap(); - for _ in 0..40 { - assert_eq!(rx2.recv().unwrap(), 1); - } - }); - rx1.recv().unwrap(); - let t2 = thread::spawn(move || { - for _ in 0..40 { - tx2.send(1).unwrap(); - } - }); - t1.join().ok().expect("thread panicked"); - t2.join().ok().expect("thread panicked"); - } - - #[test] - fn recv_from_outside_runtime() { - let (tx, rx) = channel::<i32>(); - let t = thread::spawn(move || { - for _ in 0..40 { - assert_eq!(rx.recv().unwrap(), 1); - } - }); - for _ in 0..40 { - tx.send(1).unwrap(); - } - t.join().ok().expect("thread panicked"); - } - - #[test] - fn no_runtime() { - let (tx1, rx1) = channel::<i32>(); - let (tx2, rx2) = channel::<i32>(); - let t1 = thread::spawn(move || { - assert_eq!(rx1.recv().unwrap(), 1); - tx2.send(2).unwrap(); - }); - let t2 = thread::spawn(move || { - tx1.send(1).unwrap(); - assert_eq!(rx2.recv().unwrap(), 2); - }); - t1.join().ok().expect("thread panicked"); - t2.join().ok().expect("thread panicked"); - } - - #[test] - fn oneshot_single_thread_close_port_first() { - // Simple test of closing without sending - let (_tx, rx) = channel::<i32>(); - drop(rx); - } - - #[test] - fn oneshot_single_thread_close_chan_first() { - // Simple test of closing without sending - let (tx, _rx) = channel::<i32>(); - drop(tx); - } - - #[test] - fn oneshot_single_thread_send_port_close() { - // Testing that the sender cleans up the payload if receiver is closed - let (tx, rx) = channel::<Box<i32>>(); - drop(rx); - assert!(tx.send(box 0).is_err()); - } - - #[test] - fn oneshot_single_thread_recv_chan_close() { - // Receiving on a closed chan will panic - let res = thread::spawn(move || { - let (tx, rx) = channel::<i32>(); - drop(tx); - rx.recv().unwrap(); - }) - .join(); - // What is our res? - assert!(res.is_err()); - } - - #[test] - fn oneshot_single_thread_send_then_recv() { - let (tx, rx) = channel::<Box<i32>>(); - tx.send(box 10).unwrap(); - assert!(*rx.recv().unwrap() == 10); - } - - #[test] - fn oneshot_single_thread_try_send_open() { - let (tx, rx) = channel::<i32>(); - assert!(tx.send(10).is_ok()); - assert!(rx.recv().unwrap() == 10); - } - - #[test] - fn oneshot_single_thread_try_send_closed() { - let (tx, rx) = channel::<i32>(); - drop(rx); - assert!(tx.send(10).is_err()); - } - - #[test] - fn oneshot_single_thread_try_recv_open() { - let (tx, rx) = channel::<i32>(); - tx.send(10).unwrap(); - assert!(rx.recv() == Ok(10)); - } - - #[test] - fn oneshot_single_thread_try_recv_closed() { - let (tx, rx) = channel::<i32>(); - drop(tx); - assert!(rx.recv().is_err()); - } - - #[test] - fn oneshot_single_thread_peek_data() { - let (tx, rx) = channel::<i32>(); - assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); - tx.send(10).unwrap(); - assert_eq!(rx.try_recv(), Ok(10)); - } - - #[test] - fn oneshot_single_thread_peek_close() { - let (tx, rx) = channel::<i32>(); - drop(tx); - assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); - assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); - } - - #[test] - fn oneshot_single_thread_peek_open() { - let (_tx, rx) = channel::<i32>(); - assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); - } - - #[test] - fn oneshot_multi_task_recv_then_send() { - let (tx, rx) = channel::<Box<i32>>(); - let _t = thread::spawn(move || { - assert!(*rx.recv().unwrap() == 10); - }); - - tx.send(box 10).unwrap(); - } - - #[test] - fn oneshot_multi_task_recv_then_close() { - let (tx, rx) = channel::<Box<i32>>(); - let _t = thread::spawn(move || { - drop(tx); - }); - let res = thread::spawn(move || { - assert!(*rx.recv().unwrap() == 10); - }) - .join(); - assert!(res.is_err()); - } - - #[test] - fn oneshot_multi_thread_close_stress() { - for _ in 0..stress_factor() { - let (tx, rx) = channel::<i32>(); - let _t = thread::spawn(move || { - drop(rx); - }); - drop(tx); - } - } - - #[test] - fn oneshot_multi_thread_send_close_stress() { - for _ in 0..stress_factor() { - let (tx, rx) = channel::<i32>(); - let _t = thread::spawn(move || { - drop(rx); - }); - let _ = thread::spawn(move || { - tx.send(1).unwrap(); - }) - .join(); - } - } - - #[test] - fn oneshot_multi_thread_recv_close_stress() { - for _ in 0..stress_factor() { - let (tx, rx) = channel::<i32>(); - thread::spawn(move || { - let res = thread::spawn(move || { - rx.recv().unwrap(); - }) - .join(); - assert!(res.is_err()); - }); - let _t = thread::spawn(move || { - thread::spawn(move || { - drop(tx); - }); - }); - } - } - - #[test] - fn oneshot_multi_thread_send_recv_stress() { - for _ in 0..stress_factor() { - let (tx, rx) = channel::<Box<isize>>(); - let _t = thread::spawn(move || { - tx.send(box 10).unwrap(); - }); - assert!(*rx.recv().unwrap() == 10); - } - } - - #[test] - fn stream_send_recv_stress() { - for _ in 0..stress_factor() { - let (tx, rx) = channel(); - - send(tx, 0); - recv(rx, 0); - - fn send(tx: Sender<Box<i32>>, i: i32) { - if i == 10 { - return; - } - - thread::spawn(move || { - tx.send(box i).unwrap(); - send(tx, i + 1); - }); - } - - fn recv(rx: Receiver<Box<i32>>, i: i32) { - if i == 10 { - return; - } - - thread::spawn(move || { - assert!(*rx.recv().unwrap() == i); - recv(rx, i + 1); - }); - } - } - } - - #[test] - fn oneshot_single_thread_recv_timeout() { - let (tx, rx) = channel(); - tx.send(()).unwrap(); - assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(())); - assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout)); - tx.send(()).unwrap(); - assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(())); - } - - #[test] - fn stress_recv_timeout_two_threads() { - let (tx, rx) = channel(); - let stress = stress_factor() + 100; - let timeout = Duration::from_millis(100); - - thread::spawn(move || { - for i in 0..stress { - if i % 2 == 0 { - thread::sleep(timeout * 2); - } - tx.send(1usize).unwrap(); - } - }); - - let mut recv_count = 0; - loop { - match rx.recv_timeout(timeout) { - Ok(n) => { - assert_eq!(n, 1usize); - recv_count += 1; - } - Err(RecvTimeoutError::Timeout) => continue, - Err(RecvTimeoutError::Disconnected) => break, - } - } - - assert_eq!(recv_count, stress); - } - - #[test] - fn recv_timeout_upgrade() { - let (tx, rx) = channel::<()>(); - let timeout = Duration::from_millis(1); - let _tx_clone = tx.clone(); - - let start = Instant::now(); - assert_eq!(rx.recv_timeout(timeout), Err(RecvTimeoutError::Timeout)); - assert!(Instant::now() >= start + timeout); - } - - #[test] - fn stress_recv_timeout_shared() { - let (tx, rx) = channel(); - let stress = stress_factor() + 100; - - for i in 0..stress { - let tx = tx.clone(); - thread::spawn(move || { - thread::sleep(Duration::from_millis(i as u64 * 10)); - tx.send(1usize).unwrap(); - }); - } - - drop(tx); - - let mut recv_count = 0; - loop { - match rx.recv_timeout(Duration::from_millis(10)) { - Ok(n) => { - assert_eq!(n, 1usize); - recv_count += 1; - } - Err(RecvTimeoutError::Timeout) => continue, - Err(RecvTimeoutError::Disconnected) => break, - } - } - - assert_eq!(recv_count, stress); - } - - #[test] - fn very_long_recv_timeout_wont_panic() { - let (tx, rx) = channel::<()>(); - let join_handle = thread::spawn(move || rx.recv_timeout(Duration::from_secs(u64::MAX))); - thread::sleep(Duration::from_secs(1)); - assert!(tx.send(()).is_ok()); - assert_eq!(join_handle.join().unwrap(), Ok(())); - } - - #[test] - fn recv_a_lot() { - // Regression test that we don't run out of stack in scheduler context - let (tx, rx) = channel(); - for _ in 0..10000 { - tx.send(()).unwrap(); - } - for _ in 0..10000 { - rx.recv().unwrap(); - } - } - - #[test] - fn shared_recv_timeout() { - let (tx, rx) = channel(); - let total = 5; - for _ in 0..total { - let tx = tx.clone(); - thread::spawn(move || { - tx.send(()).unwrap(); - }); - } - - for _ in 0..total { - rx.recv().unwrap(); - } - - assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout)); - tx.send(()).unwrap(); - assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(())); - } - - #[test] - fn shared_chan_stress() { - let (tx, rx) = channel(); - let total = stress_factor() + 100; - for _ in 0..total { - let tx = tx.clone(); - thread::spawn(move || { - tx.send(()).unwrap(); - }); - } - - for _ in 0..total { - rx.recv().unwrap(); - } - } - - #[test] - fn test_nested_recv_iter() { - let (tx, rx) = channel::<i32>(); - let (total_tx, total_rx) = channel::<i32>(); - - let _t = thread::spawn(move || { - let mut acc = 0; - for x in rx.iter() { - acc += x; - } - total_tx.send(acc).unwrap(); - }); - - tx.send(3).unwrap(); - tx.send(1).unwrap(); - tx.send(2).unwrap(); - drop(tx); - assert_eq!(total_rx.recv().unwrap(), 6); - } - - #[test] - fn test_recv_iter_break() { - let (tx, rx) = channel::<i32>(); - let (count_tx, count_rx) = channel(); - - let _t = thread::spawn(move || { - let mut count = 0; - for x in rx.iter() { - if count >= 3 { - break; - } else { - count += x; - } - } - count_tx.send(count).unwrap(); - }); - - tx.send(2).unwrap(); - tx.send(2).unwrap(); - tx.send(2).unwrap(); - let _ = tx.send(2); - drop(tx); - assert_eq!(count_rx.recv().unwrap(), 4); - } - - #[test] - fn test_recv_try_iter() { - let (request_tx, request_rx) = channel(); - let (response_tx, response_rx) = channel(); - - // Request `x`s until we have `6`. - let t = thread::spawn(move || { - let mut count = 0; - loop { - for x in response_rx.try_iter() { - count += x; - if count == 6 { - return count; - } - } - request_tx.send(()).unwrap(); - } - }); - - for _ in request_rx.iter() { - if response_tx.send(2).is_err() { - break; - } - } - - assert_eq!(t.join().unwrap(), 6); - } - - #[test] - fn test_recv_into_iter_owned() { - let mut iter = { - let (tx, rx) = channel::<i32>(); - tx.send(1).unwrap(); - tx.send(2).unwrap(); - - rx.into_iter() - }; - assert_eq!(iter.next().unwrap(), 1); - assert_eq!(iter.next().unwrap(), 2); - assert_eq!(iter.next().is_none(), true); - } - - #[test] - fn test_recv_into_iter_borrowed() { - let (tx, rx) = channel::<i32>(); - tx.send(1).unwrap(); - tx.send(2).unwrap(); - drop(tx); - let mut iter = (&rx).into_iter(); - assert_eq!(iter.next().unwrap(), 1); - assert_eq!(iter.next().unwrap(), 2); - assert_eq!(iter.next().is_none(), true); - } - - #[test] - fn try_recv_states() { - let (tx1, rx1) = channel::<i32>(); - let (tx2, rx2) = channel::<()>(); - let (tx3, rx3) = channel::<()>(); - let _t = thread::spawn(move || { - rx2.recv().unwrap(); - tx1.send(1).unwrap(); - tx3.send(()).unwrap(); - rx2.recv().unwrap(); - drop(tx1); - tx3.send(()).unwrap(); - }); - - assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty)); - tx2.send(()).unwrap(); - rx3.recv().unwrap(); - assert_eq!(rx1.try_recv(), Ok(1)); - assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty)); - tx2.send(()).unwrap(); - rx3.recv().unwrap(); - assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected)); - } - - // This bug used to end up in a livelock inside of the Receiver destructor - // because the internal state of the Shared packet was corrupted - #[test] - fn destroy_upgraded_shared_port_when_sender_still_active() { - let (tx, rx) = channel(); - let (tx2, rx2) = channel(); - let _t = thread::spawn(move || { - rx.recv().unwrap(); // wait on a oneshot - drop(rx); // destroy a shared - tx2.send(()).unwrap(); - }); - // make sure the other thread has gone to sleep - for _ in 0..5000 { - thread::yield_now(); - } - - // upgrade to a shared chan and send a message - let t = tx.clone(); - drop(tx); - t.send(()).unwrap(); - - // wait for the child thread to exit before we exit - rx2.recv().unwrap(); - } - - #[test] - fn issue_32114() { - let (tx, _) = channel(); - let _ = tx.send(123); - assert_eq!(tx.send(123), Err(SendError(123))); - } -} - -#[cfg(all(test, not(target_os = "emscripten")))] -mod sync_tests { - use super::*; - use crate::env; - use crate::thread; - use crate::time::Duration; - - pub fn stress_factor() -> usize { - match env::var("RUST_TEST_STRESS") { - Ok(val) => val.parse().unwrap(), - Err(..) => 1, - } - } - - #[test] - fn smoke() { - let (tx, rx) = sync_channel::<i32>(1); - tx.send(1).unwrap(); - assert_eq!(rx.recv().unwrap(), 1); - } - - #[test] - fn drop_full() { - let (tx, _rx) = sync_channel::<Box<isize>>(1); - tx.send(box 1).unwrap(); - } - - #[test] - fn smoke_shared() { - let (tx, rx) = sync_channel::<i32>(1); - tx.send(1).unwrap(); - assert_eq!(rx.recv().unwrap(), 1); - let tx = tx.clone(); - tx.send(1).unwrap(); - assert_eq!(rx.recv().unwrap(), 1); - } - - #[test] - fn recv_timeout() { - let (tx, rx) = sync_channel::<i32>(1); - assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout)); - tx.send(1).unwrap(); - assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(1)); - } - - #[test] - fn smoke_threads() { - let (tx, rx) = sync_channel::<i32>(0); - let _t = thread::spawn(move || { - tx.send(1).unwrap(); - }); - assert_eq!(rx.recv().unwrap(), 1); - } - - #[test] - fn smoke_port_gone() { - let (tx, rx) = sync_channel::<i32>(0); - drop(rx); - assert!(tx.send(1).is_err()); - } - - #[test] - fn smoke_shared_port_gone2() { - let (tx, rx) = sync_channel::<i32>(0); - drop(rx); - let tx2 = tx.clone(); - drop(tx); - assert!(tx2.send(1).is_err()); - } - - #[test] - fn port_gone_concurrent() { - let (tx, rx) = sync_channel::<i32>(0); - let _t = thread::spawn(move || { - rx.recv().unwrap(); - }); - while tx.send(1).is_ok() {} - } - - #[test] - fn port_gone_concurrent_shared() { - let (tx, rx) = sync_channel::<i32>(0); - let tx2 = tx.clone(); - let _t = thread::spawn(move || { - rx.recv().unwrap(); - }); - while tx.send(1).is_ok() && tx2.send(1).is_ok() {} - } - - #[test] - fn smoke_chan_gone() { - let (tx, rx) = sync_channel::<i32>(0); - drop(tx); - assert!(rx.recv().is_err()); - } - - #[test] - fn smoke_chan_gone_shared() { - let (tx, rx) = sync_channel::<()>(0); - let tx2 = tx.clone(); - drop(tx); - drop(tx2); - assert!(rx.recv().is_err()); - } - - #[test] - fn chan_gone_concurrent() { - let (tx, rx) = sync_channel::<i32>(0); - thread::spawn(move || { - tx.send(1).unwrap(); - tx.send(1).unwrap(); - }); - while rx.recv().is_ok() {} - } - - #[test] - fn stress() { - let (tx, rx) = sync_channel::<i32>(0); - thread::spawn(move || { - for _ in 0..10000 { - tx.send(1).unwrap(); - } - }); - for _ in 0..10000 { - assert_eq!(rx.recv().unwrap(), 1); - } - } - - #[test] - fn stress_recv_timeout_two_threads() { - let (tx, rx) = sync_channel::<i32>(0); - - thread::spawn(move || { - for _ in 0..10000 { - tx.send(1).unwrap(); - } - }); - - let mut recv_count = 0; - loop { - match rx.recv_timeout(Duration::from_millis(1)) { - Ok(v) => { - assert_eq!(v, 1); - recv_count += 1; - } - Err(RecvTimeoutError::Timeout) => continue, - Err(RecvTimeoutError::Disconnected) => break, - } - } - - assert_eq!(recv_count, 10000); - } - - #[test] - fn stress_recv_timeout_shared() { - const AMT: u32 = 1000; - const NTHREADS: u32 = 8; - let (tx, rx) = sync_channel::<i32>(0); - let (dtx, drx) = sync_channel::<()>(0); - - thread::spawn(move || { - let mut recv_count = 0; - loop { - match rx.recv_timeout(Duration::from_millis(10)) { - Ok(v) => { - assert_eq!(v, 1); - recv_count += 1; - } - Err(RecvTimeoutError::Timeout) => continue, - Err(RecvTimeoutError::Disconnected) => break, - } - } - - assert_eq!(recv_count, AMT * NTHREADS); - assert!(rx.try_recv().is_err()); - - dtx.send(()).unwrap(); - }); - - for _ in 0..NTHREADS { - let tx = tx.clone(); - thread::spawn(move || { - for _ in 0..AMT { - tx.send(1).unwrap(); - } - }); - } - - drop(tx); - - drx.recv().unwrap(); - } - - #[test] - fn stress_shared() { - const AMT: u32 = 1000; - const NTHREADS: u32 = 8; - let (tx, rx) = sync_channel::<i32>(0); - let (dtx, drx) = sync_channel::<()>(0); - - thread::spawn(move || { - for _ in 0..AMT * NTHREADS { - assert_eq!(rx.recv().unwrap(), 1); - } - match rx.try_recv() { - Ok(..) => panic!(), - _ => {} - } - dtx.send(()).unwrap(); - }); - - for _ in 0..NTHREADS { - let tx = tx.clone(); - thread::spawn(move || { - for _ in 0..AMT { - tx.send(1).unwrap(); - } - }); - } - drop(tx); - drx.recv().unwrap(); - } - - #[test] - fn oneshot_single_thread_close_port_first() { - // Simple test of closing without sending - let (_tx, rx) = sync_channel::<i32>(0); - drop(rx); - } - - #[test] - fn oneshot_single_thread_close_chan_first() { - // Simple test of closing without sending - let (tx, _rx) = sync_channel::<i32>(0); - drop(tx); - } - - #[test] - fn oneshot_single_thread_send_port_close() { - // Testing that the sender cleans up the payload if receiver is closed - let (tx, rx) = sync_channel::<Box<i32>>(0); - drop(rx); - assert!(tx.send(box 0).is_err()); - } - - #[test] - fn oneshot_single_thread_recv_chan_close() { - // Receiving on a closed chan will panic - let res = thread::spawn(move || { - let (tx, rx) = sync_channel::<i32>(0); - drop(tx); - rx.recv().unwrap(); - }) - .join(); - // What is our res? - assert!(res.is_err()); - } - - #[test] - fn oneshot_single_thread_send_then_recv() { - let (tx, rx) = sync_channel::<Box<i32>>(1); - tx.send(box 10).unwrap(); - assert!(*rx.recv().unwrap() == 10); - } - - #[test] - fn oneshot_single_thread_try_send_open() { - let (tx, rx) = sync_channel::<i32>(1); - assert_eq!(tx.try_send(10), Ok(())); - assert!(rx.recv().unwrap() == 10); - } - - #[test] - fn oneshot_single_thread_try_send_closed() { - let (tx, rx) = sync_channel::<i32>(0); - drop(rx); - assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10))); - } - - #[test] - fn oneshot_single_thread_try_send_closed2() { - let (tx, _rx) = sync_channel::<i32>(0); - assert_eq!(tx.try_send(10), Err(TrySendError::Full(10))); - } - - #[test] - fn oneshot_single_thread_try_recv_open() { - let (tx, rx) = sync_channel::<i32>(1); - tx.send(10).unwrap(); - assert!(rx.recv() == Ok(10)); - } - - #[test] - fn oneshot_single_thread_try_recv_closed() { - let (tx, rx) = sync_channel::<i32>(0); - drop(tx); - assert!(rx.recv().is_err()); - } - - #[test] - fn oneshot_single_thread_try_recv_closed_with_data() { - let (tx, rx) = sync_channel::<i32>(1); - tx.send(10).unwrap(); - drop(tx); - assert_eq!(rx.try_recv(), Ok(10)); - assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); - } - - #[test] - fn oneshot_single_thread_peek_data() { - let (tx, rx) = sync_channel::<i32>(1); - assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); - tx.send(10).unwrap(); - assert_eq!(rx.try_recv(), Ok(10)); - } - - #[test] - fn oneshot_single_thread_peek_close() { - let (tx, rx) = sync_channel::<i32>(0); - drop(tx); - assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); - assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); - } - - #[test] - fn oneshot_single_thread_peek_open() { - let (_tx, rx) = sync_channel::<i32>(0); - assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); - } - - #[test] - fn oneshot_multi_task_recv_then_send() { - let (tx, rx) = sync_channel::<Box<i32>>(0); - let _t = thread::spawn(move || { - assert!(*rx.recv().unwrap() == 10); - }); - - tx.send(box 10).unwrap(); - } - - #[test] - fn oneshot_multi_task_recv_then_close() { - let (tx, rx) = sync_channel::<Box<i32>>(0); - let _t = thread::spawn(move || { - drop(tx); - }); - let res = thread::spawn(move || { - assert!(*rx.recv().unwrap() == 10); - }) - .join(); - assert!(res.is_err()); - } - - #[test] - fn oneshot_multi_thread_close_stress() { - for _ in 0..stress_factor() { - let (tx, rx) = sync_channel::<i32>(0); - let _t = thread::spawn(move || { - drop(rx); - }); - drop(tx); - } - } - - #[test] - fn oneshot_multi_thread_send_close_stress() { - for _ in 0..stress_factor() { - let (tx, rx) = sync_channel::<i32>(0); - let _t = thread::spawn(move || { - drop(rx); - }); - let _ = thread::spawn(move || { - tx.send(1).unwrap(); - }) - .join(); - } - } - - #[test] - fn oneshot_multi_thread_recv_close_stress() { - for _ in 0..stress_factor() { - let (tx, rx) = sync_channel::<i32>(0); - let _t = thread::spawn(move || { - let res = thread::spawn(move || { - rx.recv().unwrap(); - }) - .join(); - assert!(res.is_err()); - }); - let _t = thread::spawn(move || { - thread::spawn(move || { - drop(tx); - }); - }); - } - } - - #[test] - fn oneshot_multi_thread_send_recv_stress() { - for _ in 0..stress_factor() { - let (tx, rx) = sync_channel::<Box<i32>>(0); - let _t = thread::spawn(move || { - tx.send(box 10).unwrap(); - }); - assert!(*rx.recv().unwrap() == 10); - } - } - - #[test] - fn stream_send_recv_stress() { - for _ in 0..stress_factor() { - let (tx, rx) = sync_channel::<Box<i32>>(0); - - send(tx, 0); - recv(rx, 0); - - fn send(tx: SyncSender<Box<i32>>, i: i32) { - if i == 10 { - return; - } - - thread::spawn(move || { - tx.send(box i).unwrap(); - send(tx, i + 1); - }); - } - - fn recv(rx: Receiver<Box<i32>>, i: i32) { - if i == 10 { - return; - } - - thread::spawn(move || { - assert!(*rx.recv().unwrap() == i); - recv(rx, i + 1); - }); - } - } - } - - #[test] - fn recv_a_lot() { - // Regression test that we don't run out of stack in scheduler context - let (tx, rx) = sync_channel(10000); - for _ in 0..10000 { - tx.send(()).unwrap(); - } - for _ in 0..10000 { - rx.recv().unwrap(); - } - } - - #[test] - fn shared_chan_stress() { - let (tx, rx) = sync_channel(0); - let total = stress_factor() + 100; - for _ in 0..total { - let tx = tx.clone(); - thread::spawn(move || { - tx.send(()).unwrap(); - }); - } - - for _ in 0..total { - rx.recv().unwrap(); - } - } - - #[test] - fn test_nested_recv_iter() { - let (tx, rx) = sync_channel::<i32>(0); - let (total_tx, total_rx) = sync_channel::<i32>(0); - - let _t = thread::spawn(move || { - let mut acc = 0; - for x in rx.iter() { - acc += x; - } - total_tx.send(acc).unwrap(); - }); - - tx.send(3).unwrap(); - tx.send(1).unwrap(); - tx.send(2).unwrap(); - drop(tx); - assert_eq!(total_rx.recv().unwrap(), 6); - } - - #[test] - fn test_recv_iter_break() { - let (tx, rx) = sync_channel::<i32>(0); - let (count_tx, count_rx) = sync_channel(0); - - let _t = thread::spawn(move || { - let mut count = 0; - for x in rx.iter() { - if count >= 3 { - break; - } else { - count += x; - } - } - count_tx.send(count).unwrap(); - }); - - tx.send(2).unwrap(); - tx.send(2).unwrap(); - tx.send(2).unwrap(); - let _ = tx.try_send(2); - drop(tx); - assert_eq!(count_rx.recv().unwrap(), 4); - } - - #[test] - fn try_recv_states() { - let (tx1, rx1) = sync_channel::<i32>(1); - let (tx2, rx2) = sync_channel::<()>(1); - let (tx3, rx3) = sync_channel::<()>(1); - let _t = thread::spawn(move || { - rx2.recv().unwrap(); - tx1.send(1).unwrap(); - tx3.send(()).unwrap(); - rx2.recv().unwrap(); - drop(tx1); - tx3.send(()).unwrap(); - }); - - assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty)); - tx2.send(()).unwrap(); - rx3.recv().unwrap(); - assert_eq!(rx1.try_recv(), Ok(1)); - assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty)); - tx2.send(()).unwrap(); - rx3.recv().unwrap(); - assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected)); - } - - // This bug used to end up in a livelock inside of the Receiver destructor - // because the internal state of the Shared packet was corrupted - #[test] - fn destroy_upgraded_shared_port_when_sender_still_active() { - let (tx, rx) = sync_channel::<()>(0); - let (tx2, rx2) = sync_channel::<()>(0); - let _t = thread::spawn(move || { - rx.recv().unwrap(); // wait on a oneshot - drop(rx); // destroy a shared - tx2.send(()).unwrap(); - }); - // make sure the other thread has gone to sleep - for _ in 0..5000 { - thread::yield_now(); - } - - // upgrade to a shared chan and send a message - let t = tx.clone(); - drop(tx); - t.send(()).unwrap(); - - // wait for the child thread to exit before we exit - rx2.recv().unwrap(); - } - - #[test] - fn send1() { - let (tx, rx) = sync_channel::<i32>(0); - let _t = thread::spawn(move || { - rx.recv().unwrap(); - }); - assert_eq!(tx.send(1), Ok(())); - } - - #[test] - fn send2() { - let (tx, rx) = sync_channel::<i32>(0); - let _t = thread::spawn(move || { - drop(rx); - }); - assert!(tx.send(1).is_err()); - } - - #[test] - fn send3() { - let (tx, rx) = sync_channel::<i32>(1); - assert_eq!(tx.send(1), Ok(())); - let _t = thread::spawn(move || { - drop(rx); - }); - assert!(tx.send(1).is_err()); - } - - #[test] - fn send4() { - let (tx, rx) = sync_channel::<i32>(0); - let tx2 = tx.clone(); - let (done, donerx) = channel(); - let done2 = done.clone(); - let _t = thread::spawn(move || { - assert!(tx.send(1).is_err()); - done.send(()).unwrap(); - }); - let _t = thread::spawn(move || { - assert!(tx2.send(2).is_err()); - done2.send(()).unwrap(); - }); - drop(rx); - donerx.recv().unwrap(); - donerx.recv().unwrap(); - } - - #[test] - fn try_send1() { - let (tx, _rx) = sync_channel::<i32>(0); - assert_eq!(tx.try_send(1), Err(TrySendError::Full(1))); - } - - #[test] - fn try_send2() { - let (tx, _rx) = sync_channel::<i32>(1); - assert_eq!(tx.try_send(1), Ok(())); - assert_eq!(tx.try_send(1), Err(TrySendError::Full(1))); - } - - #[test] - fn try_send3() { - let (tx, rx) = sync_channel::<i32>(1); - assert_eq!(tx.try_send(1), Ok(())); - drop(rx); - assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1))); - } - - #[test] - fn issue_15761() { - fn repro() { - let (tx1, rx1) = sync_channel::<()>(3); - let (tx2, rx2) = sync_channel::<()>(3); - - let _t = thread::spawn(move || { - rx1.recv().unwrap(); - tx2.try_send(()).unwrap(); - }); - - tx1.try_send(()).unwrap(); - rx2.recv().unwrap(); - } - - for _ in 0..100 { - repro() - } - } -} diff --git a/library/std/src/sync/mpsc/mpsc_queue.rs b/library/std/src/sync/mpsc/mpsc_queue.rs index 6e7a7be4430..42bc639dc25 100644 --- a/library/std/src/sync/mpsc/mpsc_queue.rs +++ b/library/std/src/sync/mpsc/mpsc_queue.rs @@ -11,6 +11,9 @@ // http://www.1024cores.net/home/lock-free-algorithms // /queues/non-intrusive-mpsc-node-based-queue +#[cfg(all(test, not(target_os = "emscripten")))] +mod tests; + pub use self::PopResult::*; use core::cell::UnsafeCell; @@ -112,54 +115,3 @@ impl<T> Drop for Queue<T> { } } } - -#[cfg(all(test, not(target_os = "emscripten")))] -mod tests { - use super::{Data, Empty, Inconsistent, Queue}; - use crate::sync::mpsc::channel; - use crate::sync::Arc; - use crate::thread; - - #[test] - fn test_full() { - let q: Queue<Box<_>> = Queue::new(); - q.push(box 1); - q.push(box 2); - } - - #[test] - fn test() { - let nthreads = 8; - let nmsgs = 1000; - let q = Queue::new(); - match q.pop() { - Empty => {} - Inconsistent | Data(..) => panic!(), - } - let (tx, rx) = channel(); - let q = Arc::new(q); - - for _ in 0..nthreads { - let tx = tx.clone(); - let q = q.clone(); - thread::spawn(move || { - for i in 0..nmsgs { - q.push(i); - } - tx.send(()).unwrap(); - }); - } - - let mut i = 0; - while i < nthreads * nmsgs { - match q.pop() { - Empty | Inconsistent => {} - Data(_) => i += 1, - } - } - drop(tx); - for _ in 0..nthreads { - rx.recv().unwrap(); - } - } -} diff --git a/library/std/src/sync/mpsc/mpsc_queue/tests.rs b/library/std/src/sync/mpsc/mpsc_queue/tests.rs new file mode 100644 index 00000000000..348b83424b0 --- /dev/null +++ b/library/std/src/sync/mpsc/mpsc_queue/tests.rs @@ -0,0 +1,47 @@ +use super::{Data, Empty, Inconsistent, Queue}; +use crate::sync::mpsc::channel; +use crate::sync::Arc; +use crate::thread; + +#[test] +fn test_full() { + let q: Queue<Box<_>> = Queue::new(); + q.push(box 1); + q.push(box 2); +} + +#[test] +fn test() { + let nthreads = 8; + let nmsgs = 1000; + let q = Queue::new(); + match q.pop() { + Empty => {} + Inconsistent | Data(..) => panic!(), + } + let (tx, rx) = channel(); + let q = Arc::new(q); + + for _ in 0..nthreads { + let tx = tx.clone(); + let q = q.clone(); + thread::spawn(move || { + for i in 0..nmsgs { + q.push(i); + } + tx.send(()).unwrap(); + }); + } + + let mut i = 0; + while i < nthreads * nmsgs { + match q.pop() { + Empty | Inconsistent => {} + Data(_) => i += 1, + } + } + drop(tx); + for _ in 0..nthreads { + rx.recv().unwrap(); + } +} diff --git a/library/std/src/sync/mpsc/spsc_queue.rs b/library/std/src/sync/mpsc/spsc_queue.rs index 0274268f69f..9bf99f193ca 100644 --- a/library/std/src/sync/mpsc/spsc_queue.rs +++ b/library/std/src/sync/mpsc/spsc_queue.rs @@ -6,6 +6,9 @@ // http://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue +#[cfg(all(test, not(target_os = "emscripten")))] +mod tests; + use core::cell::UnsafeCell; use core::ptr; @@ -231,108 +234,3 @@ impl<T, ProducerAddition, ConsumerAddition> Drop for Queue<T, ProducerAddition, } } } - -#[cfg(all(test, not(target_os = "emscripten")))] -mod tests { - use super::Queue; - use crate::sync::mpsc::channel; - use crate::sync::Arc; - use crate::thread; - - #[test] - fn smoke() { - unsafe { - let queue = Queue::with_additions(0, (), ()); - queue.push(1); - queue.push(2); - assert_eq!(queue.pop(), Some(1)); - assert_eq!(queue.pop(), Some(2)); - assert_eq!(queue.pop(), None); - queue.push(3); - queue.push(4); - assert_eq!(queue.pop(), Some(3)); - assert_eq!(queue.pop(), Some(4)); - assert_eq!(queue.pop(), None); - } - } - - #[test] - fn peek() { - unsafe { - let queue = Queue::with_additions(0, (), ()); - queue.push(vec![1]); - - // Ensure the borrowchecker works - match queue.peek() { - Some(vec) => { - assert_eq!(&*vec, &[1]); - } - None => unreachable!(), - } - - match queue.pop() { - Some(vec) => { - assert_eq!(&*vec, &[1]); - } - None => unreachable!(), - } - } - } - - #[test] - fn drop_full() { - unsafe { - let q: Queue<Box<_>> = Queue::with_additions(0, (), ()); - q.push(box 1); - q.push(box 2); - } - } - - #[test] - fn smoke_bound() { - unsafe { - let q = Queue::with_additions(0, (), ()); - q.push(1); - q.push(2); - assert_eq!(q.pop(), Some(1)); - assert_eq!(q.pop(), Some(2)); - assert_eq!(q.pop(), None); - q.push(3); - q.push(4); - assert_eq!(q.pop(), Some(3)); - assert_eq!(q.pop(), Some(4)); - assert_eq!(q.pop(), None); - } - } - - #[test] - fn stress() { - unsafe { - stress_bound(0); - stress_bound(1); - } - - unsafe fn stress_bound(bound: usize) { - let q = Arc::new(Queue::with_additions(bound, (), ())); - - let (tx, rx) = channel(); - let q2 = q.clone(); - let _t = thread::spawn(move || { - for _ in 0..100000 { - loop { - match q2.pop() { - Some(1) => break, - Some(_) => panic!(), - None => {} - } - } - } - tx.send(()).unwrap(); - }); - for _ in 0..100000 { - q.push(1); - } - rx.recv().unwrap(); - } - } -} diff --git a/library/std/src/sync/mpsc/spsc_queue/tests.rs b/library/std/src/sync/mpsc/spsc_queue/tests.rs new file mode 100644 index 00000000000..e4fd15cbbde --- /dev/null +++ b/library/std/src/sync/mpsc/spsc_queue/tests.rs @@ -0,0 +1,101 @@ +use super::Queue; +use crate::sync::mpsc::channel; +use crate::sync::Arc; +use crate::thread; + +#[test] +fn smoke() { + unsafe { + let queue = Queue::with_additions(0, (), ()); + queue.push(1); + queue.push(2); + assert_eq!(queue.pop(), Some(1)); + assert_eq!(queue.pop(), Some(2)); + assert_eq!(queue.pop(), None); + queue.push(3); + queue.push(4); + assert_eq!(queue.pop(), Some(3)); + assert_eq!(queue.pop(), Some(4)); + assert_eq!(queue.pop(), None); + } +} + +#[test] +fn peek() { + unsafe { + let queue = Queue::with_additions(0, (), ()); + queue.push(vec![1]); + + // Ensure the borrowchecker works + match queue.peek() { + Some(vec) => { + assert_eq!(&*vec, &[1]); + } + None => unreachable!(), + } + + match queue.pop() { + Some(vec) => { + assert_eq!(&*vec, &[1]); + } + None => unreachable!(), + } + } +} + +#[test] +fn drop_full() { + unsafe { + let q: Queue<Box<_>> = Queue::with_additions(0, (), ()); + q.push(box 1); + q.push(box 2); + } +} + +#[test] +fn smoke_bound() { + unsafe { + let q = Queue::with_additions(0, (), ()); + q.push(1); + q.push(2); + assert_eq!(q.pop(), Some(1)); + assert_eq!(q.pop(), Some(2)); + assert_eq!(q.pop(), None); + q.push(3); + q.push(4); + assert_eq!(q.pop(), Some(3)); + assert_eq!(q.pop(), Some(4)); + assert_eq!(q.pop(), None); + } +} + +#[test] +fn stress() { + unsafe { + stress_bound(0); + stress_bound(1); + } + + unsafe fn stress_bound(bound: usize) { + let q = Arc::new(Queue::with_additions(bound, (), ())); + + let (tx, rx) = channel(); + let q2 = q.clone(); + let _t = thread::spawn(move || { + for _ in 0..100000 { + loop { + match q2.pop() { + Some(1) => break, + Some(_) => panic!(), + None => {} + } + } + } + tx.send(()).unwrap(); + }); + for _ in 0..100000 { + q.push(1); + } + rx.recv().unwrap(); + } +} diff --git a/library/std/src/sync/mpsc/sync_tests.rs b/library/std/src/sync/mpsc/sync_tests.rs new file mode 100644 index 00000000000..0052a38f7bb --- /dev/null +++ b/library/std/src/sync/mpsc/sync_tests.rs @@ -0,0 +1,647 @@ +use super::*; +use crate::env; +use crate::thread; +use crate::time::Duration; + +pub fn stress_factor() -> usize { + match env::var("RUST_TEST_STRESS") { + Ok(val) => val.parse().unwrap(), + Err(..) => 1, + } +} + +#[test] +fn smoke() { + let (tx, rx) = sync_channel::<i32>(1); + tx.send(1).unwrap(); + assert_eq!(rx.recv().unwrap(), 1); +} + +#[test] +fn drop_full() { + let (tx, _rx) = sync_channel::<Box<isize>>(1); + tx.send(box 1).unwrap(); +} + +#[test] +fn smoke_shared() { + let (tx, rx) = sync_channel::<i32>(1); + tx.send(1).unwrap(); + assert_eq!(rx.recv().unwrap(), 1); + let tx = tx.clone(); + tx.send(1).unwrap(); + assert_eq!(rx.recv().unwrap(), 1); +} + +#[test] +fn recv_timeout() { + let (tx, rx) = sync_channel::<i32>(1); + assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout)); + tx.send(1).unwrap(); + assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(1)); +} + +#[test] +fn smoke_threads() { + let (tx, rx) = sync_channel::<i32>(0); + let _t = thread::spawn(move || { + tx.send(1).unwrap(); + }); + assert_eq!(rx.recv().unwrap(), 1); +} + +#[test] +fn smoke_port_gone() { + let (tx, rx) = sync_channel::<i32>(0); + drop(rx); + assert!(tx.send(1).is_err()); +} + +#[test] +fn smoke_shared_port_gone2() { + let (tx, rx) = sync_channel::<i32>(0); + drop(rx); + let tx2 = tx.clone(); + drop(tx); + assert!(tx2.send(1).is_err()); +} + +#[test] +fn port_gone_concurrent() { + let (tx, rx) = sync_channel::<i32>(0); + let _t = thread::spawn(move || { + rx.recv().unwrap(); + }); + while tx.send(1).is_ok() {} +} + +#[test] +fn port_gone_concurrent_shared() { + let (tx, rx) = sync_channel::<i32>(0); + let tx2 = tx.clone(); + let _t = thread::spawn(move || { + rx.recv().unwrap(); + }); + while tx.send(1).is_ok() && tx2.send(1).is_ok() {} +} + +#[test] +fn smoke_chan_gone() { + let (tx, rx) = sync_channel::<i32>(0); + drop(tx); + assert!(rx.recv().is_err()); +} + +#[test] +fn smoke_chan_gone_shared() { + let (tx, rx) = sync_channel::<()>(0); + let tx2 = tx.clone(); + drop(tx); + drop(tx2); + assert!(rx.recv().is_err()); +} + +#[test] +fn chan_gone_concurrent() { + let (tx, rx) = sync_channel::<i32>(0); + thread::spawn(move || { + tx.send(1).unwrap(); + tx.send(1).unwrap(); + }); + while rx.recv().is_ok() {} +} + +#[test] +fn stress() { + let (tx, rx) = sync_channel::<i32>(0); + thread::spawn(move || { + for _ in 0..10000 { + tx.send(1).unwrap(); + } + }); + for _ in 0..10000 { + assert_eq!(rx.recv().unwrap(), 1); + } +} + +#[test] +fn stress_recv_timeout_two_threads() { + let (tx, rx) = sync_channel::<i32>(0); + + thread::spawn(move || { + for _ in 0..10000 { + tx.send(1).unwrap(); + } + }); + + let mut recv_count = 0; + loop { + match rx.recv_timeout(Duration::from_millis(1)) { + Ok(v) => { + assert_eq!(v, 1); + recv_count += 1; + } + Err(RecvTimeoutError::Timeout) => continue, + Err(RecvTimeoutError::Disconnected) => break, + } + } + + assert_eq!(recv_count, 10000); +} + +#[test] +fn stress_recv_timeout_shared() { + const AMT: u32 = 1000; + const NTHREADS: u32 = 8; + let (tx, rx) = sync_channel::<i32>(0); + let (dtx, drx) = sync_channel::<()>(0); + + thread::spawn(move || { + let mut recv_count = 0; + loop { + match rx.recv_timeout(Duration::from_millis(10)) { + Ok(v) => { + assert_eq!(v, 1); + recv_count += 1; + } + Err(RecvTimeoutError::Timeout) => continue, + Err(RecvTimeoutError::Disconnected) => break, + } + } + + assert_eq!(recv_count, AMT * NTHREADS); + assert!(rx.try_recv().is_err()); + + dtx.send(()).unwrap(); + }); + + for _ in 0..NTHREADS { + let tx = tx.clone(); + thread::spawn(move || { + for _ in 0..AMT { + tx.send(1).unwrap(); + } + }); + } + + drop(tx); + + drx.recv().unwrap(); +} + +#[test] +fn stress_shared() { + const AMT: u32 = 1000; + const NTHREADS: u32 = 8; + let (tx, rx) = sync_channel::<i32>(0); + let (dtx, drx) = sync_channel::<()>(0); + + thread::spawn(move || { + for _ in 0..AMT * NTHREADS { + assert_eq!(rx.recv().unwrap(), 1); + } + match rx.try_recv() { + Ok(..) => panic!(), + _ => {} + } + dtx.send(()).unwrap(); + }); + + for _ in 0..NTHREADS { + let tx = tx.clone(); + thread::spawn(move || { + for _ in 0..AMT { + tx.send(1).unwrap(); + } + }); + } + drop(tx); + drx.recv().unwrap(); +} + +#[test] +fn oneshot_single_thread_close_port_first() { + // Simple test of closing without sending + let (_tx, rx) = sync_channel::<i32>(0); + drop(rx); +} + +#[test] +fn oneshot_single_thread_close_chan_first() { + // Simple test of closing without sending + let (tx, _rx) = sync_channel::<i32>(0); + drop(tx); +} + +#[test] +fn oneshot_single_thread_send_port_close() { + // Testing that the sender cleans up the payload if receiver is closed + let (tx, rx) = sync_channel::<Box<i32>>(0); + drop(rx); + assert!(tx.send(box 0).is_err()); +} + +#[test] +fn oneshot_single_thread_recv_chan_close() { + // Receiving on a closed chan will panic + let res = thread::spawn(move || { + let (tx, rx) = sync_channel::<i32>(0); + drop(tx); + rx.recv().unwrap(); + }) + .join(); + // What is our res? + assert!(res.is_err()); +} + +#[test] +fn oneshot_single_thread_send_then_recv() { + let (tx, rx) = sync_channel::<Box<i32>>(1); + tx.send(box 10).unwrap(); + assert!(*rx.recv().unwrap() == 10); +} + +#[test] +fn oneshot_single_thread_try_send_open() { + let (tx, rx) = sync_channel::<i32>(1); + assert_eq!(tx.try_send(10), Ok(())); + assert!(rx.recv().unwrap() == 10); +} + +#[test] +fn oneshot_single_thread_try_send_closed() { + let (tx, rx) = sync_channel::<i32>(0); + drop(rx); + assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10))); +} + +#[test] +fn oneshot_single_thread_try_send_closed2() { + let (tx, _rx) = sync_channel::<i32>(0); + assert_eq!(tx.try_send(10), Err(TrySendError::Full(10))); +} + +#[test] +fn oneshot_single_thread_try_recv_open() { + let (tx, rx) = sync_channel::<i32>(1); + tx.send(10).unwrap(); + assert!(rx.recv() == Ok(10)); +} + +#[test] +fn oneshot_single_thread_try_recv_closed() { + let (tx, rx) = sync_channel::<i32>(0); + drop(tx); + assert!(rx.recv().is_err()); +} + +#[test] +fn oneshot_single_thread_try_recv_closed_with_data() { + let (tx, rx) = sync_channel::<i32>(1); + tx.send(10).unwrap(); + drop(tx); + assert_eq!(rx.try_recv(), Ok(10)); + assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); +} + +#[test] +fn oneshot_single_thread_peek_data() { + let (tx, rx) = sync_channel::<i32>(1); + assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); + tx.send(10).unwrap(); + assert_eq!(rx.try_recv(), Ok(10)); +} + +#[test] +fn oneshot_single_thread_peek_close() { + let (tx, rx) = sync_channel::<i32>(0); + drop(tx); + assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); + assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); +} + +#[test] +fn oneshot_single_thread_peek_open() { + let (_tx, rx) = sync_channel::<i32>(0); + assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); +} + +#[test] +fn oneshot_multi_task_recv_then_send() { + let (tx, rx) = sync_channel::<Box<i32>>(0); + let _t = thread::spawn(move || { + assert!(*rx.recv().unwrap() == 10); + }); + + tx.send(box 10).unwrap(); +} + +#[test] +fn oneshot_multi_task_recv_then_close() { + let (tx, rx) = sync_channel::<Box<i32>>(0); + let _t = thread::spawn(move || { + drop(tx); + }); + let res = thread::spawn(move || { + assert!(*rx.recv().unwrap() == 10); + }) + .join(); + assert!(res.is_err()); +} + +#[test] +fn oneshot_multi_thread_close_stress() { + for _ in 0..stress_factor() { + let (tx, rx) = sync_channel::<i32>(0); + let _t = thread::spawn(move || { + drop(rx); + }); + drop(tx); + } +} + +#[test] +fn oneshot_multi_thread_send_close_stress() { + for _ in 0..stress_factor() { + let (tx, rx) = sync_channel::<i32>(0); + let _t = thread::spawn(move || { + drop(rx); + }); + let _ = thread::spawn(move || { + tx.send(1).unwrap(); + }) + .join(); + } +} + +#[test] +fn oneshot_multi_thread_recv_close_stress() { + for _ in 0..stress_factor() { + let (tx, rx) = sync_channel::<i32>(0); + let _t = thread::spawn(move || { + let res = thread::spawn(move || { + rx.recv().unwrap(); + }) + .join(); + assert!(res.is_err()); + }); + let _t = thread::spawn(move || { + thread::spawn(move || { + drop(tx); + }); + }); + } +} + +#[test] +fn oneshot_multi_thread_send_recv_stress() { + for _ in 0..stress_factor() { + let (tx, rx) = sync_channel::<Box<i32>>(0); + let _t = thread::spawn(move || { + tx.send(box 10).unwrap(); + }); + assert!(*rx.recv().unwrap() == 10); + } +} + +#[test] +fn stream_send_recv_stress() { + for _ in 0..stress_factor() { + let (tx, rx) = sync_channel::<Box<i32>>(0); + + send(tx, 0); + recv(rx, 0); + + fn send(tx: SyncSender<Box<i32>>, i: i32) { + if i == 10 { + return; + } + + thread::spawn(move || { + tx.send(box i).unwrap(); + send(tx, i + 1); + }); + } + + fn recv(rx: Receiver<Box<i32>>, i: i32) { + if i == 10 { + return; + } + + thread::spawn(move || { + assert!(*rx.recv().unwrap() == i); + recv(rx, i + 1); + }); + } + } +} + +#[test] +fn recv_a_lot() { + // Regression test that we don't run out of stack in scheduler context + let (tx, rx) = sync_channel(10000); + for _ in 0..10000 { + tx.send(()).unwrap(); + } + for _ in 0..10000 { + rx.recv().unwrap(); + } +} + +#[test] +fn shared_chan_stress() { + let (tx, rx) = sync_channel(0); + let total = stress_factor() + 100; + for _ in 0..total { + let tx = tx.clone(); + thread::spawn(move || { + tx.send(()).unwrap(); + }); + } + + for _ in 0..total { + rx.recv().unwrap(); + } +} + +#[test] +fn test_nested_recv_iter() { + let (tx, rx) = sync_channel::<i32>(0); + let (total_tx, total_rx) = sync_channel::<i32>(0); + + let _t = thread::spawn(move || { + let mut acc = 0; + for x in rx.iter() { + acc += x; + } + total_tx.send(acc).unwrap(); + }); + + tx.send(3).unwrap(); + tx.send(1).unwrap(); + tx.send(2).unwrap(); + drop(tx); + assert_eq!(total_rx.recv().unwrap(), 6); +} + +#[test] +fn test_recv_iter_break() { + let (tx, rx) = sync_channel::<i32>(0); + let (count_tx, count_rx) = sync_channel(0); + + let _t = thread::spawn(move || { + let mut count = 0; + for x in rx.iter() { + if count >= 3 { + break; + } else { + count += x; + } + } + count_tx.send(count).unwrap(); + }); + + tx.send(2).unwrap(); + tx.send(2).unwrap(); + tx.send(2).unwrap(); + let _ = tx.try_send(2); + drop(tx); + assert_eq!(count_rx.recv().unwrap(), 4); +} + +#[test] +fn try_recv_states() { + let (tx1, rx1) = sync_channel::<i32>(1); + let (tx2, rx2) = sync_channel::<()>(1); + let (tx3, rx3) = sync_channel::<()>(1); + let _t = thread::spawn(move || { + rx2.recv().unwrap(); + tx1.send(1).unwrap(); + tx3.send(()).unwrap(); + rx2.recv().unwrap(); + drop(tx1); + tx3.send(()).unwrap(); + }); + + assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty)); + tx2.send(()).unwrap(); + rx3.recv().unwrap(); + assert_eq!(rx1.try_recv(), Ok(1)); + assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty)); + tx2.send(()).unwrap(); + rx3.recv().unwrap(); + assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected)); +} + +// This bug used to end up in a livelock inside of the Receiver destructor +// because the internal state of the Shared packet was corrupted +#[test] +fn destroy_upgraded_shared_port_when_sender_still_active() { + let (tx, rx) = sync_channel::<()>(0); + let (tx2, rx2) = sync_channel::<()>(0); + let _t = thread::spawn(move || { + rx.recv().unwrap(); // wait on a oneshot + drop(rx); // destroy a shared + tx2.send(()).unwrap(); + }); + // make sure the other thread has gone to sleep + for _ in 0..5000 { + thread::yield_now(); + } + + // upgrade to a shared chan and send a message + let t = tx.clone(); + drop(tx); + t.send(()).unwrap(); + + // wait for the child thread to exit before we exit + rx2.recv().unwrap(); +} + +#[test] +fn send1() { + let (tx, rx) = sync_channel::<i32>(0); + let _t = thread::spawn(move || { + rx.recv().unwrap(); + }); + assert_eq!(tx.send(1), Ok(())); +} + +#[test] +fn send2() { + let (tx, rx) = sync_channel::<i32>(0); + let _t = thread::spawn(move || { + drop(rx); + }); + assert!(tx.send(1).is_err()); +} + +#[test] +fn send3() { + let (tx, rx) = sync_channel::<i32>(1); + assert_eq!(tx.send(1), Ok(())); + let _t = thread::spawn(move || { + drop(rx); + }); + assert!(tx.send(1).is_err()); +} + +#[test] +fn send4() { + let (tx, rx) = sync_channel::<i32>(0); + let tx2 = tx.clone(); + let (done, donerx) = channel(); + let done2 = done.clone(); + let _t = thread::spawn(move || { + assert!(tx.send(1).is_err()); + done.send(()).unwrap(); + }); + let _t = thread::spawn(move || { + assert!(tx2.send(2).is_err()); + done2.send(()).unwrap(); + }); + drop(rx); + donerx.recv().unwrap(); + donerx.recv().unwrap(); +} + +#[test] +fn try_send1() { + let (tx, _rx) = sync_channel::<i32>(0); + assert_eq!(tx.try_send(1), Err(TrySendError::Full(1))); +} + +#[test] +fn try_send2() { + let (tx, _rx) = sync_channel::<i32>(1); + assert_eq!(tx.try_send(1), Ok(())); + assert_eq!(tx.try_send(1), Err(TrySendError::Full(1))); +} + +#[test] +fn try_send3() { + let (tx, rx) = sync_channel::<i32>(1); + assert_eq!(tx.try_send(1), Ok(())); + drop(rx); + assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1))); +} + +#[test] +fn issue_15761() { + fn repro() { + let (tx1, rx1) = sync_channel::<()>(3); + let (tx2, rx2) = sync_channel::<()>(3); + + let _t = thread::spawn(move || { + rx1.recv().unwrap(); + tx2.try_send(()).unwrap(); + }); + + tx1.try_send(()).unwrap(); + rx2.recv().unwrap(); + } + + for _ in 0..100 { + repro() + } +} diff --git a/library/std/src/sync/mpsc/tests.rs b/library/std/src/sync/mpsc/tests.rs new file mode 100644 index 00000000000..184ce193cbe --- /dev/null +++ b/library/std/src/sync/mpsc/tests.rs @@ -0,0 +1,706 @@ +use super::*; +use crate::env; +use crate::thread; +use crate::time::{Duration, Instant}; + +pub fn stress_factor() -> usize { + match env::var("RUST_TEST_STRESS") { + Ok(val) => val.parse().unwrap(), + Err(..) => 1, + } +} + +#[test] +fn smoke() { + let (tx, rx) = channel::<i32>(); + tx.send(1).unwrap(); + assert_eq!(rx.recv().unwrap(), 1); +} + +#[test] +fn drop_full() { + let (tx, _rx) = channel::<Box<isize>>(); + tx.send(box 1).unwrap(); +} + +#[test] +fn drop_full_shared() { + let (tx, _rx) = channel::<Box<isize>>(); + drop(tx.clone()); + drop(tx.clone()); + tx.send(box 1).unwrap(); +} + +#[test] +fn smoke_shared() { + let (tx, rx) = channel::<i32>(); + tx.send(1).unwrap(); + assert_eq!(rx.recv().unwrap(), 1); + let tx = tx.clone(); + tx.send(1).unwrap(); + assert_eq!(rx.recv().unwrap(), 1); +} + +#[test] +fn smoke_threads() { + let (tx, rx) = channel::<i32>(); + let _t = thread::spawn(move || { + tx.send(1).unwrap(); + }); + assert_eq!(rx.recv().unwrap(), 1); +} + +#[test] +fn smoke_port_gone() { + let (tx, rx) = channel::<i32>(); + drop(rx); + assert!(tx.send(1).is_err()); +} + +#[test] +fn smoke_shared_port_gone() { + let (tx, rx) = channel::<i32>(); + drop(rx); + assert!(tx.send(1).is_err()) +} + +#[test] +fn smoke_shared_port_gone2() { + let (tx, rx) = channel::<i32>(); + drop(rx); + let tx2 = tx.clone(); + drop(tx); + assert!(tx2.send(1).is_err()); +} + +#[test] +fn port_gone_concurrent() { + let (tx, rx) = channel::<i32>(); + let _t = thread::spawn(move || { + rx.recv().unwrap(); + }); + while tx.send(1).is_ok() {} +} + +#[test] +fn port_gone_concurrent_shared() { + let (tx, rx) = channel::<i32>(); + let tx2 = tx.clone(); + let _t = thread::spawn(move || { + rx.recv().unwrap(); + }); + while tx.send(1).is_ok() && tx2.send(1).is_ok() {} +} + +#[test] +fn smoke_chan_gone() { + let (tx, rx) = channel::<i32>(); + drop(tx); + assert!(rx.recv().is_err()); +} + +#[test] +fn smoke_chan_gone_shared() { + let (tx, rx) = channel::<()>(); + let tx2 = tx.clone(); + drop(tx); + drop(tx2); + assert!(rx.recv().is_err()); +} + +#[test] +fn chan_gone_concurrent() { + let (tx, rx) = channel::<i32>(); + let _t = thread::spawn(move || { + tx.send(1).unwrap(); + tx.send(1).unwrap(); + }); + while rx.recv().is_ok() {} +} + +#[test] +fn stress() { + let (tx, rx) = channel::<i32>(); + let t = thread::spawn(move || { + for _ in 0..10000 { + tx.send(1).unwrap(); + } + }); + for _ in 0..10000 { + assert_eq!(rx.recv().unwrap(), 1); + } + t.join().ok().expect("thread panicked"); +} + +#[test] +fn stress_shared() { + const AMT: u32 = 10000; + const NTHREADS: u32 = 8; + let (tx, rx) = channel::<i32>(); + + let t = thread::spawn(move || { + for _ in 0..AMT * NTHREADS { + assert_eq!(rx.recv().unwrap(), 1); + } + match rx.try_recv() { + Ok(..) => panic!(), + _ => {} + } + }); + + for _ in 0..NTHREADS { + let tx = tx.clone(); + thread::spawn(move || { + for _ in 0..AMT { + tx.send(1).unwrap(); + } + }); + } + drop(tx); + t.join().ok().expect("thread panicked"); +} + +#[test] +fn send_from_outside_runtime() { + let (tx1, rx1) = channel::<()>(); + let (tx2, rx2) = channel::<i32>(); + let t1 = thread::spawn(move || { + tx1.send(()).unwrap(); + for _ in 0..40 { + assert_eq!(rx2.recv().unwrap(), 1); + } + }); + rx1.recv().unwrap(); + let t2 = thread::spawn(move || { + for _ in 0..40 { + tx2.send(1).unwrap(); + } + }); + t1.join().ok().expect("thread panicked"); + t2.join().ok().expect("thread panicked"); +} + +#[test] +fn recv_from_outside_runtime() { + let (tx, rx) = channel::<i32>(); + let t = thread::spawn(move || { + for _ in 0..40 { + assert_eq!(rx.recv().unwrap(), 1); + } + }); + for _ in 0..40 { + tx.send(1).unwrap(); + } + t.join().ok().expect("thread panicked"); +} + +#[test] +fn no_runtime() { + let (tx1, rx1) = channel::<i32>(); + let (tx2, rx2) = channel::<i32>(); + let t1 = thread::spawn(move || { + assert_eq!(rx1.recv().unwrap(), 1); + tx2.send(2).unwrap(); + }); + let t2 = thread::spawn(move || { + tx1.send(1).unwrap(); + assert_eq!(rx2.recv().unwrap(), 2); + }); + t1.join().ok().expect("thread panicked"); + t2.join().ok().expect("thread panicked"); +} + +#[test] +fn oneshot_single_thread_close_port_first() { + // Simple test of closing without sending + let (_tx, rx) = channel::<i32>(); + drop(rx); +} + +#[test] +fn oneshot_single_thread_close_chan_first() { + // Simple test of closing without sending + let (tx, _rx) = channel::<i32>(); + drop(tx); +} + +#[test] +fn oneshot_single_thread_send_port_close() { + // Testing that the sender cleans up the payload if receiver is closed + let (tx, rx) = channel::<Box<i32>>(); + drop(rx); + assert!(tx.send(box 0).is_err()); +} + +#[test] +fn oneshot_single_thread_recv_chan_close() { + // Receiving on a closed chan will panic + let res = thread::spawn(move || { + let (tx, rx) = channel::<i32>(); + drop(tx); + rx.recv().unwrap(); + }) + .join(); + // What is our res? + assert!(res.is_err()); +} + +#[test] +fn oneshot_single_thread_send_then_recv() { + let (tx, rx) = channel::<Box<i32>>(); + tx.send(box 10).unwrap(); + assert!(*rx.recv().unwrap() == 10); +} + +#[test] +fn oneshot_single_thread_try_send_open() { + let (tx, rx) = channel::<i32>(); + assert!(tx.send(10).is_ok()); + assert!(rx.recv().unwrap() == 10); +} + +#[test] +fn oneshot_single_thread_try_send_closed() { + let (tx, rx) = channel::<i32>(); + drop(rx); + assert!(tx.send(10).is_err()); +} + +#[test] +fn oneshot_single_thread_try_recv_open() { + let (tx, rx) = channel::<i32>(); + tx.send(10).unwrap(); + assert!(rx.recv() == Ok(10)); +} + +#[test] +fn oneshot_single_thread_try_recv_closed() { + let (tx, rx) = channel::<i32>(); + drop(tx); + assert!(rx.recv().is_err()); +} + +#[test] +fn oneshot_single_thread_peek_data() { + let (tx, rx) = channel::<i32>(); + assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); + tx.send(10).unwrap(); + assert_eq!(rx.try_recv(), Ok(10)); +} + +#[test] +fn oneshot_single_thread_peek_close() { + let (tx, rx) = channel::<i32>(); + drop(tx); + assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); + assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); +} + +#[test] +fn oneshot_single_thread_peek_open() { + let (_tx, rx) = channel::<i32>(); + assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); +} + +#[test] +fn oneshot_multi_task_recv_then_send() { + let (tx, rx) = channel::<Box<i32>>(); + let _t = thread::spawn(move || { + assert!(*rx.recv().unwrap() == 10); + }); + + tx.send(box 10).unwrap(); +} + +#[test] +fn oneshot_multi_task_recv_then_close() { + let (tx, rx) = channel::<Box<i32>>(); + let _t = thread::spawn(move || { + drop(tx); + }); + let res = thread::spawn(move || { + assert!(*rx.recv().unwrap() == 10); + }) + .join(); + assert!(res.is_err()); +} + +#[test] +fn oneshot_multi_thread_close_stress() { + for _ in 0..stress_factor() { + let (tx, rx) = channel::<i32>(); + let _t = thread::spawn(move || { + drop(rx); + }); + drop(tx); + } +} + +#[test] +fn oneshot_multi_thread_send_close_stress() { + for _ in 0..stress_factor() { + let (tx, rx) = channel::<i32>(); + let _t = thread::spawn(move || { + drop(rx); + }); + let _ = thread::spawn(move || { + tx.send(1).unwrap(); + }) + .join(); + } +} + +#[test] +fn oneshot_multi_thread_recv_close_stress() { + for _ in 0..stress_factor() { + let (tx, rx) = channel::<i32>(); + thread::spawn(move || { + let res = thread::spawn(move || { + rx.recv().unwrap(); + }) + .join(); + assert!(res.is_err()); + }); + let _t = thread::spawn(move || { + thread::spawn(move || { + drop(tx); + }); + }); + } +} + +#[test] +fn oneshot_multi_thread_send_recv_stress() { + for _ in 0..stress_factor() { + let (tx, rx) = channel::<Box<isize>>(); + let _t = thread::spawn(move || { + tx.send(box 10).unwrap(); + }); + assert!(*rx.recv().unwrap() == 10); + } +} + +#[test] +fn stream_send_recv_stress() { + for _ in 0..stress_factor() { + let (tx, rx) = channel(); + + send(tx, 0); + recv(rx, 0); + + fn send(tx: Sender<Box<i32>>, i: i32) { + if i == 10 { + return; + } + + thread::spawn(move || { + tx.send(box i).unwrap(); + send(tx, i + 1); + }); + } + + fn recv(rx: Receiver<Box<i32>>, i: i32) { + if i == 10 { + return; + } + + thread::spawn(move || { + assert!(*rx.recv().unwrap() == i); + recv(rx, i + 1); + }); + } + } +} + +#[test] +fn oneshot_single_thread_recv_timeout() { + let (tx, rx) = channel(); + tx.send(()).unwrap(); + assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(())); + assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout)); + tx.send(()).unwrap(); + assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(())); +} + +#[test] +fn stress_recv_timeout_two_threads() { + let (tx, rx) = channel(); + let stress = stress_factor() + 100; + let timeout = Duration::from_millis(100); + + thread::spawn(move || { + for i in 0..stress { + if i % 2 == 0 { + thread::sleep(timeout * 2); + } + tx.send(1usize).unwrap(); + } + }); + + let mut recv_count = 0; + loop { + match rx.recv_timeout(timeout) { + Ok(n) => { + assert_eq!(n, 1usize); + recv_count += 1; + } + Err(RecvTimeoutError::Timeout) => continue, + Err(RecvTimeoutError::Disconnected) => break, + } + } + + assert_eq!(recv_count, stress); +} + +#[test] +fn recv_timeout_upgrade() { + let (tx, rx) = channel::<()>(); + let timeout = Duration::from_millis(1); + let _tx_clone = tx.clone(); + + let start = Instant::now(); + assert_eq!(rx.recv_timeout(timeout), Err(RecvTimeoutError::Timeout)); + assert!(Instant::now() >= start + timeout); +} + +#[test] +fn stress_recv_timeout_shared() { + let (tx, rx) = channel(); + let stress = stress_factor() + 100; + + for i in 0..stress { + let tx = tx.clone(); + thread::spawn(move || { + thread::sleep(Duration::from_millis(i as u64 * 10)); + tx.send(1usize).unwrap(); + }); + } + + drop(tx); + + let mut recv_count = 0; + loop { + match rx.recv_timeout(Duration::from_millis(10)) { + Ok(n) => { + assert_eq!(n, 1usize); + recv_count += 1; + } + Err(RecvTimeoutError::Timeout) => continue, + Err(RecvTimeoutError::Disconnected) => break, + } + } + + assert_eq!(recv_count, stress); +} + +#[test] +fn very_long_recv_timeout_wont_panic() { + let (tx, rx) = channel::<()>(); + let join_handle = thread::spawn(move || rx.recv_timeout(Duration::from_secs(u64::MAX))); + thread::sleep(Duration::from_secs(1)); + assert!(tx.send(()).is_ok()); + assert_eq!(join_handle.join().unwrap(), Ok(())); +} + +#[test] +fn recv_a_lot() { + // Regression test that we don't run out of stack in scheduler context + let (tx, rx) = channel(); + for _ in 0..10000 { + tx.send(()).unwrap(); + } + for _ in 0..10000 { + rx.recv().unwrap(); + } +} + +#[test] +fn shared_recv_timeout() { + let (tx, rx) = channel(); + let total = 5; + for _ in 0..total { + let tx = tx.clone(); + thread::spawn(move || { + tx.send(()).unwrap(); + }); + } + + for _ in 0..total { + rx.recv().unwrap(); + } + + assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout)); + tx.send(()).unwrap(); + assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(())); +} + +#[test] +fn shared_chan_stress() { + let (tx, rx) = channel(); + let total = stress_factor() + 100; + for _ in 0..total { + let tx = tx.clone(); + thread::spawn(move || { + tx.send(()).unwrap(); + }); + } + + for _ in 0..total { + rx.recv().unwrap(); + } +} + +#[test] +fn test_nested_recv_iter() { + let (tx, rx) = channel::<i32>(); + let (total_tx, total_rx) = channel::<i32>(); + + let _t = thread::spawn(move || { + let mut acc = 0; + for x in rx.iter() { + acc += x; + } + total_tx.send(acc).unwrap(); + }); + + tx.send(3).unwrap(); + tx.send(1).unwrap(); + tx.send(2).unwrap(); + drop(tx); + assert_eq!(total_rx.recv().unwrap(), 6); +} + +#[test] +fn test_recv_iter_break() { + let (tx, rx) = channel::<i32>(); + let (count_tx, count_rx) = channel(); + + let _t = thread::spawn(move || { + let mut count = 0; + for x in rx.iter() { + if count >= 3 { + break; + } else { + count += x; + } + } + count_tx.send(count).unwrap(); + }); + + tx.send(2).unwrap(); + tx.send(2).unwrap(); + tx.send(2).unwrap(); + let _ = tx.send(2); + drop(tx); + assert_eq!(count_rx.recv().unwrap(), 4); +} + +#[test] +fn test_recv_try_iter() { + let (request_tx, request_rx) = channel(); + let (response_tx, response_rx) = channel(); + + // Request `x`s until we have `6`. + let t = thread::spawn(move || { + let mut count = 0; + loop { + for x in response_rx.try_iter() { + count += x; + if count == 6 { + return count; + } + } + request_tx.send(()).unwrap(); + } + }); + + for _ in request_rx.iter() { + if response_tx.send(2).is_err() { + break; + } + } + + assert_eq!(t.join().unwrap(), 6); +} + +#[test] +fn test_recv_into_iter_owned() { + let mut iter = { + let (tx, rx) = channel::<i32>(); + tx.send(1).unwrap(); + tx.send(2).unwrap(); + + rx.into_iter() + }; + assert_eq!(iter.next().unwrap(), 1); + assert_eq!(iter.next().unwrap(), 2); + assert_eq!(iter.next().is_none(), true); +} + +#[test] +fn test_recv_into_iter_borrowed() { + let (tx, rx) = channel::<i32>(); + tx.send(1).unwrap(); + tx.send(2).unwrap(); + drop(tx); + let mut iter = (&rx).into_iter(); + assert_eq!(iter.next().unwrap(), 1); + assert_eq!(iter.next().unwrap(), 2); + assert_eq!(iter.next().is_none(), true); +} + +#[test] +fn try_recv_states() { + let (tx1, rx1) = channel::<i32>(); + let (tx2, rx2) = channel::<()>(); + let (tx3, rx3) = channel::<()>(); + let _t = thread::spawn(move || { + rx2.recv().unwrap(); + tx1.send(1).unwrap(); + tx3.send(()).unwrap(); + rx2.recv().unwrap(); + drop(tx1); + tx3.send(()).unwrap(); + }); + + assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty)); + tx2.send(()).unwrap(); + rx3.recv().unwrap(); + assert_eq!(rx1.try_recv(), Ok(1)); + assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty)); + tx2.send(()).unwrap(); + rx3.recv().unwrap(); + assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected)); +} + +// This bug used to end up in a livelock inside of the Receiver destructor +// because the internal state of the Shared packet was corrupted +#[test] +fn destroy_upgraded_shared_port_when_sender_still_active() { + let (tx, rx) = channel(); + let (tx2, rx2) = channel(); + let _t = thread::spawn(move || { + rx.recv().unwrap(); // wait on a oneshot + drop(rx); // destroy a shared + tx2.send(()).unwrap(); + }); + // make sure the other thread has gone to sleep + for _ in 0..5000 { + thread::yield_now(); + } + + // upgrade to a shared chan and send a message + let t = tx.clone(); + drop(tx); + t.send(()).unwrap(); + + // wait for the child thread to exit before we exit + rx2.recv().unwrap(); +} + +#[test] +fn issue_32114() { + let (tx, _) = channel(); + let _ = tx.send(123); + assert_eq!(tx.send(123), Err(SendError(123))); +} diff --git a/library/std/src/sync/mutex.rs b/library/std/src/sync/mutex.rs index d7a4f00305c..0de71793834 100644 --- a/library/std/src/sync/mutex.rs +++ b/library/std/src/sync/mutex.rs @@ -1,3 +1,6 @@ +#[cfg(all(test, not(target_os = "emscripten")))] +mod tests; + use crate::cell::UnsafeCell; use crate::fmt; use crate::mem; @@ -515,245 +518,3 @@ pub fn guard_lock<'a, T: ?Sized>(guard: &MutexGuard<'a, T>) -> &'a sys::Mutex { pub fn guard_poison<'a, T: ?Sized>(guard: &MutexGuard<'a, T>) -> &'a poison::Flag { &guard.lock.poison } - -#[cfg(all(test, not(target_os = "emscripten")))] -mod tests { - use crate::sync::atomic::{AtomicUsize, Ordering}; - use crate::sync::mpsc::channel; - use crate::sync::{Arc, Condvar, Mutex}; - use crate::thread; - - struct Packet<T>(Arc<(Mutex<T>, Condvar)>); - - #[derive(Eq, PartialEq, Debug)] - struct NonCopy(i32); - - #[test] - fn smoke() { - let m = Mutex::new(()); - drop(m.lock().unwrap()); - drop(m.lock().unwrap()); - } - - #[test] - fn lots_and_lots() { - const J: u32 = 1000; - const K: u32 = 3; - - let m = Arc::new(Mutex::new(0)); - - fn inc(m: &Mutex<u32>) { - for _ in 0..J { - *m.lock().unwrap() += 1; - } - } - - let (tx, rx) = channel(); - for _ in 0..K { - let tx2 = tx.clone(); - let m2 = m.clone(); - thread::spawn(move || { - inc(&m2); - tx2.send(()).unwrap(); - }); - let tx2 = tx.clone(); - let m2 = m.clone(); - thread::spawn(move || { - inc(&m2); - tx2.send(()).unwrap(); - }); - } - - drop(tx); - for _ in 0..2 * K { - rx.recv().unwrap(); - } - assert_eq!(*m.lock().unwrap(), J * K * 2); - } - - #[test] - fn try_lock() { - let m = Mutex::new(()); - *m.try_lock().unwrap() = (); - } - - #[test] - fn test_into_inner() { - let m = Mutex::new(NonCopy(10)); - assert_eq!(m.into_inner().unwrap(), NonCopy(10)); - } - - #[test] - fn test_into_inner_drop() { - struct Foo(Arc<AtomicUsize>); - impl Drop for Foo { - fn drop(&mut self) { - self.0.fetch_add(1, Ordering::SeqCst); - } - } - let num_drops = Arc::new(AtomicUsize::new(0)); - let m = Mutex::new(Foo(num_drops.clone())); - assert_eq!(num_drops.load(Ordering::SeqCst), 0); - { - let _inner = m.into_inner().unwrap(); - assert_eq!(num_drops.load(Ordering::SeqCst), 0); - } - assert_eq!(num_drops.load(Ordering::SeqCst), 1); - } - - #[test] - fn test_into_inner_poison() { - let m = Arc::new(Mutex::new(NonCopy(10))); - let m2 = m.clone(); - let _ = thread::spawn(move || { - let _lock = m2.lock().unwrap(); - panic!("test panic in inner thread to poison mutex"); - }) - .join(); - - assert!(m.is_poisoned()); - match Arc::try_unwrap(m).unwrap().into_inner() { - Err(e) => assert_eq!(e.into_inner(), NonCopy(10)), - Ok(x) => panic!("into_inner of poisoned Mutex is Ok: {:?}", x), - } - } - - #[test] - fn test_get_mut() { - let mut m = Mutex::new(NonCopy(10)); - *m.get_mut().unwrap() = NonCopy(20); - assert_eq!(m.into_inner().unwrap(), NonCopy(20)); - } - - #[test] - fn test_get_mut_poison() { - let m = Arc::new(Mutex::new(NonCopy(10))); - let m2 = m.clone(); - let _ = thread::spawn(move || { - let _lock = m2.lock().unwrap(); - panic!("test panic in inner thread to poison mutex"); - }) - .join(); - - assert!(m.is_poisoned()); - match Arc::try_unwrap(m).unwrap().get_mut() { - Err(e) => assert_eq!(*e.into_inner(), NonCopy(10)), - Ok(x) => panic!("get_mut of poisoned Mutex is Ok: {:?}", x), - } - } - - #[test] - fn test_mutex_arc_condvar() { - let packet = Packet(Arc::new((Mutex::new(false), Condvar::new()))); - let packet2 = Packet(packet.0.clone()); - let (tx, rx) = channel(); - let _t = thread::spawn(move || { - // wait until parent gets in - rx.recv().unwrap(); - let &(ref lock, ref cvar) = &*packet2.0; - let mut lock = lock.lock().unwrap(); - *lock = true; - cvar.notify_one(); - }); - - let &(ref lock, ref cvar) = &*packet.0; - let mut lock = lock.lock().unwrap(); - tx.send(()).unwrap(); - assert!(!*lock); - while !*lock { - lock = cvar.wait(lock).unwrap(); - } - } - - #[test] - fn test_arc_condvar_poison() { - let packet = Packet(Arc::new((Mutex::new(1), Condvar::new()))); - let packet2 = Packet(packet.0.clone()); - let (tx, rx) = channel(); - - let _t = thread::spawn(move || -> () { - rx.recv().unwrap(); - let &(ref lock, ref cvar) = &*packet2.0; - let _g = lock.lock().unwrap(); - cvar.notify_one(); - // Parent should fail when it wakes up. - panic!(); - }); - - let &(ref lock, ref cvar) = &*packet.0; - let mut lock = lock.lock().unwrap(); - tx.send(()).unwrap(); - while *lock == 1 { - match cvar.wait(lock) { - Ok(l) => { - lock = l; - assert_eq!(*lock, 1); - } - Err(..) => break, - } - } - } - - #[test] - fn test_mutex_arc_poison() { - let arc = Arc::new(Mutex::new(1)); - assert!(!arc.is_poisoned()); - let arc2 = arc.clone(); - let _ = thread::spawn(move || { - let lock = arc2.lock().unwrap(); - assert_eq!(*lock, 2); - }) - .join(); - assert!(arc.lock().is_err()); - assert!(arc.is_poisoned()); - } - - #[test] - fn test_mutex_arc_nested() { - // Tests nested mutexes and access - // to underlying data. - let arc = Arc::new(Mutex::new(1)); - let arc2 = Arc::new(Mutex::new(arc)); - let (tx, rx) = channel(); - let _t = thread::spawn(move || { - let lock = arc2.lock().unwrap(); - let lock2 = lock.lock().unwrap(); - assert_eq!(*lock2, 1); - tx.send(()).unwrap(); - }); - rx.recv().unwrap(); - } - - #[test] - fn test_mutex_arc_access_in_unwind() { - let arc = Arc::new(Mutex::new(1)); - let arc2 = arc.clone(); - let _ = thread::spawn(move || -> () { - struct Unwinder { - i: Arc<Mutex<i32>>, - } - impl Drop for Unwinder { - fn drop(&mut self) { - *self.i.lock().unwrap() += 1; - } - } - let _u = Unwinder { i: arc2 }; - panic!(); - }) - .join(); - let lock = arc.lock().unwrap(); - assert_eq!(*lock, 2); - } - - #[test] - fn test_mutex_unsized() { - let mutex: &Mutex<[i32]> = &Mutex::new([1, 2, 3]); - { - let b = &mut *mutex.lock().unwrap(); - b[0] = 4; - b[2] = 5; - } - let comp: &[i32] = &[4, 2, 5]; - assert_eq!(&*mutex.lock().unwrap(), comp); - } -} diff --git a/library/std/src/sync/mutex/tests.rs b/library/std/src/sync/mutex/tests.rs new file mode 100644 index 00000000000..a1b5aeddcb6 --- /dev/null +++ b/library/std/src/sync/mutex/tests.rs @@ -0,0 +1,238 @@ +use crate::sync::atomic::{AtomicUsize, Ordering}; +use crate::sync::mpsc::channel; +use crate::sync::{Arc, Condvar, Mutex}; +use crate::thread; + +struct Packet<T>(Arc<(Mutex<T>, Condvar)>); + +#[derive(Eq, PartialEq, Debug)] +struct NonCopy(i32); + +#[test] +fn smoke() { + let m = Mutex::new(()); + drop(m.lock().unwrap()); + drop(m.lock().unwrap()); +} + +#[test] +fn lots_and_lots() { + const J: u32 = 1000; + const K: u32 = 3; + + let m = Arc::new(Mutex::new(0)); + + fn inc(m: &Mutex<u32>) { + for _ in 0..J { + *m.lock().unwrap() += 1; + } + } + + let (tx, rx) = channel(); + for _ in 0..K { + let tx2 = tx.clone(); + let m2 = m.clone(); + thread::spawn(move || { + inc(&m2); + tx2.send(()).unwrap(); + }); + let tx2 = tx.clone(); + let m2 = m.clone(); + thread::spawn(move || { + inc(&m2); + tx2.send(()).unwrap(); + }); + } + + drop(tx); + for _ in 0..2 * K { + rx.recv().unwrap(); + } + assert_eq!(*m.lock().unwrap(), J * K * 2); +} + +#[test] +fn try_lock() { + let m = Mutex::new(()); + *m.try_lock().unwrap() = (); +} + +#[test] +fn test_into_inner() { + let m = Mutex::new(NonCopy(10)); + assert_eq!(m.into_inner().unwrap(), NonCopy(10)); +} + +#[test] +fn test_into_inner_drop() { + struct Foo(Arc<AtomicUsize>); + impl Drop for Foo { + fn drop(&mut self) { + self.0.fetch_add(1, Ordering::SeqCst); + } + } + let num_drops = Arc::new(AtomicUsize::new(0)); + let m = Mutex::new(Foo(num_drops.clone())); + assert_eq!(num_drops.load(Ordering::SeqCst), 0); + { + let _inner = m.into_inner().unwrap(); + assert_eq!(num_drops.load(Ordering::SeqCst), 0); + } + assert_eq!(num_drops.load(Ordering::SeqCst), 1); +} + +#[test] +fn test_into_inner_poison() { + let m = Arc::new(Mutex::new(NonCopy(10))); + let m2 = m.clone(); + let _ = thread::spawn(move || { + let _lock = m2.lock().unwrap(); + panic!("test panic in inner thread to poison mutex"); + }) + .join(); + + assert!(m.is_poisoned()); + match Arc::try_unwrap(m).unwrap().into_inner() { + Err(e) => assert_eq!(e.into_inner(), NonCopy(10)), + Ok(x) => panic!("into_inner of poisoned Mutex is Ok: {:?}", x), + } +} + +#[test] +fn test_get_mut() { + let mut m = Mutex::new(NonCopy(10)); + *m.get_mut().unwrap() = NonCopy(20); + assert_eq!(m.into_inner().unwrap(), NonCopy(20)); +} + +#[test] +fn test_get_mut_poison() { + let m = Arc::new(Mutex::new(NonCopy(10))); + let m2 = m.clone(); + let _ = thread::spawn(move || { + let _lock = m2.lock().unwrap(); + panic!("test panic in inner thread to poison mutex"); + }) + .join(); + + assert!(m.is_poisoned()); + match Arc::try_unwrap(m).unwrap().get_mut() { + Err(e) => assert_eq!(*e.into_inner(), NonCopy(10)), + Ok(x) => panic!("get_mut of poisoned Mutex is Ok: {:?}", x), + } +} + +#[test] +fn test_mutex_arc_condvar() { + let packet = Packet(Arc::new((Mutex::new(false), Condvar::new()))); + let packet2 = Packet(packet.0.clone()); + let (tx, rx) = channel(); + let _t = thread::spawn(move || { + // wait until parent gets in + rx.recv().unwrap(); + let &(ref lock, ref cvar) = &*packet2.0; + let mut lock = lock.lock().unwrap(); + *lock = true; + cvar.notify_one(); + }); + + let &(ref lock, ref cvar) = &*packet.0; + let mut lock = lock.lock().unwrap(); + tx.send(()).unwrap(); + assert!(!*lock); + while !*lock { + lock = cvar.wait(lock).unwrap(); + } +} + +#[test] +fn test_arc_condvar_poison() { + let packet = Packet(Arc::new((Mutex::new(1), Condvar::new()))); + let packet2 = Packet(packet.0.clone()); + let (tx, rx) = channel(); + + let _t = thread::spawn(move || -> () { + rx.recv().unwrap(); + let &(ref lock, ref cvar) = &*packet2.0; + let _g = lock.lock().unwrap(); + cvar.notify_one(); + // Parent should fail when it wakes up. + panic!(); + }); + + let &(ref lock, ref cvar) = &*packet.0; + let mut lock = lock.lock().unwrap(); + tx.send(()).unwrap(); + while *lock == 1 { + match cvar.wait(lock) { + Ok(l) => { + lock = l; + assert_eq!(*lock, 1); + } + Err(..) => break, + } + } +} + +#[test] +fn test_mutex_arc_poison() { + let arc = Arc::new(Mutex::new(1)); + assert!(!arc.is_poisoned()); + let arc2 = arc.clone(); + let _ = thread::spawn(move || { + let lock = arc2.lock().unwrap(); + assert_eq!(*lock, 2); + }) + .join(); + assert!(arc.lock().is_err()); + assert!(arc.is_poisoned()); +} + +#[test] +fn test_mutex_arc_nested() { + // Tests nested mutexes and access + // to underlying data. + let arc = Arc::new(Mutex::new(1)); + let arc2 = Arc::new(Mutex::new(arc)); + let (tx, rx) = channel(); + let _t = thread::spawn(move || { + let lock = arc2.lock().unwrap(); + let lock2 = lock.lock().unwrap(); + assert_eq!(*lock2, 1); + tx.send(()).unwrap(); + }); + rx.recv().unwrap(); +} + +#[test] +fn test_mutex_arc_access_in_unwind() { + let arc = Arc::new(Mutex::new(1)); + let arc2 = arc.clone(); + let _ = thread::spawn(move || -> () { + struct Unwinder { + i: Arc<Mutex<i32>>, + } + impl Drop for Unwinder { + fn drop(&mut self) { + *self.i.lock().unwrap() += 1; + } + } + let _u = Unwinder { i: arc2 }; + panic!(); + }) + .join(); + let lock = arc.lock().unwrap(); + assert_eq!(*lock, 2); +} + +#[test] +fn test_mutex_unsized() { + let mutex: &Mutex<[i32]> = &Mutex::new([1, 2, 3]); + { + let b = &mut *mutex.lock().unwrap(); + b[0] = 4; + b[2] = 5; + } + let comp: &[i32] = &[4, 2, 5]; + assert_eq!(&*mutex.lock().unwrap(), comp); +} diff --git a/library/std/src/sync/once.rs b/library/std/src/sync/once.rs index 714ec3e8786..8fed369bffc 100644 --- a/library/std/src/sync/once.rs +++ b/library/std/src/sync/once.rs @@ -84,6 +84,9 @@ // processor. Because both use Acquire ordering such a reordering is not // allowed, so no need for SeqCst. +#[cfg(all(test, not(target_os = "emscripten")))] +mod tests; + use crate::cell::Cell; use crate::fmt; use crate::marker; @@ -568,123 +571,3 @@ impl OnceState { self.set_state_on_drop_to.set(POISONED); } } - -#[cfg(all(test, not(target_os = "emscripten")))] -mod tests { - use super::Once; - use crate::panic; - use crate::sync::mpsc::channel; - use crate::thread; - - #[test] - fn smoke_once() { - static O: Once = Once::new(); - let mut a = 0; - O.call_once(|| a += 1); - assert_eq!(a, 1); - O.call_once(|| a += 1); - assert_eq!(a, 1); - } - - #[test] - fn stampede_once() { - static O: Once = Once::new(); - static mut RUN: bool = false; - - let (tx, rx) = channel(); - for _ in 0..10 { - let tx = tx.clone(); - thread::spawn(move || { - for _ in 0..4 { - thread::yield_now() - } - unsafe { - O.call_once(|| { - assert!(!RUN); - RUN = true; - }); - assert!(RUN); - } - tx.send(()).unwrap(); - }); - } - - unsafe { - O.call_once(|| { - assert!(!RUN); - RUN = true; - }); - assert!(RUN); - } - - for _ in 0..10 { - rx.recv().unwrap(); - } - } - - #[test] - fn poison_bad() { - static O: Once = Once::new(); - - // poison the once - let t = panic::catch_unwind(|| { - O.call_once(|| panic!()); - }); - assert!(t.is_err()); - - // poisoning propagates - let t = panic::catch_unwind(|| { - O.call_once(|| {}); - }); - assert!(t.is_err()); - - // we can subvert poisoning, however - let mut called = false; - O.call_once_force(|p| { - called = true; - assert!(p.poisoned()) - }); - assert!(called); - - // once any success happens, we stop propagating the poison - O.call_once(|| {}); - } - - #[test] - fn wait_for_force_to_finish() { - static O: Once = Once::new(); - - // poison the once - let t = panic::catch_unwind(|| { - O.call_once(|| panic!()); - }); - assert!(t.is_err()); - - // make sure someone's waiting inside the once via a force - let (tx1, rx1) = channel(); - let (tx2, rx2) = channel(); - let t1 = thread::spawn(move || { - O.call_once_force(|p| { - assert!(p.poisoned()); - tx1.send(()).unwrap(); - rx2.recv().unwrap(); - }); - }); - - rx1.recv().unwrap(); - - // put another waiter on the once - let t2 = thread::spawn(|| { - let mut called = false; - O.call_once(|| { - called = true; - }); - assert!(!called); - }); - - tx2.send(()).unwrap(); - - assert!(t1.join().is_ok()); - assert!(t2.join().is_ok()); - } -} diff --git a/library/std/src/sync/once/tests.rs b/library/std/src/sync/once/tests.rs new file mode 100644 index 00000000000..fae2752526e --- /dev/null +++ b/library/std/src/sync/once/tests.rs @@ -0,0 +1,116 @@ +use super::Once; +use crate::panic; +use crate::sync::mpsc::channel; +use crate::thread; + +#[test] +fn smoke_once() { + static O: Once = Once::new(); + let mut a = 0; + O.call_once(|| a += 1); + assert_eq!(a, 1); + O.call_once(|| a += 1); + assert_eq!(a, 1); +} + +#[test] +fn stampede_once() { + static O: Once = Once::new(); + static mut RUN: bool = false; + + let (tx, rx) = channel(); + for _ in 0..10 { + let tx = tx.clone(); + thread::spawn(move || { + for _ in 0..4 { + thread::yield_now() + } + unsafe { + O.call_once(|| { + assert!(!RUN); + RUN = true; + }); + assert!(RUN); + } + tx.send(()).unwrap(); + }); + } + + unsafe { + O.call_once(|| { + assert!(!RUN); + RUN = true; + }); + assert!(RUN); + } + + for _ in 0..10 { + rx.recv().unwrap(); + } +} + +#[test] +fn poison_bad() { + static O: Once = Once::new(); + + // poison the once + let t = panic::catch_unwind(|| { + O.call_once(|| panic!()); + }); + assert!(t.is_err()); + + // poisoning propagates + let t = panic::catch_unwind(|| { + O.call_once(|| {}); + }); + assert!(t.is_err()); + + // we can subvert poisoning, however + let mut called = false; + O.call_once_force(|p| { + called = true; + assert!(p.poisoned()) + }); + assert!(called); + + // once any success happens, we stop propagating the poison + O.call_once(|| {}); +} + +#[test] +fn wait_for_force_to_finish() { + static O: Once = Once::new(); + + // poison the once + let t = panic::catch_unwind(|| { + O.call_once(|| panic!()); + }); + assert!(t.is_err()); + + // make sure someone's waiting inside the once via a force + let (tx1, rx1) = channel(); + let (tx2, rx2) = channel(); + let t1 = thread::spawn(move || { + O.call_once_force(|p| { + assert!(p.poisoned()); + tx1.send(()).unwrap(); + rx2.recv().unwrap(); + }); + }); + + rx1.recv().unwrap(); + + // put another waiter on the once + let t2 = thread::spawn(|| { + let mut called = false; + O.call_once(|| { + called = true; + }); + assert!(!called); + }); + + tx2.send(()).unwrap(); + + assert!(t1.join().is_ok()); + assert!(t2.join().is_ok()); +} diff --git a/library/std/src/sync/rwlock.rs b/library/std/src/sync/rwlock.rs index 586093c916d..2d219c2c7dd 100644 --- a/library/std/src/sync/rwlock.rs +++ b/library/std/src/sync/rwlock.rs @@ -1,3 +1,6 @@ +#[cfg(all(test, not(target_os = "emscripten")))] +mod tests; + use crate::cell::UnsafeCell; use crate::fmt; use crate::mem; @@ -538,254 +541,3 @@ impl<T: ?Sized> Drop for RwLockWriteGuard<'_, T> { } } } - -#[cfg(all(test, not(target_os = "emscripten")))] -mod tests { - use crate::sync::atomic::{AtomicUsize, Ordering}; - use crate::sync::mpsc::channel; - use crate::sync::{Arc, RwLock, TryLockError}; - use crate::thread; - use rand::{self, Rng}; - - #[derive(Eq, PartialEq, Debug)] - struct NonCopy(i32); - - #[test] - fn smoke() { - let l = RwLock::new(()); - drop(l.read().unwrap()); - drop(l.write().unwrap()); - drop((l.read().unwrap(), l.read().unwrap())); - drop(l.write().unwrap()); - } - - #[test] - fn frob() { - const N: u32 = 10; - const M: usize = 1000; - - let r = Arc::new(RwLock::new(())); - - let (tx, rx) = channel::<()>(); - for _ in 0..N { - let tx = tx.clone(); - let r = r.clone(); - thread::spawn(move || { - let mut rng = rand::thread_rng(); - for _ in 0..M { - if rng.gen_bool(1.0 / (N as f64)) { - drop(r.write().unwrap()); - } else { - drop(r.read().unwrap()); - } - } - drop(tx); - }); - } - drop(tx); - let _ = rx.recv(); - } - - #[test] - fn test_rw_arc_poison_wr() { - let arc = Arc::new(RwLock::new(1)); - let arc2 = arc.clone(); - let _: Result<(), _> = thread::spawn(move || { - let _lock = arc2.write().unwrap(); - panic!(); - }) - .join(); - assert!(arc.read().is_err()); - } - - #[test] - fn test_rw_arc_poison_ww() { - let arc = Arc::new(RwLock::new(1)); - assert!(!arc.is_poisoned()); - let arc2 = arc.clone(); - let _: Result<(), _> = thread::spawn(move || { - let _lock = arc2.write().unwrap(); - panic!(); - }) - .join(); - assert!(arc.write().is_err()); - assert!(arc.is_poisoned()); - } - - #[test] - fn test_rw_arc_no_poison_rr() { - let arc = Arc::new(RwLock::new(1)); - let arc2 = arc.clone(); - let _: Result<(), _> = thread::spawn(move || { - let _lock = arc2.read().unwrap(); - panic!(); - }) - .join(); - let lock = arc.read().unwrap(); - assert_eq!(*lock, 1); - } - #[test] - fn test_rw_arc_no_poison_rw() { - let arc = Arc::new(RwLock::new(1)); - let arc2 = arc.clone(); - let _: Result<(), _> = thread::spawn(move || { - let _lock = arc2.read().unwrap(); - panic!() - }) - .join(); - let lock = arc.write().unwrap(); - assert_eq!(*lock, 1); - } - - #[test] - fn test_rw_arc() { - let arc = Arc::new(RwLock::new(0)); - let arc2 = arc.clone(); - let (tx, rx) = channel(); - - thread::spawn(move || { - let mut lock = arc2.write().unwrap(); - for _ in 0..10 { - let tmp = *lock; - *lock = -1; - thread::yield_now(); - *lock = tmp + 1; - } - tx.send(()).unwrap(); - }); - - // Readers try to catch the writer in the act - let mut children = Vec::new(); - for _ in 0..5 { - let arc3 = arc.clone(); - children.push(thread::spawn(move || { - let lock = arc3.read().unwrap(); - assert!(*lock >= 0); - })); - } - - // Wait for children to pass their asserts - for r in children { - assert!(r.join().is_ok()); - } - - // Wait for writer to finish - rx.recv().unwrap(); - let lock = arc.read().unwrap(); - assert_eq!(*lock, 10); - } - - #[test] - fn test_rw_arc_access_in_unwind() { - let arc = Arc::new(RwLock::new(1)); - let arc2 = arc.clone(); - let _ = thread::spawn(move || -> () { - struct Unwinder { - i: Arc<RwLock<isize>>, - } - impl Drop for Unwinder { - fn drop(&mut self) { - let mut lock = self.i.write().unwrap(); - *lock += 1; - } - } - let _u = Unwinder { i: arc2 }; - panic!(); - }) - .join(); - let lock = arc.read().unwrap(); - assert_eq!(*lock, 2); - } - - #[test] - fn test_rwlock_unsized() { - let rw: &RwLock<[i32]> = &RwLock::new([1, 2, 3]); - { - let b = &mut *rw.write().unwrap(); - b[0] = 4; - b[2] = 5; - } - let comp: &[i32] = &[4, 2, 5]; - assert_eq!(&*rw.read().unwrap(), comp); - } - - #[test] - fn test_rwlock_try_write() { - let lock = RwLock::new(0isize); - let read_guard = lock.read().unwrap(); - - let write_result = lock.try_write(); - match write_result { - Err(TryLockError::WouldBlock) => (), - Ok(_) => assert!(false, "try_write should not succeed while read_guard is in scope"), - Err(_) => assert!(false, "unexpected error"), - } - - drop(read_guard); - } - - #[test] - fn test_into_inner() { - let m = RwLock::new(NonCopy(10)); - assert_eq!(m.into_inner().unwrap(), NonCopy(10)); - } - - #[test] - fn test_into_inner_drop() { - struct Foo(Arc<AtomicUsize>); - impl Drop for Foo { - fn drop(&mut self) { - self.0.fetch_add(1, Ordering::SeqCst); - } - } - let num_drops = Arc::new(AtomicUsize::new(0)); - let m = RwLock::new(Foo(num_drops.clone())); - assert_eq!(num_drops.load(Ordering::SeqCst), 0); - { - let _inner = m.into_inner().unwrap(); - assert_eq!(num_drops.load(Ordering::SeqCst), 0); - } - assert_eq!(num_drops.load(Ordering::SeqCst), 1); - } - - #[test] - fn test_into_inner_poison() { - let m = Arc::new(RwLock::new(NonCopy(10))); - let m2 = m.clone(); - let _ = thread::spawn(move || { - let _lock = m2.write().unwrap(); - panic!("test panic in inner thread to poison RwLock"); - }) - .join(); - - assert!(m.is_poisoned()); - match Arc::try_unwrap(m).unwrap().into_inner() { - Err(e) => assert_eq!(e.into_inner(), NonCopy(10)), - Ok(x) => panic!("into_inner of poisoned RwLock is Ok: {:?}", x), - } - } - - #[test] - fn test_get_mut() { - let mut m = RwLock::new(NonCopy(10)); - *m.get_mut().unwrap() = NonCopy(20); - assert_eq!(m.into_inner().unwrap(), NonCopy(20)); - } - - #[test] - fn test_get_mut_poison() { - let m = Arc::new(RwLock::new(NonCopy(10))); - let m2 = m.clone(); - let _ = thread::spawn(move || { - let _lock = m2.write().unwrap(); - panic!("test panic in inner thread to poison RwLock"); - }) - .join(); - - assert!(m.is_poisoned()); - match Arc::try_unwrap(m).unwrap().get_mut() { - Err(e) => assert_eq!(*e.into_inner(), NonCopy(10)), - Ok(x) => panic!("get_mut of poisoned RwLock is Ok: {:?}", x), - } - } -} diff --git a/library/std/src/sync/rwlock/tests.rs b/library/std/src/sync/rwlock/tests.rs new file mode 100644 index 00000000000..e9b74fb3ecc --- /dev/null +++ b/library/std/src/sync/rwlock/tests.rs @@ -0,0 +1,247 @@ +use crate::sync::atomic::{AtomicUsize, Ordering}; +use crate::sync::mpsc::channel; +use crate::sync::{Arc, RwLock, TryLockError}; +use crate::thread; +use rand::{self, Rng}; + +#[derive(Eq, PartialEq, Debug)] +struct NonCopy(i32); + +#[test] +fn smoke() { + let l = RwLock::new(()); + drop(l.read().unwrap()); + drop(l.write().unwrap()); + drop((l.read().unwrap(), l.read().unwrap())); + drop(l.write().unwrap()); +} + +#[test] +fn frob() { + const N: u32 = 10; + const M: usize = 1000; + + let r = Arc::new(RwLock::new(())); + + let (tx, rx) = channel::<()>(); + for _ in 0..N { + let tx = tx.clone(); + let r = r.clone(); + thread::spawn(move || { + let mut rng = rand::thread_rng(); + for _ in 0..M { + if rng.gen_bool(1.0 / (N as f64)) { + drop(r.write().unwrap()); + } else { + drop(r.read().unwrap()); + } + } + drop(tx); + }); + } + drop(tx); + let _ = rx.recv(); +} + +#[test] +fn test_rw_arc_poison_wr() { + let arc = Arc::new(RwLock::new(1)); + let arc2 = arc.clone(); + let _: Result<(), _> = thread::spawn(move || { + let _lock = arc2.write().unwrap(); + panic!(); + }) + .join(); + assert!(arc.read().is_err()); +} + +#[test] +fn test_rw_arc_poison_ww() { + let arc = Arc::new(RwLock::new(1)); + assert!(!arc.is_poisoned()); + let arc2 = arc.clone(); + let _: Result<(), _> = thread::spawn(move || { + let _lock = arc2.write().unwrap(); + panic!(); + }) + .join(); + assert!(arc.write().is_err()); + assert!(arc.is_poisoned()); +} + +#[test] +fn test_rw_arc_no_poison_rr() { + let arc = Arc::new(RwLock::new(1)); + let arc2 = arc.clone(); + let _: Result<(), _> = thread::spawn(move || { + let _lock = arc2.read().unwrap(); + panic!(); + }) + .join(); + let lock = arc.read().unwrap(); + assert_eq!(*lock, 1); +} +#[test] +fn test_rw_arc_no_poison_rw() { + let arc = Arc::new(RwLock::new(1)); + let arc2 = arc.clone(); + let _: Result<(), _> = thread::spawn(move || { + let _lock = arc2.read().unwrap(); + panic!() + }) + .join(); + let lock = arc.write().unwrap(); + assert_eq!(*lock, 1); +} + +#[test] +fn test_rw_arc() { + let arc = Arc::new(RwLock::new(0)); + let arc2 = arc.clone(); + let (tx, rx) = channel(); + + thread::spawn(move || { + let mut lock = arc2.write().unwrap(); + for _ in 0..10 { + let tmp = *lock; + *lock = -1; + thread::yield_now(); + *lock = tmp + 1; + } + tx.send(()).unwrap(); + }); + + // Readers try to catch the writer in the act + let mut children = Vec::new(); + for _ in 0..5 { + let arc3 = arc.clone(); + children.push(thread::spawn(move || { + let lock = arc3.read().unwrap(); + assert!(*lock >= 0); + })); + } + + // Wait for children to pass their asserts + for r in children { + assert!(r.join().is_ok()); + } + + // Wait for writer to finish + rx.recv().unwrap(); + let lock = arc.read().unwrap(); + assert_eq!(*lock, 10); +} + +#[test] +fn test_rw_arc_access_in_unwind() { + let arc = Arc::new(RwLock::new(1)); + let arc2 = arc.clone(); + let _ = thread::spawn(move || -> () { + struct Unwinder { + i: Arc<RwLock<isize>>, + } + impl Drop for Unwinder { + fn drop(&mut self) { + let mut lock = self.i.write().unwrap(); + *lock += 1; + } + } + let _u = Unwinder { i: arc2 }; + panic!(); + }) + .join(); + let lock = arc.read().unwrap(); + assert_eq!(*lock, 2); +} + +#[test] +fn test_rwlock_unsized() { + let rw: &RwLock<[i32]> = &RwLock::new([1, 2, 3]); + { + let b = &mut *rw.write().unwrap(); + b[0] = 4; + b[2] = 5; + } + let comp: &[i32] = &[4, 2, 5]; + assert_eq!(&*rw.read().unwrap(), comp); +} + +#[test] +fn test_rwlock_try_write() { + let lock = RwLock::new(0isize); + let read_guard = lock.read().unwrap(); + + let write_result = lock.try_write(); + match write_result { + Err(TryLockError::WouldBlock) => (), + Ok(_) => assert!(false, "try_write should not succeed while read_guard is in scope"), + Err(_) => assert!(false, "unexpected error"), + } + + drop(read_guard); +} + +#[test] +fn test_into_inner() { + let m = RwLock::new(NonCopy(10)); + assert_eq!(m.into_inner().unwrap(), NonCopy(10)); +} + +#[test] +fn test_into_inner_drop() { + struct Foo(Arc<AtomicUsize>); + impl Drop for Foo { + fn drop(&mut self) { + self.0.fetch_add(1, Ordering::SeqCst); + } + } + let num_drops = Arc::new(AtomicUsize::new(0)); + let m = RwLock::new(Foo(num_drops.clone())); + assert_eq!(num_drops.load(Ordering::SeqCst), 0); + { + let _inner = m.into_inner().unwrap(); + assert_eq!(num_drops.load(Ordering::SeqCst), 0); + } + assert_eq!(num_drops.load(Ordering::SeqCst), 1); +} + +#[test] +fn test_into_inner_poison() { + let m = Arc::new(RwLock::new(NonCopy(10))); + let m2 = m.clone(); + let _ = thread::spawn(move || { + let _lock = m2.write().unwrap(); + panic!("test panic in inner thread to poison RwLock"); + }) + .join(); + + assert!(m.is_poisoned()); + match Arc::try_unwrap(m).unwrap().into_inner() { + Err(e) => assert_eq!(e.into_inner(), NonCopy(10)), + Ok(x) => panic!("into_inner of poisoned RwLock is Ok: {:?}", x), + } +} + +#[test] +fn test_get_mut() { + let mut m = RwLock::new(NonCopy(10)); + *m.get_mut().unwrap() = NonCopy(20); + assert_eq!(m.into_inner().unwrap(), NonCopy(20)); +} + +#[test] +fn test_get_mut_poison() { + let m = Arc::new(RwLock::new(NonCopy(10))); + let m2 = m.clone(); + let _ = thread::spawn(move || { + let _lock = m2.write().unwrap(); + panic!("test panic in inner thread to poison RwLock"); + }) + .join(); + + assert!(m.is_poisoned()); + match Arc::try_unwrap(m).unwrap().get_mut() { + Err(e) => assert_eq!(*e.into_inner(), NonCopy(10)), + Ok(x) => panic!("get_mut of poisoned RwLock is Ok: {:?}", x), + } +} |
