diff options
| author | Celina G. Val <celinval@amazon.com> | 2025-06-11 10:43:59 -0700 | 
|---|---|---|
| committer | Celina G. Val <celinval@amazon.com> | 2025-06-11 10:43:59 -0700 | 
| commit | 35c5144394c1b93784867d330f694fa7c8f480e3 (patch) | |
| tree | f2713cd009219f483303be69593f9b335b28c164 /compiler/rustc_thread_pool | |
| parent | bdb04d6c4fff0970bc2f1fad775bec807d9470ee (diff) | |
| download | rust-35c5144394c1b93784867d330f694fa7c8f480e3.tar.gz rust-35c5144394c1b93784867d330f694fa7c8f480e3.zip | |
Move rayon-core to rustc_thread_pool files as is
This commit literally copied the directory rayon-core from revision `5fadf44`. Link: https://github.com/rust-lang/rustc-rayon/tree/5fadf44/rayon-core
Diffstat (limited to 'compiler/rustc_thread_pool')
39 files changed, 8225 insertions, 0 deletions
| diff --git a/compiler/rustc_thread_pool/Cargo.toml b/compiler/rustc_thread_pool/Cargo.toml new file mode 100644 index 00000000000..f7a3d1306b4 --- /dev/null +++ b/compiler/rustc_thread_pool/Cargo.toml @@ -0,0 +1,56 @@ +[package] +name = "rustc-rayon-core" +version = "0.5.1" +authors = ["Niko Matsakis <niko@alum.mit.edu>", + "Josh Stone <cuviper@gmail.com>"] +description = "Core APIs for Rayon - fork for rustc" +license = "MIT OR Apache-2.0" +repository = "https://github.com/rust-lang/rustc-rayon" +documentation = "https://docs.rs/rustc-rayon-core/" +rust-version = "1.63" +edition = "2021" +readme = "README.md" +keywords = ["parallel", "thread", "concurrency", "join", "performance"] +categories = ["concurrency"] + +[lib] +name = "rayon_core" + +# Some dependencies may not be their latest version, in order to support older rustc. +[dependencies] +crossbeam-deque = "0.8.1" +crossbeam-utils = "0.8.0" + +[dev-dependencies] +rand = "0.9" +rand_xorshift = "0.4" +scoped-tls = "1.0" + +[target.'cfg(unix)'.dev-dependencies] +libc = "0.2" + +[[test]] +name = "stack_overflow_crash" +path = "tests/stack_overflow_crash.rs" + +# NB: having one [[test]] manually defined means we need to declare them all + +[[test]] +name = "double_init_fail" +path = "tests/double_init_fail.rs" + +[[test]] +name = "init_zero_threads" +path = "tests/init_zero_threads.rs" + +[[test]] +name = "scope_join" +path = "tests/scope_join.rs" + +[[test]] +name = "simple_panic" +path = "tests/simple_panic.rs" + +[[test]] +name = "scoped_threadpool" +path = "tests/scoped_threadpool.rs" diff --git a/compiler/rustc_thread_pool/LICENSE-APACHE b/compiler/rustc_thread_pool/LICENSE-APACHE new file mode 100644 index 00000000000..16fe87b06e8 --- /dev/null +++ b/compiler/rustc_thread_pool/LICENSE-APACHE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + +TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + +1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + +2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + +3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + +4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + +5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + +6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + +7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + +8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + +9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + +END OF TERMS AND CONDITIONS + +APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + +Copyright [yyyy] [name of copyright owner] + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. diff --git a/compiler/rustc_thread_pool/LICENSE-MIT b/compiler/rustc_thread_pool/LICENSE-MIT new file mode 100644 index 00000000000..25597d5838f --- /dev/null +++ b/compiler/rustc_thread_pool/LICENSE-MIT @@ -0,0 +1,25 @@ +Copyright (c) 2010 The Rust Project Developers + +Permission is hereby granted, free of charge, to any +person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the +Software without restriction, including without +limitation the rights to use, copy, modify, merge, +publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software +is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions +of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. diff --git a/compiler/rustc_thread_pool/README.md b/compiler/rustc_thread_pool/README.md new file mode 100644 index 00000000000..5b8714f5df4 --- /dev/null +++ b/compiler/rustc_thread_pool/README.md @@ -0,0 +1,13 @@ +Note: This is an unstable fork made for use in rustc + +Rayon-core represents the "core, stable" APIs of Rayon: join, scope, and so forth, as well as the ability to create custom thread-pools with ThreadPool. + +Maybe worth mentioning: users are not necessarily intended to directly access rayon-core; all its APIs are mirrored in the rayon crate. To that end, the examples in the docs use rayon::join and so forth rather than rayon_core::join. + +rayon-core aims to never, or almost never, have a breaking change to its API, because each revision of rayon-core also houses the global thread-pool (and hence if you have two simultaneous versions of rayon-core, you have two thread-pools). + +Please see [Rayon Docs] for details about using Rayon. + +[Rayon Docs]: https://docs.rs/rayon/ + +Rayon-core currently requires `rustc 1.63.0` or greater. diff --git a/compiler/rustc_thread_pool/src/broadcast/mod.rs b/compiler/rustc_thread_pool/src/broadcast/mod.rs new file mode 100644 index 00000000000..442891f2d28 --- /dev/null +++ b/compiler/rustc_thread_pool/src/broadcast/mod.rs @@ -0,0 +1,151 @@ +use crate::job::{ArcJob, StackJob}; +use crate::latch::{CountLatch, LatchRef}; +use crate::registry::{Registry, WorkerThread}; +use std::fmt; +use std::marker::PhantomData; +use std::sync::Arc; + +mod test; + +/// Executes `op` within every thread in the current threadpool. If this is +/// called from a non-Rayon thread, it will execute in the global threadpool. +/// Any attempts to use `join`, `scope`, or parallel iterators will then operate +/// within that threadpool. When the call has completed on each thread, returns +/// a vector containing all of their return values. +/// +/// For more information, see the [`ThreadPool::broadcast()`][m] method. +/// +/// [m]: struct.ThreadPool.html#method.broadcast +pub fn broadcast<OP, R>(op: OP) -> Vec<R> +where + OP: Fn(BroadcastContext<'_>) -> R + Sync, + R: Send, +{ + // We assert that current registry has not terminated. + unsafe { broadcast_in(op, &Registry::current()) } +} + +/// Spawns an asynchronous task on every thread in this thread-pool. This task +/// will run in the implicit, global scope, which means that it may outlast the +/// current stack frame -- therefore, it cannot capture any references onto the +/// stack (you will likely need a `move` closure). +/// +/// For more information, see the [`ThreadPool::spawn_broadcast()`][m] method. +/// +/// [m]: struct.ThreadPool.html#method.spawn_broadcast +pub fn spawn_broadcast<OP>(op: OP) +where + OP: Fn(BroadcastContext<'_>) + Send + Sync + 'static, +{ + // We assert that current registry has not terminated. + unsafe { spawn_broadcast_in(op, &Registry::current()) } +} + +/// Provides context to a closure called by `broadcast`. +pub struct BroadcastContext<'a> { + worker: &'a WorkerThread, + + /// Make sure to prevent auto-traits like `Send` and `Sync`. + _marker: PhantomData<&'a mut dyn Fn()>, +} + +impl<'a> BroadcastContext<'a> { + pub(super) fn with<R>(f: impl FnOnce(BroadcastContext<'_>) -> R) -> R { + let worker_thread = WorkerThread::current(); + assert!(!worker_thread.is_null()); + f(BroadcastContext { + worker: unsafe { &*worker_thread }, + _marker: PhantomData, + }) + } + + /// Our index amongst the broadcast threads (ranges from `0..self.num_threads()`). + #[inline] + pub fn index(&self) -> usize { + self.worker.index() + } + + /// The number of threads receiving the broadcast in the thread pool. + /// + /// # Future compatibility note + /// + /// Future versions of Rayon might vary the number of threads over time, but + /// this method will always return the number of threads which are actually + /// receiving your particular `broadcast` call. + #[inline] + pub fn num_threads(&self) -> usize { + self.worker.registry().num_threads() + } +} + +impl<'a> fmt::Debug for BroadcastContext<'a> { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("BroadcastContext") + .field("index", &self.index()) + .field("num_threads", &self.num_threads()) + .field("pool_id", &self.worker.registry().id()) + .finish() + } +} + +/// Execute `op` on every thread in the pool. It will be executed on each +/// thread when they have nothing else to do locally, before they try to +/// steal work from other threads. This function will not return until all +/// threads have completed the `op`. +/// +/// Unsafe because `registry` must not yet have terminated. +pub(super) unsafe fn broadcast_in<OP, R>(op: OP, registry: &Arc<Registry>) -> Vec<R> +where + OP: Fn(BroadcastContext<'_>) -> R + Sync, + R: Send, +{ + let f = move |injected: bool| { + debug_assert!(injected); + BroadcastContext::with(&op) + }; + + let n_threads = registry.num_threads(); + let current_thread = WorkerThread::current().as_ref(); + let tlv = crate::tlv::get(); + let latch = CountLatch::with_count(n_threads, current_thread); + let jobs: Vec<_> = (0..n_threads) + .map(|_| StackJob::new(tlv, &f, LatchRef::new(&latch))) + .collect(); + let job_refs = jobs.iter().map(|job| job.as_job_ref()); + + registry.inject_broadcast(job_refs); + + // Wait for all jobs to complete, then collect the results, maybe propagating a panic. + latch.wait(current_thread); + jobs.into_iter().map(|job| job.into_result()).collect() +} + +/// Execute `op` on every thread in the pool. It will be executed on each +/// thread when they have nothing else to do locally, before they try to +/// steal work from other threads. This function returns immediately after +/// injecting the jobs. +/// +/// Unsafe because `registry` must not yet have terminated. +pub(super) unsafe fn spawn_broadcast_in<OP>(op: OP, registry: &Arc<Registry>) +where + OP: Fn(BroadcastContext<'_>) + Send + Sync + 'static, +{ + let job = ArcJob::new({ + let registry = Arc::clone(registry); + move || { + registry.catch_unwind(|| BroadcastContext::with(&op)); + registry.terminate(); // (*) permit registry to terminate now + } + }); + + let n_threads = registry.num_threads(); + let job_refs = (0..n_threads).map(|_| { + // Ensure that registry cannot terminate until this job has executed + // on each thread. This ref is decremented at the (*) above. + registry.increment_terminate_count(); + + ArcJob::as_static_job_ref(&job) + }); + + registry.inject_broadcast(job_refs); +} diff --git a/compiler/rustc_thread_pool/src/broadcast/test.rs b/compiler/rustc_thread_pool/src/broadcast/test.rs new file mode 100644 index 00000000000..00ab4ad7fe4 --- /dev/null +++ b/compiler/rustc_thread_pool/src/broadcast/test.rs @@ -0,0 +1,263 @@ +#![cfg(test)] + +use crate::ThreadPoolBuilder; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::mpsc::channel; +use std::sync::Arc; +use std::{thread, time}; + +#[test] +fn broadcast_global() { + let v = crate::broadcast(|ctx| ctx.index()); + assert!(v.into_iter().eq(0..crate::current_num_threads())); +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn spawn_broadcast_global() { + let (tx, rx) = channel(); + crate::spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap()); + + let mut v: Vec<_> = rx.into_iter().collect(); + v.sort_unstable(); + assert!(v.into_iter().eq(0..crate::current_num_threads())); +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn broadcast_pool() { + let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); + let v = pool.broadcast(|ctx| ctx.index()); + assert!(v.into_iter().eq(0..7)); +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn spawn_broadcast_pool() { + let (tx, rx) = channel(); + let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); + pool.spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap()); + + let mut v: Vec<_> = rx.into_iter().collect(); + v.sort_unstable(); + assert!(v.into_iter().eq(0..7)); +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn broadcast_self() { + let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); + let v = pool.install(|| crate::broadcast(|ctx| ctx.index())); + assert!(v.into_iter().eq(0..7)); +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn spawn_broadcast_self() { + let (tx, rx) = channel(); + let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); + pool.spawn(|| crate::spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap())); + + let mut v: Vec<_> = rx.into_iter().collect(); + v.sort_unstable(); + assert!(v.into_iter().eq(0..7)); +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn broadcast_mutual() { + let count = AtomicUsize::new(0); + let pool1 = ThreadPoolBuilder::new().num_threads(3).build().unwrap(); + let pool2 = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); + pool1.install(|| { + pool2.broadcast(|_| { + pool1.broadcast(|_| { + count.fetch_add(1, Ordering::Relaxed); + }) + }) + }); + assert_eq!(count.into_inner(), 3 * 7); +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn spawn_broadcast_mutual() { + let (tx, rx) = channel(); + let pool1 = Arc::new(ThreadPoolBuilder::new().num_threads(3).build().unwrap()); + let pool2 = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); + pool1.spawn({ + let pool1 = Arc::clone(&pool1); + move || { + pool2.spawn_broadcast(move |_| { + let tx = tx.clone(); + pool1.spawn_broadcast(move |_| tx.send(()).unwrap()) + }) + } + }); + assert_eq!(rx.into_iter().count(), 3 * 7); +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn broadcast_mutual_sleepy() { + let count = AtomicUsize::new(0); + let pool1 = ThreadPoolBuilder::new().num_threads(3).build().unwrap(); + let pool2 = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); + pool1.install(|| { + thread::sleep(time::Duration::from_secs(1)); + pool2.broadcast(|_| { + thread::sleep(time::Duration::from_secs(1)); + pool1.broadcast(|_| { + thread::sleep(time::Duration::from_millis(100)); + count.fetch_add(1, Ordering::Relaxed); + }) + }) + }); + assert_eq!(count.into_inner(), 3 * 7); +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn spawn_broadcast_mutual_sleepy() { + let (tx, rx) = channel(); + let pool1 = Arc::new(ThreadPoolBuilder::new().num_threads(3).build().unwrap()); + let pool2 = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); + pool1.spawn({ + let pool1 = Arc::clone(&pool1); + move || { + thread::sleep(time::Duration::from_secs(1)); + pool2.spawn_broadcast(move |_| { + let tx = tx.clone(); + thread::sleep(time::Duration::from_secs(1)); + pool1.spawn_broadcast(move |_| { + thread::sleep(time::Duration::from_millis(100)); + tx.send(()).unwrap(); + }) + }) + } + }); + assert_eq!(rx.into_iter().count(), 3 * 7); +} + +#[test] +#[cfg_attr(not(panic = "unwind"), ignore)] +fn broadcast_panic_one() { + let count = AtomicUsize::new(0); + let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); + let result = crate::unwind::halt_unwinding(|| { + pool.broadcast(|ctx| { + count.fetch_add(1, Ordering::Relaxed); + if ctx.index() == 3 { + panic!("Hello, world!"); + } + }) + }); + assert_eq!(count.into_inner(), 7); + assert!(result.is_err(), "broadcast panic should propagate!"); +} + +#[test] +#[cfg_attr(not(panic = "unwind"), ignore)] +fn spawn_broadcast_panic_one() { + let (tx, rx) = channel(); + let (panic_tx, panic_rx) = channel(); + let pool = ThreadPoolBuilder::new() + .num_threads(7) + .panic_handler(move |e| panic_tx.send(e).unwrap()) + .build() + .unwrap(); + pool.spawn_broadcast(move |ctx| { + tx.send(()).unwrap(); + if ctx.index() == 3 { + panic!("Hello, world!"); + } + }); + drop(pool); // including panic_tx + assert_eq!(rx.into_iter().count(), 7); + assert_eq!(panic_rx.into_iter().count(), 1); +} + +#[test] +#[cfg_attr(not(panic = "unwind"), ignore)] +fn broadcast_panic_many() { + let count = AtomicUsize::new(0); + let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); + let result = crate::unwind::halt_unwinding(|| { + pool.broadcast(|ctx| { + count.fetch_add(1, Ordering::Relaxed); + if ctx.index() % 2 == 0 { + panic!("Hello, world!"); + } + }) + }); + assert_eq!(count.into_inner(), 7); + assert!(result.is_err(), "broadcast panic should propagate!"); +} + +#[test] +#[cfg_attr(not(panic = "unwind"), ignore)] +fn spawn_broadcast_panic_many() { + let (tx, rx) = channel(); + let (panic_tx, panic_rx) = channel(); + let pool = ThreadPoolBuilder::new() + .num_threads(7) + .panic_handler(move |e| panic_tx.send(e).unwrap()) + .build() + .unwrap(); + pool.spawn_broadcast(move |ctx| { + tx.send(()).unwrap(); + if ctx.index() % 2 == 0 { + panic!("Hello, world!"); + } + }); + drop(pool); // including panic_tx + assert_eq!(rx.into_iter().count(), 7); + assert_eq!(panic_rx.into_iter().count(), 4); +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn broadcast_sleep_race() { + let test_duration = time::Duration::from_secs(1); + let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); + let start = time::Instant::now(); + while start.elapsed() < test_duration { + pool.broadcast(|ctx| { + // A slight spread of sleep duration increases the chance that one + // of the threads will race in the pool's idle sleep afterward. + thread::sleep(time::Duration::from_micros(ctx.index() as u64)); + }); + } +} + +#[test] +fn broadcast_after_spawn_broadcast() { + let (tx, rx) = channel(); + + // Queue a non-blocking spawn_broadcast. + crate::spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap()); + + // This blocking broadcast runs after all prior broadcasts. + crate::broadcast(|_| {}); + + // The spawn_broadcast **must** have run by now on all threads. + let mut v: Vec<_> = rx.try_iter().collect(); + v.sort_unstable(); + assert!(v.into_iter().eq(0..crate::current_num_threads())); +} + +#[test] +fn broadcast_after_spawn() { + let (tx, rx) = channel(); + + // Queue a regular spawn on a thread-local deque. + crate::registry::in_worker(move |_, _| { + crate::spawn(move || tx.send(22).unwrap()); + }); + + // Broadcast runs after the local deque is empty. + crate::broadcast(|_| {}); + + // The spawn **must** have run by now. + assert_eq!(22, rx.try_recv().unwrap()); +} diff --git a/compiler/rustc_thread_pool/src/compile_fail/mod.rs b/compiler/rustc_thread_pool/src/compile_fail/mod.rs new file mode 100644 index 00000000000..f2ec646a4d3 --- /dev/null +++ b/compiler/rustc_thread_pool/src/compile_fail/mod.rs @@ -0,0 +1,7 @@ +// These modules contain `compile_fail` doc tests. +mod quicksort_race1; +mod quicksort_race2; +mod quicksort_race3; +mod rc_return; +mod rc_upvar; +mod scope_join_bad; diff --git a/compiler/rustc_thread_pool/src/compile_fail/quicksort_race1.rs b/compiler/rustc_thread_pool/src/compile_fail/quicksort_race1.rs new file mode 100644 index 00000000000..5615033895a --- /dev/null +++ b/compiler/rustc_thread_pool/src/compile_fail/quicksort_race1.rs @@ -0,0 +1,28 @@ +/*! ```compile_fail,E0524 + +fn quick_sort<T:PartialOrd+Send>(v: &mut [T]) { + if v.len() <= 1 { + return; + } + + let mid = partition(v); + let (lo, _hi) = v.split_at_mut(mid); + rayon_core::join(|| quick_sort(lo), || quick_sort(lo)); //~ ERROR +} + +fn partition<T:PartialOrd+Send>(v: &mut [T]) -> usize { + let pivot = v.len() - 1; + let mut i = 0; + for j in 0..pivot { + if v[j] <= v[pivot] { + v.swap(i, j); + i += 1; + } + } + v.swap(i, pivot); + i +} + +fn main() { } + +``` */ diff --git a/compiler/rustc_thread_pool/src/compile_fail/quicksort_race2.rs b/compiler/rustc_thread_pool/src/compile_fail/quicksort_race2.rs new file mode 100644 index 00000000000..020589c29a8 --- /dev/null +++ b/compiler/rustc_thread_pool/src/compile_fail/quicksort_race2.rs @@ -0,0 +1,28 @@ +/*! ```compile_fail,E0500 + +fn quick_sort<T:PartialOrd+Send>(v: &mut [T]) { + if v.len() <= 1 { + return; + } + + let mid = partition(v); + let (lo, _hi) = v.split_at_mut(mid); + rayon_core::join(|| quick_sort(lo), || quick_sort(v)); //~ ERROR +} + +fn partition<T:PartialOrd+Send>(v: &mut [T]) -> usize { + let pivot = v.len() - 1; + let mut i = 0; + for j in 0..pivot { + if v[j] <= v[pivot] { + v.swap(i, j); + i += 1; + } + } + v.swap(i, pivot); + i +} + +fn main() { } + +``` */ diff --git a/compiler/rustc_thread_pool/src/compile_fail/quicksort_race3.rs b/compiler/rustc_thread_pool/src/compile_fail/quicksort_race3.rs new file mode 100644 index 00000000000..16fbf3b824d --- /dev/null +++ b/compiler/rustc_thread_pool/src/compile_fail/quicksort_race3.rs @@ -0,0 +1,28 @@ +/*! ```compile_fail,E0524 + +fn quick_sort<T:PartialOrd+Send>(v: &mut [T]) { + if v.len() <= 1 { + return; + } + + let mid = partition(v); + let (_lo, hi) = v.split_at_mut(mid); + rayon_core::join(|| quick_sort(hi), || quick_sort(hi)); //~ ERROR +} + +fn partition<T:PartialOrd+Send>(v: &mut [T]) -> usize { + let pivot = v.len() - 1; + let mut i = 0; + for j in 0..pivot { + if v[j] <= v[pivot] { + v.swap(i, j); + i += 1; + } + } + v.swap(i, pivot); + i +} + +fn main() { } + +``` */ diff --git a/compiler/rustc_thread_pool/src/compile_fail/rc_return.rs b/compiler/rustc_thread_pool/src/compile_fail/rc_return.rs new file mode 100644 index 00000000000..93e3a603849 --- /dev/null +++ b/compiler/rustc_thread_pool/src/compile_fail/rc_return.rs @@ -0,0 +1,17 @@ +/** ```compile_fail,E0277 + +use std::rc::Rc; + +rayon_core::join(|| Rc::new(22), || ()); //~ ERROR + +``` */ +mod left {} + +/** ```compile_fail,E0277 + +use std::rc::Rc; + +rayon_core::join(|| (), || Rc::new(23)); //~ ERROR + +``` */ +mod right {} diff --git a/compiler/rustc_thread_pool/src/compile_fail/rc_upvar.rs b/compiler/rustc_thread_pool/src/compile_fail/rc_upvar.rs new file mode 100644 index 00000000000..d8aebcfcbf2 --- /dev/null +++ b/compiler/rustc_thread_pool/src/compile_fail/rc_upvar.rs @@ -0,0 +1,9 @@ +/*! ```compile_fail,E0277 + +use std::rc::Rc; + +let r = Rc::new(22); +rayon_core::join(|| r.clone(), || r.clone()); +//~^ ERROR + +``` */ diff --git a/compiler/rustc_thread_pool/src/compile_fail/scope_join_bad.rs b/compiler/rustc_thread_pool/src/compile_fail/scope_join_bad.rs new file mode 100644 index 00000000000..75e4c5ca6c0 --- /dev/null +++ b/compiler/rustc_thread_pool/src/compile_fail/scope_join_bad.rs @@ -0,0 +1,24 @@ +/*! ```compile_fail,E0373 + +fn bad_scope<F>(f: F) + where F: FnOnce(&i32) + Send, +{ + rayon_core::scope(|s| { + let x = 22; + s.spawn(|_| f(&x)); //~ ERROR `x` does not live long enough + }); +} + +fn good_scope<F>(f: F) + where F: FnOnce(&i32) + Send, +{ + let x = 22; + rayon_core::scope(|s| { + s.spawn(|_| f(&x)); + }); +} + +fn main() { +} + +``` */ diff --git a/compiler/rustc_thread_pool/src/job.rs b/compiler/rustc_thread_pool/src/job.rs new file mode 100644 index 00000000000..394c7576b2c --- /dev/null +++ b/compiler/rustc_thread_pool/src/job.rs @@ -0,0 +1,277 @@ +use crate::latch::Latch; +use crate::tlv; +use crate::tlv::Tlv; +use crate::unwind; +use crossbeam_deque::{Injector, Steal}; +use std::any::Any; +use std::cell::UnsafeCell; +use std::mem; +use std::sync::Arc; + +pub(super) enum JobResult<T> { + None, + Ok(T), + Panic(Box<dyn Any + Send>), +} + +/// A `Job` is used to advertise work for other threads that they may +/// want to steal. In accordance with time honored tradition, jobs are +/// arranged in a deque, so that thieves can take from the top of the +/// deque while the main worker manages the bottom of the deque. This +/// deque is managed by the `thread_pool` module. +pub(super) trait Job { + /// Unsafe: this may be called from a different thread than the one + /// which scheduled the job, so the implementer must ensure the + /// appropriate traits are met, whether `Send`, `Sync`, or both. + unsafe fn execute(this: *const ()); +} + +/// Effectively a Job trait object. Each JobRef **must** be executed +/// exactly once, or else data may leak. +/// +/// Internally, we store the job's data in a `*const ()` pointer. The +/// true type is something like `*const StackJob<...>`, but we hide +/// it. We also carry the "execute fn" from the `Job` trait. +pub(super) struct JobRef { + pointer: *const (), + execute_fn: unsafe fn(*const ()), +} + +unsafe impl Send for JobRef {} +unsafe impl Sync for JobRef {} + +impl JobRef { + /// Unsafe: caller asserts that `data` will remain valid until the + /// job is executed. + pub(super) unsafe fn new<T>(data: *const T) -> JobRef + where + T: Job, + { + // erase types: + JobRef { + pointer: data as *const (), + execute_fn: <T as Job>::execute, + } + } + + /// Returns an opaque handle that can be saved and compared, + /// without making `JobRef` itself `Copy + Eq`. + #[inline] + pub(super) fn id(&self) -> impl Eq { + (self.pointer, self.execute_fn) + } + + #[inline] + pub(super) unsafe fn execute(self) { + (self.execute_fn)(self.pointer) + } +} + +/// A job that will be owned by a stack slot. This means that when it +/// executes it need not free any heap data, the cleanup occurs when +/// the stack frame is later popped. The function parameter indicates +/// `true` if the job was stolen -- executed on a different thread. +pub(super) struct StackJob<L, F, R> +where + L: Latch + Sync, + F: FnOnce(bool) -> R + Send, + R: Send, +{ + pub(super) latch: L, + func: UnsafeCell<Option<F>>, + result: UnsafeCell<JobResult<R>>, + tlv: Tlv, +} + +impl<L, F, R> StackJob<L, F, R> +where + L: Latch + Sync, + F: FnOnce(bool) -> R + Send, + R: Send, +{ + pub(super) fn new(tlv: Tlv, func: F, latch: L) -> StackJob<L, F, R> { + StackJob { + latch, + func: UnsafeCell::new(Some(func)), + result: UnsafeCell::new(JobResult::None), + tlv, + } + } + + pub(super) unsafe fn as_job_ref(&self) -> JobRef { + JobRef::new(self) + } + + pub(super) unsafe fn run_inline(self, stolen: bool) -> R { + self.func.into_inner().unwrap()(stolen) + } + + pub(super) unsafe fn into_result(self) -> R { + self.result.into_inner().into_return_value() + } +} + +impl<L, F, R> Job for StackJob<L, F, R> +where + L: Latch + Sync, + F: FnOnce(bool) -> R + Send, + R: Send, +{ + unsafe fn execute(this: *const ()) { + let this = &*(this as *const Self); + tlv::set(this.tlv); + let abort = unwind::AbortIfPanic; + let func = (*this.func.get()).take().unwrap(); + (*this.result.get()) = JobResult::call(func); + Latch::set(&this.latch); + mem::forget(abort); + } +} + +/// Represents a job stored in the heap. Used to implement +/// `scope`. Unlike `StackJob`, when executed, `HeapJob` simply +/// invokes a closure, which then triggers the appropriate logic to +/// signal that the job executed. +/// +/// (Probably `StackJob` should be refactored in a similar fashion.) +pub(super) struct HeapJob<BODY> +where + BODY: FnOnce() + Send, +{ + job: BODY, + tlv: Tlv, +} + +impl<BODY> HeapJob<BODY> +where + BODY: FnOnce() + Send, +{ + pub(super) fn new(tlv: Tlv, job: BODY) -> Box<Self> { + Box::new(HeapJob { job, tlv }) + } + + /// Creates a `JobRef` from this job -- note that this hides all + /// lifetimes, so it is up to you to ensure that this JobRef + /// doesn't outlive any data that it closes over. + pub(super) unsafe fn into_job_ref(self: Box<Self>) -> JobRef { + JobRef::new(Box::into_raw(self)) + } + + /// Creates a static `JobRef` from this job. + pub(super) fn into_static_job_ref(self: Box<Self>) -> JobRef + where + BODY: 'static, + { + unsafe { self.into_job_ref() } + } +} + +impl<BODY> Job for HeapJob<BODY> +where + BODY: FnOnce() + Send, +{ + unsafe fn execute(this: *const ()) { + let this = Box::from_raw(this as *mut Self); + tlv::set(this.tlv); + (this.job)(); + } +} + +/// Represents a job stored in an `Arc` -- like `HeapJob`, but may +/// be turned into multiple `JobRef`s and called multiple times. +pub(super) struct ArcJob<BODY> +where + BODY: Fn() + Send + Sync, +{ + job: BODY, +} + +impl<BODY> ArcJob<BODY> +where + BODY: Fn() + Send + Sync, +{ + pub(super) fn new(job: BODY) -> Arc<Self> { + Arc::new(ArcJob { job }) + } + + /// Creates a `JobRef` from this job -- note that this hides all + /// lifetimes, so it is up to you to ensure that this JobRef + /// doesn't outlive any data that it closes over. + pub(super) unsafe fn as_job_ref(this: &Arc<Self>) -> JobRef { + JobRef::new(Arc::into_raw(Arc::clone(this))) + } + + /// Creates a static `JobRef` from this job. + pub(super) fn as_static_job_ref(this: &Arc<Self>) -> JobRef + where + BODY: 'static, + { + unsafe { Self::as_job_ref(this) } + } +} + +impl<BODY> Job for ArcJob<BODY> +where + BODY: Fn() + Send + Sync, +{ + unsafe fn execute(this: *const ()) { + let this = Arc::from_raw(this as *mut Self); + (this.job)(); + } +} + +impl<T> JobResult<T> { + fn call(func: impl FnOnce(bool) -> T) -> Self { + match unwind::halt_unwinding(|| func(true)) { + Ok(x) => JobResult::Ok(x), + Err(x) => JobResult::Panic(x), + } + } + + /// Convert the `JobResult` for a job that has finished (and hence + /// its JobResult is populated) into its return value. + /// + /// NB. This will panic if the job panicked. + pub(super) fn into_return_value(self) -> T { + match self { + JobResult::None => unreachable!(), + JobResult::Ok(x) => x, + JobResult::Panic(x) => unwind::resume_unwinding(x), + } + } +} + +/// Indirect queue to provide FIFO job priority. +pub(super) struct JobFifo { + inner: Injector<JobRef>, +} + +impl JobFifo { + pub(super) fn new() -> Self { + JobFifo { + inner: Injector::new(), + } + } + + pub(super) unsafe fn push(&self, job_ref: JobRef) -> JobRef { + // A little indirection ensures that spawns are always prioritized in FIFO order. The + // jobs in a thread's deque may be popped from the back (LIFO) or stolen from the front + // (FIFO), but either way they will end up popping from the front of this queue. + self.inner.push(job_ref); + JobRef::new(self) + } +} + +impl Job for JobFifo { + unsafe fn execute(this: *const ()) { + // We "execute" a queue by executing its first job, FIFO. + let this = &*(this as *const Self); + loop { + match this.inner.steal() { + Steal::Success(job_ref) => break job_ref.execute(), + Steal::Empty => panic!("FIFO is empty"), + Steal::Retry => {} + } + } + } +} diff --git a/compiler/rustc_thread_pool/src/join/mod.rs b/compiler/rustc_thread_pool/src/join/mod.rs new file mode 100644 index 00000000000..032eec9c4c8 --- /dev/null +++ b/compiler/rustc_thread_pool/src/join/mod.rs @@ -0,0 +1,202 @@ +use crate::job::StackJob; +use crate::latch::SpinLatch; +use crate::registry::{self, WorkerThread}; +use crate::tlv::{self, Tlv}; +use crate::unwind; +use std::any::Any; + +use crate::FnContext; + +#[cfg(test)] +mod test; + +/// Takes two closures and *potentially* runs them in parallel. It +/// returns a pair of the results from those closures. +/// +/// Conceptually, calling `join()` is similar to spawning two threads, +/// one executing each of the two closures. However, the +/// implementation is quite different and incurs very low +/// overhead. The underlying technique is called "work stealing": the +/// Rayon runtime uses a fixed pool of worker threads and attempts to +/// only execute code in parallel when there are idle CPUs to handle +/// it. +/// +/// When `join` is called from outside the thread pool, the calling +/// thread will block while the closures execute in the pool. When +/// `join` is called within the pool, the calling thread still actively +/// participates in the thread pool. It will begin by executing closure +/// A (on the current thread). While it is doing that, it will advertise +/// closure B as being available for other threads to execute. Once closure A +/// has completed, the current thread will try to execute closure B; +/// if however closure B has been stolen, then it will look for other work +/// while waiting for the thief to fully execute closure B. (This is the +/// typical work-stealing strategy). +/// +/// # Examples +/// +/// This example uses join to perform a quick-sort (note this is not a +/// particularly optimized implementation: if you **actually** want to +/// sort for real, you should prefer [the `par_sort` method] offered +/// by Rayon). +/// +/// [the `par_sort` method]: ../rayon/slice/trait.ParallelSliceMut.html#method.par_sort +/// +/// ```rust +/// # use rayon_core as rayon; +/// let mut v = vec![5, 1, 8, 22, 0, 44]; +/// quick_sort(&mut v); +/// assert_eq!(v, vec![0, 1, 5, 8, 22, 44]); +/// +/// fn quick_sort<T:PartialOrd+Send>(v: &mut [T]) { +/// if v.len() > 1 { +/// let mid = partition(v); +/// let (lo, hi) = v.split_at_mut(mid); +/// rayon::join(|| quick_sort(lo), +/// || quick_sort(hi)); +/// } +/// } +/// +/// // Partition rearranges all items `<=` to the pivot +/// // item (arbitrary selected to be the last item in the slice) +/// // to the first half of the slice. It then returns the +/// // "dividing point" where the pivot is placed. +/// fn partition<T:PartialOrd+Send>(v: &mut [T]) -> usize { +/// let pivot = v.len() - 1; +/// let mut i = 0; +/// for j in 0..pivot { +/// if v[j] <= v[pivot] { +/// v.swap(i, j); +/// i += 1; +/// } +/// } +/// v.swap(i, pivot); +/// i +/// } +/// ``` +/// +/// # Warning about blocking I/O +/// +/// The assumption is that the closures given to `join()` are +/// CPU-bound tasks that do not perform I/O or other blocking +/// operations. If you do perform I/O, and that I/O should block +/// (e.g., waiting for a network request), the overall performance may +/// be poor. Moreover, if you cause one closure to be blocked waiting +/// on another (for example, using a channel), that could lead to a +/// deadlock. +/// +/// # Panics +/// +/// No matter what happens, both closures will always be executed. If +/// a single closure panics, whether it be the first or second +/// closure, that panic will be propagated and hence `join()` will +/// panic with the same panic value. If both closures panic, `join()` +/// will panic with the panic value from the first closure. +pub fn join<A, B, RA, RB>(oper_a: A, oper_b: B) -> (RA, RB) +where + A: FnOnce() -> RA + Send, + B: FnOnce() -> RB + Send, + RA: Send, + RB: Send, +{ + #[inline] + fn call<R>(f: impl FnOnce() -> R) -> impl FnOnce(FnContext) -> R { + move |_| f() + } + + join_context(call(oper_a), call(oper_b)) +} + +/// Identical to `join`, except that the closures have a parameter +/// that provides context for the way the closure has been called, +/// especially indicating whether they're executing on a different +/// thread than where `join_context` was called. This will occur if +/// the second job is stolen by a different thread, or if +/// `join_context` was called from outside the thread pool to begin +/// with. +pub fn join_context<A, B, RA, RB>(oper_a: A, oper_b: B) -> (RA, RB) +where + A: FnOnce(FnContext) -> RA + Send, + B: FnOnce(FnContext) -> RB + Send, + RA: Send, + RB: Send, +{ + #[inline] + fn call_a<R>(f: impl FnOnce(FnContext) -> R, injected: bool) -> impl FnOnce() -> R { + move || f(FnContext::new(injected)) + } + + #[inline] + fn call_b<R>(f: impl FnOnce(FnContext) -> R) -> impl FnOnce(bool) -> R { + move |migrated| f(FnContext::new(migrated)) + } + + registry::in_worker(|worker_thread, injected| unsafe { + let tlv = tlv::get(); + // Create virtual wrapper for task b; this all has to be + // done here so that the stack frame can keep it all live + // long enough. + let job_b = StackJob::new(tlv, call_b(oper_b), SpinLatch::new(worker_thread)); + let job_b_ref = job_b.as_job_ref(); + let job_b_id = job_b_ref.id(); + worker_thread.push(job_b_ref); + + // Execute task a; hopefully b gets stolen in the meantime. + let status_a = unwind::halt_unwinding(call_a(oper_a, injected)); + let result_a = match status_a { + Ok(v) => v, + Err(err) => join_recover_from_panic(worker_thread, &job_b.latch, err, tlv), + }; + + // Now that task A has finished, try to pop job B from the + // local stack. It may already have been popped by job A; it + // may also have been stolen. There may also be some tasks + // pushed on top of it in the stack, and we will have to pop + // those off to get to it. + while !job_b.latch.probe() { + if let Some(job) = worker_thread.take_local_job() { + if job_b_id == job.id() { + // Found it! Let's run it. + // + // Note that this could panic, but it's ok if we unwind here. + + // Restore the TLV since we might have run some jobs overwriting it when waiting for job b. + tlv::set(tlv); + + let result_b = job_b.run_inline(injected); + return (result_a, result_b); + } else { + worker_thread.execute(job); + } + } else { + // Local deque is empty. Time to steal from other + // threads. + worker_thread.wait_until(&job_b.latch); + debug_assert!(job_b.latch.probe()); + break; + } + } + + // Restore the TLV since we might have run some jobs overwriting it when waiting for job b. + tlv::set(tlv); + + (result_a, job_b.into_result()) + }) +} + +/// If job A panics, we still cannot return until we are sure that job +/// B is complete. This is because it may contain references into the +/// enclosing stack frame(s). +#[cold] // cold path +unsafe fn join_recover_from_panic( + worker_thread: &WorkerThread, + job_b_latch: &SpinLatch<'_>, + err: Box<dyn Any + Send>, + tlv: Tlv, +) -> ! { + worker_thread.wait_until(job_b_latch); + + // Restore the TLV since we might have run some jobs overwriting it when waiting for job b. + tlv::set(tlv); + + unwind::resume_unwinding(err) +} diff --git a/compiler/rustc_thread_pool/src/join/test.rs b/compiler/rustc_thread_pool/src/join/test.rs new file mode 100644 index 00000000000..03f4ab4478d --- /dev/null +++ b/compiler/rustc_thread_pool/src/join/test.rs @@ -0,0 +1,150 @@ +//! Tests for the join code. + +use super::*; +use crate::ThreadPoolBuilder; +use rand::distr::StandardUniform; +use rand::{Rng, SeedableRng}; +use rand_xorshift::XorShiftRng; + +fn quick_sort<T: PartialOrd + Send>(v: &mut [T]) { + if v.len() <= 1 { + return; + } + + let mid = partition(v); + let (lo, hi) = v.split_at_mut(mid); + join(|| quick_sort(lo), || quick_sort(hi)); +} + +fn partition<T: PartialOrd + Send>(v: &mut [T]) -> usize { + let pivot = v.len() - 1; + let mut i = 0; + for j in 0..pivot { + if v[j] <= v[pivot] { + v.swap(i, j); + i += 1; + } + } + v.swap(i, pivot); + i +} + +fn seeded_rng() -> XorShiftRng { + let mut seed = <XorShiftRng as SeedableRng>::Seed::default(); + (0..).zip(seed.as_mut()).for_each(|(i, x)| *x = i); + XorShiftRng::from_seed(seed) +} + +#[test] +fn sort() { + let rng = seeded_rng(); + let mut data: Vec<u32> = rng.sample_iter(&StandardUniform).take(6 * 1024).collect(); + let mut sorted_data = data.clone(); + sorted_data.sort(); + quick_sort(&mut data); + assert_eq!(data, sorted_data); +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn sort_in_pool() { + let rng = seeded_rng(); + let mut data: Vec<u32> = rng.sample_iter(&StandardUniform).take(12 * 1024).collect(); + + let pool = ThreadPoolBuilder::new().build().unwrap(); + let mut sorted_data = data.clone(); + sorted_data.sort(); + pool.install(|| quick_sort(&mut data)); + assert_eq!(data, sorted_data); +} + +#[test] +#[should_panic(expected = "Hello, world!")] +fn panic_propagate_a() { + join(|| panic!("Hello, world!"), || ()); +} + +#[test] +#[should_panic(expected = "Hello, world!")] +fn panic_propagate_b() { + join(|| (), || panic!("Hello, world!")); +} + +#[test] +#[should_panic(expected = "Hello, world!")] +fn panic_propagate_both() { + join(|| panic!("Hello, world!"), || panic!("Goodbye, world!")); +} + +#[test] +#[cfg_attr(not(panic = "unwind"), ignore)] +fn panic_b_still_executes() { + let mut x = false; + match unwind::halt_unwinding(|| join(|| panic!("Hello, world!"), || x = true)) { + Ok(_) => panic!("failed to propagate panic from closure A,"), + Err(_) => assert!(x, "closure b failed to execute"), + } +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn join_context_both() { + // If we're not in a pool, both should be marked stolen as they're injected. + let (a_migrated, b_migrated) = join_context(|a| a.migrated(), |b| b.migrated()); + assert!(a_migrated); + assert!(b_migrated); +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn join_context_neither() { + // If we're already in a 1-thread pool, neither job should be stolen. + let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); + let (a_migrated, b_migrated) = + pool.install(|| join_context(|a| a.migrated(), |b| b.migrated())); + assert!(!a_migrated); + assert!(!b_migrated); +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn join_context_second() { + use std::sync::Barrier; + + // If we're already in a 2-thread pool, the second job should be stolen. + let barrier = Barrier::new(2); + let pool = ThreadPoolBuilder::new().num_threads(2).build().unwrap(); + let (a_migrated, b_migrated) = pool.install(|| { + join_context( + |a| { + barrier.wait(); + a.migrated() + }, + |b| { + barrier.wait(); + b.migrated() + }, + ) + }); + assert!(!a_migrated); + assert!(b_migrated); +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn join_counter_overflow() { + const MAX: u32 = 500_000; + + let mut i = 0; + let mut j = 0; + let pool = ThreadPoolBuilder::new().num_threads(2).build().unwrap(); + + // Hammer on join a bunch of times -- used to hit overflow debug-assertions + // in JEC on 32-bit targets: https://github.com/rayon-rs/rayon/issues/797 + for _ in 0..MAX { + pool.join(|| i += 1, || j += 1); + } + + assert_eq!(i, MAX); + assert_eq!(j, MAX); +} diff --git a/compiler/rustc_thread_pool/src/latch.rs b/compiler/rustc_thread_pool/src/latch.rs new file mode 100644 index 00000000000..8903942a8ce --- /dev/null +++ b/compiler/rustc_thread_pool/src/latch.rs @@ -0,0 +1,459 @@ +use std::marker::PhantomData; +use std::ops::Deref; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Condvar, Mutex}; + +use crate::registry::{Registry, WorkerThread}; + +/// We define various kinds of latches, which are all a primitive signaling +/// mechanism. A latch starts as false. Eventually someone calls `set()` and +/// it becomes true. You can test if it has been set by calling `probe()`. +/// +/// Some kinds of latches, but not all, support a `wait()` operation +/// that will wait until the latch is set, blocking efficiently. That +/// is not part of the trait since it is not possibly to do with all +/// latches. +/// +/// The intention is that `set()` is called once, but `probe()` may be +/// called any number of times. Once `probe()` returns true, the memory +/// effects that occurred before `set()` become visible. +/// +/// It'd probably be better to refactor the API into two paired types, +/// but that's a bit of work, and this is not a public API. +/// +/// ## Memory ordering +/// +/// Latches need to guarantee two things: +/// +/// - Once `probe()` returns true, all memory effects from the `set()` +/// are visible (in other words, the set should synchronize-with +/// the probe). +/// - Once `set()` occurs, the next `probe()` *will* observe it. This +/// typically requires a seq-cst ordering. See [the "tickle-then-get-sleepy" scenario in the sleep +/// README](/src/sleep/README.md#tickle-then-get-sleepy) for details. +pub(super) trait Latch { + /// Set the latch, signalling others. + /// + /// # WARNING + /// + /// Setting a latch triggers other threads to wake up and (in some + /// cases) complete. This may, in turn, cause memory to be + /// deallocated and so forth. One must be very careful about this, + /// and it's typically better to read all the fields you will need + /// to access *before* a latch is set! + /// + /// This function operates on `*const Self` instead of `&self` to allow it + /// to become dangling during this call. The caller must ensure that the + /// pointer is valid upon entry, and not invalidated during the call by any + /// actions other than `set` itself. + unsafe fn set(this: *const Self); +} + +pub(super) trait AsCoreLatch { + fn as_core_latch(&self) -> &CoreLatch; +} + +/// Latch is not set, owning thread is awake +const UNSET: usize = 0; + +/// Latch is not set, owning thread is going to sleep on this latch +/// (but has not yet fallen asleep). +const SLEEPY: usize = 1; + +/// Latch is not set, owning thread is asleep on this latch and +/// must be awoken. +const SLEEPING: usize = 2; + +/// Latch is set. +const SET: usize = 3; + +/// Spin latches are the simplest, most efficient kind, but they do +/// not support a `wait()` operation. They just have a boolean flag +/// that becomes true when `set()` is called. +#[derive(Debug)] +pub(super) struct CoreLatch { + state: AtomicUsize, +} + +impl CoreLatch { + #[inline] + fn new() -> Self { + Self { + state: AtomicUsize::new(0), + } + } + + /// Invoked by owning thread as it prepares to sleep. Returns true + /// if the owning thread may proceed to fall asleep, false if the + /// latch was set in the meantime. + #[inline] + pub(super) fn get_sleepy(&self) -> bool { + self.state + .compare_exchange(UNSET, SLEEPY, Ordering::SeqCst, Ordering::Relaxed) + .is_ok() + } + + /// Invoked by owning thread as it falls asleep sleep. Returns + /// true if the owning thread should block, or false if the latch + /// was set in the meantime. + #[inline] + pub(super) fn fall_asleep(&self) -> bool { + self.state + .compare_exchange(SLEEPY, SLEEPING, Ordering::SeqCst, Ordering::Relaxed) + .is_ok() + } + + /// Invoked by owning thread as it falls asleep sleep. Returns + /// true if the owning thread should block, or false if the latch + /// was set in the meantime. + #[inline] + pub(super) fn wake_up(&self) { + if !self.probe() { + let _ = + self.state + .compare_exchange(SLEEPING, UNSET, Ordering::SeqCst, Ordering::Relaxed); + } + } + + /// Set the latch. If this returns true, the owning thread was sleeping + /// and must be awoken. + /// + /// This is private because, typically, setting a latch involves + /// doing some wakeups; those are encapsulated in the surrounding + /// latch code. + #[inline] + unsafe fn set(this: *const Self) -> bool { + let old_state = (*this).state.swap(SET, Ordering::AcqRel); + old_state == SLEEPING + } + + /// Test if this latch has been set. + #[inline] + pub(super) fn probe(&self) -> bool { + self.state.load(Ordering::Acquire) == SET + } +} + +impl AsCoreLatch for CoreLatch { + #[inline] + fn as_core_latch(&self) -> &CoreLatch { + self + } +} + +/// Spin latches are the simplest, most efficient kind, but they do +/// not support a `wait()` operation. They just have a boolean flag +/// that becomes true when `set()` is called. +pub(super) struct SpinLatch<'r> { + core_latch: CoreLatch, + registry: &'r Arc<Registry>, + target_worker_index: usize, + cross: bool, +} + +impl<'r> SpinLatch<'r> { + /// Creates a new spin latch that is owned by `thread`. This means + /// that `thread` is the only thread that should be blocking on + /// this latch -- it also means that when the latch is set, we + /// will wake `thread` if it is sleeping. + #[inline] + pub(super) fn new(thread: &'r WorkerThread) -> SpinLatch<'r> { + SpinLatch { + core_latch: CoreLatch::new(), + registry: thread.registry(), + target_worker_index: thread.index(), + cross: false, + } + } + + /// Creates a new spin latch for cross-threadpool blocking. Notably, we + /// need to make sure the registry is kept alive after setting, so we can + /// safely call the notification. + #[inline] + pub(super) fn cross(thread: &'r WorkerThread) -> SpinLatch<'r> { + SpinLatch { + cross: true, + ..SpinLatch::new(thread) + } + } + + #[inline] + pub(super) fn probe(&self) -> bool { + self.core_latch.probe() + } +} + +impl<'r> AsCoreLatch for SpinLatch<'r> { + #[inline] + fn as_core_latch(&self) -> &CoreLatch { + &self.core_latch + } +} + +impl<'r> Latch for SpinLatch<'r> { + #[inline] + unsafe fn set(this: *const Self) { + let cross_registry; + + let registry: &Registry = if (*this).cross { + // Ensure the registry stays alive while we notify it. + // Otherwise, it would be possible that we set the spin + // latch and the other thread sees it and exits, causing + // the registry to be deallocated, all before we get a + // chance to invoke `registry.notify_worker_latch_is_set`. + cross_registry = Arc::clone((*this).registry); + &cross_registry + } else { + // If this is not a "cross-registry" spin-latch, then the + // thread which is performing `set` is itself ensuring + // that the registry stays alive. However, that doesn't + // include this *particular* `Arc` handle if the waiting + // thread then exits, so we must completely dereference it. + (*this).registry + }; + let target_worker_index = (*this).target_worker_index; + + // NOTE: Once we `set`, the target may proceed and invalidate `this`! + if CoreLatch::set(&(*this).core_latch) { + // Subtle: at this point, we can no longer read from + // `self`, because the thread owning this spin latch may + // have awoken and deallocated the latch. Therefore, we + // only use fields whose values we already read. + registry.notify_worker_latch_is_set(target_worker_index); + } + } +} + +/// A Latch starts as false and eventually becomes true. You can block +/// until it becomes true. +#[derive(Debug)] +pub(super) struct LockLatch { + m: Mutex<bool>, + v: Condvar, +} + +impl LockLatch { + #[inline] + pub(super) fn new() -> LockLatch { + LockLatch { + m: Mutex::new(false), + v: Condvar::new(), + } + } + + /// Block until latch is set, then resets this lock latch so it can be reused again. + pub(super) fn wait_and_reset(&self) { + let mut guard = self.m.lock().unwrap(); + while !*guard { + guard = self.v.wait(guard).unwrap(); + } + *guard = false; + } + + /// Block until latch is set. + pub(super) fn wait(&self) { + let mut guard = self.m.lock().unwrap(); + while !*guard { + guard = self.v.wait(guard).unwrap(); + } + } +} + +impl Latch for LockLatch { + #[inline] + unsafe fn set(this: *const Self) { + let mut guard = (*this).m.lock().unwrap(); + *guard = true; + (*this).v.notify_all(); + } +} + +/// Once latches are used to implement one-time blocking, primarily +/// for the termination flag of the threads in the pool. +/// +/// Note: like a `SpinLatch`, once-latches are always associated with +/// some registry that is probing them, which must be tickled when +/// they are set. *Unlike* a `SpinLatch`, they don't themselves hold a +/// reference to that registry. This is because in some cases the +/// registry owns the once-latch, and that would create a cycle. So a +/// `OnceLatch` must be given a reference to its owning registry when +/// it is set. For this reason, it does not implement the `Latch` +/// trait (but it doesn't have to, as it is not used in those generic +/// contexts). +#[derive(Debug)] +pub(super) struct OnceLatch { + core_latch: CoreLatch, +} + +impl OnceLatch { + #[inline] + pub(super) fn new() -> OnceLatch { + Self { + core_latch: CoreLatch::new(), + } + } + + /// Set the latch, then tickle the specific worker thread, + /// which should be the one that owns this latch. + #[inline] + pub(super) unsafe fn set_and_tickle_one( + this: *const Self, + registry: &Registry, + target_worker_index: usize, + ) { + if CoreLatch::set(&(*this).core_latch) { + registry.notify_worker_latch_is_set(target_worker_index); + } + } +} + +impl AsCoreLatch for OnceLatch { + #[inline] + fn as_core_latch(&self) -> &CoreLatch { + &self.core_latch + } +} + +/// Counting latches are used to implement scopes. They track a +/// counter. Unlike other latches, calling `set()` does not +/// necessarily make the latch be considered `set()`; instead, it just +/// decrements the counter. The latch is only "set" (in the sense that +/// `probe()` returns true) once the counter reaches zero. +#[derive(Debug)] +pub(super) struct CountLatch { + counter: AtomicUsize, + kind: CountLatchKind, +} + +enum CountLatchKind { + /// A latch for scopes created on a rayon thread which will participate in work- + /// stealing while it waits for completion. This thread is not necessarily part + /// of the same registry as the scope itself! + Stealing { + latch: CoreLatch, + /// If a worker thread in registry A calls `in_place_scope` on a ThreadPool + /// with registry B, when a job completes in a thread of registry B, we may + /// need to call `notify_worker_latch_is_set()` to wake the thread in registry A. + /// That means we need a reference to registry A (since at that point we will + /// only have a reference to registry B), so we stash it here. + registry: Arc<Registry>, + /// The index of the worker to wake in `registry` + worker_index: usize, + }, + + /// A latch for scopes created on a non-rayon thread which will block to wait. + Blocking { latch: LockLatch }, +} + +impl std::fmt::Debug for CountLatchKind { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + CountLatchKind::Stealing { latch, .. } => { + f.debug_tuple("Stealing").field(latch).finish() + } + CountLatchKind::Blocking { latch, .. } => { + f.debug_tuple("Blocking").field(latch).finish() + } + } + } +} + +impl CountLatch { + pub(super) fn new(owner: Option<&WorkerThread>) -> Self { + Self::with_count(1, owner) + } + + pub(super) fn with_count(count: usize, owner: Option<&WorkerThread>) -> Self { + Self { + counter: AtomicUsize::new(count), + kind: match owner { + Some(owner) => CountLatchKind::Stealing { + latch: CoreLatch::new(), + registry: Arc::clone(owner.registry()), + worker_index: owner.index(), + }, + None => CountLatchKind::Blocking { + latch: LockLatch::new(), + }, + }, + } + } + + #[inline] + pub(super) fn increment(&self) { + let old_counter = self.counter.fetch_add(1, Ordering::Relaxed); + debug_assert!(old_counter != 0); + } + + pub(super) fn wait(&self, owner: Option<&WorkerThread>) { + match &self.kind { + CountLatchKind::Stealing { + latch, + registry, + worker_index, + } => unsafe { + let owner = owner.expect("owner thread"); + debug_assert_eq!(registry.id(), owner.registry().id()); + debug_assert_eq!(*worker_index, owner.index()); + owner.wait_until(latch); + }, + CountLatchKind::Blocking { latch } => latch.wait(), + } + } +} + +impl Latch for CountLatch { + #[inline] + unsafe fn set(this: *const Self) { + if (*this).counter.fetch_sub(1, Ordering::SeqCst) == 1 { + // NOTE: Once we call `set` on the internal `latch`, + // the target may proceed and invalidate `this`! + match (*this).kind { + CountLatchKind::Stealing { + ref latch, + ref registry, + worker_index, + } => { + let registry = Arc::clone(registry); + if CoreLatch::set(latch) { + registry.notify_worker_latch_is_set(worker_index); + } + } + CountLatchKind::Blocking { ref latch } => LockLatch::set(latch), + } + } + } +} + +/// `&L` without any implication of `dereferenceable` for `Latch::set` +pub(super) struct LatchRef<'a, L> { + inner: *const L, + marker: PhantomData<&'a L>, +} + +impl<L> LatchRef<'_, L> { + pub(super) fn new(inner: &L) -> LatchRef<'_, L> { + LatchRef { + inner, + marker: PhantomData, + } + } +} + +unsafe impl<L: Sync> Sync for LatchRef<'_, L> {} + +impl<L> Deref for LatchRef<'_, L> { + type Target = L; + + fn deref(&self) -> &L { + // SAFETY: if we have &self, the inner latch is still alive + unsafe { &*self.inner } + } +} + +impl<L: Latch> Latch for LatchRef<'_, L> { + #[inline] + unsafe fn set(this: *const Self) { + L::set((*this).inner); + } +} diff --git a/compiler/rustc_thread_pool/src/lib.rs b/compiler/rustc_thread_pool/src/lib.rs new file mode 100644 index 00000000000..72064547e52 --- /dev/null +++ b/compiler/rustc_thread_pool/src/lib.rs @@ -0,0 +1,922 @@ +//! Rayon-core houses the core stable APIs of Rayon. +//! +//! These APIs have been mirrored in the Rayon crate and it is recommended to use these from there. +//! +//! [`join`] is used to take two closures and potentially run them in parallel. +//! - It will run in parallel if task B gets stolen before task A can finish. +//! - It will run sequentially if task A finishes before task B is stolen and can continue on task B. +//! +//! [`scope`] creates a scope in which you can run any number of parallel tasks. +//! These tasks can spawn nested tasks and scopes, but given the nature of work stealing, the order of execution can not be guaranteed. +//! The scope will exist until all tasks spawned within the scope have been completed. +//! +//! [`spawn`] add a task into the 'static' or 'global' scope, or a local scope created by the [`scope()`] function. +//! +//! [`ThreadPool`] can be used to create your own thread pools (using [`ThreadPoolBuilder`]) or to customize the global one. +//! Tasks spawned within the pool (using [`install()`], [`join()`], etc.) will be added to a deque, +//! where it becomes available for work stealing from other threads in the local threadpool. +//! +//! [`join`]: fn.join.html +//! [`scope`]: fn.scope.html +//! [`scope()`]: fn.scope.html +//! [`spawn`]: fn.spawn.html +//! [`ThreadPool`]: struct.threadpool.html +//! [`install()`]: struct.ThreadPool.html#method.install +//! [`spawn()`]: struct.ThreadPool.html#method.spawn +//! [`join()`]: struct.ThreadPool.html#method.join +//! [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html +//! +//! # Global fallback when threading is unsupported +//! +//! Rayon uses `std` APIs for threading, but some targets have incomplete implementations that +//! always return `Unsupported` errors. The WebAssembly `wasm32-unknown-unknown` and `wasm32-wasi` +//! targets are notable examples of this. Rather than panicking on the unsupported error when +//! creating the implicit global threadpool, Rayon configures a fallback mode instead. +//! +//! This fallback mode mostly functions as if it were using a single-threaded "pool", like setting +//! `RAYON_NUM_THREADS=1`. For example, `join` will execute its two closures sequentially, since +//! there is no other thread to share the work. However, since the pool is not running independent +//! of the main thread, non-blocking calls like `spawn` may not execute at all, unless a lower- +//! priority call like `broadcast` gives them an opening. The fallback mode does not try to emulate +//! anything like thread preemption or `async` task switching, but `yield_now` or `yield_local` +//! can also volunteer execution time. +//! +//! Explicit `ThreadPoolBuilder` methods always report their error without any fallback. +//! +//! # Restricting multiple versions +//! +//! In order to ensure proper coordination between threadpools, and especially +//! to make sure there's only one global threadpool, `rayon-core` is actively +//! restricted from building multiple versions of itself into a single target. +//! You may see a build error like this in violation: +//! +//! ```text +//! error: native library `rayon-core` is being linked to by more +//! than one package, and can only be linked to by one package +//! ``` +//! +//! While we strive to keep `rayon-core` semver-compatible, it's still +//! possible to arrive at this situation if different crates have overly +//! restrictive tilde or inequality requirements for `rayon-core`. The +//! conflicting requirements will need to be resolved before the build will +//! succeed. + +#![warn(rust_2018_idioms)] + +use std::any::Any; +use std::env; +use std::error::Error; +use std::fmt; +use std::io; +use std::marker::PhantomData; +use std::str::FromStr; +use std::thread; + +#[macro_use] +mod private; + +mod broadcast; +mod job; +mod join; +mod latch; +mod registry; +mod scope; +mod sleep; +mod spawn; +mod thread_pool; +mod unwind; +mod worker_local; + +mod compile_fail; +mod test; + +pub mod tlv; + +pub use self::broadcast::{broadcast, spawn_broadcast, BroadcastContext}; +pub use self::join::{join, join_context}; +pub use self::registry::ThreadBuilder; +pub use self::registry::{mark_blocked, mark_unblocked, Registry}; +pub use self::scope::{in_place_scope, scope, Scope}; +pub use self::scope::{in_place_scope_fifo, scope_fifo, ScopeFifo}; +pub use self::spawn::{spawn, spawn_fifo}; +pub use self::thread_pool::current_thread_has_pending_tasks; +pub use self::thread_pool::current_thread_index; +pub use self::thread_pool::ThreadPool; +pub use self::thread_pool::{yield_local, yield_now, Yield}; +pub use worker_local::WorkerLocal; + +use self::registry::{CustomSpawn, DefaultSpawn, ThreadSpawn}; + +/// Returns the maximum number of threads that Rayon supports in a single thread-pool. +/// +/// If a higher thread count is requested by calling `ThreadPoolBuilder::num_threads` or by setting +/// the `RAYON_NUM_THREADS` environment variable, then it will be reduced to this maximum. +/// +/// The value may vary between different targets, and is subject to change in new Rayon versions. +pub fn max_num_threads() -> usize { + // We are limited by the bits available in the sleep counter's `AtomicUsize`. + crate::sleep::THREADS_MAX +} + +/// Returns the number of threads in the current registry. If this +/// code is executing within a Rayon thread-pool, then this will be +/// the number of threads for the thread-pool of the current +/// thread. Otherwise, it will be the number of threads for the global +/// thread-pool. +/// +/// This can be useful when trying to judge how many times to split +/// parallel work (the parallel iterator traits use this value +/// internally for this purpose). +/// +/// # Future compatibility note +/// +/// Note that unless this thread-pool was created with a +/// builder that specifies the number of threads, then this +/// number may vary over time in future versions (see [the +/// `num_threads()` method for details][snt]). +/// +/// [snt]: struct.ThreadPoolBuilder.html#method.num_threads +pub fn current_num_threads() -> usize { + crate::registry::Registry::current_num_threads() +} + +/// Error when initializing a thread pool. +#[derive(Debug)] +pub struct ThreadPoolBuildError { + kind: ErrorKind, +} + +#[derive(Debug)] +enum ErrorKind { + GlobalPoolAlreadyInitialized, + IOError(io::Error), +} + +/// Used to create a new [`ThreadPool`] or to configure the global rayon thread pool. +/// ## Creating a ThreadPool +/// The following creates a thread pool with 22 threads. +/// +/// ```rust +/// # use rayon_core as rayon; +/// let pool = rayon::ThreadPoolBuilder::new().num_threads(22).build().unwrap(); +/// ``` +/// +/// To instead configure the global thread pool, use [`build_global()`]: +/// +/// ```rust +/// # use rayon_core as rayon; +/// rayon::ThreadPoolBuilder::new().num_threads(22).build_global().unwrap(); +/// ``` +/// +/// [`ThreadPool`]: struct.ThreadPool.html +/// [`build_global()`]: struct.ThreadPoolBuilder.html#method.build_global +pub struct ThreadPoolBuilder<S = DefaultSpawn> { + /// The number of threads in the rayon thread pool. + /// If zero will use the RAYON_NUM_THREADS environment variable. + /// If RAYON_NUM_THREADS is invalid or zero will use the default. + num_threads: usize, + + /// Custom closure, if any, to handle a panic that we cannot propagate + /// anywhere else. + panic_handler: Option<Box<PanicHandler>>, + + /// Closure to compute the name of a thread. + get_thread_name: Option<Box<dyn FnMut(usize) -> String>>, + + /// The stack size for the created worker threads + stack_size: Option<usize>, + + /// Closure invoked on deadlock. + deadlock_handler: Option<Box<DeadlockHandler>>, + + /// Closure invoked on worker thread start. + start_handler: Option<Box<StartHandler>>, + + /// Closure invoked on worker thread exit. + exit_handler: Option<Box<ExitHandler>>, + + /// Closure invoked to spawn threads. + spawn_handler: S, + + /// Closure invoked when starting computations in a thread. + acquire_thread_handler: Option<Box<AcquireThreadHandler>>, + + /// Closure invoked when blocking in a thread. + release_thread_handler: Option<Box<ReleaseThreadHandler>>, + + /// If false, worker threads will execute spawned jobs in a + /// "depth-first" fashion. If true, they will do a "breadth-first" + /// fashion. Depth-first is the default. + breadth_first: bool, +} + +/// Contains the rayon thread pool configuration. Use [`ThreadPoolBuilder`] instead. +/// +/// [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html +#[deprecated(note = "Use `ThreadPoolBuilder`")] +#[derive(Default)] +pub struct Configuration { + builder: ThreadPoolBuilder, +} + +/// The type for a panic handling closure. Note that this same closure +/// may be invoked multiple times in parallel. +type PanicHandler = dyn Fn(Box<dyn Any + Send>) + Send + Sync; + +/// The type for a closure that gets invoked when the Rayon thread pool deadlocks +type DeadlockHandler = dyn Fn() + Send + Sync; + +/// The type for a closure that gets invoked when a thread starts. The +/// closure is passed the index of the thread on which it is invoked. +/// Note that this same closure may be invoked multiple times in parallel. +type StartHandler = dyn Fn(usize) + Send + Sync; + +/// The type for a closure that gets invoked when a thread exits. The +/// closure is passed the index of the thread on which it is invoked. +/// Note that this same closure may be invoked multiple times in parallel. +type ExitHandler = dyn Fn(usize) + Send + Sync; + +// NB: We can't `#[derive(Default)]` because `S` is left ambiguous. +impl Default for ThreadPoolBuilder { + fn default() -> Self { + ThreadPoolBuilder { + num_threads: 0, + panic_handler: None, + get_thread_name: None, + stack_size: None, + start_handler: None, + exit_handler: None, + deadlock_handler: None, + acquire_thread_handler: None, + release_thread_handler: None, + spawn_handler: DefaultSpawn, + breadth_first: false, + } + } +} + +/// The type for a closure that gets invoked before starting computations in a thread. +/// Note that this same closure may be invoked multiple times in parallel. +type AcquireThreadHandler = dyn Fn() + Send + Sync; + +/// The type for a closure that gets invoked before blocking in a thread. +/// Note that this same closure may be invoked multiple times in parallel. +type ReleaseThreadHandler = dyn Fn() + Send + Sync; + +impl ThreadPoolBuilder { + /// Creates and returns a valid rayon thread pool builder, but does not initialize it. + pub fn new() -> Self { + Self::default() + } +} + +/// Note: the `S: ThreadSpawn` constraint is an internal implementation detail for the +/// default spawn and those set by [`spawn_handler`](#method.spawn_handler). +impl<S> ThreadPoolBuilder<S> +where + S: ThreadSpawn, +{ + /// Creates a new `ThreadPool` initialized using this configuration. + pub fn build(self) -> Result<ThreadPool, ThreadPoolBuildError> { + ThreadPool::build(self) + } + + /// Initializes the global thread pool. This initialization is + /// **optional**. If you do not call this function, the thread pool + /// will be automatically initialized with the default + /// configuration. Calling `build_global` is not recommended, except + /// in two scenarios: + /// + /// - You wish to change the default configuration. + /// - You are running a benchmark, in which case initializing may + /// yield slightly more consistent results, since the worker threads + /// will already be ready to go even in the first iteration. But + /// this cost is minimal. + /// + /// Initialization of the global thread pool happens exactly + /// once. Once started, the configuration cannot be + /// changed. Therefore, if you call `build_global` a second time, it + /// will return an error. An `Ok` result indicates that this + /// is the first initialization of the thread pool. + pub fn build_global(self) -> Result<(), ThreadPoolBuildError> { + let registry = registry::init_global_registry(self)?; + registry.wait_until_primed(); + Ok(()) + } +} + +impl ThreadPoolBuilder { + /// Creates a scoped `ThreadPool` initialized using this configuration. + /// + /// This is a convenience function for building a pool using [`std::thread::scope`] + /// to spawn threads in a [`spawn_handler`](#method.spawn_handler). + /// The threads in this pool will start by calling `wrapper`, which should + /// do initialization and continue by calling `ThreadBuilder::run()`. + /// + /// [`std::thread::scope`]: https://doc.rust-lang.org/std/thread/fn.scope.html + /// + /// # Examples + /// + /// A scoped pool may be useful in combination with scoped thread-local variables. + /// + /// ``` + /// # use rayon_core as rayon; + /// + /// scoped_tls::scoped_thread_local!(static POOL_DATA: Vec<i32>); + /// + /// fn main() -> Result<(), rayon::ThreadPoolBuildError> { + /// let pool_data = vec![1, 2, 3]; + /// + /// // We haven't assigned any TLS data yet. + /// assert!(!POOL_DATA.is_set()); + /// + /// rayon::ThreadPoolBuilder::new() + /// .build_scoped( + /// // Borrow `pool_data` in TLS for each thread. + /// |thread| POOL_DATA.set(&pool_data, || thread.run()), + /// // Do some work that needs the TLS data. + /// |pool| pool.install(|| assert!(POOL_DATA.is_set())), + /// )?; + /// + /// // Once we've returned, `pool_data` is no longer borrowed. + /// drop(pool_data); + /// Ok(()) + /// } + /// ``` + pub fn build_scoped<W, F, R>(self, wrapper: W, with_pool: F) -> Result<R, ThreadPoolBuildError> + where + W: Fn(ThreadBuilder) + Sync, // expected to call `run()` + F: FnOnce(&ThreadPool) -> R, + { + std::thread::scope(|scope| { + let pool = self + .spawn_handler(|thread| { + let mut builder = std::thread::Builder::new(); + if let Some(name) = thread.name() { + builder = builder.name(name.to_string()); + } + if let Some(size) = thread.stack_size() { + builder = builder.stack_size(size); + } + builder.spawn_scoped(scope, || wrapper(thread))?; + Ok(()) + }) + .build()?; + let result = unwind::halt_unwinding(|| with_pool(&pool)); + pool.wait_until_stopped(); + match result { + Ok(result) => Ok(result), + Err(err) => unwind::resume_unwinding(err), + } + }) + } +} + +impl<S> ThreadPoolBuilder<S> { + /// Sets a custom function for spawning threads. + /// + /// Note that the threads will not exit until after the pool is dropped. It + /// is up to the caller to wait for thread termination if that is important + /// for any invariants. For instance, threads created in [`std::thread::scope`] + /// will be joined before that scope returns, and this will block indefinitely + /// if the pool is leaked. Furthermore, the global thread pool doesn't terminate + /// until the entire process exits! + /// + /// # Examples + /// + /// A minimal spawn handler just needs to call `run()` from an independent thread. + /// + /// ``` + /// # use rayon_core as rayon; + /// fn main() -> Result<(), rayon::ThreadPoolBuildError> { + /// let pool = rayon::ThreadPoolBuilder::new() + /// .spawn_handler(|thread| { + /// std::thread::spawn(|| thread.run()); + /// Ok(()) + /// }) + /// .build()?; + /// + /// pool.install(|| println!("Hello from my custom thread!")); + /// Ok(()) + /// } + /// ``` + /// + /// The default spawn handler sets the name and stack size if given, and propagates + /// any errors from the thread builder. + /// + /// ``` + /// # use rayon_core as rayon; + /// fn main() -> Result<(), rayon::ThreadPoolBuildError> { + /// let pool = rayon::ThreadPoolBuilder::new() + /// .spawn_handler(|thread| { + /// let mut b = std::thread::Builder::new(); + /// if let Some(name) = thread.name() { + /// b = b.name(name.to_owned()); + /// } + /// if let Some(stack_size) = thread.stack_size() { + /// b = b.stack_size(stack_size); + /// } + /// b.spawn(|| thread.run())?; + /// Ok(()) + /// }) + /// .build()?; + /// + /// pool.install(|| println!("Hello from my fully custom thread!")); + /// Ok(()) + /// } + /// ``` + /// + /// This can also be used for a pool of scoped threads like [`crossbeam::scope`], + /// or [`std::thread::scope`] introduced in Rust 1.63, which is encapsulated in + /// [`build_scoped`](#method.build_scoped). + /// + /// [`crossbeam::scope`]: https://docs.rs/crossbeam/0.8/crossbeam/fn.scope.html + /// [`std::thread::scope`]: https://doc.rust-lang.org/std/thread/fn.scope.html + /// + /// ``` + /// # use rayon_core as rayon; + /// fn main() -> Result<(), rayon::ThreadPoolBuildError> { + /// std::thread::scope(|scope| { + /// let pool = rayon::ThreadPoolBuilder::new() + /// .spawn_handler(|thread| { + /// let mut builder = std::thread::Builder::new(); + /// if let Some(name) = thread.name() { + /// builder = builder.name(name.to_string()); + /// } + /// if let Some(size) = thread.stack_size() { + /// builder = builder.stack_size(size); + /// } + /// builder.spawn_scoped(scope, || { + /// // Add any scoped initialization here, then run! + /// thread.run() + /// })?; + /// Ok(()) + /// }) + /// .build()?; + /// + /// pool.install(|| println!("Hello from my custom scoped thread!")); + /// Ok(()) + /// }) + /// } + /// ``` + pub fn spawn_handler<F>(self, spawn: F) -> ThreadPoolBuilder<CustomSpawn<F>> + where + F: FnMut(ThreadBuilder) -> io::Result<()>, + { + ThreadPoolBuilder { + spawn_handler: CustomSpawn::new(spawn), + // ..self + num_threads: self.num_threads, + panic_handler: self.panic_handler, + get_thread_name: self.get_thread_name, + stack_size: self.stack_size, + start_handler: self.start_handler, + exit_handler: self.exit_handler, + deadlock_handler: self.deadlock_handler, + acquire_thread_handler: self.acquire_thread_handler, + release_thread_handler: self.release_thread_handler, + breadth_first: self.breadth_first, + } + } + + /// Returns a reference to the current spawn handler. + fn get_spawn_handler(&mut self) -> &mut S { + &mut self.spawn_handler + } + + /// Get the number of threads that will be used for the thread + /// pool. See `num_threads()` for more information. + fn get_num_threads(&self) -> usize { + if self.num_threads > 0 { + self.num_threads + } else { + let default = || { + thread::available_parallelism() + .map(|n| n.get()) + .unwrap_or(1) + }; + + match env::var("RAYON_NUM_THREADS") + .ok() + .and_then(|s| usize::from_str(&s).ok()) + { + Some(x @ 1..) => return x, + Some(0) => return default(), + _ => {} + } + + // Support for deprecated `RAYON_RS_NUM_CPUS`. + match env::var("RAYON_RS_NUM_CPUS") + .ok() + .and_then(|s| usize::from_str(&s).ok()) + { + Some(x @ 1..) => x, + _ => default(), + } + } + } + + /// Get the thread name for the thread with the given index. + fn get_thread_name(&mut self, index: usize) -> Option<String> { + let f = self.get_thread_name.as_mut()?; + Some(f(index)) + } + + /// Sets a closure which takes a thread index and returns + /// the thread's name. + pub fn thread_name<F>(mut self, closure: F) -> Self + where + F: FnMut(usize) -> String + 'static, + { + self.get_thread_name = Some(Box::new(closure)); + self + } + + /// Sets the number of threads to be used in the rayon threadpool. + /// + /// If you specify a non-zero number of threads using this + /// function, then the resulting thread-pools are guaranteed to + /// start at most this number of threads. + /// + /// If `num_threads` is 0, or you do not call this function, then + /// the Rayon runtime will select the number of threads + /// automatically. At present, this is based on the + /// `RAYON_NUM_THREADS` environment variable (if set), + /// or the number of logical CPUs (otherwise). + /// In the future, however, the default behavior may + /// change to dynamically add or remove threads as needed. + /// + /// **Future compatibility warning:** Given the default behavior + /// may change in the future, if you wish to rely on a fixed + /// number of threads, you should use this function to specify + /// that number. To reproduce the current default behavior, you + /// may wish to use [`std::thread::available_parallelism`] + /// to query the number of CPUs dynamically. + /// + /// **Old environment variable:** `RAYON_NUM_THREADS` is a one-to-one + /// replacement of the now deprecated `RAYON_RS_NUM_CPUS` environment + /// variable. If both variables are specified, `RAYON_NUM_THREADS` will + /// be preferred. + pub fn num_threads(mut self, num_threads: usize) -> Self { + self.num_threads = num_threads; + self + } + + /// Returns a copy of the current panic handler. + fn take_panic_handler(&mut self) -> Option<Box<PanicHandler>> { + self.panic_handler.take() + } + + /// Normally, whenever Rayon catches a panic, it tries to + /// propagate it to someplace sensible, to try and reflect the + /// semantics of sequential execution. But in some cases, + /// particularly with the `spawn()` APIs, there is no + /// obvious place where we should propagate the panic to. + /// In that case, this panic handler is invoked. + /// + /// If no panic handler is set, the default is to abort the + /// process, under the principle that panics should not go + /// unobserved. + /// + /// If the panic handler itself panics, this will abort the + /// process. To prevent this, wrap the body of your panic handler + /// in a call to `std::panic::catch_unwind()`. + pub fn panic_handler<H>(mut self, panic_handler: H) -> Self + where + H: Fn(Box<dyn Any + Send>) + Send + Sync + 'static, + { + self.panic_handler = Some(Box::new(panic_handler)); + self + } + + /// Get the stack size of the worker threads + fn get_stack_size(&self) -> Option<usize> { + self.stack_size + } + + /// Sets the stack size of the worker threads + pub fn stack_size(mut self, stack_size: usize) -> Self { + self.stack_size = Some(stack_size); + self + } + + /// **(DEPRECATED)** Suggest to worker threads that they execute + /// spawned jobs in a "breadth-first" fashion. + /// + /// Typically, when a worker thread is idle or blocked, it will + /// attempt to execute the job from the *top* of its local deque of + /// work (i.e., the job most recently spawned). If this flag is set + /// to true, however, workers will prefer to execute in a + /// *breadth-first* fashion -- that is, they will search for jobs at + /// the *bottom* of their local deque. (At present, workers *always* + /// steal from the bottom of other workers' deques, regardless of + /// the setting of this flag.) + /// + /// If you think of the tasks as a tree, where a parent task + /// spawns its children in the tree, then this flag loosely + /// corresponds to doing a breadth-first traversal of the tree, + /// whereas the default would be to do a depth-first traversal. + /// + /// **Note that this is an "execution hint".** Rayon's task + /// execution is highly dynamic and the precise order in which + /// independent tasks are executed is not intended to be + /// guaranteed. + /// + /// This `breadth_first()` method is now deprecated per [RFC #1], + /// and in the future its effect may be removed. Consider using + /// [`scope_fifo()`] for a similar effect. + /// + /// [RFC #1]: https://github.com/rayon-rs/rfcs/blob/master/accepted/rfc0001-scope-scheduling.md + /// [`scope_fifo()`]: fn.scope_fifo.html + #[deprecated(note = "use `scope_fifo` and `spawn_fifo` for similar effect")] + pub fn breadth_first(mut self) -> Self { + self.breadth_first = true; + self + } + + fn get_breadth_first(&self) -> bool { + self.breadth_first + } + + /// Takes the current acquire thread callback, leaving `None`. + fn take_acquire_thread_handler(&mut self) -> Option<Box<AcquireThreadHandler>> { + self.acquire_thread_handler.take() + } + + /// Set a callback to be invoked when starting computations in a thread. + pub fn acquire_thread_handler<H>(mut self, acquire_thread_handler: H) -> Self + where + H: Fn() + Send + Sync + 'static, + { + self.acquire_thread_handler = Some(Box::new(acquire_thread_handler)); + self + } + + /// Takes the current release thread callback, leaving `None`. + fn take_release_thread_handler(&mut self) -> Option<Box<ReleaseThreadHandler>> { + self.release_thread_handler.take() + } + + /// Set a callback to be invoked when blocking in thread. + pub fn release_thread_handler<H>(mut self, release_thread_handler: H) -> Self + where + H: Fn() + Send + Sync + 'static, + { + self.release_thread_handler = Some(Box::new(release_thread_handler)); + self + } + + /// Takes the current deadlock callback, leaving `None`. + fn take_deadlock_handler(&mut self) -> Option<Box<DeadlockHandler>> { + self.deadlock_handler.take() + } + + /// Set a callback to be invoked on current deadlock. + pub fn deadlock_handler<H>(mut self, deadlock_handler: H) -> Self + where + H: Fn() + Send + Sync + 'static, + { + self.deadlock_handler = Some(Box::new(deadlock_handler)); + self + } + + /// Takes the current thread start callback, leaving `None`. + fn take_start_handler(&mut self) -> Option<Box<StartHandler>> { + self.start_handler.take() + } + + /// Sets a callback to be invoked on thread start. + /// + /// The closure is passed the index of the thread on which it is invoked. + /// Note that this same closure may be invoked multiple times in parallel. + /// If this closure panics, the panic will be passed to the panic handler. + /// If that handler returns, then startup will continue normally. + pub fn start_handler<H>(mut self, start_handler: H) -> Self + where + H: Fn(usize) + Send + Sync + 'static, + { + self.start_handler = Some(Box::new(start_handler)); + self + } + + /// Returns a current thread exit callback, leaving `None`. + fn take_exit_handler(&mut self) -> Option<Box<ExitHandler>> { + self.exit_handler.take() + } + + /// Sets a callback to be invoked on thread exit. + /// + /// The closure is passed the index of the thread on which it is invoked. + /// Note that this same closure may be invoked multiple times in parallel. + /// If this closure panics, the panic will be passed to the panic handler. + /// If that handler returns, then the thread will exit normally. + pub fn exit_handler<H>(mut self, exit_handler: H) -> Self + where + H: Fn(usize) + Send + Sync + 'static, + { + self.exit_handler = Some(Box::new(exit_handler)); + self + } +} + +#[allow(deprecated)] +impl Configuration { + /// Creates and return a valid rayon thread pool configuration, but does not initialize it. + pub fn new() -> Configuration { + Configuration { + builder: ThreadPoolBuilder::new(), + } + } + + /// Deprecated in favor of `ThreadPoolBuilder::build`. + pub fn build(self) -> Result<ThreadPool, Box<dyn Error + 'static>> { + self.builder.build().map_err(Box::from) + } + + /// Deprecated in favor of `ThreadPoolBuilder::thread_name`. + pub fn thread_name<F>(mut self, closure: F) -> Self + where + F: FnMut(usize) -> String + 'static, + { + self.builder = self.builder.thread_name(closure); + self + } + + /// Deprecated in favor of `ThreadPoolBuilder::num_threads`. + pub fn num_threads(mut self, num_threads: usize) -> Configuration { + self.builder = self.builder.num_threads(num_threads); + self + } + + /// Deprecated in favor of `ThreadPoolBuilder::panic_handler`. + pub fn panic_handler<H>(mut self, panic_handler: H) -> Configuration + where + H: Fn(Box<dyn Any + Send>) + Send + Sync + 'static, + { + self.builder = self.builder.panic_handler(panic_handler); + self + } + + /// Deprecated in favor of `ThreadPoolBuilder::stack_size`. + pub fn stack_size(mut self, stack_size: usize) -> Self { + self.builder = self.builder.stack_size(stack_size); + self + } + + /// Deprecated in favor of `ThreadPoolBuilder::breadth_first`. + pub fn breadth_first(mut self) -> Self { + self.builder = self.builder.breadth_first(); + self + } + + /// Deprecated in favor of `ThreadPoolBuilder::start_handler`. + pub fn start_handler<H>(mut self, start_handler: H) -> Configuration + where + H: Fn(usize) + Send + Sync + 'static, + { + self.builder = self.builder.start_handler(start_handler); + self + } + + /// Deprecated in favor of `ThreadPoolBuilder::exit_handler`. + pub fn exit_handler<H>(mut self, exit_handler: H) -> Configuration + where + H: Fn(usize) + Send + Sync + 'static, + { + self.builder = self.builder.exit_handler(exit_handler); + self + } + + /// Returns a ThreadPoolBuilder with identical parameters. + fn into_builder(self) -> ThreadPoolBuilder { + self.builder + } +} + +impl ThreadPoolBuildError { + fn new(kind: ErrorKind) -> ThreadPoolBuildError { + ThreadPoolBuildError { kind } + } + + fn is_unsupported(&self) -> bool { + matches!(&self.kind, ErrorKind::IOError(e) if e.kind() == io::ErrorKind::Unsupported) + } +} + +const GLOBAL_POOL_ALREADY_INITIALIZED: &str = + "The global thread pool has already been initialized."; + +impl Error for ThreadPoolBuildError { + #[allow(deprecated)] + fn description(&self) -> &str { + match self.kind { + ErrorKind::GlobalPoolAlreadyInitialized => GLOBAL_POOL_ALREADY_INITIALIZED, + ErrorKind::IOError(ref e) => e.description(), + } + } + + fn source(&self) -> Option<&(dyn Error + 'static)> { + match &self.kind { + ErrorKind::GlobalPoolAlreadyInitialized => None, + ErrorKind::IOError(e) => Some(e), + } + } +} + +impl fmt::Display for ThreadPoolBuildError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match &self.kind { + ErrorKind::GlobalPoolAlreadyInitialized => GLOBAL_POOL_ALREADY_INITIALIZED.fmt(f), + ErrorKind::IOError(e) => e.fmt(f), + } + } +} + +/// Deprecated in favor of `ThreadPoolBuilder::build_global`. +#[deprecated(note = "use `ThreadPoolBuilder::build_global`")] +#[allow(deprecated)] +pub fn initialize(config: Configuration) -> Result<(), Box<dyn Error>> { + config.into_builder().build_global().map_err(Box::from) +} + +impl<S> fmt::Debug for ThreadPoolBuilder<S> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let ThreadPoolBuilder { + ref num_threads, + ref get_thread_name, + ref panic_handler, + ref stack_size, + ref deadlock_handler, + ref start_handler, + ref exit_handler, + ref acquire_thread_handler, + ref release_thread_handler, + spawn_handler: _, + ref breadth_first, + } = *self; + + // Just print `Some(<closure>)` or `None` to the debug + // output. + struct ClosurePlaceholder; + impl fmt::Debug for ClosurePlaceholder { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("<closure>") + } + } + let get_thread_name = get_thread_name.as_ref().map(|_| ClosurePlaceholder); + let panic_handler = panic_handler.as_ref().map(|_| ClosurePlaceholder); + let deadlock_handler = deadlock_handler.as_ref().map(|_| ClosurePlaceholder); + let start_handler = start_handler.as_ref().map(|_| ClosurePlaceholder); + let exit_handler = exit_handler.as_ref().map(|_| ClosurePlaceholder); + let acquire_thread_handler = acquire_thread_handler.as_ref().map(|_| ClosurePlaceholder); + let release_thread_handler = release_thread_handler.as_ref().map(|_| ClosurePlaceholder); + + f.debug_struct("ThreadPoolBuilder") + .field("num_threads", num_threads) + .field("get_thread_name", &get_thread_name) + .field("panic_handler", &panic_handler) + .field("stack_size", &stack_size) + .field("deadlock_handler", &deadlock_handler) + .field("start_handler", &start_handler) + .field("exit_handler", &exit_handler) + .field("acquire_thread_handler", &acquire_thread_handler) + .field("release_thread_handler", &release_thread_handler) + .field("breadth_first", &breadth_first) + .finish() + } +} + +#[allow(deprecated)] +impl fmt::Debug for Configuration { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.builder.fmt(f) + } +} + +/// Provides the calling context to a closure called by `join_context`. +#[derive(Debug)] +pub struct FnContext { + migrated: bool, + + /// disable `Send` and `Sync`, just for a little future-proofing. + _marker: PhantomData<*mut ()>, +} + +impl FnContext { + #[inline] + fn new(migrated: bool) -> Self { + FnContext { + migrated, + _marker: PhantomData, + } + } +} + +impl FnContext { + /// Returns `true` if the closure was called from a different thread + /// than it was provided from. + #[inline] + pub fn migrated(&self) -> bool { + self.migrated + } +} diff --git a/compiler/rustc_thread_pool/src/private.rs b/compiler/rustc_thread_pool/src/private.rs new file mode 100644 index 00000000000..c85e77b9cbb --- /dev/null +++ b/compiler/rustc_thread_pool/src/private.rs @@ -0,0 +1,26 @@ +//! The public parts of this private module are used to create traits +//! that cannot be implemented outside of our own crate. This way we +//! can feel free to extend those traits without worrying about it +//! being a breaking change for other implementations. + +/// If this type is pub but not publicly reachable, third parties +/// can't name it and can't implement traits using it. +#[allow(missing_debug_implementations)] +pub struct PrivateMarker; + +macro_rules! private_decl { + () => { + /// This trait is private; this method exists to make it + /// impossible to implement outside the crate. + #[doc(hidden)] + fn __rayon_private__(&self) -> crate::private::PrivateMarker; + }; +} + +macro_rules! private_impl { + () => { + fn __rayon_private__(&self) -> crate::private::PrivateMarker { + crate::private::PrivateMarker + } + }; +} diff --git a/compiler/rustc_thread_pool/src/registry.rs b/compiler/rustc_thread_pool/src/registry.rs new file mode 100644 index 00000000000..781b6827b82 --- /dev/null +++ b/compiler/rustc_thread_pool/src/registry.rs @@ -0,0 +1,1048 @@ +use crate::job::{JobFifo, JobRef, StackJob}; +use crate::latch::{AsCoreLatch, CoreLatch, Latch, LatchRef, LockLatch, OnceLatch, SpinLatch}; +use crate::sleep::Sleep; +use crate::tlv::Tlv; +use crate::unwind; +use crate::{ + AcquireThreadHandler, DeadlockHandler, ErrorKind, ExitHandler, PanicHandler, + ReleaseThreadHandler, StartHandler, ThreadPoolBuildError, ThreadPoolBuilder, Yield, +}; +use crossbeam_deque::{Injector, Steal, Stealer, Worker}; +use std::cell::Cell; +use std::collections::hash_map::DefaultHasher; +use std::fmt; +use std::hash::Hasher; +use std::io; +use std::mem; +use std::ptr; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex, Once}; +use std::thread; + +/// Thread builder used for customization via +/// [`ThreadPoolBuilder::spawn_handler`](struct.ThreadPoolBuilder.html#method.spawn_handler). +pub struct ThreadBuilder { + name: Option<String>, + stack_size: Option<usize>, + worker: Worker<JobRef>, + stealer: Stealer<JobRef>, + registry: Arc<Registry>, + index: usize, +} + +impl ThreadBuilder { + /// Gets the index of this thread in the pool, within `0..num_threads`. + pub fn index(&self) -> usize { + self.index + } + + /// Gets the string that was specified by `ThreadPoolBuilder::name()`. + pub fn name(&self) -> Option<&str> { + self.name.as_deref() + } + + /// Gets the value that was specified by `ThreadPoolBuilder::stack_size()`. + pub fn stack_size(&self) -> Option<usize> { + self.stack_size + } + + /// Executes the main loop for this thread. This will not return until the + /// thread pool is dropped. + pub fn run(self) { + unsafe { main_loop(self) } + } +} + +impl fmt::Debug for ThreadBuilder { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ThreadBuilder") + .field("pool", &self.registry.id()) + .field("index", &self.index) + .field("name", &self.name) + .field("stack_size", &self.stack_size) + .finish() + } +} + +/// Generalized trait for spawning a thread in the `Registry`. +/// +/// This trait is pub-in-private -- E0445 forces us to make it public, +/// but we don't actually want to expose these details in the API. +pub trait ThreadSpawn { + private_decl! {} + + /// Spawn a thread with the `ThreadBuilder` parameters, and then + /// call `ThreadBuilder::run()`. + fn spawn(&mut self, thread: ThreadBuilder) -> io::Result<()>; +} + +/// Spawns a thread in the "normal" way with `std::thread::Builder`. +/// +/// This type is pub-in-private -- E0445 forces us to make it public, +/// but we don't actually want to expose these details in the API. +#[derive(Debug, Default)] +pub struct DefaultSpawn; + +impl ThreadSpawn for DefaultSpawn { + private_impl! {} + + fn spawn(&mut self, thread: ThreadBuilder) -> io::Result<()> { + let mut b = thread::Builder::new(); + if let Some(name) = thread.name() { + b = b.name(name.to_owned()); + } + if let Some(stack_size) = thread.stack_size() { + b = b.stack_size(stack_size); + } + b.spawn(|| thread.run())?; + Ok(()) + } +} + +/// Spawns a thread with a user's custom callback. +/// +/// This type is pub-in-private -- E0445 forces us to make it public, +/// but we don't actually want to expose these details in the API. +#[derive(Debug)] +pub struct CustomSpawn<F>(F); + +impl<F> CustomSpawn<F> +where + F: FnMut(ThreadBuilder) -> io::Result<()>, +{ + pub(super) fn new(spawn: F) -> Self { + CustomSpawn(spawn) + } +} + +impl<F> ThreadSpawn for CustomSpawn<F> +where + F: FnMut(ThreadBuilder) -> io::Result<()>, +{ + private_impl! {} + + #[inline] + fn spawn(&mut self, thread: ThreadBuilder) -> io::Result<()> { + (self.0)(thread) + } +} + +pub struct Registry { + thread_infos: Vec<ThreadInfo>, + sleep: Sleep, + injected_jobs: Injector<JobRef>, + broadcasts: Mutex<Vec<Worker<JobRef>>>, + panic_handler: Option<Box<PanicHandler>>, + pub(crate) deadlock_handler: Option<Box<DeadlockHandler>>, + start_handler: Option<Box<StartHandler>>, + exit_handler: Option<Box<ExitHandler>>, + pub(crate) acquire_thread_handler: Option<Box<AcquireThreadHandler>>, + pub(crate) release_thread_handler: Option<Box<ReleaseThreadHandler>>, + + // When this latch reaches 0, it means that all work on this + // registry must be complete. This is ensured in the following ways: + // + // - if this is the global registry, there is a ref-count that never + // gets released. + // - if this is a user-created thread-pool, then so long as the thread-pool + // exists, it holds a reference. + // - when we inject a "blocking job" into the registry with `ThreadPool::install()`, + // no adjustment is needed; the `ThreadPool` holds the reference, and since we won't + // return until the blocking job is complete, that ref will continue to be held. + // - when `join()` or `scope()` is invoked, similarly, no adjustments are needed. + // These are always owned by some other job (e.g., one injected by `ThreadPool::install()`) + // and that job will keep the pool alive. + terminate_count: AtomicUsize, +} + +/// //////////////////////////////////////////////////////////////////////// +/// Initialization + +static mut THE_REGISTRY: Option<Arc<Registry>> = None; +static THE_REGISTRY_SET: Once = Once::new(); + +/// Starts the worker threads (if that has not already happened). If +/// initialization has not already occurred, use the default +/// configuration. +pub(super) fn global_registry() -> &'static Arc<Registry> { + set_global_registry(default_global_registry) + .or_else(|err| { + // SAFETY: we only create a shared reference to `THE_REGISTRY` after the `call_once` + // that initializes it, and there will be no more mutable accesses at all. + debug_assert!(THE_REGISTRY_SET.is_completed()); + let the_registry = unsafe { &*ptr::addr_of!(THE_REGISTRY) }; + the_registry.as_ref().ok_or(err) + }) + .expect("The global thread pool has not been initialized.") +} + +/// Starts the worker threads (if that has not already happened) with +/// the given builder. +pub(super) fn init_global_registry<S>( + builder: ThreadPoolBuilder<S>, +) -> Result<&'static Arc<Registry>, ThreadPoolBuildError> +where + S: ThreadSpawn, +{ + set_global_registry(|| Registry::new(builder)) +} + +/// Starts the worker threads (if that has not already happened) +/// by creating a registry with the given callback. +fn set_global_registry<F>(registry: F) -> Result<&'static Arc<Registry>, ThreadPoolBuildError> +where + F: FnOnce() -> Result<Arc<Registry>, ThreadPoolBuildError>, +{ + let mut result = Err(ThreadPoolBuildError::new( + ErrorKind::GlobalPoolAlreadyInitialized, + )); + + THE_REGISTRY_SET.call_once(|| { + result = registry().map(|registry: Arc<Registry>| { + // SAFETY: this is the only mutable access to `THE_REGISTRY`, thanks to `Once`, and + // `global_registry()` only takes a shared reference **after** this `call_once`. + unsafe { + ptr::addr_of_mut!(THE_REGISTRY).write(Some(registry)); + (*ptr::addr_of!(THE_REGISTRY)).as_ref().unwrap_unchecked() + } + }) + }); + + result +} + +fn default_global_registry() -> Result<Arc<Registry>, ThreadPoolBuildError> { + let result = Registry::new(ThreadPoolBuilder::new()); + + // If we're running in an environment that doesn't support threads at all, we can fall back to + // using the current thread alone. This is crude, and probably won't work for non-blocking + // calls like `spawn` or `broadcast_spawn`, but a lot of stuff does work fine. + // + // Notably, this allows current WebAssembly targets to work even though their threading support + // is stubbed out, and we won't have to change anything if they do add real threading. + let unsupported = matches!(&result, Err(e) if e.is_unsupported()); + if unsupported && WorkerThread::current().is_null() { + let builder = ThreadPoolBuilder::new() + .num_threads(1) + .spawn_handler(|thread| { + // Rather than starting a new thread, we're just taking over the current thread + // *without* running the main loop, so we can still return from here. + // The WorkerThread is leaked, but we never shutdown the global pool anyway. + let worker_thread = Box::leak(Box::new(WorkerThread::from(thread))); + let registry = &*worker_thread.registry; + let index = worker_thread.index; + + unsafe { + WorkerThread::set_current(worker_thread); + + // let registry know we are ready to do work + Latch::set(®istry.thread_infos[index].primed); + } + + Ok(()) + }); + + let fallback_result = Registry::new(builder); + if fallback_result.is_ok() { + return fallback_result; + } + } + + result +} + +struct Terminator<'a>(&'a Arc<Registry>); + +impl<'a> Drop for Terminator<'a> { + fn drop(&mut self) { + self.0.terminate() + } +} + +impl Registry { + pub(super) fn new<S>( + mut builder: ThreadPoolBuilder<S>, + ) -> Result<Arc<Self>, ThreadPoolBuildError> + where + S: ThreadSpawn, + { + // Soft-limit the number of threads that we can actually support. + let n_threads = Ord::min(builder.get_num_threads(), crate::max_num_threads()); + + let breadth_first = builder.get_breadth_first(); + + let (workers, stealers): (Vec<_>, Vec<_>) = (0..n_threads) + .map(|_| { + let worker = if breadth_first { + Worker::new_fifo() + } else { + Worker::new_lifo() + }; + + let stealer = worker.stealer(); + (worker, stealer) + }) + .unzip(); + + let (broadcasts, broadcast_stealers): (Vec<_>, Vec<_>) = (0..n_threads) + .map(|_| { + let worker = Worker::new_fifo(); + let stealer = worker.stealer(); + (worker, stealer) + }) + .unzip(); + + let registry = Arc::new(Registry { + thread_infos: stealers.into_iter().map(ThreadInfo::new).collect(), + sleep: Sleep::new(n_threads), + injected_jobs: Injector::new(), + broadcasts: Mutex::new(broadcasts), + terminate_count: AtomicUsize::new(1), + panic_handler: builder.take_panic_handler(), + deadlock_handler: builder.take_deadlock_handler(), + start_handler: builder.take_start_handler(), + exit_handler: builder.take_exit_handler(), + acquire_thread_handler: builder.take_acquire_thread_handler(), + release_thread_handler: builder.take_release_thread_handler(), + }); + + // If we return early or panic, make sure to terminate existing threads. + let t1000 = Terminator(®istry); + + for (index, (worker, stealer)) in workers.into_iter().zip(broadcast_stealers).enumerate() { + let thread = ThreadBuilder { + name: builder.get_thread_name(index), + stack_size: builder.get_stack_size(), + registry: Arc::clone(®istry), + worker, + stealer, + index, + }; + if let Err(e) = builder.get_spawn_handler().spawn(thread) { + return Err(ThreadPoolBuildError::new(ErrorKind::IOError(e))); + } + } + + // Returning normally now, without termination. + mem::forget(t1000); + + Ok(registry) + } + + pub fn current() -> Arc<Registry> { + unsafe { + let worker_thread = WorkerThread::current(); + let registry = if worker_thread.is_null() { + global_registry() + } else { + &(*worker_thread).registry + }; + Arc::clone(registry) + } + } + + /// Returns the number of threads in the current registry. This + /// is better than `Registry::current().num_threads()` because it + /// avoids incrementing the `Arc`. + pub(super) fn current_num_threads() -> usize { + unsafe { + let worker_thread = WorkerThread::current(); + if worker_thread.is_null() { + global_registry().num_threads() + } else { + (*worker_thread).registry.num_threads() + } + } + } + + /// Returns the current `WorkerThread` if it's part of this `Registry`. + pub(super) fn current_thread(&self) -> Option<&WorkerThread> { + unsafe { + let worker = WorkerThread::current().as_ref()?; + if worker.registry().id() == self.id() { + Some(worker) + } else { + None + } + } + } + + /// Returns an opaque identifier for this registry. + pub(super) fn id(&self) -> RegistryId { + // We can rely on `self` not to change since we only ever create + // registries that are boxed up in an `Arc` (see `new()` above). + RegistryId { + addr: self as *const Self as usize, + } + } + + pub(super) fn num_threads(&self) -> usize { + self.thread_infos.len() + } + + pub(super) fn catch_unwind(&self, f: impl FnOnce()) { + if let Err(err) = unwind::halt_unwinding(f) { + // If there is no handler, or if that handler itself panics, then we abort. + let abort_guard = unwind::AbortIfPanic; + if let Some(ref handler) = self.panic_handler { + handler(err); + mem::forget(abort_guard); + } + } + } + + /// Waits for the worker threads to get up and running. This is + /// meant to be used for benchmarking purposes, primarily, so that + /// you can get more consistent numbers by having everything + /// "ready to go". + pub(super) fn wait_until_primed(&self) { + for info in &self.thread_infos { + info.primed.wait(); + } + } + + /// Waits for the worker threads to stop. This is used for testing + /// -- so we can check that termination actually works. + pub(super) fn wait_until_stopped(&self) { + self.release_thread(); + for info in &self.thread_infos { + info.stopped.wait(); + } + self.acquire_thread(); + } + + pub(crate) fn acquire_thread(&self) { + if let Some(ref acquire_thread_handler) = self.acquire_thread_handler { + acquire_thread_handler(); + } + } + + pub(crate) fn release_thread(&self) { + if let Some(ref release_thread_handler) = self.release_thread_handler { + release_thread_handler(); + } + } + + /// //////////////////////////////////////////////////////////////////////// + /// MAIN LOOP + /// + /// So long as all of the worker threads are hanging out in their + /// top-level loop, there is no work to be done. + + /// Push a job into the given `registry`. If we are running on a + /// worker thread for the registry, this will push onto the + /// deque. Else, it will inject from the outside (which is slower). + pub(super) fn inject_or_push(&self, job_ref: JobRef) { + let worker_thread = WorkerThread::current(); + unsafe { + if !worker_thread.is_null() && (*worker_thread).registry().id() == self.id() { + (*worker_thread).push(job_ref); + } else { + self.inject(job_ref); + } + } + } + + /// Push a job into the "external jobs" queue; it will be taken by + /// whatever worker has nothing to do. Use this if you know that + /// you are not on a worker of this registry. + pub(super) fn inject(&self, injected_job: JobRef) { + // It should not be possible for `state.terminate` to be true + // here. It is only set to true when the user creates (and + // drops) a `ThreadPool`; and, in that case, they cannot be + // calling `inject()` later, since they dropped their + // `ThreadPool`. + debug_assert_ne!( + self.terminate_count.load(Ordering::Acquire), + 0, + "inject() sees state.terminate as true" + ); + + let queue_was_empty = self.injected_jobs.is_empty(); + + self.injected_jobs.push(injected_job); + self.sleep.new_injected_jobs(1, queue_was_empty); + } + + pub(crate) fn has_injected_job(&self) -> bool { + !self.injected_jobs.is_empty() + } + + fn pop_injected_job(&self) -> Option<JobRef> { + loop { + match self.injected_jobs.steal() { + Steal::Success(job) => return Some(job), + Steal::Empty => return None, + Steal::Retry => {} + } + } + } + + /// Push a job into each thread's own "external jobs" queue; it will be + /// executed only on that thread, when it has nothing else to do locally, + /// before it tries to steal other work. + /// + /// **Panics** if not given exactly as many jobs as there are threads. + pub(super) fn inject_broadcast(&self, injected_jobs: impl ExactSizeIterator<Item = JobRef>) { + assert_eq!(self.num_threads(), injected_jobs.len()); + { + let broadcasts = self.broadcasts.lock().unwrap(); + + // It should not be possible for `state.terminate` to be true + // here. It is only set to true when the user creates (and + // drops) a `ThreadPool`; and, in that case, they cannot be + // calling `inject_broadcast()` later, since they dropped their + // `ThreadPool`. + debug_assert_ne!( + self.terminate_count.load(Ordering::Acquire), + 0, + "inject_broadcast() sees state.terminate as true" + ); + + assert_eq!(broadcasts.len(), injected_jobs.len()); + for (worker, job_ref) in broadcasts.iter().zip(injected_jobs) { + worker.push(job_ref); + } + } + for i in 0..self.num_threads() { + self.sleep.notify_worker_latch_is_set(i); + } + } + + /// If already in a worker-thread of this registry, just execute `op`. + /// Otherwise, inject `op` in this thread-pool. Either way, block until `op` + /// completes and return its return value. If `op` panics, that panic will + /// be propagated as well. The second argument indicates `true` if injection + /// was performed, `false` if executed directly. + pub(super) fn in_worker<OP, R>(&self, op: OP) -> R + where + OP: FnOnce(&WorkerThread, bool) -> R + Send, + R: Send, + { + unsafe { + let worker_thread = WorkerThread::current(); + if worker_thread.is_null() { + self.in_worker_cold(op) + } else if (*worker_thread).registry().id() != self.id() { + self.in_worker_cross(&*worker_thread, op) + } else { + // Perfectly valid to give them a `&T`: this is the + // current thread, so we know the data structure won't be + // invalidated until we return. + op(&*worker_thread, false) + } + } + } + + #[cold] + unsafe fn in_worker_cold<OP, R>(&self, op: OP) -> R + where + OP: FnOnce(&WorkerThread, bool) -> R + Send, + R: Send, + { + thread_local!(static LOCK_LATCH: LockLatch = LockLatch::new()); + + LOCK_LATCH.with(|l| { + // This thread isn't a member of *any* thread pool, so just block. + debug_assert!(WorkerThread::current().is_null()); + let job = StackJob::new( + Tlv::null(), + |injected| { + let worker_thread = WorkerThread::current(); + assert!(injected && !worker_thread.is_null()); + op(&*worker_thread, true) + }, + LatchRef::new(l), + ); + self.inject(job.as_job_ref()); + self.release_thread(); + job.latch.wait_and_reset(); // Make sure we can use the same latch again next time. + self.acquire_thread(); + + job.into_result() + }) + } + + #[cold] + unsafe fn in_worker_cross<OP, R>(&self, current_thread: &WorkerThread, op: OP) -> R + where + OP: FnOnce(&WorkerThread, bool) -> R + Send, + R: Send, + { + // This thread is a member of a different pool, so let it process + // other work while waiting for this `op` to complete. + debug_assert!(current_thread.registry().id() != self.id()); + let latch = SpinLatch::cross(current_thread); + let job = StackJob::new( + Tlv::null(), + |injected| { + let worker_thread = WorkerThread::current(); + assert!(injected && !worker_thread.is_null()); + op(&*worker_thread, true) + }, + latch, + ); + self.inject(job.as_job_ref()); + current_thread.wait_until(&job.latch); + job.into_result() + } + + /// Increments the terminate counter. This increment should be + /// balanced by a call to `terminate`, which will decrement. This + /// is used when spawning asynchronous work, which needs to + /// prevent the registry from terminating so long as it is active. + /// + /// Note that blocking functions such as `join` and `scope` do not + /// need to concern themselves with this fn; their context is + /// responsible for ensuring the current thread-pool will not + /// terminate until they return. + /// + /// The global thread-pool always has an outstanding reference + /// (the initial one). Custom thread-pools have one outstanding + /// reference that is dropped when the `ThreadPool` is dropped: + /// since installing the thread-pool blocks until any joins/scopes + /// complete, this ensures that joins/scopes are covered. + /// + /// The exception is `::spawn()`, which can create a job outside + /// of any blocking scope. In that case, the job itself holds a + /// terminate count and is responsible for invoking `terminate()` + /// when finished. + pub(super) fn increment_terminate_count(&self) { + let previous = self.terminate_count.fetch_add(1, Ordering::AcqRel); + debug_assert!(previous != 0, "registry ref count incremented from zero"); + assert!(previous != usize::MAX, "overflow in registry ref count"); + } + + /// Signals that the thread-pool which owns this registry has been + /// dropped. The worker threads will gradually terminate, once any + /// extant work is completed. + pub(super) fn terminate(&self) { + if self.terminate_count.fetch_sub(1, Ordering::AcqRel) == 1 { + for (i, thread_info) in self.thread_infos.iter().enumerate() { + unsafe { OnceLatch::set_and_tickle_one(&thread_info.terminate, self, i) }; + } + } + } + + /// Notify the worker that the latch they are sleeping on has been "set". + pub(super) fn notify_worker_latch_is_set(&self, target_worker_index: usize) { + self.sleep.notify_worker_latch_is_set(target_worker_index); + } +} + +/// Mark a Rayon worker thread as blocked. This triggers the deadlock handler +/// if no other worker thread is active +#[inline] +pub fn mark_blocked() { + let worker_thread = WorkerThread::current(); + assert!(!worker_thread.is_null()); + unsafe { + let registry = &(*worker_thread).registry; + registry.sleep.mark_blocked(®istry.deadlock_handler) + } +} + +/// Mark a previously blocked Rayon worker thread as unblocked +#[inline] +pub fn mark_unblocked(registry: &Registry) { + registry.sleep.mark_unblocked() +} + +#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] +pub(super) struct RegistryId { + addr: usize, +} + +struct ThreadInfo { + /// Latch set once thread has started and we are entering into the + /// main loop. Used to wait for worker threads to become primed, + /// primarily of interest for benchmarking. + primed: LockLatch, + + /// Latch is set once worker thread has completed. Used to wait + /// until workers have stopped; only used for tests. + stopped: LockLatch, + + /// The latch used to signal that terminated has been requested. + /// This latch is *set* by the `terminate` method on the + /// `Registry`, once the registry's main "terminate" counter + /// reaches zero. + terminate: OnceLatch, + + /// the "stealer" half of the worker's deque + stealer: Stealer<JobRef>, +} + +impl ThreadInfo { + fn new(stealer: Stealer<JobRef>) -> ThreadInfo { + ThreadInfo { + primed: LockLatch::new(), + stopped: LockLatch::new(), + terminate: OnceLatch::new(), + stealer, + } + } +} + +/// //////////////////////////////////////////////////////////////////////// +/// WorkerThread identifiers + +pub(super) struct WorkerThread { + /// the "worker" half of our local deque + worker: Worker<JobRef>, + + /// the "stealer" half of the worker's broadcast deque + stealer: Stealer<JobRef>, + + /// local queue used for `spawn_fifo` indirection + fifo: JobFifo, + + pub(crate) index: usize, + + /// A weak random number generator. + rng: XorShift64Star, + + pub(crate) registry: Arc<Registry>, +} + +// This is a bit sketchy, but basically: the WorkerThread is +// allocated on the stack of the worker on entry and stored into this +// thread local variable. So it will remain valid at least until the +// worker is fully unwound. Using an unsafe pointer avoids the need +// for a RefCell<T> etc. +thread_local! { + static WORKER_THREAD_STATE: Cell<*const WorkerThread> = const { Cell::new(ptr::null()) }; +} + +impl From<ThreadBuilder> for WorkerThread { + fn from(thread: ThreadBuilder) -> Self { + Self { + worker: thread.worker, + stealer: thread.stealer, + fifo: JobFifo::new(), + index: thread.index, + rng: XorShift64Star::new(), + registry: thread.registry, + } + } +} + +impl Drop for WorkerThread { + fn drop(&mut self) { + // Undo `set_current` + WORKER_THREAD_STATE.with(|t| { + assert!(t.get().eq(&(self as *const _))); + t.set(ptr::null()); + }); + } +} + +impl WorkerThread { + /// Gets the `WorkerThread` index for the current thread; returns + /// NULL if this is not a worker thread. This pointer is valid + /// anywhere on the current thread. + #[inline] + pub(super) fn current() -> *const WorkerThread { + WORKER_THREAD_STATE.with(Cell::get) + } + + /// Sets `self` as the worker thread index for the current thread. + /// This is done during worker thread startup. + unsafe fn set_current(thread: *const WorkerThread) { + WORKER_THREAD_STATE.with(|t| { + assert!(t.get().is_null()); + t.set(thread); + }); + } + + /// Returns the registry that owns this worker thread. + #[inline] + pub(super) fn registry(&self) -> &Arc<Registry> { + &self.registry + } + + /// Our index amongst the worker threads (ranges from `0..self.num_threads()`). + #[inline] + pub(super) fn index(&self) -> usize { + self.index + } + + #[inline] + pub(super) unsafe fn push(&self, job: JobRef) { + let queue_was_empty = self.worker.is_empty(); + self.worker.push(job); + self.registry.sleep.new_internal_jobs(1, queue_was_empty); + } + + #[inline] + pub(super) unsafe fn push_fifo(&self, job: JobRef) { + self.push(self.fifo.push(job)); + } + + #[inline] + pub(super) fn local_deque_is_empty(&self) -> bool { + self.worker.is_empty() + } + + /// Attempts to obtain a "local" job -- typically this means + /// popping from the top of the stack, though if we are configured + /// for breadth-first execution, it would mean dequeuing from the + /// bottom. + #[inline] + pub(super) fn take_local_job(&self) -> Option<JobRef> { + let popped_job = self.worker.pop(); + + if popped_job.is_some() { + return popped_job; + } + + loop { + match self.stealer.steal() { + Steal::Success(job) => return Some(job), + Steal::Empty => return None, + Steal::Retry => {} + } + } + } + + pub(super) fn has_injected_job(&self) -> bool { + !self.stealer.is_empty() || self.registry.has_injected_job() + } + + /// Wait until the latch is set. Try to keep busy by popping and + /// stealing tasks as necessary. + #[inline] + pub(super) unsafe fn wait_until<L: AsCoreLatch + ?Sized>(&self, latch: &L) { + let latch = latch.as_core_latch(); + if !latch.probe() { + self.wait_until_cold(latch); + } + } + + #[cold] + unsafe fn wait_until_cold(&self, latch: &CoreLatch) { + // the code below should swallow all panics and hence never + // unwind; but if something does wrong, we want to abort, + // because otherwise other code in rayon may assume that the + // latch has been signaled, and that can lead to random memory + // accesses, which would be *very bad* + let abort_guard = unwind::AbortIfPanic; + + 'outer: while !latch.probe() { + // Check for local work *before* we start marking ourself idle, + // especially to avoid modifying shared sleep state. + if let Some(job) = self.take_local_job() { + self.execute(job); + continue; + } + + let mut idle_state = self.registry.sleep.start_looking(self.index); + while !latch.probe() { + if let Some(job) = self.find_work() { + self.registry.sleep.work_found(); + self.execute(job); + // The job might have injected local work, so go back to the outer loop. + continue 'outer; + } else { + self.registry + .sleep + .no_work_found(&mut idle_state, latch, &self) + } + } + + // If we were sleepy, we are not anymore. We "found work" -- + // whatever the surrounding thread was doing before it had to wait. + self.registry.sleep.work_found(); + break; + } + + mem::forget(abort_guard); // successful execution, do not abort + } + + unsafe fn wait_until_out_of_work(&self) { + debug_assert_eq!(self as *const _, WorkerThread::current()); + let registry = &*self.registry; + let index = self.index; + + registry.acquire_thread(); + self.wait_until(®istry.thread_infos[index].terminate); + + // Should not be any work left in our queue. + debug_assert!(self.take_local_job().is_none()); + + // Let registry know we are done + Latch::set(®istry.thread_infos[index].stopped); + } + + fn find_work(&self) -> Option<JobRef> { + // Try to find some work to do. We give preference first + // to things in our local deque, then in other workers + // deques, and finally to injected jobs from the + // outside. The idea is to finish what we started before + // we take on something new. + self.take_local_job() + .or_else(|| self.steal()) + .or_else(|| self.registry.pop_injected_job()) + } + + pub(super) fn yield_now(&self) -> Yield { + match self.find_work() { + Some(job) => unsafe { + self.execute(job); + Yield::Executed + }, + None => Yield::Idle, + } + } + + pub(super) fn yield_local(&self) -> Yield { + match self.take_local_job() { + Some(job) => unsafe { + self.execute(job); + Yield::Executed + }, + None => Yield::Idle, + } + } + + #[inline] + pub(super) unsafe fn execute(&self, job: JobRef) { + job.execute(); + } + + /// Try to steal a single job and return it. + /// + /// This should only be done as a last resort, when there is no + /// local work to do. + fn steal(&self) -> Option<JobRef> { + // we only steal when we don't have any work to do locally + debug_assert!(self.local_deque_is_empty()); + + // otherwise, try to steal + let thread_infos = &self.registry.thread_infos.as_slice(); + let num_threads = thread_infos.len(); + if num_threads <= 1 { + return None; + } + + loop { + let mut retry = false; + let start = self.rng.next_usize(num_threads); + let job = (start..num_threads) + .chain(0..start) + .filter(move |&i| i != self.index) + .find_map(|victim_index| { + let victim = &thread_infos[victim_index]; + match victim.stealer.steal() { + Steal::Success(job) => Some(job), + Steal::Empty => None, + Steal::Retry => { + retry = true; + None + } + } + }); + if job.is_some() || !retry { + return job; + } + } + } +} + +/// //////////////////////////////////////////////////////////////////////// + +unsafe fn main_loop(thread: ThreadBuilder) { + let worker_thread = &WorkerThread::from(thread); + WorkerThread::set_current(worker_thread); + let registry = &*worker_thread.registry; + let index = worker_thread.index; + + // let registry know we are ready to do work + Latch::set(®istry.thread_infos[index].primed); + + // Worker threads should not panic. If they do, just abort, as the + // internal state of the threadpool is corrupted. Note that if + // **user code** panics, we should catch that and redirect. + let abort_guard = unwind::AbortIfPanic; + + // Inform a user callback that we started a thread. + if let Some(ref handler) = registry.start_handler { + registry.catch_unwind(|| handler(index)); + } + + worker_thread.wait_until_out_of_work(); + + // Normal termination, do not abort. + mem::forget(abort_guard); + + // Inform a user callback that we exited a thread. + if let Some(ref handler) = registry.exit_handler { + registry.catch_unwind(|| handler(index)); + // We're already exiting the thread, there's nothing else to do. + } + + registry.release_thread(); +} + +/// If already in a worker-thread, just execute `op`. Otherwise, +/// execute `op` in the default thread-pool. Either way, block until +/// `op` completes and return its return value. If `op` panics, that +/// panic will be propagated as well. The second argument indicates +/// `true` if injection was performed, `false` if executed directly. +pub(super) fn in_worker<OP, R>(op: OP) -> R +where + OP: FnOnce(&WorkerThread, bool) -> R + Send, + R: Send, +{ + unsafe { + let owner_thread = WorkerThread::current(); + if !owner_thread.is_null() { + // Perfectly valid to give them a `&T`: this is the + // current thread, so we know the data structure won't be + // invalidated until we return. + op(&*owner_thread, false) + } else { + global_registry().in_worker(op) + } + } +} + +/// [xorshift*] is a fast pseudorandom number generator which will +/// even tolerate weak seeding, as long as it's not zero. +/// +/// [xorshift*]: https://en.wikipedia.org/wiki/Xorshift#xorshift* +struct XorShift64Star { + state: Cell<u64>, +} + +impl XorShift64Star { + fn new() -> Self { + // Any non-zero seed will do -- this uses the hash of a global counter. + let mut seed = 0; + while seed == 0 { + let mut hasher = DefaultHasher::new(); + static COUNTER: AtomicUsize = AtomicUsize::new(0); + hasher.write_usize(COUNTER.fetch_add(1, Ordering::Relaxed)); + seed = hasher.finish(); + } + + XorShift64Star { + state: Cell::new(seed), + } + } + + fn next(&self) -> u64 { + let mut x = self.state.get(); + debug_assert_ne!(x, 0); + x ^= x >> 12; + x ^= x << 25; + x ^= x >> 27; + self.state.set(x); + x.wrapping_mul(0x2545_f491_4f6c_dd1d) + } + + /// Return a value from `0..n`. + fn next_usize(&self, n: usize) -> usize { + (self.next() % n as u64) as usize + } +} diff --git a/compiler/rustc_thread_pool/src/scope/mod.rs b/compiler/rustc_thread_pool/src/scope/mod.rs new file mode 100644 index 00000000000..364b322baad --- /dev/null +++ b/compiler/rustc_thread_pool/src/scope/mod.rs @@ -0,0 +1,783 @@ +//! Methods for custom fork-join scopes, created by the [`scope()`] +//! and [`in_place_scope()`] functions. These are a more flexible alternative to [`join()`]. +//! +//! [`scope()`]: fn.scope.html +//! [`in_place_scope()`]: fn.in_place_scope.html +//! [`join()`]: ../join/join.fn.html + +use crate::broadcast::BroadcastContext; +use crate::job::{ArcJob, HeapJob, JobFifo, JobRef}; +use crate::latch::{CountLatch, Latch}; +use crate::registry::{global_registry, in_worker, Registry, WorkerThread}; +use crate::tlv::{self, Tlv}; +use crate::unwind; +use std::any::Any; +use std::fmt; +use std::marker::PhantomData; +use std::mem::ManuallyDrop; +use std::ptr; +use std::sync::atomic::{AtomicPtr, Ordering}; +use std::sync::Arc; + +#[cfg(test)] +mod test; + +/// Represents a fork-join scope which can be used to spawn any number of tasks. +/// See [`scope()`] for more information. +/// +///[`scope()`]: fn.scope.html +pub struct Scope<'scope> { + base: ScopeBase<'scope>, +} + +/// Represents a fork-join scope which can be used to spawn any number of tasks. +/// Those spawned from the same thread are prioritized in relative FIFO order. +/// See [`scope_fifo()`] for more information. +/// +///[`scope_fifo()`]: fn.scope_fifo.html +pub struct ScopeFifo<'scope> { + base: ScopeBase<'scope>, + fifos: Vec<JobFifo>, +} + +struct ScopeBase<'scope> { + /// thread registry where `scope()` was executed or where `in_place_scope()` + /// should spawn jobs. + registry: Arc<Registry>, + + /// if some job panicked, the error is stored here; it will be + /// propagated to the one who created the scope + panic: AtomicPtr<Box<dyn Any + Send + 'static>>, + + /// latch to track job counts + job_completed_latch: CountLatch, + + /// You can think of a scope as containing a list of closures to execute, + /// all of which outlive `'scope`. They're not actually required to be + /// `Sync`, but it's still safe to let the `Scope` implement `Sync` because + /// the closures are only *moved* across threads to be executed. + #[allow(clippy::type_complexity)] + marker: PhantomData<Box<dyn FnOnce(&Scope<'scope>) + Send + Sync + 'scope>>, + + /// The TLV at the scope's creation. Used to set the TLV for spawned jobs. + tlv: Tlv, +} + +/// Creates a "fork-join" scope `s` and invokes the closure with a +/// reference to `s`. This closure can then spawn asynchronous tasks +/// into `s`. Those tasks may run asynchronously with respect to the +/// closure; they may themselves spawn additional tasks into `s`. When +/// the closure returns, it will block until all tasks that have been +/// spawned into `s` complete. +/// +/// `scope()` is a more flexible building block compared to `join()`, +/// since a loop can be used to spawn any number of tasks without +/// recursing. However, that flexibility comes at a performance price: +/// tasks spawned using `scope()` must be allocated onto the heap, +/// whereas `join()` can make exclusive use of the stack. **Prefer +/// `join()` (or, even better, parallel iterators) where possible.** +/// +/// # Example +/// +/// The Rayon `join()` function launches two closures and waits for them +/// to stop. One could implement `join()` using a scope like so, although +/// it would be less efficient than the real implementation: +/// +/// ```rust +/// # use rayon_core as rayon; +/// pub fn join<A,B,RA,RB>(oper_a: A, oper_b: B) -> (RA, RB) +/// where A: FnOnce() -> RA + Send, +/// B: FnOnce() -> RB + Send, +/// RA: Send, +/// RB: Send, +/// { +/// let mut result_a: Option<RA> = None; +/// let mut result_b: Option<RB> = None; +/// rayon::scope(|s| { +/// s.spawn(|_| result_a = Some(oper_a())); +/// s.spawn(|_| result_b = Some(oper_b())); +/// }); +/// (result_a.unwrap(), result_b.unwrap()) +/// } +/// ``` +/// +/// # A note on threading +/// +/// The closure given to `scope()` executes in the Rayon thread-pool, +/// as do those given to `spawn()`. This means that you can't access +/// thread-local variables (well, you can, but they may have +/// unexpected values). +/// +/// # Task execution +/// +/// Task execution potentially starts as soon as `spawn()` is called. +/// The task will end sometime before `scope()` returns. Note that the +/// *closure* given to scope may return much earlier. In general +/// the lifetime of a scope created like `scope(body)` goes something like this: +/// +/// - Scope begins when `scope(body)` is called +/// - Scope body `body()` is invoked +/// - Scope tasks may be spawned +/// - Scope body returns +/// - Scope tasks execute, possibly spawning more tasks +/// - Once all tasks are done, scope ends and `scope()` returns +/// +/// To see how and when tasks are joined, consider this example: +/// +/// ```rust +/// # use rayon_core as rayon; +/// // point start +/// rayon::scope(|s| { +/// s.spawn(|s| { // task s.1 +/// s.spawn(|s| { // task s.1.1 +/// rayon::scope(|t| { +/// t.spawn(|_| ()); // task t.1 +/// t.spawn(|_| ()); // task t.2 +/// }); +/// }); +/// }); +/// s.spawn(|s| { // task s.2 +/// }); +/// // point mid +/// }); +/// // point end +/// ``` +/// +/// The various tasks that are run will execute roughly like so: +/// +/// ```notrust +/// | (start) +/// | +/// | (scope `s` created) +/// +-----------------------------------------------+ (task s.2) +/// +-------+ (task s.1) | +/// | | | +/// | +---+ (task s.1.1) | +/// | | | | +/// | | | (scope `t` created) | +/// | | +----------------+ (task t.2) | +/// | | +---+ (task t.1) | | +/// | (mid) | | | | | +/// : | + <-+------------+ (scope `t` ends) | +/// : | | | +/// |<------+---+-----------------------------------+ (scope `s` ends) +/// | +/// | (end) +/// ``` +/// +/// The point here is that everything spawned into scope `s` will +/// terminate (at latest) at the same point -- right before the +/// original call to `rayon::scope` returns. This includes new +/// subtasks created by other subtasks (e.g., task `s.1.1`). If a new +/// scope is created (such as `t`), the things spawned into that scope +/// will be joined before that scope returns, which in turn occurs +/// before the creating task (task `s.1.1` in this case) finishes. +/// +/// There is no guaranteed order of execution for spawns in a scope, +/// given that other threads may steal tasks at any time. However, they +/// are generally prioritized in a LIFO order on the thread from which +/// they were spawned. So in this example, absent any stealing, we can +/// expect `s.2` to execute before `s.1`, and `t.2` before `t.1`. Other +/// threads always steal from the other end of the deque, like FIFO +/// order. The idea is that "recent" tasks are most likely to be fresh +/// in the local CPU's cache, while other threads can steal older +/// "stale" tasks. For an alternate approach, consider +/// [`scope_fifo()`] instead. +/// +/// [`scope_fifo()`]: fn.scope_fifo.html +/// +/// # Accessing stack data +/// +/// In general, spawned tasks may access stack data in place that +/// outlives the scope itself. Other data must be fully owned by the +/// spawned task. +/// +/// ```rust +/// # use rayon_core as rayon; +/// let ok: Vec<i32> = vec![1, 2, 3]; +/// rayon::scope(|s| { +/// let bad: Vec<i32> = vec![4, 5, 6]; +/// s.spawn(|_| { +/// // We can access `ok` because outlives the scope `s`. +/// println!("ok: {:?}", ok); +/// +/// // If we just try to use `bad` here, the closure will borrow `bad` +/// // (because we are just printing it out, and that only requires a +/// // borrow), which will result in a compilation error. Read on +/// // for options. +/// // println!("bad: {:?}", bad); +/// }); +/// }); +/// ``` +/// +/// As the comments example above suggest, to reference `bad` we must +/// take ownership of it. One way to do this is to detach the closure +/// from the surrounding stack frame, using the `move` keyword. This +/// will cause it to take ownership of *all* the variables it touches, +/// in this case including both `ok` *and* `bad`: +/// +/// ```rust +/// # use rayon_core as rayon; +/// let ok: Vec<i32> = vec![1, 2, 3]; +/// rayon::scope(|s| { +/// let bad: Vec<i32> = vec![4, 5, 6]; +/// s.spawn(move |_| { +/// println!("ok: {:?}", ok); +/// println!("bad: {:?}", bad); +/// }); +/// +/// // That closure is fine, but now we can't use `ok` anywhere else, +/// // since it is owned by the previous task: +/// // s.spawn(|_| println!("ok: {:?}", ok)); +/// }); +/// ``` +/// +/// While this works, it could be a problem if we want to use `ok` elsewhere. +/// There are two choices. We can keep the closure as a `move` closure, but +/// instead of referencing the variable `ok`, we create a shadowed variable that +/// is a borrow of `ok` and capture *that*: +/// +/// ```rust +/// # use rayon_core as rayon; +/// let ok: Vec<i32> = vec![1, 2, 3]; +/// rayon::scope(|s| { +/// let bad: Vec<i32> = vec![4, 5, 6]; +/// let ok: &Vec<i32> = &ok; // shadow the original `ok` +/// s.spawn(move |_| { +/// println!("ok: {:?}", ok); // captures the shadowed version +/// println!("bad: {:?}", bad); +/// }); +/// +/// // Now we too can use the shadowed `ok`, since `&Vec<i32>` references +/// // can be shared freely. Note that we need a `move` closure here though, +/// // because otherwise we'd be trying to borrow the shadowed `ok`, +/// // and that doesn't outlive `scope`. +/// s.spawn(move |_| println!("ok: {:?}", ok)); +/// }); +/// ``` +/// +/// Another option is not to use the `move` keyword but instead to take ownership +/// of individual variables: +/// +/// ```rust +/// # use rayon_core as rayon; +/// let ok: Vec<i32> = vec![1, 2, 3]; +/// rayon::scope(|s| { +/// let bad: Vec<i32> = vec![4, 5, 6]; +/// s.spawn(|_| { +/// // Transfer ownership of `bad` into a local variable (also named `bad`). +/// // This will force the closure to take ownership of `bad` from the environment. +/// let bad = bad; +/// println!("ok: {:?}", ok); // `ok` is only borrowed. +/// println!("bad: {:?}", bad); // refers to our local variable, above. +/// }); +/// +/// s.spawn(|_| println!("ok: {:?}", ok)); // we too can borrow `ok` +/// }); +/// ``` +/// +/// # Panics +/// +/// If a panic occurs, either in the closure given to `scope()` or in +/// any of the spawned jobs, that panic will be propagated and the +/// call to `scope()` will panic. If multiple panics occurs, it is +/// non-deterministic which of their panic values will propagate. +/// Regardless, once a task is spawned using `scope.spawn()`, it will +/// execute, even if the spawning task should later panic. `scope()` +/// returns once all spawned jobs have completed, and any panics are +/// propagated at that point. +pub fn scope<'scope, OP, R>(op: OP) -> R +where + OP: FnOnce(&Scope<'scope>) -> R + Send, + R: Send, +{ + in_worker(|owner_thread, _| { + let scope = Scope::<'scope>::new(Some(owner_thread), None); + scope.base.complete(Some(owner_thread), || op(&scope)) + }) +} + +/// Creates a "fork-join" scope `s` with FIFO order, and invokes the +/// closure with a reference to `s`. This closure can then spawn +/// asynchronous tasks into `s`. Those tasks may run asynchronously with +/// respect to the closure; they may themselves spawn additional tasks +/// into `s`. When the closure returns, it will block until all tasks +/// that have been spawned into `s` complete. +/// +/// # Task execution +/// +/// Tasks in a `scope_fifo()` run similarly to [`scope()`], but there's a +/// difference in the order of execution. Consider a similar example: +/// +/// [`scope()`]: fn.scope.html +/// +/// ```rust +/// # use rayon_core as rayon; +/// // point start +/// rayon::scope_fifo(|s| { +/// s.spawn_fifo(|s| { // task s.1 +/// s.spawn_fifo(|s| { // task s.1.1 +/// rayon::scope_fifo(|t| { +/// t.spawn_fifo(|_| ()); // task t.1 +/// t.spawn_fifo(|_| ()); // task t.2 +/// }); +/// }); +/// }); +/// s.spawn_fifo(|s| { // task s.2 +/// }); +/// // point mid +/// }); +/// // point end +/// ``` +/// +/// The various tasks that are run will execute roughly like so: +/// +/// ```notrust +/// | (start) +/// | +/// | (FIFO scope `s` created) +/// +--------------------+ (task s.1) +/// +-------+ (task s.2) | +/// | | +---+ (task s.1.1) +/// | | | | +/// | | | | (FIFO scope `t` created) +/// | | | +----------------+ (task t.1) +/// | | | +---+ (task t.2) | +/// | (mid) | | | | | +/// : | | + <-+------------+ (scope `t` ends) +/// : | | | +/// |<------+------------+---+ (scope `s` ends) +/// | +/// | (end) +/// ``` +/// +/// Under `scope_fifo()`, the spawns are prioritized in a FIFO order on +/// the thread from which they were spawned, as opposed to `scope()`'s +/// LIFO. So in this example, we can expect `s.1` to execute before +/// `s.2`, and `t.1` before `t.2`. Other threads also steal tasks in +/// FIFO order, as usual. Overall, this has roughly the same order as +/// the now-deprecated [`breadth_first`] option, except the effect is +/// isolated to a particular scope. If spawns are intermingled from any +/// combination of `scope()` and `scope_fifo()`, or from different +/// threads, their order is only specified with respect to spawns in the +/// same scope and thread. +/// +/// For more details on this design, see Rayon [RFC #1]. +/// +/// [`breadth_first`]: struct.ThreadPoolBuilder.html#method.breadth_first +/// [RFC #1]: https://github.com/rayon-rs/rfcs/blob/master/accepted/rfc0001-scope-scheduling.md +/// +/// # Panics +/// +/// If a panic occurs, either in the closure given to `scope_fifo()` or +/// in any of the spawned jobs, that panic will be propagated and the +/// call to `scope_fifo()` will panic. If multiple panics occurs, it is +/// non-deterministic which of their panic values will propagate. +/// Regardless, once a task is spawned using `scope.spawn_fifo()`, it +/// will execute, even if the spawning task should later panic. +/// `scope_fifo()` returns once all spawned jobs have completed, and any +/// panics are propagated at that point. +pub fn scope_fifo<'scope, OP, R>(op: OP) -> R +where + OP: FnOnce(&ScopeFifo<'scope>) -> R + Send, + R: Send, +{ + in_worker(|owner_thread, _| { + let scope = ScopeFifo::<'scope>::new(Some(owner_thread), None); + scope.base.complete(Some(owner_thread), || op(&scope)) + }) +} + +/// Creates a "fork-join" scope `s` and invokes the closure with a +/// reference to `s`. This closure can then spawn asynchronous tasks +/// into `s`. Those tasks may run asynchronously with respect to the +/// closure; they may themselves spawn additional tasks into `s`. When +/// the closure returns, it will block until all tasks that have been +/// spawned into `s` complete. +/// +/// This is just like `scope()` except the closure runs on the same thread +/// that calls `in_place_scope()`. Only work that it spawns runs in the +/// thread pool. +/// +/// # Panics +/// +/// If a panic occurs, either in the closure given to `in_place_scope()` or in +/// any of the spawned jobs, that panic will be propagated and the +/// call to `in_place_scope()` will panic. If multiple panics occurs, it is +/// non-deterministic which of their panic values will propagate. +/// Regardless, once a task is spawned using `scope.spawn()`, it will +/// execute, even if the spawning task should later panic. `in_place_scope()` +/// returns once all spawned jobs have completed, and any panics are +/// propagated at that point. +pub fn in_place_scope<'scope, OP, R>(op: OP) -> R +where + OP: FnOnce(&Scope<'scope>) -> R, +{ + do_in_place_scope(None, op) +} + +pub(crate) fn do_in_place_scope<'scope, OP, R>(registry: Option<&Arc<Registry>>, op: OP) -> R +where + OP: FnOnce(&Scope<'scope>) -> R, +{ + let thread = unsafe { WorkerThread::current().as_ref() }; + let scope = Scope::<'scope>::new(thread, registry); + scope.base.complete(thread, || op(&scope)) +} + +/// Creates a "fork-join" scope `s` with FIFO order, and invokes the +/// closure with a reference to `s`. This closure can then spawn +/// asynchronous tasks into `s`. Those tasks may run asynchronously with +/// respect to the closure; they may themselves spawn additional tasks +/// into `s`. When the closure returns, it will block until all tasks +/// that have been spawned into `s` complete. +/// +/// This is just like `scope_fifo()` except the closure runs on the same thread +/// that calls `in_place_scope_fifo()`. Only work that it spawns runs in the +/// thread pool. +/// +/// # Panics +/// +/// If a panic occurs, either in the closure given to `in_place_scope_fifo()` or in +/// any of the spawned jobs, that panic will be propagated and the +/// call to `in_place_scope_fifo()` will panic. If multiple panics occurs, it is +/// non-deterministic which of their panic values will propagate. +/// Regardless, once a task is spawned using `scope.spawn_fifo()`, it will +/// execute, even if the spawning task should later panic. `in_place_scope_fifo()` +/// returns once all spawned jobs have completed, and any panics are +/// propagated at that point. +pub fn in_place_scope_fifo<'scope, OP, R>(op: OP) -> R +where + OP: FnOnce(&ScopeFifo<'scope>) -> R, +{ + do_in_place_scope_fifo(None, op) +} + +pub(crate) fn do_in_place_scope_fifo<'scope, OP, R>(registry: Option<&Arc<Registry>>, op: OP) -> R +where + OP: FnOnce(&ScopeFifo<'scope>) -> R, +{ + let thread = unsafe { WorkerThread::current().as_ref() }; + let scope = ScopeFifo::<'scope>::new(thread, registry); + scope.base.complete(thread, || op(&scope)) +} + +impl<'scope> Scope<'scope> { + fn new(owner: Option<&WorkerThread>, registry: Option<&Arc<Registry>>) -> Self { + let base = ScopeBase::new(owner, registry); + Scope { base } + } + + /// Spawns a job into the fork-join scope `self`. This job will + /// execute sometime before the fork-join scope completes. The + /// job is specified as a closure, and this closure receives its + /// own reference to the scope `self` as argument. This can be + /// used to inject new jobs into `self`. + /// + /// # Returns + /// + /// Nothing. The spawned closures cannot pass back values to the + /// caller directly, though they can write to local variables on + /// the stack (if those variables outlive the scope) or + /// communicate through shared channels. + /// + /// (The intention is to eventually integrate with Rust futures to + /// support spawns of functions that compute a value.) + /// + /// # Examples + /// + /// ```rust + /// # use rayon_core as rayon; + /// let mut value_a = None; + /// let mut value_b = None; + /// let mut value_c = None; + /// rayon::scope(|s| { + /// s.spawn(|s1| { + /// // ^ this is the same scope as `s`; this handle `s1` + /// // is intended for use by the spawned task, + /// // since scope handles cannot cross thread boundaries. + /// + /// value_a = Some(22); + /// + /// // the scope `s` will not end until all these tasks are done + /// s1.spawn(|_| { + /// value_b = Some(44); + /// }); + /// }); + /// + /// s.spawn(|_| { + /// value_c = Some(66); + /// }); + /// }); + /// assert_eq!(value_a, Some(22)); + /// assert_eq!(value_b, Some(44)); + /// assert_eq!(value_c, Some(66)); + /// ``` + /// + /// # See also + /// + /// The [`scope` function] has more extensive documentation about + /// task spawning. + /// + /// [`scope` function]: fn.scope.html + pub fn spawn<BODY>(&self, body: BODY) + where + BODY: FnOnce(&Scope<'scope>) + Send + 'scope, + { + let scope_ptr = ScopePtr(self); + let job = HeapJob::new(self.base.tlv, move || unsafe { + // SAFETY: this job will execute before the scope ends. + let scope = scope_ptr.as_ref(); + ScopeBase::execute_job(&scope.base, move || body(scope)) + }); + let job_ref = self.base.heap_job_ref(job); + + // Since `Scope` implements `Sync`, we can't be sure that we're still in a + // thread of this pool, so we can't just push to the local worker thread. + // Also, this might be an in-place scope. + self.base.registry.inject_or_push(job_ref); + } + + /// Spawns a job into every thread of the fork-join scope `self`. This job will + /// execute on each thread sometime before the fork-join scope completes. The + /// job is specified as a closure, and this closure receives its own reference + /// to the scope `self` as argument, as well as a `BroadcastContext`. + pub fn spawn_broadcast<BODY>(&self, body: BODY) + where + BODY: Fn(&Scope<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope, + { + let scope_ptr = ScopePtr(self); + let job = ArcJob::new(move || unsafe { + // SAFETY: this job will execute before the scope ends. + let scope = scope_ptr.as_ref(); + let body = &body; + let func = move || BroadcastContext::with(move |ctx| body(scope, ctx)); + ScopeBase::execute_job(&scope.base, func) + }); + self.base.inject_broadcast(job) + } +} + +impl<'scope> ScopeFifo<'scope> { + fn new(owner: Option<&WorkerThread>, registry: Option<&Arc<Registry>>) -> Self { + let base = ScopeBase::new(owner, registry); + let num_threads = base.registry.num_threads(); + let fifos = (0..num_threads).map(|_| JobFifo::new()).collect(); + ScopeFifo { base, fifos } + } + + /// Spawns a job into the fork-join scope `self`. This job will + /// execute sometime before the fork-join scope completes. The + /// job is specified as a closure, and this closure receives its + /// own reference to the scope `self` as argument. This can be + /// used to inject new jobs into `self`. + /// + /// # See also + /// + /// This method is akin to [`Scope::spawn()`], but with a FIFO + /// priority. The [`scope_fifo` function] has more details about + /// this distinction. + /// + /// [`Scope::spawn()`]: struct.Scope.html#method.spawn + /// [`scope_fifo` function]: fn.scope_fifo.html + pub fn spawn_fifo<BODY>(&self, body: BODY) + where + BODY: FnOnce(&ScopeFifo<'scope>) + Send + 'scope, + { + let scope_ptr = ScopePtr(self); + let job = HeapJob::new(self.base.tlv, move || unsafe { + // SAFETY: this job will execute before the scope ends. + let scope = scope_ptr.as_ref(); + ScopeBase::execute_job(&scope.base, move || body(scope)) + }); + let job_ref = self.base.heap_job_ref(job); + + // If we're in the pool, use our scope's private fifo for this thread to execute + // in a locally-FIFO order. Otherwise, just use the pool's global injector. + match self.base.registry.current_thread() { + Some(worker) => { + let fifo = &self.fifos[worker.index()]; + // SAFETY: this job will execute before the scope ends. + unsafe { worker.push(fifo.push(job_ref)) }; + } + None => self.base.registry.inject(job_ref), + } + } + + /// Spawns a job into every thread of the fork-join scope `self`. This job will + /// execute on each thread sometime before the fork-join scope completes. The + /// job is specified as a closure, and this closure receives its own reference + /// to the scope `self` as argument, as well as a `BroadcastContext`. + pub fn spawn_broadcast<BODY>(&self, body: BODY) + where + BODY: Fn(&ScopeFifo<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope, + { + let scope_ptr = ScopePtr(self); + let job = ArcJob::new(move || unsafe { + // SAFETY: this job will execute before the scope ends. + let scope = scope_ptr.as_ref(); + let body = &body; + let func = move || BroadcastContext::with(move |ctx| body(scope, ctx)); + ScopeBase::execute_job(&scope.base, func) + }); + self.base.inject_broadcast(job) + } +} + +impl<'scope> ScopeBase<'scope> { + /// Creates the base of a new scope for the given registry + fn new(owner: Option<&WorkerThread>, registry: Option<&Arc<Registry>>) -> Self { + let registry = registry.unwrap_or_else(|| match owner { + Some(owner) => owner.registry(), + None => global_registry(), + }); + + ScopeBase { + registry: Arc::clone(registry), + panic: AtomicPtr::new(ptr::null_mut()), + job_completed_latch: CountLatch::new(owner), + marker: PhantomData, + tlv: tlv::get(), + } + } + + fn heap_job_ref<FUNC>(&self, job: Box<HeapJob<FUNC>>) -> JobRef + where + FUNC: FnOnce() + Send + 'scope, + { + unsafe { + self.job_completed_latch.increment(); + job.into_job_ref() + } + } + + fn inject_broadcast<FUNC>(&self, job: Arc<ArcJob<FUNC>>) + where + FUNC: Fn() + Send + Sync + 'scope, + { + let n_threads = self.registry.num_threads(); + let job_refs = (0..n_threads).map(|_| unsafe { + self.job_completed_latch.increment(); + ArcJob::as_job_ref(&job) + }); + + self.registry.inject_broadcast(job_refs); + } + + /// Executes `func` as a job, either aborting or executing as + /// appropriate. + fn complete<FUNC, R>(&self, owner: Option<&WorkerThread>, func: FUNC) -> R + where + FUNC: FnOnce() -> R, + { + let result = unsafe { Self::execute_job_closure(self, func) }; + self.job_completed_latch.wait(owner); + + // Restore the TLV if we ran some jobs while waiting + tlv::set(self.tlv); + + self.maybe_propagate_panic(); + result.unwrap() // only None if `op` panicked, and that would have been propagated + } + + /// Executes `func` as a job, either aborting or executing as + /// appropriate. + unsafe fn execute_job<FUNC>(this: *const Self, func: FUNC) + where + FUNC: FnOnce(), + { + let _: Option<()> = Self::execute_job_closure(this, func); + } + + /// Executes `func` as a job in scope. Adjusts the "job completed" + /// counters and also catches any panic and stores it into + /// `scope`. + unsafe fn execute_job_closure<FUNC, R>(this: *const Self, func: FUNC) -> Option<R> + where + FUNC: FnOnce() -> R, + { + let result = match unwind::halt_unwinding(func) { + Ok(r) => Some(r), + Err(err) => { + (*this).job_panicked(err); + None + } + }; + Latch::set(&(*this).job_completed_latch); + result + } + + fn job_panicked(&self, err: Box<dyn Any + Send + 'static>) { + // capture the first error we see, free the rest + if self.panic.load(Ordering::Relaxed).is_null() { + let nil = ptr::null_mut(); + let mut err = ManuallyDrop::new(Box::new(err)); // box up the fat ptr + let err_ptr: *mut Box<dyn Any + Send + 'static> = &mut **err; + if self + .panic + .compare_exchange(nil, err_ptr, Ordering::Release, Ordering::Relaxed) + .is_ok() + { + // ownership now transferred into self.panic + } else { + // another panic raced in ahead of us, so drop ours + let _: Box<Box<_>> = ManuallyDrop::into_inner(err); + } + } + } + + fn maybe_propagate_panic(&self) { + // propagate panic, if any occurred; at this point, all + // outstanding jobs have completed, so we can use a relaxed + // ordering: + let panic = self.panic.swap(ptr::null_mut(), Ordering::Relaxed); + if !panic.is_null() { + let value = unsafe { Box::from_raw(panic) }; + + // Restore the TLV if we ran some jobs while waiting + tlv::set(self.tlv); + + unwind::resume_unwinding(*value); + } + } +} + +impl<'scope> fmt::Debug for Scope<'scope> { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Scope") + .field("pool_id", &self.base.registry.id()) + .field("panic", &self.base.panic) + .field("job_completed_latch", &self.base.job_completed_latch) + .finish() + } +} + +impl<'scope> fmt::Debug for ScopeFifo<'scope> { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("ScopeFifo") + .field("num_fifos", &self.fifos.len()) + .field("pool_id", &self.base.registry.id()) + .field("panic", &self.base.panic) + .field("job_completed_latch", &self.base.job_completed_latch) + .finish() + } +} + +/// Used to capture a scope `&Self` pointer in jobs, without faking a lifetime. +/// +/// Unsafe code is still required to dereference the pointer, but that's fine in +/// scope jobs that are guaranteed to execute before the scope ends. +struct ScopePtr<T>(*const T); + +// SAFETY: !Send for raw pointers is not for safety, just as a lint +unsafe impl<T: Sync> Send for ScopePtr<T> {} + +// SAFETY: !Sync for raw pointers is not for safety, just as a lint +unsafe impl<T: Sync> Sync for ScopePtr<T> {} + +impl<T> ScopePtr<T> { + // Helper to avoid disjoint captures of `scope_ptr.0` + unsafe fn as_ref(&self) -> &T { + &*self.0 + } +} diff --git a/compiler/rustc_thread_pool/src/scope/test.rs b/compiler/rustc_thread_pool/src/scope/test.rs new file mode 100644 index 00000000000..4505ba7c4fb --- /dev/null +++ b/compiler/rustc_thread_pool/src/scope/test.rs @@ -0,0 +1,622 @@ +use crate::unwind; +use crate::ThreadPoolBuilder; +use crate::{scope, scope_fifo, Scope, ScopeFifo}; +use rand::{Rng, SeedableRng}; +use rand_xorshift::XorShiftRng; +use std::iter::once; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Barrier, Mutex}; +use std::vec; + +#[test] +fn scope_empty() { + scope(|_| {}); +} + +#[test] +fn scope_result() { + let x = scope(|_| 22); + assert_eq!(x, 22); +} + +#[test] +fn scope_two() { + let counter = &AtomicUsize::new(0); + scope(|s| { + s.spawn(move |_| { + counter.fetch_add(1, Ordering::SeqCst); + }); + s.spawn(move |_| { + counter.fetch_add(10, Ordering::SeqCst); + }); + }); + + let v = counter.load(Ordering::SeqCst); + assert_eq!(v, 11); +} + +#[test] +fn scope_divide_and_conquer() { + let counter_p = &AtomicUsize::new(0); + scope(|s| s.spawn(move |s| divide_and_conquer(s, counter_p, 1024))); + + let counter_s = &AtomicUsize::new(0); + divide_and_conquer_seq(counter_s, 1024); + + let p = counter_p.load(Ordering::SeqCst); + let s = counter_s.load(Ordering::SeqCst); + assert_eq!(p, s); +} + +fn divide_and_conquer<'scope>(scope: &Scope<'scope>, counter: &'scope AtomicUsize, size: usize) { + if size > 1 { + scope.spawn(move |scope| divide_and_conquer(scope, counter, size / 2)); + scope.spawn(move |scope| divide_and_conquer(scope, counter, size / 2)); + } else { + // count the leaves + counter.fetch_add(1, Ordering::SeqCst); + } +} + +fn divide_and_conquer_seq(counter: &AtomicUsize, size: usize) { + if size > 1 { + divide_and_conquer_seq(counter, size / 2); + divide_and_conquer_seq(counter, size / 2); + } else { + // count the leaves + counter.fetch_add(1, Ordering::SeqCst); + } +} + +struct Tree<T: Send> { + value: T, + children: Vec<Tree<T>>, +} + +impl<T: Send> Tree<T> { + fn iter(&self) -> vec::IntoIter<&T> { + once(&self.value) + .chain(self.children.iter().flat_map(Tree::iter)) + .collect::<Vec<_>>() // seems like it shouldn't be needed... but prevents overflow + .into_iter() + } + + fn update<OP>(&mut self, op: OP) + where + OP: Fn(&mut T) + Sync, + T: Send, + { + scope(|s| self.update_in_scope(&op, s)); + } + + fn update_in_scope<'scope, OP>(&'scope mut self, op: &'scope OP, scope: &Scope<'scope>) + where + OP: Fn(&mut T) + Sync, + { + let Tree { + ref mut value, + ref mut children, + } = *self; + scope.spawn(move |scope| { + for child in children { + scope.spawn(move |scope| child.update_in_scope(op, scope)); + } + }); + + op(value); + } +} + +fn random_tree(depth: usize) -> Tree<u32> { + assert!(depth > 0); + let mut seed = <XorShiftRng as SeedableRng>::Seed::default(); + (0..).zip(seed.as_mut()).for_each(|(i, x)| *x = i); + let mut rng = XorShiftRng::from_seed(seed); + random_tree1(depth, &mut rng) +} + +fn random_tree1(depth: usize, rng: &mut XorShiftRng) -> Tree<u32> { + let children = if depth == 0 { + vec![] + } else { + (0..rng.random_range(0..4)) // somewhere between 0 and 3 children at each level + .map(|_| random_tree1(depth - 1, rng)) + .collect() + }; + + Tree { + value: rng.random_range(0..1_000_000), + children, + } +} + +#[test] +fn update_tree() { + let mut tree: Tree<u32> = random_tree(10); + let values: Vec<u32> = tree.iter().cloned().collect(); + tree.update(|v| *v += 1); + let new_values: Vec<u32> = tree.iter().cloned().collect(); + assert_eq!(values.len(), new_values.len()); + for (&i, &j) in values.iter().zip(&new_values) { + assert_eq!(i + 1, j); + } +} + +/// Check that if you have a chain of scoped tasks where T0 spawns T1 +/// spawns T2 and so forth down to Tn, the stack space should not grow +/// linearly with N. We test this by some unsafe hackery and +/// permitting an approx 10% change with a 10x input change. +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn linear_stack_growth() { + let builder = ThreadPoolBuilder::new().num_threads(1); + let pool = builder.build().unwrap(); + pool.install(|| { + let mut max_diff = Mutex::new(0); + let bottom_of_stack = 0; + scope(|s| the_final_countdown(s, &bottom_of_stack, &max_diff, 5)); + let diff_when_5 = *max_diff.get_mut().unwrap() as f64; + + scope(|s| the_final_countdown(s, &bottom_of_stack, &max_diff, 500)); + let diff_when_500 = *max_diff.get_mut().unwrap() as f64; + + let ratio = diff_when_5 / diff_when_500; + assert!( + ratio > 0.9 && ratio < 1.1, + "stack usage ratio out of bounds: {}", + ratio + ); + }); +} + +fn the_final_countdown<'scope>( + s: &Scope<'scope>, + bottom_of_stack: &'scope i32, + max: &'scope Mutex<usize>, + n: usize, +) { + let top_of_stack = 0; + let p = bottom_of_stack as *const i32 as usize; + let q = &top_of_stack as *const i32 as usize; + let diff = if p > q { p - q } else { q - p }; + + let mut data = max.lock().unwrap(); + *data = Ord::max(diff, *data); + + if n > 0 { + s.spawn(move |s| the_final_countdown(s, bottom_of_stack, max, n - 1)); + } +} + +#[test] +#[should_panic(expected = "Hello, world!")] +fn panic_propagate_scope() { + scope(|_| panic!("Hello, world!")); +} + +#[test] +#[should_panic(expected = "Hello, world!")] +fn panic_propagate_spawn() { + scope(|s| s.spawn(|_| panic!("Hello, world!"))); +} + +#[test] +#[should_panic(expected = "Hello, world!")] +fn panic_propagate_nested_spawn() { + scope(|s| s.spawn(|s| s.spawn(|s| s.spawn(|_| panic!("Hello, world!"))))); +} + +#[test] +#[should_panic(expected = "Hello, world!")] +fn panic_propagate_nested_scope_spawn() { + scope(|s| s.spawn(|_| scope(|s| s.spawn(|_| panic!("Hello, world!"))))); +} + +#[test] +#[cfg_attr(not(panic = "unwind"), ignore)] +fn panic_propagate_still_execute_1() { + let mut x = false; + let result = unwind::halt_unwinding(|| { + scope(|s| { + s.spawn(|_| panic!("Hello, world!")); // job A + s.spawn(|_| x = true); // job B, should still execute even though A panics + }); + }); + match result { + Ok(_) => panic!("failed to propagate panic"), + Err(_) => assert!(x, "job b failed to execute"), + } +} + +#[test] +#[cfg_attr(not(panic = "unwind"), ignore)] +fn panic_propagate_still_execute_2() { + let mut x = false; + let result = unwind::halt_unwinding(|| { + scope(|s| { + s.spawn(|_| x = true); // job B, should still execute even though A panics + s.spawn(|_| panic!("Hello, world!")); // job A + }); + }); + match result { + Ok(_) => panic!("failed to propagate panic"), + Err(_) => assert!(x, "job b failed to execute"), + } +} + +#[test] +#[cfg_attr(not(panic = "unwind"), ignore)] +fn panic_propagate_still_execute_3() { + let mut x = false; + let result = unwind::halt_unwinding(|| { + scope(|s| { + s.spawn(|_| x = true); // spawned job should still execute despite later panic + panic!("Hello, world!"); + }); + }); + match result { + Ok(_) => panic!("failed to propagate panic"), + Err(_) => assert!(x, "panic after spawn, spawn failed to execute"), + } +} + +#[test] +#[cfg_attr(not(panic = "unwind"), ignore)] +fn panic_propagate_still_execute_4() { + let mut x = false; + let result = unwind::halt_unwinding(|| { + scope(|s| { + s.spawn(|_| panic!("Hello, world!")); + x = true; + }); + }); + match result { + Ok(_) => panic!("failed to propagate panic"), + Err(_) => assert!(x, "panic in spawn tainted scope"), + } +} + +macro_rules! test_order { + ($scope:ident => $spawn:ident) => {{ + let builder = ThreadPoolBuilder::new().num_threads(1); + let pool = builder.build().unwrap(); + pool.install(|| { + let vec = Mutex::new(vec![]); + $scope(|scope| { + let vec = &vec; + for i in 0..10 { + scope.$spawn(move |scope| { + for j in 0..10 { + scope.$spawn(move |_| { + vec.lock().unwrap().push(i * 10 + j); + }); + } + }); + } + }); + vec.into_inner().unwrap() + }) + }}; +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn lifo_order() { + // In the absence of stealing, `scope()` runs its `spawn()` jobs in LIFO order. + let vec = test_order!(scope => spawn); + let expected: Vec<i32> = (0..100).rev().collect(); // LIFO -> reversed + assert_eq!(vec, expected); +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn fifo_order() { + // In the absence of stealing, `scope_fifo()` runs its `spawn_fifo()` jobs in FIFO order. + let vec = test_order!(scope_fifo => spawn_fifo); + let expected: Vec<i32> = (0..100).collect(); // FIFO -> natural order + assert_eq!(vec, expected); +} + +macro_rules! test_nested_order { + ($outer_scope:ident => $outer_spawn:ident, + $inner_scope:ident => $inner_spawn:ident) => {{ + let builder = ThreadPoolBuilder::new().num_threads(1); + let pool = builder.build().unwrap(); + pool.install(|| { + let vec = Mutex::new(vec![]); + $outer_scope(|scope| { + let vec = &vec; + for i in 0..10 { + scope.$outer_spawn(move |_| { + $inner_scope(|scope| { + for j in 0..10 { + scope.$inner_spawn(move |_| { + vec.lock().unwrap().push(i * 10 + j); + }); + } + }); + }); + } + }); + vec.into_inner().unwrap() + }) + }}; +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn nested_lifo_order() { + // In the absence of stealing, `scope()` runs its `spawn()` jobs in LIFO order. + let vec = test_nested_order!(scope => spawn, scope => spawn); + let expected: Vec<i32> = (0..100).rev().collect(); // LIFO -> reversed + assert_eq!(vec, expected); +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn nested_fifo_order() { + // In the absence of stealing, `scope_fifo()` runs its `spawn_fifo()` jobs in FIFO order. + let vec = test_nested_order!(scope_fifo => spawn_fifo, scope_fifo => spawn_fifo); + let expected: Vec<i32> = (0..100).collect(); // FIFO -> natural order + assert_eq!(vec, expected); +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn nested_lifo_fifo_order() { + // LIFO on the outside, FIFO on the inside + let vec = test_nested_order!(scope => spawn, scope_fifo => spawn_fifo); + let expected: Vec<i32> = (0..10) + .rev() + .flat_map(|i| (0..10).map(move |j| i * 10 + j)) + .collect(); + assert_eq!(vec, expected); +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn nested_fifo_lifo_order() { + // FIFO on the outside, LIFO on the inside + let vec = test_nested_order!(scope_fifo => spawn_fifo, scope => spawn); + let expected: Vec<i32> = (0..10) + .flat_map(|i| (0..10).rev().map(move |j| i * 10 + j)) + .collect(); + assert_eq!(vec, expected); +} + +macro_rules! spawn_push { + ($scope:ident . $spawn:ident, $vec:ident, $i:expr) => {{ + $scope.$spawn(move |_| $vec.lock().unwrap().push($i)); + }}; +} + +/// Test spawns pushing a series of numbers, interleaved +/// such that negative values are using an inner scope. +macro_rules! test_mixed_order { + ($outer_scope:ident => $outer_spawn:ident, + $inner_scope:ident => $inner_spawn:ident) => {{ + let builder = ThreadPoolBuilder::new().num_threads(1); + let pool = builder.build().unwrap(); + pool.install(|| { + let vec = Mutex::new(vec![]); + $outer_scope(|outer_scope| { + let vec = &vec; + spawn_push!(outer_scope.$outer_spawn, vec, 0); + $inner_scope(|inner_scope| { + spawn_push!(inner_scope.$inner_spawn, vec, -1); + spawn_push!(outer_scope.$outer_spawn, vec, 1); + spawn_push!(inner_scope.$inner_spawn, vec, -2); + spawn_push!(outer_scope.$outer_spawn, vec, 2); + spawn_push!(inner_scope.$inner_spawn, vec, -3); + }); + spawn_push!(outer_scope.$outer_spawn, vec, 3); + }); + vec.into_inner().unwrap() + }) + }}; +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn mixed_lifo_order() { + // NB: the end of the inner scope makes us execute some of the outer scope + // before they've all been spawned, so they're not perfectly LIFO. + let vec = test_mixed_order!(scope => spawn, scope => spawn); + let expected = vec![-3, 2, -2, 1, -1, 3, 0]; + assert_eq!(vec, expected); +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn mixed_fifo_order() { + let vec = test_mixed_order!(scope_fifo => spawn_fifo, scope_fifo => spawn_fifo); + let expected = vec![-1, 0, -2, 1, -3, 2, 3]; + assert_eq!(vec, expected); +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn mixed_lifo_fifo_order() { + // NB: the end of the inner scope makes us execute some of the outer scope + // before they've all been spawned, so they're not perfectly LIFO. + let vec = test_mixed_order!(scope => spawn, scope_fifo => spawn_fifo); + let expected = vec![-1, 2, -2, 1, -3, 3, 0]; + assert_eq!(vec, expected); +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn mixed_fifo_lifo_order() { + let vec = test_mixed_order!(scope_fifo => spawn_fifo, scope => spawn); + let expected = vec![-3, 0, -2, 1, -1, 2, 3]; + assert_eq!(vec, expected); +} + +#[test] +fn static_scope() { + static COUNTER: AtomicUsize = AtomicUsize::new(0); + + let mut range = 0..100; + let sum = range.clone().sum(); + let iter = &mut range; + + COUNTER.store(0, Ordering::Relaxed); + scope(|s: &Scope<'static>| { + // While we're allowed the locally borrowed iterator, + // the spawns must be static. + for i in iter { + s.spawn(move |_| { + COUNTER.fetch_add(i, Ordering::Relaxed); + }); + } + }); + + assert_eq!(COUNTER.load(Ordering::Relaxed), sum); +} + +#[test] +fn static_scope_fifo() { + static COUNTER: AtomicUsize = AtomicUsize::new(0); + + let mut range = 0..100; + let sum = range.clone().sum(); + let iter = &mut range; + + COUNTER.store(0, Ordering::Relaxed); + scope_fifo(|s: &ScopeFifo<'static>| { + // While we're allowed the locally borrowed iterator, + // the spawns must be static. + for i in iter { + s.spawn_fifo(move |_| { + COUNTER.fetch_add(i, Ordering::Relaxed); + }); + } + }); + + assert_eq!(COUNTER.load(Ordering::Relaxed), sum); +} + +#[test] +fn mixed_lifetime_scope() { + fn increment<'slice, 'counter>(counters: &'slice [&'counter AtomicUsize]) { + scope(move |s: &Scope<'counter>| { + // We can borrow 'slice here, but the spawns can only borrow 'counter. + for &c in counters { + s.spawn(move |_| { + c.fetch_add(1, Ordering::Relaxed); + }); + } + }); + } + + let counter = AtomicUsize::new(0); + increment(&[&counter; 100]); + assert_eq!(counter.into_inner(), 100); +} + +#[test] +fn mixed_lifetime_scope_fifo() { + fn increment<'slice, 'counter>(counters: &'slice [&'counter AtomicUsize]) { + scope_fifo(move |s: &ScopeFifo<'counter>| { + // We can borrow 'slice here, but the spawns can only borrow 'counter. + for &c in counters { + s.spawn_fifo(move |_| { + c.fetch_add(1, Ordering::Relaxed); + }); + } + }); + } + + let counter = AtomicUsize::new(0); + increment(&[&counter; 100]); + assert_eq!(counter.into_inner(), 100); +} + +#[test] +fn scope_spawn_broadcast() { + let sum = AtomicUsize::new(0); + let n = scope(|s| { + s.spawn_broadcast(|_, ctx| { + sum.fetch_add(ctx.index(), Ordering::Relaxed); + }); + crate::current_num_threads() + }); + assert_eq!(sum.into_inner(), n * (n - 1) / 2); +} + +#[test] +fn scope_fifo_spawn_broadcast() { + let sum = AtomicUsize::new(0); + let n = scope_fifo(|s| { + s.spawn_broadcast(|_, ctx| { + sum.fetch_add(ctx.index(), Ordering::Relaxed); + }); + crate::current_num_threads() + }); + assert_eq!(sum.into_inner(), n * (n - 1) / 2); +} + +#[test] +fn scope_spawn_broadcast_nested() { + let sum = AtomicUsize::new(0); + let n = scope(|s| { + s.spawn_broadcast(|s, _| { + s.spawn_broadcast(|_, ctx| { + sum.fetch_add(ctx.index(), Ordering::Relaxed); + }); + }); + crate::current_num_threads() + }); + assert_eq!(sum.into_inner(), n * n * (n - 1) / 2); +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn scope_spawn_broadcast_barrier() { + let barrier = Barrier::new(8); + let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); + pool.in_place_scope(|s| { + s.spawn_broadcast(|_, _| { + barrier.wait(); + }); + barrier.wait(); + }); +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn scope_spawn_broadcast_panic_one() { + let count = AtomicUsize::new(0); + let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); + let result = crate::unwind::halt_unwinding(|| { + pool.scope(|s| { + s.spawn_broadcast(|_, ctx| { + count.fetch_add(1, Ordering::Relaxed); + if ctx.index() == 3 { + panic!("Hello, world!"); + } + }); + }); + }); + assert_eq!(count.into_inner(), 7); + assert!(result.is_err(), "broadcast panic should propagate!"); +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn scope_spawn_broadcast_panic_many() { + let count = AtomicUsize::new(0); + let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); + let result = crate::unwind::halt_unwinding(|| { + pool.scope(|s| { + s.spawn_broadcast(|_, ctx| { + count.fetch_add(1, Ordering::Relaxed); + if ctx.index() % 2 == 0 { + panic!("Hello, world!"); + } + }); + }); + }); + assert_eq!(count.into_inner(), 7); + assert!(result.is_err(), "broadcast panic should propagate!"); +} diff --git a/compiler/rustc_thread_pool/src/sleep/README.md b/compiler/rustc_thread_pool/src/sleep/README.md new file mode 100644 index 00000000000..e79efd15ca9 --- /dev/null +++ b/compiler/rustc_thread_pool/src/sleep/README.md @@ -0,0 +1,252 @@ +# Introduction: the sleep module + +The code in this module governs when worker threads should go to +sleep. The system used in this code was introduced in [Rayon RFC #5]. +There is also a [video walkthrough] available. Both of those may be +valuable resources to understanding the code, though naturally they +will also grow stale over time. The comments in this file are +extracted from the RFC and meant to be kept up to date. + +[Rayon RFC #5]: https://github.com/rayon-rs/rfcs/pull/5 +[video walkthrough]: https://youtu.be/HvmQsE5M4cY + +# The `Sleep` struct + +The `Sleep` struct is embedded into each registry. It performs several functions: + +* It tracks when workers are awake or asleep. +* It decides how long a worker should look for work before it goes to sleep, + via a callback that is invoked periodically from the worker's search loop. +* It is notified when latches are set, jobs are published, or other + events occur, and it will go and wake the appropriate threads if + they are sleeping. + +# Thread states + +There are three main thread states: + +* An **active** thread is one that is actively executing a job. +* An **idle** thread is one that is searching for work to do. It will be + trying to steal work or pop work from the global injector queue. +* A **sleeping** thread is one that is blocked on a condition variable, + waiting to be awoken. + +We sometimes refer to the final two states collectively as **inactive**. +Threads begin as idle but transition to idle and finally sleeping when +they're unable to find work to do. + +## Sleepy threads + +There is one other special state worth mentioning. During the idle state, +threads can get **sleepy**. A sleepy thread is still idle, in that it is still +searching for work, but it is *about* to go to sleep after it does one more +search (or some other number, potentially). When a thread enters the sleepy +state, it signals (via the **jobs event counter**, described below) that it is +about to go to sleep. If new work is published, this will lead to the counter +being adjusted. When the thread actually goes to sleep, it will (hopefully, but +not guaranteed) see that the counter has changed and elect not to sleep, but +instead to search again. See the section on the **jobs event counter** for more +details. + +# The counters + +One of the key structs in the sleep module is `AtomicCounters`, found in +`counters.rs`. It packs three counters into one atomically managed value: + +* Two **thread counters**, which track the number of threads in a particular state. +* The **jobs event counter**, which is used to signal when new work is available. + It (sort of) tracks the number of jobs posted, but not quite, and it can rollover. + +## Thread counters + +There are two thread counters, one that tracks **inactive** threads and one that +tracks **sleeping** threads. From this, one can deduce the number of threads +that are idle by subtracting sleeping threads from inactive threads. We track +the counters in this way because it permits simpler atomic operations. One can +increment the number of sleeping threads (and thus decrease the number of idle +threads) simply by doing one atomic increment, for example. Similarly, one can +decrease the number of sleeping threads (and increase the number of idle +threads) through one atomic decrement. + +These counters are adjusted as follows: + +* When a thread enters the idle state: increment the inactive thread counter. +* When a thread enters the sleeping state: increment the sleeping thread counter. +* When a thread awakens a sleeping thread: decrement the sleeping thread counter. + * Subtle point: the thread that *awakens* the sleeping thread decrements the + counter, not the thread that is *sleeping*. This is because there is a delay + between signaling a thread to wake and the thread actually waking: + decrementing the counter when awakening the thread means that other threads + that may be posting work will see the up-to-date value that much faster. +* When a thread finds work, exiting the idle state: decrement the inactive + thread counter. + +## Jobs event counter + +The final counter is the **jobs event counter**. The role of this counter is to +help sleepy threads detect when new work is posted in a lightweight fashion. In +its simplest form, we would simply have a counter that gets incremented each +time a new job is posted. This way, when a thread gets sleepy, it could read the +counter, and then compare to see if the value has changed before it actually +goes to sleep. But this [turns out to be too expensive] in practice, so we use a +somewhat more complex scheme. + +[turns out to be too expensive]: https://github.com/rayon-rs/rayon/pull/746#issuecomment-624802747 + +The idea is that the counter toggles between two states, depending on whether +its value is even or odd (or, equivalently, on the value of its low bit): + +* Even -- If the low bit is zero, then it means that there has been no new work + since the last thread got sleepy. +* Odd -- If the low bit is one, then it means that new work was posted since + the last thread got sleepy. + +### New work is posted + +When new work is posted, we check the value of the counter: if it is even, +then we increment it by one, so that it becomes odd. + +### Worker thread gets sleepy + +When a worker thread gets sleepy, it will read the value of the counter. If the +counter is odd, it will increment the counter so that it is even. Either way, it +remembers the final value of the counter. The final value will be used later, +when the thread is going to sleep. If at that time the counter has not changed, +then we can assume no new jobs have been posted (though note the remote +possibility of rollover, discussed in detail below). + +# Protocol for a worker thread to post work + +The full protocol for a thread to post work is as follows + +* If the work is posted into the injection queue, then execute a seq-cst fence (see below). +* Load the counters, incrementing the JEC if it is even so that it is odd. +* Check if there are idle threads available to handle this new job. If not, + and there are sleeping threads, then wake one or more threads. + +# Protocol for a worker thread to fall asleep + +The full protocol for a thread to fall asleep is as follows: + +* After completing all its jobs, the worker goes idle and begins to + search for work. As it searches, it counts "rounds". In each round, + it searches all other work threads' queues, plus the 'injector queue' for + work injected from the outside. If work is found in this search, the thread + becomes active again and hence restarts this protocol from the top. +* After a certain number of rounds, the thread "gets sleepy" and executes `get_sleepy` + above, remembering the `final_value` of the JEC. It does one more search for work. +* If no work is found, the thread atomically: + * Checks the JEC to see that it has not changed from `final_value`. + * If it has, then the thread goes back to searching for work. We reset to + just before we got sleepy, so that we will do one more search + before attempting to sleep again (rather than searching for many rounds). + * Increments the number of sleeping threads by 1. +* The thread then executes a seq-cst fence operation (see below). +* The thread then does one final check for injected jobs (see below). If any + are available, it returns to the 'pre-sleepy' state as if the JEC had changed. +* The thread waits to be signaled. Once signaled, it returns to the idle state. + +# The jobs event counter and deadlock + +As described in the section on the JEC, the main concern around going to sleep +is avoiding a race condition wherein: + +* Thread A looks for work, finds none. +* Thread B posts work but sees no sleeping threads. +* Thread A goes to sleep. + +The JEC protocol largely prevents this, but due to rollover, this prevention is +not complete. It is possible -- if unlikely -- that enough activity occurs for +Thread A to observe the same JEC value that it saw when getting sleepy. If the +new work being published came from *inside* the thread-pool, then this race +condition isn't too harmful. It means that we have fewer workers processing the +work then we should, but we won't deadlock. This seems like an acceptable risk +given that this is unlikely in practice. + +However, if the work was posted as an *external* job, that is a problem. In that +case, it's possible that all of our workers could go to sleep, and the external +job would never get processed. To prevent that, the sleeping protocol includes +one final check to see if the injector queue is empty before fully falling +asleep. Note that this final check occurs **after** the number of sleeping +threads has been incremented. We are not concerned therefore with races against +injections that occur after that increment, only before. + +Unfortunately, there is one rather subtle point concerning this final check: +we wish to avoid the possibility that: + +* work is pushed into the injection queue by an outside thread X, +* the sleepy thread S sees the JEC but it has rolled over and is equal +* the sleepy thread S reads the injection queue but does not see the work posted by X. + +This is possible because the C++ memory model typically offers guarantees of the +form "if you see the access A, then you must see those other accesses" -- but it +doesn't guarantee that you will see the access A (i.e., if you think of +processors with independent caches, you may be operating on very out of date +cache state). + +## Using seq-cst fences to prevent deadlock + +To overcome this problem, we have inserted two sequentially consistent fence +operations into the protocols above: + +* One fence occurs after work is posted into the injection queue, but before the + counters are read (including the number of sleeping threads). + * Note that no fence is needed for work posted to internal queues, since it is ok + to overlook work in that case. +* One fence occurs after the number of sleeping threads is incremented, but + before the injection queue is read. + +### Proof sketch + +What follows is a "proof sketch" that the protocol is deadlock free. We model +two relevant bits of memory, the job injector queue J and the atomic counters C. + +Consider the actions of the injecting thread: + +* PushJob: Job is injected, which can be modeled as an atomic write to J with release semantics. +* PushFence: A sequentially consistent fence is executed. +* ReadSleepers: The counters C are read (they may also be incremented, but we just consider the read that comes first). + +Meanwhile, the sleepy thread does the following: + +* IncSleepers: The number of sleeping threads is incremented, which is atomic exchange to C. +* SleepFence: A sequentially consistent fence is executed. +* ReadJob: We look to see if the queue is empty, which is a read of J with acquire semantics. + +Either PushFence or SleepFence must come first: + +* If PushFence comes first, then PushJob must be visible to ReadJob. +* If SleepFence comes first, then IncSleepers is visible to ReadSleepers. + +# Deadlock detection + +This module tracks a number of variables in order to detect deadlocks due to user code blocking. +These variables are stored in the `SleepData` struct which itself is kept behind a mutex. +It contains the following fields: +- `worker_count` - The number of threads in the thread pool. +- `active_threads` - The number of threads in the thread pool which are running + and aren't blocked in user code or sleeping. +- `blocked_threads` - The number of threads which are blocked in user code. + This doesn't include threads blocked by Rayon. + +User code can indicate blocking by calling `mark_blocked` before blocking and +calling `mark_unblocked` before unblocking a thread. +This will adjust `active_threads` and `blocked_threads` accordingly. + +When we tickle the thread pool in `Sleep::tickle_cold`, we set `active_threads` to +`worker_count` - `blocked_threads` since we wake up all Rayon threads, but not thread blocked +by user code. + +A deadlock is detected by checking if `active_threads` is 0 and `blocked_threads` is above 0. +If we ignored `blocked_threads` we would have a deadlock +immediately when creating the thread pool. +We would also deadlock once the thread pool ran out of work. +It is not possible for Rayon itself to deadlock. +Deadlocks can only be caused by user code blocking, so this condition doesn't miss any deadlocks. + +We check for the deadlock condition when +threads fall asleep in `mark_unblocked` and in `Sleep::sleep`. +If there's a deadlock detected we call the user provided deadlock handler while we hold the +lock to `SleepData`. This means the deadlock handler cannot call `mark_blocked` and +`mark_unblocked`. The user is expected to handle the deadlock in some non-Rayon thread. +Once the deadlock handler returns, the thread which called the deadlock handler will go to sleep. diff --git a/compiler/rustc_thread_pool/src/sleep/counters.rs b/compiler/rustc_thread_pool/src/sleep/counters.rs new file mode 100644 index 00000000000..05941becd1c --- /dev/null +++ b/compiler/rustc_thread_pool/src/sleep/counters.rs @@ -0,0 +1,277 @@ +use std::sync::atomic::{AtomicUsize, Ordering}; + +pub(super) struct AtomicCounters { + /// Packs together a number of counters. The counters are ordered as + /// follows, from least to most significant bits (here, we assuming + /// that [`THREADS_BITS`] is equal to 10): + /// + /// * Bits 0..10: Stores the number of **sleeping threads** + /// * Bits 10..20: Stores the number of **inactive threads** + /// * Bits 20..: Stores the **job event counter** (JEC) + /// + /// This uses 10 bits ([`THREADS_BITS`]) to encode the number of threads. Note + /// that the total number of bits (and hence the number of bits used for the + /// JEC) will depend on whether we are using a 32- or 64-bit architecture. + value: AtomicUsize, +} + +#[derive(Copy, Clone)] +pub(super) struct Counters { + word: usize, +} + +/// A value read from the **Jobs Event Counter**. +/// See the [`README.md`](README.md) for more +/// coverage of how the jobs event counter works. +#[derive(Copy, Clone, Debug, PartialEq, PartialOrd)] +pub(super) struct JobsEventCounter(usize); + +impl JobsEventCounter { + pub(super) const DUMMY: JobsEventCounter = JobsEventCounter(usize::MAX); + + #[inline] + pub(super) fn as_usize(self) -> usize { + self.0 + } + + /// The JEC "is sleepy" if the last thread to increment it was in the + /// process of becoming sleepy. This is indicated by its value being *even*. + /// When new jobs are posted, they check if the JEC is sleepy, and if so + /// they incremented it. + #[inline] + pub(super) fn is_sleepy(self) -> bool { + (self.as_usize() & 1) == 0 + } + + /// The JEC "is active" if the last thread to increment it was posting new + /// work. This is indicated by its value being *odd*. When threads get + /// sleepy, they will check if the JEC is active, and increment it. + #[inline] + pub(super) fn is_active(self) -> bool { + !self.is_sleepy() + } +} + +/// Number of bits used for the thread counters. +#[cfg(target_pointer_width = "64")] +const THREADS_BITS: usize = 16; + +#[cfg(target_pointer_width = "32")] +const THREADS_BITS: usize = 8; + +/// Bits to shift to select the sleeping threads +/// (used with `select_bits`). +#[allow(clippy::erasing_op)] +const SLEEPING_SHIFT: usize = 0 * THREADS_BITS; + +/// Bits to shift to select the inactive threads +/// (used with `select_bits`). +#[allow(clippy::identity_op)] +const INACTIVE_SHIFT: usize = 1 * THREADS_BITS; + +/// Bits to shift to select the JEC +/// (use JOBS_BITS). +const JEC_SHIFT: usize = 2 * THREADS_BITS; + +/// Max value for the thread counters. +pub(crate) const THREADS_MAX: usize = (1 << THREADS_BITS) - 1; + +/// Constant that can be added to add one sleeping thread. +const ONE_SLEEPING: usize = 1; + +/// Constant that can be added to add one inactive thread. +/// An inactive thread is either idle, sleepy, or sleeping. +const ONE_INACTIVE: usize = 1 << INACTIVE_SHIFT; + +/// Constant that can be added to add one to the JEC. +const ONE_JEC: usize = 1 << JEC_SHIFT; + +impl AtomicCounters { + #[inline] + pub(super) fn new() -> AtomicCounters { + AtomicCounters { + value: AtomicUsize::new(0), + } + } + + /// Load and return the current value of the various counters. + /// This value can then be given to other method which will + /// attempt to update the counters via compare-and-swap. + #[inline] + pub(super) fn load(&self, ordering: Ordering) -> Counters { + Counters::new(self.value.load(ordering)) + } + + #[inline] + fn try_exchange(&self, old_value: Counters, new_value: Counters, ordering: Ordering) -> bool { + self.value + .compare_exchange(old_value.word, new_value.word, ordering, Ordering::Relaxed) + .is_ok() + } + + /// Adds an inactive thread. This cannot fail. + /// + /// This should be invoked when a thread enters its idle loop looking + /// for work. It is decremented when work is found. Note that it is + /// not decremented if the thread transitions from idle to sleepy or sleeping; + /// so the number of inactive threads is always greater-than-or-equal + /// to the number of sleeping threads. + #[inline] + pub(super) fn add_inactive_thread(&self) { + self.value.fetch_add(ONE_INACTIVE, Ordering::SeqCst); + } + + /// Increments the jobs event counter if `increment_when`, when applied to + /// the current value, is true. Used to toggle the JEC from even (sleepy) to + /// odd (active) or vice versa. Returns the final value of the counters, for + /// which `increment_when` is guaranteed to return false. + pub(super) fn increment_jobs_event_counter_if( + &self, + increment_when: impl Fn(JobsEventCounter) -> bool, + ) -> Counters { + loop { + let old_value = self.load(Ordering::SeqCst); + if increment_when(old_value.jobs_counter()) { + let new_value = old_value.increment_jobs_counter(); + if self.try_exchange(old_value, new_value, Ordering::SeqCst) { + return new_value; + } + } else { + return old_value; + } + } + } + + /// Subtracts an inactive thread. This cannot fail. It is invoked + /// when a thread finds work and hence becomes active. It returns the + /// number of sleeping threads to wake up (if any). + /// + /// See `add_inactive_thread`. + #[inline] + pub(super) fn sub_inactive_thread(&self) -> usize { + let old_value = Counters::new(self.value.fetch_sub(ONE_INACTIVE, Ordering::SeqCst)); + debug_assert!( + old_value.inactive_threads() > 0, + "sub_inactive_thread: old_value {:?} has no inactive threads", + old_value, + ); + debug_assert!( + old_value.sleeping_threads() <= old_value.inactive_threads(), + "sub_inactive_thread: old_value {:?} had {} sleeping threads and {} inactive threads", + old_value, + old_value.sleeping_threads(), + old_value.inactive_threads(), + ); + + // Current heuristic: whenever an inactive thread goes away, if + // there are any sleeping threads, wake 'em up. + let sleeping_threads = old_value.sleeping_threads(); + Ord::min(sleeping_threads, 2) + } + + /// Subtracts a sleeping thread. This cannot fail, but it is only + /// safe to do if you you know the number of sleeping threads is + /// non-zero (i.e., because you have just awoken a sleeping + /// thread). + #[inline] + pub(super) fn sub_sleeping_thread(&self) { + let old_value = Counters::new(self.value.fetch_sub(ONE_SLEEPING, Ordering::SeqCst)); + debug_assert!( + old_value.sleeping_threads() > 0, + "sub_sleeping_thread: old_value {:?} had no sleeping threads", + old_value, + ); + debug_assert!( + old_value.sleeping_threads() <= old_value.inactive_threads(), + "sub_sleeping_thread: old_value {:?} had {} sleeping threads and {} inactive threads", + old_value, + old_value.sleeping_threads(), + old_value.inactive_threads(), + ); + } + + #[inline] + pub(super) fn try_add_sleeping_thread(&self, old_value: Counters) -> bool { + debug_assert!( + old_value.inactive_threads() > 0, + "try_add_sleeping_thread: old_value {:?} has no inactive threads", + old_value, + ); + debug_assert!( + old_value.sleeping_threads() < THREADS_MAX, + "try_add_sleeping_thread: old_value {:?} has too many sleeping threads", + old_value, + ); + + let mut new_value = old_value; + new_value.word += ONE_SLEEPING; + + self.try_exchange(old_value, new_value, Ordering::SeqCst) + } +} + +#[inline] +fn select_thread(word: usize, shift: usize) -> usize { + (word >> shift) & THREADS_MAX +} + +#[inline] +fn select_jec(word: usize) -> usize { + word >> JEC_SHIFT +} + +impl Counters { + #[inline] + fn new(word: usize) -> Counters { + Counters { word } + } + + #[inline] + fn increment_jobs_counter(self) -> Counters { + // We can freely add to JEC because it occupies the most significant bits. + // Thus it doesn't overflow into the other counters, just wraps itself. + Counters { + word: self.word.wrapping_add(ONE_JEC), + } + } + + #[inline] + pub(super) fn jobs_counter(self) -> JobsEventCounter { + JobsEventCounter(select_jec(self.word)) + } + + /// The number of threads that are not actively + /// executing work. They may be idle, sleepy, or asleep. + #[inline] + pub(super) fn inactive_threads(self) -> usize { + select_thread(self.word, INACTIVE_SHIFT) + } + + #[inline] + pub(super) fn awake_but_idle_threads(self) -> usize { + debug_assert!( + self.sleeping_threads() <= self.inactive_threads(), + "sleeping threads: {} > raw idle threads {}", + self.sleeping_threads(), + self.inactive_threads() + ); + self.inactive_threads() - self.sleeping_threads() + } + + #[inline] + pub(super) fn sleeping_threads(self) -> usize { + select_thread(self.word, SLEEPING_SHIFT) + } +} + +impl std::fmt::Debug for Counters { + fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let word = format!("{:016x}", self.word); + fmt.debug_struct("Counters") + .field("word", &word) + .field("jobs", &self.jobs_counter().0) + .field("inactive", &self.inactive_threads()) + .field("sleeping", &self.sleeping_threads()) + .finish() + } +} diff --git a/compiler/rustc_thread_pool/src/sleep/mod.rs b/compiler/rustc_thread_pool/src/sleep/mod.rs new file mode 100644 index 00000000000..7d88ece2107 --- /dev/null +++ b/compiler/rustc_thread_pool/src/sleep/mod.rs @@ -0,0 +1,392 @@ +//! Code that decides when workers should go to sleep. See README.md +//! for an overview. + +use crate::latch::CoreLatch; +use crate::registry::WorkerThread; +use crate::DeadlockHandler; +use crossbeam_utils::CachePadded; +use std::sync::atomic::Ordering; +use std::sync::{Condvar, Mutex}; +use std::thread; + +mod counters; +pub(crate) use self::counters::THREADS_MAX; +use self::counters::{AtomicCounters, JobsEventCounter}; + +struct SleepData { + /// The number of threads in the thread pool. + worker_count: usize, + + /// The number of threads in the thread pool which are running and + /// aren't blocked in user code or sleeping. + active_threads: usize, + + /// The number of threads which are blocked in user code. + /// This doesn't include threads blocked by this module. + blocked_threads: usize, +} + +impl SleepData { + /// Checks if the conditions for a deadlock holds and if so calls the deadlock handler + #[inline] + pub fn deadlock_check(&self, deadlock_handler: &Option<Box<DeadlockHandler>>) { + if self.active_threads == 0 && self.blocked_threads > 0 { + (deadlock_handler.as_ref().unwrap())(); + } + } +} + +/// The `Sleep` struct is embedded into each registry. It governs the waking and sleeping +/// of workers. It has callbacks that are invoked periodically at significant events, +/// such as when workers are looping and looking for work, when latches are set, or when +/// jobs are published, and it either blocks threads or wakes them in response to these +/// events. See the [`README.md`] in this module for more details. +/// +/// [`README.md`] README.md +pub(super) struct Sleep { + /// One "sleep state" per worker. Used to track if a worker is sleeping and to have + /// them block. + worker_sleep_states: Vec<CachePadded<WorkerSleepState>>, + + counters: AtomicCounters, + + data: Mutex<SleepData>, +} + +/// An instance of this struct is created when a thread becomes idle. +/// It is consumed when the thread finds work, and passed by `&mut` +/// reference for operations that preserve the idle state. (In other +/// words, producing one of these structs is evidence the thread is +/// idle.) It tracks state such as how long the thread has been idle. +pub(super) struct IdleState { + /// What is worker index of the idle thread? + worker_index: usize, + + /// How many rounds have we been circling without sleeping? + rounds: u32, + + /// Once we become sleepy, what was the sleepy counter value? + /// Set to `INVALID_SLEEPY_COUNTER` otherwise. + jobs_counter: JobsEventCounter, +} + +/// The "sleep state" for an individual worker. +#[derive(Default)] +struct WorkerSleepState { + /// Set to true when the worker goes to sleep; set to false when + /// the worker is notified or when it wakes. + is_blocked: Mutex<bool>, + + condvar: Condvar, +} + +const ROUNDS_UNTIL_SLEEPY: u32 = 32; +const ROUNDS_UNTIL_SLEEPING: u32 = ROUNDS_UNTIL_SLEEPY + 1; + +impl Sleep { + pub(super) fn new(n_threads: usize) -> Sleep { + assert!(n_threads <= THREADS_MAX); + Sleep { + worker_sleep_states: (0..n_threads).map(|_| Default::default()).collect(), + counters: AtomicCounters::new(), + data: Mutex::new(SleepData { + worker_count: n_threads, + active_threads: n_threads, + blocked_threads: 0, + }), + } + } + + /// Mark a Rayon worker thread as blocked. This triggers the deadlock handler + /// if no other worker thread is active + #[inline] + pub fn mark_blocked(&self, deadlock_handler: &Option<Box<DeadlockHandler>>) { + let mut data = self.data.lock().unwrap(); + debug_assert!(data.active_threads > 0); + debug_assert!(data.blocked_threads < data.worker_count); + debug_assert!(data.active_threads > 0); + data.active_threads -= 1; + data.blocked_threads += 1; + + data.deadlock_check(deadlock_handler); + } + + /// Mark a previously blocked Rayon worker thread as unblocked + #[inline] + pub fn mark_unblocked(&self) { + let mut data = self.data.lock().unwrap(); + debug_assert!(data.active_threads < data.worker_count); + debug_assert!(data.blocked_threads > 0); + data.active_threads += 1; + data.blocked_threads -= 1; + } + + #[inline] + pub(super) fn start_looking(&self, worker_index: usize) -> IdleState { + self.counters.add_inactive_thread(); + + IdleState { + worker_index, + rounds: 0, + jobs_counter: JobsEventCounter::DUMMY, + } + } + + #[inline] + pub(super) fn work_found(&self) { + // If we were the last idle thread and other threads are still sleeping, + // then we should wake up another thread. + let threads_to_wake = self.counters.sub_inactive_thread(); + self.wake_any_threads(threads_to_wake as u32); + } + + #[inline] + pub(super) fn no_work_found( + &self, + idle_state: &mut IdleState, + latch: &CoreLatch, + thread: &WorkerThread, + ) { + if idle_state.rounds < ROUNDS_UNTIL_SLEEPY { + thread::yield_now(); + idle_state.rounds += 1; + } else if idle_state.rounds == ROUNDS_UNTIL_SLEEPY { + idle_state.jobs_counter = self.announce_sleepy(); + idle_state.rounds += 1; + thread::yield_now(); + } else if idle_state.rounds < ROUNDS_UNTIL_SLEEPING { + idle_state.rounds += 1; + thread::yield_now(); + } else { + debug_assert_eq!(idle_state.rounds, ROUNDS_UNTIL_SLEEPING); + self.sleep(idle_state, latch, thread); + } + } + + #[cold] + fn announce_sleepy(&self) -> JobsEventCounter { + self.counters + .increment_jobs_event_counter_if(JobsEventCounter::is_active) + .jobs_counter() + } + + #[cold] + fn sleep(&self, idle_state: &mut IdleState, latch: &CoreLatch, thread: &WorkerThread) { + let worker_index = idle_state.worker_index; + + if !latch.get_sleepy() { + return; + } + + let sleep_state = &self.worker_sleep_states[worker_index]; + let mut is_blocked = sleep_state.is_blocked.lock().unwrap(); + debug_assert!(!*is_blocked); + + // Our latch was signalled. We should wake back up fully as we + // will have some stuff to do. + if !latch.fall_asleep() { + idle_state.wake_fully(); + return; + } + + loop { + let counters = self.counters.load(Ordering::SeqCst); + + // Check if the JEC has changed since we got sleepy. + debug_assert!(idle_state.jobs_counter.is_sleepy()); + if counters.jobs_counter() != idle_state.jobs_counter { + // JEC has changed, so a new job was posted, but for some reason + // we didn't see it. We should return to just before the SLEEPY + // state so we can do another search and (if we fail to find + // work) go back to sleep. + idle_state.wake_partly(); + latch.wake_up(); + return; + } + + // Otherwise, let's move from IDLE to SLEEPING. + if self.counters.try_add_sleeping_thread(counters) { + break; + } + } + + // Successfully registered as asleep. + + // We have one last check for injected jobs to do. This protects against + // deadlock in the very unlikely event that + // + // - an external job is being injected while we are sleepy + // - that job triggers the rollover over the JEC such that we don't see it + // - we are the last active worker thread + std::sync::atomic::fence(Ordering::SeqCst); + if thread.has_injected_job() { + // If we see an externally injected job, then we have to 'wake + // ourselves up'. (Ordinarily, `sub_sleeping_thread` is invoked by + // the one that wakes us.) + self.counters.sub_sleeping_thread(); + } else { + { + // Decrement the number of active threads and check for a deadlock + let mut data = self.data.lock().unwrap(); + data.active_threads -= 1; + data.deadlock_check(&thread.registry.deadlock_handler); + } + + // If we don't see an injected job (the normal case), then flag + // ourselves as asleep and wait till we are notified. + // + // (Note that `is_blocked` is held under a mutex and the mutex was + // acquired *before* we incremented the "sleepy counter". This means + // that whomever is coming to wake us will have to wait until we + // release the mutex in the call to `wait`, so they will see this + // boolean as true.) + thread.registry.release_thread(); + *is_blocked = true; + while *is_blocked { + is_blocked = sleep_state.condvar.wait(is_blocked).unwrap(); + } + + // Drop `is_blocked` now in case `acquire_thread` blocks + drop(is_blocked); + + thread.registry.acquire_thread(); + } + + // Update other state: + idle_state.wake_fully(); + latch.wake_up(); + } + + /// Notify the given thread that it should wake up (if it is + /// sleeping). When this method is invoked, we typically know the + /// thread is asleep, though in rare cases it could have been + /// awoken by (e.g.) new work having been posted. + pub(super) fn notify_worker_latch_is_set(&self, target_worker_index: usize) { + self.wake_specific_thread(target_worker_index); + } + + /// Signals that `num_jobs` new jobs were injected into the thread + /// pool from outside. This function will ensure that there are + /// threads available to process them, waking threads from sleep + /// if necessary. + /// + /// # Parameters + /// + /// - `num_jobs` -- lower bound on number of jobs available for stealing. + /// We'll try to get at least one thread per job. + #[inline] + pub(super) fn new_injected_jobs(&self, num_jobs: u32, queue_was_empty: bool) { + // This fence is needed to guarantee that threads + // as they are about to fall asleep, observe any + // new jobs that may have been injected. + std::sync::atomic::fence(Ordering::SeqCst); + + self.new_jobs(num_jobs, queue_was_empty) + } + + /// Signals that `num_jobs` new jobs were pushed onto a thread's + /// local deque. This function will try to ensure that there are + /// threads available to process them, waking threads from sleep + /// if necessary. However, this is not guaranteed: under certain + /// race conditions, the function may fail to wake any new + /// threads; in that case the existing thread should eventually + /// pop the job. + /// + /// # Parameters + /// + /// - `num_jobs` -- lower bound on number of jobs available for stealing. + /// We'll try to get at least one thread per job. + #[inline] + pub(super) fn new_internal_jobs(&self, num_jobs: u32, queue_was_empty: bool) { + self.new_jobs(num_jobs, queue_was_empty) + } + + /// Common helper for `new_injected_jobs` and `new_internal_jobs`. + #[inline] + fn new_jobs(&self, num_jobs: u32, queue_was_empty: bool) { + // Read the counters and -- if sleepy workers have announced themselves + // -- announce that there is now work available. The final value of `counters` + // with which we exit the loop thus corresponds to a state when + let counters = self + .counters + .increment_jobs_event_counter_if(JobsEventCounter::is_sleepy); + let num_awake_but_idle = counters.awake_but_idle_threads(); + let num_sleepers = counters.sleeping_threads(); + + if num_sleepers == 0 { + // nobody to wake + return; + } + + // Promote from u16 to u32 so we can interoperate with + // num_jobs more easily. + let num_awake_but_idle = num_awake_but_idle as u32; + let num_sleepers = num_sleepers as u32; + + // If the queue is non-empty, then we always wake up a worker + // -- clearly the existing idle jobs aren't enough. Otherwise, + // check to see if we have enough idle workers. + if !queue_was_empty { + let num_to_wake = Ord::min(num_jobs, num_sleepers); + self.wake_any_threads(num_to_wake); + } else if num_awake_but_idle < num_jobs { + let num_to_wake = Ord::min(num_jobs - num_awake_but_idle, num_sleepers); + self.wake_any_threads(num_to_wake); + } + } + + #[cold] + fn wake_any_threads(&self, mut num_to_wake: u32) { + if num_to_wake > 0 { + for i in 0..self.worker_sleep_states.len() { + if self.wake_specific_thread(i) { + num_to_wake -= 1; + if num_to_wake == 0 { + return; + } + } + } + } + } + + fn wake_specific_thread(&self, index: usize) -> bool { + let sleep_state = &self.worker_sleep_states[index]; + + let mut is_blocked = sleep_state.is_blocked.lock().unwrap(); + if *is_blocked { + *is_blocked = false; + + // Increment the number of active threads + self.data.lock().unwrap().active_threads += 1; + + sleep_state.condvar.notify_one(); + + // When the thread went to sleep, it will have incremented + // this value. When we wake it, its our job to decrement + // it. We could have the thread do it, but that would + // introduce a delay between when the thread was + // *notified* and when this counter was decremented. That + // might mislead people with new work into thinking that + // there are sleeping threads that they should try to + // wake, when in fact there is nothing left for them to + // do. + self.counters.sub_sleeping_thread(); + + true + } else { + false + } + } +} + +impl IdleState { + fn wake_fully(&mut self) { + self.rounds = 0; + self.jobs_counter = JobsEventCounter::DUMMY; + } + + fn wake_partly(&mut self) { + self.rounds = ROUNDS_UNTIL_SLEEPY; + self.jobs_counter = JobsEventCounter::DUMMY; + } +} diff --git a/compiler/rustc_thread_pool/src/spawn/mod.rs b/compiler/rustc_thread_pool/src/spawn/mod.rs new file mode 100644 index 00000000000..034df30dcfb --- /dev/null +++ b/compiler/rustc_thread_pool/src/spawn/mod.rs @@ -0,0 +1,164 @@ +use crate::job::*; +use crate::registry::Registry; +use crate::tlv::Tlv; +use crate::unwind; +use std::mem; +use std::sync::Arc; + +/// Puts the task into the Rayon threadpool's job queue in the "static" +/// or "global" scope. Just like a standard thread, this task is not +/// tied to the current stack frame, and hence it cannot hold any +/// references other than those with `'static` lifetime. If you want +/// to spawn a task that references stack data, use [the `scope()` +/// function][scope] to create a scope. +/// +/// [scope]: fn.scope.html +/// +/// Since tasks spawned with this function cannot hold references into +/// the enclosing stack frame, you almost certainly want to use a +/// `move` closure as their argument (otherwise, the closure will +/// typically hold references to any variables from the enclosing +/// function that you happen to use). +/// +/// This API assumes that the closure is executed purely for its +/// side-effects (i.e., it might send messages, modify data protected +/// by a mutex, or some such thing). +/// +/// There is no guaranteed order of execution for spawns, given that +/// other threads may steal tasks at any time. However, they are +/// generally prioritized in a LIFO order on the thread from which +/// they were spawned. Other threads always steal from the other end of +/// the deque, like FIFO order. The idea is that "recent" tasks are +/// most likely to be fresh in the local CPU's cache, while other +/// threads can steal older "stale" tasks. For an alternate approach, +/// consider [`spawn_fifo()`] instead. +/// +/// [`spawn_fifo()`]: fn.spawn_fifo.html +/// +/// # Panic handling +/// +/// If this closure should panic, the resulting panic will be +/// propagated to the panic handler registered in the `ThreadPoolBuilder`, +/// if any. See [`ThreadPoolBuilder::panic_handler()`][ph] for more +/// details. +/// +/// [ph]: struct.ThreadPoolBuilder.html#method.panic_handler +/// +/// # Examples +/// +/// This code creates a Rayon task that increments a global counter. +/// +/// ```rust +/// # use rayon_core as rayon; +/// use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT}; +/// +/// static GLOBAL_COUNTER: AtomicUsize = ATOMIC_USIZE_INIT; +/// +/// rayon::spawn(move || { +/// GLOBAL_COUNTER.fetch_add(1, Ordering::SeqCst); +/// }); +/// ``` +pub fn spawn<F>(func: F) +where + F: FnOnce() + Send + 'static, +{ + // We assert that current registry has not terminated. + unsafe { spawn_in(func, &Registry::current()) } +} + +/// Spawns an asynchronous job in `registry.` +/// +/// Unsafe because `registry` must not yet have terminated. +pub(super) unsafe fn spawn_in<F>(func: F, registry: &Arc<Registry>) +where + F: FnOnce() + Send + 'static, +{ + // We assert that this does not hold any references (we know + // this because of the `'static` bound in the interface); + // moreover, we assert that the code below is not supposed to + // be able to panic, and hence the data won't leak but will be + // enqueued into some deque for later execution. + let abort_guard = unwind::AbortIfPanic; // just in case we are wrong, and code CAN panic + let job_ref = spawn_job(func, registry); + registry.inject_or_push(job_ref); + mem::forget(abort_guard); +} + +unsafe fn spawn_job<F>(func: F, registry: &Arc<Registry>) -> JobRef +where + F: FnOnce() + Send + 'static, +{ + // Ensure that registry cannot terminate until this job has + // executed. This ref is decremented at the (*) below. + registry.increment_terminate_count(); + + HeapJob::new(Tlv::null(), { + let registry = Arc::clone(registry); + move || { + registry.catch_unwind(func); + registry.terminate(); // (*) permit registry to terminate now + } + }) + .into_static_job_ref() +} + +/// Fires off a task into the Rayon threadpool in the "static" or +/// "global" scope. Just like a standard thread, this task is not +/// tied to the current stack frame, and hence it cannot hold any +/// references other than those with `'static` lifetime. If you want +/// to spawn a task that references stack data, use [the `scope_fifo()` +/// function](fn.scope_fifo.html) to create a scope. +/// +/// The behavior is essentially the same as [the `spawn` +/// function](fn.spawn.html), except that calls from the same thread +/// will be prioritized in FIFO order. This is similar to the now- +/// deprecated [`breadth_first`] option, except the effect is isolated +/// to relative `spawn_fifo` calls, not all threadpool tasks. +/// +/// For more details on this design, see Rayon [RFC #1]. +/// +/// [`breadth_first`]: struct.ThreadPoolBuilder.html#method.breadth_first +/// [RFC #1]: https://github.com/rayon-rs/rfcs/blob/master/accepted/rfc0001-scope-scheduling.md +/// +/// # Panic handling +/// +/// If this closure should panic, the resulting panic will be +/// propagated to the panic handler registered in the `ThreadPoolBuilder`, +/// if any. See [`ThreadPoolBuilder::panic_handler()`][ph] for more +/// details. +/// +/// [ph]: struct.ThreadPoolBuilder.html#method.panic_handler +pub fn spawn_fifo<F>(func: F) +where + F: FnOnce() + Send + 'static, +{ + // We assert that current registry has not terminated. + unsafe { spawn_fifo_in(func, &Registry::current()) } +} + +/// Spawns an asynchronous FIFO job in `registry.` +/// +/// Unsafe because `registry` must not yet have terminated. +pub(super) unsafe fn spawn_fifo_in<F>(func: F, registry: &Arc<Registry>) +where + F: FnOnce() + Send + 'static, +{ + // We assert that this does not hold any references (we know + // this because of the `'static` bound in the interface); + // moreover, we assert that the code below is not supposed to + // be able to panic, and hence the data won't leak but will be + // enqueued into some deque for later execution. + let abort_guard = unwind::AbortIfPanic; // just in case we are wrong, and code CAN panic + let job_ref = spawn_job(func, registry); + + // If we're in the pool, use our thread's private fifo for this thread to execute + // in a locally-FIFO order. Otherwise, just use the pool's global injector. + match registry.current_thread() { + Some(worker) => worker.push_fifo(job_ref), + None => registry.inject(job_ref), + } + mem::forget(abort_guard); +} + +#[cfg(test)] +mod test; diff --git a/compiler/rustc_thread_pool/src/spawn/test.rs b/compiler/rustc_thread_pool/src/spawn/test.rs new file mode 100644 index 00000000000..b7a0535aabf --- /dev/null +++ b/compiler/rustc_thread_pool/src/spawn/test.rs @@ -0,0 +1,255 @@ +use crate::scope; +use std::any::Any; +use std::sync::mpsc::channel; +use std::sync::Mutex; + +use super::{spawn, spawn_fifo}; +use crate::ThreadPoolBuilder; + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn spawn_then_join_in_worker() { + let (tx, rx) = channel(); + scope(move |_| { + spawn(move || tx.send(22).unwrap()); + }); + assert_eq!(22, rx.recv().unwrap()); +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn spawn_then_join_outside_worker() { + let (tx, rx) = channel(); + spawn(move || tx.send(22).unwrap()); + assert_eq!(22, rx.recv().unwrap()); +} + +#[test] +#[cfg_attr(not(panic = "unwind"), ignore)] +fn panic_fwd() { + let (tx, rx) = channel(); + + let tx = Mutex::new(tx); + let panic_handler = move |err: Box<dyn Any + Send>| { + let tx = tx.lock().unwrap(); + if let Some(&msg) = err.downcast_ref::<&str>() { + if msg == "Hello, world!" { + tx.send(1).unwrap(); + } else { + tx.send(2).unwrap(); + } + } else { + tx.send(3).unwrap(); + } + }; + + let builder = ThreadPoolBuilder::new().panic_handler(panic_handler); + + builder + .build() + .unwrap() + .spawn(move || panic!("Hello, world!")); + + assert_eq!(1, rx.recv().unwrap()); +} + +/// Test what happens when the thread-pool is dropped but there are +/// still active asynchronous tasks. We expect the thread-pool to stay +/// alive and executing until those threads are complete. +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn termination_while_things_are_executing() { + let (tx0, rx0) = channel(); + let (tx1, rx1) = channel(); + + // Create a thread-pool and spawn some code in it, but then drop + // our reference to it. + { + let thread_pool = ThreadPoolBuilder::new().build().unwrap(); + thread_pool.spawn(move || { + let data = rx0.recv().unwrap(); + + // At this point, we know the "main" reference to the + // `ThreadPool` has been dropped, but there are still + // active threads. Launch one more. + spawn(move || { + tx1.send(data).unwrap(); + }); + }); + } + + tx0.send(22).unwrap(); + let v = rx1.recv().unwrap(); + assert_eq!(v, 22); +} + +#[test] +#[cfg_attr(not(panic = "unwind"), ignore)] +fn custom_panic_handler_and_spawn() { + let (tx, rx) = channel(); + + // Create a parallel closure that will send panics on the + // channel; since the closure is potentially executed in parallel + // with itself, we have to wrap `tx` in a mutex. + let tx = Mutex::new(tx); + let panic_handler = move |e: Box<dyn Any + Send>| { + tx.lock().unwrap().send(e).unwrap(); + }; + + // Execute an async that will panic. + let builder = ThreadPoolBuilder::new().panic_handler(panic_handler); + builder.build().unwrap().spawn(move || { + panic!("Hello, world!"); + }); + + // Check that we got back the panic we expected. + let error = rx.recv().unwrap(); + if let Some(&msg) = error.downcast_ref::<&str>() { + assert_eq!(msg, "Hello, world!"); + } else { + panic!("did not receive a string from panic handler"); + } +} + +#[test] +#[cfg_attr(not(panic = "unwind"), ignore)] +fn custom_panic_handler_and_nested_spawn() { + let (tx, rx) = channel(); + + // Create a parallel closure that will send panics on the + // channel; since the closure is potentially executed in parallel + // with itself, we have to wrap `tx` in a mutex. + let tx = Mutex::new(tx); + let panic_handler = move |e| { + tx.lock().unwrap().send(e).unwrap(); + }; + + // Execute an async that will (eventually) panic. + const PANICS: usize = 3; + let builder = ThreadPoolBuilder::new().panic_handler(panic_handler); + builder.build().unwrap().spawn(move || { + // launch 3 nested spawn-asyncs; these should be in the same + // thread-pool and hence inherit the same panic handler + for _ in 0..PANICS { + spawn(move || { + panic!("Hello, world!"); + }); + } + }); + + // Check that we get back the panics we expected. + for _ in 0..PANICS { + let error = rx.recv().unwrap(); + if let Some(&msg) = error.downcast_ref::<&str>() { + assert_eq!(msg, "Hello, world!"); + } else { + panic!("did not receive a string from panic handler"); + } + } +} + +macro_rules! test_order { + ($outer_spawn:ident, $inner_spawn:ident) => {{ + let builder = ThreadPoolBuilder::new().num_threads(1); + let pool = builder.build().unwrap(); + let (tx, rx) = channel(); + pool.install(move || { + for i in 0..10 { + let tx = tx.clone(); + $outer_spawn(move || { + for j in 0..10 { + let tx = tx.clone(); + $inner_spawn(move || { + tx.send(i * 10 + j).unwrap(); + }); + } + }); + } + }); + rx.iter().collect::<Vec<i32>>() + }}; +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn lifo_order() { + // In the absence of stealing, `spawn()` jobs on a thread will run in LIFO order. + let vec = test_order!(spawn, spawn); + let expected: Vec<i32> = (0..100).rev().collect(); // LIFO -> reversed + assert_eq!(vec, expected); +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn fifo_order() { + // In the absence of stealing, `spawn_fifo()` jobs on a thread will run in FIFO order. + let vec = test_order!(spawn_fifo, spawn_fifo); + let expected: Vec<i32> = (0..100).collect(); // FIFO -> natural order + assert_eq!(vec, expected); +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn lifo_fifo_order() { + // LIFO on the outside, FIFO on the inside + let vec = test_order!(spawn, spawn_fifo); + let expected: Vec<i32> = (0..10) + .rev() + .flat_map(|i| (0..10).map(move |j| i * 10 + j)) + .collect(); + assert_eq!(vec, expected); +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn fifo_lifo_order() { + // FIFO on the outside, LIFO on the inside + let vec = test_order!(spawn_fifo, spawn); + let expected: Vec<i32> = (0..10) + .flat_map(|i| (0..10).rev().map(move |j| i * 10 + j)) + .collect(); + assert_eq!(vec, expected); +} + +macro_rules! spawn_send { + ($spawn:ident, $tx:ident, $i:expr) => {{ + let tx = $tx.clone(); + $spawn(move || tx.send($i).unwrap()); + }}; +} + +/// Test mixed spawns pushing a series of numbers, interleaved such +/// such that negative values are using the second kind of spawn. +macro_rules! test_mixed_order { + ($pos_spawn:ident, $neg_spawn:ident) => {{ + let builder = ThreadPoolBuilder::new().num_threads(1); + let pool = builder.build().unwrap(); + let (tx, rx) = channel(); + pool.install(move || { + spawn_send!($pos_spawn, tx, 0); + spawn_send!($neg_spawn, tx, -1); + spawn_send!($pos_spawn, tx, 1); + spawn_send!($neg_spawn, tx, -2); + spawn_send!($pos_spawn, tx, 2); + spawn_send!($neg_spawn, tx, -3); + spawn_send!($pos_spawn, tx, 3); + }); + rx.iter().collect::<Vec<i32>>() + }}; +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn mixed_lifo_fifo_order() { + let vec = test_mixed_order!(spawn, spawn_fifo); + let expected = vec![3, -1, 2, -2, 1, -3, 0]; + assert_eq!(vec, expected); +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn mixed_fifo_lifo_order() { + let vec = test_mixed_order!(spawn_fifo, spawn); + let expected = vec![0, -3, 1, -2, 2, -1, 3]; + assert_eq!(vec, expected); +} diff --git a/compiler/rustc_thread_pool/src/test.rs b/compiler/rustc_thread_pool/src/test.rs new file mode 100644 index 00000000000..25b8487f73b --- /dev/null +++ b/compiler/rustc_thread_pool/src/test.rs @@ -0,0 +1,200 @@ +#![cfg(test)] + +use crate::{ThreadPoolBuildError, ThreadPoolBuilder}; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Barrier}; + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn worker_thread_index() { + let pool = ThreadPoolBuilder::new().num_threads(22).build().unwrap(); + assert_eq!(pool.current_num_threads(), 22); + assert_eq!(pool.current_thread_index(), None); + let index = pool.install(|| pool.current_thread_index().unwrap()); + assert!(index < 22); +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn start_callback_called() { + let n_threads = 16; + let n_called = Arc::new(AtomicUsize::new(0)); + // Wait for all the threads in the pool plus the one running tests. + let barrier = Arc::new(Barrier::new(n_threads + 1)); + + let b = Arc::clone(&barrier); + let nc = Arc::clone(&n_called); + let start_handler = move |_| { + nc.fetch_add(1, Ordering::SeqCst); + b.wait(); + }; + + let conf = ThreadPoolBuilder::new() + .num_threads(n_threads) + .start_handler(start_handler); + let _ = conf.build().unwrap(); + + // Wait for all the threads to have been scheduled to run. + barrier.wait(); + + // The handler must have been called on every started thread. + assert_eq!(n_called.load(Ordering::SeqCst), n_threads); +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn exit_callback_called() { + let n_threads = 16; + let n_called = Arc::new(AtomicUsize::new(0)); + // Wait for all the threads in the pool plus the one running tests. + let barrier = Arc::new(Barrier::new(n_threads + 1)); + + let b = Arc::clone(&barrier); + let nc = Arc::clone(&n_called); + let exit_handler = move |_| { + nc.fetch_add(1, Ordering::SeqCst); + b.wait(); + }; + + let conf = ThreadPoolBuilder::new() + .num_threads(n_threads) + .exit_handler(exit_handler); + { + let _ = conf.build().unwrap(); + // Drop the pool so it stops the running threads. + } + + // Wait for all the threads to have been scheduled to run. + barrier.wait(); + + // The handler must have been called on every exiting thread. + assert_eq!(n_called.load(Ordering::SeqCst), n_threads); +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn handler_panics_handled_correctly() { + let n_threads = 16; + let n_called = Arc::new(AtomicUsize::new(0)); + // Wait for all the threads in the pool plus the one running tests. + let start_barrier = Arc::new(Barrier::new(n_threads + 1)); + let exit_barrier = Arc::new(Barrier::new(n_threads + 1)); + + let start_handler = move |_| { + panic!("ensure panic handler is called when starting"); + }; + let exit_handler = move |_| { + panic!("ensure panic handler is called when exiting"); + }; + + let sb = Arc::clone(&start_barrier); + let eb = Arc::clone(&exit_barrier); + let nc = Arc::clone(&n_called); + let panic_handler = move |_| { + let val = nc.fetch_add(1, Ordering::SeqCst); + if val < n_threads { + sb.wait(); + } else { + eb.wait(); + } + }; + + let conf = ThreadPoolBuilder::new() + .num_threads(n_threads) + .start_handler(start_handler) + .exit_handler(exit_handler) + .panic_handler(panic_handler); + { + let _ = conf.build().unwrap(); + + // Wait for all the threads to start, panic in the start handler, + // and been taken care of by the panic handler. + start_barrier.wait(); + + // Drop the pool so it stops the running threads. + } + + // Wait for all the threads to exit, panic in the exit handler, + // and been taken care of by the panic handler. + exit_barrier.wait(); + + // The panic handler must have been called twice on every thread. + assert_eq!(n_called.load(Ordering::SeqCst), 2 * n_threads); +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn check_config_build() { + let pool = ThreadPoolBuilder::new().num_threads(22).build().unwrap(); + assert_eq!(pool.current_num_threads(), 22); +} + +/// Helper used by check_error_send_sync to ensure ThreadPoolBuildError is Send + Sync +fn _send_sync<T: Send + Sync>() {} + +#[test] +fn check_error_send_sync() { + _send_sync::<ThreadPoolBuildError>(); +} + +#[allow(deprecated)] +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn configuration() { + let start_handler = move |_| {}; + let exit_handler = move |_| {}; + let panic_handler = move |_| {}; + let thread_name = move |i| format!("thread_name_{}", i); + + // Ensure we can call all public methods on Configuration + crate::Configuration::new() + .thread_name(thread_name) + .num_threads(5) + .panic_handler(panic_handler) + .stack_size(4e6 as usize) + .breadth_first() + .start_handler(start_handler) + .exit_handler(exit_handler) + .build() + .unwrap(); +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn default_pool() { + ThreadPoolBuilder::default().build().unwrap(); +} + +/// Test that custom spawned threads get their `WorkerThread` cleared once +/// the pool is done with them, allowing them to be used with rayon again +/// later. e.g. WebAssembly want to have their own pool of available threads. +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn cleared_current_thread() -> Result<(), ThreadPoolBuildError> { + let n_threads = 5; + let mut handles = vec![]; + let pool = ThreadPoolBuilder::new() + .num_threads(n_threads) + .spawn_handler(|thread| { + let handle = std::thread::spawn(move || { + thread.run(); + + // Afterward, the current thread shouldn't be set anymore. + assert_eq!(crate::current_thread_index(), None); + }); + handles.push(handle); + Ok(()) + }) + .build()?; + assert_eq!(handles.len(), n_threads); + + pool.install(|| assert!(crate::current_thread_index().is_some())); + drop(pool); + + // Wait for all threads to make their assertions and exit + for handle in handles { + handle.join().unwrap(); + } + + Ok(()) +} diff --git a/compiler/rustc_thread_pool/src/thread_pool/mod.rs b/compiler/rustc_thread_pool/src/thread_pool/mod.rs new file mode 100644 index 00000000000..65af6d7106e --- /dev/null +++ b/compiler/rustc_thread_pool/src/thread_pool/mod.rs @@ -0,0 +1,514 @@ +//! Contains support for user-managed thread pools, represented by the +//! the [`ThreadPool`] type (see that struct for details). +//! +//! [`ThreadPool`]: struct.ThreadPool.html + +use crate::broadcast::{self, BroadcastContext}; +use crate::join; +use crate::registry::{Registry, ThreadSpawn, WorkerThread}; +use crate::scope::{do_in_place_scope, do_in_place_scope_fifo}; +use crate::spawn; +use crate::{scope, Scope}; +use crate::{scope_fifo, ScopeFifo}; +use crate::{ThreadPoolBuildError, ThreadPoolBuilder}; +use std::error::Error; +use std::fmt; +use std::sync::Arc; + +mod test; + +/// Represents a user created [thread-pool]. +/// +/// Use a [`ThreadPoolBuilder`] to specify the number and/or names of threads +/// in the pool. After calling [`ThreadPoolBuilder::build()`], you can then +/// execute functions explicitly within this [`ThreadPool`] using +/// [`ThreadPool::install()`]. By contrast, top level rayon functions +/// (like `join()`) will execute implicitly within the current thread-pool. +/// +/// +/// ## Creating a ThreadPool +/// +/// ```rust +/// # use rayon_core as rayon; +/// let pool = rayon::ThreadPoolBuilder::new().num_threads(8).build().unwrap(); +/// ``` +/// +/// [`install()`][`ThreadPool::install()`] executes a closure in one of the `ThreadPool`'s +/// threads. In addition, any other rayon operations called inside of `install()` will also +/// execute in the context of the `ThreadPool`. +/// +/// When the `ThreadPool` is dropped, that's a signal for the threads it manages to terminate, +/// they will complete executing any remaining work that you have spawned, and automatically +/// terminate. +/// +/// +/// [thread-pool]: https://en.wikipedia.org/wiki/Thread_pool +/// [`ThreadPool`]: struct.ThreadPool.html +/// [`ThreadPool::new()`]: struct.ThreadPool.html#method.new +/// [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html +/// [`ThreadPoolBuilder::build()`]: struct.ThreadPoolBuilder.html#method.build +/// [`ThreadPool::install()`]: struct.ThreadPool.html#method.install +pub struct ThreadPool { + registry: Arc<Registry>, +} + +impl ThreadPool { + #[deprecated(note = "Use `ThreadPoolBuilder::build`")] + #[allow(deprecated)] + /// Deprecated in favor of `ThreadPoolBuilder::build`. + pub fn new(configuration: crate::Configuration) -> Result<ThreadPool, Box<dyn Error>> { + Self::build(configuration.into_builder()).map_err(Box::from) + } + + pub(super) fn build<S>( + builder: ThreadPoolBuilder<S>, + ) -> Result<ThreadPool, ThreadPoolBuildError> + where + S: ThreadSpawn, + { + let registry = Registry::new(builder)?; + Ok(ThreadPool { registry }) + } + + /// Executes `op` within the threadpool. Any attempts to use + /// `join`, `scope`, or parallel iterators will then operate + /// within that threadpool. + /// + /// # Warning: thread-local data + /// + /// Because `op` is executing within the Rayon thread-pool, + /// thread-local data from the current thread will not be + /// accessible. + /// + /// # Warning: execution order + /// + /// If the current thread is part of a different thread pool, it will try to + /// keep busy while the `op` completes in its target pool, similar to + /// calling [`ThreadPool::yield_now()`] in a loop. Therefore, it may + /// potentially schedule other tasks to run on the current thread in the + /// meantime. For example + /// + /// ```rust + /// # use rayon_core as rayon; + /// fn main() { + /// rayon::ThreadPoolBuilder::new().num_threads(1).build_global().unwrap(); + /// let pool = rayon_core::ThreadPoolBuilder::default().build().unwrap(); + /// let do_it = || { + /// print!("one "); + /// pool.install(||{}); + /// print!("two "); + /// }; + /// rayon::join(|| do_it(), || do_it()); + /// } + /// ``` + /// + /// Since we configured just one thread in the global pool, one might + /// expect `do_it()` to run sequentially, producing: + /// + /// ```ascii + /// one two one two + /// ``` + /// + /// However each call to `install()` yields implicitly, allowing rayon to + /// run multiple instances of `do_it()` concurrently on the single, global + /// thread. The following output would be equally valid: + /// + /// ```ascii + /// one one two two + /// ``` + /// + /// # Panics + /// + /// If `op` should panic, that panic will be propagated. + /// + /// ## Using `install()` + /// + /// ```rust + /// # use rayon_core as rayon; + /// fn main() { + /// let pool = rayon::ThreadPoolBuilder::new().num_threads(8).build().unwrap(); + /// let n = pool.install(|| fib(20)); + /// println!("{}", n); + /// } + /// + /// fn fib(n: usize) -> usize { + /// if n == 0 || n == 1 { + /// return n; + /// } + /// let (a, b) = rayon::join(|| fib(n - 1), || fib(n - 2)); // runs inside of `pool` + /// return a + b; + /// } + /// ``` + pub fn install<OP, R>(&self, op: OP) -> R + where + OP: FnOnce() -> R + Send, + R: Send, + { + self.registry.in_worker(|_, _| op()) + } + + /// Executes `op` within every thread in the threadpool. Any attempts to use + /// `join`, `scope`, or parallel iterators will then operate within that + /// threadpool. + /// + /// Broadcasts are executed on each thread after they have exhausted their + /// local work queue, before they attempt work-stealing from other threads. + /// The goal of that strategy is to run everywhere in a timely manner + /// *without* being too disruptive to current work. There may be alternative + /// broadcast styles added in the future for more or less aggressive + /// injection, if the need arises. + /// + /// # Warning: thread-local data + /// + /// Because `op` is executing within the Rayon thread-pool, + /// thread-local data from the current thread will not be + /// accessible. + /// + /// # Panics + /// + /// If `op` should panic on one or more threads, exactly one panic + /// will be propagated, only after all threads have completed + /// (or panicked) their own `op`. + /// + /// # Examples + /// + /// ``` + /// # use rayon_core as rayon; + /// use std::sync::atomic::{AtomicUsize, Ordering}; + /// + /// fn main() { + /// let pool = rayon::ThreadPoolBuilder::new().num_threads(5).build().unwrap(); + /// + /// // The argument gives context, including the index of each thread. + /// let v: Vec<usize> = pool.broadcast(|ctx| ctx.index() * ctx.index()); + /// assert_eq!(v, &[0, 1, 4, 9, 16]); + /// + /// // The closure can reference the local stack + /// let count = AtomicUsize::new(0); + /// pool.broadcast(|_| count.fetch_add(1, Ordering::Relaxed)); + /// assert_eq!(count.into_inner(), 5); + /// } + /// ``` + pub fn broadcast<OP, R>(&self, op: OP) -> Vec<R> + where + OP: Fn(BroadcastContext<'_>) -> R + Sync, + R: Send, + { + // We assert that `self.registry` has not terminated. + unsafe { broadcast::broadcast_in(op, &self.registry) } + } + + /// Returns the (current) number of threads in the thread pool. + /// + /// # Future compatibility note + /// + /// Note that unless this thread-pool was created with a + /// [`ThreadPoolBuilder`] that specifies the number of threads, + /// then this number may vary over time in future versions (see [the + /// `num_threads()` method for details][snt]). + /// + /// [snt]: struct.ThreadPoolBuilder.html#method.num_threads + /// [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html + #[inline] + pub fn current_num_threads(&self) -> usize { + self.registry.num_threads() + } + + /// If called from a Rayon worker thread in this thread-pool, + /// returns the index of that thread; if not called from a Rayon + /// thread, or called from a Rayon thread that belongs to a + /// different thread-pool, returns `None`. + /// + /// The index for a given thread will not change over the thread's + /// lifetime. However, multiple threads may share the same index if + /// they are in distinct thread-pools. + /// + /// # Future compatibility note + /// + /// Currently, every thread-pool (including the global + /// thread-pool) has a fixed number of threads, but this may + /// change in future Rayon versions (see [the `num_threads()` method + /// for details][snt]). In that case, the index for a + /// thread would not change during its lifetime, but thread + /// indices may wind up being reused if threads are terminated and + /// restarted. + /// + /// [snt]: struct.ThreadPoolBuilder.html#method.num_threads + #[inline] + pub fn current_thread_index(&self) -> Option<usize> { + let curr = self.registry.current_thread()?; + Some(curr.index()) + } + + /// Returns true if the current worker thread currently has "local + /// tasks" pending. This can be useful as part of a heuristic for + /// deciding whether to spawn a new task or execute code on the + /// current thread, particularly in breadth-first + /// schedulers. However, keep in mind that this is an inherently + /// racy check, as other worker threads may be actively "stealing" + /// tasks from our local deque. + /// + /// **Background:** Rayon's uses a [work-stealing] scheduler. The + /// key idea is that each thread has its own [deque] of + /// tasks. Whenever a new task is spawned -- whether through + /// `join()`, `Scope::spawn()`, or some other means -- that new + /// task is pushed onto the thread's *local* deque. Worker threads + /// have a preference for executing their own tasks; if however + /// they run out of tasks, they will go try to "steal" tasks from + /// other threads. This function therefore has an inherent race + /// with other active worker threads, which may be removing items + /// from the local deque. + /// + /// [work-stealing]: https://en.wikipedia.org/wiki/Work_stealing + /// [deque]: https://en.wikipedia.org/wiki/Double-ended_queue + #[inline] + pub fn current_thread_has_pending_tasks(&self) -> Option<bool> { + let curr = self.registry.current_thread()?; + Some(!curr.local_deque_is_empty()) + } + + /// Execute `oper_a` and `oper_b` in the thread-pool and return + /// the results. Equivalent to `self.install(|| join(oper_a, + /// oper_b))`. + pub fn join<A, B, RA, RB>(&self, oper_a: A, oper_b: B) -> (RA, RB) + where + A: FnOnce() -> RA + Send, + B: FnOnce() -> RB + Send, + RA: Send, + RB: Send, + { + self.install(|| join(oper_a, oper_b)) + } + + /// Creates a scope that executes within this thread-pool. + /// Equivalent to `self.install(|| scope(...))`. + /// + /// See also: [the `scope()` function][scope]. + /// + /// [scope]: fn.scope.html + pub fn scope<'scope, OP, R>(&self, op: OP) -> R + where + OP: FnOnce(&Scope<'scope>) -> R + Send, + R: Send, + { + self.install(|| scope(op)) + } + + /// Creates a scope that executes within this thread-pool. + /// Spawns from the same thread are prioritized in relative FIFO order. + /// Equivalent to `self.install(|| scope_fifo(...))`. + /// + /// See also: [the `scope_fifo()` function][scope_fifo]. + /// + /// [scope_fifo]: fn.scope_fifo.html + pub fn scope_fifo<'scope, OP, R>(&self, op: OP) -> R + where + OP: FnOnce(&ScopeFifo<'scope>) -> R + Send, + R: Send, + { + self.install(|| scope_fifo(op)) + } + + /// Creates a scope that spawns work into this thread-pool. + /// + /// See also: [the `in_place_scope()` function][in_place_scope]. + /// + /// [in_place_scope]: fn.in_place_scope.html + pub fn in_place_scope<'scope, OP, R>(&self, op: OP) -> R + where + OP: FnOnce(&Scope<'scope>) -> R, + { + do_in_place_scope(Some(&self.registry), op) + } + + /// Creates a scope that spawns work into this thread-pool in FIFO order. + /// + /// See also: [the `in_place_scope_fifo()` function][in_place_scope_fifo]. + /// + /// [in_place_scope_fifo]: fn.in_place_scope_fifo.html + pub fn in_place_scope_fifo<'scope, OP, R>(&self, op: OP) -> R + where + OP: FnOnce(&ScopeFifo<'scope>) -> R, + { + do_in_place_scope_fifo(Some(&self.registry), op) + } + + /// Spawns an asynchronous task in this thread-pool. This task will + /// run in the implicit, global scope, which means that it may outlast + /// the current stack frame -- therefore, it cannot capture any references + /// onto the stack (you will likely need a `move` closure). + /// + /// See also: [the `spawn()` function defined on scopes][spawn]. + /// + /// [spawn]: struct.Scope.html#method.spawn + pub fn spawn<OP>(&self, op: OP) + where + OP: FnOnce() + Send + 'static, + { + // We assert that `self.registry` has not terminated. + unsafe { spawn::spawn_in(op, &self.registry) } + } + + /// Spawns an asynchronous task in this thread-pool. This task will + /// run in the implicit, global scope, which means that it may outlast + /// the current stack frame -- therefore, it cannot capture any references + /// onto the stack (you will likely need a `move` closure). + /// + /// See also: [the `spawn_fifo()` function defined on scopes][spawn_fifo]. + /// + /// [spawn_fifo]: struct.ScopeFifo.html#method.spawn_fifo + pub fn spawn_fifo<OP>(&self, op: OP) + where + OP: FnOnce() + Send + 'static, + { + // We assert that `self.registry` has not terminated. + unsafe { spawn::spawn_fifo_in(op, &self.registry) } + } + + /// Spawns an asynchronous task on every thread in this thread-pool. This task + /// will run in the implicit, global scope, which means that it may outlast the + /// current stack frame -- therefore, it cannot capture any references onto the + /// stack (you will likely need a `move` closure). + pub fn spawn_broadcast<OP>(&self, op: OP) + where + OP: Fn(BroadcastContext<'_>) + Send + Sync + 'static, + { + // We assert that `self.registry` has not terminated. + unsafe { broadcast::spawn_broadcast_in(op, &self.registry) } + } + + /// Cooperatively yields execution to Rayon. + /// + /// This is similar to the general [`yield_now()`], but only if the current + /// thread is part of *this* thread pool. + /// + /// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if + /// nothing was available, or `None` if the current thread is not part this pool. + pub fn yield_now(&self) -> Option<Yield> { + let curr = self.registry.current_thread()?; + Some(curr.yield_now()) + } + + /// Cooperatively yields execution to local Rayon work. + /// + /// This is similar to the general [`yield_local()`], but only if the current + /// thread is part of *this* thread pool. + /// + /// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if + /// nothing was available, or `None` if the current thread is not part this pool. + pub fn yield_local(&self) -> Option<Yield> { + let curr = self.registry.current_thread()?; + Some(curr.yield_local()) + } + + pub(crate) fn wait_until_stopped(self) { + let registry = self.registry.clone(); + drop(self); + registry.wait_until_stopped(); + } +} + +impl Drop for ThreadPool { + fn drop(&mut self) { + self.registry.terminate(); + } +} + +impl fmt::Debug for ThreadPool { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("ThreadPool") + .field("num_threads", &self.current_num_threads()) + .field("id", &self.registry.id()) + .finish() + } +} + +/// If called from a Rayon worker thread, returns the index of that +/// thread within its current pool; if not called from a Rayon thread, +/// returns `None`. +/// +/// The index for a given thread will not change over the thread's +/// lifetime. However, multiple threads may share the same index if +/// they are in distinct thread-pools. +/// +/// See also: [the `ThreadPool::current_thread_index()` method]. +/// +/// [m]: struct.ThreadPool.html#method.current_thread_index +/// +/// # Future compatibility note +/// +/// Currently, every thread-pool (including the global +/// thread-pool) has a fixed number of threads, but this may +/// change in future Rayon versions (see [the `num_threads()` method +/// for details][snt]). In that case, the index for a +/// thread would not change during its lifetime, but thread +/// indices may wind up being reused if threads are terminated and +/// restarted. +/// +/// [snt]: struct.ThreadPoolBuilder.html#method.num_threads +#[inline] +pub fn current_thread_index() -> Option<usize> { + unsafe { + let curr = WorkerThread::current().as_ref()?; + Some(curr.index()) + } +} + +/// If called from a Rayon worker thread, indicates whether that +/// thread's local deque still has pending tasks. Otherwise, returns +/// `None`. For more information, see [the +/// `ThreadPool::current_thread_has_pending_tasks()` method][m]. +/// +/// [m]: struct.ThreadPool.html#method.current_thread_has_pending_tasks +#[inline] +pub fn current_thread_has_pending_tasks() -> Option<bool> { + unsafe { + let curr = WorkerThread::current().as_ref()?; + Some(!curr.local_deque_is_empty()) + } +} + +/// Cooperatively yields execution to Rayon. +/// +/// If the current thread is part of a rayon thread pool, this looks for a +/// single unit of pending work in the pool, then executes it. Completion of +/// that work might include nested work or further work stealing. +/// +/// This is similar to [`std::thread::yield_now()`], but does not literally make +/// that call. If you are implementing a polling loop, you may want to also +/// yield to the OS scheduler yourself if no Rayon work was found. +/// +/// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if +/// nothing was available, or `None` if this thread is not part of any pool at all. +pub fn yield_now() -> Option<Yield> { + unsafe { + let thread = WorkerThread::current().as_ref()?; + Some(thread.yield_now()) + } +} + +/// Cooperatively yields execution to local Rayon work. +/// +/// If the current thread is part of a rayon thread pool, this looks for a +/// single unit of pending work in this thread's queue, then executes it. +/// Completion of that work might include nested work or further work stealing. +/// +/// This is similar to [`yield_now()`], but does not steal from other threads. +/// +/// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if +/// nothing was available, or `None` if this thread is not part of any pool at all. +pub fn yield_local() -> Option<Yield> { + unsafe { + let thread = WorkerThread::current().as_ref()?; + Some(thread.yield_local()) + } +} + +/// Result of [`yield_now()`] or [`yield_local()`]. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum Yield { + /// Work was found and executed. + Executed, + /// No available work was found. + Idle, +} diff --git a/compiler/rustc_thread_pool/src/thread_pool/test.rs b/compiler/rustc_thread_pool/src/thread_pool/test.rs new file mode 100644 index 00000000000..88b36282d48 --- /dev/null +++ b/compiler/rustc_thread_pool/src/thread_pool/test.rs @@ -0,0 +1,418 @@ +#![cfg(test)] + +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::mpsc::channel; +use std::sync::{Arc, Mutex}; + +use crate::{join, Scope, ScopeFifo, ThreadPool, ThreadPoolBuilder}; + +#[test] +#[should_panic(expected = "Hello, world!")] +fn panic_propagate() { + let thread_pool = ThreadPoolBuilder::new().build().unwrap(); + thread_pool.install(|| { + panic!("Hello, world!"); + }); +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn workers_stop() { + let registry; + + { + // once we exit this block, thread-pool will be dropped + let thread_pool = ThreadPoolBuilder::new().num_threads(22).build().unwrap(); + registry = thread_pool.install(|| { + // do some work on these threads + join_a_lot(22); + + Arc::clone(&thread_pool.registry) + }); + assert_eq!(registry.num_threads(), 22); + } + + // once thread-pool is dropped, registry should terminate, which + // should lead to worker threads stopping + registry.wait_until_stopped(); +} + +fn join_a_lot(n: usize) { + if n > 0 { + join(|| join_a_lot(n - 1), || join_a_lot(n - 1)); + } +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn sleeper_stop() { + use std::{thread, time}; + + let registry; + + { + // once we exit this block, thread-pool will be dropped + let thread_pool = ThreadPoolBuilder::new().num_threads(22).build().unwrap(); + registry = Arc::clone(&thread_pool.registry); + + // Give time for at least some of the thread pool to fall asleep. + thread::sleep(time::Duration::from_secs(1)); + } + + // once thread-pool is dropped, registry should terminate, which + // should lead to worker threads stopping + registry.wait_until_stopped(); +} + +/// Creates a start/exit handler that increments an atomic counter. +fn count_handler() -> (Arc<AtomicUsize>, impl Fn(usize)) { + let count = Arc::new(AtomicUsize::new(0)); + (Arc::clone(&count), move |_| { + count.fetch_add(1, Ordering::SeqCst); + }) +} + +/// Wait until a counter is no longer shared, then return its value. +fn wait_for_counter(mut counter: Arc<AtomicUsize>) -> usize { + use std::{thread, time}; + + for _ in 0..60 { + counter = match Arc::try_unwrap(counter) { + Ok(counter) => return counter.into_inner(), + Err(counter) => { + thread::sleep(time::Duration::from_secs(1)); + counter + } + }; + } + + // That's too long! + panic!("Counter is still shared!"); +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn failed_thread_stack() { + // Note: we first tried to force failure with a `usize::MAX` stack, but + // macOS and Windows weren't fazed, or at least didn't fail the way we want. + // They work with `isize::MAX`, but 32-bit platforms may feasibly allocate a + // 2GB stack, so it might not fail until the second thread. + let stack_size = ::std::isize::MAX as usize; + + let (start_count, start_handler) = count_handler(); + let (exit_count, exit_handler) = count_handler(); + let builder = ThreadPoolBuilder::new() + .num_threads(10) + .stack_size(stack_size) + .start_handler(start_handler) + .exit_handler(exit_handler); + + let pool = builder.build(); + assert!(pool.is_err(), "thread stack should have failed!"); + + // With such a huge stack, 64-bit will probably fail on the first thread; + // 32-bit might manage the first 2GB, but certainly fail the second. + let start_count = wait_for_counter(start_count); + assert!(start_count <= 1); + assert_eq!(start_count, wait_for_counter(exit_count)); +} + +#[test] +#[cfg_attr(not(panic = "unwind"), ignore)] +fn panic_thread_name() { + let (start_count, start_handler) = count_handler(); + let (exit_count, exit_handler) = count_handler(); + let builder = ThreadPoolBuilder::new() + .num_threads(10) + .start_handler(start_handler) + .exit_handler(exit_handler) + .thread_name(|i| { + if i >= 5 { + panic!(); + } + format!("panic_thread_name#{}", i) + }); + + let pool = crate::unwind::halt_unwinding(|| builder.build()); + assert!(pool.is_err(), "thread-name panic should propagate!"); + + // Assuming they're created in order, threads 0 through 4 should have + // been started already, and then terminated by the panic. + assert_eq!(5, wait_for_counter(start_count)); + assert_eq!(5, wait_for_counter(exit_count)); +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn self_install() { + let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); + + // If the inner `install` blocks, then nothing will actually run it! + assert!(pool.install(|| pool.install(|| true))); +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn mutual_install() { + let pool1 = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); + let pool2 = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); + + let ok = pool1.install(|| { + // This creates a dependency from `pool1` -> `pool2` + pool2.install(|| { + // This creates a dependency from `pool2` -> `pool1` + pool1.install(|| { + // If they blocked on inter-pool installs, there would be no + // threads left to run this! + true + }) + }) + }); + assert!(ok); +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn mutual_install_sleepy() { + use std::{thread, time}; + + let pool1 = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); + let pool2 = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); + + let ok = pool1.install(|| { + // This creates a dependency from `pool1` -> `pool2` + pool2.install(|| { + // Give `pool1` time to fall asleep. + thread::sleep(time::Duration::from_secs(1)); + + // This creates a dependency from `pool2` -> `pool1` + pool1.install(|| { + // Give `pool2` time to fall asleep. + thread::sleep(time::Duration::from_secs(1)); + + // If they blocked on inter-pool installs, there would be no + // threads left to run this! + true + }) + }) + }); + assert!(ok); +} + +#[test] +#[allow(deprecated)] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn check_thread_pool_new() { + let pool = ThreadPool::new(crate::Configuration::new().num_threads(22)).unwrap(); + assert_eq!(pool.current_num_threads(), 22); +} + +macro_rules! test_scope_order { + ($scope:ident => $spawn:ident) => {{ + let builder = ThreadPoolBuilder::new().num_threads(1); + let pool = builder.build().unwrap(); + pool.install(|| { + let vec = Mutex::new(vec![]); + pool.$scope(|scope| { + let vec = &vec; + for i in 0..10 { + scope.$spawn(move |_| { + vec.lock().unwrap().push(i); + }); + } + }); + vec.into_inner().unwrap() + }) + }}; +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn scope_lifo_order() { + let vec = test_scope_order!(scope => spawn); + let expected: Vec<i32> = (0..10).rev().collect(); // LIFO -> reversed + assert_eq!(vec, expected); +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn scope_fifo_order() { + let vec = test_scope_order!(scope_fifo => spawn_fifo); + let expected: Vec<i32> = (0..10).collect(); // FIFO -> natural order + assert_eq!(vec, expected); +} + +macro_rules! test_spawn_order { + ($spawn:ident) => {{ + let builder = ThreadPoolBuilder::new().num_threads(1); + let pool = &builder.build().unwrap(); + let (tx, rx) = channel(); + pool.install(move || { + for i in 0..10 { + let tx = tx.clone(); + pool.$spawn(move || { + tx.send(i).unwrap(); + }); + } + }); + rx.iter().collect::<Vec<i32>>() + }}; +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn spawn_lifo_order() { + let vec = test_spawn_order!(spawn); + let expected: Vec<i32> = (0..10).rev().collect(); // LIFO -> reversed + assert_eq!(vec, expected); +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn spawn_fifo_order() { + let vec = test_spawn_order!(spawn_fifo); + let expected: Vec<i32> = (0..10).collect(); // FIFO -> natural order + assert_eq!(vec, expected); +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn nested_scopes() { + // Create matching scopes for every thread pool. + fn nest<'scope, OP>(pools: &[ThreadPool], scopes: Vec<&Scope<'scope>>, op: OP) + where + OP: FnOnce(&[&Scope<'scope>]) + Send, + { + if let Some((pool, tail)) = pools.split_first() { + pool.scope(move |s| { + // This move reduces the reference lifetimes by variance to match s, + // but the actual scopes are still tied to the invariant 'scope. + let mut scopes = scopes; + scopes.push(s); + nest(tail, scopes, op) + }) + } else { + (op)(&scopes) + } + } + + let pools: Vec<_> = (0..10) + .map(|_| ThreadPoolBuilder::new().num_threads(1).build().unwrap()) + .collect(); + + let counter = AtomicUsize::new(0); + nest(&pools, vec![], |scopes| { + for &s in scopes { + s.spawn(|_| { + // Our 'scope lets us borrow the counter in every pool. + counter.fetch_add(1, Ordering::Relaxed); + }); + } + }); + assert_eq!(counter.into_inner(), pools.len()); +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn nested_fifo_scopes() { + // Create matching fifo scopes for every thread pool. + fn nest<'scope, OP>(pools: &[ThreadPool], scopes: Vec<&ScopeFifo<'scope>>, op: OP) + where + OP: FnOnce(&[&ScopeFifo<'scope>]) + Send, + { + if let Some((pool, tail)) = pools.split_first() { + pool.scope_fifo(move |s| { + // This move reduces the reference lifetimes by variance to match s, + // but the actual scopes are still tied to the invariant 'scope. + let mut scopes = scopes; + scopes.push(s); + nest(tail, scopes, op) + }) + } else { + (op)(&scopes) + } + } + + let pools: Vec<_> = (0..10) + .map(|_| ThreadPoolBuilder::new().num_threads(1).build().unwrap()) + .collect(); + + let counter = AtomicUsize::new(0); + nest(&pools, vec![], |scopes| { + for &s in scopes { + s.spawn_fifo(|_| { + // Our 'scope lets us borrow the counter in every pool. + counter.fetch_add(1, Ordering::Relaxed); + }); + } + }); + assert_eq!(counter.into_inner(), pools.len()); +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn in_place_scope_no_deadlock() { + let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); + let (tx, rx) = channel(); + let rx_ref = ℞ + pool.in_place_scope(move |s| { + // With regular scopes this closure would never run because this scope op + // itself would block the only worker thread. + s.spawn(move |_| { + tx.send(()).unwrap(); + }); + rx_ref.recv().unwrap(); + }); +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn in_place_scope_fifo_no_deadlock() { + let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); + let (tx, rx) = channel(); + let rx_ref = ℞ + pool.in_place_scope_fifo(move |s| { + // With regular scopes this closure would never run because this scope op + // itself would block the only worker thread. + s.spawn_fifo(move |_| { + tx.send(()).unwrap(); + }); + rx_ref.recv().unwrap(); + }); +} + +#[test] +fn yield_now_to_spawn() { + let (tx, rx) = channel(); + + // Queue a regular spawn. + crate::spawn(move || tx.send(22).unwrap()); + + // The single-threaded fallback mode (for wasm etc.) won't + // get a chance to run the spawn if we never yield to it. + crate::registry::in_worker(move |_, _| { + crate::yield_now(); + }); + + // The spawn **must** have started by now, but we still might have to wait + // for it to finish if a different thread stole it first. + assert_eq!(22, rx.recv().unwrap()); +} + +#[test] +fn yield_local_to_spawn() { + let (tx, rx) = channel(); + + // Queue a regular spawn. + crate::spawn(move || tx.send(22).unwrap()); + + // The single-threaded fallback mode (for wasm etc.) won't + // get a chance to run the spawn if we never yield to it. + crate::registry::in_worker(move |_, _| { + crate::yield_local(); + }); + + // The spawn **must** have started by now, but we still might have to wait + // for it to finish if a different thread stole it first. + assert_eq!(22, rx.recv().unwrap()); +} diff --git a/compiler/rustc_thread_pool/src/tlv.rs b/compiler/rustc_thread_pool/src/tlv.rs new file mode 100644 index 00000000000..ce22f7aa0ce --- /dev/null +++ b/compiler/rustc_thread_pool/src/tlv.rs @@ -0,0 +1,31 @@ +//! Allows access to the Rayon's thread local value +//! which is preserved when moving jobs across threads + +use std::{cell::Cell, ptr}; + +thread_local!(pub static TLV: Cell<*const ()> = const { Cell::new(ptr::null()) }); + +#[derive(Copy, Clone)] +pub(crate) struct Tlv(pub(crate) *const ()); + +impl Tlv { + #[inline] + pub(crate) fn null() -> Self { + Self(ptr::null()) + } +} + +unsafe impl Sync for Tlv {} +unsafe impl Send for Tlv {} + +/// Sets the current thread-local value +#[inline] +pub(crate) fn set(value: Tlv) { + TLV.with(|tlv| tlv.set(value.0)); +} + +/// Returns the current thread-local value +#[inline] +pub(crate) fn get() -> Tlv { + TLV.with(|tlv| Tlv(tlv.get())) +} diff --git a/compiler/rustc_thread_pool/src/unwind.rs b/compiler/rustc_thread_pool/src/unwind.rs new file mode 100644 index 00000000000..9671fa57821 --- /dev/null +++ b/compiler/rustc_thread_pool/src/unwind.rs @@ -0,0 +1,31 @@ +//! Package up unwind recovery. Note that if you are in some sensitive +//! place, you can use the `AbortIfPanic` helper to protect against +//! accidental panics in the rayon code itself. + +use std::any::Any; +use std::panic::{self, AssertUnwindSafe}; +use std::thread; + +/// Executes `f` and captures any panic, translating that panic into a +/// `Err` result. The assumption is that any panic will be propagated +/// later with `resume_unwinding`, and hence `f` can be treated as +/// exception safe. +pub(super) fn halt_unwinding<F, R>(func: F) -> thread::Result<R> +where + F: FnOnce() -> R, +{ + panic::catch_unwind(AssertUnwindSafe(func)) +} + +pub(super) fn resume_unwinding(payload: Box<dyn Any + Send>) -> ! { + panic::resume_unwind(payload) +} + +pub(super) struct AbortIfPanic; + +impl Drop for AbortIfPanic { + fn drop(&mut self) { + eprintln!("Rayon: detected unexpected panic; aborting"); + ::std::process::abort(); + } +} diff --git a/compiler/rustc_thread_pool/src/worker_local.rs b/compiler/rustc_thread_pool/src/worker_local.rs new file mode 100644 index 00000000000..85d51687c19 --- /dev/null +++ b/compiler/rustc_thread_pool/src/worker_local.rs @@ -0,0 +1,78 @@ +use crate::registry::{Registry, WorkerThread}; +use std::fmt; +use std::ops::Deref; +use std::sync::Arc; + +#[repr(align(64))] +#[derive(Debug)] +struct CacheAligned<T>(T); + +/// Holds worker-locals values for each thread in a thread pool. +/// You can only access the worker local value through the Deref impl +/// on the thread pool it was constructed on. It will panic otherwise +pub struct WorkerLocal<T> { + locals: Vec<CacheAligned<T>>, + registry: Arc<Registry>, +} + +/// We prevent concurrent access to the underlying value in the +/// Deref impl, thus any values safe to send across threads can +/// be used with WorkerLocal. +unsafe impl<T: Send> Sync for WorkerLocal<T> {} + +impl<T> WorkerLocal<T> { + /// Creates a new worker local where the `initial` closure computes the + /// value this worker local should take for each thread in the thread pool. + #[inline] + pub fn new<F: FnMut(usize) -> T>(mut initial: F) -> WorkerLocal<T> { + let registry = Registry::current(); + WorkerLocal { + locals: (0..registry.num_threads()) + .map(|i| CacheAligned(initial(i))) + .collect(), + registry, + } + } + + /// Returns the worker-local value for each thread + #[inline] + pub fn into_inner(self) -> Vec<T> { + self.locals.into_iter().map(|c| c.0).collect() + } + + fn current(&self) -> &T { + unsafe { + let worker_thread = WorkerThread::current(); + if worker_thread.is_null() + || &*(*worker_thread).registry as *const _ != &*self.registry as *const _ + { + panic!("WorkerLocal can only be used on the thread pool it was created on") + } + &self.locals[(*worker_thread).index].0 + } + } +} + +impl<T> WorkerLocal<Vec<T>> { + /// Joins the elements of all the worker locals into one Vec + pub fn join(self) -> Vec<T> { + self.into_inner().into_iter().flat_map(|v| v).collect() + } +} + +impl<T: fmt::Debug> fmt::Debug for WorkerLocal<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("WorkerLocal") + .field("registry", &self.registry.id()) + .finish() + } +} + +impl<T> Deref for WorkerLocal<T> { + type Target = T; + + #[inline(always)] + fn deref(&self) -> &T { + self.current() + } +} diff --git a/compiler/rustc_thread_pool/tests/double_init_fail.rs b/compiler/rustc_thread_pool/tests/double_init_fail.rs new file mode 100644 index 00000000000..15915304ddc --- /dev/null +++ b/compiler/rustc_thread_pool/tests/double_init_fail.rs @@ -0,0 +1,15 @@ +use rayon_core::ThreadPoolBuilder; +use std::error::Error; + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn double_init_fail() { + let result1 = ThreadPoolBuilder::new().build_global(); + assert!(result1.is_ok()); + let err = ThreadPoolBuilder::new().build_global().unwrap_err(); + assert!(err.source().is_none()); + assert_eq!( + err.to_string(), + "The global thread pool has already been initialized.", + ); +} diff --git a/compiler/rustc_thread_pool/tests/init_zero_threads.rs b/compiler/rustc_thread_pool/tests/init_zero_threads.rs new file mode 100644 index 00000000000..3c1ad251c7e --- /dev/null +++ b/compiler/rustc_thread_pool/tests/init_zero_threads.rs @@ -0,0 +1,10 @@ +use rayon_core::ThreadPoolBuilder; + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn init_zero_threads() { + ThreadPoolBuilder::new() + .num_threads(0) + .build_global() + .unwrap(); +} diff --git a/compiler/rustc_thread_pool/tests/scope_join.rs b/compiler/rustc_thread_pool/tests/scope_join.rs new file mode 100644 index 00000000000..9d88133bc5b --- /dev/null +++ b/compiler/rustc_thread_pool/tests/scope_join.rs @@ -0,0 +1,45 @@ +/// Test that one can emulate join with `scope`: +fn pseudo_join<F, G>(f: F, g: G) +where + F: FnOnce() + Send, + G: FnOnce() + Send, +{ + rayon_core::scope(|s| { + s.spawn(|_| g()); + f(); + }); +} + +fn quick_sort<T: PartialOrd + Send>(v: &mut [T]) { + if v.len() <= 1 { + return; + } + + let mid = partition(v); + let (lo, hi) = v.split_at_mut(mid); + pseudo_join(|| quick_sort(lo), || quick_sort(hi)); +} + +fn partition<T: PartialOrd + Send>(v: &mut [T]) -> usize { + let pivot = v.len() - 1; + let mut i = 0; + for j in 0..pivot { + if v[j] <= v[pivot] { + v.swap(i, j); + i += 1; + } + } + v.swap(i, pivot); + i +} + +fn is_sorted<T: Send + Ord>(v: &[T]) -> bool { + (1..v.len()).all(|i| v[i - 1] <= v[i]) +} + +#[test] +fn scope_join() { + let mut v: Vec<i32> = (0..256).rev().collect(); + quick_sort(&mut v); + assert!(is_sorted(&v)); +} diff --git a/compiler/rustc_thread_pool/tests/scoped_threadpool.rs b/compiler/rustc_thread_pool/tests/scoped_threadpool.rs new file mode 100644 index 00000000000..932147179f5 --- /dev/null +++ b/compiler/rustc_thread_pool/tests/scoped_threadpool.rs @@ -0,0 +1,99 @@ +use crossbeam_utils::thread; +use rayon_core::ThreadPoolBuilder; + +#[derive(PartialEq, Eq, Debug)] +struct Local(i32); + +scoped_tls::scoped_thread_local!(static LOCAL: Local); + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn missing_scoped_tls() { + LOCAL.set(&Local(42), || { + let pool = ThreadPoolBuilder::new() + .build() + .expect("thread pool created"); + + // `LOCAL` is not set in the pool. + pool.install(|| { + assert!(!LOCAL.is_set()); + }); + }); +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn spawn_scoped_tls_threadpool() { + LOCAL.set(&Local(42), || { + LOCAL.with(|x| { + thread::scope(|scope| { + let pool = ThreadPoolBuilder::new() + .spawn_handler(move |thread| { + scope + .builder() + .spawn(move |_| { + // Borrow the same local value in the thread pool. + LOCAL.set(x, || thread.run()) + }) + .map(|_| ()) + }) + .build() + .expect("thread pool created"); + + // The pool matches our local value. + pool.install(|| { + assert!(LOCAL.is_set()); + LOCAL.with(|y| { + assert_eq!(x, y); + }); + }); + + // If we change our local value, the pool is not affected. + LOCAL.set(&Local(-1), || { + pool.install(|| { + assert!(LOCAL.is_set()); + LOCAL.with(|y| { + assert_eq!(x, y); + }); + }); + }); + }) + .expect("scope threads ok"); + // `thread::scope` will wait for the threads to exit before returning. + }); + }); +} + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn build_scoped_tls_threadpool() { + LOCAL.set(&Local(42), || { + LOCAL.with(|x| { + ThreadPoolBuilder::new() + .build_scoped( + move |thread| LOCAL.set(x, || thread.run()), + |pool| { + // The pool matches our local value. + pool.install(|| { + assert!(LOCAL.is_set()); + LOCAL.with(|y| { + assert_eq!(x, y); + }); + }); + + // If we change our local value, the pool is not affected. + LOCAL.set(&Local(-1), || { + pool.install(|| { + assert!(LOCAL.is_set()); + LOCAL.with(|y| { + assert_eq!(x, y); + }); + }); + }); + }, + ) + .expect("thread pool created"); + // Internally, `std::thread::scope` will wait for the threads to exit before returning. + }); + }); +} diff --git a/compiler/rustc_thread_pool/tests/simple_panic.rs b/compiler/rustc_thread_pool/tests/simple_panic.rs new file mode 100644 index 00000000000..2564482a47e --- /dev/null +++ b/compiler/rustc_thread_pool/tests/simple_panic.rs @@ -0,0 +1,7 @@ +use rayon_core::join; + +#[test] +#[should_panic(expected = "should panic")] +fn simple_panic() { + join(|| {}, || panic!("should panic")); +} diff --git a/compiler/rustc_thread_pool/tests/stack_overflow_crash.rs b/compiler/rustc_thread_pool/tests/stack_overflow_crash.rs new file mode 100644 index 00000000000..a6494069212 --- /dev/null +++ b/compiler/rustc_thread_pool/tests/stack_overflow_crash.rs @@ -0,0 +1,98 @@ +use rayon_core::ThreadPoolBuilder; + +use std::env; +use std::process::{Command, ExitStatus, Stdio}; + +#[cfg(target_os = "linux")] +use std::os::unix::process::ExitStatusExt; + +fn force_stack_overflow(depth: u32) { + let mut buffer = [0u8; 1024 * 1024]; + #[allow(clippy::incompatible_msrv)] + std::hint::black_box(&mut buffer); + if depth > 0 { + force_stack_overflow(depth - 1); + } +} + +#[cfg(unix)] +fn disable_core() { + unsafe { + libc::setrlimit( + libc::RLIMIT_CORE, + &libc::rlimit { + rlim_cur: 0, + rlim_max: 0, + }, + ); + } +} + +#[cfg(unix)] +fn overflow_code() -> Option<i32> { + None +} + +#[cfg(windows)] +fn overflow_code() -> Option<i32> { + use std::os::windows::process::ExitStatusExt; + + ExitStatus::from_raw(0xc00000fd /*STATUS_STACK_OVERFLOW*/).code() +} + +#[test] +#[cfg_attr(not(any(unix, windows)), ignore)] +fn stack_overflow_crash() { + // First check that the recursive call actually causes a stack overflow, + // and does not get optimized away. + let status = run_ignored("run_with_small_stack"); + assert!(!status.success()); + #[cfg(any(unix, windows))] + assert_eq!(status.code(), overflow_code()); + #[cfg(target_os = "linux")] + assert!(matches!( + status.signal(), + Some(libc::SIGABRT | libc::SIGSEGV) + )); + + // Now run with a larger stack and verify correct operation. + let status = run_ignored("run_with_large_stack"); + assert_eq!(status.code(), Some(0)); + #[cfg(target_os = "linux")] + assert_eq!(status.signal(), None); +} + +fn run_ignored(test: &str) -> ExitStatus { + Command::new(env::current_exe().unwrap()) + .arg("--ignored") + .arg("--exact") + .arg(test) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .status() + .unwrap() +} + +#[test] +#[ignore] +fn run_with_small_stack() { + run_with_stack(8); +} + +#[test] +#[ignore] +fn run_with_large_stack() { + run_with_stack(48); +} + +fn run_with_stack(stack_size_in_mb: usize) { + let pool = ThreadPoolBuilder::new() + .stack_size(stack_size_in_mb * 1024 * 1024) + .build() + .unwrap(); + pool.install(|| { + #[cfg(unix)] + disable_core(); + force_stack_overflow(32); + }); +} | 
