about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--src/tools/rust-analyzer/crates/ide-db/src/prime_caches.rs332
-rw-r--r--src/tools/rust-analyzer/crates/ide-db/src/prime_caches/topologic_sort.rs104
-rw-r--r--src/tools/rust-analyzer/crates/rust-analyzer/src/cli/flags.rs6
-rw-r--r--src/tools/rust-analyzer/crates/rust-analyzer/src/cli/prime_caches.rs2
4 files changed, 188 insertions, 256 deletions
diff --git a/src/tools/rust-analyzer/crates/ide-db/src/prime_caches.rs b/src/tools/rust-analyzer/crates/ide-db/src/prime_caches.rs
index 764b583fb84..5356614dce5 100644
--- a/src/tools/rust-analyzer/crates/ide-db/src/prime_caches.rs
+++ b/src/tools/rust-analyzer/crates/ide-db/src/prime_caches.rs
@@ -2,12 +2,10 @@
 //! sometimes is counter productive when, for example, the first goto definition
 //! request takes longer to compute. This module implements prepopulation of
 //! various caches, it's not really advanced at the moment.
-mod topologic_sort;
-
-use std::time::Duration;
+use std::panic::AssertUnwindSafe;
 
 use hir::{Symbol, db::DefDatabase};
-use itertools::Itertools;
+use rustc_hash::FxHashMap;
 use salsa::{Cancelled, Database};
 
 use crate::{
@@ -35,59 +33,114 @@ pub fn parallel_prime_caches(
 ) {
     let _p = tracing::info_span!("parallel_prime_caches").entered();
 
-    let mut crates_to_prime = {
-        // FIXME: We already have the crate list topologically sorted (but without the things
-        // `TopologicalSortIter` gives us). Maybe there is a way to avoid using it and rip it out
-        // of the codebase?
-        let mut builder = topologic_sort::TopologicalSortIter::builder();
-
-        for &crate_id in db.all_crates().iter() {
-            builder.add(crate_id, crate_id.data(db).dependencies.iter().map(|d| d.crate_id));
-        }
-
-        builder.build()
-    };
-
     enum ParallelPrimeCacheWorkerProgress {
-        BeginCrate { crate_id: Crate, crate_name: Symbol },
-        EndCrate { crate_id: Crate },
+        BeginCrateDefMap { crate_id: Crate, crate_name: Symbol },
+        EndCrateDefMap { crate_id: Crate },
+        EndCrateImportMap,
+        EndModuleSymbols,
         Cancelled(Cancelled),
     }
 
-    // We split off def map computation from other work,
-    // as the def map is the relevant one. Once the defmaps are computed
-    // the project is ready to go, the other indices are just nice to have for some IDE features.
-    #[derive(PartialOrd, Ord, PartialEq, Eq, Copy, Clone)]
-    enum PrimingPhase {
-        DefMap,
-        ImportMap,
-        CrateSymbols,
-    }
+    // The setup here is a bit complicated. We try to make best use of compute resources.
+    // The idea is that if we have a def map available to compute, we should do that first.
+    // This is because def map is a dependency of both import map and symbols. So if we have
+    // e.g. a def map and a symbols, if we compute the def map we can, after it completes,
+    // compute the def maps of dependencies, the existing symbols and the symbols of the
+    // new crate, all in parallel. But if we compute the symbols, after that we will only
+    // have the def map to compute, and the rest of the CPU cores will rest, which is not
+    // good.
+    // However, it's better to compute symbols/import map than to compute a def map that
+    // isn't ready yet, because one of its dependencies hasn't yet completed its def map.
+    // Such def map will just block on the dependency, which is just wasted time. So better
+    // to compute the symbols/import map of an already computed def map in that time.
+
+    let (reverse_deps, mut to_be_done_deps) = {
+        let all_crates = db.all_crates();
+        let to_be_done_deps = all_crates
+            .iter()
+            .map(|&krate| (krate, krate.data(db).dependencies.len() as u32))
+            .collect::<FxHashMap<_, _>>();
+        let mut reverse_deps =
+            all_crates.iter().map(|&krate| (krate, Vec::new())).collect::<FxHashMap<_, _>>();
+        for &krate in &*all_crates {
+            for dep in &krate.data(db).dependencies {
+                reverse_deps.get_mut(&dep.crate_id).unwrap().push(krate);
+            }
+        }
+        (reverse_deps, to_be_done_deps)
+    };
 
-    let (work_sender, progress_receiver) = {
+    let (def_map_work_sender, import_map_work_sender, symbols_work_sender, progress_receiver) = {
         let (progress_sender, progress_receiver) = crossbeam_channel::unbounded();
-        let (work_sender, work_receiver) = crossbeam_channel::unbounded();
-        let prime_caches_worker = move |db: RootDatabase| {
-            while let Ok((crate_id, crate_name, kind)) = work_receiver.recv() {
-                progress_sender
-                    .send(ParallelPrimeCacheWorkerProgress::BeginCrate { crate_id, crate_name })?;
-
-                let cancelled = Cancelled::catch(|| match kind {
-                    PrimingPhase::DefMap => _ = hir::crate_def_map(&db, crate_id),
-                    PrimingPhase::ImportMap => _ = db.import_map(crate_id),
-                    PrimingPhase::CrateSymbols => _ = db.crate_symbols(crate_id.into()),
-                });
+        let (def_map_work_sender, def_map_work_receiver) = crossbeam_channel::unbounded();
+        let (import_map_work_sender, import_map_work_receiver) = crossbeam_channel::unbounded();
+        let (symbols_work_sender, symbols_work_receiver) = crossbeam_channel::unbounded();
+        let prime_caches_worker =
+            move |db: RootDatabase| {
+                let handle_def_map = |crate_id, crate_name| {
+                    progress_sender.send(ParallelPrimeCacheWorkerProgress::BeginCrateDefMap {
+                        crate_id,
+                        crate_name,
+                    })?;
 
-                match cancelled {
-                    Ok(()) => progress_sender
-                        .send(ParallelPrimeCacheWorkerProgress::EndCrate { crate_id })?,
-                    Err(cancelled) => progress_sender
-                        .send(ParallelPrimeCacheWorkerProgress::Cancelled(cancelled))?,
-                }
-            }
+                    let cancelled = Cancelled::catch(|| _ = hir::crate_def_map(&db, crate_id));
 
-            Ok::<_, crossbeam_channel::SendError<_>>(())
-        };
+                    match cancelled {
+                        Ok(()) => progress_sender
+                            .send(ParallelPrimeCacheWorkerProgress::EndCrateDefMap { crate_id })?,
+                        Err(cancelled) => progress_sender
+                            .send(ParallelPrimeCacheWorkerProgress::Cancelled(cancelled))?,
+                    }
+
+                    Ok::<_, crossbeam_channel::SendError<_>>(())
+                };
+                let handle_import_map = |crate_id| {
+                    let cancelled = Cancelled::catch(|| _ = db.import_map(crate_id));
+
+                    match cancelled {
+                        Ok(()) => progress_sender
+                            .send(ParallelPrimeCacheWorkerProgress::EndCrateImportMap)?,
+                        Err(cancelled) => progress_sender
+                            .send(ParallelPrimeCacheWorkerProgress::Cancelled(cancelled))?,
+                    }
+
+                    Ok::<_, crossbeam_channel::SendError<_>>(())
+                };
+                let handle_symbols = |module| {
+                    let cancelled =
+                        Cancelled::catch(AssertUnwindSafe(|| _ = db.module_symbols(module)));
+
+                    match cancelled {
+                        Ok(()) => progress_sender
+                            .send(ParallelPrimeCacheWorkerProgress::EndModuleSymbols)?,
+                        Err(cancelled) => progress_sender
+                            .send(ParallelPrimeCacheWorkerProgress::Cancelled(cancelled))?,
+                    }
+
+                    Ok::<_, crossbeam_channel::SendError<_>>(())
+                };
+
+                loop {
+                    db.unwind_if_revision_cancelled();
+
+                    // Biased because we want to prefer def maps.
+                    crossbeam_channel::select_biased! {
+                        recv(def_map_work_receiver) -> work => {
+                            let Ok((crate_id, crate_name)) = work else { break };
+                            handle_def_map(crate_id, crate_name)?;
+                        }
+                        recv(import_map_work_receiver) -> work => {
+                            let Ok(crate_id) = work else { break };
+                            handle_import_map(crate_id)?;
+                        }
+                        recv(symbols_work_receiver) -> work => {
+                            let Ok(module) = work else { break };
+                            handle_symbols(module)?;
+                        }
+                    }
+                }
+                Ok::<_, crossbeam_channel::SendError<_>>(())
+            };
 
         for id in 0..num_worker_threads {
             stdx::thread::Builder::new(
@@ -103,138 +156,121 @@ pub fn parallel_prime_caches(
             .expect("failed to spawn thread");
         }
 
-        (work_sender, progress_receiver)
+        (def_map_work_sender, import_map_work_sender, symbols_work_sender, progress_receiver)
     };
 
-    let crates_total = crates_to_prime.pending();
-    let mut crates_done = 0;
+    let crate_def_maps_total = db.all_crates().len();
+    let mut crate_def_maps_done = 0;
+    let (mut crate_import_maps_total, mut crate_import_maps_done) = (0usize, 0usize);
+    let (mut module_symbols_total, mut module_symbols_done) = (0usize, 0usize);
 
     // an index map is used to preserve ordering so we can sort the progress report in order of
     // "longest crate to index" first
     let mut crates_currently_indexing =
         FxIndexMap::with_capacity_and_hasher(num_worker_threads, Default::default());
 
-    let mut additional_phases = vec![];
-
-    while crates_done < crates_total {
-        db.unwind_if_revision_cancelled();
-
-        for krate in &mut crates_to_prime {
-            let name = krate.extra_data(db).display_name.as_deref().cloned().unwrap_or_else(|| {
-                Symbol::integer(salsa::plumbing::AsId::as_id(&krate).as_u32() as usize)
-            });
-            let origin = &krate.data(db).origin;
-            if origin.is_lang() {
-                additional_phases.push((krate, name.clone(), PrimingPhase::ImportMap));
-            } else if origin.is_local() {
-                // Compute the symbol search index.
-                // This primes the cache for `ide_db::symbol_index::world_symbols()`.
-                //
-                // We do this for workspace crates only (members of local_roots), because doing it
-                // for all dependencies could be *very* unnecessarily slow in a large project.
-                //
-                // FIXME: We should do it unconditionally if the configuration is set to default to
-                // searching dependencies (rust-analyzer.workspace.symbol.search.scope), but we
-                // would need to pipe that configuration information down here.
-                additional_phases.push((krate, name.clone(), PrimingPhase::CrateSymbols));
-            }
-
-            work_sender.send((krate, name, PrimingPhase::DefMap)).ok();
+    for (&krate, &to_be_done_deps) in &to_be_done_deps {
+        if to_be_done_deps != 0 {
+            continue;
         }
 
-        // recv_timeout is somewhat a hack, we need a way to from this thread check to see if the current salsa revision
-        // is cancelled on a regular basis. workers will only exit if they are processing a task that is cancelled, or
-        // if this thread exits, and closes the work channel.
-        let worker_progress = match progress_receiver.recv_timeout(Duration::from_millis(10)) {
-            Ok(p) => p,
-            Err(crossbeam_channel::RecvTimeoutError::Timeout) => {
-                continue;
-            }
-            Err(crossbeam_channel::RecvTimeoutError::Disconnected) => {
-                // all our workers have exited, mark us as finished and exit
-                cb(ParallelPrimeCachesProgress {
-                    crates_currently_indexing: vec![],
-                    crates_done,
-                    crates_total: crates_done,
-                    work_type: "Indexing",
-                });
-                return;
-            }
-        };
-        match worker_progress {
-            ParallelPrimeCacheWorkerProgress::BeginCrate { crate_id, crate_name } => {
-                crates_currently_indexing.insert(crate_id, crate_name);
-            }
-            ParallelPrimeCacheWorkerProgress::EndCrate { crate_id } => {
-                crates_currently_indexing.swap_remove(&crate_id);
-                crates_to_prime.mark_done(crate_id);
-                crates_done += 1;
-            }
-            ParallelPrimeCacheWorkerProgress::Cancelled(cancelled) => {
-                // Cancelled::throw should probably be public
-                std::panic::resume_unwind(Box::new(cancelled));
-            }
-        };
+        let name = crate_name(db, krate);
+        def_map_work_sender.send((krate, name)).ok();
+    }
+
+    while crate_def_maps_done < crate_def_maps_total
+        || crate_import_maps_done < crate_import_maps_total
+        || module_symbols_done < module_symbols_total
+    {
+        db.unwind_if_revision_cancelled();
 
         let progress = ParallelPrimeCachesProgress {
             crates_currently_indexing: crates_currently_indexing.values().cloned().collect(),
-            crates_done,
-            crates_total,
+            crates_done: crate_def_maps_done,
+            crates_total: crate_def_maps_total,
             work_type: "Indexing",
         };
 
         cb(progress);
-    }
-
-    let mut crates_done = 0;
-    let crates_total = additional_phases.len();
-    for w in additional_phases.into_iter().sorted_by_key(|&(_, _, phase)| phase) {
-        work_sender.send(w).ok();
-    }
-
-    while crates_done < crates_total {
-        db.unwind_if_revision_cancelled();
 
-        // recv_timeout is somewhat a hack, we need a way to from this thread check to see if the current salsa revision
-        // is cancelled on a regular basis. workers will only exit if they are processing a task that is cancelled, or
-        // if this thread exits, and closes the work channel.
-        let worker_progress = match progress_receiver.recv_timeout(Duration::from_millis(10)) {
+        // Biased to prefer progress updates (and because it's faster).
+        let progress = match progress_receiver.recv() {
             Ok(p) => p,
-            Err(crossbeam_channel::RecvTimeoutError::Timeout) => {
-                continue;
-            }
-            Err(crossbeam_channel::RecvTimeoutError::Disconnected) => {
+            Err(crossbeam_channel::RecvError) => {
                 // all our workers have exited, mark us as finished and exit
                 cb(ParallelPrimeCachesProgress {
                     crates_currently_indexing: vec![],
-                    crates_done,
-                    crates_total: crates_done,
-                    work_type: "Populating symbols",
+                    crates_done: crate_def_maps_done,
+                    crates_total: crate_def_maps_done,
+                    work_type: "Done",
                 });
                 return;
             }
         };
-        match worker_progress {
-            ParallelPrimeCacheWorkerProgress::BeginCrate { crate_id, crate_name } => {
+
+        match progress {
+            ParallelPrimeCacheWorkerProgress::BeginCrateDefMap { crate_id, crate_name } => {
                 crates_currently_indexing.insert(crate_id, crate_name);
             }
-            ParallelPrimeCacheWorkerProgress::EndCrate { crate_id } => {
+            ParallelPrimeCacheWorkerProgress::EndCrateDefMap { crate_id } => {
                 crates_currently_indexing.swap_remove(&crate_id);
-                crates_done += 1;
+                crate_def_maps_done += 1;
+
+                // Fire ready dependencies.
+                for &dep in &reverse_deps[&crate_id] {
+                    let to_be_done = to_be_done_deps.get_mut(&dep).unwrap();
+                    *to_be_done -= 1;
+                    if *to_be_done == 0 {
+                        let dep_name = crate_name(db, dep);
+                        def_map_work_sender.send((dep, dep_name)).ok();
+                    }
+                }
+
+                if crate_def_maps_done == crate_def_maps_total {
+                    cb(ParallelPrimeCachesProgress {
+                        crates_currently_indexing: vec![],
+                        crates_done: crate_def_maps_done,
+                        crates_total: crate_def_maps_done,
+                        work_type: "Collecting Symbols",
+                    });
+                }
+
+                let origin = &crate_id.data(db).origin;
+                if origin.is_lang() {
+                    crate_import_maps_total += 1;
+                    import_map_work_sender.send(crate_id).ok();
+                } else if origin.is_local() {
+                    // Compute the symbol search index.
+                    // This primes the cache for `ide_db::symbol_index::world_symbols()`.
+                    //
+                    // We do this for workspace crates only (members of local_roots), because doing it
+                    // for all dependencies could be *very* unnecessarily slow in a large project.
+                    //
+                    // FIXME: We should do it unconditionally if the configuration is set to default to
+                    // searching dependencies (rust-analyzer.workspace.symbol.search.scope), but we
+                    // would need to pipe that configuration information down here.
+                    let modules = hir::Crate::from(crate_id).modules(db);
+                    module_symbols_total += modules.len();
+                    for module in modules {
+                        symbols_work_sender.send(module).ok();
+                    }
+                }
             }
+            ParallelPrimeCacheWorkerProgress::EndCrateImportMap => crate_import_maps_done += 1,
+            ParallelPrimeCacheWorkerProgress::EndModuleSymbols => module_symbols_done += 1,
             ParallelPrimeCacheWorkerProgress::Cancelled(cancelled) => {
                 // Cancelled::throw should probably be public
                 std::panic::resume_unwind(Box::new(cancelled));
             }
-        };
-
-        let progress = ParallelPrimeCachesProgress {
-            crates_currently_indexing: crates_currently_indexing.values().cloned().collect(),
-            crates_done,
-            crates_total,
-            work_type: "Populating symbols",
-        };
-
-        cb(progress);
+        }
     }
 }
+
+fn crate_name(db: &RootDatabase, krate: Crate) -> Symbol {
+    krate
+        .extra_data(db)
+        .display_name
+        .as_deref()
+        .cloned()
+        .unwrap_or_else(|| Symbol::integer(salsa::plumbing::AsId::as_id(&krate).as_u32() as usize))
+}
diff --git a/src/tools/rust-analyzer/crates/ide-db/src/prime_caches/topologic_sort.rs b/src/tools/rust-analyzer/crates/ide-db/src/prime_caches/topologic_sort.rs
deleted file mode 100644
index c8a03863103..00000000000
--- a/src/tools/rust-analyzer/crates/ide-db/src/prime_caches/topologic_sort.rs
+++ /dev/null
@@ -1,104 +0,0 @@
-//! helper data structure to schedule work for parallel prime caches.
-use std::{collections::VecDeque, hash::Hash};
-
-use crate::FxHashMap;
-
-pub(crate) struct TopologicSortIterBuilder<T> {
-    nodes: FxHashMap<T, Entry<T>>,
-}
-
-// this implementation has different bounds on T than would be implied by #[derive(Default)]
-impl<T> Default for TopologicSortIterBuilder<T>
-where
-    T: Copy + Eq + PartialEq + Hash,
-{
-    fn default() -> Self {
-        Self { nodes: Default::default() }
-    }
-}
-
-impl<T> TopologicSortIterBuilder<T>
-where
-    T: Copy + Eq + PartialEq + Hash,
-{
-    fn get_or_create_entry(&mut self, item: T) -> &mut Entry<T> {
-        self.nodes.entry(item).or_default()
-    }
-
-    pub(crate) fn add(&mut self, item: T, predecessors: impl IntoIterator<Item = T>) {
-        let mut num_predecessors = 0;
-
-        for predecessor in predecessors.into_iter() {
-            self.get_or_create_entry(predecessor).successors.push(item);
-            num_predecessors += 1;
-        }
-
-        let entry = self.get_or_create_entry(item);
-        entry.num_predecessors += num_predecessors;
-    }
-
-    pub(crate) fn build(self) -> TopologicalSortIter<T> {
-        let ready = self
-            .nodes
-            .iter()
-            .filter_map(
-                |(item, entry)| if entry.num_predecessors == 0 { Some(*item) } else { None },
-            )
-            .collect();
-
-        TopologicalSortIter { nodes: self.nodes, ready }
-    }
-}
-
-pub(crate) struct TopologicalSortIter<T> {
-    ready: VecDeque<T>,
-    nodes: FxHashMap<T, Entry<T>>,
-}
-
-impl<T> TopologicalSortIter<T>
-where
-    T: Copy + Eq + PartialEq + Hash,
-{
-    pub(crate) fn builder() -> TopologicSortIterBuilder<T> {
-        TopologicSortIterBuilder::default()
-    }
-
-    pub(crate) fn pending(&self) -> usize {
-        self.nodes.len()
-    }
-
-    pub(crate) fn mark_done(&mut self, item: T) {
-        let entry = self.nodes.remove(&item).expect("invariant: unknown item marked as done");
-
-        for successor in entry.successors {
-            let succ_entry = self
-                .nodes
-                .get_mut(&successor)
-                .expect("invariant: unknown successor referenced by entry");
-
-            succ_entry.num_predecessors -= 1;
-            if succ_entry.num_predecessors == 0 {
-                self.ready.push_back(successor);
-            }
-        }
-    }
-}
-
-impl<T> Iterator for TopologicalSortIter<T> {
-    type Item = T;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        self.ready.pop_front()
-    }
-}
-
-struct Entry<T> {
-    successors: Vec<T>,
-    num_predecessors: usize,
-}
-
-impl<T> Default for Entry<T> {
-    fn default() -> Self {
-        Self { successors: Default::default(), num_predecessors: 0 }
-    }
-}
diff --git a/src/tools/rust-analyzer/crates/rust-analyzer/src/cli/flags.rs b/src/tools/rust-analyzer/crates/rust-analyzer/src/cli/flags.rs
index 57f95d114d9..16f351272b6 100644
--- a/src/tools/rust-analyzer/crates/rust-analyzer/src/cli/flags.rs
+++ b/src/tools/rust-analyzer/crates/rust-analyzer/src/cli/flags.rs
@@ -150,8 +150,8 @@ xflags::xflags! {
             optional --disable-proc-macros
             /// Run the proc-macro-srv binary at the specified path.
             optional --proc-macro-srv path: PathBuf
-            /// Run cache priming in parallel.
-            optional --parallel
+            /// The number of threads to use. Defaults to the number of physical cores.
+            optional --num-threads num_threads: usize
         }
 
         cmd ssr {
@@ -299,7 +299,7 @@ pub struct PrimeCaches {
     pub disable_build_scripts: bool,
     pub disable_proc_macros: bool,
     pub proc_macro_srv: Option<PathBuf>,
-    pub parallel: bool,
+    pub num_threads: Option<usize>,
 }
 
 #[derive(Debug)]
diff --git a/src/tools/rust-analyzer/crates/rust-analyzer/src/cli/prime_caches.rs b/src/tools/rust-analyzer/crates/rust-analyzer/src/cli/prime_caches.rs
index 46fb701ab42..467d8a53884 100644
--- a/src/tools/rust-analyzer/crates/rust-analyzer/src/cli/prime_caches.rs
+++ b/src/tools/rust-analyzer/crates/rust-analyzer/src/cli/prime_caches.rs
@@ -52,7 +52,7 @@ impl flags::PrimeCaches {
             elapsed.memory.allocated.megabytes() as u64
         );
 
-        let threads = if self.parallel { num_cpus::get() } else { 1 };
+        let threads = self.num_threads.unwrap_or_else(num_cpus::get_physical);
         ide_db::prime_caches::parallel_prime_caches(&db, threads, &|_| ());
 
         let elapsed = stop_watch.elapsed();