4 files changed, 188 insertions, 256 deletions
diff --git a/src/tools/rust-analyzer/crates/ide-db/src/prime_caches.rs b/src/tools/rust-analyzer/crates/ide-db/src/prime_caches.rs
index 764b583fb84..5356614dce5 100644
--- a/src/tools/rust-analyzer/crates/ide-db/src/prime_caches.rs
+++ b/src/tools/rust-analyzer/crates/ide-db/src/prime_caches.rs
@@ -2,12 +2,10 @@
 //! sometimes is counter productive when, for example, the first goto definition
 //! request takes longer to compute. This module implements prepopulation of
 //! various caches, it's not really advanced at the moment.
-mod topologic_sort;
-
-use std::time::Duration;
+use std::panic::AssertUnwindSafe;
 
 use hir::{Symbol, db::DefDatabase};
-use itertools::Itertools;
+use rustc_hash::FxHashMap;
 use salsa::{Cancelled, Database};
 
 use crate::{
@@ -35,59 +33,114 @@ pub fn parallel_prime_caches(
 ) {
     let _p = tracing::info_span!("parallel_prime_caches").entered();
 
-    let mut crates_to_prime = {
-        // FIXME: We already have the crate list topologically sorted (but without the things
-        // `TopologicalSortIter` gives us). Maybe there is a way to avoid using it and rip it out
-        // of the codebase?
-        let mut builder = topologic_sort::TopologicalSortIter::builder();
-
-        for &crate_id in db.all_crates().iter() {
-            builder.add(crate_id, crate_id.data(db).dependencies.iter().map(|d| d.crate_id));
-        }
-
-        builder.build()
-    };
-
     enum ParallelPrimeCacheWorkerProgress {
-        BeginCrate { crate_id: Crate, crate_name: Symbol },
-        EndCrate { crate_id: Crate },
+        BeginCrateDefMap { crate_id: Crate, crate_name: Symbol },
+        EndCrateDefMap { crate_id: Crate },
+        EndCrateImportMap,
+        EndModuleSymbols,
         Cancelled(Cancelled),
     }
 
-    // We split off def map computation from other work,
-    // as the def map is the relevant one. Once the defmaps are computed
-    // the project is ready to go, the other indices are just nice to have for some IDE features.
-    #[derive(PartialOrd, Ord, PartialEq, Eq, Copy, Clone)]
-    enum PrimingPhase {
-        DefMap,
-        ImportMap,
-        CrateSymbols,
-    }
+    // The setup here is a bit complicated. We try to make best use of compute resources.
+    // The idea is that if we have a def map available to compute, we should do that first.
+    // This is because def map is a dependency of both import map and symbols. So if we have
+    // e.g. a def map and a symbols, if we compute the def map we can, after it completes,
+    // compute the def maps of dependencies, the existing symbols and the symbols of the
+    // new crate, all in parallel. But if we compute the symbols, after that we will only
+    // have the def map to compute, and the rest of the CPU cores will rest, which is not
+    // good.
+    // However, it's better to compute symbols/import map than to compute a def map that
+    // isn't ready yet, because one of its dependencies hasn't yet completed its def map.
+    // Such def map will just block on the dependency, which is just wasted time. So better
+    // to compute the symbols/import map of an already computed def map in that time.
+
+    let (reverse_deps, mut to_be_done_deps) = {
+        let all_crates = db.all_crates();
+        let to_be_done_deps = all_crates
+            .iter()
+            .map(|&krate| (krate, krate.data(db).dependencies.len() as u32))
+            .collect::<FxHashMap<_, _>>();
+        let mut reverse_deps =
+            all_crates.iter().map(|&krate| (krate, Vec::new())).collect::<FxHashMap<_, _>>();
+        for &krate in &*all_crates {
+            for dep in &krate.data(db).dependencies {
+                reverse_deps.get_mut(&dep.crate_id).unwrap().push(krate);
+            }
+        }
+        (reverse_deps, to_be_done_deps)
+    };
 
-    let (work_sender, progress_receiver) = {
+    let (def_map_work_sender, import_map_work_sender, symbols_work_sender, progress_receiver) = {
         let (progress_sender, progress_receiver) = crossbeam_channel::unbounded();
-        let (work_sender, work_receiver) = crossbeam_channel::unbounded();
-        let prime_caches_worker = move |db: RootDatabase| {
-            while let Ok((crate_id, crate_name, kind)) = work_receiver.recv() {
-                progress_sender
-                    .send(ParallelPrimeCacheWorkerProgress::BeginCrate { crate_id, crate_name })?;
-
-                let cancelled = Cancelled::catch(|| match kind {
-                    PrimingPhase::DefMap => _ = hir::crate_def_map(&db, crate_id),
-                    PrimingPhase::ImportMap => _ = db.import_map(crate_id),
-                    PrimingPhase::CrateSymbols => _ = db.crate_symbols(crate_id.into()),
-                });
+        let (def_map_work_sender, def_map_work_receiver) = crossbeam_channel::unbounded();
+        let (import_map_work_sender, import_map_work_receiver) = crossbeam_channel::unbounded();
+        let (symbols_work_sender, symbols_work_receiver) = crossbeam_channel::unbounded();
+        let prime_caches_worker =
+            move |db: RootDatabase| {
+                let handle_def_map = |crate_id, crate_name| {
+                    progress_sender.send(ParallelPrimeCacheWorkerProgress::BeginCrateDefMap {
+                        crate_id,
+                        crate_name,
+                    })?;
 
-                match cancelled {
-                    Ok(()) => progress_sender
-                        .send(ParallelPrimeCacheWorkerProgress::EndCrate { crate_id })?,
-                    Err(cancelled) => progress_sender
-                        .send(ParallelPrimeCacheWorkerProgress::Cancelled(cancelled))?,
-                }
-            }
+                    let cancelled = Cancelled::catch(|| _ = hir::crate_def_map(&db, crate_id));
 
-            Ok::<_, crossbeam_channel::SendError<_>>(())
-        };
+                    match cancelled {
+                        Ok(()) => progress_sender
+                            .send(ParallelPrimeCacheWorkerProgress::EndCrateDefMap { crate_id })?,
+                        Err(cancelled) => progress_sender
+                            .send(ParallelPrimeCacheWorkerProgress::Cancelled(cancelled))?,
+                    }
+
+                    Ok::<_, crossbeam_channel::SendError<_>>(())
+                };
+                let handle_import_map = |crate_id| {
+                    let cancelled = Cancelled::catch(|| _ = db.import_map(crate_id));
+
+                    match cancelled {
+                        Ok(()) => progress_sender
+                            .send(ParallelPrimeCacheWorkerProgress::EndCrateImportMap)?,
+                        Err(cancelled) => progress_sender
+                            .send(ParallelPrimeCacheWorkerProgress::Cancelled(cancelled))?,
+                    }
+
+                    Ok::<_, crossbeam_channel::SendError<_>>(())
+                };
+                let handle_symbols = |module| {
+                    let cancelled =
+                        Cancelled::catch(AssertUnwindSafe(|| _ = db.module_symbols(module)));
+
+                    match cancelled {
+                        Ok(()) => progress_sender
+                            .send(ParallelPrimeCacheWorkerProgress::EndModuleSymbols)?,
+                        Err(cancelled) => progress_sender
+                            .send(ParallelPrimeCacheWorkerProgress::Cancelled(cancelled))?,
+                    }
+
+                    Ok::<_, crossbeam_channel::SendError<_>>(())
+                };
+
+                loop {
+                    db.unwind_if_revision_cancelled();
+
+                    // Biased because we want to prefer def maps.
+                    crossbeam_channel::select_biased! {
+                        recv(def_map_work_receiver) -> work => {
+                            let Ok((crate_id, crate_name)) = work else { break };
+                            handle_def_map(crate_id, crate_name)?;
+                        }
+                        recv(import_map_work_receiver) -> work => {
+                            let Ok(crate_id) = work else { break };
+                            handle_import_map(crate_id)?;
+                        }
+                        recv(symbols_work_receiver) -> work => {
+                            let Ok(module) = work else { break };
+                            handle_symbols(module)?;
+                        }
+                    }
+                }
+                Ok::<_, crossbeam_channel::SendError<_>>(())
+            };
 
         for id in 0..num_worker_threads {
             stdx::thread::Builder::new(
@@ -103,138 +156,121 @@ pub fn parallel_prime_caches(
             .expect("failed to spawn thread");
         }
 
-        (work_sender, progress_receiver)
+        (def_map_work_sender, import_map_work_sender, symbols_work_sender, progress_receiver)
     };
 
-    let crates_total = crates_to_prime.pending();
-    let mut crates_done = 0;
+    let crate_def_maps_total = db.all_crates().len();
+    let mut crate_def_maps_done = 0;
+    let (mut crate_import_maps_total, mut crate_import_maps_done) = (0usize, 0usize);
+    let (mut module_symbols_total, mut module_symbols_done) = (0usize, 0usize);
 
     // an index map is used to preserve ordering so we can sort the progress report in order of
     // "longest crate to index" first
     let mut crates_currently_indexing =
         FxIndexMap::with_capacity_and_hasher(num_worker_threads, Default::default());
 
-    let mut additional_phases = vec![];
-
-    while crates_done < crates_total {
-        db.unwind_if_revision_cancelled();
-
-        for krate in &mut crates_to_prime {
-            let name = krate.extra_data(db).display_name.as_deref().cloned().unwrap_or_else(|| {
-                Symbol::integer(salsa::plumbing::AsId::as_id(&krate).as_u32() as usize)
-            });
-            let origin = &krate.data(db).origin;
-            if origin.is_lang() {
-                additional_phases.push((krate, name.clone(), PrimingPhase::ImportMap));
-            } else if origin.is_local() {
-                // Compute the symbol search index.
-                // This primes the cache for `ide_db::symbol_index::world_symbols()`.
-                //
-                // We do this for workspace crates only (members of local_roots), because doing it
-                // for all dependencies could be *very* unnecessarily slow in a large project.
-                //
-                // FIXME: We should do it unconditionally if the configuration is set to default to
-                // searching dependencies (rust-analyzer.workspace.symbol.search.scope), but we
-                // would need to pipe that configuration information down here.
-                additional_phases.push((krate, name.clone(), PrimingPhase::CrateSymbols));
-            }
-
-            work_sender.send((krate, name, PrimingPhase::DefMap)).ok();
+    for (&krate, &to_be_done_deps) in &to_be_done_deps {
+        if to_be_done_deps != 0 {
+            continue;
         }
-        // recv_timeout is somewhat a hack, we need a way to from this thread check to see if the current salsa revision
-        // is cancelled on a regular basis. workers will only exit if they are processing a task that is cancelled, or
-        // if this thread exits, and closes the work channel.
-        let worker_progress = match progress_receiver.recv_timeout(Duration::from_millis(10)) {
-            Ok(p) => p,
-            Err(crossbeam_channel::RecvTimeoutError::Timeout) => {
-                continue;
-            }
-            Err(crossbeam_channel::RecvTimeoutError::Disconnected) => {
-                // all our workers have exited, mark us as finished and exit
-                cb(ParallelPrimeCachesProgress {
-                    crates_currently_indexing: vec![],
-                    crates_done,
-                    crates_total: crates_done,
-                    work_type: "Indexing",
-                });
-                return;
-            }
-        };
-        match worker_progress {
-            ParallelPrimeCacheWorkerProgress::BeginCrate { crate_id, crate_name } => {
-                crates_currently_indexing.insert(crate_id, crate_name);
-            }
-            ParallelPrimeCacheWorkerProgress::EndCrate { crate_id } => {
-                crates_currently_indexing.swap_remove(&crate_id);
-                crates_to_prime.mark_done(crate_id);
-                crates_done += 1;
-            }
-            ParallelPrimeCacheWorkerProgress::Cancelled(cancelled) => {
-                // Cancelled::throw should probably be public
-                std::panic::resume_unwind(Box::new(cancelled));
-            }
-        };
+        let name = crate_name(db, krate);
+        def_map_work_sender.send((krate, name)).ok();
+    }
+
+    while crate_def_maps_done < crate_def_maps_total
+        || crate_import_maps_done < crate_import_maps_total
+        || module_symbols_done < module_symbols_total
+    {
+        db.unwind_if_revision_cancelled();
 
         let progress = ParallelPrimeCachesProgress {
             crates_currently_indexing: crates_currently_indexing.values().cloned().collect(),
-            crates_done,
-            crates_total,
+            crates_done: crate_def_maps_done,
+            crates_total: crate_def_maps_total,
            work_type: "Indexing",
         };
         cb(progress);
-    }
-
-    let mut crates_done = 0;
-    let crates_total = additional_phases.len();
-    for w in additional_phases.into_iter().sorted_by_key(|&(_, _, phase)| phase) {
-        work_sender.send(w).ok();
-    }
-
-    while crates_done < crates_total {
-        db.unwind_if_revision_cancelled();
-        // recv_timeout is somewhat a hack, we need a way to from this thread check to see if the current salsa revision
-        // is cancelled on a regular basis. workers will only exit if they are processing a task that is cancelled, or
-        // if this thread exits, and closes the work channel.
-        let worker_progress = match progress_receiver.recv_timeout(Duration::from_millis(10)) {
+        // Biased to prefer progress updates (and because it's faster).
+        let progress = match progress_receiver.recv() {
             Ok(p) => p,
-            Err(crossbeam_channel::RecvTimeoutError::Timeout) => {
-                continue;
-            }
-            Err(crossbeam_channel::RecvTimeoutError::Disconnected) => {
+            Err(crossbeam_channel::RecvError) => {
                 // all our workers have exited, mark us as finished and exit
                 cb(ParallelPrimeCachesProgress {
                     crates_currently_indexing: vec![],
-                    crates_done,
-                    crates_total: crates_done,
-                    work_type: "Populating symbols",
+                    crates_done: crate_def_maps_done,
+                    crates_total: crate_def_maps_done,
+                    work_type: "Done",
                 });
                 return;
             }
         };
-        match worker_progress {
-            ParallelPrimeCacheWorkerProgress::BeginCrate { crate_id, crate_name } => {
+
+        match progress {
+            ParallelPrimeCacheWorkerProgress::BeginCrateDefMap { crate_id, crate_name } => {
                 crates_currently_indexing.insert(crate_id, crate_name);
             }
-            ParallelPrimeCacheWorkerProgress::EndCrate { crate_id } => {
+            ParallelPrimeCacheWorkerProgress::EndCrateDefMap { crate_id } => {
                 crates_currently_indexing.swap_remove(&crate_id);
-                crates_done += 1;
+                crate_def_maps_done += 1;
+
+                // Fire ready dependencies.
+                for &dep in &reverse_deps[&crate_id] {
+                    let to_be_done = to_be_done_deps.get_mut(&dep).unwrap();
+                    *to_be_done -= 1;
+                    if *to_be_done == 0 {
+                        let dep_name = crate_name(db, dep);
+                        def_map_work_sender.send((dep, dep_name)).ok();
+                    }
+                }
+
+                if crate_def_maps_done == crate_def_maps_total {
+                    cb(ParallelPrimeCachesProgress {
+                        crates_currently_indexing: vec![],
+                        crates_done: crate_def_maps_done,
+                        crates_total: crate_def_maps_done,
+                        work_type: "Collecting Symbols",
+                    });
+                }
+
+                let origin = &crate_id.data(db).origin;
+                if origin.is_lang() {
+                    crate_import_maps_total += 1;
+                    import_map_work_sender.send(crate_id).ok();
+                } else if origin.is_local() {
+                    // Compute the symbol search index.
+                    // This primes the cache for `ide_db::symbol_index::world_symbols()`.
+                    //
+                    // We do this for workspace crates only (members of local_roots), because doing it
+                    // for all dependencies could be *very* unnecessarily slow in a large project.
+                    //
+                    // FIXME: We should do it unconditionally if the configuration is set to default to
+                    // searching dependencies (rust-analyzer.workspace.symbol.search.scope), but we
+                    // would need to pipe that configuration information down here.
+                    let modules = hir::Crate::from(crate_id).modules(db);
+                    module_symbols_total += modules.len();
+                    for module in modules {
+                        symbols_work_sender.send(module).ok();
+                    }
+                }
             }
+            ParallelPrimeCacheWorkerProgress::EndCrateImportMap => crate_import_maps_done += 1,
+            ParallelPrimeCacheWorkerProgress::EndModuleSymbols => module_symbols_done += 1,
             ParallelPrimeCacheWorkerProgress::Cancelled(cancelled) => {
                 // Cancelled::throw should probably be public
                 std::panic::resume_unwind(Box::new(cancelled));
             }
-        };
-
-        let progress = ParallelPrimeCachesProgress {
-            crates_currently_indexing: crates_currently_indexing.values().cloned().collect(),
-            crates_done,
-            crates_total,
-            work_type: "Populating symbols",
-        };
-
-        cb(progress);
+        }
     }
 }
+
+fn crate_name(db: &RootDatabase, krate: Crate) -> Symbol {
+    krate
+        .extra_data(db)
+        .display_name
+        .as_deref()
+        .cloned()
+        .unwrap_or_else(|| Symbol::integer(salsa::plumbing::AsId::as_id(&krate).as_u32() as usize))
+}
diff --git a/src/tools/rust-analyzer/crates/ide-db/src/prime_caches/topologic_sort.rs b/src/tools/rust-analyzer/crates/ide-db/src/prime_caches/topologic_sort.rs
deleted file mode 100644
index c8a03863103..00000000000
--- a/src/tools/rust-analyzer/crates/ide-db/src/prime_caches/topologic_sort.rs
+++ /dev/null
@@ -1,104 +0,0 @@
-//! helper data structure to schedule work for parallel prime caches.
-
-use std::{collections::VecDeque, hash::Hash};
-
-use crate::FxHashMap;
-
-pub(crate) struct TopologicSortIterBuilder<T> {
-    nodes: FxHashMap<T, Entry<T>>,
-}
-
-// this implementation has different bounds on T than would be implied by #[derive(Default)]
-impl<T> Default for TopologicSortIterBuilder<T>
-where
-    T: Copy + Eq + PartialEq + Hash,
-{
-    fn default() -> Self {
-        Self { nodes: Default::default() }
-    }
-}
-
-impl<T> TopologicSortIterBuilder<T>
-where
-    T: Copy + Eq + PartialEq + Hash,
-{
-    fn get_or_create_entry(&mut self, item: T) -> &mut Entry<T> {
-        self.nodes.entry(item).or_default()
-    }
-
-    pub(crate) fn add(&mut self, item: T, predecessors: impl IntoIterator<Item = T>) {
-        let mut num_predecessors = 0;
-
-        for predecessor in predecessors.into_iter() {
-            self.get_or_create_entry(predecessor).successors.push(item);
-            num_predecessors += 1;
-        }
-
-        let entry = self.get_or_create_entry(item);
-        entry.num_predecessors += num_predecessors;
-    }
-
-    pub(crate) fn build(self) -> TopologicalSortIter<T> {
-        let ready = self
-            .nodes
-            .iter()
-            .filter_map(
-                |(item, entry)| if entry.num_predecessors == 0 { Some(*item) } else { None },
-            )
-            .collect();
-
-        TopologicalSortIter { nodes: self.nodes, ready }
-    }
-}
-
-pub(crate) struct TopologicalSortIter<T> {
-    ready: VecDeque<T>,
-    nodes: FxHashMap<T, Entry<T>>,
-}
-
-impl<T> TopologicalSortIter<T>
-where
-    T: Copy + Eq + PartialEq + Hash,
-{
-    pub(crate) fn builder() -> TopologicSortIterBuilder<T> {
-        TopologicSortIterBuilder::default()
-    }
-
-    pub(crate) fn pending(&self) -> usize {
-        self.nodes.len()
-    }
-
-    pub(crate) fn mark_done(&mut self, item: T) {
-        let entry = self.nodes.remove(&item).expect("invariant: unknown item marked as done");
-
-        for successor in entry.successors {
-            let succ_entry = self
-                .nodes
-                .get_mut(&successor)
-                .expect("invariant: unknown successor referenced by entry");
-
-            succ_entry.num_predecessors -= 1;
-            if succ_entry.num_predecessors == 0 {
-                self.ready.push_back(successor);
-            }
-        }
-    }
-}
-
-impl<T> Iterator for TopologicalSortIter<T> {
-    type Item = T;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        self.ready.pop_front()
-    }
-}
-
-struct Entry<T> {
-    successors: Vec<T>,
-    num_predecessors: usize,
-}
-
-impl<T> Default for Entry<T> {
-    fn default() -> Self {
-        Self { successors: Default::default(), num_predecessors: 0 }
-    }
-}
diff --git a/src/tools/rust-analyzer/crates/rust-analyzer/src/cli/flags.rs b/src/tools/rust-analyzer/crates/rust-analyzer/src/cli/flags.rs
index 57f95d114d9..16f351272b6 100644
--- a/src/tools/rust-analyzer/crates/rust-analyzer/src/cli/flags.rs
+++ b/src/tools/rust-analyzer/crates/rust-analyzer/src/cli/flags.rs
@@ -150,8 +150,8 @@ xflags::xflags! {
             optional --disable-proc-macros
             /// Run the proc-macro-srv binary at the specified path.
             optional --proc-macro-srv path: PathBuf
-            /// Run cache priming in parallel.
-            optional --parallel
+            /// The number of threads to use. Defaults to the number of physical cores.
+            optional --num-threads num_threads: usize
         }
 
         cmd ssr {
@@ -299,7 +299,7 @@ pub struct PrimeCaches {
     pub disable_build_scripts: bool,
     pub disable_proc_macros: bool,
     pub proc_macro_srv: Option<PathBuf>,
-    pub parallel: bool,
+    pub num_threads: Option<usize>,
 }
 
 #[derive(Debug)]
diff --git a/src/tools/rust-analyzer/crates/rust-analyzer/src/cli/prime_caches.rs b/src/tools/rust-analyzer/crates/rust-analyzer/src/cli/prime_caches.rs
index 46fb701ab42..467d8a53884 100644
--- a/src/tools/rust-analyzer/crates/rust-analyzer/src/cli/prime_caches.rs
+++ b/src/tools/rust-analyzer/crates/rust-analyzer/src/cli/prime_caches.rs
@@ -52,7 +52,7 @@ impl flags::PrimeCaches {
             elapsed.memory.allocated.megabytes() as u64
         );
 
-        let threads = if self.parallel { num_cpus::get() } else { 1 };
+        let threads = self.num_threads.unwrap_or_else(num_cpus::get_physical);
         ide_db::prime_caches::parallel_prime_caches(&db, threads, &|_| ());
 
         let elapsed = stop_watch.elapsed();
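
The deleted topologic_sort.rs helper is effectively replaced by the reverse_deps/to_be_done_deps bookkeeping inlined into parallel_prime_caches: every crate carries a counter of dependencies whose def maps are still pending, and its own def map is queued only once that counter reaches zero. Below is a minimal, self-contained sketch of that Kahn-style scheduling, using toy &str crate names instead of salsa Crate ids and a serial queue instead of the crossbeam worker pool; it is an illustration of the idea, not code from the patch.

    use std::collections::{HashMap, VecDeque};

    fn main() {
        // crate -> direct dependencies (toy data, not a real crate graph)
        let deps: HashMap<&str, Vec<&str>> =
            HashMap::from([("core", vec![]), ("std", vec!["core"]), ("app", vec!["std", "core"])]);

        // Outstanding-dependency counters plus the reverse edges used to decrement them,
        // mirroring `to_be_done_deps` and `reverse_deps` in the patch.
        let mut pending: HashMap<&str, usize> =
            deps.iter().map(|(&k, v)| (k, v.len())).collect();
        let mut reverse: HashMap<&str, Vec<&str>> =
            deps.keys().map(|&k| (k, Vec::new())).collect();
        for (&krate, ds) in &deps {
            for &d in ds {
                reverse.get_mut(d).unwrap().push(krate);
            }
        }

        // Seed the queue with crates that have no dependencies at all.
        let mut ready: VecDeque<&str> =
            pending.iter().filter(|&(_, &n)| n == 0).map(|(&k, _)| k).collect();

        // In the real code a worker pool drains a channel; here we process serially.
        while let Some(krate) = ready.pop_front() {
            println!("prime def map for {krate}");
            for &dependent in &reverse[krate] {
                let n = pending.get_mut(dependent).unwrap();
                *n -= 1;
                if *n == 0 {
                    ready.push_back(dependent);
                }
            }
        }
    }

Run on the toy graph above, this prints core, std, app: a crate is only primed after everything it depends on has finished, which is exactly the ordering the dependency counters enforce.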

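The worker loop uses crossbeam_channel::select_biased! so that when several queues have work, def maps win over import maps, which win over module symbols. The following reduced, standalone sketch shows that priority behaviour under stated assumptions: toy string payloads, a fixed message count instead of the real shutdown-by-dropping-senders protocol, and a crossbeam-channel version that provides select_biased! (the one the patch itself relies on).

    use crossbeam_channel::unbounded;

    fn main() {
        // Three work queues, listed in priority order (highest first).
        let (def_map_tx, def_map_rx) = unbounded::<&str>();
        let (import_map_tx, import_map_rx) = unbounded::<&str>();
        let (symbols_tx, symbols_rx) = unbounded::<&str>();

        symbols_tx.send("symbols for module `app::ui`").unwrap();
        import_map_tx.send("import map for crate `core`").unwrap();
        def_map_tx.send("def map for crate `app`").unwrap();

        // Process a fixed number of messages; the biased select always picks the
        // earliest-listed ready receiver, so the def-map message is handled first
        // even though it was sent last.
        for _ in 0..3 {
            crossbeam_channel::select_biased! {
                recv(def_map_rx) -> msg => println!("{}", msg.unwrap()),
                recv(import_map_rx) -> msg => println!("{}", msg.unwrap()),
                recv(symbols_rx) -> msg => println!("{}", msg.unwrap()),
            }
        }
    }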