From cff9efde748b6027fe9f135ddd5aaf8a3276601d Mon Sep 17 00:00:00 2001 From: John Kåre Alsaker Date: Tue, 22 Apr 2025 07:02:27 +0200 Subject: Add a jobserver proxy to ensure at least one token is always held --- compiler/rustc_data_structures/src/jobserver.rs | 94 +++++++++++++++++++++++-- compiler/rustc_data_structures/src/marker.rs | 8 +-- 2 files changed, 93 insertions(+), 9 deletions(-) (limited to 'compiler/rustc_data_structures') diff --git a/compiler/rustc_data_structures/src/jobserver.rs b/compiler/rustc_data_structures/src/jobserver.rs index 1204f2d692d..3ed1ea7543f 100644 --- a/compiler/rustc_data_structures/src/jobserver.rs +++ b/compiler/rustc_data_structures/src/jobserver.rs @@ -1,7 +1,8 @@ -use std::sync::{LazyLock, OnceLock}; +use std::sync::{Arc, LazyLock, OnceLock}; pub use jobserver_crate::{Acquired, Client, HelperThread}; use jobserver_crate::{FromEnv, FromEnvErrorKind}; +use parking_lot::{Condvar, Mutex}; // We can only call `from_env_ext` once per process @@ -71,10 +72,93 @@ pub fn client() -> Client { GLOBAL_CLIENT_CHECKED.get().expect(ACCESS_ERROR).clone() } -pub fn acquire_thread() { - GLOBAL_CLIENT_CHECKED.get().expect(ACCESS_ERROR).acquire_raw().ok(); +struct ProxyData { + /// The number of tokens assigned to threads. + /// If this is 0, a single token is still assigned to this process, but is unused. + used: u16, + + /// The number of threads requesting a token + pending: u16, +} + +/// This is a jobserver proxy used to ensure that we hold on to at least one token. +pub struct Proxy { + client: Client, + data: Mutex, + + /// Threads which are waiting on a token will wait on this. + wake_pending: Condvar, + + helper: OnceLock, } -pub fn release_thread() { - GLOBAL_CLIENT_CHECKED.get().expect(ACCESS_ERROR).release_raw().ok(); +impl Proxy { + pub fn new() -> Arc { + let proxy = Arc::new(Proxy { + client: client(), + data: Mutex::new(ProxyData { used: 1, pending: 0 }), + wake_pending: Condvar::new(), + helper: OnceLock::new(), + }); + let proxy_ = Arc::clone(&proxy); + let helper = proxy + .client + .clone() + .into_helper_thread(move |token| { + if let Ok(token) = token { + let mut data = proxy_.data.lock(); + if data.pending > 0 { + // Give the token to a waiting thread + token.drop_without_releasing(); + assert!(data.used > 0); + data.used += 1; + data.pending -= 1; + proxy_.wake_pending.notify_one(); + } else { + // The token is no longer needed, drop it. + drop(data); + drop(token); + } + } + }) + .expect("failed to create helper thread"); + proxy.helper.set(helper).unwrap(); + proxy + } + + pub fn acquire_thread(&self) { + let mut data = self.data.lock(); + + if data.used == 0 { + // There was a free token around. This can + // happen when all threads release their token. + assert_eq!(data.pending, 0); + data.used += 1; + } else { + // Request a token from the helper thread. We can't directly use `acquire_raw` + // as we also need to be able to wait for the final token in the process which + // does not get a corresponding `release_raw` call. + self.helper.get().unwrap().request_token(); + data.pending += 1; + self.wake_pending.wait(&mut data); + } + } + + pub fn release_thread(&self) { + let mut data = self.data.lock(); + + if data.pending > 0 { + // Give the token to a waiting thread + data.pending -= 1; + self.wake_pending.notify_one(); + } else { + data.used -= 1; + + // Release the token unless it's the last one in the process + if data.used > 0 { + drop(data); + self.client.release_raw().ok(); + } + } + } } diff --git a/compiler/rustc_data_structures/src/marker.rs b/compiler/rustc_data_structures/src/marker.rs index dfd9bd32076..e0df1b232e1 100644 --- a/compiler/rustc_data_structures/src/marker.rs +++ b/compiler/rustc_data_structures/src/marker.rs @@ -59,8 +59,8 @@ macro_rules! already_send { // These structures are already `Send`. already_send!( [std::backtrace::Backtrace][std::io::Stdout][std::io::Stderr][std::io::Error][std::fs::File] - [rustc_arena::DroplessArena][crate::memmap::Mmap][crate::profiling::SelfProfiler] - [crate::owned_slice::OwnedSlice] + [rustc_arena::DroplessArena][jobserver_crate::Client][jobserver_crate::HelperThread] + [crate::memmap::Mmap][crate::profiling::SelfProfiler][crate::owned_slice::OwnedSlice] ); macro_rules! impl_dyn_send { @@ -134,8 +134,8 @@ macro_rules! already_sync { already_sync!( [std::sync::atomic::AtomicBool][std::sync::atomic::AtomicUsize][std::sync::atomic::AtomicU8] [std::sync::atomic::AtomicU32][std::backtrace::Backtrace][std::io::Error][std::fs::File] - [jobserver_crate::Client][crate::memmap::Mmap][crate::profiling::SelfProfiler] - [crate::owned_slice::OwnedSlice] + [jobserver_crate::Client][jobserver_crate::HelperThread][crate::memmap::Mmap] + [crate::profiling::SelfProfiler][crate::owned_slice::OwnedSlice] ); // Use portable AtomicU64 for targets without native 64-bit atomics -- cgit 1.4.1-3-g733a5 From ef9403371f2d4759cd7b0d0a276e63aae2ce68fe Mon Sep 17 00:00:00 2001 From: John Kåre Alsaker Date: Sat, 2 Mar 2024 18:32:42 +0100 Subject: Drop AST on a separate thread and prefetch `hir_crate` --- compiler/rustc_ast_lowering/src/lib.rs | 10 ++++++++-- compiler/rustc_data_structures/src/sync.rs | 2 +- compiler/rustc_data_structures/src/sync/parallel.rs | 11 +++++++++++ compiler/rustc_interface/src/passes.rs | 6 ++++++ 4 files changed, 26 insertions(+), 3 deletions(-) (limited to 'compiler/rustc_data_structures') diff --git a/compiler/rustc_ast_lowering/src/lib.rs b/compiler/rustc_ast_lowering/src/lib.rs index 1e14b4d6723..8b1c63cd21d 100644 --- a/compiler/rustc_ast_lowering/src/lib.rs +++ b/compiler/rustc_ast_lowering/src/lib.rs @@ -49,6 +49,7 @@ use rustc_attr_parsing::{AttributeParser, OmitDoc}; use rustc_data_structures::fingerprint::Fingerprint; use rustc_data_structures::sorted_map::SortedMap; use rustc_data_structures::stable_hasher::{HashStable, StableHasher}; +use rustc_data_structures::sync::spawn; use rustc_data_structures::tagged_ptr::TaggedRef; use rustc_errors::{DiagArgFromDisplay, DiagCtxtHandle, StashKey}; use rustc_hir::def::{DefKind, LifetimeRes, Namespace, PartialRes, PerNS, Res}; @@ -454,9 +455,14 @@ pub fn lower_to_hir(tcx: TyCtxt<'_>, (): ()) -> hir::Crate<'_> { .lower_node(def_id); } - // Drop AST to free memory drop(ast_index); - sess.time("drop_ast", || drop(krate)); + + // Drop AST to free memory. It can be expensive so try to drop it on a separate thread. + let prof = sess.prof.clone(); + spawn(move || { + let _timer = prof.verbose_generic_activity("drop_ast"); + drop(krate); + }); // Don't hash unless necessary, because it's expensive. let opt_hir_hash = diff --git a/compiler/rustc_data_structures/src/sync.rs b/compiler/rustc_data_structures/src/sync.rs index 616a18a72ab..80d49effbf8 100644 --- a/compiler/rustc_data_structures/src/sync.rs +++ b/compiler/rustc_data_structures/src/sync.rs @@ -43,7 +43,7 @@ pub use self::freeze::{FreezeLock, FreezeReadGuard, FreezeWriteGuard}; pub use self::lock::{Lock, LockGuard, Mode}; pub use self::mode::{is_dyn_thread_safe, set_dyn_thread_safe_mode}; pub use self::parallel::{ - join, par_for_each_in, par_map, parallel_guard, scope, try_par_for_each_in, + join, par_for_each_in, par_map, parallel_guard, scope, spawn, try_par_for_each_in, }; pub use self::vec::{AppendOnlyIndexVec, AppendOnlyVec}; pub use self::worker_local::{Registry, WorkerLocal}; diff --git a/compiler/rustc_data_structures/src/sync/parallel.rs b/compiler/rustc_data_structures/src/sync/parallel.rs index ba3c85ef5b1..64db39cc4c6 100644 --- a/compiler/rustc_data_structures/src/sync/parallel.rs +++ b/compiler/rustc_data_structures/src/sync/parallel.rs @@ -93,6 +93,17 @@ macro_rules! parallel { }; } +pub fn spawn(func: impl FnOnce() + DynSend + 'static) { + if mode::is_dyn_thread_safe() { + let func = FromDyn::from(func); + rayon_core::spawn(|| { + (func.into_inner())(); + }); + } else { + func() + } +} + // This function only works when `mode::is_dyn_thread_safe()`. pub fn scope<'scope, OP, R>(op: OP) -> R where diff --git a/compiler/rustc_interface/src/passes.rs b/compiler/rustc_interface/src/passes.rs index 66d2a79b93a..aad63f4edcf 100644 --- a/compiler/rustc_interface/src/passes.rs +++ b/compiler/rustc_interface/src/passes.rs @@ -900,6 +900,12 @@ fn run_required_analyses(tcx: TyCtxt<'_>) { // is not defined. So we need to cfg it out. #[cfg(all(not(doc), debug_assertions))] rustc_passes::hir_id_validator::check_crate(tcx); + + // Prefetch this to prevent multiple threads from blocking on it later. + // This is needed since the `hir_id_validator::check_crate` call above is not guaranteed + // to use `hir_crate`. + tcx.ensure_done().hir_crate(()); + let sess = tcx.sess; sess.time("misc_checking_1", || { parallel!( -- cgit 1.4.1-3-g733a5