about summary refs log tree commit diff
path: root/compiler/rustc_data_structures/src
diff options
context:
space:
mode:
authorLaurențiu Nicola <lnicola@users.noreply.github.com>2025-05-01 07:33:30 +0000
committerGitHub <noreply@github.com>2025-05-01 07:33:30 +0000
commit00d2f60efd516bc7ea658bd0a6de5e2f1f1df322 (patch)
tree9375885343e27ab26b20dfe5a3714e00d5d378d0 /compiler/rustc_data_structures/src
parente7502210ce5ab2d92cb87c372dab51b637ba6df4 (diff)
parent1c5de64814d72effc6890ca823fa4d248041a0bd (diff)
downloadrust-00d2f60efd516bc7ea658bd0a6de5e2f1f1df322.tar.gz
rust-00d2f60efd516bc7ea658bd0a6de5e2f1f1df322.zip
Merge pull request #19726 from lnicola/sync-from-rust
Sync from downstream again
Diffstat (limited to 'compiler/rustc_data_structures/src')
-rw-r--r--compiler/rustc_data_structures/src/jobserver.rs94
-rw-r--r--compiler/rustc_data_structures/src/marker.rs8
-rw-r--r--compiler/rustc_data_structures/src/sync.rs2
-rw-r--r--compiler/rustc_data_structures/src/sync/parallel.rs11
4 files changed, 105 insertions, 10 deletions
diff --git a/compiler/rustc_data_structures/src/jobserver.rs b/compiler/rustc_data_structures/src/jobserver.rs
index 1204f2d692d..3ed1ea7543f 100644
--- a/compiler/rustc_data_structures/src/jobserver.rs
+++ b/compiler/rustc_data_structures/src/jobserver.rs
@@ -1,7 +1,8 @@
-use std::sync::{LazyLock, OnceLock};
+use std::sync::{Arc, LazyLock, OnceLock};
 
 pub use jobserver_crate::{Acquired, Client, HelperThread};
 use jobserver_crate::{FromEnv, FromEnvErrorKind};
+use parking_lot::{Condvar, Mutex};
 
 // We can only call `from_env_ext` once per process
 
@@ -71,10 +72,93 @@ pub fn client() -> Client {
     GLOBAL_CLIENT_CHECKED.get().expect(ACCESS_ERROR).clone()
 }
 
