about summary refs log tree commit diff
path: root/compiler/rustc_data_structures/src
diff options
context:
space:
mode:
authorbors <bors@rust-lang.org>2023-05-13 13:47:53 +0000
committerbors <bors@rust-lang.org>2023-05-13 13:47:53 +0000
commitdd8ec9c88d4d87986cbf2083c398ab6c52dc3f80 (patch)
tree22ad92e8e8006bb13148563bd267e6a09b552a88 /compiler/rustc_data_structures/src
parentebf2b375e16b3b1422d48892d235d83a1b451802 (diff)
parentd7e3e5bede187d113fa01c4d8b8c16a2bd4f721c (diff)
downloadrust-dd8ec9c88d4d87986cbf2083c398ab6c52dc3f80.tar.gz
rust-dd8ec9c88d4d87986cbf2083c398ab6c52dc3f80.zip
Auto merge of #107586 - SparrowLii:parallel-query, r=cjgillot
Introduce `DynSend` and `DynSync` auto trait for parallel compiler

part of parallel-rustc #101566

This PR introduces `DynSend / DynSync` trait and `FromDyn / IntoDyn` structure in rustc_data_structure::marker. `FromDyn` can dynamically check data structures for thread safety when switching to parallel environments (such as calling `par_for_each_in`). This happens only when `-Z threads > 1` so it doesn't affect single-threaded mode's compile efficiency.

r? `@cjgillot`
Diffstat (limited to 'compiler/rustc_data_structures/src')
-rw-r--r--compiler/rustc_data_structures/src/lib.rs2
-rw-r--r--compiler/rustc_data_structures/src/marker.rs257
-rw-r--r--compiler/rustc_data_structures/src/owned_slice/tests.rs4
-rw-r--r--compiler/rustc_data_structures/src/sync.rs238
4 files changed, 464 insertions, 37 deletions
diff --git a/compiler/rustc_data_structures/src/lib.rs b/compiler/rustc_data_structures/src/lib.rs
index 004017ec5f3..5b9b0e106d2 100644
--- a/compiler/rustc_data_structures/src/lib.rs
+++ b/compiler/rustc_data_structures/src/lib.rs
@@ -26,6 +26,7 @@
 #![feature(test)]
 #![feature(thread_id_value)]
 #![feature(vec_into_raw_parts)]
+#![feature(allocator_api)]
 #![feature(get_mut_unchecked)]
 #![feature(lint_reasons)]
 #![feature(unwrap_infallible)]
@@ -77,6 +78,7 @@ pub mod sorted_map;
 pub mod stable_hasher;
 mod atomic_ref;
 pub mod fingerprint;
+pub mod marker;
 pub mod profiling;
 pub mod sharded;
 pub mod stack;
