about summary refs log tree commit diff
diff options
context:
space:
mode:
authorbors[bot] <26634292+bors[bot]@users.noreply.github.com>2022-01-25 16:03:35 +0000
committerGitHub <noreply@github.com>2022-01-25 16:03:35 +0000
commit2cb85c14b622002767c66881c6a316a94e0f0be4 (patch)
tree141d17e197d3da3fad2d61eb99ae313be893b371
parent5f13d6af9ff59a07eb9d4e5b269c735bae17c372 (diff)
parent25f67b6939ec84235b12a35c1e2176f3c68a6dea (diff)
downloadrust-2cb85c14b622002767c66881c6a316a94e0f0be4.tar.gz
rust-2cb85c14b622002767c66881c6a316a94e0f0be4.zip
Merge #11281
11281: ide: parallel prime caches r=jonas-schievink a=jhgg

cache priming goes brrrr... the successor to #10149

---

this PR implements a parallel cache priming strategy that uses a topological work queue to feed a pool of worker threads the crates to index in parallel.

## todo
- [x] should we keep the old prime caches?
- [x] we should use num_cpus to detect how many cpus to use to prime caches. should we also expose a config for # of worker CPU threads to use?
- [x] something is wonky with cancellation, need to figure it out before this can merge. 

Co-authored-by: Jake Heinz <jh@discordapp.com>
-rw-r--r--Cargo.lock2
-rw-r--r--crates/ide/Cargo.toml1
-rw-r--r--crates/ide/src/lib.rs8
-rw-r--r--crates/ide/src/prime_caches.rs153
-rw-r--r--crates/ide/src/prime_caches/topologic_sort.rs98
-rw-r--r--crates/rust-analyzer/Cargo.toml1
-rw-r--r--crates/rust-analyzer/src/cli/load_cargo.rs2
-rw-r--r--crates/rust-analyzer/src/config.rs17
-rw-r--r--crates/rust-analyzer/src/main_loop.rs28
-rw-r--r--docs/user/generated_config.adoc5
-rw-r--r--editors/code/package.json7
11 files changed, 289 insertions, 33 deletions
diff --git a/Cargo.lock b/Cargo.lock
index d5a67c40930..6cbb1324e2e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -597,6 +597,7 @@ version = "0.0.0"
 dependencies = [
  "cfg",
  "cov-mark",
+ "crossbeam-channel",
  "dot",
  "either",
  "expect-test",
@@ -1367,6 +1368,7 @@ dependencies = [
  "lsp-types",
  "mbe",
  "mimalloc",
+ "num_cpus",
  "oorandom",
  "parking_lot",
  "proc_macro_api",
diff --git a/crates/ide/Cargo.toml b/crates/ide/Cargo.toml
index ae1109a63db..250673d3fe7 100644
--- a/crates/ide/Cargo.toml
+++ b/crates/ide/Cargo.toml
@@ -11,6 +11,7 @@ doctest = false
 
 [dependencies]
 cov-mark = "2.0.0-pre.1"
+crossbeam-channel = "0.5.0"
 either = "1.5.3"
 itertools = "0.10.0"
 tracing = "0.1"
diff --git a/crates/ide/src/lib.rs b/crates/ide/src/lib.rs
index db7b80f71bf..4028b0bc725 100644
--- a/crates/ide/src/lib.rs
+++ b/crates/ide/src/lib.rs
@@ -87,7 +87,7 @@ pub use crate::{
     moniker::{MonikerKind, MonikerResult, PackageInformation},
     move_item::Direction,
     navigation_target::NavigationTarget,
-    prime_caches::PrimeCachesProgress,
+    prime_caches::ParallelPrimeCachesProgress,
     references::ReferenceSearchResult,
     rename::RenameError,
     runnables::{Runnable, RunnableKind, TestId},
@@ -244,11 +244,11 @@ impl Analysis {
         self.with_db(|db| status::status(&*db, file_id))
     }
 
-    pub fn prime_caches<F>(&self, cb: F) -> Cancellable<()>
+    pub fn parallel_prime_caches<F>(&self, num_worker_threads: u8, cb: F) -> Cancellable<()>
     where
-        F: Fn(PrimeCachesProgress) + Sync + std::panic::UnwindSafe,
+        F: Fn(ParallelPrimeCachesProgress) + Sync + std::panic::UnwindSafe,
     {
-        self.with_db(move |db| prime_caches::prime_caches(db, &cb))
+        self.with_db(move |db| prime_caches::parallel_prime_caches(db, num_worker_threads, &cb))
     }
 
     /// Gets the text of the source file.
diff --git a/crates/ide/src/prime_caches.rs b/crates/ide/src/prime_caches.rs
index 5eba1d1e276..892b34c7d90 100644
--- a/crates/ide/src/prime_caches.rs
+++ b/crates/ide/src/prime_caches.rs
@@ -2,29 +2,152 @@
 //! sometimes is counter productive when, for example, the first goto definition
 //! request takes longer to compute. This modules implemented prepopulation of
 //! various caches, it's not really advanced at the moment.
+mod topologic_sort;
+
+use std::time::Duration;
 
 use hir::db::DefDatabase;
-use ide_db::base_db::{SourceDatabase, SourceDatabaseExt};
+use ide_db::{
+    base_db::{
+        salsa::{Database, ParallelDatabase, Snapshot},
+        Cancelled, CrateGraph, CrateId, SourceDatabase, SourceDatabaseExt,
+    },
+    FxIndexMap,
+};
 use rustc_hash::FxHashSet;
 
 use crate::RootDatabase;
 
-/// We started indexing a crate.
+/// We're indexing many crates.
 #[derive(Debug)]
-pub struct PrimeCachesProgress {
-    pub on_crate: String,
-    pub n_done: usize,
-    pub n_total: usize,
+pub struct ParallelPrimeCachesProgress {
+    /// the crates that we are currently priming.
+    pub crates_currently_indexing: Vec<String>,
+    /// the total number of crates we want to prime.
+    pub crates_total: usize,
+    /// the total number of crates that have finished priming
+    pub crates_done: usize,
 }
 
-pub(crate) fn prime_caches(db: &RootDatabase, cb: &(dyn Fn(PrimeCachesProgress) + Sync)) {
+pub(crate) fn parallel_prime_caches(
+    db: &RootDatabase,
+    num_worker_threads: u8,
+    cb: &(dyn Fn(ParallelPrimeCachesProgress) + Sync),
+) {
     let _p = profile::span("prime_caches");
+
     let graph = db.crate_graph();
+    let mut crates_to_prime = {
+        let crate_ids = compute_crates_to_prime(db, &graph);
+
+        let mut builder = topologic_sort::TopologicalSortIter::builder();
+
+        for &crate_id in &crate_ids {
+            let crate_data = &graph[crate_id];
+            let dependencies = crate_data
+                .dependencies
+                .iter()
+                .map(|d| d.crate_id)
+                .filter(|i| crate_ids.contains(i));
+
+            builder.add(crate_id, dependencies);
+        }
+
+        builder.build()
+    };
+
+    enum ParallelPrimeCacheWorkerProgress {
+        BeginCrate { crate_id: CrateId, crate_name: String },
+        EndCrate { crate_id: CrateId },
+    }
+
+    let (work_sender, progress_receiver) = {
+        let (progress_sender, progress_receiver) = crossbeam_channel::unbounded();
+        let (work_sender, work_receiver) = crossbeam_channel::unbounded();
+        let prime_caches_worker = move |db: Snapshot<RootDatabase>| {
+            while let Ok((crate_id, crate_name)) = work_receiver.recv() {
+                progress_sender
+                    .send(ParallelPrimeCacheWorkerProgress::BeginCrate { crate_id, crate_name })?;
+
+                // This also computes the DefMap
+                db.import_map(crate_id);
+
+                progress_sender.send(ParallelPrimeCacheWorkerProgress::EndCrate { crate_id })?;
+            }
+
+            Ok::<_, crossbeam_channel::SendError<_>>(())
+        };
+
+        for _ in 0..num_worker_threads {
+            let worker = prime_caches_worker.clone();
+            let db = db.snapshot();
+            std::thread::spawn(move || Cancelled::catch(|| worker(db)));
+        }
+
+        (work_sender, progress_receiver)
+    };
+
+    let crates_total = crates_to_prime.pending();
+    let mut crates_done = 0;
+
+    // an index map is used to preserve ordering so we can sort the progress report in order of
+    // "longest crate to index" first
+    let mut crates_currently_indexing =
+        FxIndexMap::with_capacity_and_hasher(num_worker_threads as _, Default::default());
+
+    while crates_done < crates_total {
+        db.unwind_if_cancelled();
+
+        for crate_id in &mut crates_to_prime {
+            work_sender
+                .send((
+                    crate_id,
+                    graph[crate_id].display_name.as_deref().unwrap_or_default().to_string(),
+                ))
+                .ok();
+        }
+
+        // recv_timeout is somewhat a hack, we need a way to from this thread check to see if the current salsa revision
+        // is cancelled on a regular basis. workers will only exit if they are processing a task that is cancelled, or
+        // if this thread exits, and closes the work channel.
+        let worker_progress = match progress_receiver.recv_timeout(Duration::from_millis(10)) {
+            Ok(p) => p,
+            Err(crossbeam_channel::RecvTimeoutError::Timeout) => {
+                continue;
+            }
+            Err(crossbeam_channel::RecvTimeoutError::Disconnected) => {
+                // our workers may have died from a cancelled task, so we'll check and re-raise here.
+                db.unwind_if_cancelled();
+                break;
+            }
+        };
+        match worker_progress {
+            ParallelPrimeCacheWorkerProgress::BeginCrate { crate_id, crate_name } => {
+                crates_currently_indexing.insert(crate_id, crate_name);
+            }
+            ParallelPrimeCacheWorkerProgress::EndCrate { crate_id } => {
+                crates_currently_indexing.remove(&crate_id);
+                crates_to_prime.mark_done(crate_id);
+                crates_done += 1;
+            }
+        };
+
+        let progress = ParallelPrimeCachesProgress {
+            crates_currently_indexing: crates_currently_indexing.values().cloned().collect(),
+            crates_done,
+            crates_total,
+        };
+
+        cb(progress);
+    }
+}
+
+fn compute_crates_to_prime(db: &RootDatabase, graph: &CrateGraph) -> FxHashSet<CrateId> {
     // We're only interested in the workspace crates and the `ImportMap`s of their direct
     // dependencies, though in practice the latter also compute the `DefMap`s.
     // We don't prime transitive dependencies because they're generally not visible in
     // the current workspace.
-    let to_prime: FxHashSet<_> = graph
+    graph
         .iter()
         .filter(|&id| {
             let file_id = graph[id].root_file_id;
@@ -32,17 +155,5 @@ pub(crate) fn prime_caches(db: &RootDatabase, cb: &(dyn Fn(PrimeCachesProgress)
             !db.source_root(root_id).is_library
         })
         .flat_map(|id| graph[id].dependencies.iter().map(|krate| krate.crate_id))
-        .collect();
-
-    // FIXME: This would be easy to parallelize, since it's in the ideal ordering for that.
-    // Unfortunately rayon prevents panics from propagation out of a `scope`, which breaks
-    // cancellation, so we cannot use rayon.
-    let n_total = to_prime.len();
-    for (n_done, &crate_id) in to_prime.iter().enumerate() {
-        let crate_name = graph[crate_id].display_name.as_deref().unwrap_or_default().to_string();
-
-        cb(PrimeCachesProgress { on_crate: crate_name, n_done, n_total });
-        // This also computes the DefMap
-        db.import_map(crate_id);
-    }
+        .collect()
 }
diff --git a/crates/ide/src/prime_caches/topologic_sort.rs b/crates/ide/src/prime_caches/topologic_sort.rs
new file mode 100644
index 00000000000..b04087fa7bd
--- /dev/null
+++ b/crates/ide/src/prime_caches/topologic_sort.rs
@@ -0,0 +1,98 @@
+//! helper data structure to schedule work for parallel prime caches.
+use std::{collections::VecDeque, hash::Hash};
+
+use rustc_hash::FxHashMap;
+
+pub(crate) struct TopologicSortIterBuilder<T> {
+    nodes: FxHashMap<T, Entry<T>>,
+}
+
+impl<T> TopologicSortIterBuilder<T>
+where
+    T: Copy + Eq + PartialEq + Hash,
+{
+    fn new() -> Self {
+        Self { nodes: Default::default() }
+    }
+
+    fn get_or_create_entry(&mut self, item: T) -> &mut Entry<T> {
+        self.nodes.entry(item).or_default()
+    }
+
+    pub(crate) fn add(&mut self, item: T, predecessors: impl IntoIterator<Item = T>) {
+        let mut num_predecessors = 0;
+
+        for predecessor in predecessors.into_iter() {
+            self.get_or_create_entry(predecessor).successors.push(item);
+            num_predecessors += 1;
+        }
+
+        let entry = self.get_or_create_entry(item);
+        entry.num_predecessors += num_predecessors;
+    }
+
+    pub(crate) fn build(self) -> TopologicalSortIter<T> {
+        let ready = self
+            .nodes
+            .iter()
+            .filter_map(
+                |(item, entry)| if entry.num_predecessors == 0 { Some(*item) } else { None },
+            )
+            .collect();
+
+        TopologicalSortIter { nodes: self.nodes, ready }
+    }
+}
+
+pub(crate) struct TopologicalSortIter<T> {
+    ready: VecDeque<T>,
+    nodes: FxHashMap<T, Entry<T>>,
+}
+
+impl<T> TopologicalSortIter<T>
+where
+    T: Copy + Eq + PartialEq + Hash,
+{
+    pub(crate) fn builder() -> TopologicSortIterBuilder<T> {
+        TopologicSortIterBuilder::new()
+    }
+
+    pub(crate) fn pending(&self) -> usize {
+        self.nodes.len()
+    }
+
+    pub(crate) fn mark_done(&mut self, item: T) {
+        let entry = self.nodes.remove(&item).expect("invariant: unknown item marked as done");
+
+        for successor in entry.successors {
+            let succ_entry = self
+                .nodes
+                .get_mut(&successor)
+                .expect("invariant: unknown successor referenced by entry");
+
+            succ_entry.num_predecessors -= 1;
+            if succ_entry.num_predecessors == 0 {
+                self.ready.push_back(successor);
+            }
+        }
+    }
+}
+
+impl<T> Iterator for TopologicalSortIter<T> {
+    type Item = T;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.ready.pop_front()
+    }
+}
+
+struct Entry<T> {
+    successors: Vec<T>,
+    num_predecessors: usize,
+}
+
+impl<T> Default for Entry<T> {
+    fn default() -> Self {
+        Self { successors: Default::default(), num_predecessors: 0 }
+    }
+}
diff --git a/crates/rust-analyzer/Cargo.toml b/crates/rust-analyzer/Cargo.toml
index 0ab866a707e..014001397de 100644
--- a/crates/rust-analyzer/Cargo.toml
+++ b/crates/rust-analyzer/Cargo.toml
@@ -31,6 +31,7 @@ serde = { version = "1.0.106", features = ["derive"] }
 serde_json = { version = "1.0.48", features = ["preserve_order"] }
 threadpool = "1.7.1"
 rayon = "1.5"
+num_cpus = "1.13.1"
 mimalloc = { version = "0.1.19", default-features = false, optional = true }
 lsp-server = "0.5.1"
 tracing = "0.1"
diff --git a/crates/rust-analyzer/src/cli/load_cargo.rs b/crates/rust-analyzer/src/cli/load_cargo.rs
index 19ce86e3ffa..490aef50f3e 100644
--- a/crates/rust-analyzer/src/cli/load_cargo.rs
+++ b/crates/rust-analyzer/src/cli/load_cargo.rs
@@ -88,7 +88,7 @@ pub fn load_workspace(
         load_crate_graph(crate_graph, project_folders.source_root_config, &mut vfs, &receiver);
 
     if load_config.prefill_caches {
-        host.analysis().prime_caches(|_| {})?;
+        host.analysis().parallel_prime_caches(1, |_| {})?;
     }
     Ok((host, vfs, proc_macro_client))
 }
diff --git a/crates/rust-analyzer/src/config.rs b/crates/rust-analyzer/src/config.rs
index 1df19ffe780..76b72707974 100644
--- a/crates/rust-analyzer/src/config.rs
+++ b/crates/rust-analyzer/src/config.rs
@@ -298,6 +298,9 @@ config_data! {
         /// Whether to show `can't find Cargo.toml` error message.
         notifications_cargoTomlNotFound: bool      = "true",
 
+        /// How many worker threads to to handle priming caches. The default `0` means to pick automatically.
+        primeCaches_numThreads: ParallelPrimeCachesNumThreads = "0",
+
         /// Enable support for procedural macros, implies `#rust-analyzer.cargo.runBuildScripts#`.
         procMacro_enable: bool                     = "true",
         /// Internal config, path to proc-macro server executable (typically,
@@ -1016,6 +1019,13 @@ impl Config {
             yield_points: self.data.highlightRelated_yieldPoints,
         }
     }
+
+    pub fn prime_caches_num_threads(&self) -> u8 {
+        match self.data.primeCaches_numThreads {
+            0 => num_cpus::get_physical().try_into().unwrap_or(u8::MAX),
+            n => n,
+        }
+    }
 }
 
 #[derive(Deserialize, Debug, Clone, Copy)]
@@ -1130,6 +1140,8 @@ enum WorkspaceSymbolSearchKindDef {
     AllSymbols,
 }
 
+type ParallelPrimeCachesNumThreads = u8;
+
 macro_rules! _config_data {
     (struct $name:ident {
         $(
@@ -1351,6 +1363,11 @@ fn field_props(field: &str, ty: &str, doc: &[&str], default: &str) -> serde_json
                 "Search for all symbols kinds"
             ],
         },
+        "ParallelPrimeCachesNumThreads" => set! {
+            "type": "number",
+            "minimum": 0,
+            "maximum": 255
+        },
         _ => panic!("{}: {}", ty, default),
     }
 
diff --git a/crates/rust-analyzer/src/main_loop.rs b/crates/rust-analyzer/src/main_loop.rs
index 830b77f3918..5bec301736a 100644
--- a/crates/rust-analyzer/src/main_loop.rs
+++ b/crates/rust-analyzer/src/main_loop.rs
@@ -70,7 +70,7 @@ pub(crate) enum Task {
 #[derive(Debug)]
 pub(crate) enum PrimeCachesProgress {
     Begin,
-    Report(ide::PrimeCachesProgress),
+    Report(ide::ParallelPrimeCachesProgress),
     End { cancelled: bool },
 }
 
@@ -291,11 +291,23 @@ impl GlobalState {
                         }
                         PrimeCachesProgress::Report(report) => {
                             state = Progress::Report;
-                            message = Some(format!(
-                                "{}/{} ({})",
-                                report.n_done, report.n_total, report.on_crate
-                            ));
-                            fraction = Progress::fraction(report.n_done, report.n_total);
+
+                            message = match &report.crates_currently_indexing[..] {
+                                [crate_name] => Some(format!(
+                                    "{}/{} ({})",
+                                    report.crates_done, report.crates_total, crate_name
+                                )),
+                                [crate_name, rest @ ..] => Some(format!(
+                                    "{}/{} ({} + {} more)",
+                                    report.crates_done,
+                                    report.crates_total,
+                                    crate_name,
+                                    rest.len()
+                                )),
+                                _ => None,
+                            };
+
+                            fraction = Progress::fraction(report.crates_done, report.crates_total);
                         }
                         PrimeCachesProgress::End { cancelled } => {
                             state = Progress::End;
@@ -493,11 +505,13 @@ impl GlobalState {
             self.fetch_build_data();
         }
         if self.prime_caches_queue.should_start_op() {
+            let num_worker_threads = self.config.prime_caches_num_threads();
+
             self.task_pool.handle.spawn_with_sender({
                 let analysis = self.snapshot().analysis;
                 move |sender| {
                     sender.send(Task::PrimeCaches(PrimeCachesProgress::Begin)).unwrap();
-                    let res = analysis.prime_caches(|progress| {
+                    let res = analysis.parallel_prime_caches(num_worker_threads, |progress| {
                         let report = PrimeCachesProgress::Report(progress);
                         sender.send(Task::PrimeCaches(report)).unwrap();
                     });
diff --git a/docs/user/generated_config.adoc b/docs/user/generated_config.adoc
index f7a533c7c23..b10b0d35522 100644
--- a/docs/user/generated_config.adoc
+++ b/docs/user/generated_config.adoc
@@ -454,6 +454,11 @@ Number of syntax trees rust-analyzer keeps in memory. Defaults to 128.
 --
 Whether to show `can't find Cargo.toml` error message.
 --
+[[rust-analyzer.primeCaches.numThreads]]rust-analyzer.primeCaches.numThreads (default: `0`)::
++
+--
+How many worker threads to to handle priming caches. The default `0` means to pick automatically.
+--
 [[rust-analyzer.procMacro.enable]]rust-analyzer.procMacro.enable (default: `true`)::
 +
 --
diff --git a/editors/code/package.json b/editors/code/package.json
index ed81cb52403..8f4157da0d1 100644
--- a/editors/code/package.json
+++ b/editors/code/package.json
@@ -880,6 +880,13 @@
                     "default": true,
                     "type": "boolean"
                 },
+                "rust-analyzer.primeCaches.numThreads": {
+                    "markdownDescription": "How many worker threads to to handle priming caches. The default `0` means to pick automatically.",
+                    "default": 0,
+                    "type": "number",
+                    "minimum": 0,
+                    "maximum": 255
+                },
                 "rust-analyzer.procMacro.enable": {
                     "markdownDescription": "Enable support for procedural macros, implies `#rust-analyzer.cargo.runBuildScripts#`.",
                     "default": true,