diff options
| author | Alex Crichton <alex@alexcrichton.com> | 2014-12-22 09:04:23 -0800 |
|---|---|---|
| committer | Alex Crichton <alex@alexcrichton.com> | 2014-12-29 08:58:21 -0800 |
| commit | c32d03f4172580e3f33e4844ed3c01234dca2d53 (patch) | |
| tree | 70c57ebe6a3b5f6e8cb4609c918f9455671926be /src/libstd/comm | |
| parent | 3dcc409fac18a258ba2a8af4345d9566ec8eebad (diff) | |
| download | rust-c32d03f4172580e3f33e4844ed3c01234dca2d53.tar.gz rust-c32d03f4172580e3f33e4844ed3c01234dca2d53.zip | |
std: Stabilize the prelude module
This commit is an implementation of [RFC 503][rfc] which is a stabilization story for the prelude. Most of the RFC was directly applied, removing reexports. Some reexports are kept around, however: * `range` remains until range syntax has landed to reduce churn. * `Path` and `GenericPath` remain until path reform lands. This is done to prevent many imports of `GenericPath` which will soon be removed. * All `io` traits remain until I/O reform lands so imports can be rewritten all at once to `std::io::prelude::*`. This is a breaking change because many prelude reexports have been removed, and the RFC can be consulted for the exact list of removed reexports, as well as to find the locations of where to import them. [rfc]: https://github.com/rust-lang/rfcs/blob/master/text/0503-prelude-stabilization.md [breaking-change] Closes #20068
Diffstat (limited to 'src/libstd/comm')
| -rw-r--r-- | src/libstd/comm/mod.rs | 680 | ||||
| -rw-r--r-- | src/libstd/comm/mpsc_queue.rs | 11 | ||||
| -rw-r--r-- | src/libstd/comm/select.rs | 150 | ||||
| -rw-r--r-- | src/libstd/comm/spsc_queue.rs | 6 |
4 files changed, 480 insertions, 367 deletions
diff --git a/src/libstd/comm/mod.rs b/src/libstd/comm/mod.rs index c85bea87218..de7f3d00478 100644 --- a/src/libstd/comm/mod.rs +++ b/src/libstd/comm/mod.rs @@ -60,6 +60,7 @@ //! //! ``` //! use std::thread::Thread; +//! use std::comm::channel; //! //! // Create a simple streaming channel //! let (tx, rx) = channel(); @@ -73,6 +74,7 @@ //! //! ``` //! use std::thread::Thread; +//! use std::comm::channel; //! //! // Create a shared channel that can be sent along from many threads //! // where tx is the sending half (tx for transmission), and rx is the receiving @@ -94,6 +96,8 @@ //! Propagating panics: //! //! ```should_fail +//! use std::comm::channel; +//! //! // The call to recv() will panic!() because the channel has already hung //! // up (or been deallocated) //! let (tx, rx) = channel::<int>(); @@ -105,6 +109,7 @@ //! //! ``` //! use std::thread::Thread; +//! use std::comm::sync_channel; //! //! let (tx, rx) = sync_channel::<int>(0); //! Thread::spawn(move|| { @@ -120,6 +125,7 @@ //! after 10 seconds no matter what: //! //! ```no_run +//! use std::comm::channel; //! use std::io::timer::Timer; //! use std::time::Duration; //! @@ -143,6 +149,7 @@ //! has been inactive for 5 seconds: //! //! ```no_run +//! use std::comm::channel; //! use std::io::timer::Timer; //! use std::time::Duration; //! @@ -329,21 +336,6 @@ use self::select::StartResult; use self::select::StartResult::*; use self::blocking::SignalToken; -macro_rules! test { - { fn $name:ident() $b:block $(#[$a:meta])*} => ( - mod $name { - #![allow(unused_imports)] - - use super::*; - use comm::*; - use thread::Thread; - use prelude::*; - - $(#[$a])* #[test] fn f() { $b } - } - ) -} - mod blocking; mod oneshot; mod select; @@ -458,6 +450,7 @@ impl<T> UnsafeFlavor<T> for Receiver<T> { /// # Example /// /// ``` +/// use std::comm::channel; /// use std::thread::Thread; /// /// // tx is is the sending half (tx for transmission), and rx is the receiving @@ -499,6 +492,7 @@ pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) { /// # Example /// /// ``` +/// use std::comm::sync_channel; /// use std::thread::Thread; /// /// let (tx, rx) = sync_channel(1); @@ -580,6 +574,8 @@ impl<T: Send> Sender<T> { /// # Example /// /// ``` + /// use std::comm::channel; + /// /// let (tx, rx) = channel(); /// /// // This send is always successful @@ -1046,9 +1042,12 @@ unsafe impl<T> kinds::Sync for RacyCell<T> { } // Oh dear #[cfg(test)] mod test { - use super::*; - use prelude::*; + use prelude::v1::*; + use os; + use super::*; + use thread::Thread; + use str::from_str; pub fn stress_factor() -> uint { match os::getenv("RUST_TEST_STRESS") { @@ -1057,121 +1056,144 @@ mod test { } } - test! { fn smoke() { + #[test] + fn smoke() { let (tx, rx) = channel::<int>(); tx.send(1); assert_eq!(rx.recv(), 1); - } } + } - test! { fn drop_full() { + #[test] + fn drop_full() { let (tx, _rx) = channel(); tx.send(box 1i); - } } + } - test! { fn drop_full_shared() { + #[test] + fn drop_full_shared() { let (tx, _rx) = channel(); drop(tx.clone()); drop(tx.clone()); tx.send(box 1i); - } } + } - test! { fn smoke_shared() { + #[test] + fn smoke_shared() { let (tx, rx) = channel::<int>(); tx.send(1); assert_eq!(rx.recv(), 1); let tx = tx.clone(); tx.send(1); assert_eq!(rx.recv(), 1); - } } + } - test! { fn smoke_threads() { + #[test] + fn smoke_threads() { let (tx, rx) = channel::<int>(); - spawn(move|| { + let _t = Thread::spawn(move|| { tx.send(1); }); assert_eq!(rx.recv(), 1); - } } + } - test! { fn smoke_port_gone() { + #[test] + #[should_fail] + fn smoke_port_gone() { let (tx, rx) = channel::<int>(); drop(rx); tx.send(1); - } #[should_fail] } + } - test! { fn smoke_shared_port_gone() { + #[test] + #[should_fail] + fn smoke_shared_port_gone() { let (tx, rx) = channel::<int>(); drop(rx); tx.send(1); - } #[should_fail] } + } - test! { fn smoke_shared_port_gone2() { + #[test] + #[should_fail] + fn smoke_shared_port_gone2() { let (tx, rx) = channel::<int>(); drop(rx); let tx2 = tx.clone(); drop(tx); tx2.send(1); - } #[should_fail] } + } - test! { fn port_gone_concurrent() { + #[test] + #[should_fail] + fn port_gone_concurrent() { let (tx, rx) = channel::<int>(); - spawn(move|| { + Thread::spawn(move|| { rx.recv(); - }); + }).detach(); loop { tx.send(1) } - } #[should_fail] } + } - test! { fn port_gone_concurrent_shared() { + #[test] + #[should_fail] + fn port_gone_concurrent_shared() { let (tx, rx) = channel::<int>(); let tx2 = tx.clone(); - spawn(move|| { + Thread::spawn(move|| { rx.recv(); - }); + }).detach(); loop { tx.send(1); tx2.send(1); } - } #[should_fail] } + } - test! { fn smoke_chan_gone() { + #[test] + #[should_fail] + fn smoke_chan_gone() { let (tx, rx) = channel::<int>(); drop(tx); rx.recv(); - } #[should_fail] } + } - test! { fn smoke_chan_gone_shared() { + #[test] + #[should_fail] + fn smoke_chan_gone_shared() { let (tx, rx) = channel::<()>(); let tx2 = tx.clone(); drop(tx); drop(tx2); rx.recv(); - } #[should_fail] } + } - test! { fn chan_gone_concurrent() { + #[test] + #[should_fail] + fn chan_gone_concurrent() { let (tx, rx) = channel::<int>(); - spawn(move|| { + Thread::spawn(move|| { tx.send(1); tx.send(1); - }); + }).detach(); loop { rx.recv(); } - } #[should_fail] } + } - test! { fn stress() { + #[test] + fn stress() { let (tx, rx) = channel::<int>(); - spawn(move|| { + let t = Thread::spawn(move|| { for _ in range(0u, 10000) { tx.send(1i); } }); for _ in range(0u, 10000) { assert_eq!(rx.recv(), 1); } - } } + t.join().ok().unwrap(); + } - test! { fn stress_shared() { + #[test] + fn stress_shared() { static AMT: uint = 10000; static NTHREADS: uint = 8; let (tx, rx) = channel::<int>(); - let (dtx, drx) = channel::<()>(); - spawn(move|| { + let t = Thread::spawn(move|| { for _ in range(0, AMT * NTHREADS) { assert_eq!(rx.recv(), 1); } @@ -1179,99 +1201,93 @@ mod test { Ok(..) => panic!(), _ => {} } - dtx.send(()); }); for _ in range(0, NTHREADS) { let tx = tx.clone(); - spawn(move|| { + Thread::spawn(move|| { for _ in range(0, AMT) { tx.send(1); } - }); + }).detach(); } drop(tx); - drx.recv(); - } } + t.join().ok().unwrap(); + } #[test] fn send_from_outside_runtime() { let (tx1, rx1) = channel::<()>(); let (tx2, rx2) = channel::<int>(); - let (tx3, rx3) = channel::<()>(); - let tx4 = tx3.clone(); - spawn(move|| { + let t1 = Thread::spawn(move|| { tx1.send(()); for _ in range(0i, 40) { assert_eq!(rx2.recv(), 1); } - tx3.send(()); }); rx1.recv(); - spawn(move|| { + let t2 = Thread::spawn(move|| { for _ in range(0i, 40) { tx2.send(1); } - tx4.send(()); }); - rx3.recv(); - rx3.recv(); + t1.join().ok().unwrap(); + t2.join().ok().unwrap(); } #[test] fn recv_from_outside_runtime() { let (tx, rx) = channel::<int>(); - let (dtx, drx) = channel(); - spawn(move|| { + let t = Thread::spawn(move|| { for _ in range(0i, 40) { assert_eq!(rx.recv(), 1); } - dtx.send(()); }); for _ in range(0u, 40) { tx.send(1); } - drx.recv(); + t.join().ok().unwrap(); } #[test] fn no_runtime() { let (tx1, rx1) = channel::<int>(); let (tx2, rx2) = channel::<int>(); - let (tx3, rx3) = channel::<()>(); - let tx4 = tx3.clone(); - spawn(move|| { + let t1 = Thread::spawn(move|| { assert_eq!(rx1.recv(), 1); tx2.send(2); - tx4.send(()); }); - spawn(move|| { + let t2 = Thread::spawn(move|| { tx1.send(1); assert_eq!(rx2.recv(), 2); - tx3.send(()); }); - rx3.recv(); - rx3.recv(); + t1.join().ok().unwrap(); + t2.join().ok().unwrap(); } - test! { fn oneshot_single_thread_close_port_first() { + #[test] + fn oneshot_single_thread_close_port_first() { // Simple test of closing without sending let (_tx, rx) = channel::<int>(); drop(rx); - } } + } - test! { fn oneshot_single_thread_close_chan_first() { + #[test] + fn oneshot_single_thread_close_chan_first() { // Simple test of closing without sending let (tx, _rx) = channel::<int>(); drop(tx); - } } + } - test! { fn oneshot_single_thread_send_port_close() { + #[test] + #[should_fail] + fn oneshot_single_thread_send_port_close() { // Testing that the sender cleans up the payload if receiver is closed let (tx, rx) = channel::<Box<int>>(); drop(rx); tx.send(box 0); - } #[should_fail] } + } - test! { fn oneshot_single_thread_recv_chan_close() { + #[test] + fn oneshot_single_thread_recv_chan_close() { // Receiving on a closed chan will panic let res = Thread::spawn(move|| { let (tx, rx) = channel::<int>(); @@ -1280,129 +1296,142 @@ mod test { }).join(); // What is our res? assert!(res.is_err()); - } } + } - test! { fn oneshot_single_thread_send_then_recv() { + #[test] + fn oneshot_single_thread_send_then_recv() { let (tx, rx) = channel::<Box<int>>(); tx.send(box 10); assert!(rx.recv() == box 10); - } } + } - test! { fn oneshot_single_thread_try_send_open() { + #[test] + fn oneshot_single_thread_try_send_open() { let (tx, rx) = channel::<int>(); assert!(tx.send_opt(10).is_ok()); assert!(rx.recv() == 10); - } } + } - test! { fn oneshot_single_thread_try_send_closed() { + #[test] + fn oneshot_single_thread_try_send_closed() { let (tx, rx) = channel::<int>(); drop(rx); assert!(tx.send_opt(10).is_err()); - } } + } - test! { fn oneshot_single_thread_try_recv_open() { + #[test] + fn oneshot_single_thread_try_recv_open() { let (tx, rx) = channel::<int>(); tx.send(10); assert!(rx.recv_opt() == Ok(10)); - } } + } - test! { fn oneshot_single_thread_try_recv_closed() { + #[test] + fn oneshot_single_thread_try_recv_closed() { let (tx, rx) = channel::<int>(); drop(tx); assert!(rx.recv_opt() == Err(())); - } } + } - test! { fn oneshot_single_thread_peek_data() { + #[test] + fn oneshot_single_thread_peek_data() { let (tx, rx) = channel::<int>(); assert_eq!(rx.try_recv(), Err(Empty)); tx.send(10); assert_eq!(rx.try_recv(), Ok(10)); - } } + } - test! { fn oneshot_single_thread_peek_close() { + #[test] + fn oneshot_single_thread_peek_close() { let (tx, rx) = channel::<int>(); drop(tx); assert_eq!(rx.try_recv(), Err(Disconnected)); assert_eq!(rx.try_recv(), Err(Disconnected)); - } } + } - test! { fn oneshot_single_thread_peek_open() { + #[test] + fn oneshot_single_thread_peek_open() { let (_tx, rx) = channel::<int>(); assert_eq!(rx.try_recv(), Err(Empty)); - } } + } - test! { fn oneshot_multi_task_recv_then_send() { + #[test] + fn oneshot_multi_task_recv_then_send() { let (tx, rx) = channel::<Box<int>>(); - spawn(move|| { + let _t = Thread::spawn(move|| { assert!(rx.recv() == box 10); }); tx.send(box 10); - } } + } - test! { fn oneshot_multi_task_recv_then_close() { + #[test] + fn oneshot_multi_task_recv_then_close() { let (tx, rx) = channel::<Box<int>>(); - spawn(move|| { + let _t = Thread::spawn(move|| { drop(tx); }); let res = Thread::spawn(move|| { assert!(rx.recv() == box 10); }).join(); assert!(res.is_err()); - } } + } - test! { fn oneshot_multi_thread_close_stress() { + #[test] + fn oneshot_multi_thread_close_stress() { for _ in range(0, stress_factor()) { let (tx, rx) = channel::<int>(); - spawn(move|| { + let _t = Thread::spawn(move|| { drop(rx); }); drop(tx); } - } } + } - test! { fn oneshot_multi_thread_send_close_stress() { + #[test] + fn oneshot_multi_thread_send_close_stress() { for _ in range(0, stress_factor()) { let (tx, rx) = channel::<int>(); - spawn(move|| { + let _t = Thread::spawn(move|| { drop(rx); }); let _ = Thread::spawn(move|| { tx.send(1); }).join(); } - } } + } - test! { fn oneshot_multi_thread_recv_close_stress() { + #[test] + fn oneshot_multi_thread_recv_close_stress() { for _ in range(0, stress_factor()) { let (tx, rx) = channel::<int>(); - spawn(move|| { + Thread::spawn(move|| { let res = Thread::spawn(move|| { rx.recv(); }).join(); assert!(res.is_err()); - }); - spawn(move|| { - spawn(move|| { + }).detach(); + let _t = Thread::spawn(move|| { + Thread::spawn(move|| { drop(tx); - }); + }).detach(); }); } - } } + } - test! { fn oneshot_multi_thread_send_recv_stress() { + #[test] + fn oneshot_multi_thread_send_recv_stress() { for _ in range(0, stress_factor()) { let (tx, rx) = channel(); - spawn(move|| { + let _t = Thread::spawn(move|| { tx.send(box 10i); }); - spawn(move|| { - assert!(rx.recv() == box 10i); - }); + assert!(rx.recv() == box 10i); } - } } + } - test! { fn stream_send_recv_stress() { + #[test] + fn stream_send_recv_stress() { for _ in range(0, stress_factor()) { let (tx, rx) = channel(); @@ -1412,50 +1441,53 @@ mod test { fn send(tx: Sender<Box<int>>, i: int) { if i == 10 { return } - spawn(move|| { + Thread::spawn(move|| { tx.send(box i); send(tx, i + 1); - }); + }).detach(); } fn recv(rx: Receiver<Box<int>>, i: int) { if i == 10 { return } - spawn(move|| { + Thread::spawn(move|| { assert!(rx.recv() == box i); recv(rx, i + 1); - }); + }).detach(); } } - } } + } - test! { fn recv_a_lot() { + #[test] + fn recv_a_lot() { // Regression test that we don't run out of stack in scheduler context let (tx, rx) = channel(); for _ in range(0i, 10000) { tx.send(()); } for _ in range(0i, 10000) { rx.recv(); } - } } + } - test! { fn shared_chan_stress() { + #[test] + fn shared_chan_stress() { let (tx, rx) = channel(); let total = stress_factor() + 100; for _ in range(0, total) { let tx = tx.clone(); - spawn(move|| { + Thread::spawn(move|| { tx.send(()); - }); + }).detach(); } for _ in range(0, total) { rx.recv(); } - } } + } - test! { fn test_nested_recv_iter() { + #[test] + fn test_nested_recv_iter() { let (tx, rx) = channel::<int>(); let (total_tx, total_rx) = channel::<int>(); - spawn(move|| { + let _t = Thread::spawn(move|| { let mut acc = 0; for x in rx.iter() { acc += x; @@ -1468,13 +1500,14 @@ mod test { tx.send(2); drop(tx); assert_eq!(total_rx.recv(), 6); - } } + } - test! { fn test_recv_iter_break() { + #[test] + fn test_recv_iter_break() { let (tx, rx) = channel::<int>(); let (count_tx, count_rx) = channel(); - spawn(move|| { + let _t = Thread::spawn(move|| { let mut count = 0; for x in rx.iter() { if count >= 3 { @@ -1492,13 +1525,14 @@ mod test { let _ = tx.send_opt(2); drop(tx); assert_eq!(count_rx.recv(), 4); - } } + } - test! { fn try_recv_states() { + #[test] + fn try_recv_states() { let (tx1, rx1) = channel::<int>(); let (tx2, rx2) = channel::<()>(); let (tx3, rx3) = channel::<()>(); - spawn(move|| { + let _t = Thread::spawn(move|| { rx2.recv(); tx1.send(1); tx3.send(()); @@ -1515,14 +1549,15 @@ mod test { tx2.send(()); rx3.recv(); assert_eq!(rx1.try_recv(), Err(Disconnected)); - } } + } // This bug used to end up in a livelock inside of the Receiver destructor // because the internal state of the Shared packet was corrupted - test! { fn destroy_upgraded_shared_port_when_sender_still_active() { + #[test] + fn destroy_upgraded_shared_port_when_sender_still_active() { let (tx, rx) = channel(); let (tx2, rx2) = channel(); - spawn(move|| { + let _t = Thread::spawn(move|| { rx.recv(); // wait on a oneshot drop(rx); // destroy a shared tx2.send(()); @@ -1537,13 +1572,16 @@ mod test { // wait for the child task to exit before we exit rx2.recv(); - }} + } } #[cfg(test)] mod sync_tests { - use prelude::*; + use prelude::v1::*; use os; + use thread::Thread; + use super::*; + use str::from_str; pub fn stress_factor() -> uint { match os::getenv("RUST_TEST_STRESS") { @@ -1552,108 +1590,128 @@ mod sync_tests { } } - test! { fn smoke() { + #[test] + fn smoke() { let (tx, rx) = sync_channel::<int>(1); tx.send(1); assert_eq!(rx.recv(), 1); - } } + } - test! { fn drop_full() { + #[test] + fn drop_full() { let (tx, _rx) = sync_channel(1); tx.send(box 1i); - } } + } - test! { fn smoke_shared() { + #[test] + fn smoke_shared() { let (tx, rx) = sync_channel::<int>(1); tx.send(1); assert_eq!(rx.recv(), 1); let tx = tx.clone(); tx.send(1); assert_eq!(rx.recv(), 1); - } } + } - test! { fn smoke_threads() { + #[test] + fn smoke_threads() { let (tx, rx) = sync_channel::<int>(0); - spawn(move|| { + let _t = Thread::spawn(move|| { tx.send(1); }); assert_eq!(rx.recv(), 1); - } } + } - test! { fn smoke_port_gone() { + #[test] + #[should_fail] + fn smoke_port_gone() { let (tx, rx) = sync_channel::<int>(0); drop(rx); tx.send(1); - } #[should_fail] } + } - test! { fn smoke_shared_port_gone2() { + #[test] + #[should_fail] + fn smoke_shared_port_gone2() { let (tx, rx) = sync_channel::<int>(0); drop(rx); let tx2 = tx.clone(); drop(tx); tx2.send(1); - } #[should_fail] } + } - test! { fn port_gone_concurrent() { + #[test] + #[should_fail] + fn port_gone_concurrent() { let (tx, rx) = sync_channel::<int>(0); - spawn(move|| { + Thread::spawn(move|| { rx.recv(); - }); + }).detach(); loop { tx.send(1) } - } #[should_fail] } + } - test! { fn port_gone_concurrent_shared() { + #[test] + #[should_fail] + fn port_gone_concurrent_shared() { let (tx, rx) = sync_channel::<int>(0); let tx2 = tx.clone(); - spawn(move|| { + Thread::spawn(move|| { rx.recv(); - }); + }).detach(); loop { tx.send(1); tx2.send(1); } - } #[should_fail] } + } - test! { fn smoke_chan_gone() { + #[test] + #[should_fail] + fn smoke_chan_gone() { let (tx, rx) = sync_channel::<int>(0); drop(tx); rx.recv(); - } #[should_fail] } + } - test! { fn smoke_chan_gone_shared() { + #[test] + #[should_fail] + fn smoke_chan_gone_shared() { let (tx, rx) = sync_channel::<()>(0); let tx2 = tx.clone(); drop(tx); drop(tx2); rx.recv(); - } #[should_fail] } + } - test! { fn chan_gone_concurrent() { + #[test] + #[should_fail] + fn chan_gone_concurrent() { let (tx, rx) = sync_channel::<int>(0); - spawn(move|| { + Thread::spawn(move|| { tx.send(1); tx.send(1); - }); + }).detach(); loop { rx.recv(); } - } #[should_fail] } + } - test! { fn stress() { + #[test] + fn stress() { let (tx, rx) = sync_channel::<int>(0); - spawn(move|| { + Thread::spawn(move|| { for _ in range(0u, 10000) { tx.send(1); } - }); + }).detach(); for _ in range(0u, 10000) { assert_eq!(rx.recv(), 1); } - } } + } - test! { fn stress_shared() { + #[test] + fn stress_shared() { static AMT: uint = 1000; static NTHREADS: uint = 8; let (tx, rx) = sync_channel::<int>(0); let (dtx, drx) = sync_channel::<()>(0); - spawn(move|| { + Thread::spawn(move|| { for _ in range(0, AMT * NTHREADS) { assert_eq!(rx.recv(), 1); } @@ -1662,38 +1720,43 @@ mod sync_tests { _ => {} } dtx.send(()); - }); + }).detach(); for _ in range(0, NTHREADS) { let tx = tx.clone(); - spawn(move|| { + Thread::spawn(move|| { for _ in range(0, AMT) { tx.send(1); } - }); + }).detach(); } drop(tx); drx.recv(); - } } + } - test! { fn oneshot_single_thread_close_port_first() { + #[test] + fn oneshot_single_thread_close_port_first() { // Simple test of closing without sending let (_tx, rx) = sync_channel::<int>(0); drop(rx); - } } + } - test! { fn oneshot_single_thread_close_chan_first() { + #[test] + fn oneshot_single_thread_close_chan_first() { // Simple test of closing without sending let (tx, _rx) = sync_channel::<int>(0); drop(tx); - } } + } - test! { fn oneshot_single_thread_send_port_close() { + #[test] + #[should_fail] + fn oneshot_single_thread_send_port_close() { // Testing that the sender cleans up the payload if receiver is closed let (tx, rx) = sync_channel::<Box<int>>(0); drop(rx); tx.send(box 0); - } #[should_fail] } + } - test! { fn oneshot_single_thread_recv_chan_close() { + #[test] + fn oneshot_single_thread_recv_chan_close() { // Receiving on a closed chan will panic let res = Thread::spawn(move|| { let (tx, rx) = sync_channel::<int>(0); @@ -1702,134 +1765,148 @@ mod sync_tests { }).join(); // What is our res? assert!(res.is_err()); - } } + } - test! { fn oneshot_single_thread_send_then_recv() { + #[test] + fn oneshot_single_thread_send_then_recv() { let (tx, rx) = sync_channel::<Box<int>>(1); tx.send(box 10); assert!(rx.recv() == box 10); - } } + } - test! { fn oneshot_single_thread_try_send_open() { + #[test] + fn oneshot_single_thread_try_send_open() { let (tx, rx) = sync_channel::<int>(1); assert_eq!(tx.try_send(10), Ok(())); assert!(rx.recv() == 10); - } } + } - test! { fn oneshot_single_thread_try_send_closed() { + #[test] + fn oneshot_single_thread_try_send_closed() { let (tx, rx) = sync_channel::<int>(0); drop(rx); assert_eq!(tx.try_send(10), Err(RecvDisconnected(10))); - } } + } - test! { fn oneshot_single_thread_try_send_closed2() { + #[test] + fn oneshot_single_thread_try_send_closed2() { let (tx, _rx) = sync_channel::<int>(0); assert_eq!(tx.try_send(10), Err(Full(10))); - } } + } - test! { fn oneshot_single_thread_try_recv_open() { + #[test] + fn oneshot_single_thread_try_recv_open() { let (tx, rx) = sync_channel::<int>(1); tx.send(10); assert!(rx.recv_opt() == Ok(10)); - } } + } - test! { fn oneshot_single_thread_try_recv_closed() { + #[test] + fn oneshot_single_thread_try_recv_closed() { let (tx, rx) = sync_channel::<int>(0); drop(tx); assert!(rx.recv_opt() == Err(())); - } } + } - test! { fn oneshot_single_thread_peek_data() { + #[test] + fn oneshot_single_thread_peek_data() { let (tx, rx) = sync_channel::<int>(1); assert_eq!(rx.try_recv(), Err(Empty)); tx.send(10); assert_eq!(rx.try_recv(), Ok(10)); - } } + } - test! { fn oneshot_single_thread_peek_close() { + #[test] + fn oneshot_single_thread_peek_close() { let (tx, rx) = sync_channel::<int>(0); drop(tx); assert_eq!(rx.try_recv(), Err(Disconnected)); assert_eq!(rx.try_recv(), Err(Disconnected)); - } } + } - test! { fn oneshot_single_thread_peek_open() { + #[test] + fn oneshot_single_thread_peek_open() { let (_tx, rx) = sync_channel::<int>(0); assert_eq!(rx.try_recv(), Err(Empty)); - } } + } - test! { fn oneshot_multi_task_recv_then_send() { + #[test] + fn oneshot_multi_task_recv_then_send() { let (tx, rx) = sync_channel::<Box<int>>(0); - spawn(move|| { + let _t = Thread::spawn(move|| { assert!(rx.recv() == box 10); }); tx.send(box 10); - } } + } - test! { fn oneshot_multi_task_recv_then_close() { + #[test] + fn oneshot_multi_task_recv_then_close() { let (tx, rx) = sync_channel::<Box<int>>(0); - spawn(move|| { + let _t = Thread::spawn(move|| { drop(tx); }); let res = Thread::spawn(move|| { assert!(rx.recv() == box 10); }).join(); assert!(res.is_err()); - } } + } - test! { fn oneshot_multi_thread_close_stress() { + #[test] + fn oneshot_multi_thread_close_stress() { for _ in range(0, stress_factor()) { let (tx, rx) = sync_channel::<int>(0); - spawn(move|| { + let _t = Thread::spawn(move|| { drop(rx); }); drop(tx); } - } } + } - test! { fn oneshot_multi_thread_send_close_stress() { + #[test] + fn oneshot_multi_thread_send_close_stress() { for _ in range(0, stress_factor()) { let (tx, rx) = sync_channel::<int>(0); - spawn(move|| { + let _t = Thread::spawn(move|| { drop(rx); }); let _ = Thread::spawn(move || { tx.send(1); }).join(); } - } } + } - test! { fn oneshot_multi_thread_recv_close_stress() { + #[test] + fn oneshot_multi_thread_recv_close_stress() { for _ in range(0, stress_factor()) { let (tx, rx) = sync_channel::<int>(0); - spawn(move|| { + let _t = Thread::spawn(move|| { let res = Thread::spawn(move|| { rx.recv(); }).join(); assert!(res.is_err()); }); - spawn(move|| { - spawn(move|| { + let _t = Thread::spawn(move|| { + Thread::spawn(move|| { drop(tx); - }); + }).detach(); }); } - } } + } - test! { fn oneshot_multi_thread_send_recv_stress() { + #[test] + fn oneshot_multi_thread_send_recv_stress() { for _ in range(0, stress_factor()) { let (tx, rx) = sync_channel::<Box<int>>(0); - spawn(move|| { + let _t = Thread::spawn(move|| { tx.send(box 10i); }); - spawn(move|| { - assert!(rx.recv() == box 10i); - }); + assert!(rx.recv() == box 10i); } - } } + } - test! { fn stream_send_recv_stress() { + #[test] + fn stream_send_recv_stress() { for _ in range(0, stress_factor()) { let (tx, rx) = sync_channel::<Box<int>>(0); @@ -1839,50 +1916,53 @@ mod sync_tests { fn send(tx: SyncSender<Box<int>>, i: int) { if i == 10 { return } - spawn(move|| { + Thread::spawn(move|| { tx.send(box i); send(tx, i + 1); - }); + }).detach(); } fn recv(rx: Receiver<Box<int>>, i: int) { if i == 10 { return } - spawn(move|| { + Thread::spawn(move|| { assert!(rx.recv() == box i); recv(rx, i + 1); - }); + }).detach(); } } - } } + } - test! { fn recv_a_lot() { + #[test] + fn recv_a_lot() { // Regression test that we don't run out of stack in scheduler context let (tx, rx) = sync_channel(10000); for _ in range(0u, 10000) { tx.send(()); } for _ in range(0u, 10000) { rx.recv(); } - } } + } - test! { fn shared_chan_stress() { + #[test] + fn shared_chan_stress() { let (tx, rx) = sync_channel(0); let total = stress_factor() + 100; for _ in range(0, total) { let tx = tx.clone(); - spawn(move|| { + Thread::spawn(move|| { tx.send(()); - }); + }).detach(); } for _ in range(0, total) { rx.recv(); } - } } + } - test! { fn test_nested_recv_iter() { + #[test] + fn test_nested_recv_iter() { let (tx, rx) = sync_channel::<int>(0); let (total_tx, total_rx) = sync_channel::<int>(0); - spawn(move|| { + let _t = Thread::spawn(move|| { let mut acc = 0; for x in rx.iter() { acc += x; @@ -1895,13 +1975,14 @@ mod sync_tests { tx.send(2); drop(tx); assert_eq!(total_rx.recv(), 6); - } } + } - test! { fn test_recv_iter_break() { + #[test] + fn test_recv_iter_break() { let (tx, rx) = sync_channel::<int>(0); let (count_tx, count_rx) = sync_channel(0); - spawn(move|| { + let _t = Thread::spawn(move|| { let mut count = 0; for x in rx.iter() { if count >= 3 { @@ -1919,13 +2000,14 @@ mod sync_tests { let _ = tx.try_send(2); drop(tx); assert_eq!(count_rx.recv(), 4); - } } + } - test! { fn try_recv_states() { + #[test] + fn try_recv_states() { let (tx1, rx1) = sync_channel::<int>(1); let (tx2, rx2) = sync_channel::<()>(1); let (tx3, rx3) = sync_channel::<()>(1); - spawn(move|| { + let _t = Thread::spawn(move|| { rx2.recv(); tx1.send(1); tx3.send(()); @@ -1942,14 +2024,15 @@ mod sync_tests { tx2.send(()); rx3.recv(); assert_eq!(rx1.try_recv(), Err(Disconnected)); - } } + } // This bug used to end up in a livelock inside of the Receiver destructor // because the internal state of the Shared packet was corrupted - test! { fn destroy_upgraded_shared_port_when_sender_still_active() { + #[test] + fn destroy_upgraded_shared_port_when_sender_still_active() { let (tx, rx) = sync_channel::<()>(0); let (tx2, rx2) = sync_channel::<()>(0); - spawn(move|| { + let _t = Thread::spawn(move|| { rx.recv(); // wait on a oneshot drop(rx); // destroy a shared tx2.send(()); @@ -1964,78 +2047,77 @@ mod sync_tests { // wait for the child task to exit before we exit rx2.recv(); - } } + } - test! { fn send_opt1() { + #[test] + fn send_opt1() { let (tx, rx) = sync_channel::<int>(0); - spawn(move|| { rx.recv(); }); + let _t = Thread::spawn(move|| { rx.recv(); }); assert_eq!(tx.send_opt(1), Ok(())); - } } + } - test! { fn send_opt2() { + #[test] + fn send_opt2() { let (tx, rx) = sync_channel::<int>(0); - spawn(move|| { drop(rx); }); + let _t = Thread::spawn(move|| { drop(rx); }); assert_eq!(tx.send_opt(1), Err(1)); - } } + } - test! { fn send_opt3() { + #[test] + fn send_opt3() { let (tx, rx) = sync_channel::<int>(1); assert_eq!(tx.send_opt(1), Ok(())); - spawn(move|| { drop(rx); }); + let _t = Thread::spawn(move|| { drop(rx); }); assert_eq!(tx.send_opt(1), Err(1)); - } } + } - test! { fn send_opt4() { + #[test] + fn send_opt4() { let (tx, rx) = sync_channel::<int>(0); let tx2 = tx.clone(); let (done, donerx) = channel(); let done2 = done.clone(); - spawn(move|| { + let _t = Thread::spawn(move|| { assert_eq!(tx.send_opt(1), Err(1)); done.send(()); }); - spawn(move|| { + let _t = Thread::spawn(move|| { assert_eq!(tx2.send_opt(2), Err(2)); done2.send(()); }); drop(rx); donerx.recv(); donerx.recv(); - } } + } - test! { fn try_send1() { + #[test] + fn try_send1() { let (tx, _rx) = sync_channel::<int>(0); assert_eq!(tx.try_send(1), Err(Full(1))); - } } + } - test! { fn try_send2() { + #[test] + fn try_send2() { let (tx, _rx) = sync_channel::<int>(1); assert_eq!(tx.try_send(1), Ok(())); assert_eq!(tx.try_send(1), Err(Full(1))); - } } + } - test! { fn try_send3() { + #[test] + fn try_send3() { let (tx, rx) = sync_channel::<int>(1); assert_eq!(tx.try_send(1), Ok(())); drop(rx); assert_eq!(tx.try_send(1), Err(RecvDisconnected(1))); - } } - - test! { fn try_send4() { - let (tx, rx) = sync_channel::<int>(0); - spawn(move|| { - for _ in range(0u, 1000) { Thread::yield_now(); } - assert_eq!(tx.try_send(1), Ok(())); - }); - assert_eq!(rx.recv(), 1); - } #[ignore(reason = "flaky on libnative")] } + } - test! { fn issue_15761() { + #[test] + fn issue_15761() { fn repro() { let (tx1, rx1) = sync_channel::<()>(3); let (tx2, rx2) = sync_channel::<()>(3); - spawn(move|| { + let _t = Thread::spawn(move|| { rx1.recv(); tx2.try_send(()).unwrap(); }); @@ -2047,5 +2129,5 @@ mod sync_tests { for _ in range(0u, 100) { repro() } - } } + } } diff --git a/src/libstd/comm/mpsc_queue.rs b/src/libstd/comm/mpsc_queue.rs index cddef236664..d1b6d0d697c 100644 --- a/src/libstd/comm/mpsc_queue.rs +++ b/src/libstd/comm/mpsc_queue.rs @@ -153,11 +153,12 @@ impl<T: Send> Drop for Queue<T> { #[cfg(test)] mod tests { - use prelude::*; - - use alloc::arc::Arc; + use prelude::v1::*; + use comm::channel; use super::{Queue, Data, Empty, Inconsistent}; + use sync::Arc; + use thread::Thread; #[test] fn test_full() { @@ -181,12 +182,12 @@ mod tests { for _ in range(0, nthreads) { let tx = tx.clone(); let q = q.clone(); - spawn(move|| { + Thread::spawn(move|| { for i in range(0, nmsgs) { q.push(i); } tx.send(()); - }); + }).detach(); } let mut i = 0u; diff --git a/src/libstd/comm/select.rs b/src/libstd/comm/select.rs index 690b5861c22..5c476775bdb 100644 --- a/src/libstd/comm/select.rs +++ b/src/libstd/comm/select.rs @@ -27,6 +27,8 @@ //! # Example //! //! ```rust +//! use std::comm::channel; +//! //! let (tx1, rx1) = channel(); //! let (tx2, rx2) = channel(); //! @@ -335,9 +337,11 @@ impl Iterator<*mut Handle<'static, ()>> for Packets { #[cfg(test)] #[allow(unused_imports)] mod test { - use prelude::*; + use prelude::v1::*; use super::*; + use comm::*; + use thread::Thread; // Don't use the libstd version so we can pull in the right Select structure // (std::comm points at the wrong one) @@ -357,7 +361,8 @@ mod test { }) } - test! { fn smoke() { + #[test] + fn smoke() { let (tx1, rx1) = channel::<int>(); let (tx2, rx2) = channel::<int>(); tx1.send(1); @@ -379,9 +384,10 @@ mod test { select! { bar = rx2.recv_opt() => { assert_eq!(bar, Err(())); } } - } } + } - test! { fn smoke2() { + #[test] + fn smoke2() { let (_tx1, rx1) = channel::<int>(); let (_tx2, rx2) = channel::<int>(); let (_tx3, rx3) = channel::<int>(); @@ -395,9 +401,10 @@ mod test { _foo = rx4.recv() => { panic!("4") }, foo = rx5.recv() => { assert_eq!(foo, 4); } } - } } + } - test! { fn closed() { + #[test] + fn closed() { let (_tx1, rx1) = channel::<int>(); let (tx2, rx2) = channel::<int>(); drop(tx2); @@ -406,14 +413,15 @@ mod test { _a1 = rx1.recv_opt() => { panic!() }, a2 = rx2.recv_opt() => { assert_eq!(a2, Err(())); } } - } } + } - test! { fn unblocks() { + #[test] + fn unblocks() { let (tx1, rx1) = channel::<int>(); let (_tx2, rx2) = channel::<int>(); let (tx3, rx3) = channel::<int>(); - spawn(move|| { + let _t = Thread::spawn(move|| { for _ in range(0u, 20) { Thread::yield_now(); } tx1.send(1); rx3.recv(); @@ -429,14 +437,15 @@ mod test { a = rx1.recv_opt() => { assert_eq!(a, Err(())); }, _b = rx2.recv() => { panic!() } } - } } + } - test! { fn both_ready() { + #[test] + fn both_ready() { let (tx1, rx1) = channel::<int>(); let (tx2, rx2) = channel::<int>(); let (tx3, rx3) = channel::<()>(); - spawn(move|| { + let _t = Thread::spawn(move|| { for _ in range(0u, 20) { Thread::yield_now(); } tx1.send(1); tx2.send(2); @@ -454,15 +463,16 @@ mod test { assert_eq!(rx1.try_recv(), Err(Empty)); assert_eq!(rx2.try_recv(), Err(Empty)); tx3.send(()); - } } + } - test! { fn stress() { + #[test] + fn stress() { static AMT: int = 10000; let (tx1, rx1) = channel::<int>(); let (tx2, rx2) = channel::<int>(); let (tx3, rx3) = channel::<()>(); - spawn(move|| { + let _t = Thread::spawn(move|| { for i in range(0, AMT) { if i % 2 == 0 { tx1.send(i); @@ -480,14 +490,15 @@ mod test { } tx3.send(()); } - } } + } - test! { fn cloning() { + #[test] + fn cloning() { let (tx1, rx1) = channel::<int>(); let (_tx2, rx2) = channel::<int>(); let (tx3, rx3) = channel::<()>(); - spawn(move|| { + let _t = Thread::spawn(move|| { rx3.recv(); tx1.clone(); assert_eq!(rx3.try_recv(), Err(Empty)); @@ -501,14 +512,15 @@ mod test { _i2 = rx2.recv() => panic!() } tx3.send(()); - } } + } - test! { fn cloning2() { + #[test] + fn cloning2() { let (tx1, rx1) = channel::<int>(); let (_tx2, rx2) = channel::<int>(); let (tx3, rx3) = channel::<()>(); - spawn(move|| { + let _t = Thread::spawn(move|| { rx3.recv(); tx1.clone(); assert_eq!(rx3.try_recv(), Err(Empty)); @@ -522,13 +534,14 @@ mod test { _i2 = rx2.recv() => panic!() } tx3.send(()); - } } + } - test! { fn cloning3() { + #[test] + fn cloning3() { let (tx1, rx1) = channel::<()>(); let (tx2, rx2) = channel::<()>(); let (tx3, rx3) = channel::<()>(); - spawn(move|| { + let _t = Thread::spawn(move|| { let s = Select::new(); let mut h1 = s.handle(&rx1); let mut h2 = s.handle(&rx2); @@ -542,44 +555,49 @@ mod test { drop(tx1.clone()); tx2.send(()); rx3.recv(); - } } + } - test! { fn preflight1() { + #[test] + fn preflight1() { let (tx, rx) = channel(); tx.send(()); select! { () = rx.recv() => {} } - } } + } - test! { fn preflight2() { + #[test] + fn preflight2() { let (tx, rx) = channel(); tx.send(()); tx.send(()); select! { () = rx.recv() => {} } - } } + } - test! { fn preflight3() { + #[test] + fn preflight3() { let (tx, rx) = channel(); drop(tx.clone()); tx.send(()); select! { () = rx.recv() => {} } - } } + } - test! { fn preflight4() { + #[test] + fn preflight4() { let (tx, rx) = channel(); tx.send(()); let s = Select::new(); let mut h = s.handle(&rx); unsafe { h.add(); } assert_eq!(s.wait2(false), h.id); - } } + } - test! { fn preflight5() { + #[test] + fn preflight5() { let (tx, rx) = channel(); tx.send(()); tx.send(()); @@ -587,9 +605,10 @@ mod test { let mut h = s.handle(&rx); unsafe { h.add(); } assert_eq!(s.wait2(false), h.id); - } } + } - test! { fn preflight6() { + #[test] + fn preflight6() { let (tx, rx) = channel(); drop(tx.clone()); tx.send(()); @@ -597,18 +616,20 @@ mod test { let mut h = s.handle(&rx); unsafe { h.add(); } assert_eq!(s.wait2(false), h.id); - } } + } - test! { fn preflight7() { + #[test] + fn preflight7() { let (tx, rx) = channel::<()>(); drop(tx); let s = Select::new(); let mut h = s.handle(&rx); unsafe { h.add(); } assert_eq!(s.wait2(false), h.id); - } } + } - test! { fn preflight8() { + #[test] + fn preflight8() { let (tx, rx) = channel(); tx.send(()); drop(tx); @@ -617,9 +638,10 @@ mod test { let mut h = s.handle(&rx); unsafe { h.add(); } assert_eq!(s.wait2(false), h.id); - } } + } - test! { fn preflight9() { + #[test] + fn preflight9() { let (tx, rx) = channel(); drop(tx.clone()); tx.send(()); @@ -629,12 +651,13 @@ mod test { let mut h = s.handle(&rx); unsafe { h.add(); } assert_eq!(s.wait2(false), h.id); - } } + } - test! { fn oneshot_data_waiting() { + #[test] + fn oneshot_data_waiting() { let (tx1, rx1) = channel(); let (tx2, rx2) = channel(); - spawn(move|| { + let _t = Thread::spawn(move|| { select! { () = rx1.recv() => {} } @@ -644,16 +667,17 @@ mod test { for _ in range(0u, 100) { Thread::yield_now() } tx1.send(()); rx2.recv(); - } } + } - test! { fn stream_data_waiting() { + #[test] + fn stream_data_waiting() { let (tx1, rx1) = channel(); let (tx2, rx2) = channel(); tx1.send(()); tx1.send(()); rx1.recv(); rx1.recv(); - spawn(move|| { + let _t = Thread::spawn(move|| { select! { () = rx1.recv() => {} } @@ -663,15 +687,16 @@ mod test { for _ in range(0u, 100) { Thread::yield_now() } tx1.send(()); rx2.recv(); - } } + } - test! { fn shared_data_waiting() { + #[test] + fn shared_data_waiting() { let (tx1, rx1) = channel(); let (tx2, rx2) = channel(); drop(tx1.clone()); tx1.send(()); rx1.recv(); - spawn(move|| { + let _t = Thread::spawn(move|| { select! { () = rx1.recv() => {} } @@ -681,32 +706,35 @@ mod test { for _ in range(0u, 100) { Thread::yield_now() } tx1.send(()); rx2.recv(); - } } + } - test! { fn sync1() { + #[test] + fn sync1() { let (tx, rx) = sync_channel::<int>(1); tx.send(1); select! { n = rx.recv() => { assert_eq!(n, 1); } } - } } + } - test! { fn sync2() { + #[test] + fn sync2() { let (tx, rx) = sync_channel::<int>(0); - spawn(move|| { + let _t = Thread::spawn(move|| { for _ in range(0u, 100) { Thread::yield_now() } tx.send(1); }); select! { n = rx.recv() => { assert_eq!(n, 1); } } - } } + } - test! { fn sync3() { + #[test] + fn sync3() { let (tx1, rx1) = sync_channel::<int>(0); let (tx2, rx2): (Sender<int>, Receiver<int>) = channel(); - spawn(move|| { tx1.send(1); }); - spawn(move|| { tx2.send(2); }); + let _t = Thread::spawn(move|| { tx1.send(1); }); + let _t = Thread::spawn(move|| { tx2.send(2); }); select! { n = rx1.recv() => { assert_eq!(n, 1); @@ -717,5 +745,5 @@ mod test { assert_eq!(rx1.recv(), 1); } } - } } + } } diff --git a/src/libstd/comm/spsc_queue.rs b/src/libstd/comm/spsc_queue.rs index becb78063ae..1e2f5222d8b 100644 --- a/src/libstd/comm/spsc_queue.rs +++ b/src/libstd/comm/spsc_queue.rs @@ -240,10 +240,12 @@ impl<T: Send> Drop for Queue<T> { #[cfg(test)] mod test { - use prelude::*; + use prelude::v1::*; use sync::Arc; use super::Queue; + use thread::Thread; + use comm::channel; #[test] fn smoke() { @@ -320,7 +322,7 @@ mod test { let (tx, rx) = channel(); let q2 = q.clone(); - spawn(move|| { + let _t = Thread::spawn(move|| { for _ in range(0u, 100000) { loop { match q2.pop() { |
