1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
|
// Copyright 2012-2015 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
//! Manages the communication between the compiler's main thread and
//! the thread that constructs the dependency graph. The basic idea is
//! to use double buffering to lower the cost of producing a message.
//! In the compiler thread, we accumulate messages in a vector until
//! the vector is full, or until we want to query the graph, and then
//! we send that vector over to the depgraph thread. At the same time,
//! we receive an empty vector from the depgraph thread that we can use
//! to accumulate more messages. This way we only ever have two vectors
//! allocated (and both have a fairly large capacity).
use hir::def_id::DefId;
use rustc_data_structures::veccell::VecCell;
use std::cell::Cell;
use std::sync::mpsc::{self, Sender, Receiver};
use std::thread;
use super::DepGraphQuery;
use super::DepNode;
use super::edges::DepGraphEdges;
/// One event recorded on the compiler thread. Messages are validated
/// and buffered by `DepGraphThreadData::enqueue` and later dispatched,
/// one by one, by the depgraph thread's `main` loop to the matching
/// `DepGraphEdges` method.
#[derive(Debug)]
pub enum DepMessage {
// Read of a node; `enqueue` requires at least one pushed task/ignore.
Read(DepNode<DefId>),
// Write of a node; `enqueue` requires at least one pushed task/ignore.
Write(DepNode<DefId>),
// Start of a task for the node; bumps the local `tasks_pushed` counter.
PushTask(DepNode<DefId>),
// End of a task for the node; decrements the local `tasks_pushed` counter.
PopTask(DepNode<DefId>),
// Start of an "ignore" scope; counted in `tasks_pushed` like a task push.
PushIgnore,
// End of an "ignore" scope; counted in `tasks_pushed` like a task pop.
PopIgnore,
// Asks the depgraph thread to send back `edges.query()` on the query channel.
Query,
}
/// Compiler-thread handle for the depgraph thread: owns the buffer that
/// messages are accumulated into, plus the channel endpoints used to
/// trade buffers with, and receive query results from, the thread
/// running `main`.
pub struct DepGraphThreadData {
// whether dep-graph construction is enabled; when false, `new` spawns
// no thread and `enqueue` still checks/updates `tasks_pushed` but
// buffers nothing
enabled: bool,
// Local counter that just tracks how many tasks are pushed onto the
// stack, so that we still get an error in the case where one is
// missing. If dep-graph construction is enabled, we'd get the same
// error when processing tasks later on, but that's annoying because
// it lacks precision about the source of the error.
tasks_pushed: Cell<usize>,
// current buffer, where we accumulate messages
messages: VecCell<DepMessage>,
// whence to receive new buffer when full
swap_in: Receiver<Vec<DepMessage>>,
// where to send buffer when full
swap_out: Sender<Vec<DepMessage>>,
// where to receive query results
query_in: Receiver<DepGraphQuery<DefId>>,
}
/// Capacity each message buffer is allocated with; also the buffer
/// length at which `enqueue_enabled` swaps the current buffer out to
/// the depgraph thread.
const INITIAL_CAPACITY: usize = 2048;
impl DepGraphThreadData {
    /// Creates the compiler-thread handle.
    ///
    /// Three channels are set up: one carrying full buffers to the
    /// depgraph thread, one carrying emptied buffers back, and one
    /// carrying query results back. When `enabled` is true the depgraph
    /// thread (running `main`) is spawned and given the other ends;
    /// otherwise those ends are simply dropped and no thread is started.
    pub fn new(enabled: bool) -> DepGraphThreadData {
        let (tx1, rx1) = mpsc::channel();
        let (tx2, rx2) = mpsc::channel();
        let (txq, rxq) = mpsc::channel();

        if enabled {
            thread::spawn(move || main(rx1, tx2, txq));
        }

        DepGraphThreadData {
            enabled: enabled,
            tasks_pushed: Cell::new(0),
            messages: VecCell::with_capacity(INITIAL_CAPACITY),
            swap_in: rx2,
            swap_out: tx1,
            query_in: rxq,
        }
    }

    /// Returns whether dep-graph construction is enabled.
    #[inline]
    pub fn enabled(&self) -> bool {
        self.enabled
    }

    /// Sends the current batch of messages to the thread. Installs a
    /// new vector of messages.
    ///
    /// Panics if dep-graph construction is not enabled, if the buffer
    /// received from the depgraph thread is not empty, or if either
    /// channel has been disconnected.
    fn swap(&self) {
        assert!(self.enabled, "should never swap if not enabled");

        // should be a buffer waiting for us (though of course we may
        // have to wait for depgraph thread to finish processing the
        // old messages)
        let new_messages = self.swap_in.recv().unwrap();
        assert!(new_messages.is_empty());

        // swap in the empty buffer and extract the full one
        let old_messages = self.messages.swap(new_messages);

        // send full buffer to depgraph thread to be processed
        self.swap_out.send(old_messages).unwrap();
    }

