about summary refs log tree commit diff
diff options
context:
space:
mode:
authorywxt <ywxtcwh@gmail.com>2025-06-25 19:38:40 +0800
committerywxt <ywxtcwh@gmail.com>2025-06-28 17:58:20 +0800
commit0ceac216c9ea96b4b63cba77ae1fabfc8e0320a1 (patch)
treef5fbd7d559d9c36276958d2a7673cf57991fc666
parentd41e12f1f4e4884c356f319b881921aa37040de5 (diff)
downloadrust-0ceac216c9ea96b4b63cba77ae1fabfc8e0320a1.tar.gz
rust-0ceac216c9ea96b4b63cba77ae1fabfc8e0320a1.zip
Only work-steal in the main loop for rustc_thread_pool
Co-authored-by: Zoxc <zoxc32@gmail.com>
-rw-r--r--Cargo.lock2
-rw-r--r--compiler/rustc_thread_pool/Cargo.toml8
-rw-r--r--compiler/rustc_thread_pool/src/broadcast/mod.rs24
-rw-r--r--compiler/rustc_thread_pool/src/broadcast/tests.rs2
-rw-r--r--compiler/rustc_thread_pool/src/job.rs40
-rw-r--r--compiler/rustc_thread_pool/src/join/mod.rs80
-rw-r--r--compiler/rustc_thread_pool/src/join/tests.rs1
-rw-r--r--compiler/rustc_thread_pool/src/latch.rs17
-rw-r--r--compiler/rustc_thread_pool/src/registry.rs97
-rw-r--r--compiler/rustc_thread_pool/src/scope/mod.rs76
-rw-r--r--compiler/rustc_thread_pool/src/scope/tests.rs17
-rw-r--r--compiler/rustc_thread_pool/src/sleep/mod.rs13
-rw-r--r--compiler/rustc_thread_pool/src/spawn/mod.rs2
-rw-r--r--compiler/rustc_thread_pool/src/spawn/tests.rs6
-rw-r--r--compiler/rustc_thread_pool/src/thread_pool/tests.rs6
-rw-r--r--compiler/rustc_thread_pool/tests/stack_overflow_crash.rs2
16 files changed, 281 insertions, 112 deletions
diff --git a/Cargo.lock b/Cargo.lock
index e1cf17e2c01..302e88368c9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4538,10 +4538,12 @@ version = "0.0.0"
 dependencies = [
  "crossbeam-deque",
  "crossbeam-utils",
+ "indexmap",
  "libc",
  "rand 0.9.1",
  "rand_xorshift",
  "scoped-tls",
+ "smallvec",
 ]
 
 [[package]]
diff --git a/compiler/rustc_thread_pool/Cargo.toml b/compiler/rustc_thread_pool/Cargo.toml
index d0bd065c457..c73c7961cbc 100644
--- a/compiler/rustc_thread_pool/Cargo.toml
+++ b/compiler/rustc_thread_pool/Cargo.toml
@@ -1,8 +1,10 @@
 [package]
 name = "rustc_thread_pool"
 version = "0.0.0"
-authors = ["Niko Matsakis <niko@alum.mit.edu>",
-           "Josh Stone <cuviper@gmail.com>"]
+authors = [
+    "Niko Matsakis <niko@alum.mit.edu>",
+    "Josh Stone <cuviper@gmail.com>",
+]
 description = "Core APIs for Rayon - fork for rustc"
 license = "MIT OR Apache-2.0"
 rust-version = "1.63"
@@ -14,6 +16,8 @@ categories = ["concurrency"]
 [dependencies]
 crossbeam-deque = "0.8"
 crossbeam-utils = "0.8"
+indexmap = "2.4.0"
+smallvec = "1.8.1"
 
 [dev-dependencies]
 rand = "0.9"
diff --git a/compiler/rustc_thread_pool/src/broadcast/mod.rs b/compiler/rustc_thread_pool/src/broadcast/mod.rs
index 9545c4b15d8..1707ebb5988 100644
--- a/compiler/rustc_thread_pool/src/broadcast/mod.rs
+++ b/compiler/rustc_thread_pool/src/broadcast/mod.rs
@@ -1,6 +1,7 @@
 use std::fmt;
 use std::marker::PhantomData;
 use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
 
 use crate::job::{ArcJob, StackJob};
 use crate::latch::{CountLatch, LatchRef};
