about summary refs log tree commit diff
path: root/compiler/rustc_codegen_cranelift/src/concurrency_limiter.rs
diff options
context:
space:
mode:
authorbjorn3 <17426603+bjorn3@users.noreply.github.com>2022-08-24 18:40:58 +0200
committerbjorn3 <17426603+bjorn3@users.noreply.github.com>2022-08-24 18:40:58 +0200
commit7b9c8c87819d0d122f56017f5e093ea8c082dc69 (patch)
tree59f1afde620220a353f036efc7b95ea17dab9e67 /compiler/rustc_codegen_cranelift/src/concurrency_limiter.rs
parent4a24f08ba43166cfee86d868b3fe8612aec6faca (diff)
parente9d1a0a7b0b28dd422f1a790ccde532acafbf193 (diff)
downloadrust-7b9c8c87819d0d122f56017f5e093ea8c082dc69.tar.gz
rust-7b9c8c87819d0d122f56017f5e093ea8c082dc69.zip
Merge commit 'e9d1a0a7b0b28dd422f1a790ccde532acafbf193' into sync_cg_clif-2022-08-24
Diffstat (limited to 'compiler/rustc_codegen_cranelift/src/concurrency_limiter.rs')
-rw-r--r--compiler/rustc_codegen_cranelift/src/concurrency_limiter.rs168
1 files changed, 168 insertions, 0 deletions
diff --git a/compiler/rustc_codegen_cranelift/src/concurrency_limiter.rs b/compiler/rustc_codegen_cranelift/src/concurrency_limiter.rs
new file mode 100644
index 00000000000..dfde9792046
--- /dev/null
+++ b/compiler/rustc_codegen_cranelift/src/concurrency_limiter.rs
@@ -0,0 +1,168 @@
+use std::sync::{Arc, Condvar, Mutex};
+
+use rustc_session::Session;
+
+use jobserver::HelperThread;
+
+// FIXME don't panic when a worker thread panics
+
+pub(super) struct ConcurrencyLimiter {
+    helper_thread: Option<HelperThread>,
+    state: Arc<Mutex<state::ConcurrencyLimiterState>>,
+    available_token_condvar: Arc<Condvar>,
+}
+
+impl ConcurrencyLimiter {
+    pub(super) fn new(sess: &Session, pending_jobs: usize) -> Self {
+        let state = Arc::new(Mutex::new(state::ConcurrencyLimiterState::new(pending_jobs)));
+        let available_token_condvar = Arc::new(Condvar::new());
+
+        let state_helper = state.clone();
+        let available_token_condvar_helper = available_token_condvar.clone();
+        let helper_thread = sess
+            .jobserver
+            .clone()
+            .into_helper_thread(move |token| {
+                let mut state = state_helper.lock().unwrap();
+                state.add_new_token(token.unwrap());
+                available_token_condvar_helper.notify_one();
+            })
+            .unwrap();
+        ConcurrencyLimiter {
+            helper_thread: Some(helper_thread),
+            state,
+            available_token_condvar: Arc::new(Condvar::new()),
+        }
+    }
+
+    pub(super) fn acquire(&mut self) -> ConcurrencyLimiterToken {
+        let mut state = self.state.lock().unwrap();
+        loop {
+            state.assert_invariants();
+
+            if state.try_start_job() {
+                return ConcurrencyLimiterToken {
+                    state: self.state.clone(),
+                    available_token_condvar: self.available_token_condvar.clone(),
+                };
+            }
+
+            self.helper_thread.as_mut().unwrap().request_token();
+            state = self.available_token_condvar.wait(state).unwrap();
+        }
+    }
+
+    pub(super) fn job_already_done(&mut self) {
+        let mut state = self.state.lock().unwrap();
+        state.job_already_done();
+    }
+}
+
+impl Drop for ConcurrencyLimiter {
+    fn drop(&mut self) {
+        //
+        self.helper_thread.take();
+
+        // Assert that all jobs have finished
+        let state = Mutex::get_mut(Arc::get_mut(&mut self.state).unwrap()).unwrap();
+        state.assert_done();
+    }
+}
+
+#[derive(Debug)]
+pub(super) struct ConcurrencyLimiterToken {
+    state: Arc<Mutex<state::ConcurrencyLimiterState>>,
+    available_token_condvar: Arc<Condvar>,
+}
+
+impl Drop for ConcurrencyLimiterToken {
+    fn drop(&mut self) {
+        let mut state = self.state.lock().unwrap();
+        state.job_finished();
+        self.available_token_condvar.notify_one();
+    }
+}
+
+mod state {
+    use jobserver::Acquired;
+
+    #[derive(Debug)]
+    pub(super) struct ConcurrencyLimiterState {
+        pending_jobs: usize,
+        active_jobs: usize,
+
+        // None is used to represent the implicit token, Some to represent explicit tokens
+        tokens: Vec<Option<Acquired>>,
+    }
+
+    impl ConcurrencyLimiterState {
+        pub(super) fn new(pending_jobs: usize) -> Self {
+            ConcurrencyLimiterState { pending_jobs, active_jobs: 0, tokens: vec![None] }
+        }
+
+        pub(super) fn assert_invariants(&self) {
+            // There must be no excess active jobs
+            assert!(self.active_jobs <= self.pending_jobs);
+
+            // There may not be more active jobs than there are tokens
+            assert!(self.active_jobs <= self.tokens.len());
+        }
+
+        pub(super) fn assert_done(&self) {
+            assert_eq!(self.pending_jobs, 0);
+            assert_eq!(self.active_jobs, 0);
+        }
+
+        pub(super) fn add_new_token(&mut self, token: Acquired) {
+            self.tokens.push(Some(token));
+            self.drop_excess_capacity();
+        }
+
+        pub(super) fn try_start_job(&mut self) -> bool {
+            if self.active_jobs < self.tokens.len() {
+                // Using existing token
+                self.job_started();
+                return true;
+            }
+
+            false
+        }
+
+        pub(super) fn job_started(&mut self) {
+            self.assert_invariants();
+            self.active_jobs += 1;
+            self.drop_excess_capacity();
+            self.assert_invariants();
+        }
+
+        pub(super) fn job_finished(&mut self) {
+            self.assert_invariants();
+            self.pending_jobs -= 1;
+            self.active_jobs -= 1;
+            self.assert_invariants();
+            self.drop_excess_capacity();
+            self.assert_invariants();
+        }
+
+        pub(super) fn job_already_done(&mut self) {
+            self.assert_invariants();
+            self.pending_jobs -= 1;
+            self.assert_invariants();
+            self.drop_excess_capacity();
+            self.assert_invariants();
+        }
+
+        fn drop_excess_capacity(&mut self) {
+            self.assert_invariants();
+
+            // Drop all tokens that can never be used anymore
+            self.tokens.truncate(std::cmp::max(self.pending_jobs, 1));
+
+            // Keep some excess tokens to satisfy requests faster
+            const MAX_EXTRA_CAPACITY: usize = 2;
+            self.tokens.truncate(std::cmp::max(self.active_jobs + MAX_EXTRA_CAPACITY, 1));
+
+            self.assert_invariants();
+        }
+    }
+}