diff options
Diffstat (limited to 'src/libstd/sync/mpsc/mod.rs')
| -rw-r--r-- | src/libstd/sync/mpsc/mod.rs | 510 |
1 files changed, 251 insertions, 259 deletions
diff --git a/src/libstd/sync/mpsc/mod.rs b/src/libstd/sync/mpsc/mod.rs index c2884a28f3c..2831bbcb88d 100644 --- a/src/libstd/sync/mpsc/mod.rs +++ b/src/libstd/sync/mpsc/mod.rs @@ -269,20 +269,20 @@ // And now that you've seen all the races that I found and attempted to fix, // here's the code for you to find some more! -use crate::sync::Arc; +use crate::cell::UnsafeCell; use crate::error; use crate::fmt; use crate::mem; -use crate::cell::UnsafeCell; +use crate::sync::Arc; use crate::time::{Duration, Instant}; mod blocking; +mod mpsc_queue; mod oneshot; mod shared; +mod spsc_queue; mod stream; mod sync; -mod mpsc_queue; -mod spsc_queue; mod cache_aligned; @@ -322,10 +322,10 @@ pub struct Receiver<T> { // The receiver port can be sent from place to place, so long as it // is not used to receive non-sendable things. #[stable(feature = "rust1", since = "1.0.0")] -unsafe impl<T: Send> Send for Receiver<T> { } +unsafe impl<T: Send> Send for Receiver<T> {} #[stable(feature = "rust1", since = "1.0.0")] -impl<T> !Sync for Receiver<T> { } +impl<T> !Sync for Receiver<T> {} /// An iterator over messages on a [`Receiver`], created by [`iter`]. /// @@ -359,7 +359,7 @@ impl<T> !Sync for Receiver<T> { } #[stable(feature = "rust1", since = "1.0.0")] #[derive(Debug)] pub struct Iter<'a, T: 'a> { - rx: &'a Receiver<T> + rx: &'a Receiver<T>, } /// An iterator that attempts to yield all pending values for a [`Receiver`], @@ -404,7 +404,7 @@ pub struct Iter<'a, T: 'a> { #[stable(feature = "receiver_try_iter", since = "1.15.0")] #[derive(Debug)] pub struct TryIter<'a, T: 'a> { - rx: &'a Receiver<T> + rx: &'a Receiver<T>, } /// An owning iterator over messages on a [`Receiver`], @@ -439,7 +439,7 @@ pub struct TryIter<'a, T: 'a> { #[stable(feature = "receiver_into_iter", since = "1.1.0")] #[derive(Debug)] pub struct IntoIter<T> { - rx: Receiver<T> + rx: Receiver<T>, } /// The sending-half of Rust's asynchronous [`channel`] type. This half can only be @@ -482,10 +482,10 @@ pub struct Sender<T> { // The send port can be sent from place to place, so long as it // is not used to send non-sendable things. #[stable(feature = "rust1", since = "1.0.0")] -unsafe impl<T: Send> Send for Sender<T> { } +unsafe impl<T: Send> Send for Sender<T> {} #[stable(feature = "rust1", since = "1.0.0")] -impl<T> !Sync for Sender<T> { } +impl<T> !Sync for Sender<T> {} /// The sending-half of Rust's synchronous [`sync_channel`] type. /// @@ -772,9 +772,7 @@ pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) { impl<T> Sender<T> { fn new(inner: Flavor<T>) -> Sender<T> { - Sender { - inner: UnsafeCell::new(inner), - } + Sender { inner: UnsafeCell::new(inner) } } /// Attempts to send a value on this channel, returning it back if it could @@ -856,8 +854,7 @@ impl<T> Clone for Sender<T> { let guard = a.postinit_lock(); let rx = Receiver::new(Flavor::Shared(a.clone())); let sleeper = match p.upgrade(rx) { - oneshot::UpSuccess | - oneshot::UpDisconnected => None, + oneshot::UpSuccess | oneshot::UpDisconnected => None, oneshot::UpWoke(task) => Some(task), }; a.inherit_blocker(sleeper, guard); @@ -870,8 +867,7 @@ impl<T> Clone for Sender<T> { let guard = a.postinit_lock(); let rx = Receiver::new(Flavor::Shared(a.clone())); let sleeper = match p.upgrade(rx) { - stream::UpSuccess | - stream::UpDisconnected => None, + stream::UpSuccess | stream::UpDisconnected => None, stream::UpWoke(task) => Some(task), }; a.inherit_blocker(sleeper, guard); @@ -1078,48 +1074,31 @@ impl<T> Receiver<T> { pub fn try_recv(&self) -> Result<T, TryRecvError> { loop { let new_port = match *unsafe { self.inner() } { - Flavor::Oneshot(ref p) => { - match p.try_recv() { - Ok(t) => return Ok(t), - Err(oneshot::Empty) => return Err(TryRecvError::Empty), - Err(oneshot::Disconnected) => { - return Err(TryRecvError::Disconnected) - } - Err(oneshot::Upgraded(rx)) => rx, - } - } - Flavor::Stream(ref p) => { - match p.try_recv() { - Ok(t) => return Ok(t), - Err(stream::Empty) => return Err(TryRecvError::Empty), - Err(stream::Disconnected) => { - return Err(TryRecvError::Disconnected) - } - Err(stream::Upgraded(rx)) => rx, - } - } - Flavor::Shared(ref p) => { - match p.try_recv() { - Ok(t) => return Ok(t), - Err(shared::Empty) => return Err(TryRecvError::Empty), - Err(shared::Disconnected) => { - return Err(TryRecvError::Disconnected) - } - } - } - Flavor::Sync(ref p) => { - match p.try_recv() { - Ok(t) => return Ok(t), - Err(sync::Empty) => return Err(TryRecvError::Empty), - Err(sync::Disconnected) => { - return Err(TryRecvError::Disconnected) - } - } - } + Flavor::Oneshot(ref p) => match p.try_recv() { + Ok(t) => return Ok(t), + Err(oneshot::Empty) => return Err(TryRecvError::Empty), + Err(oneshot::Disconnected) => return Err(TryRecvError::Disconnected), + Err(oneshot::Upgraded(rx)) => rx, + }, + Flavor::Stream(ref p) => match p.try_recv() { + Ok(t) => return Ok(t), + Err(stream::Empty) => return Err(TryRecvError::Empty), + Err(stream::Disconnected) => return Err(TryRecvError::Disconnected), + Err(stream::Upgraded(rx)) => rx, + }, + Flavor::Shared(ref p) => match p.try_recv() { + Ok(t) => return Ok(t), + Err(shared::Empty) => return Err(TryRecvError::Empty), + Err(shared::Disconnected) => return Err(TryRecvError::Disconnected), + }, + Flavor::Sync(ref p) => match p.try_recv() { + Ok(t) => return Ok(t), + Err(sync::Empty) => return Err(TryRecvError::Empty), + Err(sync::Disconnected) => return Err(TryRecvError::Disconnected), + }, }; unsafe { - mem::swap(self.inner_mut(), - new_port.inner_mut()); + mem::swap(self.inner_mut(), new_port.inner_mut()); } } } @@ -1185,29 +1164,23 @@ impl<T> Receiver<T> { pub fn recv(&self) -> Result<T, RecvError> { loop { let new_port = match *unsafe { self.inner() } { - Flavor::Oneshot(ref p) => { - match p.recv(None) { - Ok(t) => return Ok(t), - Err(oneshot::Disconnected) => return Err(RecvError), - Err(oneshot::Upgraded(rx)) => rx, - Err(oneshot::Empty) => unreachable!(), - } - } - Flavor::Stream(ref p) => { - match p.recv(None) { - Ok(t) => return Ok(t), - Err(stream::Disconnected) => return Err(RecvError), - Err(stream::Upgraded(rx)) => rx, - Err(stream::Empty) => unreachable!(), - } - } - Flavor::Shared(ref p) => { - match p.recv(None) { - Ok(t) => return Ok(t), - Err(shared::Disconnected) => return Err(RecvError), - Err(shared::Empty) => unreachable!(), - } - } + Flavor::Oneshot(ref p) => match p.recv(None) { + Ok(t) => return Ok(t), + Err(oneshot::Disconnected) => return Err(RecvError), + Err(oneshot::Upgraded(rx)) => rx, + Err(oneshot::Empty) => unreachable!(), + }, + Flavor::Stream(ref p) => match p.recv(None) { + Ok(t) => return Ok(t), + Err(stream::Disconnected) => return Err(RecvError), + Err(stream::Upgraded(rx)) => rx, + Err(stream::Empty) => unreachable!(), + }, + Flavor::Shared(ref p) => match p.recv(None) { + Ok(t) => return Ok(t), + Err(shared::Disconnected) => return Err(RecvError), + Err(shared::Empty) => unreachable!(), + }, Flavor::Sync(ref p) => return p.recv(None).map_err(|_| RecvError), }; unsafe { @@ -1383,36 +1356,28 @@ impl<T> Receiver<T> { loop { let port_or_empty = match *unsafe { self.inner() } { - Flavor::Oneshot(ref p) => { - match p.recv(Some(deadline)) { - Ok(t) => return Ok(t), - Err(oneshot::Disconnected) => return Err(Disconnected), - Err(oneshot::Upgraded(rx)) => Some(rx), - Err(oneshot::Empty) => None, - } - } - Flavor::Stream(ref p) => { - match p.recv(Some(deadline)) { - Ok(t) => return Ok(t), - Err(stream::Disconnected) => return Err(Disconnected), - Err(stream::Upgraded(rx)) => Some(rx), - Err(stream::Empty) => None, - } - } - Flavor::Shared(ref p) => { - match p.recv(Some(deadline)) { - Ok(t) => return Ok(t), - Err(shared::Disconnected) => return Err(Disconnected), - Err(shared::Empty) => None, - } - } - Flavor::Sync(ref p) => { - match p.recv(Some(deadline)) { - Ok(t) => return Ok(t), - Err(sync::Disconnected) => return Err(Disconnected), - Err(sync::Empty) => None, - } - } + Flavor::Oneshot(ref p) => match p.recv(Some(deadline)) { + Ok(t) => return Ok(t), + Err(oneshot::Disconnected) => return Err(Disconnected), + Err(oneshot::Upgraded(rx)) => Some(rx), + Err(oneshot::Empty) => None, + }, + Flavor::Stream(ref p) => match p.recv(Some(deadline)) { + Ok(t) => return Ok(t), + Err(stream::Disconnected) => return Err(Disconnected), + Err(stream::Upgraded(rx)) => Some(rx), + Err(stream::Empty) => None, + }, + Flavor::Shared(ref p) => match p.recv(Some(deadline)) { + Ok(t) => return Ok(t), + Err(shared::Disconnected) => return Err(Disconnected), + Err(shared::Empty) => None, + }, + Flavor::Sync(ref p) => match p.recv(Some(deadline)) { + Ok(t) => return Ok(t), + Err(sync::Disconnected) => return Err(Disconnected), + Err(sync::Empty) => None, + }, }; if let Some(new_port) = port_or_empty { @@ -1502,21 +1467,24 @@ impl<T> Receiver<T> { pub fn try_iter(&self) -> TryIter<'_, T> { TryIter { rx: self } } - } #[stable(feature = "rust1", since = "1.0.0")] impl<'a, T> Iterator for Iter<'a, T> { type Item = T; - fn next(&mut self) -> Option<T> { self.rx.recv().ok() } + fn next(&mut self) -> Option<T> { + self.rx.recv().ok() + } } #[stable(feature = "receiver_try_iter", since = "1.15.0")] impl<'a, T> Iterator for TryIter<'a, T> { type Item = T; - fn next(&mut self) -> Option<T> { self.rx.try_recv().ok() } + fn next(&mut self) -> Option<T> { + self.rx.try_recv().ok() + } } #[stable(feature = "receiver_into_iter", since = "1.1.0")] @@ -1524,17 +1492,21 @@ impl<'a, T> IntoIterator for &'a Receiver<T> { type Item = T; type IntoIter = Iter<'a, T>; - fn into_iter(self) -> Iter<'a, T> { self.iter() } + fn into_iter(self) -> Iter<'a, T> { + self.iter() + } } #[stable(feature = "receiver_into_iter", since = "1.1.0")] impl<T> Iterator for IntoIter<T> { type Item = T; - fn next(&mut self) -> Option<T> { self.rx.recv().ok() } + fn next(&mut self) -> Option<T> { + self.rx.recv().ok() + } } #[stable(feature = "receiver_into_iter", since = "1.1.0")] -impl <T> IntoIterator for Receiver<T> { +impl<T> IntoIterator for Receiver<T> { type Item = T; type IntoIter = IntoIter<T>; @@ -1597,27 +1569,18 @@ impl<T> fmt::Debug for TrySendError<T> { impl<T> fmt::Display for TrySendError<T> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match *self { - TrySendError::Full(..) => { - "sending on a full channel".fmt(f) - } - TrySendError::Disconnected(..) => { - "sending on a closed channel".fmt(f) - } + TrySendError::Full(..) => "sending on a full channel".fmt(f), + TrySendError::Disconnected(..) => "sending on a closed channel".fmt(f), } } } #[stable(feature = "rust1", since = "1.0.0")] impl<T: Send> error::Error for TrySendError<T> { - fn description(&self) -> &str { match *self { - TrySendError::Full(..) => { - "sending on a full channel" - } - TrySendError::Disconnected(..) => { - "sending on a closed channel" - } + TrySendError::Full(..) => "sending on a full channel", + TrySendError::Disconnected(..) => "sending on a closed channel", } } } @@ -1640,7 +1603,6 @@ impl fmt::Display for RecvError { #[stable(feature = "rust1", since = "1.0.0")] impl error::Error for RecvError { - fn description(&self) -> &str { "receiving on a closed channel" } @@ -1650,27 +1612,18 @@ impl error::Error for RecvError { impl fmt::Display for TryRecvError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match *self { - TryRecvError::Empty => { - "receiving on an empty channel".fmt(f) - } - TryRecvError::Disconnected => { - "receiving on a closed channel".fmt(f) - } + TryRecvError::Empty => "receiving on an empty channel".fmt(f), + TryRecvError::Disconnected => "receiving on a closed channel".fmt(f), } } } #[stable(feature = "rust1", since = "1.0.0")] impl error::Error for TryRecvError { - fn description(&self) -> &str { match *self { - TryRecvError::Empty => { - "receiving on an empty channel" - } - TryRecvError::Disconnected => { - "receiving on a closed channel" - } + TryRecvError::Empty => "receiving on an empty channel", + TryRecvError::Disconnected => "receiving on a closed channel", } } } @@ -1688,12 +1641,8 @@ impl From<RecvError> for TryRecvError { impl fmt::Display for RecvTimeoutError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match *self { - RecvTimeoutError::Timeout => { - "timed out waiting on channel".fmt(f) - } - RecvTimeoutError::Disconnected => { - "channel is empty and sending half is closed".fmt(f) - } + RecvTimeoutError::Timeout => "timed out waiting on channel".fmt(f), + RecvTimeoutError::Disconnected => "channel is empty and sending half is closed".fmt(f), } } } @@ -1702,12 +1651,8 @@ impl fmt::Display for RecvTimeoutError { impl error::Error for RecvTimeoutError { fn description(&self) -> &str { match *self { - RecvTimeoutError::Timeout => { - "timed out waiting on channel" - } - RecvTimeoutError::Disconnected => { - "channel is empty and sending half is closed" - } + RecvTimeoutError::Timeout => "timed out waiting on channel", + RecvTimeoutError::Disconnected => "channel is empty and sending half is closed", } } } @@ -1769,7 +1714,7 @@ mod tests { #[test] fn smoke_threads() { let (tx, rx) = channel::<i32>(); - let _t = thread::spawn(move|| { + let _t = thread::spawn(move || { tx.send(1).unwrap(); }); assert_eq!(rx.recv().unwrap(), 1); @@ -1801,7 +1746,7 @@ mod tests { #[test] fn port_gone_concurrent() { let (tx, rx) = channel::<i32>(); - let _t = thread::spawn(move|| { + let _t = thread::spawn(move || { rx.recv().unwrap(); }); while tx.send(1).is_ok() {} @@ -1811,7 +1756,7 @@ mod tests { fn port_gone_concurrent_shared() { let (tx, rx) = channel::<i32>(); let tx2 = tx.clone(); - let _t = thread::spawn(move|| { + let _t = thread::spawn(move || { rx.recv().unwrap(); }); while tx.send(1).is_ok() && tx2.send(1).is_ok() {} @@ -1836,7 +1781,7 @@ mod tests { #[test] fn chan_gone_concurrent() { let (tx, rx) = channel::<i32>(); - let _t = thread::spawn(move|| { + let _t = thread::spawn(move || { tx.send(1).unwrap(); tx.send(1).unwrap(); }); @@ -1846,8 +1791,10 @@ mod tests { #[test] fn stress() { let (tx, rx) = channel::<i32>(); - let t = thread::spawn(move|| { - for _ in 0..10000 { tx.send(1).unwrap(); } + let t = thread::spawn(move || { + for _ in 0..10000 { + tx.send(1).unwrap(); + } }); for _ in 0..10000 { assert_eq!(rx.recv().unwrap(), 1); @@ -1861,7 +1808,7 @@ mod tests { const NTHREADS: u32 = 8; let (tx, rx) = channel::<i32>(); - let t = thread::spawn(move|| { + let t = thread::spawn(move || { for _ in 0..AMT * NTHREADS { assert_eq!(rx.recv().unwrap(), 1); } @@ -1873,8 +1820,10 @@ mod tests { for _ in 0..NTHREADS { let tx = tx.clone(); - thread::spawn(move|| { - for _ in 0..AMT { tx.send(1).unwrap(); } + thread::spawn(move || { + for _ in 0..AMT { + tx.send(1).unwrap(); + } }); } drop(tx); @@ -1885,14 +1834,14 @@ mod tests { fn send_from_outside_runtime() { let (tx1, rx1) = channel::<()>(); let (tx2, rx2) = channel::<i32>(); - let t1 = thread::spawn(move|| { + let t1 = thread::spawn(move || { tx1.send(()).unwrap(); for _ in 0..40 { assert_eq!(rx2.recv().unwrap(), 1); } }); rx1.recv().unwrap(); - let t2 = thread::spawn(move|| { + let t2 = thread::spawn(move || { for _ in 0..40 { tx2.send(1).unwrap(); } @@ -1904,7 +1853,7 @@ mod tests { #[test] fn recv_from_outside_runtime() { let (tx, rx) = channel::<i32>(); - let t = thread::spawn(move|| { + let t = thread::spawn(move || { for _ in 0..40 { assert_eq!(rx.recv().unwrap(), 1); } @@ -1919,11 +1868,11 @@ mod tests { fn no_runtime() { let (tx1, rx1) = channel::<i32>(); let (tx2, rx2) = channel::<i32>(); - let t1 = thread::spawn(move|| { + let t1 = thread::spawn(move || { assert_eq!(rx1.recv().unwrap(), 1); tx2.send(2).unwrap(); }); - let t2 = thread::spawn(move|| { + let t2 = thread::spawn(move || { tx1.send(1).unwrap(); assert_eq!(rx2.recv().unwrap(), 2); }); @@ -1956,11 +1905,12 @@ mod tests { #[test] fn oneshot_single_thread_recv_chan_close() { // Receiving on a closed chan will panic - let res = thread::spawn(move|| { + let res = thread::spawn(move || { let (tx, rx) = channel::<i32>(); drop(tx); rx.recv().unwrap(); - }).join(); + }) + .join(); // What is our res? assert!(res.is_err()); } @@ -2025,7 +1975,7 @@ mod tests { #[test] fn oneshot_multi_task_recv_then_send() { let (tx, rx) = channel::<Box<i32>>(); - let _t = thread::spawn(move|| { + let _t = thread::spawn(move || { assert!(*rx.recv().unwrap() == 10); }); @@ -2035,12 +1985,13 @@ mod tests { #[test] fn oneshot_multi_task_recv_then_close() { let (tx, rx) = channel::<Box<i32>>(); - let _t = thread::spawn(move|| { + let _t = thread::spawn(move || { drop(tx); }); - let res = thread::spawn(move|| { + let res = thread::spawn(move || { assert!(*rx.recv().unwrap() == 10); - }).join(); + }) + .join(); assert!(res.is_err()); } @@ -2048,7 +1999,7 @@ mod tests { fn oneshot_multi_thread_close_stress() { for _ in 0..stress_factor() { let (tx, rx) = channel::<i32>(); - let _t = thread::spawn(move|| { + let _t = thread::spawn(move || { drop(rx); }); drop(tx); @@ -2059,12 +2010,13 @@ mod tests { fn oneshot_multi_thread_send_close_stress() { for _ in 0..stress_factor() { let (tx, rx) = channel::<i32>(); - let _t = thread::spawn(move|| { + let _t = thread::spawn(move || { drop(rx); }); - let _ = thread::spawn(move|| { + let _ = thread::spawn(move || { tx.send(1).unwrap(); - }).join(); + }) + .join(); } } @@ -2072,14 +2024,15 @@ mod tests { fn oneshot_multi_thread_recv_close_stress() { for _ in 0..stress_factor() { let (tx, rx) = channel::<i32>(); - thread::spawn(move|| { - let res = thread::spawn(move|| { + thread::spawn(move || { + let res = thread::spawn(move || { rx.recv().unwrap(); - }).join(); + }) + .join(); assert!(res.is_err()); }); - let _t = thread::spawn(move|| { - thread::spawn(move|| { + let _t = thread::spawn(move || { + thread::spawn(move || { drop(tx); }); }); @@ -2090,7 +2043,7 @@ mod tests { fn oneshot_multi_thread_send_recv_stress() { for _ in 0..stress_factor() { let (tx, rx) = channel::<Box<isize>>(); - let _t = thread::spawn(move|| { + let _t = thread::spawn(move || { tx.send(box 10).unwrap(); }); assert!(*rx.recv().unwrap() == 10); @@ -2106,18 +2059,22 @@ mod tests { recv(rx, 0); fn send(tx: Sender<Box<i32>>, i: i32) { - if i == 10 { return } + if i == 10 { + return; + } - thread::spawn(move|| { + thread::spawn(move || { tx.send(box i).unwrap(); send(tx, i + 1); }); } fn recv(rx: Receiver<Box<i32>>, i: i32) { - if i == 10 { return } + if i == 10 { + return; + } - thread::spawn(move|| { + thread::spawn(move || { assert!(*rx.recv().unwrap() == i); recv(rx, i + 1); }); @@ -2214,9 +2171,8 @@ mod tests { #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31 fn very_long_recv_timeout_wont_panic() { let (tx, rx) = channel::<()>(); - let join_handle = thread::spawn(move || { - rx.recv_timeout(Duration::from_secs(u64::max_value())) - }); + let join_handle = + thread::spawn(move || rx.recv_timeout(Duration::from_secs(u64::max_value()))); thread::sleep(Duration::from_secs(1)); assert!(tx.send(()).is_ok()); assert_eq!(join_handle.join().unwrap(), Ok(())); @@ -2226,8 +2182,12 @@ mod tests { fn recv_a_lot() { // Regression test that we don't run out of stack in scheduler context let (tx, rx) = channel(); - for _ in 0..10000 { tx.send(()).unwrap(); } - for _ in 0..10000 { rx.recv().unwrap(); } + for _ in 0..10000 { + tx.send(()).unwrap(); + } + for _ in 0..10000 { + rx.recv().unwrap(); + } } #[test] @@ -2237,12 +2197,14 @@ mod tests { let total = 5; for _ in 0..total { let tx = tx.clone(); - thread::spawn(move|| { + thread::spawn(move || { tx.send(()).unwrap(); }); } - for _ in 0..total { rx.recv().unwrap(); } + for _ in 0..total { + rx.recv().unwrap(); + } assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout)); tx.send(()).unwrap(); @@ -2255,7 +2217,7 @@ mod tests { let total = stress_factor() + 100; for _ in 0..total { let tx = tx.clone(); - thread::spawn(move|| { + thread::spawn(move || { tx.send(()).unwrap(); }); } @@ -2270,7 +2232,7 @@ mod tests { let (tx, rx) = channel::<i32>(); let (total_tx, total_rx) = channel::<i32>(); - let _t = thread::spawn(move|| { + let _t = thread::spawn(move || { let mut acc = 0; for x in rx.iter() { acc += x; @@ -2290,7 +2252,7 @@ mod tests { let (tx, rx) = channel::<i32>(); let (count_tx, count_rx) = channel(); - let _t = thread::spawn(move|| { + let _t = thread::spawn(move || { let mut count = 0; for x in rx.iter() { if count >= 3 { @@ -2316,7 +2278,7 @@ mod tests { let (response_tx, response_rx) = channel(); // Request `x`s until we have `6`. - let t = thread::spawn(move|| { + let t = thread::spawn(move || { let mut count = 0; loop { for x in response_rx.try_iter() { @@ -2341,11 +2303,11 @@ mod tests { #[test] fn test_recv_into_iter_owned() { let mut iter = { - let (tx, rx) = channel::<i32>(); - tx.send(1).unwrap(); - tx.send(2).unwrap(); + let (tx, rx) = channel::<i32>(); + tx.send(1).unwrap(); + tx.send(2).unwrap(); - rx.into_iter() + rx.into_iter() }; assert_eq!(iter.next().unwrap(), 1); assert_eq!(iter.next().unwrap(), 2); @@ -2369,7 +2331,7 @@ mod tests { let (tx1, rx1) = channel::<i32>(); let (tx2, rx2) = channel::<()>(); let (tx3, rx3) = channel::<()>(); - let _t = thread::spawn(move|| { + let _t = thread::spawn(move || { rx2.recv().unwrap(); tx1.send(1).unwrap(); tx3.send(()).unwrap(); @@ -2394,13 +2356,15 @@ mod tests { fn destroy_upgraded_shared_port_when_sender_still_active() { let (tx, rx) = channel(); let (tx2, rx2) = channel(); - let _t = thread::spawn(move|| { + let _t = thread::spawn(move || { rx.recv().unwrap(); // wait on a oneshot - drop(rx); // destroy a shared + drop(rx); // destroy a shared tx2.send(()).unwrap(); }); // make sure the other thread has gone to sleep - for _ in 0..5000 { thread::yield_now(); } + for _ in 0..5000 { + thread::yield_now(); + } // upgrade to a shared chan and send a message let t = tx.clone(); @@ -2468,7 +2432,7 @@ mod sync_tests { #[test] fn smoke_threads() { let (tx, rx) = sync_channel::<i32>(0); - let _t = thread::spawn(move|| { + let _t = thread::spawn(move || { tx.send(1).unwrap(); }); assert_eq!(rx.recv().unwrap(), 1); @@ -2493,7 +2457,7 @@ mod sync_tests { #[test] fn port_gone_concurrent() { let (tx, rx) = sync_channel::<i32>(0); - let _t = thread::spawn(move|| { + let _t = thread::spawn(move || { rx.recv().unwrap(); }); while tx.send(1).is_ok() {} @@ -2503,7 +2467,7 @@ mod sync_tests { fn port_gone_concurrent_shared() { let (tx, rx) = sync_channel::<i32>(0); let tx2 = tx.clone(); - let _t = thread::spawn(move|| { + let _t = thread::spawn(move || { rx.recv().unwrap(); }); while tx.send(1).is_ok() && tx2.send(1).is_ok() {} @@ -2528,7 +2492,7 @@ mod sync_tests { #[test] fn chan_gone_concurrent() { let (tx, rx) = sync_channel::<i32>(0); - thread::spawn(move|| { + thread::spawn(move || { tx.send(1).unwrap(); tx.send(1).unwrap(); }); @@ -2538,8 +2502,10 @@ mod sync_tests { #[test] fn stress() { let (tx, rx) = sync_channel::<i32>(0); - thread::spawn(move|| { - for _ in 0..10000 { tx.send(1).unwrap(); } + thread::spawn(move || { + for _ in 0..10000 { + tx.send(1).unwrap(); + } }); for _ in 0..10000 { assert_eq!(rx.recv().unwrap(), 1); @@ -2551,8 +2517,10 @@ mod sync_tests { fn stress_recv_timeout_two_threads() { let (tx, rx) = sync_channel::<i32>(0); - thread::spawn(move|| { - for _ in 0..10000 { tx.send(1).unwrap(); } + thread::spawn(move || { + for _ in 0..10000 { + tx.send(1).unwrap(); + } }); let mut recv_count = 0; @@ -2561,7 +2529,7 @@ mod sync_tests { Ok(v) => { assert_eq!(v, 1); recv_count += 1; - }, + } Err(RecvTimeoutError::Timeout) => continue, Err(RecvTimeoutError::Disconnected) => break, } @@ -2578,14 +2546,14 @@ mod sync_tests { let (tx, rx) = sync_channel::<i32>(0); let (dtx, drx) = sync_channel::<()>(0); - thread::spawn(move|| { + thread::spawn(move || { let mut recv_count = 0; loop { match rx.recv_timeout(Duration::from_millis(10)) { Ok(v) => { assert_eq!(v, 1); recv_count += 1; - }, + } Err(RecvTimeoutError::Timeout) => continue, Err(RecvTimeoutError::Disconnected) => break, } @@ -2599,8 +2567,10 @@ mod sync_tests { for _ in 0..NTHREADS { let tx = tx.clone(); - thread::spawn(move|| { - for _ in 0..AMT { tx.send(1).unwrap(); } + thread::spawn(move || { + for _ in 0..AMT { + tx.send(1).unwrap(); + } }); } @@ -2616,7 +2586,7 @@ mod sync_tests { let (tx, rx) = sync_channel::<i32>(0); let (dtx, drx) = sync_channel::<()>(0); - thread::spawn(move|| { + thread::spawn(move || { for _ in 0..AMT * NTHREADS { assert_eq!(rx.recv().unwrap(), 1); } @@ -2629,8 +2599,10 @@ mod sync_tests { for _ in 0..NTHREADS { let tx = tx.clone(); - thread::spawn(move|| { - for _ in 0..AMT { tx.send(1).unwrap(); } + thread::spawn(move || { + for _ in 0..AMT { + tx.send(1).unwrap(); + } }); } drop(tx); @@ -2662,11 +2634,12 @@ mod sync_tests { #[test] fn oneshot_single_thread_recv_chan_close() { // Receiving on a closed chan will panic - let res = thread::spawn(move|| { + let res = thread::spawn(move || { let (tx, rx) = sync_channel::<i32>(0); drop(tx); rx.recv().unwrap(); - }).join(); + }) + .join(); // What is our res? assert!(res.is_err()); } @@ -2746,7 +2719,7 @@ mod sync_tests { #[test] fn oneshot_multi_task_recv_then_send() { let (tx, rx) = sync_channel::<Box<i32>>(0); - let _t = thread::spawn(move|| { + let _t = thread::spawn(move || { assert!(*rx.recv().unwrap() == 10); }); @@ -2756,12 +2729,13 @@ mod sync_tests { #[test] fn oneshot_multi_task_recv_then_close() { let (tx, rx) = sync_channel::<Box<i32>>(0); - let _t = thread::spawn(move|| { + let _t = thread::spawn(move || { drop(tx); }); - let res = thread::spawn(move|| { + let res = thread::spawn(move || { assert!(*rx.recv().unwrap() == 10); - }).join(); + }) + .join(); assert!(res.is_err()); } @@ -2769,7 +2743,7 @@ mod sync_tests { fn oneshot_multi_thread_close_stress() { for _ in 0..stress_factor() { let (tx, rx) = sync_channel::<i32>(0); - let _t = thread::spawn(move|| { + let _t = thread::spawn(move || { drop(rx); }); drop(tx); @@ -2780,12 +2754,13 @@ mod sync_tests { fn oneshot_multi_thread_send_close_stress() { for _ in 0..stress_factor() { let (tx, rx) = sync_channel::<i32>(0); - let _t = thread::spawn(move|| { + let _t = thread::spawn(move || { drop(rx); }); let _ = thread::spawn(move || { tx.send(1).unwrap(); - }).join(); + }) + .join(); } } @@ -2793,14 +2768,15 @@ mod sync_tests { fn oneshot_multi_thread_recv_close_stress() { for _ in 0..stress_factor() { let (tx, rx) = sync_channel::<i32>(0); - let _t = thread::spawn(move|| { - let res = thread::spawn(move|| { + let _t = thread::spawn(move || { + let res = thread::spawn(move || { rx.recv().unwrap(); - }).join(); + }) + .join(); assert!(res.is_err()); }); - let _t = thread::spawn(move|| { - thread::spawn(move|| { + let _t = thread::spawn(move || { + thread::spawn(move || { drop(tx); }); }); @@ -2811,7 +2787,7 @@ mod sync_tests { fn oneshot_multi_thread_send_recv_stress() { for _ in 0..stress_factor() { let (tx, rx) = sync_channel::<Box<i32>>(0); - let _t = thread::spawn(move|| { + let _t = thread::spawn(move || { tx.send(box 10).unwrap(); }); assert!(*rx.recv().unwrap() == 10); @@ -2827,18 +2803,22 @@ mod sync_tests { recv(rx, 0); fn send(tx: SyncSender<Box<i32>>, i: i32) { - if i == 10 { return } + if i == 10 { + return; + } - thread::spawn(move|| { + thread::spawn(move || { tx.send(box i).unwrap(); send(tx, i + 1); }); } fn recv(rx: Receiver<Box<i32>>, i: i32) { - if i == 10 { return } + if i == 10 { + return; + } - thread::spawn(move|| { + thread::spawn(move || { assert!(*rx.recv().unwrap() == i); recv(rx, i + 1); }); @@ -2850,8 +2830,12 @@ mod sync_tests { fn recv_a_lot() { // Regression test that we don't run out of stack in scheduler context let (tx, rx) = sync_channel(10000); - for _ in 0..10000 { tx.send(()).unwrap(); } - for _ in 0..10000 { rx.recv().unwrap(); } + for _ in 0..10000 { + tx.send(()).unwrap(); + } + for _ in 0..10000 { + rx.recv().unwrap(); + } } #[test] @@ -2860,7 +2844,7 @@ mod sync_tests { let total = stress_factor() + 100; for _ in 0..total { let tx = tx.clone(); - thread::spawn(move|| { + thread::spawn(move || { tx.send(()).unwrap(); }); } @@ -2875,7 +2859,7 @@ mod sync_tests { let (tx, rx) = sync_channel::<i32>(0); let (total_tx, total_rx) = sync_channel::<i32>(0); - let _t = thread::spawn(move|| { + let _t = thread::spawn(move || { let mut acc = 0; for x in rx.iter() { acc += x; @@ -2895,7 +2879,7 @@ mod sync_tests { let (tx, rx) = sync_channel::<i32>(0); let (count_tx, count_rx) = sync_channel(0); - let _t = thread::spawn(move|| { + let _t = thread::spawn(move || { let mut count = 0; for x in rx.iter() { if count >= 3 { @@ -2920,7 +2904,7 @@ mod sync_tests { let (tx1, rx1) = sync_channel::<i32>(1); let (tx2, rx2) = sync_channel::<()>(1); let (tx3, rx3) = sync_channel::<()>(1); - let _t = thread::spawn(move|| { + let _t = thread::spawn(move || { rx2.recv().unwrap(); tx1.send(1).unwrap(); tx3.send(()).unwrap(); @@ -2945,13 +2929,15 @@ mod sync_tests { fn destroy_upgraded_shared_port_when_sender_still_active() { let (tx, rx) = sync_channel::<()>(0); let (tx2, rx2) = sync_channel::<()>(0); - let _t = thread::spawn(move|| { + let _t = thread::spawn(move || { rx.recv().unwrap(); // wait on a oneshot - drop(rx); // destroy a shared + drop(rx); // destroy a shared tx2.send(()).unwrap(); }); // make sure the other thread has gone to sleep - for _ in 0..5000 { thread::yield_now(); } + for _ in 0..5000 { + thread::yield_now(); + } // upgrade to a shared chan and send a message let t = tx.clone(); @@ -2965,14 +2951,18 @@ mod sync_tests { #[test] fn send1() { let (tx, rx) = sync_channel::<i32>(0); - let _t = thread::spawn(move|| { rx.recv().unwrap(); }); + let _t = thread::spawn(move || { + rx.recv().unwrap(); + }); assert_eq!(tx.send(1), Ok(())); } #[test] fn send2() { let (tx, rx) = sync_channel::<i32>(0); - let _t = thread::spawn(move|| { drop(rx); }); + let _t = thread::spawn(move || { + drop(rx); + }); assert!(tx.send(1).is_err()); } @@ -2980,7 +2970,9 @@ mod sync_tests { fn send3() { let (tx, rx) = sync_channel::<i32>(1); assert_eq!(tx.send(1), Ok(())); - let _t =thread::spawn(move|| { drop(rx); }); + let _t = thread::spawn(move || { + drop(rx); + }); assert!(tx.send(1).is_err()); } @@ -2990,11 +2982,11 @@ mod sync_tests { let tx2 = tx.clone(); let (done, donerx) = channel(); let done2 = done.clone(); - let _t = thread::spawn(move|| { + let _t = thread::spawn(move || { assert!(tx.send(1).is_err()); done.send(()).unwrap(); }); - let _t = thread::spawn(move|| { + let _t = thread::spawn(move || { assert!(tx2.send(2).is_err()); done2.send(()).unwrap(); }); @@ -3030,7 +3022,7 @@ mod sync_tests { let (tx1, rx1) = sync_channel::<()>(3); let (tx2, rx2) = sync_channel::<()>(3); - let _t = thread::spawn(move|| { + let _t = thread::spawn(move || { rx1.recv().unwrap(); tx2.try_send(()).unwrap(); }); |
