about summary refs log tree commit diff
diff options
context:
space:
mode:
authorJohn Kåre Alsaker <john.kare.alsaker@gmail.com>2025-04-22 07:02:27 +0200
committerJohn Kåre Alsaker <john.kare.alsaker@gmail.com>2025-04-29 07:20:13 +0200
commitcff9efde748b6027fe9f135ddd5aaf8a3276601d (patch)
tree8d6877af6e5f512fd0b7615ddb25632516dad48f
parent25cdf1f67463c9365d8d83778c933ec7480e940b (diff)
downloadrust-cff9efde748b6027fe9f135ddd5aaf8a3276601d.tar.gz
rust-cff9efde748b6027fe9f135ddd5aaf8a3276601d.zip
Add a jobserver proxy to ensure at least one token is always held
-rw-r--r--compiler/rustc_data_structures/src/jobserver.rs94
-rw-r--r--compiler/rustc_data_structures/src/marker.rs8
-rw-r--r--compiler/rustc_interface/src/interface.rs6
-rw-r--r--compiler/rustc_interface/src/passes.rs15
-rw-r--r--compiler/rustc_interface/src/util.rs28
-rw-r--r--compiler/rustc_middle/src/ty/context.rs5
-rw-r--r--compiler/rustc_query_impl/src/plumbing.rs6
-rw-r--r--compiler/rustc_query_system/src/query/job.rs11
-rw-r--r--compiler/rustc_query_system/src/query/mod.rs3
-rw-r--r--compiler/rustc_query_system/src/query/plumbing.rs2
10 files changed, 149 insertions, 29 deletions
diff --git a/compiler/rustc_data_structures/src/jobserver.rs b/compiler/rustc_data_structures/src/jobserver.rs
index 1204f2d692d..3ed1ea7543f 100644
--- a/compiler/rustc_data_structures/src/jobserver.rs
+++ b/compiler/rustc_data_structures/src/jobserver.rs
@@ -1,7 +1,8 @@
-use std::sync::{LazyLock, OnceLock};
+use std::sync::{Arc, LazyLock, OnceLock};
 
 pub use jobserver_crate::{Acquired, Client, HelperThread};
 use jobserver_crate::{FromEnv, FromEnvErrorKind};
+use parking_lot::{Condvar, Mutex};
 
 // We can only call `from_env_ext` once per process
 
@@ -71,10 +72,93 @@ pub fn client() -> Client {
     GLOBAL_CLIENT_CHECKED.get().expect(ACCESS_ERROR).clone()
 }
 
