// Copyright 2012-2015 The Rust Project Developers. See the COPYRIGHT // file at the top-level directory of this distribution and at // http://rust-lang.org/COPYRIGHT. // // Licensed under the Apache License, Version 2.0 or the MIT license // , at your // option. This file may not be copied, modified, or distributed // except according to those terms. //! Manages the communication between the compiler's main thread and //! the thread that constructs the dependency graph. The basic idea is //! to use double buffering to lower the cost of producing a message. //! In the compiler thread, we accumulate messages in a vector until //! the vector is full, or until we want to query the graph, and then //! we send that vector over to the depgraph thread. At the same time, //! we receive an empty vector from the depgraph thread that we can use //! to accumulate more messages. This way we only ever have two vectors //! allocated (and both have a fairly large capacity). use hir::def_id::DefId; use rustc_data_structures::veccell::VecCell; use std::cell::Cell; use std::sync::mpsc::{self, Sender, Receiver}; use std::thread; use super::DepGraphQuery; use super::DepNode; use super::edges::DepGraphEdges; #[derive(Debug)] pub enum DepMessage { Read(DepNode), Write(DepNode), PushTask(DepNode), PopTask(DepNode), PushIgnore, PopIgnore, Query, } pub struct DepGraphThreadData { enabled: bool, // Local counter that just tracks how many tasks are pushed onto the // stack, so that we still get an error in the case where one is // missing. If dep-graph construction is enabled, we'd get the same // error when processing tasks later on, but that's annoying because // it lacks precision about the source of the error. tasks_pushed: Cell, // current buffer, where we accumulate messages messages: VecCell, // whence to receive new buffer when full swap_in: Receiver>, // where to send buffer when full swap_out: Sender>, // where to receive query results query_in: Receiver>, } const INITIAL_CAPACITY: usize = 2048; impl DepGraphThreadData { pub fn new(enabled: bool) -> DepGraphThreadData { let (tx1, rx1) = mpsc::channel(); let (tx2, rx2) = mpsc::channel(); let (txq, rxq) = mpsc::channel(); if enabled { thread::spawn(move || main(rx1, tx2, txq)); } DepGraphThreadData { enabled: enabled, tasks_pushed: Cell::new(0), messages: VecCell::with_capacity(INITIAL_CAPACITY), swap_in: rx2, swap_out: tx1, query_in: rxq, } } #[inline] pub fn enabled(&self) -> bool { self.enabled } /// Sends the current batch of messages to the thread. Installs a /// new vector of messages. fn swap(&self) { assert!(self.enabled, "should never swap if not enabled"); // should be a buffer waiting for us (though of course we may // have to wait for depgraph thread to finish processing the // old messages) let new_messages = self.swap_in.recv().unwrap(); assert!(new_messages.is_empty()); // swap in the empty buffer and extract the full one let old_messages = self.messages.swap(new_messages); // send full buffer to depgraph thread to be processed self.swap_out.send(old_messages).unwrap(); } pub fn query(&self) -> DepGraphQuery { assert!(self.enabled, "cannot query if dep graph construction not enabled"); self.enqueue(DepMessage::Query); self.swap(); self.query_in.recv().unwrap() } /// Enqueue a message to be sent when things are next swapped. (If /// the buffer is full, this may swap.) #[inline] pub fn enqueue(&self, message: DepMessage) { debug!("enqueue: {:?} tasks_pushed={}", message, self.tasks_pushed.get()); // Regardless of whether dep graph construction is enabled, we // still want to check that we always have a valid task on the // stack when a read/write/etc event occurs. match message { DepMessage::Read(_) | DepMessage::Write(_) => if self.tasks_pushed.get() == 0 { self.invalid_message("read/write but no current task") }, DepMessage::PushTask(_) | DepMessage::PushIgnore => self.tasks_pushed.set(self.tasks_pushed.get() + 1), DepMessage::PopTask(_) | DepMessage::PopIgnore => self.tasks_pushed.set(self.tasks_pushed.get() - 1), DepMessage::Query => (), } if self.enabled { self.enqueue_enabled(message); } } // Outline this fn since I expect it may want to be inlined // separately. fn enqueue_enabled(&self, message: DepMessage) { let len = self.messages.push(message); if len == INITIAL_CAPACITY { self.swap(); } } // Outline this too. fn invalid_message(&self, string: &str) { bug!("{}; see src/librustc/dep_graph/README.md for more information", string) } } /// Definition of the depgraph thread. pub fn main(swap_in: Receiver>, swap_out: Sender>, query_out: Sender>) { let mut edges = DepGraphEdges::new(); // the compiler thread always expects a fresh buffer to be // waiting, so queue one up swap_out.send(Vec::with_capacity(INITIAL_CAPACITY)).unwrap(); // process the buffers from compiler thread as we receive them for mut messages in swap_in { for msg in messages.drain(..) { match msg { DepMessage::Read(node) => edges.read(node), DepMessage::Write(node) => edges.write(node), DepMessage::PushTask(node) => edges.push_task(node), DepMessage::PopTask(node) => edges.pop_task(node), DepMessage::PushIgnore => edges.push_ignore(), DepMessage::PopIgnore => edges.pop_ignore(), DepMessage::Query => query_out.send(edges.query()).unwrap(), } } if let Err(_) = swap_out.send(messages) { // the receiver must have been dropped already break; } } }