diff --git a/compiler/rustc_data_structures/src/marker.rs b/compiler/rustc_data_structures/src/marker.rs
new file mode 100644
index 00000000000..f8c06f9a814
--- /dev/null
+++ b/compiler/rustc_data_structures/src/marker.rs
@@ -0,0 +1,257 @@
+cfg_if!(
+    if #[cfg(not(parallel_compiler))] {
+        pub auto trait DynSend {}
+        pub auto trait DynSync {}
+
+        impl<T> DynSend for T {}
+        impl<T> DynSync for T {}
+    } else {
+        #[rustc_on_unimplemented(
+            message = "`{Self}` doesn't implement `DynSend`. \
+            Add it to `rustc_data_structures::marker` or use `IntoDynSyncSend` if it's already `Send`"
+        )]
+        // This is an auto trait for types which can be sent across threads if `sync::is_dyn_thread_safe()`
+        // is true. These types can be wrapped in a `FromDyn` to get a `Send` type. Wrapping a
+        // `Send` type in `IntoDynSyncSend` will create a `DynSend` type.
+        pub unsafe auto trait DynSend {}
+
+        #[rustc_on_unimplemented(
+            message = "`{Self}` doesn't implement `DynSync`. \
+            Add it to `rustc_data_structures::marker` or use `IntoDynSyncSend` if it's already `Sync`"
+        )]
+        // This is an auto trait for types which can be shared across threads if `sync::is_dyn_thread_safe()`
+        // is true. These types can be wrapped in a `FromDyn` to get a `Sync` type. Wrapping a
+        // `Sync` type in `IntoDynSyncSend` will create a `DynSync` type.
+        pub unsafe auto trait DynSync {}
+
+        // Same with `Sync` and `Send`.
+        unsafe impl<T: DynSync + ?Sized> DynSend for &T {}
+
+        macro_rules! impls_dyn_send_neg {
+            ($([$t1: ty $(where $($generics1: tt)*)?])*) => {
+                $(impl$(<$($generics1)*>)? !DynSend for $t1 {})*
+            };
+        }
+
+        // Consistent with `std`
+        impls_dyn_send_neg!(
+            [std::env::Args]
+            [std::env::ArgsOs]
+            [*const T where T: ?Sized]
+            [*mut T where T: ?Sized]
+            [std::ptr::NonNull<T> where T: ?Sized]
+            [std::rc::Rc<T> where T: ?Sized]
+            [std::rc::Weak<T> where T: ?Sized]
+            [std::sync::MutexGuard<'_, T> where T: ?Sized]
+            [std::sync::RwLockReadGuard<'_, T> where T: ?Sized]
+            [std::sync::RwLockWriteGuard<'_, T> where T: ?Sized]
+            [std::io::StdoutLock<'_>]
+            [std::io::StderrLock<'_>]
+        );
+        cfg_if!(
+            // Consistent with `std`
+            // `os_imp::Env` is `!Send` in these platforms
+            if #[cfg(any(unix, target_os = "hermit", target_os = "wasi", target_os = "solid_asp3"))] {
+                impl !DynSend for std::env::VarsOs {}
+            }
+        );
+
+        macro_rules! already_send {
+            ($([$ty: ty])*) => {
+                $(unsafe impl DynSend for $ty where $ty: Send {})*
+            };
+        }
+
+        // These structures are already `Send`.
+        already_send!(
+            [std::backtrace::Backtrace]
+            [std::io::Stdout]
+            [std::io::Stderr]
+            [std::io::Error]
+            [std::fs::File]
+            [rustc_arena::DroplessArena]
+            [crate::memmap::Mmap]
+            [crate::profiling::SelfProfiler]
+            [crate::owned_slice::OwnedSlice]
+        );
+
+        macro_rules! impl_dyn_send {
+            ($($($attr: meta)* [$ty: ty where $($generics2: tt)*])*) => {
+                $(unsafe impl<$($generics2)*> DynSend for $ty {})*
+            };
+        }
+
+        impl_dyn_send!(
+            [std::sync::atomic::AtomicPtr<T> where T]
+            [std::sync::Mutex<T> where T: ?Sized+ DynSend]
+            [std::sync::mpsc::Sender<T> where T: DynSend]
+            [std::sync::Arc<T> where T: ?Sized + DynSync + DynSend]
+            [std::sync::LazyLock<T, F> where T: DynSend, F: DynSend]
+            [std::collections::HashSet<K, S> where K: DynSend, S: DynSend]
+            [std::collections::HashMap<K, V, S> where K: DynSend, V: DynSend, S: DynSend]
+            [std::collections::BTreeMap<K, V, A> where K: DynSend, V: DynSend, A: std::alloc::Allocator + Clone + DynSend]
+            [Vec<T, A> where T: DynSend, A: std::alloc::Allocator + DynSend]
+            [Box<T, A> where T: ?Sized + DynSend, A: std::alloc::Allocator + DynSend]
+            [crate::sync::Lock<T> where T: DynSend]
+            [crate::sync::RwLock<T> where T: DynSend]
+            [crate::tagged_ptr::CopyTaggedPtr<P, T, CP> where P: Send + crate::tagged_ptr::Pointer, T: Send + crate::tagged_ptr::Tag, const CP: bool]
+            [rustc_arena::TypedArena<T> where T: DynSend]
+            [indexmap::IndexSet<V, S> where V: DynSend, S: DynSend]
+            [indexmap::IndexMap<K, V, S> where K: DynSend, V: DynSend, S: DynSend]
+            [thin_vec::ThinVec<T> where T: DynSend]
+            [smallvec::SmallVec<A> where A: smallvec::Array + DynSend]
+        );
+
+        macro_rules! impls_dyn_sync_neg {
+            ($([$t1: ty $(where $($generics1: tt)*)?])*) => {
+                $(impl$(<$($generics1)*>)? !DynSync for $t1 {})*
+            };
+        }
+
+        // Consistent with `std`
+        impls_dyn_sync_neg!(
+            [std::env::Args]
+            [std::env::ArgsOs]
+            [*const T where T: ?Sized]
+            [*mut T where T: ?Sized]
+            [std::cell::Cell<T> where T: ?Sized]
+            [std::cell::RefCell<T> where T: ?Sized]
+            [std::cell::UnsafeCell<T> where T: ?Sized]
+            [std::ptr::NonNull<T> where T: ?Sized]
+            [std::rc::Rc<T> where T: ?Sized]
+            [std::rc::Weak<T> where T: ?Sized]
+            [std::cell::OnceCell<T> where T]
+            [std::sync::mpsc::Receiver<T> where T]
+            [std::sync::mpsc::Sender<T> where T]
+        );
+        cfg_if!(
+            // Consistent with `std`
+            // `os_imp::Env` is `!Sync` in these platforms
+            if #[cfg(any(unix, target_os = "hermit", target_os = "wasi", target_os = "solid_asp3"))] {
+                impl !DynSync for std::env::VarsOs {}
+            }
+        );
+
+        macro_rules! already_sync {
+            ($([$ty: ty])*) => {
+                $(unsafe impl DynSync for $ty where $ty: Sync {})*
+            };
+        }
+
+        // These structures are already `Sync`.
+        already_sync!(
+            [std::sync::atomic::AtomicBool]
+            [std::sync::atomic::AtomicUsize]
+            [std::sync::atomic::AtomicU8]
+            [std::sync::atomic::AtomicU32]
+            [std::sync::atomic::AtomicU64]
+            [std::backtrace::Backtrace]
+            [std::io::Error]
+            [std::fs::File]
+            [jobserver_crate::Client]
+            [crate::memmap::Mmap]
+            [crate::profiling::SelfProfiler]
+            [crate::owned_slice::OwnedSlice]
+        );
+
+        macro_rules! impl_dyn_sync {
+            ($($($attr: meta)* [$ty: ty where $($generics2: tt)*])*) => {
+                $(unsafe impl<$($generics2)*> DynSync for $ty {})*
+            };
+        }
+
+        impl_dyn_sync!(
+            [std::sync::atomic::AtomicPtr<T> where T]
+            [std::sync::OnceLock<T> where T: DynSend + DynSync]
+            [std::sync::Mutex<T> where T: ?Sized + DynSend]
+            [std::sync::Arc<T> where T: ?Sized + DynSync + DynSend]
+            [std::sync::LazyLock<T, F> where T: DynSend + DynSync, F: DynSend]
+            [std::collections::HashSet<K, S> where K: DynSync, S: DynSync]
+            [std::collections::HashMap<K, V, S> where K: DynSync, V: DynSync, S: DynSync]
+            [std::collections::BTreeMap<K, V, A> where K: DynSync, V: DynSync, A: std::alloc::Allocator + Clone + DynSync]
+            [Vec<T, A> where T: DynSync, A: std::alloc::Allocator + DynSync]
+            [Box<T, A> where T: ?Sized + DynSync, A: std::alloc::Allocator + DynSync]
+            [crate::sync::Lock<T> where T: DynSend]
+            [crate::sync::RwLock<T> where T: DynSend + DynSync]
+            [crate::sync::OneThread<T> where T]
+            [crate::sync::WorkerLocal<T> where T: DynSend]
+            [crate::intern::Interned<'a, T> where 'a, T: DynSync]
+            [crate::tagged_ptr::CopyTaggedPtr<P, T, CP> where P: Sync + crate::tagged_ptr::Pointer, T: Sync + crate::tagged_ptr::Tag, const CP: bool]
+            [parking_lot::lock_api::Mutex<R, T> where R: DynSync, T: ?Sized + DynSend]
+            [parking_lot::lock_api::RwLock<R, T> where R: DynSync, T: ?Sized + DynSend + DynSync]
+            [indexmap::IndexSet<V, S> where V: DynSync, S: DynSync]
+            [indexmap::IndexMap<K, V, S> where K: DynSync, V: DynSync, S: DynSync]
+            [smallvec::SmallVec<A> where A: smallvec::Array + DynSync]
+            [thin_vec::ThinVec<T> where T: DynSync]
+        );
+    }
+);
+
+pub fn assert_dyn_sync<T: ?Sized + DynSync>() {}
+pub fn assert_dyn_send<T: ?Sized + DynSend>() {}
+pub fn assert_dyn_send_val<T: ?Sized + DynSend>(_t: &T) {}
+pub fn assert_dyn_send_sync_val<T: ?Sized + DynSync + DynSend>(_t: &T) {}
+
+#[derive(Copy, Clone)]
+pub struct FromDyn<T>(T);
+
+impl<T> FromDyn<T> {
+    #[inline(always)]
+    pub fn from(val: T) -> Self {
+        // Check that `sync::is_dyn_thread_safe()` is true on creation so we can
+        // implement `Send` and `Sync` for this structure when `T`
+        // implements `DynSend` and `DynSync` respectively.
+        #[cfg(parallel_compiler)]
+        assert!(crate::sync::is_dyn_thread_safe());
+        FromDyn(val)
+    }
+
+    #[inline(always)]
+    pub fn into_inner(self) -> T {
+        self.0
+    }
+}
+
+// `FromDyn` is `Send` if `T` is `DynSend`, since it ensures that sync::is_dyn_thread_safe() is true.
+#[cfg(parallel_compiler)]
+unsafe impl<T: DynSend> Send for FromDyn<T> {}
+
+// `FromDyn` is `Sync` if `T` is `DynSync`, since it ensures that sync::is_dyn_thread_safe() is true.
+#[cfg(parallel_compiler)]
+unsafe impl<T: DynSync> Sync for FromDyn<T> {}
+
+impl<T> std::ops::Deref for FromDyn<T> {
+    type Target = T;
+
+    #[inline(always)]
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
+
+// A wrapper to convert a struct that is already a `Send` or `Sync` into
+// an instance of `DynSend` and `DynSync`, since the compiler cannot infer
+// it automatically in some cases. (e.g. Box<dyn Send / Sync>)
+#[derive(Copy, Clone)]
+pub struct IntoDynSyncSend<T: ?Sized>(pub T);
+
+#[cfg(parallel_compiler)]
+unsafe impl<T: ?Sized + Send> DynSend for IntoDynSyncSend<T> {}
+#[cfg(parallel_compiler)]
+unsafe impl<T: ?Sized + Sync> DynSync for IntoDynSyncSend<T> {}
+
+impl<T> std::ops::Deref for IntoDynSyncSend<T> {
+    type Target = T;
+
+    #[inline(always)]
+    fn deref(&self) -> &T {
+        &self.0
+    }
+}
+
+impl<T> std::ops::DerefMut for IntoDynSyncSend<T> {
+    #[inline(always)]
+    fn deref_mut(&mut self) -> &mut T {
+        &mut self.0
+    }
+}
diff --git a/compiler/rustc_data_structures/src/owned_slice/tests.rs b/compiler/rustc_data_structures/src/owned_slice/tests.rs
index e715fb55362..e151b8c2de0 100644
--- a/compiler/rustc_data_structures/src/owned_slice/tests.rs
+++ b/compiler/rustc_data_structures/src/owned_slice/tests.rs
@@ -69,6 +69,6 @@ fn drop_drops() {
 
 #[test]
 fn send_sync() {
-    crate::sync::assert_send::<OwnedSlice>();
-    crate::sync::assert_sync::<OwnedSlice>();
+    crate::sync::assert_dyn_send::<OwnedSlice>();
+    crate::sync::assert_dyn_sync::<OwnedSlice>();
 }
diff --git a/compiler/rustc_data_structures/src/sync.rs b/compiler/rustc_data_structures/src/sync.rs
index e73ca56efa0..8a778866a77 100644
--- a/compiler/rustc_data_structures/src/sync.rs
+++ b/compiler/rustc_data_structures/src/sync.rs
@@ -39,6 +39,7 @@
 //!
 //! [^2] `MTLockRef` is a typedef.
 
+pub use crate::marker::*;
 use crate::owned_slice::OwnedSlice;
 use std::collections::HashMap;
 use std::hash::{BuildHasher, Hash};
@@ -55,6 +56,42 @@ pub use vec::{AppendOnlyIndexVec, AppendOnlyVec};
 
 mod vec;
 
+mod mode {
+    use super::Ordering;
+    use std::sync::atomic::AtomicU8;
+
+    const UNINITIALIZED: u8 = 0;
+    const DYN_NOT_THREAD_SAFE: u8 = 1;
+    const DYN_THREAD_SAFE: u8 = 2;
+
+    static DYN_THREAD_SAFE_MODE: AtomicU8 = AtomicU8::new(UNINITIALIZED);
+
+    // Whether thread safety is enabled (due to running under multiple threads).
+    #[inline]
+    pub fn is_dyn_thread_safe() -> bool {
+        match DYN_THREAD_SAFE_MODE.load(Ordering::Relaxed) {
+            DYN_NOT_THREAD_SAFE => false,
+            DYN_THREAD_SAFE => true,
+            _ => panic!("uninitialized dyn_thread_safe mode!"),
+        }
+    }
+
+    // Only set by the `-Z threads` compile option
+    pub fn set_dyn_thread_safe_mode(mode: bool) {
+        let set: u8 = if mode { DYN_THREAD_SAFE } else { DYN_NOT_THREAD_SAFE };
+        let previous = DYN_THREAD_SAFE_MODE.compare_exchange(
+            UNINITIALIZED,
+            set,
+            Ordering::Relaxed,
+            Ordering::Relaxed,
+        );
+
+        // Check that the mode was either uninitialized or was already set to the requested mode.
+        assert!(previous.is_ok() || previous == Err(set));
+    }
+}
+
+pub use mode::{is_dyn_thread_safe, set_dyn_thread_safe_mode};
 cfg_if! {
     if #[cfg(not(parallel_compiler))] {
         pub unsafe auto trait Send {}
@@ -149,7 +186,7 @@ cfg_if! {
 
         #[macro_export]
         macro_rules! parallel {
-            ($($blocks:tt),*) => {
+            ($($blocks:block),*) => {
                 // We catch panics here ensuring that all the blocks execute.
                 // This makes behavior consistent with the parallel compiler.
                 let mut panic = None;
@@ -168,12 +205,6 @@ cfg_if! {
             }
         }
 
-        pub use Iterator as ParallelIterator;
-
-        pub fn par_iter<T: IntoIterator>(t: T) -> T::IntoIter {
-            t.into_iter()
-        }
-
         pub fn par_for_each_in<T: IntoIterator>(t: T, mut for_each: impl FnMut(T::Item) + Sync + Send) {
             // We catch panics here ensuring that all the loop iterations execute.
             // This makes behavior consistent with the parallel compiler.
@@ -190,6 +221,29 @@ cfg_if! {
             }
         }
 
+        pub fn par_map<T: IntoIterator, R, C: FromIterator<R>>(
+            t: T,
+            mut map: impl FnMut(<<T as IntoIterator>::IntoIter as Iterator>::Item) -> R,
+        ) -> C {
+            // We catch panics here ensuring that all the loop iterations execute.
+            let mut panic = None;
+            let r = t.into_iter().filter_map(|i| {
+                match catch_unwind(AssertUnwindSafe(|| map(i))) {
+                    Ok(r) => Some(r),
+                    Err(p) => {
+                        if panic.is_none() {
+                            panic = Some(p);
+                        }
+                        None
+                    }
+                }
+            }).collect();
+            if let Some(panic) = panic {
+                resume_unwind(panic);
+            }
+            r
+        }
+
         pub type MetadataRef = OwnedSlice;
 
         pub use std::rc::Rc as Lrc;
@@ -302,46 +356,165 @@ cfg_if! {
         use parking_lot::RwLock as InnerRwLock;
 
         use std::thread;
-        pub use rayon::{join, scope};
+
+        #[inline]
+        pub fn join<A, B, RA: DynSend, RB: DynSend>(oper_a: A, oper_b: B) -> (RA, RB)
+        where
+            A: FnOnce() -> RA + DynSend,
+            B: FnOnce() -> RB + DynSend,
+        {
+            if mode::is_dyn_thread_safe() {
+                let oper_a = FromDyn::from(oper_a);
+                let oper_b = FromDyn::from(oper_b);
+                let (a, b) = rayon::join(move || FromDyn::from(oper_a.into_inner()()), move || FromDyn::from(oper_b.into_inner()()));
+                (a.into_inner(), b.into_inner())
+            } else {
+                (oper_a(), oper_b())
+            }
+        }
+
+        // This function only works when `mode::is_dyn_thread_safe()`.
+        pub fn scope<'scope, OP, R>(op: OP) -> R
+        where
+            OP: FnOnce(&rayon::Scope<'scope>) -> R + DynSend,
+            R: DynSend,
+        {
+            let op = FromDyn::from(op);
+            rayon::scope(|s| FromDyn::from(op.into_inner()(s))).into_inner()
+        }
 
         /// Runs a list of blocks in parallel. The first block is executed immediately on
         /// the current thread. Use that for the longest running block.
         #[macro_export]
         macro_rules! parallel {
-            (impl $fblock:tt [$($c:tt,)*] [$block:tt $(, $rest:tt)*]) => {
+            (impl $fblock:block [$($c:expr,)*] [$block:expr $(, $rest:expr)*]) => {
                 parallel!(impl $fblock [$block, $($c,)*] [$($rest),*])
             };
-            (impl $fblock:tt [$($blocks:tt,)*] []) => {
+            (impl $fblock:block [$($blocks:expr,)*] []) => {
                 ::rustc_data_structures::sync::scope(|s| {
+                    $(let block = rustc_data_structures::sync::FromDyn::from(|| $blocks);
+                    s.spawn(move |_| block.into_inner()());)*
+                    (|| $fblock)();
+                });
+            };
+            ($fblock:block, $($blocks:block),*) => {
+                if rustc_data_structures::sync::is_dyn_thread_safe() {
+                    // Reverse the order of the later blocks since Rayon executes them in reverse order
+                    // when using a single thread. This ensures the execution order matches that
+                    // of a single threaded rustc.
+                    parallel!(impl $fblock [] [$($blocks),*]);
+                } else {
+                    // We catch panics here ensuring that all the blocks execute.
+                    // This makes behavior consistent with the parallel compiler.
+                    let mut panic = None;
+                    if let Err(p) = ::std::panic::catch_unwind(
+                        ::std::panic::AssertUnwindSafe(|| $fblock)
+                    ) {
+                        if panic.is_none() {
+                            panic = Some(p);
+                        }
+                    }
                     $(
-                        s.spawn(|_| $blocks);
+                        if let Err(p) = ::std::panic::catch_unwind(
+                            ::std::panic::AssertUnwindSafe(|| $blocks)
+                        ) {
+                            if panic.is_none() {
+                                panic = Some(p);
+                            }
+                        }
                     )*
-                    $fblock;
-                })
-            };
-            ($fblock:tt, $($blocks:tt),*) => {
-                // Reverse the order of the later blocks since Rayon executes them in reverse order
-                // when using a single thread. This ensures the execution order matches that
-                // of a single threaded rustc
-                parallel!(impl $fblock [] [$($blocks),*]);
+                    if let Some(panic) = panic {
+                        ::std::panic::resume_unwind(panic);
+                    }
+                }
             };
         }
 
-        pub use rayon::iter::ParallelIterator;
-        use rayon::iter::IntoParallelIterator;
+        use rayon::iter::{FromParallelIterator, IntoParallelIterator, ParallelIterator};
+
+        pub fn par_for_each_in<I, T: IntoIterator<Item = I> + IntoParallelIterator<Item = I>>(
+            t: T,
+            for_each: impl Fn(I) + DynSync + DynSend
+        ) {
+            if mode::is_dyn_thread_safe() {
+                let for_each = FromDyn::from(for_each);
+                let panic: Lock<Option<_>> = Lock::new(None);
+                t.into_par_iter().for_each(|i| if let Err(p) = catch_unwind(AssertUnwindSafe(|| for_each(i))) {
+                    let mut l = panic.lock();
+                    if l.is_none() {
+                        *l = Some(p)
+                    }
+                });
 
-        pub fn par_iter<T: IntoParallelIterator>(t: T) -> T::Iter {
-            t.into_par_iter()
+                if let Some(panic) = panic.into_inner() {
+                    resume_unwind(panic);
+                }
+            } else {
+                // We catch panics here ensuring that all the loop iterations execute.
+                // This makes behavior consistent with the parallel compiler.
+                let mut panic = None;
+                t.into_iter().for_each(|i| {
+                    if let Err(p) = catch_unwind(AssertUnwindSafe(|| for_each(i))) {
+                        if panic.is_none() {
+                            panic = Some(p);
+                        }
+                    }
+                });
+                if let Some(panic) = panic {
+                    resume_unwind(panic);
+                }
+            }
         }
 
-        pub fn par_for_each_in<T: IntoParallelIterator>(
+        pub fn par_map<
+            I,
+            T: IntoIterator<Item = I> + IntoParallelIterator<Item = I>,
+            R: std::marker::Send,
+            C: FromIterator<R> + FromParallelIterator<R>
+        >(
             t: T,
-            for_each: impl Fn(T::Item) + Sync + Send,
-        ) {
-            let ps: Vec<_> = t.into_par_iter().map(|i| catch_unwind(AssertUnwindSafe(|| for_each(i)))).collect();
-            ps.into_iter().for_each(|p| if let Err(panic) = p {
-                resume_unwind(panic)
-            });
+            map: impl Fn(I) -> R + DynSync + DynSend
+        ) -> C {
+            if mode::is_dyn_thread_safe() {
+                let panic: Lock<Option<_>> = Lock::new(None);
+                let map = FromDyn::from(map);
+                // We catch panics here ensuring that all the loop iterations execute.
+                let r = t.into_par_iter().filter_map(|i| {
+                    match catch_unwind(AssertUnwindSafe(|| map(i))) {
+                        Ok(r) => Some(r),
+                        Err(p) => {
+                            let mut l = panic.lock();
+                            if l.is_none() {
+                                *l = Some(p);
+                            }
+                            None
+                        },
+                    }
+                }).collect();
+
+                if let Some(panic) = panic.into_inner() {
+                    resume_unwind(panic);
+                }
+                r
+            } else {
+                // We catch panics here ensuring that all the loop iterations execute.
+                let mut panic = None;
+                let r = t.into_iter().filter_map(|i| {
+                    match catch_unwind(AssertUnwindSafe(|| map(i))) {
+                        Ok(r) => Some(r),
+                        Err(p) => {
+                            if panic.is_none() {
+                                panic = Some(p);
+                            }
+                            None
+                        }
+                    }
+                }).collect();
+                if let Some(panic) = panic {
+                    resume_unwind(panic);
+                }
+                r
+            }
         }
 
         pub type MetadataRef = OwnedSlice;
@@ -352,11 +525,6 @@ cfg_if! {
     }
 }
 
-pub fn assert_sync<T: ?Sized + Sync>() {}
-pub fn assert_send<T: ?Sized + Send>() {}
-pub fn assert_send_val<T: ?Sized + Send>(_t: &T) {}
-pub fn assert_send_sync_val<T: ?Sized + Sync + Send>(_t: &T) {}
-
 #[derive(Default)]
 #[cfg_attr(parallel_compiler, repr(align(64)))]
 pub struct CacheAligned<T>(pub T);