@@ -97,13 +98,22 @@ where
     OP: Fn(BroadcastContext<'_>) -> R + Sync,
     R: Send,
 {
+    let current_thread = WorkerThread::current();
+    let current_thread_addr = current_thread.expose_provenance();
+    let started = &AtomicBool::new(false);
     let f = move |injected: bool| {
         debug_assert!(injected);
+
+        // Mark as started if we are the thread that initiated that broadcast.
+        if current_thread_addr == WorkerThread::current().expose_provenance() {
+            started.store(true, Ordering::Relaxed);
+        }
+
         BroadcastContext::with(&op)
     };
 
     let n_threads = registry.num_threads();
-    let current_thread = unsafe { WorkerThread::current().as_ref() };
+    let current_thread = unsafe { current_thread.as_ref() };
     let tlv = crate::tlv::get();
     let latch = CountLatch::with_count(n_threads, current_thread);
     let jobs: Vec<_> =
@@ -112,8 +122,16 @@ where
 
     registry.inject_broadcast(job_refs);
 
+    let current_thread_job_id = current_thread
+        .and_then(|worker| (registry.id() == worker.registry.id()).then(|| worker))
+        .map(|worker| unsafe { jobs[worker.index()].as_job_ref() }.id());
+
     // Wait for all jobs to complete, then collect the results, maybe propagating a panic.
-    latch.wait(current_thread);
+    latch.wait(
+        current_thread,
+        || started.load(Ordering::Relaxed),
+        |job| Some(job.id()) == current_thread_job_id,
+    );
     jobs.into_iter().map(|job| unsafe { job.into_result() }).collect()
 }
 
@@ -129,7 +147,7 @@ where
 {
     let job = ArcJob::new({
         let registry = Arc::clone(registry);
-        move || {
+        move |_| {
             registry.catch_unwind(|| BroadcastContext::with(&op));
             registry.terminate(); // (*) permit registry to terminate now
         }
diff --git a/compiler/rustc_thread_pool/src/broadcast/tests.rs b/compiler/rustc_thread_pool/src/broadcast/tests.rs
index fac8b8ad466..201cb932192 100644
--- a/compiler/rustc_thread_pool/src/broadcast/tests.rs
+++ b/compiler/rustc_thread_pool/src/broadcast/tests.rs
@@ -65,6 +65,7 @@ fn spawn_broadcast_self() {
 }
 
 #[test]
+#[ignore]
 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
 fn broadcast_mutual() {
     let count = AtomicUsize::new(0);
@@ -99,6 +100,7 @@ fn spawn_broadcast_mutual() {
 }
 
 #[test]
+#[ignore]
 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
 fn broadcast_mutual_sleepy() {
     let count = AtomicUsize::new(0);
diff --git a/compiler/rustc_thread_pool/src/job.rs b/compiler/rustc_thread_pool/src/job.rs
index e6e84ac2320..60a64fe59c9 100644
--- a/compiler/rustc_thread_pool/src/job.rs
+++ b/compiler/rustc_thread_pool/src/job.rs
@@ -27,6 +27,11 @@ pub(super) trait Job {
     unsafe fn execute(this: *const ());
 }
 
+#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
+pub(super) struct JobRefId {
+    pointer: usize,
+}
+
 /// Effectively a Job trait object. Each JobRef **must** be executed
 /// exactly once, or else data may leak.
 ///
@@ -52,11 +57,9 @@ impl JobRef {
         JobRef { pointer: data as *const (), execute_fn: <T as Job>::execute }
     }
 
-    /// Returns an opaque handle that can be saved and compared,
-    /// without making `JobRef` itself `Copy + Eq`.
     #[inline]
-    pub(super) fn id(&self) -> impl Eq {
-        (self.pointer, self.execute_fn)
+    pub(super) fn id(&self) -> JobRefId {
+        JobRefId { pointer: self.pointer.expose_provenance() }
     }
 
     #[inline]
@@ -100,8 +103,15 @@ where
         unsafe { JobRef::new(self) }
     }
 
-    pub(super) unsafe fn run_inline(self, stolen: bool) -> R {
-        self.func.into_inner().unwrap()(stolen)
+    pub(super) unsafe fn run_inline(&self, stolen: bool) {
+        unsafe {
+            let func = (*self.func.get()).take().unwrap();
+            *(self.result.get()) = match unwind::halt_unwinding(|| func(stolen)) {
+                Ok(x) => JobResult::Ok(x),
+                Err(x) => JobResult::Panic(x),
+            };
+            Latch::set(&self.latch);
+        }
     }
 
     pub(super) unsafe fn into_result(self) -> R {
@@ -138,7 +148,7 @@ where
 /// (Probably `StackJob` should be refactored in a similar fashion.)
 pub(super) struct HeapJob<BODY>
 where
-    BODY: FnOnce() + Send,
+    BODY: FnOnce(JobRefId) + Send,
 {
     job: BODY,
     tlv: Tlv,
@@ -146,7 +156,7 @@ where
 
 impl<BODY> HeapJob<BODY>
 where
-    BODY: FnOnce() + Send,
+    BODY: FnOnce(JobRefId) + Send,
 {
     pub(super) fn new(tlv: Tlv, job: BODY) -> Box<Self> {
         Box::new(HeapJob { job, tlv })
@@ -170,12 +180,13 @@ where
 
 impl<BODY> Job for HeapJob<BODY>
 where
-    BODY: FnOnce() + Send,
+    BODY: FnOnce(JobRefId) + Send,
 {
     unsafe fn execute(this: *const ()) {
+        let pointer = this.expose_provenance();
         let this = unsafe { Box::from_raw(this as *mut Self) };
         tlv::set(this.tlv);
-        (this.job)();
+        (this.job)(JobRefId { pointer });
     }
 }
 
@@ -183,14 +194,14 @@ where
 /// be turned into multiple `JobRef`s and called multiple times.
 pub(super) struct ArcJob<BODY>
 where
-    BODY: Fn() + Send + Sync,
+    BODY: Fn(JobRefId) + Send + Sync,
 {
     job: BODY,
 }
 
 impl<BODY> ArcJob<BODY>
 where
-    BODY: Fn() + Send + Sync,
+    BODY: Fn(JobRefId) + Send + Sync,
 {
     pub(super) fn new(job: BODY) -> Arc<Self> {
         Arc::new(ArcJob { job })
@@ -214,11 +225,12 @@ where
 
 impl<BODY> Job for ArcJob<BODY>
 where
-    BODY: Fn() + Send + Sync,
+    BODY: Fn(JobRefId) + Send + Sync,
 {
     unsafe fn execute(this: *const ()) {
+        let pointer = this.expose_provenance();
         let this = unsafe { Arc::from_raw(this as *mut Self) };
-        (this.job)();
+        (this.job)(JobRefId { pointer });
     }
 }
 
diff --git a/compiler/rustc_thread_pool/src/join/mod.rs b/compiler/rustc_thread_pool/src/join/mod.rs
index f285362c19b..08c4c4e96ab 100644
--- a/compiler/rustc_thread_pool/src/join/mod.rs
+++ b/compiler/rustc_thread_pool/src/join/mod.rs
@@ -1,10 +1,8 @@
-use std::any::Any;
+use std::sync::atomic::{AtomicBool, Ordering};
 
 use crate::job::StackJob;
 use crate::latch::SpinLatch;
-use crate::registry::{self, WorkerThread};
-use crate::tlv::{self, Tlv};
-use crate::{FnContext, unwind};
+use crate::{FnContext, registry, tlv, unwind};
 
 #[cfg(test)]
 mod tests;
@@ -134,68 +132,38 @@ where
         // Create virtual wrapper for task b; this all has to be
         // done here so that the stack frame can keep it all live
         // long enough.
-        let job_b = StackJob::new(tlv, call_b(oper_b), SpinLatch::new(worker_thread));
+        let job_b_started = AtomicBool::new(false);
+        let job_b = StackJob::new(
+            tlv,
+            |migrated| {
+                job_b_started.store(true, Ordering::Relaxed);
+                call_b(oper_b)(migrated)
+            },
+            SpinLatch::new(worker_thread),
+        );
         let job_b_ref = job_b.as_job_ref();
         let job_b_id = job_b_ref.id();
         worker_thread.push(job_b_ref);
 
         // Execute task a; hopefully b gets stolen in the meantime.
         let status_a = unwind::halt_unwinding(call_a(oper_a, injected));
-        let result_a = match status_a {
-            Ok(v) => v,
-            Err(err) => join_recover_from_panic(worker_thread, &job_b.latch, err, tlv),
-        };
-
-        // Now that task A has finished, try to pop job B from the
-        // local stack. It may already have been popped by job A; it
-        // may also have been stolen. There may also be some tasks
-        // pushed on top of it in the stack, and we will have to pop
-        // those off to get to it.
-        while !job_b.latch.probe() {
-            if let Some(job) = worker_thread.take_local_job() {
-                if job_b_id == job.id() {
-                    // Found it! Let's run it.
-                    //
-                    // Note that this could panic, but it's ok if we unwind here.
-
-                    // Restore the TLV since we might have run some jobs overwriting it when waiting for job b.
-                    tlv::set(tlv);
-
-                    let result_b = job_b.run_inline(injected);
-                    return (result_a, result_b);
-                } else {
-                    worker_thread.execute(job);
-                }
-            } else {
-                // Local deque is empty. Time to steal from other
-                // threads.
-                worker_thread.wait_until(&job_b.latch);
-                debug_assert!(job_b.latch.probe());
-                break;
-            }
-        }
+        worker_thread.wait_for_jobs::<_, false>(
+            &job_b.latch,
+            || job_b_started.load(Ordering::Relaxed),
+            |job| job.id() == job_b_id,
+            |job| {
+                debug_assert_eq!(job.id(), job_b_id);
+                job_b.run_inline(injected);
+            },
+        );
 
         // Restore the TLV since we might have run some jobs overwriting it when waiting for job b.
         tlv::set(tlv);
 
+        let result_a = match status_a {
+            Ok(v) => v,
+            Err(err) => unwind::resume_unwinding(err),
+        };
         (result_a, job_b.into_result())
     })
 }
-
-/// If job A panics, we still cannot return until we are sure that job
-/// B is complete. This is because it may contain references into the
-/// enclosing stack frame(s).
-#[cold] // cold path
-unsafe fn join_recover_from_panic(
-    worker_thread: &WorkerThread,
-    job_b_latch: &SpinLatch<'_>,
-    err: Box<dyn Any + Send>,
-    tlv: Tlv,
-) -> ! {
-    unsafe { worker_thread.wait_until(job_b_latch) };
-
-    // Restore the TLV since we might have run some jobs overwriting it when waiting for job b.
-    tlv::set(tlv);
-
-    unwind::resume_unwinding(err)
-}
diff --git a/compiler/rustc_thread_pool/src/join/tests.rs b/compiler/rustc_thread_pool/src/join/tests.rs
index 9df99072c3a..ec196632f75 100644
--- a/compiler/rustc_thread_pool/src/join/tests.rs
+++ b/compiler/rustc_thread_pool/src/join/tests.rs
@@ -97,6 +97,7 @@ fn join_context_both() {
 }
 
 #[test]
+#[ignore]
 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
 fn join_context_neither() {
     // If we're already in a 1-thread pool, neither job should be stolen.
diff --git a/compiler/rustc_thread_pool/src/latch.rs b/compiler/rustc_thread_pool/src/latch.rs
index 49ba62d3bea..18d654d9f78 100644
--- a/compiler/rustc_thread_pool/src/latch.rs
+++ b/compiler/rustc_thread_pool/src/latch.rs
@@ -3,6 +3,7 @@ use std::ops::Deref;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Arc, Condvar, Mutex};
 
+use crate::job::JobRef;
 use crate::registry::{Registry, WorkerThread};
 
 /// We define various kinds of latches, which are all a primitive signaling
@@ -166,11 +167,6 @@ impl<'r> SpinLatch<'r> {
     pub(super) fn cross(thread: &'r WorkerThread) -> SpinLatch<'r> {
         SpinLatch { cross: true, ..SpinLatch::new(thread) }
     }
-
-    #[inline]
-    pub(super) fn probe(&self) -> bool {
-        self.core_latch.probe()
-    }
 }
 
 impl<'r> AsCoreLatch for SpinLatch<'r> {
@@ -368,13 +364,20 @@ impl CountLatch {
         debug_assert!(old_counter != 0);
     }
 
-    pub(super) fn wait(&self, owner: Option<&WorkerThread>) {
+    pub(super) fn wait(
+        &self,
+        owner: Option<&WorkerThread>,
+        all_jobs_started: impl FnMut() -> bool,
+        is_job: impl FnMut(&JobRef) -> bool,
+    ) {
         match &self.kind {
             CountLatchKind::Stealing { latch, registry, worker_index } => unsafe {
                 let owner = owner.expect("owner thread");
                 debug_assert_eq!(registry.id(), owner.registry().id());
                 debug_assert_eq!(*worker_index, owner.index());
-                owner.wait_until(latch);
+                owner.wait_for_jobs::<_, true>(latch, all_jobs_started, is_job, |job| {
+                    owner.execute(job);
+                });
             },
             CountLatchKind::Blocking { latch } => latch.wait(),
         }
diff --git a/compiler/rustc_thread_pool/src/registry.rs b/compiler/rustc_thread_pool/src/registry.rs
index 03a01aa29d2..5e2f2e7f1b7 100644
--- a/compiler/rustc_thread_pool/src/registry.rs
+++ b/compiler/rustc_thread_pool/src/registry.rs
@@ -6,6 +6,7 @@ use std::sync::{Arc, Mutex, Once};
 use std::{fmt, io, mem, ptr, thread};
 
 use crossbeam_deque::{Injector, Steal, Stealer, Worker};
+use smallvec::SmallVec;
 
 use crate::job::{JobFifo, JobRef, StackJob};
 use crate::latch::{AsCoreLatch, CoreLatch, Latch, LatchRef, LockLatch, OnceLatch, SpinLatch};
@@ -796,14 +797,81 @@ impl WorkerThread {
     /// stealing tasks as necessary.
     #[inline]
     pub(super) unsafe fn wait_until<L: AsCoreLatch + ?Sized>(&self, latch: &L) {
+        unsafe { self.wait_or_steal_until(latch, false) };
+    }
+
+    #[inline]
+    pub(super) unsafe fn wait_for_jobs<L: AsCoreLatch + ?Sized, const BROADCAST_JOBS: bool>(
+        &self,
+        latch: &L,
+        mut all_jobs_started: impl FnMut() -> bool,
+        mut is_job: impl FnMut(&JobRef) -> bool,
+        mut execute_job: impl FnMut(JobRef) -> (),
+    ) {
+        let mut jobs = SmallVec::<[JobRef; 8]>::new();
+        let mut broadcast_jobs = SmallVec::<[JobRef; 8]>::new();
+
+        while !all_jobs_started() {
+            if let Some(job) = self.worker.pop() {
+                if is_job(&job) {
+                    execute_job(job);
+                } else {
+                    jobs.push(job);
+                }
+            } else {
+                if BROADCAST_JOBS {
+                    let broadcast_job = loop {
+                        match self.stealer.steal() {
+                            Steal::Success(job) => break Some(job),
+                            Steal::Empty => break None,
+                            Steal::Retry => continue,
+                        }
+                    };
+                    if let Some(job) = broadcast_job {
+                        if is_job(&job) {
+                            execute_job(job);
+                        } else {
+                            broadcast_jobs.push(job);
+                        }
+                    }
+                }
+                break;
+            }
+        }
+
+        // Restore the jobs that we weren't looking for.
+        for job in jobs {
+            self.worker.push(job);
+        }
+        if BROADCAST_JOBS {
+            let broadcasts = self.registry.broadcasts.lock().unwrap();
+            for job in broadcast_jobs {
+                broadcasts[self.index].push(job);
+            }
+        }
+
+        // Wait for the jobs to finish.
+        unsafe { self.wait_until(latch) };
+        debug_assert!(latch.as_core_latch().probe());
+    }
+
+    pub(super) unsafe fn wait_or_steal_until<L: AsCoreLatch + ?Sized>(
+        &self,
+        latch: &L,
+        steal: bool,
+    ) {
         let latch = latch.as_core_latch();
         if !latch.probe() {
-            unsafe { self.wait_until_cold(latch) };
+            if steal {
+                unsafe { self.wait_or_steal_until_cold(latch) };
+            } else {
+                unsafe { self.wait_until_cold(latch) };
+            }
         }
     }
 
     #[cold]
-    unsafe fn wait_until_cold(&self, latch: &CoreLatch) {
+    unsafe fn wait_or_steal_until_cold(&self, latch: &CoreLatch) {
         // the code below should swallow all panics and hence never
         // unwind; but if something does wrong, we want to abort,
         // because otherwise other code in rayon may assume that the
@@ -827,7 +895,7 @@ impl WorkerThread {
                     // The job might have injected local work, so go back to the outer loop.
                     continue 'outer;
                 } else {
-                    self.registry.sleep.no_work_found(&mut idle_state, latch, &self)
+                    self.registry.sleep.no_work_found(&mut idle_state, latch, &self, true)
                 }
             }
 
@@ -840,13 +908,34 @@ impl WorkerThread {
         mem::forget(abort_guard); // successful execution, do not abort
     }
 
+    #[cold]
+    unsafe fn wait_until_cold(&self, latch: &CoreLatch) {
+        // the code below should swallow all panics and hence never
+        // unwind; but if something does wrong, we want to abort,
+        // because otherwise other code in rayon may assume that the
+        // latch has been signaled, and that can lead to random memory
+        // accesses, which would be *very bad*
+        let abort_guard = unwind::AbortIfPanic;
+
+        let mut idle_state = self.registry.sleep.start_looking(self.index);
+        while !latch.probe() {
+            self.registry.sleep.no_work_found(&mut idle_state, latch, &self, false);
+        }
+
+        // If we were sleepy, we are not anymore. We "found work" --
+        // whatever the surrounding thread was doing before it had to wait.
+        self.registry.sleep.work_found();
+
+        mem::forget(abort_guard); // successful execution, do not abort
+    }
+
     unsafe fn wait_until_out_of_work(&self) {
         debug_assert_eq!(self as *const _, WorkerThread::current());
         let registry = &*self.registry;
         let index = self.index;
 
         registry.acquire_thread();
-        unsafe { self.wait_until(&registry.thread_infos[index].terminate) };
+        unsafe { self.wait_or_steal_until(&registry.thread_infos[index].terminate, true) };
 
         // Should not be any work left in our queue.
         debug_assert!(self.take_local_job().is_none());
diff --git a/compiler/rustc_thread_pool/src/scope/mod.rs b/compiler/rustc_thread_pool/src/scope/mod.rs
index 55e58b3509d..b6601d0cbcc 100644
--- a/compiler/rustc_thread_pool/src/scope/mod.rs
+++ b/compiler/rustc_thread_pool/src/scope/mod.rs
@@ -8,12 +8,14 @@
 use std::any::Any;
 use std::marker::PhantomData;
 use std::mem::ManuallyDrop;
-use std::sync::Arc;
 use std::sync::atomic::{AtomicPtr, Ordering};
+use std::sync::{Arc, Mutex};
 use std::{fmt, ptr};
 
+use indexmap::IndexSet;
+
 use crate::broadcast::BroadcastContext;
-use crate::job::{ArcJob, HeapJob, JobFifo, JobRef};
+use crate::job::{ArcJob, HeapJob, JobFifo, JobRef, JobRefId};
 use crate::latch::{CountLatch, Latch};
 use crate::registry::{Registry, WorkerThread, global_registry, in_worker};
 use crate::tlv::{self, Tlv};
@@ -52,6 +54,12 @@ struct ScopeBase<'scope> {
     /// latch to track job counts
     job_completed_latch: CountLatch,
 
+    /// Jobs that have been spawned, but not yet started.
+    pending_jobs: Mutex<IndexSet<JobRefId>>,
+
+    /// The worker which will wait on scope completion, if any.
+    worker: Option<usize>,
+
     /// You can think of a scope as containing a list of closures to execute,
     /// all of which outlive `'scope`. They're not actually required to be
     /// `Sync`, but it's still safe to let the `Scope` implement `Sync` because
@@ -525,13 +533,19 @@ impl<'scope> Scope<'scope> {
         BODY: FnOnce(&Scope<'scope>) + Send + 'scope,
     {
         let scope_ptr = ScopePtr(self);
-        let job = HeapJob::new(self.base.tlv, move || unsafe {
+        let job = HeapJob::new(self.base.tlv, move |id| unsafe {
             // SAFETY: this job will execute before the scope ends.
             let scope = scope_ptr.as_ref();
+
+            // Mark this job is started.
+            scope.base.pending_jobs.lock().unwrap().swap_remove_full(&id);
+
             ScopeBase::execute_job(&scope.base, move || body(scope))
         });
         let job_ref = self.base.heap_job_ref(job);
 
+        // Mark this job as pending.
+        self.base.pending_jobs.lock().unwrap().insert(job_ref.id());
         // Since `Scope` implements `Sync`, we can't be sure that we're still in a
         // thread of this pool, so we can't just push to the local worker thread.
         // Also, this might be an in-place scope.
@@ -547,10 +561,17 @@ impl<'scope> Scope<'scope> {
         BODY: Fn(&Scope<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope,
     {
         let scope_ptr = ScopePtr(self);
-        let job = ArcJob::new(move || unsafe {
+        let job = ArcJob::new(move |id| unsafe {
             // SAFETY: this job will execute before the scope ends.
             let scope = scope_ptr.as_ref();
             let body = &body;
+
+            let current_index = WorkerThread::current().as_ref().map(|worker| worker.index());
+            if current_index == scope.base.worker {
+                // Mark this job as started on the scope's worker thread.
+                scope.base.pending_jobs.lock().unwrap().swap_remove(&id);
+            }
+
             let func = move || BroadcastContext::with(move |ctx| body(scope, ctx));
             ScopeBase::execute_job(&scope.base, func)
         });
@@ -585,23 +606,24 @@ impl<'scope> ScopeFifo<'scope> {
         BODY: FnOnce(&ScopeFifo<'scope>) + Send + 'scope,
     {
         let scope_ptr = ScopePtr(self);
-        let job = HeapJob::new(self.base.tlv, move || unsafe {
+        let job = HeapJob::new(self.base.tlv, move |id| unsafe {
             // SAFETY: this job will execute before the scope ends.
             let scope = scope_ptr.as_ref();
+
+            // Mark this job is started.
+            scope.base.pending_jobs.lock().unwrap().swap_remove(&id);
+
             ScopeBase::execute_job(&scope.base, move || body(scope))
         });
         let job_ref = self.base.heap_job_ref(job);
 
-        // If we're in the pool, use our scope's private fifo for this thread to execute
-        // in a locally-FIFO order. Otherwise, just use the pool's global injector.
-        match self.base.registry.current_thread() {
-            Some(worker) => {
-                let fifo = &self.fifos[worker.index()];
-                // SAFETY: this job will execute before the scope ends.
-                unsafe { worker.push(fifo.push(job_ref)) };
-            }
-            None => self.base.registry.inject(job_ref),
-        }
+        // Mark this job as pending.
+        self.base.pending_jobs.lock().unwrap().insert(job_ref.id());
+
+        // Since `ScopeFifo` implements `Sync`, we can't be sure that we're still in a
+        // thread of this pool, so we can't just push to the local worker thread.
+        // Also, this might be an in-place scope.
+        self.base.registry.inject_or_push(job_ref);
     }
 
     /// Spawns a job into every thread of the fork-join scope `self`. This job will
@@ -613,9 +635,15 @@ impl<'scope> ScopeFifo<'scope> {
         BODY: Fn(&ScopeFifo<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope,
     {
         let scope_ptr = ScopePtr(self);
-        let job = ArcJob::new(move || unsafe {
+        let job = ArcJob::new(move |id| unsafe {
             // SAFETY: this job will execute before the scope ends.
             let scope = scope_ptr.as_ref();
+
+            let current_index = WorkerThread::current().as_ref().map(|worker| worker.index());
+            if current_index == scope.base.worker {
+                // Mark this job as started on the scope's worker thread.
+                scope.base.pending_jobs.lock().unwrap().swap_remove(&id);
+            }
             let body = &body;
             let func = move || BroadcastContext::with(move |ctx| body(scope, ctx));
             ScopeBase::execute_job(&scope.base, func)
@@ -636,6 +664,8 @@ impl<'scope> ScopeBase<'scope> {
             registry: Arc::clone(registry),
             panic: AtomicPtr::new(ptr::null_mut()),
             job_completed_latch: CountLatch::new(owner),
+            pending_jobs: Mutex::new(IndexSet::new()),
+            worker: owner.map(|w| w.index()),
             marker: PhantomData,
             tlv: tlv::get(),
         }
@@ -643,7 +673,7 @@ impl<'scope> ScopeBase<'scope> {
 
     fn heap_job_ref<FUNC>(&self, job: Box<HeapJob<FUNC>>) -> JobRef
     where
-        FUNC: FnOnce() + Send + 'scope,
+        FUNC: FnOnce(JobRefId) + Send + 'scope,
     {
         unsafe {
             self.job_completed_latch.increment();
@@ -653,8 +683,12 @@ impl<'scope> ScopeBase<'scope> {
 
     fn inject_broadcast<FUNC>(&self, job: Arc<ArcJob<FUNC>>)
     where
-        FUNC: Fn() + Send + Sync + 'scope,
+        FUNC: Fn(JobRefId) + Send + Sync + 'scope,
     {
+        if self.worker.is_some() {
+            let id = unsafe { ArcJob::as_job_ref(&job).id() };
+            self.pending_jobs.lock().unwrap().insert(id);
+        }
         let n_threads = self.registry.num_threads();
         let job_refs = (0..n_threads).map(|_| unsafe {
             self.job_completed_latch.increment();
@@ -671,7 +705,11 @@ impl<'scope> ScopeBase<'scope> {
         FUNC: FnOnce() -> R,
     {
         let result = unsafe { Self::execute_job_closure(self, func) };
-        self.job_completed_latch.wait(owner);
+        self.job_completed_latch.wait(
+            owner,
+            || self.pending_jobs.lock().unwrap().is_empty(),
+            |job| self.pending_jobs.lock().unwrap().contains(&job.id()),
+        );
 
         // Restore the TLV if we ran some jobs while waiting
         tlv::set(self.tlv);
diff --git a/compiler/rustc_thread_pool/src/scope/tests.rs b/compiler/rustc_thread_pool/src/scope/tests.rs
index 2df3bc67e29..049548f4a18 100644
--- a/compiler/rustc_thread_pool/src/scope/tests.rs
+++ b/compiler/rustc_thread_pool/src/scope/tests.rs
@@ -290,6 +290,7 @@ macro_rules! test_order {
 }
 
 #[test]
+#[ignore]
 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
 fn lifo_order() {
     // In the absence of stealing, `scope()` runs its `spawn()` jobs in LIFO order.
@@ -299,6 +300,7 @@ fn lifo_order() {
 }
 
 #[test]
+#[ignore]
 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
 fn fifo_order() {
     // In the absence of stealing, `scope_fifo()` runs its `spawn_fifo()` jobs in FIFO order.
@@ -334,6 +336,7 @@ macro_rules! test_nested_order {
 }
 
 #[test]
+#[ignore]
 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
 fn nested_lifo_order() {
     // In the absence of stealing, `scope()` runs its `spawn()` jobs in LIFO order.
@@ -343,6 +346,7 @@ fn nested_lifo_order() {
 }
 
 #[test]
+#[ignore]
 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
 fn nested_fifo_order() {
     // In the absence of stealing, `scope_fifo()` runs its `spawn_fifo()` jobs in FIFO order.
@@ -352,6 +356,7 @@ fn nested_fifo_order() {
 }
 
 #[test]
+#[ignore]
 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
 fn nested_lifo_fifo_order() {
     // LIFO on the outside, FIFO on the inside
@@ -361,6 +366,7 @@ fn nested_lifo_fifo_order() {
 }
 
 #[test]
+#[ignore]
 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
 fn nested_fifo_lifo_order() {
     // FIFO on the outside, LIFO on the inside
@@ -402,6 +408,7 @@ macro_rules! test_mixed_order {
 }
 
 #[test]
+#[ignore]
 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
 fn mixed_lifo_order() {
     // NB: the end of the inner scope makes us execute some of the outer scope
@@ -412,6 +419,7 @@ fn mixed_lifo_order() {
 }
 
 #[test]
+#[ignore]
 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
 fn mixed_fifo_order() {
     let vec = test_mixed_order!(scope_fifo => spawn_fifo, scope_fifo => spawn_fifo);
@@ -420,6 +428,7 @@ fn mixed_fifo_order() {
 }
 
 #[test]
+#[ignore]
 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
 fn mixed_lifo_fifo_order() {
     // NB: the end of the inner scope makes us execute some of the outer scope
@@ -430,6 +439,7 @@ fn mixed_lifo_fifo_order() {
 }
 
 #[test]
+#[ignore]
 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
 fn mixed_fifo_lifo_order() {
     let vec = test_mixed_order!(scope_fifo => spawn_fifo, scope => spawn);
@@ -519,8 +529,9 @@ fn mixed_lifetime_scope_fifo() {
 
 #[test]
 fn scope_spawn_broadcast() {
+    let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
     let sum = AtomicUsize::new(0);
-    let n = scope(|s| {
+    let n = pool.scope(|s| {
         s.spawn_broadcast(|_, ctx| {
             sum.fetch_add(ctx.index(), Ordering::Relaxed);
         });
@@ -531,8 +542,9 @@ fn scope_spawn_broadcast() {
 
 #[test]
 fn scope_fifo_spawn_broadcast() {
+    let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
     let sum = AtomicUsize::new(0);
-    let n = scope_fifo(|s| {
+    let n = pool.scope_fifo(|s| {
         s.spawn_broadcast(|_, ctx| {
             sum.fetch_add(ctx.index(), Ordering::Relaxed);
         });
@@ -542,6 +554,7 @@ fn scope_fifo_spawn_broadcast() {
 }
 
 #[test]
+#[ignore]
 fn scope_spawn_broadcast_nested() {
     let sum = AtomicUsize::new(0);
     let n = scope(|s| {
diff --git a/compiler/rustc_thread_pool/src/sleep/mod.rs b/compiler/rustc_thread_pool/src/sleep/mod.rs
index a9cdf68cc7e..31bf7184b42 100644
--- a/compiler/rustc_thread_pool/src/sleep/mod.rs
+++ b/compiler/rustc_thread_pool/src/sleep/mod.rs
@@ -144,6 +144,7 @@ impl Sleep {
         idle_state: &mut IdleState,
         latch: &CoreLatch,
         thread: &WorkerThread,
+        steal: bool,
     ) {
         if idle_state.rounds < ROUNDS_UNTIL_SLEEPY {
             thread::yield_now();
@@ -157,7 +158,7 @@ impl Sleep {
             thread::yield_now();
         } else {
             debug_assert_eq!(idle_state.rounds, ROUNDS_UNTIL_SLEEPING);
-            self.sleep(idle_state, latch, thread);
+            self.sleep(idle_state, latch, thread, steal);
         }
     }
 
@@ -167,7 +168,13 @@ impl Sleep {
     }
 
     #[cold]
-    fn sleep(&self, idle_state: &mut IdleState, latch: &CoreLatch, thread: &WorkerThread) {
+    fn sleep(
+        &self,
+        idle_state: &mut IdleState,
+        latch: &CoreLatch,
+        thread: &WorkerThread,
+        steal: bool,
+    ) {
         let worker_index = idle_state.worker_index;
 
         if !latch.get_sleepy() {
@@ -215,7 +222,7 @@ impl Sleep {
         // - that job triggers the rollover over the JEC such that we don't see it
         // - we are the last active worker thread
         std::sync::atomic::fence(Ordering::SeqCst);
-        if thread.has_injected_job() {
+        if steal && thread.has_injected_job() {
             // If we see an externally injected job, then we have to 'wake
             // ourselves up'. (Ordinarily, `sub_sleeping_thread` is invoked by
             // the one that wakes us.)
diff --git a/compiler/rustc_thread_pool/src/spawn/mod.rs b/compiler/rustc_thread_pool/src/spawn/mod.rs
index 040a02bfa67..d403deaa108 100644
--- a/compiler/rustc_thread_pool/src/spawn/mod.rs
+++ b/compiler/rustc_thread_pool/src/spawn/mod.rs
@@ -95,7 +95,7 @@ where
 
     HeapJob::new(Tlv::null(), {
         let registry = Arc::clone(registry);
-        move || {
+        move |_| {
             registry.catch_unwind(func);
             registry.terminate(); // (*) permit registry to terminate now
         }
diff --git a/compiler/rustc_thread_pool/src/spawn/tests.rs b/compiler/rustc_thread_pool/src/spawn/tests.rs
index 8a70d2faf9c..a4989759cf9 100644
--- a/compiler/rustc_thread_pool/src/spawn/tests.rs
+++ b/compiler/rustc_thread_pool/src/spawn/tests.rs
@@ -167,6 +167,7 @@ macro_rules! test_order {
 }
 
 #[test]
+#[ignore]
 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
 fn lifo_order() {
     // In the absence of stealing, `spawn()` jobs on a thread will run in LIFO order.
@@ -176,6 +177,7 @@ fn lifo_order() {
 }
 
 #[test]
+#[ignore]
 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
 fn fifo_order() {
     // In the absence of stealing, `spawn_fifo()` jobs on a thread will run in FIFO order.
@@ -185,6 +187,7 @@ fn fifo_order() {
 }
 
 #[test]
+#[ignore]
 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
 fn lifo_fifo_order() {
     // LIFO on the outside, FIFO on the inside
@@ -194,6 +197,7 @@ fn lifo_fifo_order() {
 }
 
 #[test]
+#[ignore]
 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
 fn fifo_lifo_order() {
     // FIFO on the outside, LIFO on the inside
@@ -230,6 +234,7 @@ macro_rules! test_mixed_order {
 }
 
 #[test]
+#[ignore]
 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
 fn mixed_lifo_fifo_order() {
     let vec = test_mixed_order!(spawn, spawn_fifo);
@@ -238,6 +243,7 @@ fn mixed_lifo_fifo_order() {
 }
 
 #[test]
+#[ignore]
 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
 fn mixed_fifo_lifo_order() {
     let vec = test_mixed_order!(spawn_fifo, spawn);
diff --git a/compiler/rustc_thread_pool/src/thread_pool/tests.rs b/compiler/rustc_thread_pool/src/thread_pool/tests.rs
index 42c99565088..9feaed7efd0 100644
--- a/compiler/rustc_thread_pool/src/thread_pool/tests.rs
+++ b/compiler/rustc_thread_pool/src/thread_pool/tests.rs
@@ -152,6 +152,7 @@ fn self_install() {
 }
 
 #[test]
+#[ignore]
 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
 fn mutual_install() {
     let pool1 = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
@@ -172,6 +173,7 @@ fn mutual_install() {
 }
 
 #[test]
+#[ignore]
 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
 fn mutual_install_sleepy() {
     use std::{thread, time};
@@ -227,6 +229,7 @@ macro_rules! test_scope_order {
 }
 
 #[test]
+#[ignore]
 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
 fn scope_lifo_order() {
     let vec = test_scope_order!(scope => spawn);
@@ -235,6 +238,7 @@ fn scope_lifo_order() {
 }
 
 #[test]
+#[ignore]
 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
 fn scope_fifo_order() {
     let vec = test_scope_order!(scope_fifo => spawn_fifo);
@@ -276,6 +280,7 @@ fn spawn_fifo_order() {
 }
 
 #[test]
+#[ignore]
 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
 fn nested_scopes() {
     // Create matching scopes for every thread pool.
@@ -312,6 +317,7 @@ fn nested_scopes() {
 }
 
 #[test]
+#[ignore]
 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
 fn nested_fifo_scopes() {
     // Create matching fifo scopes for every thread pool.
diff --git a/compiler/rustc_thread_pool/tests/stack_overflow_crash.rs b/compiler/rustc_thread_pool/tests/stack_overflow_crash.rs
index 805b6d8ee3f..2b89ea4be19 100644
--- a/compiler/rustc_thread_pool/tests/stack_overflow_crash.rs
+++ b/compiler/rustc_thread_pool/tests/stack_overflow_crash.rs
@@ -36,7 +36,7 @@ fn overflow_code() -> Option<i32> {
 }
 
 #[test]
-#[cfg_attr(not(any(unix, windows)), ignore)]
+#[cfg_attr(not(any(unix)), ignore)]
 fn stack_overflow_crash() {
     // First check that the recursive call actually causes a stack overflow,
     // and does not get optimized away.