-pub fn acquire_thread() {
-    GLOBAL_CLIENT_CHECKED.get().expect(ACCESS_ERROR).acquire_raw().ok();
+struct ProxyData {
+    /// The number of tokens assigned to threads.
+    /// If this is 0, a single token is still assigned to this process, but is unused.
+    used: u16,
+
+    /// The number of threads requesting a token
+    pending: u16,
+}
+
+/// This is a jobserver proxy used to ensure that we hold on to at least one token.
+pub struct Proxy {
+    client: Client,
+    data: Mutex<ProxyData>,
+
+    /// Threads which are waiting on a token will wait on this.
+    wake_pending: Condvar,
+
+    helper: OnceLock<HelperThread>,
 }
 
-pub fn release_thread() {
-    GLOBAL_CLIENT_CHECKED.get().expect(ACCESS_ERROR).release_raw().ok();
+impl Proxy {
+    pub fn new() -> Arc<Self> {
+        let proxy = Arc::new(Proxy {
+            client: client(),
+            data: Mutex::new(ProxyData { used: 1, pending: 0 }),
+            wake_pending: Condvar::new(),
+            helper: OnceLock::new(),
+        });
+        let proxy_ = Arc::clone(&proxy);
+        let helper = proxy
+            .client
+            .clone()
+            .into_helper_thread(move |token| {
+                if let Ok(token) = token {
+                    let mut data = proxy_.data.lock();
+                    if data.pending > 0 {
+                        // Give the token to a waiting thread
+                        token.drop_without_releasing();
+                        assert!(data.used > 0);
+                        data.used += 1;
+                        data.pending -= 1;
+                        proxy_.wake_pending.notify_one();
+                    } else {
+                        // The token is no longer needed, drop it.
+                        drop(data);
+                        drop(token);
+                    }
+                }
+            })
+            .expect("failed to create helper thread");
+        proxy.helper.set(helper).unwrap();
+        proxy
+    }
+
+    pub fn acquire_thread(&self) {
+        let mut data = self.data.lock();
+
+        if data.used == 0 {
+            // There was a free token around. This can
+            // happen when all threads release their token.
+            assert_eq!(data.pending, 0);
+            data.used += 1;
+        } else {
+            // Request a token from the helper thread. We can't directly use `acquire_raw`
+            // as we also need to be able to wait for the final token in the process which
+            // does not get a corresponding `release_raw` call.
+            self.helper.get().unwrap().request_token();
+            data.pending += 1;
+            self.wake_pending.wait(&mut data);
+        }
+    }
+
+    pub fn release_thread(&self) {
+        let mut data = self.data.lock();
+
+        if data.pending > 0 {
+            // Give the token to a waiting thread
+            data.pending -= 1;
+            self.wake_pending.notify_one();
+        } else {
+            data.used -= 1;
+
+            // Release the token unless it's the last one in the process
+            if data.used > 0 {
+                drop(data);
+                self.client.release_raw().ok();
+            }
+        }
+    }
 }
diff --git a/compiler/rustc_data_structures/src/marker.rs b/compiler/rustc_data_structures/src/marker.rs
index dfd9bd32076..e0df1b232e1 100644
--- a/compiler/rustc_data_structures/src/marker.rs
+++ b/compiler/rustc_data_structures/src/marker.rs
@@ -59,8 +59,8 @@ macro_rules! already_send {
 // These structures are already `Send`.
 already_send!(
     [std::backtrace::Backtrace][std::io::Stdout][std::io::Stderr][std::io::Error][std::fs::File]
-        [rustc_arena::DroplessArena][crate::memmap::Mmap][crate::profiling::SelfProfiler]
-        [crate::owned_slice::OwnedSlice]
+        [rustc_arena::DroplessArena][jobserver_crate::Client][jobserver_crate::HelperThread]
+        [crate::memmap::Mmap][crate::profiling::SelfProfiler][crate::owned_slice::OwnedSlice]
 );
 
 macro_rules! impl_dyn_send {
@@ -134,8 +134,8 @@ macro_rules! already_sync {
 already_sync!(
     [std::sync::atomic::AtomicBool][std::sync::atomic::AtomicUsize][std::sync::atomic::AtomicU8]
         [std::sync::atomic::AtomicU32][std::backtrace::Backtrace][std::io::Error][std::fs::File]
-        [jobserver_crate::Client][crate::memmap::Mmap][crate::profiling::SelfProfiler]
-        [crate::owned_slice::OwnedSlice]
+        [jobserver_crate::Client][jobserver_crate::HelperThread][crate::memmap::Mmap]
+        [crate::profiling::SelfProfiler][crate::owned_slice::OwnedSlice]
 );
 
 // Use portable AtomicU64 for targets without native 64-bit atomics
diff --git a/compiler/rustc_data_structures/src/sync.rs b/compiler/rustc_data_structures/src/sync.rs
index 616a18a72ab..80d49effbf8 100644
--- a/compiler/rustc_data_structures/src/sync.rs
+++ b/compiler/rustc_data_structures/src/sync.rs
@@ -43,7 +43,7 @@ pub use self::freeze::{FreezeLock, FreezeReadGuard, FreezeWriteGuard};
 pub use self::lock::{Lock, LockGuard, Mode};
 pub use self::mode::{is_dyn_thread_safe, set_dyn_thread_safe_mode};
 pub use self::parallel::{
-    join, par_for_each_in, par_map, parallel_guard, scope, try_par_for_each_in,
+    join, par_for_each_in, par_map, parallel_guard, scope, spawn, try_par_for_each_in,
 };
 pub use self::vec::{AppendOnlyIndexVec, AppendOnlyVec};
 pub use self::worker_local::{Registry, WorkerLocal};
diff --git a/compiler/rustc_data_structures/src/sync/parallel.rs b/compiler/rustc_data_structures/src/sync/parallel.rs
index ba3c85ef5b1..64db39cc4c6 100644
--- a/compiler/rustc_data_structures/src/sync/parallel.rs
+++ b/compiler/rustc_data_structures/src/sync/parallel.rs
@@ -93,6 +93,17 @@ macro_rules! parallel {
         };
     }
 
+pub fn spawn(func: impl FnOnce() + DynSend + 'static) {
+    if mode::is_dyn_thread_safe() {
+        let func = FromDyn::from(func);
+        rayon_core::spawn(|| {
+            (func.into_inner())();
+        });
+    } else {
+        func()
+    }
+}
+
 // This function only works when `mode::is_dyn_thread_safe()`.
 pub fn scope<'scope, OP, R>(op: OP) -> R
 where