diff options
| author | David Tolnay <dtolnay@gmail.com> | 2019-11-27 10:29:00 -0800 |
|---|---|---|
| committer | David Tolnay <dtolnay@gmail.com> | 2019-11-29 18:43:27 -0800 |
| commit | 4436c9d35498e7ae3da261f6141d6d73b915e1e8 (patch) | |
| tree | 5bee9f8714a41c4ad672d0cc5c302ede56197726 /src/libstd/sync | |
| parent | 9081929d45f12d3f56d43b1d6db7519981580fc9 (diff) | |
| download | rust-4436c9d35498e7ae3da261f6141d6d73b915e1e8.tar.gz rust-4436c9d35498e7ae3da261f6141d6d73b915e1e8.zip | |
Format libstd with rustfmt
This commit applies rustfmt with rust-lang/rust's default settings to
files in src/libstd *that are not involved in any currently open PR* to
minimize merge conflicts. The list of files involved in open PRs was
determined by querying GitHub's GraphQL API with this script:
https://gist.github.com/dtolnay/aa9c34993dc051a4f344d1b10e4487e8
With the list of files from the script in outstanding_files, the
relevant commands were:
$ find src/libstd -name '*.rs' \
| xargs rustfmt --edition=2018 --unstable-features --skip-children
$ rg libstd outstanding_files | xargs git checkout --
Repeating this process several months apart should get us coverage of
most of the rest of libstd.
To confirm no funny business:
$ git checkout $THIS_COMMIT^
$ git show --pretty= --name-only $THIS_COMMIT \
| xargs rustfmt --edition=2018 --unstable-features --skip-children
$ git diff $THIS_COMMIT # there should be no difference
Diffstat (limited to 'src/libstd/sync')
| -rw-r--r-- | src/libstd/sync/barrier.rs | 22 | ||||
| -rw-r--r-- | src/libstd/sync/mpsc/blocking.rs | 17 | ||||
| -rw-r--r-- | src/libstd/sync/mpsc/cache_aligned.rs | 14 | ||||
| -rw-r--r-- | src/libstd/sync/mpsc/mod.rs | 510 | ||||
| -rw-r--r-- | src/libstd/sync/mpsc/mpsc_queue.rs | 28 | ||||
| -rw-r--r-- | src/libstd/sync/mpsc/oneshot.rs | 56 | ||||
| -rw-r--r-- | src/libstd/sync/mpsc/shared.rs | 57 | ||||
| -rw-r--r-- | src/libstd/sync/mpsc/spsc_queue.rs | 47 | ||||
| -rw-r--r-- | src/libstd/sync/mpsc/stream.rs | 112 | ||||
| -rw-r--r-- | src/libstd/sync/mpsc/sync.rs | 121 |
10 files changed, 499 insertions, 485 deletions
diff --git a/src/libstd/sync/barrier.rs b/src/libstd/sync/barrier.rs index 23ba63a6109..eddbdff257a 100644 --- a/src/libstd/sync/barrier.rs +++ b/src/libstd/sync/barrier.rs @@ -1,5 +1,5 @@ use crate::fmt; -use crate::sync::{Mutex, Condvar}; +use crate::sync::{Condvar, Mutex}; /// A barrier enables multiple threads to synchronize the beginning /// of some computation. @@ -82,10 +82,7 @@ impl Barrier { #[stable(feature = "rust1", since = "1.0.0")] pub fn new(n: usize) -> Barrier { Barrier { - lock: Mutex::new(BarrierState { - count: 0, - generation_id: 0, - }), + lock: Mutex::new(BarrierState { count: 0, generation_id: 0 }), cvar: Condvar::new(), num_threads: n, } @@ -135,8 +132,7 @@ impl Barrier { if lock.count < self.num_threads { // We need a while loop to guard against spurious wakeups. // http://en.wikipedia.org/wiki/Spurious_wakeup - while local_gen == lock.generation_id && - lock.count < self.num_threads { + while local_gen == lock.generation_id && lock.count < self.num_threads { lock = self.cvar.wait(lock).unwrap(); } BarrierWaitResult(false) @@ -152,9 +148,7 @@ impl Barrier { #[stable(feature = "std_debug", since = "1.16.0")] impl fmt::Debug for BarrierWaitResult { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("BarrierWaitResult") - .field("is_leader", &self.is_leader()) - .finish() + f.debug_struct("BarrierWaitResult").field("is_leader", &self.is_leader()).finish() } } @@ -176,13 +170,15 @@ impl BarrierWaitResult { /// println!("{:?}", barrier_wait_result.is_leader()); /// ``` #[stable(feature = "rust1", since = "1.0.0")] - pub fn is_leader(&self) -> bool { self.0 } + pub fn is_leader(&self) -> bool { + self.0 + } } #[cfg(test)] mod tests { - use crate::sync::{Arc, Barrier}; use crate::sync::mpsc::{channel, TryRecvError}; + use crate::sync::{Arc, Barrier}; use crate::thread; #[test] @@ -196,7 +192,7 @@ mod tests { for _ in 0..N - 1 { let c = barrier.clone(); let tx = tx.clone(); - thread::spawn(move|| { + thread::spawn(move || 
{ tx.send(c.wait().is_leader()).unwrap(); }); } diff --git a/src/libstd/sync/mpsc/blocking.rs b/src/libstd/sync/mpsc/blocking.rs index 6eacfaec253..d34de6a4fac 100644 --- a/src/libstd/sync/mpsc/blocking.rs +++ b/src/libstd/sync/mpsc/blocking.rs @@ -1,9 +1,9 @@ //! Generic support for building blocking abstractions. -use crate::thread::{self, Thread}; +use crate::mem; use crate::sync::atomic::{AtomicBool, Ordering}; use crate::sync::Arc; -use crate::mem; +use crate::thread::{self, Thread}; use crate::time::Instant; struct Inner { @@ -28,16 +28,9 @@ impl !Send for WaitToken {} impl !Sync for WaitToken {} pub fn tokens() -> (WaitToken, SignalToken) { - let inner = Arc::new(Inner { - thread: thread::current(), - woken: AtomicBool::new(false), - }); - let wait_token = WaitToken { - inner: inner.clone(), - }; - let signal_token = SignalToken { - inner, - }; + let inner = Arc::new(Inner { thread: thread::current(), woken: AtomicBool::new(false) }); + let wait_token = WaitToken { inner: inner.clone() }; + let signal_token = SignalToken { inner }; (wait_token, signal_token) } diff --git a/src/libstd/sync/mpsc/cache_aligned.rs b/src/libstd/sync/mpsc/cache_aligned.rs index b14a9e5d61b..b0842144328 100644 --- a/src/libstd/sync/mpsc/cache_aligned.rs +++ b/src/libstd/sync/mpsc/cache_aligned.rs @@ -8,16 +8,16 @@ pub(super) struct Aligner; pub(super) struct CacheAligned<T>(pub T, pub Aligner); impl<T> Deref for CacheAligned<T> { - type Target = T; - fn deref(&self) -> &Self::Target { - &self.0 - } + type Target = T; + fn deref(&self) -> &Self::Target { + &self.0 + } } impl<T> DerefMut for CacheAligned<T> { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } } impl<T> CacheAligned<T> { diff --git a/src/libstd/sync/mpsc/mod.rs b/src/libstd/sync/mpsc/mod.rs index c2884a28f3c..2831bbcb88d 100644 --- a/src/libstd/sync/mpsc/mod.rs +++ b/src/libstd/sync/mpsc/mod.rs @@ -269,20 +269,20 @@ // And now that 
you've seen all the races that I found and attempted to fix, // here's the code for you to find some more! -use crate::sync::Arc; +use crate::cell::UnsafeCell; use crate::error; use crate::fmt; use crate::mem; -use crate::cell::UnsafeCell; +use crate::sync::Arc; use crate::time::{Duration, Instant}; mod blocking; +mod mpsc_queue; mod oneshot; mod shared; +mod spsc_queue; mod stream; mod sync; -mod mpsc_queue; -mod spsc_queue; mod cache_aligned; @@ -322,10 +322,10 @@ pub struct Receiver<T> { // The receiver port can be sent from place to place, so long as it // is not used to receive non-sendable things. #[stable(feature = "rust1", since = "1.0.0")] -unsafe impl<T: Send> Send for Receiver<T> { } +unsafe impl<T: Send> Send for Receiver<T> {} #[stable(feature = "rust1", since = "1.0.0")] -impl<T> !Sync for Receiver<T> { } +impl<T> !Sync for Receiver<T> {} /// An iterator over messages on a [`Receiver`], created by [`iter`]. /// @@ -359,7 +359,7 @@ impl<T> !Sync for Receiver<T> { } #[stable(feature = "rust1", since = "1.0.0")] #[derive(Debug)] pub struct Iter<'a, T: 'a> { - rx: &'a Receiver<T> + rx: &'a Receiver<T>, } /// An iterator that attempts to yield all pending values for a [`Receiver`], @@ -404,7 +404,7 @@ pub struct Iter<'a, T: 'a> { #[stable(feature = "receiver_try_iter", since = "1.15.0")] #[derive(Debug)] pub struct TryIter<'a, T: 'a> { - rx: &'a Receiver<T> + rx: &'a Receiver<T>, } /// An owning iterator over messages on a [`Receiver`], @@ -439,7 +439,7 @@ pub struct TryIter<'a, T: 'a> { #[stable(feature = "receiver_into_iter", since = "1.1.0")] #[derive(Debug)] pub struct IntoIter<T> { - rx: Receiver<T> + rx: Receiver<T>, } /// The sending-half of Rust's asynchronous [`channel`] type. This half can only be @@ -482,10 +482,10 @@ pub struct Sender<T> { // The send port can be sent from place to place, so long as it // is not used to send non-sendable things. 
#[stable(feature = "rust1", since = "1.0.0")] -unsafe impl<T: Send> Send for Sender<T> { } +unsafe impl<T: Send> Send for Sender<T> {} #[stable(feature = "rust1", since = "1.0.0")] -impl<T> !Sync for Sender<T> { } +impl<T> !Sync for Sender<T> {} /// The sending-half of Rust's synchronous [`sync_channel`] type. /// @@ -772,9 +772,7 @@ pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) { impl<T> Sender<T> { fn new(inner: Flavor<T>) -> Sender<T> { - Sender { - inner: UnsafeCell::new(inner), - } + Sender { inner: UnsafeCell::new(inner) } } /// Attempts to send a value on this channel, returning it back if it could @@ -856,8 +854,7 @@ impl<T> Clone for Sender<T> { let guard = a.postinit_lock(); let rx = Receiver::new(Flavor::Shared(a.clone())); let sleeper = match p.upgrade(rx) { - oneshot::UpSuccess | - oneshot::UpDisconnected => None, + oneshot::UpSuccess | oneshot::UpDisconnected => None, oneshot::UpWoke(task) => Some(task), }; a.inherit_blocker(sleeper, guard); @@ -870,8 +867,7 @@ impl<T> Clone for Sender<T> { let guard = a.postinit_lock(); let rx = Receiver::new(Flavor::Shared(a.clone())); let sleeper = match p.upgrade(rx) { - stream::UpSuccess | - stream::UpDisconnected => None, + stream::UpSuccess | stream::UpDisconnected => None, stream::UpWoke(task) => Some(task), }; a.inherit_blocker(sleeper, guard); @@ -1078,48 +1074,31 @@ impl<T> Receiver<T> { pub fn try_recv(&self) -> Result<T, TryRecvError> { loop { let new_port = match *unsafe { self.inner() } { - Flavor::Oneshot(ref p) => { - match p.try_recv() { - Ok(t) => return Ok(t), - Err(oneshot::Empty) => return Err(TryRecvError::Empty), - Err(oneshot::Disconnected) => { - return Err(TryRecvError::Disconnected) - } - Err(oneshot::Upgraded(rx)) => rx, - } - } - Flavor::Stream(ref p) => { - match p.try_recv() { - Ok(t) => return Ok(t), - Err(stream::Empty) => return Err(TryRecvError::Empty), - Err(stream::Disconnected) => { - return Err(TryRecvError::Disconnected) - } - Err(stream::Upgraded(rx)) => 
rx, - } - } - Flavor::Shared(ref p) => { - match p.try_recv() { - Ok(t) => return Ok(t), - Err(shared::Empty) => return Err(TryRecvError::Empty), - Err(shared::Disconnected) => { - return Err(TryRecvError::Disconnected) - } - } - } - Flavor::Sync(ref p) => { - match p.try_recv() { - Ok(t) => return Ok(t), - Err(sync::Empty) => return Err(TryRecvError::Empty), - Err(sync::Disconnected) => { - return Err(TryRecvError::Disconnected) - } - } - } + Flavor::Oneshot(ref p) => match p.try_recv() { + Ok(t) => return Ok(t), + Err(oneshot::Empty) => return Err(TryRecvError::Empty), + Err(oneshot::Disconnected) => return Err(TryRecvError::Disconnected), + Err(oneshot::Upgraded(rx)) => rx, + }, + Flavor::Stream(ref p) => match p.try_recv() { + Ok(t) => return Ok(t), + Err(stream::Empty) => return Err(TryRecvError::Empty), + Err(stream::Disconnected) => return Err(TryRecvError::Disconnected), + Err(stream::Upgraded(rx)) => rx, + }, + Flavor::Shared(ref p) => match p.try_recv() { + Ok(t) => return Ok(t), + Err(shared::Empty) => return Err(TryRecvError::Empty), + Err(shared::Disconnected) => return Err(TryRecvError::Disconnected), + }, + Flavor::Sync(ref p) => match p.try_recv() { + Ok(t) => return Ok(t), + Err(sync::Empty) => return Err(TryRecvError::Empty), + Err(sync::Disconnected) => return Err(TryRecvError::Disconnected), + }, }; unsafe { - mem::swap(self.inner_mut(), - new_port.inner_mut()); + mem::swap(self.inner_mut(), new_port.inner_mut()); } } } @@ -1185,29 +1164,23 @@ impl<T> Receiver<T> { pub fn recv(&self) -> Result<T, RecvError> { loop { let new_port = match *unsafe { self.inner() } { - Flavor::Oneshot(ref p) => { - match p.recv(None) { - Ok(t) => return Ok(t), - Err(oneshot::Disconnected) => return Err(RecvError), - Err(oneshot::Upgraded(rx)) => rx, - Err(oneshot::Empty) => unreachable!(), - } - } - Flavor::Stream(ref p) => { - match p.recv(None) { - Ok(t) => return Ok(t), - Err(stream::Disconnected) => return Err(RecvError), - Err(stream::Upgraded(rx)) => rx, - 
Err(stream::Empty) => unreachable!(), - } - } - Flavor::Shared(ref p) => { - match p.recv(None) { - Ok(t) => return Ok(t), - Err(shared::Disconnected) => return Err(RecvError), - Err(shared::Empty) => unreachable!(), - } - } + Flavor::Oneshot(ref p) => match p.recv(None) { + Ok(t) => return Ok(t), + Err(oneshot::Disconnected) => return Err(RecvError), + Err(oneshot::Upgraded(rx)) => rx, + Err(oneshot::Empty) => unreachable!(), + }, + Flavor::Stream(ref p) => match p.recv(None) { + Ok(t) => return Ok(t), + Err(stream::Disconnected) => return Err(RecvError), + Err(stream::Upgraded(rx)) => rx, + Err(stream::Empty) => unreachable!(), + }, + Flavor::Shared(ref p) => match p.recv(None) { + Ok(t) => return Ok(t), + Err(shared::Disconnected) => return Err(RecvError), + Err(shared::Empty) => unreachable!(), + }, Flavor::Sync(ref p) => return p.recv(None).map_err(|_| RecvError), }; unsafe { @@ -1383,36 +1356,28 @@ impl<T> Receiver<T> { loop { let port_or_empty = match *unsafe { self.inner() } { - Flavor::Oneshot(ref p) => { - match p.recv(Some(deadline)) { - Ok(t) => return Ok(t), - Err(oneshot::Disconnected) => return Err(Disconnected), - Err(oneshot::Upgraded(rx)) => Some(rx), - Err(oneshot::Empty) => None, - } - } - Flavor::Stream(ref p) => { - match p.recv(Some(deadline)) { - Ok(t) => return Ok(t), - Err(stream::Disconnected) => return Err(Disconnected), - Err(stream::Upgraded(rx)) => Some(rx), - Err(stream::Empty) => None, - } - } - Flavor::Shared(ref p) => { - match p.recv(Some(deadline)) { - Ok(t) => return Ok(t), - Err(shared::Disconnected) => return Err(Disconnected), - Err(shared::Empty) => None, - } - } - Flavor::Sync(ref p) => { - match p.recv(Some(deadline)) { - Ok(t) => return Ok(t), - Err(sync::Disconnected) => return Err(Disconnected), - Err(sync::Empty) => None, - } - } + Flavor::Oneshot(ref p) => match p.recv(Some(deadline)) { + Ok(t) => return Ok(t), + Err(oneshot::Disconnected) => return Err(Disconnected), + Err(oneshot::Upgraded(rx)) => Some(rx), + 
Err(oneshot::Empty) => None, + }, + Flavor::Stream(ref p) => match p.recv(Some(deadline)) { + Ok(t) => return Ok(t), + Err(stream::Disconnected) => return Err(Disconnected), + Err(stream::Upgraded(rx)) => Some(rx), + Err(stream::Empty) => None, + }, + Flavor::Shared(ref p) => match p.recv(Some(deadline)) { + Ok(t) => return Ok(t), + Err(shared::Disconnected) => return Err(Disconnected), + Err(shared::Empty) => None, + }, + Flavor::Sync(ref p) => match p.recv(Some(deadline)) { + Ok(t) => return Ok(t), + Err(sync::Disconnected) => return Err(Disconnected), + Err(sync::Empty) => None, + }, }; if let Some(new_port) = port_or_empty { @@ -1502,21 +1467,24 @@ impl<T> Receiver<T> { pub fn try_iter(&self) -> TryIter<'_, T> { TryIter { rx: self } } - } #[stable(feature = "rust1", since = "1.0.0")] impl<'a, T> Iterator for Iter<'a, T> { type Item = T; - fn next(&mut self) -> Option<T> { self.rx.recv().ok() } + fn next(&mut self) -> Option<T> { + self.rx.recv().ok() + } } #[stable(feature = "receiver_try_iter", since = "1.15.0")] impl<'a, T> Iterator for TryIter<'a, T> { type Item = T; - fn next(&mut self) -> Option<T> { self.rx.try_recv().ok() } + fn next(&mut self) -> Option<T> { + self.rx.try_recv().ok() + } } #[stable(feature = "receiver_into_iter", since = "1.1.0")] @@ -1524,17 +1492,21 @@ impl<'a, T> IntoIterator for &'a Receiver<T> { type Item = T; type IntoIter = Iter<'a, T>; - fn into_iter(self) -> Iter<'a, T> { self.iter() } + fn into_iter(self) -> Iter<'a, T> { + self.iter() + } } #[stable(feature = "receiver_into_iter", since = "1.1.0")] impl<T> Iterator for IntoIter<T> { type Item = T; - fn next(&mut self) -> Option<T> { self.rx.recv().ok() } + fn next(&mut self) -> Option<T> { + self.rx.recv().ok() + } } #[stable(feature = "receiver_into_iter", since = "1.1.0")] -impl <T> IntoIterator for Receiver<T> { +impl<T> IntoIterator for Receiver<T> { type Item = T; type IntoIter = IntoIter<T>; @@ -1597,27 +1569,18 @@ impl<T> fmt::Debug for TrySendError<T> { impl<T> 
fmt::Display for TrySendError<T> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match *self { - TrySendError::Full(..) => { - "sending on a full channel".fmt(f) - } - TrySendError::Disconnected(..) => { - "sending on a closed channel".fmt(f) - } + TrySendError::Full(..) => "sending on a full channel".fmt(f), + TrySendError::Disconnected(..) => "sending on a closed channel".fmt(f), } } } #[stable(feature = "rust1", since = "1.0.0")] impl<T: Send> error::Error for TrySendError<T> { - fn description(&self) -> &str { match *self { - TrySendError::Full(..) => { - "sending on a full channel" - } - TrySendError::Disconnected(..) => { - "sending on a closed channel" - } + TrySendError::Full(..) => "sending on a full channel", + TrySendError::Disconnected(..) => "sending on a closed channel", } } } @@ -1640,7 +1603,6 @@ impl fmt::Display for RecvError { #[stable(feature = "rust1", since = "1.0.0")] impl error::Error for RecvError { - fn description(&self) -> &str { "receiving on a closed channel" } @@ -1650,27 +1612,18 @@ impl error::Error for RecvError { impl fmt::Display for TryRecvError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match *self { - TryRecvError::Empty => { - "receiving on an empty channel".fmt(f) - } - TryRecvError::Disconnected => { - "receiving on a closed channel".fmt(f) - } + TryRecvError::Empty => "receiving on an empty channel".fmt(f), + TryRecvError::Disconnected => "receiving on a closed channel".fmt(f), } } } #[stable(feature = "rust1", since = "1.0.0")] impl error::Error for TryRecvError { - fn description(&self) -> &str { match *self { - TryRecvError::Empty => { - "receiving on an empty channel" - } - TryRecvError::Disconnected => { - "receiving on a closed channel" - } + TryRecvError::Empty => "receiving on an empty channel", + TryRecvError::Disconnected => "receiving on a closed channel", } } } @@ -1688,12 +1641,8 @@ impl From<RecvError> for TryRecvError { impl fmt::Display for RecvTimeoutError { fn fmt(&self, f: 
&mut fmt::Formatter<'_>) -> fmt::Result { match *self { - RecvTimeoutError::Timeout => { - "timed out waiting on channel".fmt(f) - } - RecvTimeoutError::Disconnected => { - "channel is empty and sending half is closed".fmt(f) - } + RecvTimeoutError::Timeout => "timed out waiting on channel".fmt(f), + RecvTimeoutError::Disconnected => "channel is empty and sending half is closed".fmt(f), } } } @@ -1702,12 +1651,8 @@ impl fmt::Display for RecvTimeoutError { impl error::Error for RecvTimeoutError { fn description(&self) -> &str { match *self { - RecvTimeoutError::Timeout => { - "timed out waiting on channel" - } - RecvTimeoutError::Disconnected => { - "channel is empty and sending half is closed" - } + RecvTimeoutError::Timeout => "timed out waiting on channel", + RecvTimeoutError::Disconnected => "channel is empty and sending half is closed", } } } @@ -1769,7 +1714,7 @@ mod tests { #[test] fn smoke_threads() { let (tx, rx) = channel::<i32>(); - let _t = thread::spawn(move|| { + let _t = thread::spawn(move || { tx.send(1).unwrap(); }); assert_eq!(rx.recv().unwrap(), 1); @@ -1801,7 +1746,7 @@ mod tests { #[test] fn port_gone_concurrent() { let (tx, rx) = channel::<i32>(); - let _t = thread::spawn(move|| { + let _t = thread::spawn(move || { rx.recv().unwrap(); }); while tx.send(1).is_ok() {} @@ -1811,7 +1756,7 @@ mod tests { fn port_gone_concurrent_shared() { let (tx, rx) = channel::<i32>(); let tx2 = tx.clone(); - let _t = thread::spawn(move|| { + let _t = thread::spawn(move || { rx.recv().unwrap(); }); while tx.send(1).is_ok() && tx2.send(1).is_ok() {} @@ -1836,7 +1781,7 @@ mod tests { #[test] fn chan_gone_concurrent() { let (tx, rx) = channel::<i32>(); - let _t = thread::spawn(move|| { + let _t = thread::spawn(move || { tx.send(1).unwrap(); tx.send(1).unwrap(); }); @@ -1846,8 +1791,10 @@ mod tests { #[test] fn stress() { let (tx, rx) = channel::<i32>(); - let t = thread::spawn(move|| { - for _ in 0..10000 { tx.send(1).unwrap(); } + let t = thread::spawn(move || { + 
for _ in 0..10000 { + tx.send(1).unwrap(); + } }); for _ in 0..10000 { assert_eq!(rx.recv().unwrap(), 1); @@ -1861,7 +1808,7 @@ mod tests { const NTHREADS: u32 = 8; let (tx, rx) = channel::<i32>(); - let t = thread::spawn(move|| { + let t = thread::spawn(move || { for _ in 0..AMT * NTHREADS { assert_eq!(rx.recv().unwrap(), 1); } @@ -1873,8 +1820,10 @@ mod tests { for _ in 0..NTHREADS { let tx = tx.clone(); - thread::spawn(move|| { - for _ in 0..AMT { tx.send(1).unwrap(); } + thread::spawn(move || { + for _ in 0..AMT { + tx.send(1).unwrap(); + } }); } drop(tx); @@ -1885,14 +1834,14 @@ mod tests { fn send_from_outside_runtime() { let (tx1, rx1) = channel::<()>(); let (tx2, rx2) = channel::<i32>(); - let t1 = thread::spawn(move|| { + let t1 = thread::spawn(move || { tx1.send(()).unwrap(); for _ in 0..40 { assert_eq!(rx2.recv().unwrap(), 1); } }); rx1.recv().unwrap(); - let t2 = thread::spawn(move|| { + let t2 = thread::spawn(move || { for _ in 0..40 { tx2.send(1).unwrap(); } @@ -1904,7 +1853,7 @@ mod tests { #[test] fn recv_from_outside_runtime() { let (tx, rx) = channel::<i32>(); - let t = thread::spawn(move|| { + let t = thread::spawn(move || { for _ in 0..40 { assert_eq!(rx.recv().unwrap(), 1); } @@ -1919,11 +1868,11 @@ mod tests { fn no_runtime() { let (tx1, rx1) = channel::<i32>(); let (tx2, rx2) = channel::<i32>(); - let t1 = thread::spawn(move|| { + let t1 = thread::spawn(move || { assert_eq!(rx1.recv().unwrap(), 1); tx2.send(2).unwrap(); }); - let t2 = thread::spawn(move|| { + let t2 = thread::spawn(move || { tx1.send(1).unwrap(); assert_eq!(rx2.recv().unwrap(), 2); }); @@ -1956,11 +1905,12 @@ mod tests { #[test] fn oneshot_single_thread_recv_chan_close() { // Receiving on a closed chan will panic - let res = thread::spawn(move|| { + let res = thread::spawn(move || { let (tx, rx) = channel::<i32>(); drop(tx); rx.recv().unwrap(); - }).join(); + }) + .join(); // What is our res? 
assert!(res.is_err()); } @@ -2025,7 +1975,7 @@ mod tests { #[test] fn oneshot_multi_task_recv_then_send() { let (tx, rx) = channel::<Box<i32>>(); - let _t = thread::spawn(move|| { + let _t = thread::spawn(move || { assert!(*rx.recv().unwrap() == 10); }); @@ -2035,12 +1985,13 @@ mod tests { #[test] fn oneshot_multi_task_recv_then_close() { let (tx, rx) = channel::<Box<i32>>(); - let _t = thread::spawn(move|| { + let _t = thread::spawn(move || { drop(tx); }); - let res = thread::spawn(move|| { + let res = thread::spawn(move || { assert!(*rx.recv().unwrap() == 10); - }).join(); + }) + .join(); assert!(res.is_err()); } @@ -2048,7 +1999,7 @@ mod tests { fn oneshot_multi_thread_close_stress() { for _ in 0..stress_factor() { let (tx, rx) = channel::<i32>(); - let _t = thread::spawn(move|| { + let _t = thread::spawn(move || { drop(rx); }); drop(tx); @@ -2059,12 +2010,13 @@ mod tests { fn oneshot_multi_thread_send_close_stress() { for _ in 0..stress_factor() { let (tx, rx) = channel::<i32>(); - let _t = thread::spawn(move|| { + let _t = thread::spawn(move || { drop(rx); }); - let _ = thread::spawn(move|| { + let _ = thread::spawn(move || { tx.send(1).unwrap(); - }).join(); + }) + .join(); } } @@ -2072,14 +2024,15 @@ mod tests { fn oneshot_multi_thread_recv_close_stress() { for _ in 0..stress_factor() { let (tx, rx) = channel::<i32>(); - thread::spawn(move|| { - let res = thread::spawn(move|| { + thread::spawn(move || { + let res = thread::spawn(move || { rx.recv().unwrap(); - }).join(); + }) + .join(); assert!(res.is_err()); }); - let _t = thread::spawn(move|| { - thread::spawn(move|| { + let _t = thread::spawn(move || { + thread::spawn(move || { drop(tx); }); }); @@ -2090,7 +2043,7 @@ mod tests { fn oneshot_multi_thread_send_recv_stress() { for _ in 0..stress_factor() { let (tx, rx) = channel::<Box<isize>>(); - let _t = thread::spawn(move|| { + let _t = thread::spawn(move || { tx.send(box 10).unwrap(); }); assert!(*rx.recv().unwrap() == 10); @@ -2106,18 +2059,22 @@ mod 
tests { recv(rx, 0); fn send(tx: Sender<Box<i32>>, i: i32) { - if i == 10 { return } + if i == 10 { + return; + } - thread::spawn(move|| { + thread::spawn(move || { tx.send(box i).unwrap(); send(tx, i + 1); }); } fn recv(rx: Receiver<Box<i32>>, i: i32) { - if i == 10 { return } + if i == 10 { + return; + } - thread::spawn(move|| { + thread::spawn(move || { assert!(*rx.recv().unwrap() == i); recv(rx, i + 1); }); @@ -2214,9 +2171,8 @@ mod tests { #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31 fn very_long_recv_timeout_wont_panic() { let (tx, rx) = channel::<()>(); - let join_handle = thread::spawn(move || { - rx.recv_timeout(Duration::from_secs(u64::max_value())) - }); + let join_handle = + thread::spawn(move || rx.recv_timeout(Duration::from_secs(u64::max_value()))); thread::sleep(Duration::from_secs(1)); assert!(tx.send(()).is_ok()); assert_eq!(join_handle.join().unwrap(), Ok(())); @@ -2226,8 +2182,12 @@ mod tests { fn recv_a_lot() { // Regression test that we don't run out of stack in scheduler context let (tx, rx) = channel(); - for _ in 0..10000 { tx.send(()).unwrap(); } - for _ in 0..10000 { rx.recv().unwrap(); } + for _ in 0..10000 { + tx.send(()).unwrap(); + } + for _ in 0..10000 { + rx.recv().unwrap(); + } } #[test] @@ -2237,12 +2197,14 @@ mod tests { let total = 5; for _ in 0..total { let tx = tx.clone(); - thread::spawn(move|| { + thread::spawn(move || { tx.send(()).unwrap(); }); } - for _ in 0..total { rx.recv().unwrap(); } + for _ in 0..total { + rx.recv().unwrap(); + } assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout)); tx.send(()).unwrap(); @@ -2255,7 +2217,7 @@ mod tests { let total = stress_factor() + 100; for _ in 0..total { let tx = tx.clone(); - thread::spawn(move|| { + thread::spawn(move || { tx.send(()).unwrap(); }); } @@ -2270,7 +2232,7 @@ mod tests { let (tx, rx) = channel::<i32>(); let (total_tx, total_rx) = channel::<i32>(); - let _t = thread::spawn(move|| { 
+ let _t = thread::spawn(move || { let mut acc = 0; for x in rx.iter() { acc += x; @@ -2290,7 +2252,7 @@ mod tests { let (tx, rx) = channel::<i32>(); let (count_tx, count_rx) = channel(); - let _t = thread::spawn(move|| { + let _t = thread::spawn(move || { let mut count = 0; for x in rx.iter() { if count >= 3 { @@ -2316,7 +2278,7 @@ mod tests { let (response_tx, response_rx) = channel(); // Request `x`s until we have `6`. - let t = thread::spawn(move|| { + let t = thread::spawn(move || { let mut count = 0; loop { for x in response_rx.try_iter() { @@ -2341,11 +2303,11 @@ mod tests { #[test] fn test_recv_into_iter_owned() { let mut iter = { - let (tx, rx) = channel::<i32>(); - tx.send(1).unwrap(); - tx.send(2).unwrap(); + let (tx, rx) = channel::<i32>(); + tx.send(1).unwrap(); + tx.send(2).unwrap(); - rx.into_iter() + rx.into_iter() }; assert_eq!(iter.next().unwrap(), 1); assert_eq!(iter.next().unwrap(), 2); @@ -2369,7 +2331,7 @@ mod tests { let (tx1, rx1) = channel::<i32>(); let (tx2, rx2) = channel::<()>(); let (tx3, rx3) = channel::<()>(); - let _t = thread::spawn(move|| { + let _t = thread::spawn(move || { rx2.recv().unwrap(); tx1.send(1).unwrap(); tx3.send(()).unwrap(); @@ -2394,13 +2356,15 @@ mod tests { fn destroy_upgraded_shared_port_when_sender_still_active() { let (tx, rx) = channel(); let (tx2, rx2) = channel(); - let _t = thread::spawn(move|| { + let _t = thread::spawn(move || { rx.recv().unwrap(); // wait on a oneshot - drop(rx); // destroy a shared + drop(rx); // destroy a shared tx2.send(()).unwrap(); }); // make sure the other thread has gone to sleep - for _ in 0..5000 { thread::yield_now(); } + for _ in 0..5000 { + thread::yield_now(); + } // upgrade to a shared chan and send a message let t = tx.clone(); @@ -2468,7 +2432,7 @@ mod sync_tests { #[test] fn smoke_threads() { let (tx, rx) = sync_channel::<i32>(0); - let _t = thread::spawn(move|| { + let _t = thread::spawn(move || { tx.send(1).unwrap(); }); assert_eq!(rx.recv().unwrap(), 1); @@ -2493,7 
+2457,7 @@ mod sync_tests { #[test] fn port_gone_concurrent() { let (tx, rx) = sync_channel::<i32>(0); - let _t = thread::spawn(move|| { + let _t = thread::spawn(move || { rx.recv().unwrap(); }); while tx.send(1).is_ok() {} @@ -2503,7 +2467,7 @@ mod sync_tests { fn port_gone_concurrent_shared() { let (tx, rx) = sync_channel::<i32>(0); let tx2 = tx.clone(); - let _t = thread::spawn(move|| { + let _t = thread::spawn(move || { rx.recv().unwrap(); }); while tx.send(1).is_ok() && tx2.send(1).is_ok() {} @@ -2528,7 +2492,7 @@ mod sync_tests { #[test] fn chan_gone_concurrent() { let (tx, rx) = sync_channel::<i32>(0); - thread::spawn(move|| { + thread::spawn(move || { tx.send(1).unwrap(); tx.send(1).unwrap(); }); @@ -2538,8 +2502,10 @@ mod sync_tests { #[test] fn stress() { let (tx, rx) = sync_channel::<i32>(0); - thread::spawn(move|| { - for _ in 0..10000 { tx.send(1).unwrap(); } + thread::spawn(move || { + for _ in 0..10000 { + tx.send(1).unwrap(); + } }); for _ in 0..10000 { assert_eq!(rx.recv().unwrap(), 1); @@ -2551,8 +2517,10 @@ mod sync_tests { fn stress_recv_timeout_two_threads() { let (tx, rx) = sync_channel::<i32>(0); - thread::spawn(move|| { - for _ in 0..10000 { tx.send(1).unwrap(); } + thread::spawn(move || { + for _ in 0..10000 { + tx.send(1).unwrap(); + } }); let mut recv_count = 0; @@ -2561,7 +2529,7 @@ mod sync_tests { Ok(v) => { assert_eq!(v, 1); recv_count += 1; - }, + } Err(RecvTimeoutError::Timeout) => continue, Err(RecvTimeoutError::Disconnected) => break, } @@ -2578,14 +2546,14 @@ mod sync_tests { let (tx, rx) = sync_channel::<i32>(0); let (dtx, drx) = sync_channel::<()>(0); - thread::spawn(move|| { + thread::spawn(move || { let mut recv_count = 0; loop { match rx.recv_timeout(Duration::from_millis(10)) { Ok(v) => { assert_eq!(v, 1); recv_count += 1; - }, + } Err(RecvTimeoutError::Timeout) => continue, Err(RecvTimeoutError::Disconnected) => break, } @@ -2599,8 +2567,10 @@ mod sync_tests { for _ in 0..NTHREADS { let tx = tx.clone(); - 
thread::spawn(move|| { - for _ in 0..AMT { tx.send(1).unwrap(); } + thread::spawn(move || { + for _ in 0..AMT { + tx.send(1).unwrap(); + } }); } @@ -2616,7 +2586,7 @@ mod sync_tests { let (tx, rx) = sync_channel::<i32>(0); let (dtx, drx) = sync_channel::<()>(0); - thread::spawn(move|| { + thread::spawn(move || { for _ in 0..AMT * NTHREADS { assert_eq!(rx.recv().unwrap(), 1); } @@ -2629,8 +2599,10 @@ mod sync_tests { for _ in 0..NTHREADS { let tx = tx.clone(); - thread::spawn(move|| { - for _ in 0..AMT { tx.send(1).unwrap(); } + thread::spawn(move || { + for _ in 0..AMT { + tx.send(1).unwrap(); + } }); } drop(tx); @@ -2662,11 +2634,12 @@ mod sync_tests { #[test] fn oneshot_single_thread_recv_chan_close() { // Receiving on a closed chan will panic - let res = thread::spawn(move|| { + let res = thread::spawn(move || { let (tx, rx) = sync_channel::<i32>(0); drop(tx); rx.recv().unwrap(); - }).join(); + }) + .join(); // What is our res? assert!(res.is_err()); } @@ -2746,7 +2719,7 @@ mod sync_tests { #[test] fn oneshot_multi_task_recv_then_send() { let (tx, rx) = sync_channel::<Box<i32>>(0); - let _t = thread::spawn(move|| { + let _t = thread::spawn(move || { assert!(*rx.recv().unwrap() == 10); }); @@ -2756,12 +2729,13 @@ mod sync_tests { #[test] fn oneshot_multi_task_recv_then_close() { let (tx, rx) = sync_channel::<Box<i32>>(0); - let _t = thread::spawn(move|| { + let _t = thread::spawn(move || { drop(tx); }); - let res = thread::spawn(move|| { + let res = thread::spawn(move || { assert!(*rx.recv().unwrap() == 10); - }).join(); + }) + .join(); assert!(res.is_err()); } @@ -2769,7 +2743,7 @@ mod sync_tests { fn oneshot_multi_thread_close_stress() { for _ in 0..stress_factor() { let (tx, rx) = sync_channel::<i32>(0); - let _t = thread::spawn(move|| { + let _t = thread::spawn(move || { drop(rx); }); drop(tx); @@ -2780,12 +2754,13 @@ mod sync_tests { fn oneshot_multi_thread_send_close_stress() { for _ in 0..stress_factor() { let (tx, rx) = sync_channel::<i32>(0); - let _t = 
thread::spawn(move|| { + let _t = thread::spawn(move || { drop(rx); }); let _ = thread::spawn(move || { tx.send(1).unwrap(); - }).join(); + }) + .join(); } } @@ -2793,14 +2768,15 @@ mod sync_tests { fn oneshot_multi_thread_recv_close_stress() { for _ in 0..stress_factor() { let (tx, rx) = sync_channel::<i32>(0); - let _t = thread::spawn(move|| { - let res = thread::spawn(move|| { + let _t = thread::spawn(move || { + let res = thread::spawn(move || { rx.recv().unwrap(); - }).join(); + }) + .join(); assert!(res.is_err()); }); - let _t = thread::spawn(move|| { - thread::spawn(move|| { + let _t = thread::spawn(move || { + thread::spawn(move || { drop(tx); }); }); @@ -2811,7 +2787,7 @@ mod sync_tests { fn oneshot_multi_thread_send_recv_stress() { for _ in 0..stress_factor() { let (tx, rx) = sync_channel::<Box<i32>>(0); - let _t = thread::spawn(move|| { + let _t = thread::spawn(move || { tx.send(box 10).unwrap(); }); assert!(*rx.recv().unwrap() == 10); @@ -2827,18 +2803,22 @@ mod sync_tests { recv(rx, 0); fn send(tx: SyncSender<Box<i32>>, i: i32) { - if i == 10 { return } + if i == 10 { + return; + } - thread::spawn(move|| { + thread::spawn(move || { tx.send(box i).unwrap(); send(tx, i + 1); }); } fn recv(rx: Receiver<Box<i32>>, i: i32) { - if i == 10 { return } + if i == 10 { + return; + } - thread::spawn(move|| { + thread::spawn(move || { assert!(*rx.recv().unwrap() == i); recv(rx, i + 1); }); @@ -2850,8 +2830,12 @@ mod sync_tests { fn recv_a_lot() { // Regression test that we don't run out of stack in scheduler context let (tx, rx) = sync_channel(10000); - for _ in 0..10000 { tx.send(()).unwrap(); } - for _ in 0..10000 { rx.recv().unwrap(); } + for _ in 0..10000 { + tx.send(()).unwrap(); + } + for _ in 0..10000 { + rx.recv().unwrap(); + } } #[test] @@ -2860,7 +2844,7 @@ mod sync_tests { let total = stress_factor() + 100; for _ in 0..total { let tx = tx.clone(); - thread::spawn(move|| { + thread::spawn(move || { tx.send(()).unwrap(); }); } @@ -2875,7 +2859,7 @@ mod 
sync_tests { let (tx, rx) = sync_channel::<i32>(0); let (total_tx, total_rx) = sync_channel::<i32>(0); - let _t = thread::spawn(move|| { + let _t = thread::spawn(move || { let mut acc = 0; for x in rx.iter() { acc += x; @@ -2895,7 +2879,7 @@ mod sync_tests { let (tx, rx) = sync_channel::<i32>(0); let (count_tx, count_rx) = sync_channel(0); - let _t = thread::spawn(move|| { + let _t = thread::spawn(move || { let mut count = 0; for x in rx.iter() { if count >= 3 { @@ -2920,7 +2904,7 @@ mod sync_tests { let (tx1, rx1) = sync_channel::<i32>(1); let (tx2, rx2) = sync_channel::<()>(1); let (tx3, rx3) = sync_channel::<()>(1); - let _t = thread::spawn(move|| { + let _t = thread::spawn(move || { rx2.recv().unwrap(); tx1.send(1).unwrap(); tx3.send(()).unwrap(); @@ -2945,13 +2929,15 @@ mod sync_tests { fn destroy_upgraded_shared_port_when_sender_still_active() { let (tx, rx) = sync_channel::<()>(0); let (tx2, rx2) = sync_channel::<()>(0); - let _t = thread::spawn(move|| { + let _t = thread::spawn(move || { rx.recv().unwrap(); // wait on a oneshot - drop(rx); // destroy a shared + drop(rx); // destroy a shared tx2.send(()).unwrap(); }); // make sure the other thread has gone to sleep - for _ in 0..5000 { thread::yield_now(); } + for _ in 0..5000 { + thread::yield_now(); + } // upgrade to a shared chan and send a message let t = tx.clone(); @@ -2965,14 +2951,18 @@ mod sync_tests { #[test] fn send1() { let (tx, rx) = sync_channel::<i32>(0); - let _t = thread::spawn(move|| { rx.recv().unwrap(); }); + let _t = thread::spawn(move || { + rx.recv().unwrap(); + }); assert_eq!(tx.send(1), Ok(())); } #[test] fn send2() { let (tx, rx) = sync_channel::<i32>(0); - let _t = thread::spawn(move|| { drop(rx); }); + let _t = thread::spawn(move || { + drop(rx); + }); assert!(tx.send(1).is_err()); } @@ -2980,7 +2970,9 @@ mod sync_tests { fn send3() { let (tx, rx) = sync_channel::<i32>(1); assert_eq!(tx.send(1), Ok(())); - let _t =thread::spawn(move|| { drop(rx); }); + let _t = thread::spawn(move 
|| { + drop(rx); + }); assert!(tx.send(1).is_err()); } @@ -2990,11 +2982,11 @@ mod sync_tests { let tx2 = tx.clone(); let (done, donerx) = channel(); let done2 = done.clone(); - let _t = thread::spawn(move|| { + let _t = thread::spawn(move || { assert!(tx.send(1).is_err()); done.send(()).unwrap(); }); - let _t = thread::spawn(move|| { + let _t = thread::spawn(move || { assert!(tx2.send(2).is_err()); done2.send(()).unwrap(); }); @@ -3030,7 +3022,7 @@ mod sync_tests { let (tx1, rx1) = sync_channel::<()>(3); let (tx2, rx2) = sync_channel::<()>(3); - let _t = thread::spawn(move|| { + let _t = thread::spawn(move || { rx1.recv().unwrap(); tx2.try_send(()).unwrap(); }); diff --git a/src/libstd/sync/mpsc/mpsc_queue.rs b/src/libstd/sync/mpsc/mpsc_queue.rs index 8f5681b97f4..6e7a7be4430 100644 --- a/src/libstd/sync/mpsc/mpsc_queue.rs +++ b/src/libstd/sync/mpsc/mpsc_queue.rs @@ -13,8 +13,8 @@ pub use self::PopResult::*; -use core::ptr; use core::cell::UnsafeCell; +use core::ptr; use crate::boxed::Box; use crate::sync::atomic::{AtomicPtr, Ordering}; @@ -45,15 +45,12 @@ pub struct Queue<T> { tail: UnsafeCell<*mut Node<T>>, } -unsafe impl<T: Send> Send for Queue<T> { } -unsafe impl<T: Send> Sync for Queue<T> { } +unsafe impl<T: Send> Send for Queue<T> {} +unsafe impl<T: Send> Sync for Queue<T> {} impl<T> Node<T> { unsafe fn new(v: Option<T>) -> *mut Node<T> { - Box::into_raw(box Node { - next: AtomicPtr::new(ptr::null_mut()), - value: v, - }) + Box::into_raw(box Node { next: AtomicPtr::new(ptr::null_mut()), value: v }) } } @@ -62,10 +59,7 @@ impl<T> Queue<T> { /// one consumer. pub fn new() -> Queue<T> { let stub = unsafe { Node::new(None) }; - Queue { - head: AtomicPtr::new(stub), - tail: UnsafeCell::new(stub), - } + Queue { head: AtomicPtr::new(stub), tail: UnsafeCell::new(stub) } } /// Pushes a new value onto this queue. 
@@ -101,7 +95,7 @@ impl<T> Queue<T> { return Data(ret); } - if self.head.load(Ordering::Acquire) == tail {Empty} else {Inconsistent} + if self.head.load(Ordering::Acquire) == tail { Empty } else { Inconsistent } } } } @@ -121,7 +115,7 @@ impl<T> Drop for Queue<T> { #[cfg(all(test, not(target_os = "emscripten")))] mod tests { - use super::{Queue, Data, Empty, Inconsistent}; + use super::{Data, Empty, Inconsistent, Queue}; use crate::sync::mpsc::channel; use crate::sync::Arc; use crate::thread; @@ -140,7 +134,7 @@ mod tests { let q = Queue::new(); match q.pop() { Empty => {} - Inconsistent | Data(..) => panic!() + Inconsistent | Data(..) => panic!(), } let (tx, rx) = channel(); let q = Arc::new(q); @@ -148,7 +142,7 @@ mod tests { for _ in 0..nthreads { let tx = tx.clone(); let q = q.clone(); - thread::spawn(move|| { + thread::spawn(move || { for i in 0..nmsgs { q.push(i); } @@ -159,8 +153,8 @@ mod tests { let mut i = 0; while i < nthreads * nmsgs { match q.pop() { - Empty | Inconsistent => {}, - Data(_) => { i += 1 } + Empty | Inconsistent => {} + Data(_) => i += 1, } } drop(tx); diff --git a/src/libstd/sync/mpsc/oneshot.rs b/src/libstd/sync/mpsc/oneshot.rs index e7a5cc46b31..bbe77e7d0fb 100644 --- a/src/libstd/sync/mpsc/oneshot.rs +++ b/src/libstd/sync/mpsc/oneshot.rs @@ -21,22 +21,21 @@ /// consuming the port). This upgrade is then also stored in the shared packet. /// The one caveat to consider is that when a port sees a disconnected channel /// it must check for data because there is no "data plus upgrade" state. - pub use self::Failure::*; -pub use self::UpgradeResult::*; use self::MyUpgrade::*; +pub use self::UpgradeResult::*; -use crate::sync::mpsc::Receiver; -use crate::sync::mpsc::blocking::{self, SignalToken}; use crate::cell::UnsafeCell; use crate::ptr; use crate::sync::atomic::{AtomicUsize, Ordering}; +use crate::sync::mpsc::blocking::{self, SignalToken}; +use crate::sync::mpsc::Receiver; use crate::time::Instant; // Various states you can find a port in. 
-const EMPTY: usize = 0; // initial state: no data, no blocked receiver -const DATA: usize = 1; // data ready for receiver to take -const DISCONNECTED: usize = 2; // channel is disconnected OR upgraded +const EMPTY: usize = 0; // initial state: no data, no blocked receiver +const DATA: usize = 1; // data ready for receiver to take +const DISCONNECTED: usize = 2; // channel is disconnected OR upgraded // Any other value represents a pointer to a SignalToken value. The // protocol ensures that when the state moves *to* a pointer, // ownership of the token is given to the packet, and when the state @@ -178,21 +177,17 @@ impl<T> Packet<T> { // and an upgrade flags the channel as disconnected, so when we see // this we first need to check if there's data available and *then* // we go through and process the upgrade. - DISCONNECTED => { - match (&mut *self.data.get()).take() { - Some(data) => Ok(data), - None => { - match ptr::replace(self.upgrade.get(), SendUsed) { - SendUsed | NothingSent => Err(Disconnected), - GoUp(upgrade) => Err(Upgraded(upgrade)) - } - } - } - } + DISCONNECTED => match (&mut *self.data.get()).take() { + Some(data) => Ok(data), + None => match ptr::replace(self.upgrade.get(), SendUsed) { + SendUsed | NothingSent => Err(Disconnected), + GoUp(upgrade) => Err(Upgraded(upgrade)), + }, + }, // We are the sole receiver; there cannot be a blocking // receiver already. - _ => unreachable!() + _ => unreachable!(), } } } @@ -217,10 +212,13 @@ impl<T> Packet<T> { // If the other end is already disconnected, then we failed the // upgrade. Be sure to trash the port we were given. 
- DISCONNECTED => { ptr::replace(self.upgrade.get(), prev); UpDisconnected } + DISCONNECTED => { + ptr::replace(self.upgrade.get(), prev); + UpDisconnected + } // If someone's waiting, we gotta wake them up - ptr => UpWoke(SignalToken::cast_from_usize(ptr)) + ptr => UpWoke(SignalToken::cast_from_usize(ptr)), } } } @@ -232,7 +230,7 @@ impl<T> Packet<T> { // If someone's waiting, we gotta wake them up ptr => unsafe { SignalToken::cast_from_usize(ptr).signal(); - } + }, } } @@ -246,10 +244,12 @@ impl<T> Packet<T> { // There's data on the channel, so make sure we destroy it promptly. // This is why not using an arc is a little difficult (need the box // to stay valid while we take the data). - DATA => unsafe { (&mut *self.data.get()).take().unwrap(); }, + DATA => unsafe { + (&mut *self.data.get()).take().unwrap(); + }, // We're the only ones that can block on this port - _ => unreachable!() + _ => unreachable!(), } } @@ -265,13 +265,11 @@ impl<T> Packet<T> { let state = match self.state.load(Ordering::SeqCst) { // Each of these states means that no further activity will happen // with regard to abortion selection - s @ EMPTY | - s @ DATA | - s @ DISCONNECTED => s, + s @ EMPTY | s @ DATA | s @ DISCONNECTED => s, // If we've got a blocked thread, then use an atomic to gain ownership // of it (may fail) - ptr => self.state.compare_and_swap(ptr, EMPTY, Ordering::SeqCst) + ptr => self.state.compare_and_swap(ptr, EMPTY, Ordering::SeqCst), }; // Now that we've got ownership of our state, figure out what to do @@ -302,7 +300,7 @@ impl<T> Packet<T> { ptr => unsafe { drop(SignalToken::cast_from_usize(ptr)); Ok(false) - } + }, } } } diff --git a/src/libstd/sync/mpsc/shared.rs b/src/libstd/sync/mpsc/shared.rs index dbcdcdac932..2b0393573fd 100644 --- a/src/libstd/sync/mpsc/shared.rs +++ b/src/libstd/sync/mpsc/shared.rs @@ -7,7 +7,6 @@ /// High level implementation details can be found in the comment of the parent /// module. 
You'll also note that the implementation of the shared and stream /// channels are quite similar, and this is no coincidence! - pub use self::Failure::*; use self::StartResult::*; @@ -17,7 +16,7 @@ use core::isize; use crate::cell::UnsafeCell; use crate::ptr; -use crate::sync::atomic::{AtomicUsize, AtomicIsize, AtomicBool, Ordering}; +use crate::sync::atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering}; use crate::sync::mpsc::blocking::{self, SignalToken}; use crate::sync::mpsc::mpsc_queue as mpsc; use crate::sync::{Mutex, MutexGuard}; @@ -34,9 +33,9 @@ const MAX_STEALS: isize = 1 << 20; pub struct Packet<T> { queue: mpsc::Queue<T>, - cnt: AtomicIsize, // How many items are on this channel + cnt: AtomicIsize, // How many items are on this channel steals: UnsafeCell<isize>, // How many times has a port received without blocking? - to_wake: AtomicUsize, // SignalToken for wake up + to_wake: AtomicUsize, // SignalToken for wake up // The number of channels which are currently using this packet. channels: AtomicUsize, @@ -92,9 +91,7 @@ impl<T> Packet<T> { // threads in select(). // // This can only be called at channel-creation time - pub fn inherit_blocker(&self, - token: Option<SignalToken>, - guard: MutexGuard<'_, ()>) { + pub fn inherit_blocker(&self, token: Option<SignalToken>, guard: MutexGuard<'_, ()>) { token.map(|token| { assert_eq!(self.cnt.load(Ordering::SeqCst), 0); assert_eq!(self.to_wake.load(Ordering::SeqCst), 0); @@ -119,7 +116,9 @@ impl<T> Packet<T> { // To offset this bad increment, we initially set the steal count to // -1. You'll find some special code in abort_selection() as well to // ensure that this -1 steal count doesn't escape too far. - unsafe { *self.steals.get() = -1; } + unsafe { + *self.steals.get() = -1; + } }); // When the shared packet is constructed, we grabbed this lock. 
The @@ -132,7 +131,9 @@ impl<T> Packet<T> { pub fn send(&self, t: T) -> Result<(), T> { // See Port::drop for what's going on - if self.port_dropped.load(Ordering::SeqCst) { return Err(t) } + if self.port_dropped.load(Ordering::SeqCst) { + return Err(t); + } // Note that the multiple sender case is a little trickier // semantically than the single sender case. The logic for @@ -160,7 +161,7 @@ impl<T> Packet<T> { // received". Once we get beyond this check, we have permanently // entered the realm of "this may be received" if self.cnt.load(Ordering::SeqCst) < DISCONNECTED + FUDGE { - return Err(t) + return Err(t); } self.queue.push(t); @@ -197,7 +198,7 @@ impl<T> Packet<T> { // maybe we're done, if we're not the last ones // here, then we need to go try again. if self.sender_drain.fetch_sub(1, Ordering::SeqCst) == 1 { - break + break; } } @@ -236,7 +237,10 @@ impl<T> Packet<T> { } match self.try_recv() { - data @ Ok(..) => unsafe { *self.steals.get() -= 1; data }, + data @ Ok(..) => unsafe { + *self.steals.get() -= 1; + data + }, data => data, } } @@ -252,12 +256,16 @@ impl<T> Packet<T> { let steals = ptr::replace(self.steals.get(), 0); match self.cnt.fetch_sub(1 + steals, Ordering::SeqCst) { - DISCONNECTED => { self.cnt.store(DISCONNECTED, Ordering::SeqCst); } + DISCONNECTED => { + self.cnt.store(DISCONNECTED, Ordering::SeqCst); + } // If we factor in our steals and notice that the channel has no // data, we successfully sleep n => { assert!(n >= 0); - if n - steals <= 0 { return Installed } + if n - steals <= 0 { + return Installed; + } } } @@ -290,7 +298,10 @@ impl<T> Packet<T> { loop { thread::yield_now(); match self.queue.pop() { - mpsc::Data(t) => { data = t; break } + mpsc::Data(t) => { + data = t; + break; + } mpsc::Empty => panic!("inconsistent => empty"), mpsc::Inconsistent => {} } @@ -361,9 +372,13 @@ impl<T> Packet<T> { } match self.cnt.swap(DISCONNECTED, Ordering::SeqCst) { - -1 => { self.take_to_wake().signal(); } + -1 => { + 
self.take_to_wake().signal(); + } DISCONNECTED => {} - n => { assert!(n >= 0); } + n => { + assert!(n >= 0); + } } } @@ -380,7 +395,9 @@ impl<T> Packet<T> { // control of this thread. loop { match self.queue.pop() { - mpsc::Data(..) => { steals += 1; } + mpsc::Data(..) => { + steals += 1; + } mpsc::Empty | mpsc::Inconsistent => break, } } @@ -406,7 +423,7 @@ impl<T> Packet<T> { self.cnt.store(DISCONNECTED, Ordering::SeqCst); DISCONNECTED } - n => n + n => n, } } @@ -432,7 +449,7 @@ impl<T> Packet<T> { // positive. let steals = { let cnt = self.cnt.load(Ordering::SeqCst); - if cnt < 0 && cnt != DISCONNECTED {-cnt} else {0} + if cnt < 0 && cnt != DISCONNECTED { -cnt } else { 0 } }; let prev = self.bump(steals + 1); diff --git a/src/libstd/sync/mpsc/spsc_queue.rs b/src/libstd/sync/mpsc/spsc_queue.rs index 0edb1c24e80..c51aa7619db 100644 --- a/src/libstd/sync/mpsc/spsc_queue.rs +++ b/src/libstd/sync/mpsc/spsc_queue.rs @@ -6,8 +6,8 @@ // http://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue -use core::ptr; use core::cell::UnsafeCell; +use core::ptr; use crate::boxed::Box; use crate::sync::atomic::{AtomicPtr, AtomicUsize, Ordering}; @@ -19,16 +19,16 @@ struct Node<T> { // FIXME: this could be an uninitialized T if we're careful enough, and // that would reduce memory usage (and be a bit faster). // is it worth it? - value: Option<T>, // nullable for re-use of nodes - cached: bool, // This node goes into the node cache - next: AtomicPtr<Node<T>>, // next node in the queue + value: Option<T>, // nullable for re-use of nodes + cached: bool, // This node goes into the node cache + next: AtomicPtr<Node<T>>, // next node in the queue } /// The single-producer single-consumer queue. This structure is not cloneable, /// but it can be safely shared in an Arc if it is guaranteed that there /// is only one popper and one pusher touching the queue at any one point in /// time. 
-pub struct Queue<T, ProducerAddition=(), ConsumerAddition=()> { +pub struct Queue<T, ProducerAddition = (), ConsumerAddition = ()> { // consumer fields consumer: CacheAligned<Consumer<T, ConsumerAddition>>, @@ -38,9 +38,9 @@ pub struct Queue<T, ProducerAddition=(), ConsumerAddition=()> { struct Consumer<T, Addition> { tail: UnsafeCell<*mut Node<T>>, // where to pop from - tail_prev: AtomicPtr<Node<T>>, // where to pop from - cache_bound: usize, // maximum cache size - cached_nodes: AtomicUsize, // number of nodes marked as cachable + tail_prev: AtomicPtr<Node<T>>, // where to pop from + cache_bound: usize, // maximum cache size + cached_nodes: AtomicUsize, // number of nodes marked as cachable addition: Addition, } @@ -51,9 +51,9 @@ struct Producer<T, Addition> { addition: Addition, } -unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Send for Queue<T, P, C> { } +unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Send for Queue<T, P, C> {} -unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Sync for Queue<T, P, C> { } +unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Sync for Queue<T, P, C> {} impl<T> Node<T> { fn new() -> *mut Node<T> { @@ -66,7 +66,6 @@ impl<T> Node<T> { } impl<T, ProducerAddition, ConsumerAddition> Queue<T, ProducerAddition, ConsumerAddition> { - /// Creates a new queue. With given additional elements in the producer and /// consumer portions of the queue. 
/// @@ -107,13 +106,13 @@ impl<T, ProducerAddition, ConsumerAddition> Queue<T, ProducerAddition, ConsumerA tail_prev: AtomicPtr::new(n1), cache_bound: bound, cached_nodes: AtomicUsize::new(0), - addition: consumer_addition + addition: consumer_addition, }), producer: CacheAligned::new(Producer { head: UnsafeCell::new(n2), first: UnsafeCell::new(n1), tail_copy: UnsafeCell::new(n1), - addition: producer_addition + addition: producer_addition, }), } } @@ -142,8 +141,7 @@ impl<T, ProducerAddition, ConsumerAddition> Queue<T, ProducerAddition, ConsumerA } // If the above fails, then update our copy of the tail and try // again. - *self.producer.0.tail_copy.get() = - self.consumer.tail_prev.load(Ordering::Acquire); + *self.producer.0.tail_copy.get() = self.consumer.tail_prev.load(Ordering::Acquire); if *self.producer.first.get() != *self.producer.tail_copy.get() { let ret = *self.producer.first.get(); *self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed); @@ -164,7 +162,9 @@ impl<T, ProducerAddition, ConsumerAddition> Queue<T, ProducerAddition, ConsumerA // the current tail node is a candidate for going into the cache. let tail = *self.consumer.tail.get(); let next = (*tail).next.load(Ordering::Acquire); - if next.is_null() { return None } + if next.is_null() { + return None; + } assert!((*next).value.is_some()); let ret = (*next).value.take(); @@ -182,7 +182,8 @@ impl<T, ProducerAddition, ConsumerAddition> Queue<T, ProducerAddition, ConsumerA self.consumer.tail_prev.store(tail, Ordering::Release); } else { (*self.consumer.tail_prev.load(Ordering::Relaxed)) - .next.store(next, Ordering::Relaxed); + .next + .store(next, Ordering::Relaxed); // We have successfully erased all references to 'tail', so // now we can safely drop it. 
let _: Box<Node<T>> = Box::from_raw(tail); @@ -234,9 +235,9 @@ impl<T, ProducerAddition, ConsumerAddition> Drop for Queue<T, ProducerAddition, #[cfg(all(test, not(target_os = "emscripten")))] mod tests { use super::Queue; + use crate::sync::mpsc::channel; use crate::sync::Arc; use crate::thread; - use crate::sync::mpsc::channel; #[test] fn smoke() { @@ -265,15 +266,15 @@ mod tests { match queue.peek() { Some(vec) => { assert_eq!(&*vec, &[1]); - }, - None => unreachable!() + } + None => unreachable!(), } match queue.pop() { Some(vec) => { assert_eq!(&*vec, &[1]); - }, - None => unreachable!() + } + None => unreachable!(), } } } @@ -316,7 +317,7 @@ mod tests { let (tx, rx) = channel(); let q2 = q.clone(); - let _t = thread::spawn(move|| { + let _t = thread::spawn(move || { for _ in 0..100000 { loop { match q2.pop() { diff --git a/src/libstd/sync/mpsc/stream.rs b/src/libstd/sync/mpsc/stream.rs index 40877282761..2e3270e81fc 100644 --- a/src/libstd/sync/mpsc/stream.rs +++ b/src/libstd/sync/mpsc/stream.rs @@ -6,10 +6,9 @@ /// /// High level implementation details can be found in the comment of the parent /// module. 
- pub use self::Failure::*; -pub use self::UpgradeResult::*; use self::Message::*; +pub use self::UpgradeResult::*; use core::cmp; use core::isize; @@ -19,10 +18,10 @@ use crate::ptr; use crate::thread; use crate::time::Instant; -use crate::sync::atomic::{AtomicIsize, AtomicUsize, Ordering, AtomicBool}; -use crate::sync::mpsc::Receiver; +use crate::sync::atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering}; use crate::sync::mpsc::blocking::{self, SignalToken}; use crate::sync::mpsc::spsc_queue as spsc; +use crate::sync::mpsc::Receiver; const DISCONNECTED: isize = isize::MIN; #[cfg(test)] @@ -36,17 +35,16 @@ pub struct Packet<T> { } struct ProducerAddition { - cnt: AtomicIsize, // How many items are on this channel + cnt: AtomicIsize, // How many items are on this channel to_wake: AtomicUsize, // SignalToken for the blocked thread to wake up port_dropped: AtomicBool, // flag if the channel has been destroyed. } struct ConsumerAddition { - steals: UnsafeCell<isize>, // How many times has a port received without blocking? + steals: UnsafeCell<isize>, // How many times has a port received without blocking? } - pub enum Failure<T> { Empty, Disconnected, @@ -69,18 +67,18 @@ enum Message<T> { impl<T> Packet<T> { pub fn new() -> Packet<T> { Packet { - queue: unsafe { spsc::Queue::with_additions( - 128, - ProducerAddition { - cnt: AtomicIsize::new(0), - to_wake: AtomicUsize::new(0), - - port_dropped: AtomicBool::new(false), - }, - ConsumerAddition { - steals: UnsafeCell::new(0), - } - )}, + queue: unsafe { + spsc::Queue::with_additions( + 128, + ProducerAddition { + cnt: AtomicIsize::new(0), + to_wake: AtomicUsize::new(0), + + port_dropped: AtomicBool::new(false), + }, + ConsumerAddition { steals: UnsafeCell::new(0) }, + ) + }, } } @@ -88,11 +86,15 @@ impl<T> Packet<T> { // If the other port has deterministically gone away, then definitely // must return the data back up the stack. Otherwise, the data is // considered as being sent. 
- if self.queue.producer_addition().port_dropped.load(Ordering::SeqCst) { return Err(t) } + if self.queue.producer_addition().port_dropped.load(Ordering::SeqCst) { + return Err(t); + } match self.do_send(Data(t)) { - UpSuccess | UpDisconnected => {}, - UpWoke(token) => { token.signal(); } + UpSuccess | UpDisconnected => {} + UpWoke(token) => { + token.signal(); + } } Ok(()) } @@ -101,7 +103,7 @@ impl<T> Packet<T> { // If the port has gone away, then there's no need to proceed any // further. if self.queue.producer_addition().port_dropped.load(Ordering::SeqCst) { - return UpDisconnected + return UpDisconnected; } self.do_send(GoUp(up)) @@ -136,7 +138,10 @@ impl<T> Packet<T> { // Otherwise we just sent some data on a non-waiting queue, so just // make sure the world is sane and carry on! - n => { assert!(n >= 0); UpSuccess } + n => { + assert!(n >= 0); + UpSuccess + } } } @@ -166,7 +171,9 @@ impl<T> Packet<T> { // data, we successfully sleep n => { assert!(n >= 0); - if n - steals <= 0 { return Ok(()) } + if n - steals <= 0 { + return Ok(()); + } } } @@ -199,8 +206,7 @@ impl<T> Packet<T> { // Messages which actually popped from the queue shouldn't count as // a steal, so offset the decrement here (we already have our // "steal" factored into the channel count above). - data @ Ok(..) | - data @ Err(Upgraded(..)) => unsafe { + data @ Ok(..) 
| data @ Err(Upgraded(..)) => unsafe { *self.queue.consumer_addition().steals.get() -= 1; data }, @@ -226,8 +232,10 @@ impl<T> Packet<T> { if *self.queue.consumer_addition().steals.get() > MAX_STEALS { match self.queue.producer_addition().cnt.swap(0, Ordering::SeqCst) { DISCONNECTED => { - self.queue.producer_addition().cnt.store( - DISCONNECTED, Ordering::SeqCst); + self.queue + .producer_addition() + .cnt + .store(DISCONNECTED, Ordering::SeqCst); } n => { let m = cmp::min(n, *self.queue.consumer_addition().steals.get()); @@ -259,13 +267,11 @@ impl<T> Packet<T> { // We can ignore steals because the other end is // disconnected and we'll never need to really factor in our // steals again. - _ => { - match self.queue.pop() { - Some(Data(t)) => Ok(t), - Some(GoUp(up)) => Err(Upgraded(up)), - None => Err(Disconnected), - } - } + _ => match self.queue.pop() { + Some(Data(t)) => Ok(t), + Some(GoUp(up)) => Err(Upgraded(up)), + None => Err(Disconnected), + }, } } } @@ -275,9 +281,13 @@ impl<T> Packet<T> { // Dropping a channel is pretty simple, we just flag it as disconnected // and then wakeup a blocker if there is one. 
match self.queue.producer_addition().cnt.swap(DISCONNECTED, Ordering::SeqCst) { - -1 => { self.take_to_wake().signal(); } + -1 => { + self.take_to_wake().signal(); + } DISCONNECTED => {} - n => { assert!(n >= 0); } + n => { + assert!(n >= 0); + } } } @@ -314,10 +324,15 @@ impl<T> Packet<T> { let mut steals = unsafe { *self.queue.consumer_addition().steals.get() }; while { let cnt = self.queue.producer_addition().cnt.compare_and_swap( - steals, DISCONNECTED, Ordering::SeqCst); + steals, + DISCONNECTED, + Ordering::SeqCst, + ); cnt != DISCONNECTED && cnt != steals } { - while let Some(_) = self.queue.pop() { steals += 1; } + while let Some(_) = self.queue.pop() { + steals += 1; + } } // At this point in time, we have gated all future senders from sending, @@ -338,13 +353,12 @@ impl<T> Packet<T> { self.queue.producer_addition().cnt.store(DISCONNECTED, Ordering::SeqCst); DISCONNECTED } - n => n + n => n, } } // Removes a previous thread from being blocked in this port - pub fn abort_selection(&self, - was_upgrade: bool) -> Result<bool, Receiver<T>> { + pub fn abort_selection(&self, was_upgrade: bool) -> Result<bool, Receiver<T>> { // If we're aborting selection after upgrading from a oneshot, then // we're guarantee that no one is waiting. The only way that we could // have seen the upgrade is if data was actually sent on the channel @@ -361,7 +375,7 @@ impl<T> Packet<T> { if was_upgrade { assert_eq!(unsafe { *self.queue.consumer_addition().steals.get() }, 0); assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0); - return Ok(true) + return Ok(true); } // We want to make sure that the count on the channel goes non-negative, @@ -416,12 +430,10 @@ impl<T> Packet<T> { // upgraded port. 
if has_data { match self.queue.peek() { - Some(&mut GoUp(..)) => { - match self.queue.pop() { - Some(GoUp(port)) => Err(port), - _ => unreachable!(), - } - } + Some(&mut GoUp(..)) => match self.queue.pop() { + Some(GoUp(port)) => Err(port), + _ => unreachable!(), + }, _ => Ok(true), } } else { diff --git a/src/libstd/sync/mpsc/sync.rs b/src/libstd/sync/mpsc/sync.rs index 58a4b716afb..79e86817154 100644 --- a/src/libstd/sync/mpsc/sync.rs +++ b/src/libstd/sync/mpsc/sync.rs @@ -1,3 +1,4 @@ +use self::Blocker::*; /// Synchronous channels/ports /// /// This channel implementation differs significantly from the asynchronous @@ -22,17 +23,15 @@ /// implementation shares almost all code for the buffered and unbuffered cases /// of a synchronous channel. There are a few branches for the unbuffered case, /// but they're mostly just relevant to blocking senders. - pub use self::Failure::*; -use self::Blocker::*; use core::intrinsics::abort; use core::isize; use core::mem; use core::ptr; -use crate::sync::atomic::{Ordering, AtomicUsize}; -use crate::sync::mpsc::blocking::{self, WaitToken, SignalToken}; +use crate::sync::atomic::{AtomicUsize, Ordering}; +use crate::sync::mpsc::blocking::{self, SignalToken, WaitToken}; use crate::sync::{Mutex, MutexGuard}; use crate::time::Instant; @@ -46,9 +45,9 @@ pub struct Packet<T> { lock: Mutex<State<T>>, } -unsafe impl<T: Send> Send for Packet<T> { } +unsafe impl<T: Send> Send for Packet<T> {} -unsafe impl<T: Send> Sync for Packet<T> { } +unsafe impl<T: Send> Sync for Packet<T> {} struct State<T> { disconnected: bool, // Is the channel disconnected yet? @@ -72,7 +71,7 @@ unsafe impl<T: Send> Send for State<T> {} enum Blocker { BlockedSender(SignalToken), BlockedReceiver(SignalToken), - NoneBlocked + NoneBlocked, } /// Simple queue for threading threads together. Nodes are stack-allocated, so @@ -104,35 +103,35 @@ pub enum Failure { /// Atomically blocks the current thread, placing it into `slot`, unlocking `lock` /// in the meantime. 
This re-locks the mutex upon returning. -fn wait<'a, 'b, T>(lock: &'a Mutex<State<T>>, - mut guard: MutexGuard<'b, State<T>>, - f: fn(SignalToken) -> Blocker) - -> MutexGuard<'a, State<T>> -{ +fn wait<'a, 'b, T>( + lock: &'a Mutex<State<T>>, + mut guard: MutexGuard<'b, State<T>>, + f: fn(SignalToken) -> Blocker, +) -> MutexGuard<'a, State<T>> { let (wait_token, signal_token) = blocking::tokens(); match mem::replace(&mut guard.blocker, f(signal_token)) { NoneBlocked => {} _ => unreachable!(), } - drop(guard); // unlock - wait_token.wait(); // block + drop(guard); // unlock + wait_token.wait(); // block lock.lock().unwrap() // relock } /// Same as wait, but waiting at most until `deadline`. -fn wait_timeout_receiver<'a, 'b, T>(lock: &'a Mutex<State<T>>, - deadline: Instant, - mut guard: MutexGuard<'b, State<T>>, - success: &mut bool) - -> MutexGuard<'a, State<T>> -{ +fn wait_timeout_receiver<'a, 'b, T>( + lock: &'a Mutex<State<T>>, + deadline: Instant, + mut guard: MutexGuard<'b, State<T>>, + success: &mut bool, +) -> MutexGuard<'a, State<T>> { let (wait_token, signal_token) = blocking::tokens(); match mem::replace(&mut guard.blocker, BlockedReceiver(signal_token)) { NoneBlocked => {} _ => unreachable!(), } - drop(guard); // unlock - *success = wait_token.wait_max_until(deadline); // block + drop(guard); // unlock + *success = wait_token.wait_max_until(deadline); // block let mut new_guard = lock.lock().unwrap(); // relock if !*success { abort_selection(&mut new_guard); @@ -147,7 +146,10 @@ fn abort_selection<T>(guard: &mut MutexGuard<'_, State<T>>) -> bool { guard.blocker = BlockedSender(token); true } - BlockedReceiver(token) => { drop(token); false } + BlockedReceiver(token) => { + drop(token); + false + } } } @@ -168,12 +170,9 @@ impl<T> Packet<T> { blocker: NoneBlocked, cap: capacity, canceled: None, - queue: Queue { - head: ptr::null_mut(), - tail: ptr::null_mut(), - }, + queue: Queue { head: ptr::null_mut(), tail: ptr::null_mut() }, buf: Buffer { - buf: 
(0..capacity + if capacity == 0 {1} else {0}).map(|_| None).collect(), + buf: (0..capacity + if capacity == 0 { 1 } else { 0 }).map(|_| None).collect(), start: 0, size: 0, }, @@ -200,7 +199,9 @@ impl<T> Packet<T> { pub fn send(&self, t: T) -> Result<(), T> { let mut guard = self.acquire_send_slot(); - if guard.disconnected { return Err(t) } + if guard.disconnected { + return Err(t); + } guard.buf.enqueue(t); match mem::replace(&mut guard.blocker, NoneBlocked) { @@ -213,14 +214,17 @@ impl<T> Packet<T> { assert!(guard.canceled.is_none()); guard.canceled = Some(unsafe { mem::transmute(&mut canceled) }); let mut guard = wait(&self.lock, guard, BlockedSender); - if canceled {Err(guard.buf.dequeue())} else {Ok(())} + if canceled { Err(guard.buf.dequeue()) } else { Ok(()) } } // success, we buffered some data NoneBlocked => Ok(()), // success, someone's about to receive our buffered data. - BlockedReceiver(token) => { wakeup(token, guard); Ok(()) } + BlockedReceiver(token) => { + wakeup(token, guard); + Ok(()) + } BlockedSender(..) => panic!("lolwut"), } @@ -271,10 +275,8 @@ impl<T> Packet<T> { // while loop because we're the only receiver. 
if !guard.disconnected && guard.buf.size() == 0 { if let Some(deadline) = deadline { - guard = wait_timeout_receiver(&self.lock, - deadline, - guard, - &mut woke_up_after_waiting); + guard = + wait_timeout_receiver(&self.lock, deadline, guard, &mut woke_up_after_waiting); } else { guard = wait(&self.lock, guard, BlockedReceiver); woke_up_after_waiting = true; @@ -290,7 +292,9 @@ impl<T> Packet<T> { // Pick up the data, wake up our neighbors, and carry on assert!(guard.buf.size() > 0 || (deadline.is_some() && !woke_up_after_waiting)); - if guard.buf.size() == 0 { return Err(Empty); } + if guard.buf.size() == 0 { + return Err(Empty); + } let ret = guard.buf.dequeue(); self.wakeup_senders(woke_up_after_waiting, guard); @@ -301,8 +305,12 @@ impl<T> Packet<T> { let mut guard = self.lock.lock().unwrap(); // Easy cases first - if guard.disconnected && guard.buf.size() == 0 { return Err(Disconnected) } - if guard.buf.size() == 0 { return Err(Empty) } + if guard.disconnected && guard.buf.size() == 0 { + return Err(Disconnected); + } + if guard.buf.size() == 0 { + return Err(Empty); + } // Be sure to wake up neighbors let ret = Ok(guard.buf.dequeue()); @@ -357,12 +365,14 @@ impl<T> Packet<T> { // Only flag the channel as disconnected if we're the last channel match self.channels.fetch_sub(1, Ordering::SeqCst) { 1 => {} - _ => return + _ => return, } // Not much to do other than wake up a receiver if one's there let mut guard = self.lock.lock().unwrap(); - if guard.disconnected { return } + if guard.disconnected { + return; + } guard.disconnected = true; match mem::replace(&mut guard.blocker, NoneBlocked) { NoneBlocked => {} @@ -374,7 +384,9 @@ impl<T> Packet<T> { pub fn drop_port(&self) { let mut guard = self.lock.lock().unwrap(); - if guard.disconnected { return } + if guard.disconnected { + return; + } guard.disconnected = true; // If the capacity is 0, then the sender may want its data back after @@ -382,15 +394,9 @@ impl<T> Packet<T> { // the buffered data. 
As with many other portions of this code, this // needs to be careful to destroy the data *outside* of the lock to // prevent deadlock. - let _data = if guard.cap != 0 { - mem::take(&mut guard.buf.buf) - } else { - Vec::new() - }; - let mut queue = mem::replace(&mut guard.queue, Queue { - head: ptr::null_mut(), - tail: ptr::null_mut(), - }); + let _data = if guard.cap != 0 { mem::take(&mut guard.buf.buf) } else { Vec::new() }; + let mut queue = + mem::replace(&mut guard.queue, Queue { head: ptr::null_mut(), tail: ptr::null_mut() }); let waiter = match mem::replace(&mut guard.blocker, NoneBlocked) { NoneBlocked => None, @@ -402,7 +408,9 @@ impl<T> Packet<T> { }; mem::drop(guard); - while let Some(token) = queue.dequeue() { token.signal(); } + while let Some(token) = queue.dequeue() { + token.signal(); + } waiter.map(|t| t.signal()); } } @@ -416,7 +424,6 @@ impl<T> Drop for Packet<T> { } } - //////////////////////////////////////////////////////////////////////////////// // Buffer, a simple ring buffer backed by Vec<T> //////////////////////////////////////////////////////////////////////////////// @@ -437,8 +444,12 @@ impl<T> Buffer<T> { result.take().unwrap() } - fn size(&self) -> usize { self.size } - fn capacity(&self) -> usize { self.buf.len() } + fn size(&self) -> usize { + self.size + } + fn capacity(&self) -> usize { + self.buf.len() + } } //////////////////////////////////////////////////////////////////////////////// @@ -466,7 +477,7 @@ impl Queue { fn dequeue(&mut self) -> Option<SignalToken> { if self.head.is_null() { - return None + return None; } let node = self.head; self.head = unsafe { (*node).next }; |
