about summary refs log tree commit diff
path: root/compiler/rustc_thread_pool/src/registry.rs
diff options
context:
space:
mode:
Diffstat (limited to 'compiler/rustc_thread_pool/src/registry.rs')
-rw-r--r--compiler/rustc_thread_pool/src/registry.rs97
1 files changed, 93 insertions, 4 deletions
diff --git a/compiler/rustc_thread_pool/src/registry.rs b/compiler/rustc_thread_pool/src/registry.rs
index 03a01aa29d2..5e2f2e7f1b7 100644
--- a/compiler/rustc_thread_pool/src/registry.rs
+++ b/compiler/rustc_thread_pool/src/registry.rs
@@ -6,6 +6,7 @@ use std::sync::{Arc, Mutex, Once};
 use std::{fmt, io, mem, ptr, thread};
 
 use crossbeam_deque::{Injector, Steal, Stealer, Worker};
+use smallvec::SmallVec;
 
 use crate::job::{JobFifo, JobRef, StackJob};
 use crate::latch::{AsCoreLatch, CoreLatch, Latch, LatchRef, LockLatch, OnceLatch, SpinLatch};
@@ -796,14 +797,81 @@ impl WorkerThread {
     /// stealing tasks as necessary.
     #[inline]
     pub(super) unsafe fn wait_until<L: AsCoreLatch + ?Sized>(&self, latch: &L) {
+        unsafe { self.wait_or_steal_until(latch, false) };
+    }
+
+    #[inline]
+    pub(super) unsafe fn wait_for_jobs<L: AsCoreLatch + ?Sized, const BROADCAST_JOBS: bool>(
+        &self,
+        latch: &L,
+        mut all_jobs_started: impl FnMut() -> bool,
+        mut is_job: impl FnMut(&JobRef) -> bool,
+        mut execute_job: impl FnMut(JobRef) -> (),
+    ) {
+        let mut jobs = SmallVec::<[JobRef; 8]>::new();
+        let mut broadcast_jobs = SmallVec::<[JobRef; 8]>::new();
+
+        while !all_jobs_started() {
+            if let Some(job) = self.worker.pop() {
+                if is_job(&job) {
+                    execute_job(job);
+                } else {
+                    jobs.push(job);
+                }
+            } else {
+                if BROADCAST_JOBS {
+                    let broadcast_job = loop {
+                        match self.stealer.steal() {
+                            Steal::Success(job) => break Some(job),
+                            Steal::Empty => break None,
+                            Steal::Retry => continue,
+                        }
+                    };
+                    if let Some(job) = broadcast_job {
+                        if is_job(&job) {
+                            execute_job(job);
+                        } else {
+                            broadcast_jobs.push(job);
+                        }
+                    }
+                }
+                break;
+            }
+        }
+
+        // Restore the jobs that we weren't looking for.
+        for job in jobs {
+            self.worker.push(job);
+        }
+        if BROADCAST_JOBS {
+            let broadcasts = self.registry.broadcasts.lock().unwrap();
+            for job in broadcast_jobs {
+                broadcasts[self.index].push(job);
+            }
+        }
+
+        // Wait for the jobs to finish.
+        unsafe { self.wait_until(latch) };
+        debug_assert!(latch.as_core_latch().probe());
+    }
+
+    pub(super) unsafe fn wait_or_steal_until<L: AsCoreLatch + ?Sized>(
+        &self,
+        latch: &L,
+        steal: bool,
+    ) {
         let latch = latch.as_core_latch();
         if !latch.probe() {
-            unsafe { self.wait_until_cold(latch) };
+            if steal {
+                unsafe { self.wait_or_steal_until_cold(latch) };
+            } else {
+                unsafe { self.wait_until_cold(latch) };
+            }
         }
     }
 
     #[cold]
-    unsafe fn wait_until_cold(&self, latch: &CoreLatch) {
+    unsafe fn wait_or_steal_until_cold(&self, latch: &CoreLatch) {
         // the code below should swallow all panics and hence never
         // unwind; but if something does wrong, we want to abort,
         // because otherwise other code in rayon may assume that the
@@ -827,7 +895,7 @@ impl WorkerThread {
                     // The job might have injected local work, so go back to the outer loop.
                     continue 'outer;
                 } else {
-                    self.registry.sleep.no_work_found(&mut idle_state, latch, &self)
+                    self.registry.sleep.no_work_found(&mut idle_state, latch, &self, true)
                 }
             }
 
@@ -840,13 +908,34 @@ impl WorkerThread {
         mem::forget(abort_guard); // successful execution, do not abort
     }
 
+    #[cold]
+    unsafe fn wait_until_cold(&self, latch: &CoreLatch) {
+        // the code below should swallow all panics and hence never
+        // unwind; but if something does wrong, we want to abort,
+        // because otherwise other code in rayon may assume that the
+        // latch has been signaled, and that can lead to random memory
+        // accesses, which would be *very bad*
+        let abort_guard = unwind::AbortIfPanic;
+
+        let mut idle_state = self.registry.sleep.start_looking(self.index);
+        while !latch.probe() {
+            self.registry.sleep.no_work_found(&mut idle_state, latch, &self, false);
+        }
+
+        // If we were sleepy, we are not anymore. We "found work" --
+        // whatever the surrounding thread was doing before it had to wait.
+        self.registry.sleep.work_found();
+
+        mem::forget(abort_guard); // successful execution, do not abort
+    }
+
     unsafe fn wait_until_out_of_work(&self) {
         debug_assert_eq!(self as *const _, WorkerThread::current());
         let registry = &*self.registry;
         let index = self.index;
 
         registry.acquire_thread();
-        unsafe { self.wait_until(&registry.thread_infos[index].terminate) };
+        unsafe { self.wait_or_steal_until(&registry.thread_infos[index].terminate, true) };
 
         // Should not be any work left in our queue.
         debug_assert!(self.take_local_job().is_none());