about summary refs log tree commit diff
diff options
context:
space:
mode:
authorJake Heinz <jh@discordapp.com>2022-01-14 09:11:47 +0000
committerJake Heinz <jh@discordapp.com>2022-01-14 09:16:35 +0000
commit3168148cc626c26658d49f7304ff7f07194baec2 (patch)
tree84eb2db02eaa88fb6353cdf9131efcd675a83dd4
parentb4c31481a554d0132003228ba319bd9476fe85ae (diff)
downloadrust-3168148cc626c26658d49f7304ff7f07194baec2.tar.gz
rust-3168148cc626c26658d49f7304ff7f07194baec2.zip
ide: parallel prime caches
-rw-r--r--Cargo.lock2
-rw-r--r--crates/ide/Cargo.toml2
-rw-r--r--crates/ide/src/lib.rs9
-rw-r--r--crates/ide/src/prime_caches.rs157
-rw-r--r--crates/ide/src/prime_caches/topologic_sort.rs101
-rw-r--r--crates/rust-analyzer/src/main_loop.rs26
6 files changed, 273 insertions, 24 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 1dcbc14f014..915da5d2837 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -577,6 +577,8 @@ version = "0.0.0"
 dependencies = [
  "cfg",
  "cov-mark",
+ "crossbeam-channel",
+ "crossbeam-utils",
  "dot",
  "either",
  "expect-test",
diff --git a/crates/ide/Cargo.toml b/crates/ide/Cargo.toml
index c5e79838fc0..8cddc1e8ec2 100644
--- a/crates/ide/Cargo.toml
+++ b/crates/ide/Cargo.toml
@@ -11,6 +11,8 @@ doctest = false
 
 [dependencies]
 cov-mark = "2.0.0-pre.1"
+crossbeam-channel = "0.5.0"
+crossbeam-utils = "0.8.5"
 either = "1.5.3"
 itertools = "0.10.0"
 tracing = "0.1"
diff --git a/crates/ide/src/lib.rs b/crates/ide/src/lib.rs
index db7b80f71bf..5c872fe9c71 100644
--- a/crates/ide/src/lib.rs
+++ b/crates/ide/src/lib.rs
@@ -87,7 +87,7 @@ pub use crate::{
     moniker::{MonikerKind, MonikerResult, PackageInformation},
     move_item::Direction,
     navigation_target::NavigationTarget,
-    prime_caches::PrimeCachesProgress,
+    prime_caches::{ParallelPrimeCachesProgress, PrimeCachesProgress},
     references::ReferenceSearchResult,
     rename::RenameError,
     runnables::{Runnable, RunnableKind, TestId},
@@ -251,6 +251,13 @@ impl Analysis {
         self.with_db(move |db| prime_caches::prime_caches(db, &cb))
     }
 
+    pub fn parallel_prime_caches<F>(&self, num_worker_threads: u8, cb: F) -> Cancellable<()>
+    where
+        F: Fn(ParallelPrimeCachesProgress) + Sync + std::panic::UnwindSafe,
+    {
+        self.with_db(move |db| prime_caches::parallel_prime_caches(db, num_worker_threads, &cb))
+    }
+
     /// Gets the text of the source file.
     pub fn file_text(&self, file_id: FileId) -> Cancellable<Arc<String>> {
         self.with_db(|db| db.file_text(file_id))
diff --git a/crates/ide/src/prime_caches.rs b/crates/ide/src/prime_caches.rs
index 5eba1d1e276..b873329e861 100644
--- a/crates/ide/src/prime_caches.rs
+++ b/crates/ide/src/prime_caches.rs
@@ -2,10 +2,14 @@
 //! sometimes is counter productive when, for example, the first goto definition
 //! request takes longer to compute. This modules implemented prepopulation of
 //! various caches, it's not really advanced at the moment.
+mod topologic_sort;
 
 use hir::db::DefDatabase;
-use ide_db::base_db::{SourceDatabase, SourceDatabaseExt};
-use rustc_hash::FxHashSet;
+use ide_db::base_db::{
+    salsa::{Database, ParallelDatabase, Snapshot},
+    Cancelled, CrateGraph, CrateId, SourceDatabase, SourceDatabaseExt,
+};
+use rustc_hash::{FxHashMap, FxHashSet};
 
 use crate::RootDatabase;
 
@@ -20,11 +24,144 @@ pub struct PrimeCachesProgress {
 pub(crate) fn prime_caches(db: &RootDatabase, cb: &(dyn Fn(PrimeCachesProgress) + Sync)) {
     let _p = profile::span("prime_caches");
     let graph = db.crate_graph();
+    let to_prime = compute_crates_to_prime(db, &graph);
+
+    let n_total = to_prime.len();
+    for (n_done, &crate_id) in to_prime.iter().enumerate() {
+        let crate_name = graph[crate_id].display_name.as_deref().unwrap_or_default().to_string();
+
+        cb(PrimeCachesProgress { on_crate: crate_name, n_done, n_total });
+        // This also computes the DefMap
+        db.import_map(crate_id);
+    }
+}
+
+/// Progress report for the parallel cache-priming pass over the workspace's crates.
+#[derive(Debug)]
+pub struct ParallelPrimeCachesProgress {
+    /// The display names of the crates currently being primed by worker threads.
+    pub crates_currently_indexing: Vec<String>,
+    /// The total number of crates to prime.
+    pub crates_total: usize,
+    /// The number of crates that have finished priming.
+    pub crates_done: usize,
+}
+
+pub(crate) fn parallel_prime_caches<F>(db: &RootDatabase, num_worker_threads: u8, cb: &F)
+where
+    F: Fn(ParallelPrimeCachesProgress) + Sync + std::panic::UnwindSafe,
+{
+    let _p = profile::span("prime_caches");
+
+    let graph = db.crate_graph();
+    let mut crates_to_prime = {
+        let crate_ids = compute_crates_to_prime(db, &graph);
+
+        let mut builder = topologic_sort::TopologicalSortIter::builder();
+
+        for &crate_id in &crate_ids {
+            let crate_data = &graph[crate_id];
+            let dependencies = crate_data
+                .dependencies
+                .iter()
+                .map(|d| d.crate_id)
+                .filter(|i| crate_ids.contains(i));
+
+            builder.add(crate_id, dependencies);
+        }
+
+        builder.build()
+    };
+
+    crossbeam_utils::thread::scope(move |s| {
+        let (work_sender, work_receiver) = crossbeam_channel::unbounded();
+        let (progress_sender, progress_receiver) = crossbeam_channel::unbounded();
+
+        enum ParallelPrimeCacheWorkerProgress {
+            BeginCrate { crate_id: CrateId, crate_name: String },
+            EndCrate { crate_id: CrateId, cancelled: bool },
+        }
+
+        let prime_caches_worker = move |db: Snapshot<RootDatabase>| {
+            while let Ok((crate_id, crate_name)) = work_receiver.recv() {
+                progress_sender
+                    .send(ParallelPrimeCacheWorkerProgress::BeginCrate { crate_id, crate_name })?;
+
+                let cancelled = Cancelled::catch(|| {
+                    // This also computes the DefMap
+                    db.import_map(crate_id);
+                })
+                .is_err();
+
+                progress_sender
+                    .send(ParallelPrimeCacheWorkerProgress::EndCrate { crate_id, cancelled })?;
+
+                if cancelled {
+                    break;
+                }
+            }
+
+            Ok::<_, crossbeam_channel::SendError<_>>(())
+        };
+
+        for _ in 0..num_worker_threads {
+            let worker = prime_caches_worker.clone();
+            let db = db.snapshot();
+            s.spawn(move |_| worker(db));
+        }
+
+        let crates_total = crates_to_prime.len();
+        let mut crates_done = 0;
+
+        let mut is_cancelled = false;
+        let mut crates_currently_indexing =
+            FxHashMap::with_capacity_and_hasher(num_worker_threads as _, Default::default());
+
+        while !crates_to_prime.is_empty() && !is_cancelled {
+            for crate_id in &mut crates_to_prime {
+                work_sender
+                    .send((
+                        crate_id,
+                        graph[crate_id].display_name.as_deref().unwrap_or_default().to_string(),
+                    ))
+                    .ok();
+            }
+
+            let worker_progress = match progress_receiver.recv() {
+                Ok(p) => p,
+                Err(_) => break,
+            };
+            match worker_progress {
+                ParallelPrimeCacheWorkerProgress::BeginCrate { crate_id, crate_name } => {
+                    crates_currently_indexing.insert(crate_id, crate_name);
+                }
+                ParallelPrimeCacheWorkerProgress::EndCrate { crate_id, cancelled } => {
+                    crates_currently_indexing.remove(&crate_id);
+                    crates_to_prime.mark_done(crate_id);
+                    crates_done += 1;
+                    is_cancelled = cancelled;
+                }
+            };
+
+            let progress = ParallelPrimeCachesProgress {
+                crates_currently_indexing: crates_currently_indexing.values().cloned().collect(),
+                crates_done,
+                crates_total,
+            };
+
+            cb(progress);
+            db.unwind_if_cancelled();
+        }
+    })
+    .unwrap();
+}
+
+fn compute_crates_to_prime(db: &RootDatabase, graph: &CrateGraph) -> FxHashSet<CrateId> {
     // We're only interested in the workspace crates and the `ImportMap`s of their direct
     // dependencies, though in practice the latter also compute the `DefMap`s.
     // We don't prime transitive dependencies because they're generally not visible in
     // the current workspace.
-    let to_prime: FxHashSet<_> = graph
+    graph
         .iter()
         .filter(|&id| {
             let file_id = graph[id].root_file_id;
@@ -32,17 +169,5 @@ pub(crate) fn prime_caches(db: &RootDatabase, cb: &(dyn Fn(PrimeCachesProgress)
             !db.source_root(root_id).is_library
         })
         .flat_map(|id| graph[id].dependencies.iter().map(|krate| krate.crate_id))
-        .collect();
-
-    // FIXME: This would be easy to parallelize, since it's in the ideal ordering for that.
-    // Unfortunately rayon prevents panics from propagation out of a `scope`, which breaks
-    // cancellation, so we cannot use rayon.
-    let n_total = to_prime.len();
-    for (n_done, &crate_id) in to_prime.iter().enumerate() {
-        let crate_name = graph[crate_id].display_name.as_deref().unwrap_or_default().to_string();
-
-        cb(PrimeCachesProgress { on_crate: crate_name, n_done, n_total });
-        // This also computes the DefMap
-        db.import_map(crate_id);
-    }
+        .collect()
 }
diff --git a/crates/ide/src/prime_caches/topologic_sort.rs b/crates/ide/src/prime_caches/topologic_sort.rs
new file mode 100644
index 00000000000..f89f84bc5b5
--- /dev/null
+++ b/crates/ide/src/prime_caches/topologic_sort.rs
@@ -0,0 +1,101 @@
+use std::{collections::VecDeque, hash::Hash};
+
+use rustc_hash::FxHashMap;
+
+pub struct TopologicSortIterBuilder<T> {
+    nodes: FxHashMap<T, Entry<T>>,
+}
+
+impl<T> TopologicSortIterBuilder<T>
+where
+    T: Copy + Eq + PartialEq + Hash,
+{
+    fn new() -> Self {
+        Self { nodes: Default::default() }
+    }
+
+    fn get_or_create_entry(&mut self, item: T) -> &mut Entry<T> {
+        self.nodes.entry(item).or_default()
+    }
+
+    pub fn add(&mut self, item: T, predecessors: impl IntoIterator<Item = T>) {
+        let mut num_predecessors = 0;
+
+        for predecessor in predecessors.into_iter() {
+            self.get_or_create_entry(predecessor).successors.push(item);
+            num_predecessors += 1;
+        }
+
+        let entry = self.get_or_create_entry(item);
+        entry.num_predecessors += num_predecessors;
+    }
+
+    pub fn build(self) -> TopologicalSortIter<T> {
+        let ready = self
+            .nodes
+            .iter()
+            .filter_map(
+                |(item, entry)| if entry.num_predecessors == 0 { Some(*item) } else { None },
+            )
+            .collect();
+
+        TopologicalSortIter { nodes: self.nodes, ready }
+    }
+}
+
+pub struct TopologicalSortIter<T> {
+    ready: VecDeque<T>,
+    nodes: FxHashMap<T, Entry<T>>,
+}
+
+impl<T> TopologicalSortIter<T>
+where
+    T: Copy + Eq + PartialEq + Hash,
+{
+    pub fn builder() -> TopologicSortIterBuilder<T> {
+        TopologicSortIterBuilder::new()
+    }
+
+    pub fn len(&self) -> usize {
+        self.nodes.len()
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.len() == 0
+    }
+
+    pub fn mark_done(&mut self, item: T) {
+        let entry = self.nodes.remove(&item).expect("invariant: unknown item marked as done");
+
+        for successor in entry.successors {
+            let succ_entry = self
+                .nodes
+                .get_mut(&successor)
+                .expect("invariant: unknown successor referenced by entry");
+
+            succ_entry.num_predecessors -= 1;
+            if succ_entry.num_predecessors == 0 {
+                self.ready.push_back(successor);
+            }
+        }
+    }
+}
+
+impl<T> Iterator for TopologicalSortIter<T> {
+    type Item = T;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.ready.pop_front()
+    }
+}
+
+struct Entry<T> {
+    successors: Vec<T>,
+    num_predecessors: usize,
+}
+
+impl<T> Default for Entry<T> {
+    fn default() -> Self {
+        Self { successors: Default::default(), num_predecessors: 0 }
+    }
+}
diff --git a/crates/rust-analyzer/src/main_loop.rs b/crates/rust-analyzer/src/main_loop.rs
index af987230def..c0736ede090 100644
--- a/crates/rust-analyzer/src/main_loop.rs
+++ b/crates/rust-analyzer/src/main_loop.rs
@@ -70,7 +70,7 @@ pub(crate) enum Task {
 #[derive(Debug)]
 pub(crate) enum PrimeCachesProgress {
     Begin,
-    Report(ide::PrimeCachesProgress),
+    Report(ide::ParallelPrimeCachesProgress),
     End { cancelled: bool },
 }
 
@@ -291,11 +291,23 @@ impl GlobalState {
                         }
                         PrimeCachesProgress::Report(report) => {
                             state = Progress::Report;
-                            message = Some(format!(
-                                "{}/{} ({})",
-                                report.n_done, report.n_total, report.on_crate
-                            ));
-                            fraction = Progress::fraction(report.n_done, report.n_total);
+
+                            message = match &report.crates_currently_indexing[..] {
+                                [crate_name] => Some(format!(
+                                    "{}/{} ({})",
+                                    report.crates_done, report.crates_total, crate_name
+                                )),
+                                [crate_name, rest @ ..] => Some(format!(
+                                    "{}/{} ({} + {} more)",
+                                    report.crates_done,
+                                    report.crates_total,
+                                    crate_name,
+                                    rest.len()
+                                )),
+                                _ => None,
+                            };
+
+                            fraction = Progress::fraction(report.crates_done, report.crates_total);
                         }
                         PrimeCachesProgress::End { cancelled } => {
                             state = Progress::End;
@@ -497,7 +509,7 @@ impl GlobalState {
                 let analysis = self.snapshot().analysis;
                 move |sender| {
                     sender.send(Task::PrimeCaches(PrimeCachesProgress::Begin)).unwrap();
-                    let res = analysis.prime_caches(|progress| {
+                    let res = analysis.parallel_prime_caches(32, |progress| {
                         let report = PrimeCachesProgress::Report(progress);
                         sender.send(Task::PrimeCaches(report)).unwrap();
                     });