    /// Asks the depgraph thread for the current graph and blocks until
    /// the result arrives.
    ///
    /// A `Query` message is appended to the pending batch, the batch is
    /// flushed, and the reply is read from the query channel. Panics if
    /// dep-graph construction is not enabled.
    pub fn query(&self) -> DepGraphQuery<DefId> {
        assert!(self.enabled, "cannot query if dep graph construction not enabled");
        self.enqueue(DepMessage::Query);
        self.swap();
        self.query_in.recv().unwrap()
    }

    /// Enqueue a message to be sent when things are next swapped. (If
    /// the buffer is full, this may swap.)
    ///
    /// The push/pop bookkeeping runs even when dep-graph construction
    /// is disabled; the message itself is only buffered when enabled.
    #[inline]
    pub fn enqueue(&self, message: DepMessage) {
        debug!("enqueue: {:?} tasks_pushed={}", message, self.tasks_pushed.get());

        // Regardless of whether dep graph construction is enabled, we
        // still want to check that we always have a valid task on the
        // stack when a read/write/etc event occurs.
        match message {
            DepMessage::Read(_) | DepMessage::Write(_) =>
                if self.tasks_pushed.get() == 0 {
                    self.invalid_message("read/write but no current task")
                },
            DepMessage::PushTask(_) | DepMessage::PushIgnore =>
                self.tasks_pushed.set(self.tasks_pushed.get() + 1),
            DepMessage::PopTask(_) | DepMessage::PopIgnore => {
                // An unmatched pop must be reported here: a bare
                // `- 1` would wrap to `usize::MAX` when overflow checks
                // are off, after which the read/write check above could
                // never fire again.
                let pushed = self.tasks_pushed.get();
                if pushed == 0 {
                    self.invalid_message("pop but no current task")
                } else {
                    self.tasks_pushed.set(pushed - 1)
                }
            }
            DepMessage::Query =>
                (),
        }

        if self.enabled {
            self.enqueue_enabled(message);
        }
    }

    // Outline this fn since I expect it may want to be inlined
    // separately.
    //
    // Appends `message` to the current buffer and flushes the buffer
    // to the depgraph thread once the value returned by `push` reaches
    // `INITIAL_CAPACITY` (presumably the buffer length after the push;
    // confirm against `VecCell::push`).
    fn enqueue_enabled(&self, message: DepMessage) {
        let len = self.messages.push(message);
        if len == INITIAL_CAPACITY {
            self.swap();
        }
    }

    // Outline this too.
    //
    // Reports a violated push/pop/read/write protocol via `bug!`,
    // pointing the reader at the dep-graph README.
    fn invalid_message(&self, string: &str) {
        bug!("{}; see src/librustc/dep_graph/README.md for more information", string)
    }
}
/// Body of the depgraph thread.
///
/// Receives batches of messages on `swap_in`, applies each message to a
/// `DepGraphEdges` accumulator, and hands every emptied batch back on
/// `swap_out` so the compiler thread can reuse it. A `Query` message is
/// answered by sending `edges.query()` on `query_out`. The function
/// returns once `swap_in` is disconnected or an emptied batch can no
/// longer be returned.
pub fn main(swap_in: Receiver<Vec<DepMessage>>,
            swap_out: Sender<Vec<DepMessage>>,
            query_out: Sender<DepGraphQuery<DefId>>) {
    let mut graph = DepGraphEdges::new();

    // Prime the exchange: the compiler thread's first swap expects an
    // empty buffer to already be available.
    swap_out.send(Vec::with_capacity(INITIAL_CAPACITY)).unwrap();

    // `recv` fails only once the sending side has hung up, which is
    // what ends this loop in the normal case.
    while let Ok(mut batch) = swap_in.recv() {
        for message in batch.drain(..) {
            match message {
                DepMessage::Read(node) => graph.read(node),
                DepMessage::Write(node) => graph.write(node),
                DepMessage::PushTask(node) => graph.push_task(node),
                DepMessage::PopTask(node) => graph.pop_task(node),
                DepMessage::PushIgnore => graph.push_ignore(),
                DepMessage::PopIgnore => graph.pop_ignore(),
                DepMessage::Query => query_out.send(graph.query()).unwrap(),
            }
        }

        // Return the drained buffer for reuse. A failed send means the
        // receiver was already dropped, so there is nobody left to
        // serve.
        if swap_out.send(batch).is_err() {
            break;
        }
    }
}
|