about summary refs log tree commit diff
path: root/compiler/rustc_data_structures/src/sync
diff options
context:
space:
mode:
authorMatthias Krüger <matthias.krueger@famsik.de>2023-09-11 21:16:20 +0200
committerGitHub <noreply@github.com>2023-09-11 21:16:20 +0200
commitf3cc59b741b576d4e791e6ae4c5b4811f9935801 (patch)
treeadac5054f0fcb9c552f6b2ff52e37346b4f814aa /compiler/rustc_data_structures/src/sync
parent0a199e4e93d2c7cfa7e0bee831e5aed03250fab4 (diff)
parentfe614712dded0a861fbcbf654663128318eb6dfb (diff)
downloadrust-f3cc59b741b576d4e791e6ae4c5b4811f9935801.tar.gz
rust-f3cc59b741b576d4e791e6ae4c5b4811f9935801.zip
Rollup merge of #115548 - Zoxc:parallel-extract, r=wesleywiser
Extract parallel operations in `rustc_data_structures::sync` into a new `parallel` submodule

This extracts parallel operations in `rustc_data_structures::sync` into a new `parallel` submodule. This cuts down on the size of the large `cfg_if!` in `sync` and makes it easier to compare between serial and parallel variants.
Diffstat (limited to 'compiler/rustc_data_structures/src/sync')
-rw-r--r--compiler/rustc_data_structures/src/sync/parallel.rs188
1 files changed, 188 insertions, 0 deletions
diff --git a/compiler/rustc_data_structures/src/sync/parallel.rs b/compiler/rustc_data_structures/src/sync/parallel.rs
new file mode 100644
index 00000000000..1944ddfb710
--- /dev/null
+++ b/compiler/rustc_data_structures/src/sync/parallel.rs
@@ -0,0 +1,188 @@
+//! This module defines parallel operations that are implemented in
+//! one way for the serial compiler, and another way the parallel compiler.
+
+#![allow(dead_code)]
+
+use parking_lot::Mutex;
+use std::any::Any;
+use std::panic::{catch_unwind, resume_unwind, AssertUnwindSafe};
+
+#[cfg(not(parallel_compiler))]
+pub use disabled::*;
+#[cfg(parallel_compiler)]
+pub use enabled::*;
+
+/// A guard used to hold panics that occur during a parallel section to later by unwound.
+/// This is used for the parallel compiler to prevent fatal errors from non-deterministically
+/// hiding errors by ensuring that everything in the section has completed executing before
+/// continuing with unwinding. It's also used for the non-parallel code to ensure error message
+/// output match the parallel compiler for testing purposes.
+pub struct ParallelGuard {
+    panic: Mutex<Option<Box<dyn Any + Send + 'static>>>,
+}
+
+impl ParallelGuard {
+    pub fn run<R>(&self, f: impl FnOnce() -> R) -> Option<R> {
+        catch_unwind(AssertUnwindSafe(f))
+            .map_err(|err| {
+                *self.panic.lock() = Some(err);
+            })
+            .ok()
+    }
+}
+
+/// This gives access to a fresh parallel guard in the closure and will unwind any panics
+/// caught in it after the closure returns.
+#[inline]
+pub fn parallel_guard<R>(f: impl FnOnce(&ParallelGuard) -> R) -> R {
+    let guard = ParallelGuard { panic: Mutex::new(None) };
+    let ret = f(&guard);
+    if let Some(panic) = guard.panic.into_inner() {
+        resume_unwind(panic);
+    }
+    ret
+}
+
+mod disabled {
+    use crate::sync::parallel_guard;
+
+    #[macro_export]
+    #[cfg(not(parallel_compiler))]
+    macro_rules! parallel {
+        ($($blocks:block),*) => {{
+            $crate::sync::parallel_guard(|guard| {
+                $(guard.run(|| $blocks);)*
+            });
+        }}
+    }
+
+    pub fn join<A, B, RA, RB>(oper_a: A, oper_b: B) -> (RA, RB)
+    where
+        A: FnOnce() -> RA,
+        B: FnOnce() -> RB,
+    {
+        let (a, b) = parallel_guard(|guard| {
+            let a = guard.run(oper_a);
+            let b = guard.run(oper_b);
+            (a, b)
+        });
+        (a.unwrap(), b.unwrap())
+    }
+
+    pub fn par_for_each_in<T: IntoIterator>(t: T, mut for_each: impl FnMut(T::Item)) {
+        parallel_guard(|guard| {
+            t.into_iter().for_each(|i| {
+                guard.run(|| for_each(i));
+            });
+        })
+    }
+
+    pub fn par_map<T: IntoIterator, R, C: FromIterator<R>>(
+        t: T,
+        mut map: impl FnMut(<<T as IntoIterator>::IntoIter as Iterator>::Item) -> R,
+    ) -> C {
+        parallel_guard(|guard| t.into_iter().filter_map(|i| guard.run(|| map(i))).collect())
+    }
+}
+
+#[cfg(parallel_compiler)]
+mod enabled {
+    use crate::sync::{mode, parallel_guard, DynSend, DynSync, FromDyn};
+
+    /// Runs a list of blocks in parallel. The first block is executed immediately on
+    /// the current thread. Use that for the longest running block.
+    #[macro_export]
+    macro_rules! parallel {
+        (impl $fblock:block [$($c:expr,)*] [$block:expr $(, $rest:expr)*]) => {
+            parallel!(impl $fblock [$block, $($c,)*] [$($rest),*])
+        };
+        (impl $fblock:block [$($blocks:expr,)*] []) => {
+            ::rustc_data_structures::sync::scope(|s| {
+                $(let block = rustc_data_structures::sync::FromDyn::from(|| $blocks);
+                s.spawn(move |_| block.into_inner()());)*
+                (|| $fblock)();
+            });
+        };
+        ($fblock:block, $($blocks:block),*) => {
+            if rustc_data_structures::sync::is_dyn_thread_safe() {
+                // Reverse the order of the later blocks since Rayon executes them in reverse order
+                // when using a single thread. This ensures the execution order matches that
+                // of a single threaded rustc.
+                parallel!(impl $fblock [] [$($blocks),*]);
+            } else {
+                $crate::sync::parallel_guard(|guard| {
+                    guard.run(|| $fblock);
+                    $(guard.run(|| $blocks);)*
+                });
+            }
+        };
+    }
+
+    // This function only works when `mode::is_dyn_thread_safe()`.
+    pub fn scope<'scope, OP, R>(op: OP) -> R
+    where
+        OP: FnOnce(&rayon::Scope<'scope>) -> R + DynSend,
+        R: DynSend,
+    {
+        let op = FromDyn::from(op);
+        rayon::scope(|s| FromDyn::from(op.into_inner()(s))).into_inner()
+    }
+
+    #[inline]
+    pub fn join<A, B, RA: DynSend, RB: DynSend>(oper_a: A, oper_b: B) -> (RA, RB)
+    where
+        A: FnOnce() -> RA + DynSend,
+        B: FnOnce() -> RB + DynSend,
+    {
+        if mode::is_dyn_thread_safe() {
+            let oper_a = FromDyn::from(oper_a);
+            let oper_b = FromDyn::from(oper_b);
+            let (a, b) = rayon::join(
+                move || FromDyn::from(oper_a.into_inner()()),
+                move || FromDyn::from(oper_b.into_inner()()),
+            );
+            (a.into_inner(), b.into_inner())
+        } else {
+            super::disabled::join(oper_a, oper_b)
+        }
+    }
+
+    use rayon::iter::{FromParallelIterator, IntoParallelIterator, ParallelIterator};
+
+    pub fn par_for_each_in<I, T: IntoIterator<Item = I> + IntoParallelIterator<Item = I>>(
+        t: T,
+        for_each: impl Fn(I) + DynSync + DynSend,
+    ) {
+        parallel_guard(|guard| {
+            if mode::is_dyn_thread_safe() {
+                let for_each = FromDyn::from(for_each);
+                t.into_par_iter().for_each(|i| {
+                    guard.run(|| for_each(i));
+                });
+            } else {
+                t.into_iter().for_each(|i| {
+                    guard.run(|| for_each(i));
+                });
+            }
+        });
+    }
+
+    pub fn par_map<
+        I,
+        T: IntoIterator<Item = I> + IntoParallelIterator<Item = I>,
+        R: std::marker::Send,
+        C: FromIterator<R> + FromParallelIterator<R>,
+    >(
+        t: T,
+        map: impl Fn(I) -> R + DynSync + DynSend,
+    ) -> C {
+        parallel_guard(|guard| {
+            if mode::is_dyn_thread_safe() {
+                let map = FromDyn::from(map);
+                t.into_par_iter().filter_map(|i| guard.run(|| map(i))).collect()
+            } else {
+                t.into_iter().filter_map(|i| guard.run(|| map(i))).collect()
+            }
+        })
+    }
+}