diff options
| author | Celina G. Val <celinval@amazon.com> | 2025-06-11 11:12:32 -0700 |
|---|---|---|
| committer | Celina G. Val <celinval@amazon.com> | 2025-06-11 11:12:32 -0700 |
| commit | 0b9b1df0064396708a5e5ca27fd010ae3ad3a305 (patch) | |
| tree | c854a9e2fd775dba7f0a5e70a2c7be121b5fec0b | |
| parent | 35c5144394c1b93784867d330f694fa7c8f480e3 (diff) | |
| download | rust-0b9b1df0064396708a5e5ca27fd010ae3ad3a305.tar.gz rust-0b9b1df0064396708a5e5ca27fd010ae3ad3a305.zip | |
Fix format and tidy for code moved from rayon
25 files changed, 190 insertions, 328 deletions
diff --git a/compiler/rustc_thread_pool/src/broadcast/mod.rs b/compiler/rustc_thread_pool/src/broadcast/mod.rs index 442891f2d28..c2b0d47f829 100644 --- a/compiler/rustc_thread_pool/src/broadcast/mod.rs +++ b/compiler/rustc_thread_pool/src/broadcast/mod.rs @@ -1,10 +1,11 @@ -use crate::job::{ArcJob, StackJob}; -use crate::latch::{CountLatch, LatchRef}; -use crate::registry::{Registry, WorkerThread}; use std::fmt; use std::marker::PhantomData; use std::sync::Arc; +use crate::job::{ArcJob, StackJob}; +use crate::latch::{CountLatch, LatchRef}; +use crate::registry::{Registry, WorkerThread}; + mod test; /// Executes `op` within every thread in the current threadpool. If this is @@ -53,10 +54,7 @@ impl<'a> BroadcastContext<'a> { pub(super) fn with<R>(f: impl FnOnce(BroadcastContext<'_>) -> R) -> R { let worker_thread = WorkerThread::current(); assert!(!worker_thread.is_null()); - f(BroadcastContext { - worker: unsafe { &*worker_thread }, - _marker: PhantomData, - }) + f(BroadcastContext { worker: unsafe { &*worker_thread }, _marker: PhantomData }) } /// Our index amongst the broadcast threads (ranges from `0..self.num_threads()`). @@ -108,9 +106,8 @@ where let current_thread = WorkerThread::current().as_ref(); let tlv = crate::tlv::get(); let latch = CountLatch::with_count(n_threads, current_thread); - let jobs: Vec<_> = (0..n_threads) - .map(|_| StackJob::new(tlv, &f, LatchRef::new(&latch))) - .collect(); + let jobs: Vec<_> = + (0..n_threads).map(|_| StackJob::new(tlv, &f, LatchRef::new(&latch))).collect(); let job_refs = jobs.iter().map(|job| job.as_job_ref()); registry.inject_broadcast(job_refs); diff --git a/compiler/rustc_thread_pool/src/broadcast/test.rs b/compiler/rustc_thread_pool/src/broadcast/tests.rs index 00ab4ad7fe4..fac8b8ad466 100644 --- a/compiler/rustc_thread_pool/src/broadcast/test.rs +++ b/compiler/rustc_thread_pool/src/broadcast/tests.rs @@ -1,11 +1,12 @@ #![cfg(test)] -use crate::ThreadPoolBuilder; +use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::mpsc::channel; -use std::sync::Arc; use std::{thread, time}; +use crate::ThreadPoolBuilder; + #[test] fn broadcast_global() { let v = crate::broadcast(|ctx| ctx.index()); diff --git a/compiler/rustc_thread_pool/src/job.rs b/compiler/rustc_thread_pool/src/job.rs index 394c7576b2c..3241914ba81 100644 --- a/compiler/rustc_thread_pool/src/job.rs +++ b/compiler/rustc_thread_pool/src/job.rs @@ -1,13 +1,14 @@ -use crate::latch::Latch; -use crate::tlv; -use crate::tlv::Tlv; -use crate::unwind; -use crossbeam_deque::{Injector, Steal}; use std::any::Any; use std::cell::UnsafeCell; use std::mem; use std::sync::Arc; +use crossbeam_deque::{Injector, Steal}; + +use crate::latch::Latch; +use crate::tlv::Tlv; +use crate::{tlv, unwind}; + pub(super) enum JobResult<T> { None, Ok(T), @@ -29,7 +30,7 @@ pub(super) trait Job { /// Effectively a Job trait object. Each JobRef **must** be executed /// exactly once, or else data may leak. /// -/// Internally, we store the job's data in a `*const ()` pointer. The +/// Internally, we store the job's data in a `*const ()` pointer. The /// true type is something like `*const StackJob<...>`, but we hide /// it. We also carry the "execute fn" from the `Job` trait. pub(super) struct JobRef { @@ -48,10 +49,7 @@ impl JobRef { T: Job, { // erase types: - JobRef { - pointer: data as *const (), - execute_fn: <T as Job>::execute, - } + JobRef { pointer: data as *const (), execute_fn: <T as Job>::execute } } /// Returns an opaque handle that can be saved and compared, @@ -69,7 +67,7 @@ impl JobRef { /// A job that will be owned by a stack slot. This means that when it /// executes it need not free any heap data, the cleanup occurs when -/// the stack frame is later popped. The function parameter indicates +/// the stack frame is later popped. The function parameter indicates /// `true` if the job was stolen -- executed on a different thread. pub(super) struct StackJob<L, F, R> where @@ -248,13 +246,11 @@ pub(super) struct JobFifo { impl JobFifo { pub(super) fn new() -> Self { - JobFifo { - inner: Injector::new(), - } + JobFifo { inner: Injector::new() } } pub(super) unsafe fn push(&self, job_ref: JobRef) -> JobRef { - // A little indirection ensures that spawns are always prioritized in FIFO order. The + // A little indirection ensures that spawns are always prioritized in FIFO order. The // jobs in a thread's deque may be popped from the back (LIFO) or stolen from the front // (FIFO), but either way they will end up popping from the front of this queue. self.inner.push(job_ref); diff --git a/compiler/rustc_thread_pool/src/join/mod.rs b/compiler/rustc_thread_pool/src/join/mod.rs index 032eec9c4c8..798a8347d79 100644 --- a/compiler/rustc_thread_pool/src/join/mod.rs +++ b/compiler/rustc_thread_pool/src/join/mod.rs @@ -1,11 +1,10 @@ +use std::any::Any; + use crate::job::StackJob; use crate::latch::SpinLatch; use crate::registry::{self, WorkerThread}; use crate::tlv::{self, Tlv}; -use crate::unwind; -use std::any::Any; - -use crate::FnContext; +use crate::{FnContext, unwind}; #[cfg(test)] mod test; @@ -22,7 +21,7 @@ mod test; /// it. /// /// When `join` is called from outside the thread pool, the calling -/// thread will block while the closures execute in the pool. When +/// thread will block while the closures execute in the pool. When /// `join` is called within the pool, the calling thread still actively /// participates in the thread pool. It will begin by executing closure /// A (on the current thread). While it is doing that, it will advertise @@ -80,13 +79,13 @@ mod test; /// CPU-bound tasks that do not perform I/O or other blocking /// operations. If you do perform I/O, and that I/O should block /// (e.g., waiting for a network request), the overall performance may -/// be poor. Moreover, if you cause one closure to be blocked waiting +/// be poor. Moreover, if you cause one closure to be blocked waiting /// on another (for example, using a channel), that could lead to a /// deadlock. /// /// # Panics /// -/// No matter what happens, both closures will always be executed. If +/// No matter what happens, both closures will always be executed. If /// a single closure panics, whether it be the first or second /// closure, that panic will be propagated and hence `join()` will /// panic with the same panic value. If both closures panic, `join()` @@ -109,7 +108,7 @@ where /// Identical to `join`, except that the closures have a parameter /// that provides context for the way the closure has been called, /// especially indicating whether they're executing on a different -/// thread than where `join_context` was called. This will occur if +/// thread than where `join_context` was called. This will occur if /// the second job is stolen by a different thread, or if /// `join_context` was called from outside the thread pool to begin /// with. @@ -148,7 +147,7 @@ where }; // Now that task A has finished, try to pop job B from the - // local stack. It may already have been popped by job A; it + // local stack. It may already have been popped by job A; it // may also have been stolen. There may also be some tasks // pushed on top of it in the stack, and we will have to pop // those off to get to it. diff --git a/compiler/rustc_thread_pool/src/join/test.rs b/compiler/rustc_thread_pool/src/join/tests.rs index 03f4ab4478d..9df99072c3a 100644 --- a/compiler/rustc_thread_pool/src/join/test.rs +++ b/compiler/rustc_thread_pool/src/join/tests.rs @@ -1,11 +1,12 @@ //! Tests for the join code. -use super::*; -use crate::ThreadPoolBuilder; use rand::distr::StandardUniform; use rand::{Rng, SeedableRng}; use rand_xorshift::XorShiftRng; +use super::*; +use crate::ThreadPoolBuilder; + fn quick_sort<T: PartialOrd + Send>(v: &mut [T]) { if v.len() <= 1 { return; diff --git a/compiler/rustc_thread_pool/src/latch.rs b/compiler/rustc_thread_pool/src/latch.rs index 8903942a8ce..f2f806e0184 100644 --- a/compiler/rustc_thread_pool/src/latch.rs +++ b/compiler/rustc_thread_pool/src/latch.rs @@ -28,7 +28,7 @@ use crate::registry::{Registry, WorkerThread}; /// - Once `probe()` returns true, all memory effects from the `set()` /// are visible (in other words, the set should synchronize-with /// the probe). -/// - Once `set()` occurs, the next `probe()` *will* observe it. This +/// - Once `set()` occurs, the next `probe()` *will* observe it. This /// typically requires a seq-cst ordering. See [the "tickle-then-get-sleepy" scenario in the sleep /// README](/src/sleep/README.md#tickle-then-get-sleepy) for details. pub(super) trait Latch { @@ -78,9 +78,7 @@ pub(super) struct CoreLatch { impl CoreLatch { #[inline] fn new() -> Self { - Self { - state: AtomicUsize::new(0), - } + Self { state: AtomicUsize::new(0) } } /// Invoked by owning thread as it prepares to sleep. Returns true @@ -88,9 +86,7 @@ impl CoreLatch { /// latch was set in the meantime. #[inline] pub(super) fn get_sleepy(&self) -> bool { - self.state - .compare_exchange(UNSET, SLEEPY, Ordering::SeqCst, Ordering::Relaxed) - .is_ok() + self.state.compare_exchange(UNSET, SLEEPY, Ordering::SeqCst, Ordering::Relaxed).is_ok() } /// Invoked by owning thread as it falls asleep sleep. Returns @@ -98,9 +94,7 @@ impl CoreLatch { /// was set in the meantime. #[inline] pub(super) fn fall_asleep(&self) -> bool { - self.state - .compare_exchange(SLEEPY, SLEEPING, Ordering::SeqCst, Ordering::Relaxed) - .is_ok() + self.state.compare_exchange(SLEEPY, SLEEPING, Ordering::SeqCst, Ordering::Relaxed).is_ok() } /// Invoked by owning thread as it falls asleep sleep. Returns @@ -110,8 +104,7 @@ impl CoreLatch { pub(super) fn wake_up(&self) { if !self.probe() { let _ = - self.state - .compare_exchange(SLEEPING, UNSET, Ordering::SeqCst, Ordering::Relaxed); + self.state.compare_exchange(SLEEPING, UNSET, Ordering::SeqCst, Ordering::Relaxed); } } @@ -166,15 +159,12 @@ impl<'r> SpinLatch<'r> { } } - /// Creates a new spin latch for cross-threadpool blocking. Notably, we + /// Creates a new spin latch for cross-threadpool blocking. Notably, we /// need to make sure the registry is kept alive after setting, so we can /// safely call the notification. #[inline] pub(super) fn cross(thread: &'r WorkerThread) -> SpinLatch<'r> { - SpinLatch { - cross: true, - ..SpinLatch::new(thread) - } + SpinLatch { cross: true, ..SpinLatch::new(thread) } } #[inline] @@ -235,10 +225,7 @@ pub(super) struct LockLatch { impl LockLatch { #[inline] pub(super) fn new() -> LockLatch { - LockLatch { - m: Mutex::new(false), - v: Condvar::new(), - } + LockLatch { m: Mutex::new(false), v: Condvar::new() } } /// Block until latch is set, then resets this lock latch so it can be reused again. @@ -288,9 +275,7 @@ pub(super) struct OnceLatch { impl OnceLatch { #[inline] pub(super) fn new() -> OnceLatch { - Self { - core_latch: CoreLatch::new(), - } + Self { core_latch: CoreLatch::new() } } /// Set the latch, then tickle the specific worker thread, @@ -372,9 +357,7 @@ impl CountLatch { registry: Arc::clone(owner.registry()), worker_index: owner.index(), }, - None => CountLatchKind::Blocking { - latch: LockLatch::new(), - }, + None => CountLatchKind::Blocking { latch: LockLatch::new() }, }, } } @@ -387,11 +370,7 @@ impl CountLatch { pub(super) fn wait(&self, owner: Option<&WorkerThread>) { match &self.kind { - CountLatchKind::Stealing { - latch, - registry, - worker_index, - } => unsafe { + CountLatchKind::Stealing { latch, registry, worker_index } => unsafe { let owner = owner.expect("owner thread"); debug_assert_eq!(registry.id(), owner.registry().id()); debug_assert_eq!(*worker_index, owner.index()); @@ -409,11 +388,7 @@ impl Latch for CountLatch { // NOTE: Once we call `set` on the internal `latch`, // the target may proceed and invalidate `this`! match (*this).kind { - CountLatchKind::Stealing { - ref latch, - ref registry, - worker_index, - } => { + CountLatchKind::Stealing { ref latch, ref registry, worker_index } => { let registry = Arc::clone(registry); if CoreLatch::set(latch) { registry.notify_worker_latch_is_set(worker_index); @@ -433,10 +408,7 @@ pub(super) struct LatchRef<'a, L> { impl<L> LatchRef<'_, L> { pub(super) fn new(inner: &L) -> LatchRef<'_, L> { - LatchRef { - inner, - marker: PhantomData, - } + LatchRef { inner, marker: PhantomData } } } diff --git a/compiler/rustc_thread_pool/src/lib.rs b/compiler/rustc_thread_pool/src/lib.rs index 72064547e52..179d63ed668 100644 --- a/compiler/rustc_thread_pool/src/lib.rs +++ b/compiler/rustc_thread_pool/src/lib.rs @@ -57,20 +57,17 @@ //! //! While we strive to keep `rayon-core` semver-compatible, it's still //! possible to arrive at this situation if different crates have overly -//! restrictive tilde or inequality requirements for `rayon-core`. The +//! restrictive tilde or inequality requirements for `rayon-core`. The //! conflicting requirements will need to be resolved before the build will //! succeed. #![warn(rust_2018_idioms)] use std::any::Any; -use std::env; use std::error::Error; -use std::fmt; -use std::io; use std::marker::PhantomData; use std::str::FromStr; -use std::thread; +use std::{env, fmt, io, thread}; #[macro_use] mod private; @@ -92,20 +89,18 @@ mod test; pub mod tlv; -pub use self::broadcast::{broadcast, spawn_broadcast, BroadcastContext}; -pub use self::join::{join, join_context}; -pub use self::registry::ThreadBuilder; -pub use self::registry::{mark_blocked, mark_unblocked, Registry}; -pub use self::scope::{in_place_scope, scope, Scope}; -pub use self::scope::{in_place_scope_fifo, scope_fifo, ScopeFifo}; -pub use self::spawn::{spawn, spawn_fifo}; -pub use self::thread_pool::current_thread_has_pending_tasks; -pub use self::thread_pool::current_thread_index; -pub use self::thread_pool::ThreadPool; -pub use self::thread_pool::{yield_local, yield_now, Yield}; pub use worker_local::WorkerLocal; +pub use self::broadcast::{BroadcastContext, broadcast, spawn_broadcast}; +pub use self::join::{join, join_context}; use self::registry::{CustomSpawn, DefaultSpawn, ThreadSpawn}; +pub use self::registry::{Registry, ThreadBuilder, mark_blocked, mark_unblocked}; +pub use self::scope::{Scope, ScopeFifo, in_place_scope, in_place_scope_fifo, scope, scope_fifo}; +pub use self::spawn::{spawn, spawn_fifo}; +pub use self::thread_pool::{ + ThreadPool, Yield, current_thread_has_pending_tasks, current_thread_index, yield_local, + yield_now, +}; /// Returns the maximum number of threads that Rayon supports in a single thread-pool. /// @@ -282,7 +277,7 @@ where } /// Initializes the global thread pool. This initialization is - /// **optional**. If you do not call this function, the thread pool + /// **optional**. If you do not call this function, the thread pool /// will be automatically initialized with the default /// configuration. Calling `build_global` is not recommended, except /// in two scenarios: @@ -290,7 +285,7 @@ where /// - You wish to change the default configuration. /// - You are running a benchmark, in which case initializing may /// yield slightly more consistent results, since the worker threads - /// will already be ready to go even in the first iteration. But + /// will already be ready to go even in the first iteration. But /// this cost is minimal. /// /// Initialization of the global thread pool happens exactly @@ -490,26 +485,16 @@ impl<S> ThreadPoolBuilder<S> { if self.num_threads > 0 { self.num_threads } else { - let default = || { - thread::available_parallelism() - .map(|n| n.get()) - .unwrap_or(1) - }; - - match env::var("RAYON_NUM_THREADS") - .ok() - .and_then(|s| usize::from_str(&s).ok()) - { + let default = || thread::available_parallelism().map(|n| n.get()).unwrap_or(1); + + match env::var("RAYON_NUM_THREADS").ok().and_then(|s| usize::from_str(&s).ok()) { Some(x @ 1..) => return x, Some(0) => return default(), _ => {} } // Support for deprecated `RAYON_RS_NUM_CPUS`. - match env::var("RAYON_RS_NUM_CPUS") - .ok() - .and_then(|s| usize::from_str(&s).ok()) - { + match env::var("RAYON_RS_NUM_CPUS").ok().and_then(|s| usize::from_str(&s).ok()) { Some(x @ 1..) => x, _ => default(), } @@ -723,9 +708,7 @@ impl<S> ThreadPoolBuilder<S> { impl Configuration { /// Creates and return a valid rayon thread pool configuration, but does not initialize it. pub fn new() -> Configuration { - Configuration { - builder: ThreadPoolBuilder::new(), - } + Configuration { builder: ThreadPoolBuilder::new() } } /// Deprecated in favor of `ThreadPoolBuilder::build`. @@ -905,10 +888,7 @@ pub struct FnContext { impl FnContext { #[inline] fn new(migrated: bool) -> Self { - FnContext { - migrated, - _marker: PhantomData, - } + FnContext { migrated, _marker: PhantomData } } } diff --git a/compiler/rustc_thread_pool/src/private.rs b/compiler/rustc_thread_pool/src/private.rs index c85e77b9cbb..5d4f4a8c2ca 100644 --- a/compiler/rustc_thread_pool/src/private.rs +++ b/compiler/rustc_thread_pool/src/private.rs @@ -1,5 +1,5 @@ //! The public parts of this private module are used to create traits -//! that cannot be implemented outside of our own crate. This way we +//! that cannot be implemented outside of our own crate. This way we //! can feel free to extend those traits without worrying about it //! being a breaking change for other implementations. diff --git a/compiler/rustc_thread_pool/src/registry.rs b/compiler/rustc_thread_pool/src/registry.rs index 781b6827b82..2848556aab6 100644 --- a/compiler/rustc_thread_pool/src/registry.rs +++ b/compiler/rustc_thread_pool/src/registry.rs @@ -1,23 +1,20 @@ +use std::cell::Cell; +use std::collections::hash_map::DefaultHasher; +use std::hash::Hasher; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex, Once}; +use std::{fmt, io, mem, ptr, thread}; + +use crossbeam_deque::{Injector, Steal, Stealer, Worker}; + use crate::job::{JobFifo, JobRef, StackJob}; use crate::latch::{AsCoreLatch, CoreLatch, Latch, LatchRef, LockLatch, OnceLatch, SpinLatch}; use crate::sleep::Sleep; use crate::tlv::Tlv; -use crate::unwind; use crate::{ AcquireThreadHandler, DeadlockHandler, ErrorKind, ExitHandler, PanicHandler, - ReleaseThreadHandler, StartHandler, ThreadPoolBuildError, ThreadPoolBuilder, Yield, + ReleaseThreadHandler, StartHandler, ThreadPoolBuildError, ThreadPoolBuilder, Yield, unwind, }; -use crossbeam_deque::{Injector, Steal, Stealer, Worker}; -use std::cell::Cell; -use std::collections::hash_map::DefaultHasher; -use std::fmt; -use std::hash::Hasher; -use std::io; -use std::mem; -use std::ptr; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::{Arc, Mutex, Once}; -use std::thread; /// Thread builder used for customization via /// [`ThreadPoolBuilder::spawn_handler`](struct.ThreadPoolBuilder.html#method.spawn_handler). @@ -193,9 +190,7 @@ fn set_global_registry<F>(registry: F) -> Result<&'static Arc<Registry>, ThreadP where F: FnOnce() -> Result<Arc<Registry>, ThreadPoolBuildError>, { - let mut result = Err(ThreadPoolBuildError::new( - ErrorKind::GlobalPoolAlreadyInitialized, - )); + let mut result = Err(ThreadPoolBuildError::new(ErrorKind::GlobalPoolAlreadyInitialized)); THE_REGISTRY_SET.call_once(|| { result = registry().map(|registry: Arc<Registry>| { @@ -222,25 +217,23 @@ fn default_global_registry() -> Result<Arc<Registry>, ThreadPoolBuildError> { // is stubbed out, and we won't have to change anything if they do add real threading. let unsupported = matches!(&result, Err(e) if e.is_unsupported()); if unsupported && WorkerThread::current().is_null() { - let builder = ThreadPoolBuilder::new() - .num_threads(1) - .spawn_handler(|thread| { - // Rather than starting a new thread, we're just taking over the current thread - // *without* running the main loop, so we can still return from here. - // The WorkerThread is leaked, but we never shutdown the global pool anyway. - let worker_thread = Box::leak(Box::new(WorkerThread::from(thread))); - let registry = &*worker_thread.registry; - let index = worker_thread.index; - - unsafe { - WorkerThread::set_current(worker_thread); - - // let registry know we are ready to do work - Latch::set(®istry.thread_infos[index].primed); - } + let builder = ThreadPoolBuilder::new().num_threads(1).spawn_handler(|thread| { + // Rather than starting a new thread, we're just taking over the current thread + // *without* running the main loop, so we can still return from here. + // The WorkerThread is leaked, but we never shutdown the global pool anyway. + let worker_thread = Box::leak(Box::new(WorkerThread::from(thread))); + let registry = &*worker_thread.registry; + let index = worker_thread.index; + + unsafe { + WorkerThread::set_current(worker_thread); + + // let registry know we are ready to do work + Latch::set(®istry.thread_infos[index].primed); + } - Ok(()) - }); + Ok(()) + }); let fallback_result = Registry::new(builder); if fallback_result.is_ok() { @@ -273,11 +266,7 @@ impl Registry { let (workers, stealers): (Vec<_>, Vec<_>) = (0..n_threads) .map(|_| { - let worker = if breadth_first { - Worker::new_fifo() - } else { - Worker::new_lifo() - }; + let worker = if breadth_first { Worker::new_fifo() } else { Worker::new_lifo() }; let stealer = worker.stealer(); (worker, stealer) @@ -341,7 +330,7 @@ impl Registry { } } - /// Returns the number of threads in the current registry. This + /// Returns the number of threads in the current registry. This /// is better than `Registry::current().num_threads()` because it /// avoids incrementing the `Arc`. pub(super) fn current_num_threads() -> usize { @@ -359,11 +348,7 @@ impl Registry { pub(super) fn current_thread(&self) -> Option<&WorkerThread> { unsafe { let worker = WorkerThread::current().as_ref()?; - if worker.registry().id() == self.id() { - Some(worker) - } else { - None - } + if worker.registry().id() == self.id() { Some(worker) } else { None } } } @@ -371,9 +356,7 @@ impl Registry { pub(super) fn id(&self) -> RegistryId { // We can rely on `self` not to change since we only ever create // registries that are boxed up in an `Arc` (see `new()` above). - RegistryId { - addr: self as *const Self as usize, - } + RegistryId { addr: self as *const Self as usize } } pub(super) fn num_threads(&self) -> usize { @@ -391,7 +374,7 @@ impl Registry { } } - /// Waits for the worker threads to get up and running. This is + /// Waits for the worker threads to get up and running. This is /// meant to be used for benchmarking purposes, primarily, so that /// you can get more consistent numbers by having everything /// "ready to go". @@ -512,7 +495,7 @@ impl Registry { /// If already in a worker-thread of this registry, just execute `op`. /// Otherwise, inject `op` in this thread-pool. Either way, block until `op` /// completes and return its return value. If `op` panics, that panic will - /// be propagated as well. The second argument indicates `true` if injection + /// be propagated as well. The second argument indicates `true` if injection /// was performed, `false` if executed directly. pub(super) fn in_worker<OP, R>(&self, op: OP) -> R where @@ -844,9 +827,7 @@ impl WorkerThread { // The job might have injected local work, so go back to the outer loop. continue 'outer; } else { - self.registry - .sleep - .no_work_found(&mut idle_state, latch, &self) + self.registry.sleep.no_work_found(&mut idle_state, latch, &self) } } @@ -880,9 +861,7 @@ impl WorkerThread { // deques, and finally to injected jobs from the // outside. The idea is to finish what we started before // we take on something new. - self.take_local_job() - .or_else(|| self.steal()) - .or_else(|| self.registry.pop_injected_job()) + self.take_local_job().or_else(|| self.steal()).or_else(|| self.registry.pop_injected_job()) } pub(super) fn yield_now(&self) -> Yield { @@ -984,10 +963,10 @@ unsafe fn main_loop(thread: ThreadBuilder) { registry.release_thread(); } -/// If already in a worker-thread, just execute `op`. Otherwise, +/// If already in a worker-thread, just execute `op`. Otherwise, /// execute `op` in the default thread-pool. Either way, block until /// `op` completes and return its return value. If `op` panics, that -/// panic will be propagated as well. The second argument indicates +/// panic will be propagated as well. The second argument indicates /// `true` if injection was performed, `false` if executed directly. pub(super) fn in_worker<OP, R>(op: OP) -> R where @@ -1026,9 +1005,7 @@ impl XorShift64Star { seed = hasher.finish(); } - XorShift64Star { - state: Cell::new(seed), - } + XorShift64Star { state: Cell::new(seed) } } fn next(&self) -> u64 { diff --git a/compiler/rustc_thread_pool/src/scope/mod.rs b/compiler/rustc_thread_pool/src/scope/mod.rs index 364b322baad..95a4e0b7a18 100644 --- a/compiler/rustc_thread_pool/src/scope/mod.rs +++ b/compiler/rustc_thread_pool/src/scope/mod.rs @@ -5,19 +5,19 @@ //! [`in_place_scope()`]: fn.in_place_scope.html //! [`join()`]: ../join/join.fn.html +use std::any::Any; +use std::marker::PhantomData; +use std::mem::ManuallyDrop; +use std::sync::Arc; +use std::sync::atomic::{AtomicPtr, Ordering}; +use std::{fmt, ptr}; + use crate::broadcast::BroadcastContext; use crate::job::{ArcJob, HeapJob, JobFifo, JobRef}; use crate::latch::{CountLatch, Latch}; -use crate::registry::{global_registry, in_worker, Registry, WorkerThread}; +use crate::registry::{Registry, WorkerThread, global_registry, in_worker}; use crate::tlv::{self, Tlv}; use crate::unwind; -use std::any::Any; -use std::fmt; -use std::marker::PhantomData; -use std::mem::ManuallyDrop; -use std::ptr; -use std::sync::atomic::{AtomicPtr, Ordering}; -use std::sync::Arc; #[cfg(test)] mod test; @@ -53,7 +53,7 @@ struct ScopeBase<'scope> { job_completed_latch: CountLatch, /// You can think of a scope as containing a list of closures to execute, - /// all of which outlive `'scope`. They're not actually required to be + /// all of which outlive `'scope`. They're not actually required to be /// `Sync`, but it's still safe to let the `Scope` implement `Sync` because /// the closures are only *moved* across threads to be executed. #[allow(clippy::type_complexity)] @@ -179,9 +179,9 @@ struct ScopeBase<'scope> { /// they were spawned. So in this example, absent any stealing, we can /// expect `s.2` to execute before `s.1`, and `t.2` before `t.1`. Other /// threads always steal from the other end of the deque, like FIFO -/// order. The idea is that "recent" tasks are most likely to be fresh +/// order. The idea is that "recent" tasks are most likely to be fresh /// in the local CPU's cache, while other threads can steal older -/// "stale" tasks. For an alternate approach, consider +/// "stale" tasks. For an alternate approach, consider /// [`scope_fifo()`] instead. /// /// [`scope_fifo()`]: fn.scope_fifo.html @@ -353,7 +353,7 @@ where /// /// Under `scope_fifo()`, the spawns are prioritized in a FIFO order on /// the thread from which they were spawned, as opposed to `scope()`'s -/// LIFO. So in this example, we can expect `s.1` to execute before +/// LIFO. So in this example, we can expect `s.1` to execute before /// `s.2`, and `t.1` before `t.2`. Other threads also steal tasks in /// FIFO order, as usual. Overall, this has roughly the same order as /// the now-deprecated [`breadth_first`] option, except the effect is @@ -469,7 +469,7 @@ impl<'scope> Scope<'scope> { } /// Spawns a job into the fork-join scope `self`. This job will - /// execute sometime before the fork-join scope completes. The + /// execute sometime before the fork-join scope completes. The /// job is specified as a closure, and this closure receives its /// own reference to the scope `self` as argument. This can be /// used to inject new jobs into `self`. @@ -539,7 +539,7 @@ impl<'scope> Scope<'scope> { } /// Spawns a job into every thread of the fork-join scope `self`. This job will - /// execute on each thread sometime before the fork-join scope completes. The + /// execute on each thread sometime before the fork-join scope completes. The /// job is specified as a closure, and this closure receives its own reference /// to the scope `self` as argument, as well as a `BroadcastContext`. pub fn spawn_broadcast<BODY>(&self, body: BODY) @@ -567,7 +567,7 @@ impl<'scope> ScopeFifo<'scope> { } /// Spawns a job into the fork-join scope `self`. This job will - /// execute sometime before the fork-join scope completes. The + /// execute sometime before the fork-join scope completes. The /// job is specified as a closure, and this closure receives its /// own reference to the scope `self` as argument. This can be /// used to inject new jobs into `self`. @@ -575,7 +575,7 @@ impl<'scope> ScopeFifo<'scope> { /// # See also /// /// This method is akin to [`Scope::spawn()`], but with a FIFO - /// priority. The [`scope_fifo` function] has more details about + /// priority. The [`scope_fifo` function] has more details about /// this distinction. /// /// [`Scope::spawn()`]: struct.Scope.html#method.spawn @@ -605,7 +605,7 @@ impl<'scope> ScopeFifo<'scope> { } /// Spawns a job into every thread of the fork-join scope `self`. This job will - /// execute on each thread sometime before the fork-join scope completes. The + /// execute on each thread sometime before the fork-join scope completes. The /// job is specified as a closure, and this closure receives its own reference /// to the scope `self` as argument, as well as a `BroadcastContext`. pub fn spawn_broadcast<BODY>(&self, body: BODY) diff --git a/compiler/rustc_thread_pool/src/scope/test.rs b/compiler/rustc_thread_pool/src/scope/tests.rs index 4505ba7c4fb..2df3bc67e29 100644 --- a/compiler/rustc_thread_pool/src/scope/test.rs +++ b/compiler/rustc_thread_pool/src/scope/tests.rs @@ -1,13 +1,13 @@ -use crate::unwind; -use crate::ThreadPoolBuilder; -use crate::{scope, scope_fifo, Scope, ScopeFifo}; -use rand::{Rng, SeedableRng}; -use rand_xorshift::XorShiftRng; use std::iter::once; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Barrier, Mutex}; use std::vec; +use rand::{Rng, SeedableRng}; +use rand_xorshift::XorShiftRng; + +use crate::{Scope, ScopeFifo, ThreadPoolBuilder, scope, scope_fifo, unwind}; + #[test] fn scope_empty() { scope(|_| {}); @@ -93,10 +93,7 @@ impl<T: Send> Tree<T> { where OP: Fn(&mut T) + Sync, { - let Tree { - ref mut value, - ref mut children, - } = *self; + let Tree { ref mut value, ref mut children } = *self; scope.spawn(move |scope| { for child in children { scope.spawn(move |scope| child.update_in_scope(op, scope)); @@ -124,10 +121,7 @@ fn random_tree1(depth: usize, rng: &mut XorShiftRng) -> Tree<u32> { .collect() }; - Tree { - value: rng.random_range(0..1_000_000), - children, - } + Tree { value: rng.random_range(0..1_000_000), children } } #[test] @@ -161,11 +155,7 @@ fn linear_stack_growth() { let diff_when_500 = *max_diff.get_mut().unwrap() as f64; let ratio = diff_when_5 / diff_when_500; - assert!( - ratio > 0.9 && ratio < 1.1, - "stack usage ratio out of bounds: {}", - ratio - ); + assert!(ratio > 0.9 && ratio < 1.1, "stack usage ratio out of bounds: {}", ratio); }); } @@ -366,10 +356,7 @@ fn nested_fifo_order() { fn nested_lifo_fifo_order() { // LIFO on the outside, FIFO on the inside let vec = test_nested_order!(scope => spawn, scope_fifo => spawn_fifo); - let expected: Vec<i32> = (0..10) - .rev() - .flat_map(|i| (0..10).map(move |j| i * 10 + j)) - .collect(); + let expected: Vec<i32> = (0..10).rev().flat_map(|i| (0..10).map(move |j| i * 10 + j)).collect(); assert_eq!(vec, expected); } @@ -378,9 +365,7 @@ fn nested_lifo_fifo_order() { fn nested_fifo_lifo_order() { // FIFO on the outside, LIFO on the inside let vec = test_nested_order!(scope_fifo => spawn_fifo, scope => spawn); - let expected: Vec<i32> = (0..10) - .flat_map(|i| (0..10).rev().map(move |j| i * 10 + j)) - .collect(); + let expected: Vec<i32> = (0..10).flat_map(|i| (0..10).rev().map(move |j| i * 10 + j)).collect(); assert_eq!(vec, expected); } diff --git a/compiler/rustc_thread_pool/src/sleep/README.md b/compiler/rustc_thread_pool/src/sleep/README.md index e79efd15ca9..1e11da55f4a 100644 --- a/compiler/rustc_thread_pool/src/sleep/README.md +++ b/compiler/rustc_thread_pool/src/sleep/README.md @@ -182,7 +182,7 @@ This is possible because the C++ memory model typically offers guarantees of the form "if you see the access A, then you must see those other accesses" -- but it doesn't guarantee that you will see the access A (i.e., if you think of processors with independent caches, you may be operating on very out of date -cache state). +cache state). ## Using seq-cst fences to prevent deadlock diff --git a/compiler/rustc_thread_pool/src/sleep/counters.rs b/compiler/rustc_thread_pool/src/sleep/counters.rs index 05941becd1c..f2682028b96 100644 --- a/compiler/rustc_thread_pool/src/sleep/counters.rs +++ b/compiler/rustc_thread_pool/src/sleep/counters.rs @@ -89,9 +89,7 @@ const ONE_JEC: usize = 1 << JEC_SHIFT; impl AtomicCounters { #[inline] pub(super) fn new() -> AtomicCounters { - AtomicCounters { - value: AtomicUsize::new(0), - } + AtomicCounters { value: AtomicUsize::new(0) } } /// Load and return the current value of the various counters. @@ -230,9 +228,7 @@ impl Counters { fn increment_jobs_counter(self) -> Counters { // We can freely add to JEC because it occupies the most significant bits. // Thus it doesn't overflow into the other counters, just wraps itself. - Counters { - word: self.word.wrapping_add(ONE_JEC), - } + Counters { word: self.word.wrapping_add(ONE_JEC) } } #[inline] diff --git a/compiler/rustc_thread_pool/src/sleep/mod.rs b/compiler/rustc_thread_pool/src/sleep/mod.rs index 7d88ece2107..bee7c82c450 100644 --- a/compiler/rustc_thread_pool/src/sleep/mod.rs +++ b/compiler/rustc_thread_pool/src/sleep/mod.rs @@ -1,14 +1,16 @@ //! Code that decides when workers should go to sleep. See README.md //! for an overview. -use crate::latch::CoreLatch; -use crate::registry::WorkerThread; -use crate::DeadlockHandler; -use crossbeam_utils::CachePadded; use std::sync::atomic::Ordering; use std::sync::{Condvar, Mutex}; use std::thread; +use crossbeam_utils::CachePadded; + +use crate::DeadlockHandler; +use crate::latch::CoreLatch; +use crate::registry::WorkerThread; + mod counters; pub(crate) use self::counters::THREADS_MAX; use self::counters::{AtomicCounters, JobsEventCounter}; @@ -125,11 +127,7 @@ impl Sleep { pub(super) fn start_looking(&self, worker_index: usize) -> IdleState { self.counters.add_inactive_thread(); - IdleState { - worker_index, - rounds: 0, - jobs_counter: JobsEventCounter::DUMMY, - } + IdleState { worker_index, rounds: 0, jobs_counter: JobsEventCounter::DUMMY } } #[inline] @@ -165,9 +163,7 @@ impl Sleep { #[cold] fn announce_sleepy(&self) -> JobsEventCounter { - self.counters - .increment_jobs_event_counter_if(JobsEventCounter::is_active) - .jobs_counter() + self.counters.increment_jobs_event_counter_if(JobsEventCounter::is_active).jobs_counter() } #[cold] @@ -258,7 +254,7 @@ impl Sleep { } /// Notify the given thread that it should wake up (if it is - /// sleeping). When this method is invoked, we typically know the + /// sleeping). When this method is invoked, we typically know the /// thread is asleep, though in rare cases it could have been /// awoken by (e.g.) new work having been posted. pub(super) fn notify_worker_latch_is_set(&self, target_worker_index: usize) { @@ -307,9 +303,7 @@ impl Sleep { // Read the counters and -- if sleepy workers have announced themselves // -- announce that there is now work available. The final value of `counters` // with which we exit the loop thus corresponds to a state when - let counters = self - .counters - .increment_jobs_event_counter_if(JobsEventCounter::is_sleepy); + let counters = self.counters.increment_jobs_event_counter_if(JobsEventCounter::is_sleepy); let num_awake_but_idle = counters.awake_but_idle_threads(); let num_sleepers = counters.sleeping_threads(); diff --git a/compiler/rustc_thread_pool/src/spawn/mod.rs b/compiler/rustc_thread_pool/src/spawn/mod.rs index 034df30dcfb..f1679a98234 100644 --- a/compiler/rustc_thread_pool/src/spawn/mod.rs +++ b/compiler/rustc_thread_pool/src/spawn/mod.rs @@ -1,9 +1,10 @@ +use std::mem; +use std::sync::Arc; + use crate::job::*; use crate::registry::Registry; use crate::tlv::Tlv; use crate::unwind; -use std::mem; -use std::sync::Arc; /// Puts the task into the Rayon threadpool's job queue in the "static" /// or "global" scope. Just like a standard thread, this task is not @@ -28,9 +29,9 @@ use std::sync::Arc; /// other threads may steal tasks at any time. However, they are /// generally prioritized in a LIFO order on the thread from which /// they were spawned. Other threads always steal from the other end of -/// the deque, like FIFO order. The idea is that "recent" tasks are +/// the deque, like FIFO order. The idea is that "recent" tasks are /// most likely to be fresh in the local CPU's cache, while other -/// threads can steal older "stale" tasks. For an alternate approach, +/// threads can steal older "stale" tasks. For an alternate approach, /// consider [`spawn_fifo()`] instead. /// /// [`spawn_fifo()`]: fn.spawn_fifo.html @@ -39,7 +40,7 @@ use std::sync::Arc; /// /// If this closure should panic, the resulting panic will be /// propagated to the panic handler registered in the `ThreadPoolBuilder`, -/// if any. See [`ThreadPoolBuilder::panic_handler()`][ph] for more +/// if any. See [`ThreadPoolBuilder::panic_handler()`][ph] for more /// details. /// /// [ph]: struct.ThreadPoolBuilder.html#method.panic_handler @@ -103,7 +104,7 @@ where } /// Fires off a task into the Rayon threadpool in the "static" or -/// "global" scope. Just like a standard thread, this task is not +/// "global" scope. Just like a standard thread, this task is not /// tied to the current stack frame, and hence it cannot hold any /// references other than those with `'static` lifetime. If you want /// to spawn a task that references stack data, use [the `scope_fifo()` @@ -124,7 +125,7 @@ where /// /// If this closure should panic, the resulting panic will be /// propagated to the panic handler registered in the `ThreadPoolBuilder`, -/// if any. See [`ThreadPoolBuilder::panic_handler()`][ph] for more +/// if any. See [`ThreadPoolBuilder::panic_handler()`][ph] for more /// details. /// /// [ph]: struct.ThreadPoolBuilder.html#method.panic_handler @@ -152,7 +153,7 @@ where let job_ref = spawn_job(func, registry); // If we're in the pool, use our thread's private fifo for this thread to execute - // in a locally-FIFO order. Otherwise, just use the pool's global injector. + // in a locally-FIFO order. Otherwise, just use the pool's global injector. match registry.current_thread() { Some(worker) => worker.push_fifo(job_ref), None => registry.inject(job_ref), diff --git a/compiler/rustc_thread_pool/src/spawn/test.rs b/compiler/rustc_thread_pool/src/spawn/tests.rs index b7a0535aabf..8a70d2faf9c 100644 --- a/compiler/rustc_thread_pool/src/spawn/test.rs +++ b/compiler/rustc_thread_pool/src/spawn/tests.rs @@ -1,10 +1,9 @@ -use crate::scope; use std::any::Any; -use std::sync::mpsc::channel; use std::sync::Mutex; +use std::sync::mpsc::channel; use super::{spawn, spawn_fifo}; -use crate::ThreadPoolBuilder; +use crate::{ThreadPoolBuilder, scope}; #[test] #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] @@ -45,10 +44,7 @@ fn panic_fwd() { let builder = ThreadPoolBuilder::new().panic_handler(panic_handler); - builder - .build() - .unwrap() - .spawn(move || panic!("Hello, world!")); + builder.build().unwrap().spawn(move || panic!("Hello, world!")); assert_eq!(1, rx.recv().unwrap()); } @@ -193,10 +189,7 @@ fn fifo_order() { fn lifo_fifo_order() { // LIFO on the outside, FIFO on the inside let vec = test_order!(spawn, spawn_fifo); - let expected: Vec<i32> = (0..10) - .rev() - .flat_map(|i| (0..10).map(move |j| i * 10 + j)) - .collect(); + let expected: Vec<i32> = (0..10).rev().flat_map(|i| (0..10).map(move |j| i * 10 + j)).collect(); assert_eq!(vec, expected); } @@ -205,9 +198,7 @@ fn lifo_fifo_order() { fn fifo_lifo_order() { // FIFO on the outside, LIFO on the inside let vec = test_order!(spawn_fifo, spawn); - let expected: Vec<i32> = (0..10) - .flat_map(|i| (0..10).rev().map(move |j| i * 10 + j)) - .collect(); + let expected: Vec<i32> = (0..10).flat_map(|i| (0..10).rev().map(move |j| i * 10 + j)).collect(); assert_eq!(vec, expected); } diff --git a/compiler/rustc_thread_pool/src/test.rs b/compiler/rustc_thread_pool/src/tests.rs index 25b8487f73b..3082f11a167 100644 --- a/compiler/rustc_thread_pool/src/test.rs +++ b/compiler/rustc_thread_pool/src/tests.rs @@ -1,9 +1,10 @@ #![cfg(test)] -use crate::{ThreadPoolBuildError, ThreadPoolBuilder}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Barrier}; +use crate::{ThreadPoolBuildError, ThreadPoolBuilder}; + #[test] #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn worker_thread_index() { @@ -29,9 +30,7 @@ fn start_callback_called() { b.wait(); }; - let conf = ThreadPoolBuilder::new() - .num_threads(n_threads) - .start_handler(start_handler); + let conf = ThreadPoolBuilder::new().num_threads(n_threads).start_handler(start_handler); let _ = conf.build().unwrap(); // Wait for all the threads to have been scheduled to run. @@ -56,9 +55,7 @@ fn exit_callback_called() { b.wait(); }; - let conf = ThreadPoolBuilder::new() - .num_threads(n_threads) - .exit_handler(exit_handler); + let conf = ThreadPoolBuilder::new().num_threads(n_threads).exit_handler(exit_handler); { let _ = conf.build().unwrap(); // Drop the pool so it stops the running threads. diff --git a/compiler/rustc_thread_pool/src/thread_pool/mod.rs b/compiler/rustc_thread_pool/src/thread_pool/mod.rs index 65af6d7106e..ce8783cf0d6 100644 --- a/compiler/rustc_thread_pool/src/thread_pool/mod.rs +++ b/compiler/rustc_thread_pool/src/thread_pool/mod.rs @@ -3,18 +3,17 @@ //! //! [`ThreadPool`]: struct.ThreadPool.html -use crate::broadcast::{self, BroadcastContext}; -use crate::join; -use crate::registry::{Registry, ThreadSpawn, WorkerThread}; -use crate::scope::{do_in_place_scope, do_in_place_scope_fifo}; -use crate::spawn; -use crate::{scope, Scope}; -use crate::{scope_fifo, ScopeFifo}; -use crate::{ThreadPoolBuildError, ThreadPoolBuilder}; use std::error::Error; use std::fmt; use std::sync::Arc; +use crate::broadcast::{self, BroadcastContext}; +use crate::registry::{Registry, ThreadSpawn, WorkerThread}; +use crate::scope::{do_in_place_scope, do_in_place_scope_fifo}; +use crate::{ + Scope, ScopeFifo, ThreadPoolBuildError, ThreadPoolBuilder, join, scope, scope_fifo, spawn, +}; + mod test; /// Represents a user created [thread-pool]. diff --git a/compiler/rustc_thread_pool/src/thread_pool/test.rs b/compiler/rustc_thread_pool/src/thread_pool/tests.rs index 88b36282d48..42c99565088 100644 --- a/compiler/rustc_thread_pool/src/thread_pool/test.rs +++ b/compiler/rustc_thread_pool/src/thread_pool/tests.rs @@ -4,7 +4,7 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::mpsc::channel; use std::sync::{Arc, Mutex}; -use crate::{join, Scope, ScopeFifo, ThreadPool, ThreadPoolBuilder}; +use crate::{Scope, ScopeFifo, ThreadPool, ThreadPoolBuilder, join}; #[test] #[should_panic(expected = "Hello, world!")] @@ -296,9 +296,8 @@ fn nested_scopes() { } } - let pools: Vec<_> = (0..10) - .map(|_| ThreadPoolBuilder::new().num_threads(1).build().unwrap()) - .collect(); + let pools: Vec<_> = + (0..10).map(|_| ThreadPoolBuilder::new().num_threads(1).build().unwrap()).collect(); let counter = AtomicUsize::new(0); nest(&pools, vec![], |scopes| { @@ -333,9 +332,8 @@ fn nested_fifo_scopes() { } } - let pools: Vec<_> = (0..10) - .map(|_| ThreadPoolBuilder::new().num_threads(1).build().unwrap()) - .collect(); + let pools: Vec<_> = + (0..10).map(|_| ThreadPoolBuilder::new().num_threads(1).build().unwrap()).collect(); let counter = AtomicUsize::new(0); nest(&pools, vec![], |scopes| { diff --git a/compiler/rustc_thread_pool/src/tlv.rs b/compiler/rustc_thread_pool/src/tlv.rs index ce22f7aa0ce..b5f63479e2f 100644 --- a/compiler/rustc_thread_pool/src/tlv.rs +++ b/compiler/rustc_thread_pool/src/tlv.rs @@ -1,7 +1,8 @@ //! Allows access to the Rayon's thread local value //! which is preserved when moving jobs across threads -use std::{cell::Cell, ptr}; +use std::cell::Cell; +use std::ptr; thread_local!(pub static TLV: Cell<*const ()> = const { Cell::new(ptr::null()) }); diff --git a/compiler/rustc_thread_pool/src/worker_local.rs b/compiler/rustc_thread_pool/src/worker_local.rs index 85d51687c19..d108c91f9ee 100644 --- a/compiler/rustc_thread_pool/src/worker_local.rs +++ b/compiler/rustc_thread_pool/src/worker_local.rs @@ -1,8 +1,9 @@ -use crate::registry::{Registry, WorkerThread}; use std::fmt; use std::ops::Deref; use std::sync::Arc; +use crate::registry::{Registry, WorkerThread}; + #[repr(align(64))] #[derive(Debug)] struct CacheAligned<T>(T); @@ -27,9 +28,7 @@ impl<T> WorkerLocal<T> { pub fn new<F: FnMut(usize) -> T>(mut initial: F) -> WorkerLocal<T> { let registry = Registry::current(); WorkerLocal { - locals: (0..registry.num_threads()) - .map(|i| CacheAligned(initial(i))) - .collect(), + locals: (0..registry.num_threads()).map(|i| CacheAligned(initial(i))).collect(), registry, } } @@ -62,9 +61,7 @@ impl<T> WorkerLocal<Vec<T>> { impl<T: fmt::Debug> fmt::Debug for WorkerLocal<T> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("WorkerLocal") - .field("registry", &self.registry.id()) - .finish() + f.debug_struct("WorkerLocal").field("registry", &self.registry.id()).finish() } } diff --git a/compiler/rustc_thread_pool/tests/double_init_fail.rs b/compiler/rustc_thread_pool/tests/double_init_fail.rs index 15915304ddc..71ed425bb32 100644 --- a/compiler/rustc_thread_pool/tests/double_init_fail.rs +++ b/compiler/rustc_thread_pool/tests/double_init_fail.rs @@ -1,6 +1,7 @@ -use rayon_core::ThreadPoolBuilder; use std::error::Error; +use rayon_core::ThreadPoolBuilder; + #[test] #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn double_init_fail() { @@ -8,8 +9,5 @@ fn double_init_fail() { assert!(result1.is_ok()); let err = ThreadPoolBuilder::new().build_global().unwrap_err(); assert!(err.source().is_none()); - assert_eq!( - err.to_string(), - "The global thread pool has already been initialized.", - ); + assert_eq!(err.to_string(), "The global thread pool has already been initialized.",); } diff --git a/compiler/rustc_thread_pool/tests/init_zero_threads.rs b/compiler/rustc_thread_pool/tests/init_zero_threads.rs index 3c1ad251c7e..c1770e57f3c 100644 --- a/compiler/rustc_thread_pool/tests/init_zero_threads.rs +++ b/compiler/rustc_thread_pool/tests/init_zero_threads.rs @@ -3,8 +3,5 @@ use rayon_core::ThreadPoolBuilder; #[test] #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn init_zero_threads() { - ThreadPoolBuilder::new() - .num_threads(0) - .build_global() - .unwrap(); + ThreadPoolBuilder::new().num_threads(0).build_global().unwrap(); } diff --git a/compiler/rustc_thread_pool/tests/scoped_threadpool.rs b/compiler/rustc_thread_pool/tests/scoped_threadpool.rs index 932147179f5..8cc2c859c0c 100644 --- a/compiler/rustc_thread_pool/tests/scoped_threadpool.rs +++ b/compiler/rustc_thread_pool/tests/scoped_threadpool.rs @@ -10,9 +10,7 @@ scoped_tls::scoped_thread_local!(static LOCAL: Local); #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn missing_scoped_tls() { LOCAL.set(&Local(42), || { - let pool = ThreadPoolBuilder::new() - .build() - .expect("thread pool created"); + let pool = ThreadPoolBuilder::new().build().expect("thread pool created"); // `LOCAL` is not set in the pool. pool.install(|| { diff --git a/compiler/rustc_thread_pool/tests/stack_overflow_crash.rs b/compiler/rustc_thread_pool/tests/stack_overflow_crash.rs index a6494069212..c7a880de8bb 100644 --- a/compiler/rustc_thread_pool/tests/stack_overflow_crash.rs +++ b/compiler/rustc_thread_pool/tests/stack_overflow_crash.rs @@ -1,10 +1,9 @@ -use rayon_core::ThreadPoolBuilder; - use std::env; -use std::process::{Command, ExitStatus, Stdio}; - #[cfg(target_os = "linux")] use std::os::unix::process::ExitStatusExt; +use std::process::{Command, ExitStatus, Stdio}; + +use rayon_core::ThreadPoolBuilder; fn force_stack_overflow(depth: u32) { let mut buffer = [0u8; 1024 * 1024]; @@ -18,13 +17,7 @@ fn force_stack_overflow(depth: u32) { #[cfg(unix)] fn disable_core() { unsafe { - libc::setrlimit( - libc::RLIMIT_CORE, - &libc::rlimit { - rlim_cur: 0, - rlim_max: 0, - }, - ); + libc::setrlimit(libc::RLIMIT_CORE, &libc::rlimit { rlim_cur: 0, rlim_max: 0 }); } } @@ -50,10 +43,7 @@ fn stack_overflow_crash() { #[cfg(any(unix, windows))] assert_eq!(status.code(), overflow_code()); #[cfg(target_os = "linux")] - assert!(matches!( - status.signal(), - Some(libc::SIGABRT | libc::SIGSEGV) - )); + assert!(matches!(status.signal(), Some(libc::SIGABRT | libc::SIGSEGV))); // Now run with a larger stack and verify correct operation. let status = run_ignored("run_with_large_stack"); @@ -86,10 +76,7 @@ fn run_with_large_stack() { } fn run_with_stack(stack_size_in_mb: usize) { - let pool = ThreadPoolBuilder::new() - .stack_size(stack_size_in_mb * 1024 * 1024) - .build() - .unwrap(); + let pool = ThreadPoolBuilder::new().stack_size(stack_size_in_mb * 1024 * 1024).build().unwrap(); pool.install(|| { #[cfg(unix)] disable_core(); |
