diff options
Diffstat (limited to 'compiler/rustc_thread_pool/src/registry.rs')
| -rw-r--r-- | compiler/rustc_thread_pool/src/registry.rs | 97 |
1 files changed, 93 insertions, 4 deletions
diff --git a/compiler/rustc_thread_pool/src/registry.rs b/compiler/rustc_thread_pool/src/registry.rs index 03a01aa29d2..5e2f2e7f1b7 100644 --- a/compiler/rustc_thread_pool/src/registry.rs +++ b/compiler/rustc_thread_pool/src/registry.rs @@ -6,6 +6,7 @@ use std::sync::{Arc, Mutex, Once}; use std::{fmt, io, mem, ptr, thread}; use crossbeam_deque::{Injector, Steal, Stealer, Worker}; +use smallvec::SmallVec; use crate::job::{JobFifo, JobRef, StackJob}; use crate::latch::{AsCoreLatch, CoreLatch, Latch, LatchRef, LockLatch, OnceLatch, SpinLatch}; @@ -796,14 +797,81 @@ impl WorkerThread { /// stealing tasks as necessary. #[inline] pub(super) unsafe fn wait_until<L: AsCoreLatch + ?Sized>(&self, latch: &L) { + unsafe { self.wait_or_steal_until(latch, false) }; + } + + #[inline] + pub(super) unsafe fn wait_for_jobs<L: AsCoreLatch + ?Sized, const BROADCAST_JOBS: bool>( + &self, + latch: &L, + mut all_jobs_started: impl FnMut() -> bool, + mut is_job: impl FnMut(&JobRef) -> bool, + mut execute_job: impl FnMut(JobRef) -> (), + ) { + let mut jobs = SmallVec::<[JobRef; 8]>::new(); + let mut broadcast_jobs = SmallVec::<[JobRef; 8]>::new(); + + while !all_jobs_started() { + if let Some(job) = self.worker.pop() { + if is_job(&job) { + execute_job(job); + } else { + jobs.push(job); + } + } else { + if BROADCAST_JOBS { + let broadcast_job = loop { + match self.stealer.steal() { + Steal::Success(job) => break Some(job), + Steal::Empty => break None, + Steal::Retry => continue, + } + }; + if let Some(job) = broadcast_job { + if is_job(&job) { + execute_job(job); + } else { + broadcast_jobs.push(job); + } + } + } + break; + } + } + + // Restore the jobs that we weren't looking for. + for job in jobs { + self.worker.push(job); + } + if BROADCAST_JOBS { + let broadcasts = self.registry.broadcasts.lock().unwrap(); + for job in broadcast_jobs { + broadcasts[self.index].push(job); + } + } + + // Wait for the jobs to finish. + unsafe { self.wait_until(latch) }; + debug_assert!(latch.as_core_latch().probe()); + } + + pub(super) unsafe fn wait_or_steal_until<L: AsCoreLatch + ?Sized>( + &self, + latch: &L, + steal: bool, + ) { let latch = latch.as_core_latch(); if !latch.probe() { - unsafe { self.wait_until_cold(latch) }; + if steal { + unsafe { self.wait_or_steal_until_cold(latch) }; + } else { + unsafe { self.wait_until_cold(latch) }; + } } } #[cold] - unsafe fn wait_until_cold(&self, latch: &CoreLatch) { + unsafe fn wait_or_steal_until_cold(&self, latch: &CoreLatch) { // the code below should swallow all panics and hence never // unwind; but if something does wrong, we want to abort, // because otherwise other code in rayon may assume that the @@ -827,7 +895,7 @@ impl WorkerThread { // The job might have injected local work, so go back to the outer loop. continue 'outer; } else { - self.registry.sleep.no_work_found(&mut idle_state, latch, &self) + self.registry.sleep.no_work_found(&mut idle_state, latch, &self, true) } } @@ -840,13 +908,34 @@ impl WorkerThread { mem::forget(abort_guard); // successful execution, do not abort } + #[cold] + unsafe fn wait_until_cold(&self, latch: &CoreLatch) { + // the code below should swallow all panics and hence never + // unwind; but if something does wrong, we want to abort, + // because otherwise other code in rayon may assume that the + // latch has been signaled, and that can lead to random memory + // accesses, which would be *very bad* + let abort_guard = unwind::AbortIfPanic; + + let mut idle_state = self.registry.sleep.start_looking(self.index); + while !latch.probe() { + self.registry.sleep.no_work_found(&mut idle_state, latch, &self, false); + } + + // If we were sleepy, we are not anymore. We "found work" -- + // whatever the surrounding thread was doing before it had to wait. + self.registry.sleep.work_found(); + + mem::forget(abort_guard); // successful execution, do not abort + } + unsafe fn wait_until_out_of_work(&self) { debug_assert_eq!(self as *const _, WorkerThread::current()); let registry = &*self.registry; let index = self.index; registry.acquire_thread(); - unsafe { self.wait_until(®istry.thread_infos[index].terminate) }; + unsafe { self.wait_or_steal_until(®istry.thread_infos[index].terminate, true) }; // Should not be any work left in our queue. debug_assert!(self.take_local_job().is_none()); |