-pub fn acquire_thread() {
-    GLOBAL_CLIENT_CHECKED.get().expect(ACCESS_ERROR).acquire_raw().ok();
+struct ProxyData {
+    /// The number of tokens assigned to threads.
+    /// If this is 0, a single token is still assigned to this process, but is unused.
+    used: u16,
+
+    /// The number of threads requesting a token
+    pending: u16,
+}
+
+/// This is a jobserver proxy used to ensure that we hold on to at least one token.
+pub struct Proxy {
+    client: Client,
+    data: Mutex<ProxyData>,
+
+    /// Threads which are waiting on a token will wait on this.
+    wake_pending: Condvar,
+
+    helper: OnceLock<HelperThread>,
 }
 
-pub fn release_thread() {
-    GLOBAL_CLIENT_CHECKED.get().expect(ACCESS_ERROR).release_raw().ok();
+impl Proxy {
+    pub fn new() -> Arc<Self> {
+        let proxy = Arc::new(Proxy {
+            client: client(),
+            data: Mutex::new(ProxyData { used: 1, pending: 0 }),
+            wake_pending: Condvar::new(),
+            helper: OnceLock::new(),
+        });
+        let proxy_ = Arc::clone(&proxy);
+        let helper = proxy
+            .client
+            .clone()
+            .into_helper_thread(move |token| {
+                if let Ok(token) = token {
+                    let mut data = proxy_.data.lock();
+                    if data.pending > 0 {
+                        // Give the token to a waiting thread
+                        token.drop_without_releasing();
+                        assert!(data.used > 0);
+                        data.used += 1;
+                        data.pending -= 1;
+                        proxy_.wake_pending.notify_one();
+                    } else {
+                        // The token is no longer needed, drop it.
+                        drop(data);
+                        drop(token);
+                    }
+                }
+            })
+            .expect("failed to create helper thread");
+        proxy.helper.set(helper).unwrap();
+        proxy
+    }
+
+    pub fn acquire_thread(&self) {
+        let mut data = self.data.lock();
+
+        if data.used == 0 {
+            // There was a free token around. This can
+            // happen when all threads release their token.
+            assert_eq!(data.pending, 0);
+            data.used += 1;
+        } else {
+            // Request a token from the helper thread. We can't directly use `acquire_raw`
+            // as we also need to be able to wait for the final token in the process which
+            // does not get a corresponding `release_raw` call.
+            self.helper.get().unwrap().request_token();
+            data.pending += 1;
+            self.wake_pending.wait(&mut data);
+        }
+    }
+
+    pub fn release_thread(&self) {
+        let mut data = self.data.lock();
+
+        if data.pending > 0 {
+            // Give the token to a waiting thread
+            data.pending -= 1;
+            self.wake_pending.notify_one();
+        } else {
+            data.used -= 1;
+
+            // Release the token unless it's the last one in the process
+            if data.used > 0 {
+                drop(data);
+                self.client.release_raw().ok();
+            }
+        }
+    }
 }
diff --git a/compiler/rustc_data_structures/src/marker.rs b/compiler/rustc_data_structures/src/marker.rs
index dfd9bd32076..e0df1b232e1 100644
--- a/compiler/rustc_data_structures/src/marker.rs
+++ b/compiler/rustc_data_structures/src/marker.rs
@@ -59,8 +59,8 @@ macro_rules! already_send {
 // These structures are already `Send`.
 already_send!(
     [std::backtrace::Backtrace][std::io::Stdout][std::io::Stderr][std::io::Error][std::fs::File]
-        [rustc_arena::DroplessArena][crate::memmap::Mmap][crate::profiling::SelfProfiler]
-        [crate::owned_slice::OwnedSlice]
+        [rustc_arena::DroplessArena][jobserver_crate::Client][jobserver_crate::HelperThread]
+        [crate::memmap::Mmap][crate::profiling::SelfProfiler][crate::owned_slice::OwnedSlice]
 );
 
 macro_rules! impl_dyn_send {
@@ -134,8 +134,8 @@ macro_rules! already_sync {
 already_sync!(
     [std::sync::atomic::AtomicBool][std::sync::atomic::AtomicUsize][std::sync::atomic::AtomicU8]
         [std::sync::atomic::AtomicU32][std::backtrace::Backtrace][std::io::Error][std::fs::File]
-        [jobserver_crate::Client][crate::memmap::Mmap][crate::profiling::SelfProfiler]
-        [crate::owned_slice::OwnedSlice]
+        [jobserver_crate::Client][jobserver_crate::HelperThread][crate::memmap::Mmap]
+        [crate::profiling::SelfProfiler][crate::owned_slice::OwnedSlice]
 );
 
 // Use portable AtomicU64 for targets without native 64-bit atomics
diff --git a/compiler/rustc_interface/src/interface.rs b/compiler/rustc_interface/src/interface.rs
index 708fe23b791..0178c1470fa 100644
--- a/compiler/rustc_interface/src/interface.rs
+++ b/compiler/rustc_interface/src/interface.rs
@@ -5,7 +5,7 @@ use std::sync::Arc;
 use rustc_ast::{LitKind, MetaItemKind, token};
 use rustc_codegen_ssa::traits::CodegenBackend;
 use rustc_data_structures::fx::{FxHashMap, FxHashSet};
-use rustc_data_structures::jobserver;
+use rustc_data_structures::jobserver::{self, Proxy};
 use rustc_data_structures::stable_hasher::StableHasher;
 use rustc_errors::registry::Registry;
 use rustc_errors::{DiagCtxtHandle, ErrorGuaranteed};
@@ -41,6 +41,7 @@ pub struct Compiler {
     pub codegen_backend: Box<dyn CodegenBackend>,
     pub(crate) override_queries: Option<fn(&Session, &mut Providers)>,
     pub(crate) current_gcx: CurrentGcx,
+    pub(crate) jobserver_proxy: Arc<Proxy>,
 }
 
 /// Converts strings provided as `--cfg [cfgspec]` into a `Cfg`.
@@ -415,7 +416,7 @@ pub fn run_compiler<R: Send>(config: Config, f: impl FnOnce(&Compiler) -> R + Se
         config.opts.unstable_opts.threads,
         &config.extra_symbols,
         SourceMapInputs { file_loader, path_mapping, hash_kind, checksum_hash_kind },
-        |current_gcx| {
+        |current_gcx, jobserver_proxy| {
             // The previous `early_dcx` can't be reused here because it doesn't
             // impl `Send`. Creating a new one is fine.
             let early_dcx = EarlyDiagCtxt::new(config.opts.error_format);
@@ -511,6 +512,7 @@ pub fn run_compiler<R: Send>(config: Config, f: impl FnOnce(&Compiler) -> R + Se
                 codegen_backend,
                 override_queries: config.override_queries,
                 current_gcx,
+                jobserver_proxy,
             };
 
             // There are two paths out of `f`.
diff --git a/compiler/rustc_interface/src/passes.rs b/compiler/rustc_interface/src/passes.rs
index 66d2a79b93a..c95442d908d 100644
--- a/compiler/rustc_interface/src/passes.rs
+++ b/compiler/rustc_interface/src/passes.rs
@@ -7,6 +7,7 @@ use std::{env, fs, iter};
 
 use rustc_ast as ast;
 use rustc_codegen_ssa::traits::CodegenBackend;
+use rustc_data_structures::jobserver::Proxy;
 use rustc_data_structures::parallel;
 use rustc_data_structures::steal::Steal;
 use rustc_data_structures::sync::{AppendOnlyIndexVec, FreezeLock, WorkerLocal};
@@ -841,12 +842,13 @@ pub fn create_and_enter_global_ctxt<T, F: for<'tcx> FnOnce(TyCtxt<'tcx>) -> T>(
         dyn for<'tcx> FnOnce(
             &'tcx Session,
             CurrentGcx,
+            Arc<Proxy>,
             &'tcx OnceLock<GlobalCtxt<'tcx>>,
             &'tcx WorkerLocal<Arena<'tcx>>,
             &'tcx WorkerLocal<rustc_hir::Arena<'tcx>>,
             F,
         ) -> T,
-    > = Box::new(move |sess, current_gcx, gcx_cell, arena, hir_arena, f| {
+    > = Box::new(move |sess, current_gcx, jobserver_proxy, gcx_cell, arena, hir_arena, f| {
         TyCtxt::create_global_ctxt(
             gcx_cell,
             sess,
@@ -865,6 +867,7 @@ pub fn create_and_enter_global_ctxt<T, F: for<'tcx> FnOnce(TyCtxt<'tcx>) -> T>(
             ),
             providers.hooks,
             current_gcx,
+            jobserver_proxy,
             |tcx| {
                 let feed = tcx.create_crate_num(stable_crate_id).unwrap();
                 assert_eq!(feed.key(), LOCAL_CRATE);
@@ -887,7 +890,15 @@ pub fn create_and_enter_global_ctxt<T, F: for<'tcx> FnOnce(TyCtxt<'tcx>) -> T>(
         )
     });
 
-    inner(&compiler.sess, compiler.current_gcx.clone(), &gcx_cell, &arena, &hir_arena, f)
+    inner(
+        &compiler.sess,
+        compiler.current_gcx.clone(),
+        Arc::clone(&compiler.jobserver_proxy),
+        &gcx_cell,
+        &arena,
+        &hir_arena,
+        f,
+    )
 }
 
 /// Runs all analyses that we guarantee to run, even if errors were reported in earlier analyses.
diff --git a/compiler/rustc_interface/src/util.rs b/compiler/rustc_interface/src/util.rs
index c3a939f1ab0..3a291bbe802 100644
--- a/compiler/rustc_interface/src/util.rs
+++ b/compiler/rustc_interface/src/util.rs
@@ -1,11 +1,12 @@
 use std::env::consts::{DLL_PREFIX, DLL_SUFFIX};
 use std::path::{Path, PathBuf};
-use std::sync::OnceLock;
 use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::{Arc, OnceLock};
 use std::{env, iter, thread};
 
 use rustc_ast as ast;
 use rustc_codegen_ssa::traits::CodegenBackend;
+use rustc_data_structures::jobserver::Proxy;
 use rustc_data_structures::sync;
 use rustc_metadata::{DylibError, load_symbol_from_dylib};
 use rustc_middle::ty::CurrentGcx;
@@ -113,7 +114,7 @@ fn init_stack_size(early_dcx: &EarlyDiagCtxt) -> usize {
     })
 }
 
-fn run_in_thread_with_globals<F: FnOnce(CurrentGcx) -> R + Send, R: Send>(
+fn run_in_thread_with_globals<F: FnOnce(CurrentGcx, Arc<Proxy>) -> R + Send, R: Send>(
     thread_stack_size: usize,
     edition: Edition,
     sm_inputs: SourceMapInputs,
@@ -139,7 +140,7 @@ fn run_in_thread_with_globals<F: FnOnce(CurrentGcx) -> R + Send, R: Send>(
                     edition,
                     extra_symbols,
                     Some(sm_inputs),
-                    || f(CurrentGcx::new()),
+                    || f(CurrentGcx::new(), Proxy::new()),
                 )
             })
             .unwrap()
@@ -152,7 +153,10 @@ fn run_in_thread_with_globals<F: FnOnce(CurrentGcx) -> R + Send, R: Send>(
     })
 }
 
-pub(crate) fn run_in_thread_pool_with_globals<F: FnOnce(CurrentGcx) -> R + Send, R: Send>(
+pub(crate) fn run_in_thread_pool_with_globals<
+    F: FnOnce(CurrentGcx, Arc<Proxy>) -> R + Send,
+    R: Send,
+>(
     thread_builder_diag: &EarlyDiagCtxt,
     edition: Edition,
     threads: usize,
@@ -162,8 +166,8 @@ pub(crate) fn run_in_thread_pool_with_globals<F: FnOnce(CurrentGcx) -> R + Send,
 ) -> R {
     use std::process;
 
+    use rustc_data_structures::defer;
     use rustc_data_structures::sync::FromDyn;
-    use rustc_data_structures::{defer, jobserver};
     use rustc_middle::ty::tls;
     use rustc_query_impl::QueryCtxt;
     use rustc_query_system::query::{QueryContext, break_query_cycles};
@@ -178,11 +182,11 @@ pub(crate) fn run_in_thread_pool_with_globals<F: FnOnce(CurrentGcx) -> R + Send,
             edition,
             sm_inputs,
             extra_symbols,
-            |current_gcx| {
+            |current_gcx, jobserver_proxy| {
                 // Register the thread for use with the `WorkerLocal` type.
                 registry.register();
 
-                f(current_gcx)
+                f(current_gcx, jobserver_proxy)
             },
         );
     }
@@ -190,10 +194,14 @@ pub(crate) fn run_in_thread_pool_with_globals<F: FnOnce(CurrentGcx) -> R + Send,
     let current_gcx = FromDyn::from(CurrentGcx::new());
     let current_gcx2 = current_gcx.clone();
 
+    let proxy = Proxy::new();
+
+    let proxy_ = Arc::clone(&proxy);
+    let proxy__ = Arc::clone(&proxy);
     let builder = rayon_core::ThreadPoolBuilder::new()
         .thread_name(|_| "rustc".to_string())
-        .acquire_thread_handler(jobserver::acquire_thread)
-        .release_thread_handler(jobserver::release_thread)
+        .acquire_thread_handler(move || proxy_.acquire_thread())
+        .release_thread_handler(move || proxy__.release_thread())
         .num_threads(threads)
         .deadlock_handler(move || {
             // On deadlock, creates a new thread and forwards information in thread
@@ -257,7 +265,7 @@ pub(crate) fn run_in_thread_pool_with_globals<F: FnOnce(CurrentGcx) -> R + Send,
                     },
                     // Run `f` on the first thread in the thread pool.
                     move |pool: &rayon_core::ThreadPool| {
-                        pool.install(|| f(current_gcx.into_inner()))
+                        pool.install(|| f(current_gcx.into_inner(), proxy))
                     },
                 )
                 .unwrap()
diff --git a/compiler/rustc_middle/src/ty/context.rs b/compiler/rustc_middle/src/ty/context.rs
index 1efd0d1d14b..eb12249882e 100644
--- a/compiler/rustc_middle/src/ty/context.rs
+++ b/compiler/rustc_middle/src/ty/context.rs
@@ -21,6 +21,7 @@ use rustc_data_structures::defer;
 use rustc_data_structures::fingerprint::Fingerprint;
 use rustc_data_structures::fx::FxHashMap;
 use rustc_data_structures::intern::Interned;
+use rustc_data_structures::jobserver::Proxy;
 use rustc_data_structures::profiling::SelfProfilerRef;
 use rustc_data_structures::sharded::{IntoPointer, ShardedHashMap};
 use rustc_data_structures::stable_hasher::{HashStable, StableHasher};
@@ -1438,6 +1439,8 @@ pub struct GlobalCtxt<'tcx> {
     pub(crate) alloc_map: interpret::AllocMap<'tcx>,
 
     current_gcx: CurrentGcx,
+
+    pub jobserver_proxy: Arc<Proxy>,
 }
 
 impl<'tcx> GlobalCtxt<'tcx> {
@@ -1642,6 +1645,7 @@ impl<'tcx> TyCtxt<'tcx> {
         query_system: QuerySystem<'tcx>,
         hooks: crate::hooks::Providers,
         current_gcx: CurrentGcx,
+        jobserver_proxy: Arc<Proxy>,
         f: impl FnOnce(TyCtxt<'tcx>) -> T,
     ) -> T {
         let data_layout = s.target.parse_data_layout().unwrap_or_else(|err| {
@@ -1676,6 +1680,7 @@ impl<'tcx> TyCtxt<'tcx> {
             data_layout,
             alloc_map: interpret::AllocMap::new(),
             current_gcx,
+            jobserver_proxy,
         });
 
         // This is a separate function to work around a crash with parallel rustc (#135870)
diff --git a/compiler/rustc_query_impl/src/plumbing.rs b/compiler/rustc_query_impl/src/plumbing.rs
index 19ccc5587d6..ea37dc5489b 100644
--- a/compiler/rustc_query_impl/src/plumbing.rs
+++ b/compiler/rustc_query_impl/src/plumbing.rs
@@ -4,6 +4,7 @@
 
 use std::num::NonZero;
 
+use rustc_data_structures::jobserver::Proxy;
 use rustc_data_structures::stable_hasher::{HashStable, StableHasher};
 use rustc_data_structures::sync::{DynSend, DynSync};
 use rustc_data_structures::unord::UnordMap;
@@ -70,6 +71,11 @@ impl<'tcx> QueryContext for QueryCtxt<'tcx> {
     type QueryInfo = QueryStackDeferred<'tcx>;
 
     #[inline]
+    fn jobserver_proxy(&self) -> &Proxy {
+        &*self.jobserver_proxy
+    }
+
+    #[inline]
     fn next_job_id(self) -> QueryJobId {
         QueryJobId(
             NonZero::new(self.query_system.jobs.fetch_add(1, std::sync::atomic::Ordering::Relaxed))
diff --git a/compiler/rustc_query_system/src/query/job.rs b/compiler/rustc_query_system/src/query/job.rs
index de35cd79ea2..6321abc5087 100644
--- a/compiler/rustc_query_system/src/query/job.rs
+++ b/compiler/rustc_query_system/src/query/job.rs
@@ -7,7 +7,6 @@ use std::sync::Arc;
 
 use parking_lot::{Condvar, Mutex};
 use rustc_data_structures::fx::{FxHashMap, FxHashSet};
-use rustc_data_structures::jobserver;
 use rustc_errors::{Diag, DiagCtxtHandle};
 use rustc_hir::def::DefKind;
 use rustc_session::Session;
@@ -207,12 +206,13 @@ impl<I> QueryLatch<I> {
     /// Awaits for the query job to complete.
     pub(super) fn wait_on(
         &self,
+        qcx: impl QueryContext,
         query: Option<QueryJobId>,
         span: Span,
     ) -> Result<(), CycleError<I>> {
         let waiter =
             Arc::new(QueryWaiter { query, span, cycle: Mutex::new(None), condvar: Condvar::new() });
-        self.wait_on_inner(&waiter);
+        self.wait_on_inner(qcx, &waiter);
         // FIXME: Get rid of this lock. We have ownership of the QueryWaiter
         // although another thread may still have a Arc reference so we cannot
         // use Arc::get_mut
@@ -224,7 +224,7 @@ impl<I> QueryLatch<I> {
     }
 
     /// Awaits the caller on this latch by blocking the current thread.
-    fn wait_on_inner(&self, waiter: &Arc<QueryWaiter<I>>) {
+    fn wait_on_inner(&self, qcx: impl QueryContext, waiter: &Arc<QueryWaiter<I>>) {
         let mut info = self.info.lock();
         if !info.complete {
             // We push the waiter on to the `waiters` list. It can be accessed inside
@@ -237,11 +237,12 @@ impl<I> QueryLatch<I> {
             // we have to be in the `wait` call. This is ensured by the deadlock handler
             // getting the self.info lock.
             rayon_core::mark_blocked();
-            jobserver::release_thread();
+            let proxy = qcx.jobserver_proxy();
+            proxy.release_thread();
             waiter.condvar.wait(&mut info);
             // Release the lock before we potentially block in `acquire_thread`
             drop(info);
-            jobserver::acquire_thread();
+            proxy.acquire_thread();
         }
     }
 
diff --git a/compiler/rustc_query_system/src/query/mod.rs b/compiler/rustc_query_system/src/query/mod.rs
index ef21af7dafb..a87f598674e 100644
--- a/compiler/rustc_query_system/src/query/mod.rs
+++ b/compiler/rustc_query_system/src/query/mod.rs
@@ -16,6 +16,7 @@ mod caches;
 pub use self::caches::{DefIdCache, DefaultCache, QueryCache, SingleCache, VecCache};
 
 mod config;
+use rustc_data_structures::jobserver::Proxy;
 use rustc_data_structures::sync::{DynSend, DynSync};
 use rustc_errors::DiagInner;
 use rustc_hashes::Hash64;
@@ -151,6 +152,8 @@ pub enum QuerySideEffect {
 pub trait QueryContext: HasDepContext {
     type QueryInfo: Clone;
 
+    fn jobserver_proxy(&self) -> &Proxy;
+
     fn next_job_id(self) -> QueryJobId;
 
     /// Get the query information from the TLS context.
diff --git a/compiler/rustc_query_system/src/query/plumbing.rs b/compiler/rustc_query_system/src/query/plumbing.rs
index 6ea8e3b9200..3c1fc731784 100644
--- a/compiler/rustc_query_system/src/query/plumbing.rs
+++ b/compiler/rustc_query_system/src/query/plumbing.rs
@@ -297,7 +297,7 @@ where
 
     // With parallel queries we might just have to wait on some other
     // thread.
-    let result = latch.wait_on(current, span);
+    let result = latch.wait_on(qcx, current, span);
 
     match result {
         Ok(()) => {