From f458b112f88e1dbcd5072733c79e25328f9f24f9 Mon Sep 17 00:00:00 2001 From: John Kåre Alsaker Date: Thu, 17 Aug 2023 11:07:50 +0200 Subject: Optimize `lock_shards` --- compiler/rustc_data_structures/src/sharded.rs | 25 ++++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) (limited to 'compiler/rustc_data_structures') diff --git a/compiler/rustc_data_structures/src/sharded.rs b/compiler/rustc_data_structures/src/sharded.rs index 52ab5a7fb14..0f769c1f3bf 100644 --- a/compiler/rustc_data_structures/src/sharded.rs +++ b/compiler/rustc_data_structures/src/sharded.rs @@ -2,9 +2,12 @@ use crate::fx::{FxHashMap, FxHasher}; #[cfg(parallel_compiler)] use crate::sync::{is_dyn_thread_safe, CacheAligned}; use crate::sync::{Lock, LockGuard}; +#[cfg(parallel_compiler)] +use itertools::Either; use std::borrow::Borrow; use std::collections::hash_map::RawEntryMut; use std::hash::{Hash, Hasher}; +use std::iter; use std::mem; // 32 shards is sufficient to reduce contention on an 8-core Ryzen 7 1700, @@ -70,19 +73,27 @@ impl Sharded { } } - pub fn lock_shards(&self) -> Vec> { + #[inline] + pub fn lock_shards(&self) -> impl Iterator> { match self { - Self::Single(single) => vec![single.lock()], + #[cfg(not(parallel_compiler))] + Self::Single(single) => iter::once(single.lock()), + #[cfg(parallel_compiler)] + Self::Single(single) => Either::Left(iter::once(single.lock())), #[cfg(parallel_compiler)] - Self::Shards(shards) => shards.iter().map(|shard| shard.0.lock()).collect(), + Self::Shards(shards) => Either::Right(shards.iter().map(|shard| shard.0.lock())), } } - pub fn try_lock_shards(&self) -> Option>> { + #[inline] + pub fn try_lock_shards(&self) -> impl Iterator>> { match self { - Self::Single(single) => Some(vec![single.try_lock()?]), + #[cfg(not(parallel_compiler))] + Self::Single(single) => iter::once(single.try_lock()), + #[cfg(parallel_compiler)] + Self::Single(single) => Either::Left(iter::once(single.try_lock())), #[cfg(parallel_compiler)] - Self::Shards(shards) => shards.iter().map(|shard| shard.0.try_lock()).collect(), + Self::Shards(shards) => Either::Right(shards.iter().map(|shard| shard.0.try_lock())), } } } @@ -101,7 +112,7 @@ pub type ShardedHashMap = Sharded>; impl ShardedHashMap { pub fn len(&self) -> usize { - self.lock_shards().iter().map(|shard| shard.len()).sum() + self.lock_shards().map(|shard| shard.len()).sum() } } -- cgit 1.4.1-3-g733a5 From 5739349e96e2a34dc9dd18e58589b1b3afcaa271 Mon Sep 17 00:00:00 2001 From: John Kåre Alsaker Date: Sat, 31 Oct 2020 03:14:32 +0100 Subject: Use conditional synchronization for Lock --- compiler/rustc_data_structures/src/marker.rs | 2 - compiler/rustc_data_structures/src/sync.rs | 105 ++------ compiler/rustc_data_structures/src/sync/lock.rs | 276 +++++++++++++++++++++ .../rustc_data_structures/src/sync/worker_local.rs | 6 +- compiler/rustc_errors/src/emitter.rs | 8 +- compiler/rustc_errors/src/json.rs | 8 +- compiler/rustc_errors/src/lib.rs | 10 +- compiler/rustc_interface/src/util.rs | 36 ++- compiler/rustc_session/src/session.rs | 10 +- src/librustdoc/core.rs | 6 +- src/tools/rustfmt/src/parse/session.rs | 14 +- 11 files changed, 355 insertions(+), 126 deletions(-) create mode 100644 compiler/rustc_data_structures/src/sync/lock.rs (limited to 'compiler/rustc_data_structures') diff --git a/compiler/rustc_data_structures/src/marker.rs b/compiler/rustc_data_structures/src/marker.rs index f8c06f9a814..b067f9d4502 100644 --- a/compiler/rustc_data_structures/src/marker.rs +++ b/compiler/rustc_data_structures/src/marker.rs @@ -92,7 +92,6 @@ cfg_if!( [std::collections::BTreeMap where K: DynSend, V: DynSend, A: std::alloc::Allocator + Clone + DynSend] [Vec where T: DynSend, A: std::alloc::Allocator + DynSend] [Box where T: ?Sized + DynSend, A: std::alloc::Allocator + DynSend] - [crate::sync::Lock where T: DynSend] [crate::sync::RwLock where T: DynSend] [crate::tagged_ptr::CopyTaggedPtr where P: Send + crate::tagged_ptr::Pointer, T: Send + crate::tagged_ptr::Tag, const CP: bool] [rustc_arena::TypedArena where T: DynSend] @@ -171,7 +170,6 @@ cfg_if!( [std::collections::BTreeMap where K: DynSync, V: DynSync, A: std::alloc::Allocator + Clone + DynSync] [Vec where T: DynSync, A: std::alloc::Allocator + DynSync] [Box where T: ?Sized + DynSync, A: std::alloc::Allocator + DynSync] - [crate::sync::Lock where T: DynSend] [crate::sync::RwLock where T: DynSend + DynSync] [crate::sync::OneThread where T] [crate::sync::WorkerLocal where T: DynSend] diff --git a/compiler/rustc_data_structures/src/sync.rs b/compiler/rustc_data_structures/src/sync.rs index 25a08237346..083aa6cf697 100644 --- a/compiler/rustc_data_structures/src/sync.rs +++ b/compiler/rustc_data_structures/src/sync.rs @@ -26,7 +26,8 @@ //! | `AtomicU64` | `Cell` | `atomic::AtomicU64` | //! | `AtomicUsize` | `Cell` | `atomic::AtomicUsize` | //! | | | | -//! | `Lock` | `RefCell` | `parking_lot::Mutex` | +//! | `Lock` | `RefCell` | `RefCell` or | +//! | | | `parking_lot::Mutex` | //! | `RwLock` | `RefCell` | `parking_lot::RwLock` | //! | `MTLock` [^1] | `T` | `Lock` | //! | `MTLockRef<'a, T>` [^2] | `&'a mut MTLock` | `&'a MTLock` | @@ -45,6 +46,9 @@ use std::hash::{BuildHasher, Hash}; use std::ops::{Deref, DerefMut}; use std::panic::{catch_unwind, resume_unwind, AssertUnwindSafe}; +mod lock; +pub use lock::{Lock, LockGuard}; + mod worker_local; pub use worker_local::{Registry, WorkerLocal}; @@ -75,6 +79,13 @@ mod mode { } } + // Whether thread safety might be enabled. + #[inline] + #[cfg(parallel_compiler)] + pub fn might_be_dyn_thread_safe() -> bool { + DYN_THREAD_SAFE_MODE.load(Ordering::Relaxed) != DYN_NOT_THREAD_SAFE + } + // Only set by the `-Z threads` compile option pub fn set_dyn_thread_safe_mode(mode: bool) { let set: u8 = if mode { DYN_THREAD_SAFE } else { DYN_NOT_THREAD_SAFE }; @@ -94,14 +105,15 @@ pub use mode::{is_dyn_thread_safe, set_dyn_thread_safe_mode}; cfg_if! { if #[cfg(not(parallel_compiler))] { + use std::ops::Add; + use std::cell::Cell; + pub unsafe auto trait Send {} pub unsafe auto trait Sync {} unsafe impl Send for T {} unsafe impl Sync for T {} - use std::ops::Add; - /// This is a single threaded variant of `AtomicU64`, `AtomicUsize`, etc. /// It has explicit ordering arguments and is only intended for use with /// the native atomic types. @@ -255,15 +267,11 @@ cfg_if! { pub use std::cell::Ref as MappedReadGuard; pub use std::cell::RefMut as WriteGuard; pub use std::cell::RefMut as MappedWriteGuard; - pub use std::cell::RefMut as LockGuard; pub use std::cell::RefMut as MappedLockGuard; pub use std::cell::OnceCell; use std::cell::RefCell as InnerRwLock; - use std::cell::RefCell as InnerLock; - - use std::cell::Cell; pub type MTLockRef<'a, T> = &'a mut MTLock; @@ -305,6 +313,8 @@ cfg_if! { } } } else { + use parking_lot::Mutex; + pub use std::marker::Send as Send; pub use std::marker::Sync as Sync; @@ -313,7 +323,6 @@ cfg_if! { pub use parking_lot::RwLockWriteGuard as WriteGuard; pub use parking_lot::MappedRwLockWriteGuard as MappedWriteGuard; - pub use parking_lot::MutexGuard as LockGuard; pub use parking_lot::MappedMutexGuard as MappedLockGuard; pub use std::sync::OnceLock as OnceCell; @@ -355,7 +364,6 @@ cfg_if! { } } - use parking_lot::Mutex as InnerLock; use parking_lot::RwLock as InnerRwLock; use std::thread; @@ -441,7 +449,7 @@ cfg_if! { ) { if mode::is_dyn_thread_safe() { let for_each = FromDyn::from(for_each); - let panic: Lock> = Lock::new(None); + let panic: Mutex> = Mutex::new(None); t.into_par_iter().for_each(|i| if let Err(p) = catch_unwind(AssertUnwindSafe(|| for_each(i))) { let mut l = panic.lock(); if l.is_none() { @@ -479,7 +487,7 @@ cfg_if! { map: impl Fn(I) -> R + DynSync + DynSend ) -> C { if mode::is_dyn_thread_safe() { - let panic: Lock> = Lock::new(None); + let panic: Mutex> = Mutex::new(None); let map = FromDyn::from(map); // We catch panics here ensuring that all the loop iterations execute. let r = t.into_par_iter().filter_map(|i| { @@ -542,81 +550,6 @@ impl HashMapExt for HashMap } } -#[derive(Debug)] -pub struct Lock(InnerLock); - -impl Lock { - #[inline(always)] - pub fn new(inner: T) -> Self { - Lock(InnerLock::new(inner)) - } - - #[inline(always)] - pub fn into_inner(self) -> T { - self.0.into_inner() - } - - #[inline(always)] - pub fn get_mut(&mut self) -> &mut T { - self.0.get_mut() - } - - #[cfg(parallel_compiler)] - #[inline(always)] - pub fn try_lock(&self) -> Option> { - self.0.try_lock() - } - - #[cfg(not(parallel_compiler))] - #[inline(always)] - pub fn try_lock(&self) -> Option> { - self.0.try_borrow_mut().ok() - } - - #[cfg(parallel_compiler)] - #[inline(always)] - #[track_caller] - pub fn lock(&self) -> LockGuard<'_, T> { - if ERROR_CHECKING { - self.0.try_lock().expect("lock was already held") - } else { - self.0.lock() - } - } - - #[cfg(not(parallel_compiler))] - #[inline(always)] - #[track_caller] - pub fn lock(&self) -> LockGuard<'_, T> { - self.0.borrow_mut() - } - - #[inline(always)] - #[track_caller] - pub fn with_lock R, R>(&self, f: F) -> R { - f(&mut *self.lock()) - } - - #[inline(always)] - #[track_caller] - pub fn borrow(&self) -> LockGuard<'_, T> { - self.lock() - } - - #[inline(always)] - #[track_caller] - pub fn borrow_mut(&self) -> LockGuard<'_, T> { - self.lock() - } -} - -impl Default for Lock { - #[inline] - fn default() -> Self { - Lock::new(T::default()) - } -} - #[derive(Debug, Default)] pub struct RwLock(InnerRwLock); diff --git a/compiler/rustc_data_structures/src/sync/lock.rs b/compiler/rustc_data_structures/src/sync/lock.rs new file mode 100644 index 00000000000..62cd1b993de --- /dev/null +++ b/compiler/rustc_data_structures/src/sync/lock.rs @@ -0,0 +1,276 @@ +//! This module implements a lock which only uses synchronization if `might_be_dyn_thread_safe` is true. +//! It implements `DynSend` and `DynSync` instead of the typical `Send` and `Sync` traits. +//! +//! When `cfg(parallel_compiler)` is not set, the lock is instead a wrapper around `RefCell`. + +#[cfg(not(parallel_compiler))] +use std::cell::RefCell; +#[cfg(parallel_compiler)] +use { + crate::cold_path, + crate::sync::DynSend, + crate::sync::DynSync, + parking_lot::lock_api::RawMutex, + std::cell::Cell, + std::cell::UnsafeCell, + std::fmt, + std::intrinsics::{likely, unlikely}, + std::marker::PhantomData, + std::mem::ManuallyDrop, + std::ops::{Deref, DerefMut}, +}; + +#[cfg(not(parallel_compiler))] +pub use std::cell::RefMut as LockGuard; + +#[cfg(not(parallel_compiler))] +#[derive(Debug)] +pub struct Lock(RefCell); + +#[cfg(not(parallel_compiler))] +impl Lock { + #[inline(always)] + pub fn new(inner: T) -> Self { + Lock(RefCell::new(inner)) + } + + #[inline(always)] + pub fn into_inner(self) -> T { + self.0.into_inner() + } + + #[inline(always)] + pub fn get_mut(&mut self) -> &mut T { + self.0.get_mut() + } + + #[inline(always)] + pub fn try_lock(&self) -> Option> { + self.0.try_borrow_mut().ok() + } + + #[inline(always)] + #[track_caller] + pub fn lock(&self) -> LockGuard<'_, T> { + self.0.borrow_mut() + } +} + +/// A guard holding mutable access to a `Lock` which is in a locked state. +#[cfg(parallel_compiler)] +#[must_use = "if unused the Lock will immediately unlock"] +pub struct LockGuard<'a, T> { + lock: &'a Lock, + marker: PhantomData<&'a mut T>, +} + +#[cfg(parallel_compiler)] +impl<'a, T: 'a> Deref for LockGuard<'a, T> { + type Target = T; + #[inline] + fn deref(&self) -> &T { + // SAFETY: We have shared access to the mutable access owned by this type, + // so we can give out a shared reference. + unsafe { &*self.lock.data.get() } + } +} + +#[cfg(parallel_compiler)] +impl<'a, T: 'a> DerefMut for LockGuard<'a, T> { + #[inline] + fn deref_mut(&mut self) -> &mut T { + // SAFETY: We have mutable access to the data so we can give out a mutable reference. + unsafe { &mut *self.lock.data.get() } + } +} + +#[cfg(parallel_compiler)] +impl<'a, T: 'a> Drop for LockGuard<'a, T> { + #[inline] + fn drop(&mut self) { + // SAFETY: We know that the lock is in a locked + // state because it is a invariant of this type. + unsafe { self.lock.raw.unlock() }; + } +} + +#[cfg(parallel_compiler)] +union LockRawUnion { + /// Indicates if the cell is locked. Only used if `LockRaw.sync` is false. + cell: ManuallyDrop>, + + /// A lock implementation that's only used if `LockRaw.sync` is true. + lock: ManuallyDrop, +} + +/// A raw lock which only uses synchronization if `might_be_dyn_thread_safe` is true. +/// It contains no associated data and is used in the implementation of `Lock` which does have such data. +/// +/// A manual implementation of a tagged union is used with the `sync` field and the `LockRawUnion` instead +/// of using enums as it results in better code generation. +#[cfg(parallel_compiler)] +struct LockRaw { + /// Indicates if synchronization is used via `opt.lock` if true, + /// or if a non-thread safe cell is used via `opt.cell`. This is set on initialization and never changed. + sync: bool, + opt: LockRawUnion, +} + +#[cfg(parallel_compiler)] +impl LockRaw { + fn new() -> Self { + if unlikely(super::mode::might_be_dyn_thread_safe()) { + // Create the lock with synchronization enabled using the `RawMutex` type. + LockRaw { + sync: true, + opt: LockRawUnion { lock: ManuallyDrop::new(parking_lot::RawMutex::INIT) }, + } + } else { + // Create the lock with synchronization disabled. + LockRaw { sync: false, opt: LockRawUnion { cell: ManuallyDrop::new(Cell::new(false)) } } + } + } + + #[inline(always)] + fn try_lock(&self) -> bool { + // SAFETY: This is safe since the union fields are used in accordance with `self.sync`. + unsafe { + if likely(!self.sync) { + if self.opt.cell.get() { + false + } else { + self.opt.cell.set(true); + true + } + } else { + self.opt.lock.try_lock() + } + } + } + + #[inline(always)] + fn lock(&self) { + if super::ERROR_CHECKING { + // We're in the debugging mode, so assert that the lock is not held so we + // get a panic instead of waiting for the lock. + assert_eq!(self.try_lock(), true, "lock must not be hold"); + } else { + // SAFETY: This is safe since the union fields are used in accordance with `self.sync`. + unsafe { + if likely(!self.sync) { + if unlikely(self.opt.cell.replace(true)) { + cold_path(|| panic!("lock was already held")) + } + } else { + self.opt.lock.lock(); + } + } + } + } + + /// This unlocks the lock. + /// + /// Safety + /// This method may only be called if the lock is currently held. + #[inline(always)] + unsafe fn unlock(&self) { + // SAFETY: The union use is safe since the union fields are used in accordance with + // `self.sync` and the `unlock` method precondition is upheld by the caller. + unsafe { + if likely(!self.sync) { + debug_assert_eq!(self.opt.cell.get(), true); + self.opt.cell.set(false); + } else { + self.opt.lock.unlock(); + } + } + } +} + +/// A lock which only uses synchronization if `might_be_dyn_thread_safe` is true. +/// It implements `DynSend` and `DynSync` instead of the typical `Send` and `Sync`. +#[cfg(parallel_compiler)] +pub struct Lock { + raw: LockRaw, + data: UnsafeCell, +} + +#[cfg(parallel_compiler)] +impl Lock { + #[inline(always)] + pub fn new(inner: T) -> Self { + Lock { raw: LockRaw::new(), data: UnsafeCell::new(inner) } + } + + #[inline(always)] + pub fn into_inner(self) -> T { + self.data.into_inner() + } + + #[inline(always)] + pub fn get_mut(&mut self) -> &mut T { + self.data.get_mut() + } + + #[inline(always)] + pub fn try_lock(&self) -> Option> { + if self.raw.try_lock() { Some(LockGuard { lock: self, marker: PhantomData }) } else { None } + } + + #[inline(always)] + pub fn lock(&self) -> LockGuard<'_, T> { + self.raw.lock(); + LockGuard { lock: self, marker: PhantomData } + } +} + +impl Lock { + #[inline(always)] + #[track_caller] + pub fn with_lock R, R>(&self, f: F) -> R { + f(&mut *self.lock()) + } + + #[inline(always)] + #[track_caller] + pub fn borrow(&self) -> LockGuard<'_, T> { + self.lock() + } + + #[inline(always)] + #[track_caller] + pub fn borrow_mut(&self) -> LockGuard<'_, T> { + self.lock() + } +} + +#[cfg(parallel_compiler)] +unsafe impl DynSend for Lock {} +#[cfg(parallel_compiler)] +unsafe impl DynSync for Lock {} + +#[cfg(parallel_compiler)] +impl fmt::Debug for Lock { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self.try_lock() { + Some(guard) => f.debug_struct("Lock").field("data", &&*guard).finish(), + None => { + struct LockedPlaceholder; + impl fmt::Debug for LockedPlaceholder { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("") + } + } + + f.debug_struct("Lock").field("data", &LockedPlaceholder).finish() + } + } + } +} + +impl Default for Lock { + #[inline] + fn default() -> Self { + Lock::new(T::default()) + } +} diff --git a/compiler/rustc_data_structures/src/sync/worker_local.rs b/compiler/rustc_data_structures/src/sync/worker_local.rs index 27d687c3cfe..1f838cc4648 100644 --- a/compiler/rustc_data_structures/src/sync/worker_local.rs +++ b/compiler/rustc_data_structures/src/sync/worker_local.rs @@ -1,4 +1,4 @@ -use crate::sync::Lock; +use parking_lot::Mutex; use std::cell::Cell; use std::cell::OnceCell; use std::ops::Deref; @@ -35,7 +35,7 @@ impl RegistryId { struct RegistryData { thread_limit: usize, - threads: Lock, + threads: Mutex, } /// Represents a list of threads which can access worker locals. @@ -65,7 +65,7 @@ thread_local! { impl Registry { /// Creates a registry which can hold up to `thread_limit` threads. pub fn new(thread_limit: usize) -> Self { - Registry(Arc::new(RegistryData { thread_limit, threads: Lock::new(0) })) + Registry(Arc::new(RegistryData { thread_limit, threads: Mutex::new(0) })) } /// Gets the registry associated with the current thread. Panics if there's no such registry. diff --git a/compiler/rustc_errors/src/emitter.rs b/compiler/rustc_errors/src/emitter.rs index 0cae06881b1..1811738c011 100644 --- a/compiler/rustc_errors/src/emitter.rs +++ b/compiler/rustc_errors/src/emitter.rs @@ -24,7 +24,7 @@ use rustc_lint_defs::pluralize; use derive_setters::Setters; use rustc_data_structures::fx::{FxHashMap, FxIndexMap}; -use rustc_data_structures::sync::Lrc; +use rustc_data_structures::sync::{DynSend, IntoDynSyncSend, Lrc}; use rustc_error_messages::{FluentArgs, SpanLabel}; use rustc_span::hygiene::{ExpnKind, MacroKind}; use std::borrow::Cow; @@ -188,6 +188,8 @@ impl Margin { const ANONYMIZED_LINE_NUM: &str = "LL"; +pub type DynEmitter = dyn Emitter + DynSend; + /// Emitter trait for emitting errors. pub trait Emitter: Translate { /// Emit a structured diagnostic. @@ -625,7 +627,7 @@ impl ColorConfig { #[derive(Setters)] pub struct EmitterWriter { #[setters(skip)] - dst: Destination, + dst: IntoDynSyncSend, sm: Option>, fluent_bundle: Option>, #[setters(skip)] @@ -655,7 +657,7 @@ impl EmitterWriter { fn create(dst: Destination, fallback_bundle: LazyFallbackBundle) -> EmitterWriter { EmitterWriter { - dst, + dst: IntoDynSyncSend(dst), sm: None, fluent_bundle: None, fallback_bundle, diff --git a/compiler/rustc_errors/src/json.rs b/compiler/rustc_errors/src/json.rs index b8f58e3057c..390bf28df09 100644 --- a/compiler/rustc_errors/src/json.rs +++ b/compiler/rustc_errors/src/json.rs @@ -22,7 +22,7 @@ use crate::{ }; use rustc_lint_defs::Applicability; -use rustc_data_structures::sync::Lrc; +use rustc_data_structures::sync::{IntoDynSyncSend, Lrc}; use rustc_error_messages::FluentArgs; use rustc_span::hygiene::ExpnData; use rustc_span::Span; @@ -38,7 +38,7 @@ use serde::Serialize; mod tests; pub struct JsonEmitter { - dst: Box, + dst: IntoDynSyncSend>, registry: Option, sm: Lrc, fluent_bundle: Option>, @@ -66,7 +66,7 @@ impl JsonEmitter { terminal_url: TerminalUrl, ) -> JsonEmitter { JsonEmitter { - dst: Box::new(io::BufWriter::new(io::stderr())), + dst: IntoDynSyncSend(Box::new(io::BufWriter::new(io::stderr()))), registry, sm: source_map, fluent_bundle, @@ -120,7 +120,7 @@ impl JsonEmitter { terminal_url: TerminalUrl, ) -> JsonEmitter { JsonEmitter { - dst, + dst: IntoDynSyncSend(dst), registry, sm: source_map, fluent_bundle, diff --git a/compiler/rustc_errors/src/lib.rs b/compiler/rustc_errors/src/lib.rs index b7e1b0c8ad1..39f440e31e0 100644 --- a/compiler/rustc_errors/src/lib.rs +++ b/compiler/rustc_errors/src/lib.rs @@ -30,11 +30,11 @@ pub use emitter::ColorConfig; use rustc_lint_defs::LintExpectationId; use Level::*; -use emitter::{is_case_difference, Emitter, EmitterWriter}; +use emitter::{is_case_difference, DynEmitter, Emitter, EmitterWriter}; use registry::Registry; use rustc_data_structures::fx::{FxHashMap, FxHashSet, FxIndexMap, FxIndexSet}; use rustc_data_structures::stable_hasher::{Hash128, StableHasher}; -use rustc_data_structures::sync::{self, IntoDynSyncSend, Lock, Lrc}; +use rustc_data_structures::sync::{Lock, Lrc}; use rustc_data_structures::AtomicRef; pub use rustc_error_messages::{ fallback_fluent_bundle, fluent_bundle, DelayDm, DiagnosticMessage, FluentBundle, @@ -428,7 +428,7 @@ struct HandlerInner { err_count: usize, warn_count: usize, deduplicated_err_count: usize, - emitter: IntoDynSyncSend>, + emitter: Box, delayed_span_bugs: Vec, delayed_good_path_bugs: Vec, /// This flag indicates that an expected diagnostic was emitted and suppressed. @@ -594,7 +594,7 @@ impl Handler { self } - pub fn with_emitter(emitter: Box) -> Self { + pub fn with_emitter(emitter: Box) -> Self { Self { inner: Lock::new(HandlerInner { flags: HandlerFlags { can_emit_warnings: true, ..Default::default() }, @@ -603,7 +603,7 @@ impl Handler { warn_count: 0, deduplicated_err_count: 0, deduplicated_warn_count: 0, - emitter: IntoDynSyncSend(emitter), + emitter, delayed_span_bugs: Vec::new(), delayed_good_path_bugs: Vec::new(), suppressed_expected_diag: false, diff --git a/compiler/rustc_interface/src/util.rs b/compiler/rustc_interface/src/util.rs index ad35dbbc8f9..5c9b39cdbe3 100644 --- a/compiler/rustc_interface/src/util.rs +++ b/compiler/rustc_interface/src/util.rs @@ -137,10 +137,8 @@ fn get_stack_size() -> Option { env::var_os("RUST_MIN_STACK").is_none().then_some(STACK_SIZE) } -#[cfg(not(parallel_compiler))] -pub(crate) fn run_in_thread_pool_with_globals R + Send, R: Send>( +pub(crate) fn run_in_thread_with_globals R + Send, R: Send>( edition: Edition, - _threads: usize, f: F, ) -> R { // The "thread pool" is a single spawned thread in the non-parallel @@ -171,18 +169,37 @@ pub(crate) fn run_in_thread_pool_with_globals R + Send, R: Send>( }) } +#[cfg(not(parallel_compiler))] +pub(crate) fn run_in_thread_pool_with_globals R + Send, R: Send>( + edition: Edition, + _threads: usize, + f: F, +) -> R { + run_in_thread_with_globals(edition, f) +} + #[cfg(parallel_compiler)] pub(crate) fn run_in_thread_pool_with_globals R + Send, R: Send>( edition: Edition, threads: usize, f: F, ) -> R { - use rustc_data_structures::jobserver; + use rustc_data_structures::{jobserver, sync::FromDyn}; use rustc_middle::ty::tls; use rustc_query_impl::QueryCtxt; use rustc_query_system::query::{deadlock, QueryContext}; let registry = sync::Registry::new(threads); + + if !sync::is_dyn_thread_safe() { + return run_in_thread_with_globals(edition, || { + // Register the thread for use with the `WorkerLocal` type. + registry.register(); + + f() + }); + } + let mut builder = rayon::ThreadPoolBuilder::new() .thread_name(|_| "rustc".to_string()) .acquire_thread_handler(jobserver::acquire_thread) @@ -191,13 +208,13 @@ pub(crate) fn run_in_thread_pool_with_globals R + Send, R: Send>( .deadlock_handler(|| { // On deadlock, creates a new thread and forwards information in thread // locals to it. The new thread runs the deadlock handler. - let query_map = tls::with(|tcx| { + let query_map = FromDyn::from(tls::with(|tcx| { QueryCtxt::new(tcx) .try_collect_active_jobs() .expect("active jobs shouldn't be locked in deadlock handler") - }); + })); let registry = rayon_core::Registry::current(); - thread::spawn(move || deadlock(query_map, ®istry)); + thread::spawn(move || deadlock(query_map.into_inner(), ®istry)); }); if let Some(size) = get_stack_size() { builder = builder.stack_size(size); @@ -209,6 +226,7 @@ pub(crate) fn run_in_thread_pool_with_globals R + Send, R: Send>( // `Send` in the parallel compiler. rustc_span::create_session_globals_then(edition, || { rustc_span::with_session_globals(|session_globals| { + let session_globals = FromDyn::from(session_globals); builder .build_scoped( // Initialize each new worker thread when created. @@ -216,7 +234,9 @@ pub(crate) fn run_in_thread_pool_with_globals R + Send, R: Send>( // Register the thread for use with the `WorkerLocal` type. registry.register(); - rustc_span::set_session_globals_then(session_globals, || thread.run()) + rustc_span::set_session_globals_then(session_globals.into_inner(), || { + thread.run() + }) }, // Run `f` on the first thread in the thread pool. move |pool: &rayon::ThreadPool| pool.install(f), diff --git a/compiler/rustc_session/src/session.rs b/compiler/rustc_session/src/session.rs index 086ce4e6964..86db2edab7b 100644 --- a/compiler/rustc_session/src/session.rs +++ b/compiler/rustc_session/src/session.rs @@ -17,10 +17,10 @@ use rustc_data_structures::fx::{FxHashMap, FxIndexSet}; use rustc_data_structures::jobserver::{self, Client}; use rustc_data_structures::profiling::{duration_to_secs_str, SelfProfiler, SelfProfilerRef}; use rustc_data_structures::sync::{ - self, AtomicU64, AtomicUsize, Lock, Lrc, OneThread, Ordering, Ordering::SeqCst, + AtomicU64, AtomicUsize, Lock, Lrc, OneThread, Ordering, Ordering::SeqCst, }; use rustc_errors::annotate_snippet_emitter_writer::AnnotateSnippetEmitterWriter; -use rustc_errors::emitter::{Emitter, EmitterWriter, HumanReadableErrorType}; +use rustc_errors::emitter::{DynEmitter, EmitterWriter, HumanReadableErrorType}; use rustc_errors::json::JsonEmitter; use rustc_errors::registry::Registry; use rustc_errors::{ @@ -1251,7 +1251,7 @@ fn default_emitter( source_map: Lrc, bundle: Option>, fallback_bundle: LazyFallbackBundle, -) -> Box { +) -> Box { let macro_backtrace = sopts.unstable_opts.macro_backtrace; let track_diagnostics = sopts.unstable_opts.track_diagnostics; let terminal_url = match sopts.unstable_opts.terminal_urls { @@ -1717,12 +1717,12 @@ impl EarlyErrorHandler { } } -fn mk_emitter(output: ErrorOutputType) -> Box { +fn mk_emitter(output: ErrorOutputType) -> Box { // FIXME(#100717): early errors aren't translated at the moment, so this is fine, but it will // need to reference every crate that might emit an early error for translation to work. let fallback_bundle = fallback_fluent_bundle(vec![rustc_errors::DEFAULT_LOCALE_RESOURCE], false); - let emitter: Box = match output { + let emitter: Box = match output { config::ErrorOutputType::HumanReadable(kind) => { let (short, color_config) = kind.unzip(); Box::new(EmitterWriter::stderr(color_config, fallback_bundle).short_message(short)) diff --git a/src/librustdoc/core.rs b/src/librustdoc/core.rs index 4c8dab61f0c..eadff37b592 100644 --- a/src/librustdoc/core.rs +++ b/src/librustdoc/core.rs @@ -1,7 +1,7 @@ use rustc_data_structures::fx::{FxHashMap, FxHashSet}; -use rustc_data_structures::sync::{self, Lrc}; +use rustc_data_structures::sync::Lrc; use rustc_data_structures::unord::UnordSet; -use rustc_errors::emitter::{Emitter, EmitterWriter}; +use rustc_errors::emitter::{DynEmitter, EmitterWriter}; use rustc_errors::json::JsonEmitter; use rustc_errors::TerminalUrl; use rustc_feature::UnstableFeatures; @@ -133,7 +133,7 @@ pub(crate) fn new_handler( rustc_driver::DEFAULT_LOCALE_RESOURCES.to_vec(), false, ); - let emitter: Box = match error_format { + let emitter: Box = match error_format { ErrorOutputType::HumanReadable(kind) => { let (short, color_config) = kind.unzip(); Box::new( diff --git a/src/tools/rustfmt/src/parse/session.rs b/src/tools/rustfmt/src/parse/session.rs index 945e3e42fdd..d815d69d6ba 100644 --- a/src/tools/rustfmt/src/parse/session.rs +++ b/src/tools/rustfmt/src/parse/session.rs @@ -1,8 +1,8 @@ use std::path::Path; use std::sync::atomic::{AtomicBool, Ordering}; -use rustc_data_structures::sync::{Lrc, Send}; -use rustc_errors::emitter::{Emitter, EmitterWriter}; +use rustc_data_structures::sync::{IntoDynSyncSend, Lrc}; +use rustc_errors::emitter::{DynEmitter, Emitter, EmitterWriter}; use rustc_errors::translation::Translate; use rustc_errors::{ColorConfig, Diagnostic, Handler, Level as DiagnosticLevel}; use rustc_session::parse::ParseSess as RawParseSess; @@ -48,15 +48,15 @@ impl Emitter for SilentEmitter { fn emit_diagnostic(&mut self, _db: &Diagnostic) {} } -fn silent_emitter() -> Box { +fn silent_emitter() -> Box { Box::new(SilentEmitter {}) } /// Emit errors against every files expect ones specified in the `ignore_path_set`. struct SilentOnIgnoredFilesEmitter { - ignore_path_set: Lrc, + ignore_path_set: IntoDynSyncSend>, source_map: Lrc, - emitter: Box, + emitter: Box, has_non_ignorable_parser_errors: bool, can_reset: Lrc, } @@ -145,7 +145,7 @@ fn default_handler( has_non_ignorable_parser_errors: false, source_map, emitter, - ignore_path_set, + ignore_path_set: IntoDynSyncSend(ignore_path_set), can_reset, })) } @@ -396,7 +396,7 @@ mod tests { has_non_ignorable_parser_errors: false, source_map, emitter: Box::new(emitter_writer), - ignore_path_set, + ignore_path_set: IntoDynSyncSend(ignore_path_set), can_reset, } } -- cgit 1.4.1-3-g733a5 From 51eb8427bf3887323a77d4dd56bf4557f80fdabc Mon Sep 17 00:00:00 2001 From: John Kåre Alsaker Date: Sat, 19 Aug 2023 17:26:54 +0200 Subject: Make parallel! an expression --- compiler/rustc_data_structures/src/sync.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'compiler/rustc_data_structures') diff --git a/compiler/rustc_data_structures/src/sync.rs b/compiler/rustc_data_structures/src/sync.rs index 083aa6cf697..c46ae9a65da 100644 --- a/compiler/rustc_data_structures/src/sync.rs +++ b/compiler/rustc_data_structures/src/sync.rs @@ -203,7 +203,7 @@ cfg_if! { #[macro_export] macro_rules! parallel { - ($($blocks:block),*) => { + ($($blocks:block),*) => {{ // We catch panics here ensuring that all the blocks execute. // This makes behavior consistent with the parallel compiler. let mut panic = None; @@ -219,7 +219,7 @@ cfg_if! { if let Some(panic) = panic { ::std::panic::resume_unwind(panic); } - } + }} } pub fn par_for_each_in(t: T, mut for_each: impl FnMut(T::Item) + Sync + Send) { -- cgit 1.4.1-3-g733a5 From b56acac41d2abbc9afc91fec370b574be49d5c6e Mon Sep 17 00:00:00 2001 From: John Kåre Alsaker Date: Fri, 3 Jan 2020 01:51:30 +0100 Subject: Add `ParallelGuard` type to handle unwinding in parallel sections --- compiler/rustc_data_structures/src/sync.rs | 189 +++++++++++------------------ 1 file changed, 68 insertions(+), 121 deletions(-) (limited to 'compiler/rustc_data_structures') diff --git a/compiler/rustc_data_structures/src/sync.rs b/compiler/rustc_data_structures/src/sync.rs index c46ae9a65da..8defcd39ecd 100644 --- a/compiler/rustc_data_structures/src/sync.rs +++ b/compiler/rustc_data_structures/src/sync.rs @@ -41,6 +41,7 @@ //! [^2] `MTLockRef` is a typedef. pub use crate::marker::*; +use std::any::Any; use std::collections::HashMap; use std::hash::{BuildHasher, Hash}; use std::ops::{Deref, DerefMut}; @@ -103,6 +104,37 @@ mod mode { pub use mode::{is_dyn_thread_safe, set_dyn_thread_safe_mode}; +/// A guard used to hold panics that occur during a parallel section to later by unwound. +/// This is used for the parallel compiler to prevent fatal errors from non-deterministically +/// hiding errors by ensuring that everything in the section has completed executing before +/// continuing with unwinding. It's also used for the non-parallel code to ensure error message +/// output match the parallel compiler for testing purposes. +pub struct ParallelGuard { + panic: Lock>>, +} + +impl ParallelGuard { + #[inline] + pub fn new() -> Self { + ParallelGuard { panic: Lock::new(None) } + } + + pub fn run(&self, f: impl FnOnce() -> R) -> Option { + catch_unwind(AssertUnwindSafe(f)) + .map_err(|err| { + *self.panic.lock() = Some(err); + }) + .ok() + } + + #[inline] + pub fn unwind(self) { + if let Some(panic) = self.panic.into_inner() { + resume_unwind(panic); + } + } +} + cfg_if! { if #[cfg(not(parallel_compiler))] { use std::ops::Add; @@ -198,66 +230,37 @@ cfg_if! { where A: FnOnce() -> RA, B: FnOnce() -> RB { - (oper_a(), oper_b()) + let guard = ParallelGuard::new(); + let a = guard.run(oper_a); + let b = guard.run(oper_b); + guard.unwind(); + (a.unwrap(), b.unwrap()) } #[macro_export] macro_rules! parallel { ($($blocks:block),*) => {{ - // We catch panics here ensuring that all the blocks execute. - // This makes behavior consistent with the parallel compiler. - let mut panic = None; - $( - if let Err(p) = ::std::panic::catch_unwind( - ::std::panic::AssertUnwindSafe(|| $blocks) - ) { - if panic.is_none() { - panic = Some(p); - } - } - )* - if let Some(panic) = panic { - ::std::panic::resume_unwind(panic); - } + let mut guard = $crate::sync::ParallelGuard::new(); + $(guard.run(|| $blocks);)* + guard.unwind(); }} } pub fn par_for_each_in(t: T, mut for_each: impl FnMut(T::Item) + Sync + Send) { - // We catch panics here ensuring that all the loop iterations execute. - // This makes behavior consistent with the parallel compiler. - let mut panic = None; + let guard = ParallelGuard::new(); t.into_iter().for_each(|i| { - if let Err(p) = catch_unwind(AssertUnwindSafe(|| for_each(i))) { - if panic.is_none() { - panic = Some(p); - } - } + guard.run(|| for_each(i)); }); - if let Some(panic) = panic { - resume_unwind(panic); - } + guard.unwind(); } pub fn par_map>( t: T, mut map: impl FnMut(<::IntoIter as Iterator>::Item) -> R, ) -> C { - // We catch panics here ensuring that all the loop iterations execute. - let mut panic = None; - let r = t.into_iter().filter_map(|i| { - match catch_unwind(AssertUnwindSafe(|| map(i))) { - Ok(r) => Some(r), - Err(p) => { - if panic.is_none() { - panic = Some(p); - } - None - } - } - }).collect(); - if let Some(panic) = panic { - resume_unwind(panic); - } + let guard = ParallelGuard::new(); + let r = t.into_iter().filter_map(|i| guard.run(|| map(i))).collect(); + guard.unwind(); r } @@ -380,7 +383,11 @@ cfg_if! { let (a, b) = rayon::join(move || FromDyn::from(oper_a.into_inner()()), move || FromDyn::from(oper_b.into_inner()())); (a.into_inner(), b.into_inner()) } else { - (oper_a(), oper_b()) + let guard = ParallelGuard::new(); + let a = guard.run(oper_a); + let b = guard.run(oper_b); + guard.unwind(); + (a.unwrap(), b.unwrap()) } } @@ -415,28 +422,10 @@ cfg_if! { // of a single threaded rustc. parallel!(impl $fblock [] [$($blocks),*]); } else { - // We catch panics here ensuring that all the blocks execute. - // This makes behavior consistent with the parallel compiler. - let mut panic = None; - if let Err(p) = ::std::panic::catch_unwind( - ::std::panic::AssertUnwindSafe(|| $fblock) - ) { - if panic.is_none() { - panic = Some(p); - } - } - $( - if let Err(p) = ::std::panic::catch_unwind( - ::std::panic::AssertUnwindSafe(|| $blocks) - ) { - if panic.is_none() { - panic = Some(p); - } - } - )* - if let Some(panic) = panic { - ::std::panic::resume_unwind(panic); - } + let guard = $crate::sync::ParallelGuard::new(); + guard.run(|| $fblock); + $(guard.run(|| $blocks);)* + guard.unwind(); } }; } @@ -449,31 +438,17 @@ cfg_if! { ) { if mode::is_dyn_thread_safe() { let for_each = FromDyn::from(for_each); - let panic: Mutex> = Mutex::new(None); - t.into_par_iter().for_each(|i| if let Err(p) = catch_unwind(AssertUnwindSafe(|| for_each(i))) { - let mut l = panic.lock(); - if l.is_none() { - *l = Some(p) - } + let guard = ParallelGuard::new(); + t.into_par_iter().for_each(|i| { + guard.run(|| for_each(i)); }); - - if let Some(panic) = panic.into_inner() { - resume_unwind(panic); - } + guard.unwind(); } else { - // We catch panics here ensuring that all the loop iterations execute. - // This makes behavior consistent with the parallel compiler. - let mut panic = None; + let guard = ParallelGuard::new(); t.into_iter().for_each(|i| { - if let Err(p) = catch_unwind(AssertUnwindSafe(|| for_each(i))) { - if panic.is_none() { - panic = Some(p); - } - } + guard.run(|| for_each(i)); }); - if let Some(panic) = panic { - resume_unwind(panic); - } + guard.unwind(); } } @@ -487,43 +462,15 @@ cfg_if! { map: impl Fn(I) -> R + DynSync + DynSend ) -> C { if mode::is_dyn_thread_safe() { - let panic: Mutex> = Mutex::new(None); let map = FromDyn::from(map); - // We catch panics here ensuring that all the loop iterations execute. - let r = t.into_par_iter().filter_map(|i| { - match catch_unwind(AssertUnwindSafe(|| map(i))) { - Ok(r) => Some(r), - Err(p) => { - let mut l = panic.lock(); - if l.is_none() { - *l = Some(p); - } - None - }, - } - }).collect(); - - if let Some(panic) = panic.into_inner() { - resume_unwind(panic); - } + let guard = ParallelGuard::new(); + let r = t.into_par_iter().filter_map(|i| guard.run(|| map(i))).collect(); + guard.unwind(); r } else { - // We catch panics here ensuring that all the loop iterations execute. - let mut panic = None; - let r = t.into_iter().filter_map(|i| { - match catch_unwind(AssertUnwindSafe(|| map(i))) { - Ok(r) => Some(r), - Err(p) => { - if panic.is_none() { - panic = Some(p); - } - None - } - } - }).collect(); - if let Some(panic) = panic { - resume_unwind(panic); - } + let guard = ParallelGuard::new(); + let r = t.into_iter().filter_map(|i| guard.run(|| map(i))).collect(); + guard.unwind(); r } } -- cgit 1.4.1-3-g733a5 From d36393b839719959e1f2ce90d8805e53b131f759 Mon Sep 17 00:00:00 2001 From: John Kåre Alsaker Date: Thu, 24 Aug 2023 02:52:16 +0200 Subject: Use `Mutex` to avoid issue with conditional locks --- compiler/rustc_data_structures/src/sync.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) (limited to 'compiler/rustc_data_structures') diff --git a/compiler/rustc_data_structures/src/sync.rs b/compiler/rustc_data_structures/src/sync.rs index 8defcd39ecd..5fe1e8f8c4f 100644 --- a/compiler/rustc_data_structures/src/sync.rs +++ b/compiler/rustc_data_structures/src/sync.rs @@ -41,6 +41,7 @@ //! [^2] `MTLockRef` is a typedef. pub use crate::marker::*; +use parking_lot::Mutex; use std::any::Any; use std::collections::HashMap; use std::hash::{BuildHasher, Hash}; @@ -110,13 +111,13 @@ pub use mode::{is_dyn_thread_safe, set_dyn_thread_safe_mode}; /// continuing with unwinding. It's also used for the non-parallel code to ensure error message /// output match the parallel compiler for testing purposes. pub struct ParallelGuard { - panic: Lock>>, + panic: Mutex>>, } impl ParallelGuard { #[inline] pub fn new() -> Self { - ParallelGuard { panic: Lock::new(None) } + ParallelGuard { panic: Mutex::new(None) } } pub fn run(&self, f: impl FnOnce() -> R) -> Option { @@ -316,8 +317,6 @@ cfg_if! { } } } else { - use parking_lot::Mutex; - pub use std::marker::Send as Send; pub use std::marker::Sync as Sync; -- cgit 1.4.1-3-g733a5 From c303c8abdddaae5508346f3a31c651744d005dfd Mon Sep 17 00:00:00 2001 From: John Kåre Alsaker Date: Fri, 25 Aug 2023 22:16:21 +0200 Subject: Use a `parallel_guard` function to handle the parallel guard --- compiler/rustc_data_structures/src/sync.rs | 115 ++++++++++++++--------------- 1 file changed, 55 insertions(+), 60 deletions(-) (limited to 'compiler/rustc_data_structures') diff --git a/compiler/rustc_data_structures/src/sync.rs b/compiler/rustc_data_structures/src/sync.rs index 5fe1e8f8c4f..2ec509b9118 100644 --- a/compiler/rustc_data_structures/src/sync.rs +++ b/compiler/rustc_data_structures/src/sync.rs @@ -115,11 +115,6 @@ pub struct ParallelGuard { } impl ParallelGuard { - #[inline] - pub fn new() -> Self { - ParallelGuard { panic: Mutex::new(None) } - } - pub fn run(&self, f: impl FnOnce() -> R) -> Option { catch_unwind(AssertUnwindSafe(f)) .map_err(|err| { @@ -127,13 +122,18 @@ impl ParallelGuard { }) .ok() } +} - #[inline] - pub fn unwind(self) { - if let Some(panic) = self.panic.into_inner() { - resume_unwind(panic); - } +/// This gives access to a fresh parallel guard in the closure and will unwind any panics +/// caught in it after the closure returns. +#[inline] +pub fn parallel_guard(f: impl FnOnce(&ParallelGuard) -> R) -> R { + let guard = ParallelGuard { panic: Mutex::new(None) }; + let ret = f(&guard); + if let Some(panic) = guard.panic.into_inner() { + resume_unwind(panic); } + ret } cfg_if! { @@ -231,38 +231,38 @@ cfg_if! { where A: FnOnce() -> RA, B: FnOnce() -> RB { - let guard = ParallelGuard::new(); - let a = guard.run(oper_a); - let b = guard.run(oper_b); - guard.unwind(); + let (a, b) = parallel_guard(|guard| { + let a = guard.run(oper_a); + let b = guard.run(oper_b); + (a, b) + }); (a.unwrap(), b.unwrap()) } #[macro_export] macro_rules! parallel { ($($blocks:block),*) => {{ - let mut guard = $crate::sync::ParallelGuard::new(); - $(guard.run(|| $blocks);)* - guard.unwind(); + $crate::sync::parallel_guard(|guard| { + $(guard.run(|| $blocks);)* + }); }} } pub fn par_for_each_in(t: T, mut for_each: impl FnMut(T::Item) + Sync + Send) { - let guard = ParallelGuard::new(); - t.into_iter().for_each(|i| { - guard.run(|| for_each(i)); - }); - guard.unwind(); + parallel_guard(|guard| { + t.into_iter().for_each(|i| { + guard.run(|| for_each(i)); + }); + }) } pub fn par_map>( t: T, mut map: impl FnMut(<::IntoIter as Iterator>::Item) -> R, ) -> C { - let guard = ParallelGuard::new(); - let r = t.into_iter().filter_map(|i| guard.run(|| map(i))).collect(); - guard.unwind(); - r + parallel_guard(|guard| { + t.into_iter().filter_map(|i| guard.run(|| map(i))).collect() + }) } pub use std::rc::Rc as Lrc; @@ -382,10 +382,11 @@ cfg_if! { let (a, b) = rayon::join(move || FromDyn::from(oper_a.into_inner()()), move || FromDyn::from(oper_b.into_inner()())); (a.into_inner(), b.into_inner()) } else { - let guard = ParallelGuard::new(); - let a = guard.run(oper_a); - let b = guard.run(oper_b); - guard.unwind(); + let (a, b) = parallel_guard(|guard| { + let a = guard.run(oper_a); + let b = guard.run(oper_b); + (a, b) + }); (a.unwrap(), b.unwrap()) } } @@ -421,10 +422,10 @@ cfg_if! { // of a single threaded rustc. parallel!(impl $fblock [] [$($blocks),*]); } else { - let guard = $crate::sync::ParallelGuard::new(); - guard.run(|| $fblock); - $(guard.run(|| $blocks);)* - guard.unwind(); + $crate::sync::parallel_guard(|guard| { + guard.run(|| $fblock); + $(guard.run(|| $blocks);)* + }); } }; } @@ -435,20 +436,18 @@ cfg_if! { t: T, for_each: impl Fn(I) + DynSync + DynSend ) { - if mode::is_dyn_thread_safe() { - let for_each = FromDyn::from(for_each); - let guard = ParallelGuard::new(); - t.into_par_iter().for_each(|i| { - guard.run(|| for_each(i)); - }); - guard.unwind(); - } else { - let guard = ParallelGuard::new(); - t.into_iter().for_each(|i| { - guard.run(|| for_each(i)); - }); - guard.unwind(); - } + parallel_guard(|guard| { + if mode::is_dyn_thread_safe() { + let for_each = FromDyn::from(for_each); + t.into_par_iter().for_each(|i| { + guard.run(|| for_each(i)); + }); + } else { + t.into_iter().for_each(|i| { + guard.run(|| for_each(i)); + }); + } + }); } pub fn par_map< @@ -460,18 +459,14 @@ cfg_if! { t: T, map: impl Fn(I) -> R + DynSync + DynSend ) -> C { - if mode::is_dyn_thread_safe() { - let map = FromDyn::from(map); - let guard = ParallelGuard::new(); - let r = t.into_par_iter().filter_map(|i| guard.run(|| map(i))).collect(); - guard.unwind(); - r - } else { - let guard = ParallelGuard::new(); - let r = t.into_iter().filter_map(|i| guard.run(|| map(i))).collect(); - guard.unwind(); - r - } + parallel_guard(|guard| { + if mode::is_dyn_thread_safe() { + let map = FromDyn::from(map); + t.into_par_iter().filter_map(|i| guard.run(|| map(i))).collect() + } else { + t.into_iter().filter_map(|i| guard.run(|| map(i))).collect() + } + }) } /// This makes locks panic if they are already held. -- cgit 1.4.1-3-g733a5