1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
|
use std::sync::{Arc, LazyLock, OnceLock};
pub use jobserver_crate::{Acquired, Client, HelperThread};
use jobserver_crate::{FromEnv, FromEnvErrorKind};
use parking_lot::{Condvar, Mutex};
// We can only call `from_env_ext` once per process
// We stick this in a global because there could be multiple rustc instances
// in this process, and the jobserver is per-process.
static GLOBAL_CLIENT: LazyLock<Result<Client, String>> = LazyLock::new(|| {
    // Note that this is unsafe because it may misinterpret file descriptors
    // on Unix as jobserver file descriptors. We hopefully execute this near
    // the beginning of the process though to ensure we don't get false
    // positives, or in other words we try to execute this before we open
    // any file descriptors ourselves.
    let FromEnv { client, var } = unsafe { Client::from_env_ext(true) };
    match client {
        Ok(inherited) => Ok(inherited),
        // Benign reasons for having no usable inherited jobserver: fall back
        // to a private one of our own.
        Err(err)
            if matches!(
                err.kind(),
                FromEnvErrorKind::NoEnvVar
                    | FromEnvErrorKind::NoJobserver
                    | FromEnvErrorKind::NegativeFd
                    | FromEnvErrorKind::Unsupported
            ) =>
        {
            Ok(default_client())
        }
        // Environment specifies jobserver, but it looks incorrect.
        Err(err) => {
            // `var` is only `None` when the error kind is `NoEnvVar`,
            // which the arm above already handled.
            let (name, value) = var.unwrap();
            Err(format!(
                "failed to connect to jobserver from environment variable `{name}={:?}`: {err}",
                value
            ))
        }
    }
});
/// Create a fresh jobserver when there is no (usable) inherited one.
fn default_client() -> Client {
    // Pick a "reasonable maximum" capping out at 32 so we don't take
    // everything down by hogging the process run queue. The fixed number is
    // used to have deterministic compilation across machines.
    const TOKEN_LIMIT: usize = 32;
    let fresh = Client::new(TOKEN_LIMIT).expect("failed to create jobserver");
    // Best-effort: grab a token for the main thread which we can release later.
    let _ = fresh.acquire_raw();
    fresh
}
// The validated client handed out to the rest of the compiler via `client()`;
// populated exactly once by `initialize_checked`.
static GLOBAL_CLIENT_CHECKED: OnceLock<Client> = OnceLock::new();
/// Validate the lazily-connected global jobserver and install it as the
/// process-wide checked client.
///
/// If connecting to the environment's jobserver failed, `report_warning`
/// receives the error text and a private fallback jobserver is used instead.
/// A repeated call keeps the client installed by the first one.
pub fn initialize_checked(report_warning: impl FnOnce(&'static str)) {
    let chosen = match GLOBAL_CLIENT.as_ref() {
        Ok(inherited) => inherited.clone(),
        Err(message) => {
            report_warning(message);
            default_client()
        }
    };
    // Ignore the result: if already set, the existing client wins.
    let _ = GLOBAL_CLIENT_CHECKED.set(chosen);
}
const ACCESS_ERROR: &str = "jobserver check should have been called earlier";

/// Return a clone of the process-global jobserver client.
///
/// # Panics
///
/// Panics if `initialize_checked` has not yet populated the global client.
pub fn client() -> Client {
    GLOBAL_CLIENT_CHECKED.get().cloned().expect(ACCESS_ERROR)
}
/// Token bookkeeping shared between worker threads and the helper thread,
/// always accessed under the `Proxy::data` mutex.
struct ProxyData {
    /// The number of tokens assigned to threads.
    /// If this is 0, a single token is still assigned to this process, but is unused.
    used: u16,
    /// The number of threads requesting a token
    pending: u16,
}
/// This is a jobserver proxy used to ensure that we hold on to at least one token.
pub struct Proxy {
    /// The underlying jobserver client (obtained from `client()` in `new`).
    client: Client,
    /// Shared token bookkeeping; see `ProxyData`.
    data: Mutex<ProxyData>,
    /// Threads which are waiting on a token will wait on this.
    wake_pending: Condvar,
    /// Helper thread that acquires tokens asynchronously; set once in `new`.
    helper: OnceLock<HelperThread>,
}
impl Proxy {
    /// Create a proxy around the process-global jobserver client.
    ///
    /// Starts with `used == 1`: the process's implicit token is treated as
    /// assigned to the calling (main) thread. Also spawns the helper thread
    /// that acquires tokens from the jobserver and hands them to threads
    /// blocked in `acquire_thread`.
    pub fn new() -> Arc<Self> {
        let proxy = Arc::new(Proxy {
            client: client(),
            data: Mutex::new(ProxyData { used: 1, pending: 0 }),
            wake_pending: Condvar::new(),
            helper: OnceLock::new(),
        });
        let proxy_ = Arc::clone(&proxy);
        let helper = proxy
            .client
            .clone()
            .into_helper_thread(move |token| {
                // Runs on the helper thread once per acquired token,
                // in response to an earlier `request_token` call.
                if let Ok(token) = token {
                    let mut data = proxy_.data.lock();
                    if data.pending > 0 {
                        // Give the token to a waiting thread.
                        // Account for it in `used` here (rather than in the
                        // woken thread) so the count is correct the moment we
                        // notify.
                        token.drop_without_releasing();
                        assert!(data.used > 0);
                        data.used += 1;
                        data.pending -= 1;
                        proxy_.wake_pending.notify_one();
                    } else {
                        // The token is no longer needed, drop it.
                        drop(data);
                        drop(token);
                    }
                }
            })
            .expect("failed to create helper thread");
        proxy.helper.set(helper).unwrap();
        proxy
    }

    /// Block the calling thread until it holds a token.
    pub fn acquire_thread(&self) {
        let mut data = self.data.lock();
        if data.used == 0 {
            // There was a free token around. This can
            // happen when all threads release their token.
            assert_eq!(data.pending, 0);
            data.used += 1;
        } else {
            // Request a token from the helper thread. We can't directly use `acquire_raw`
            // as we also need to be able to wait for the final token in the process which
            // does not get a corresponding `release_raw` call.
            self.helper.get().unwrap().request_token();
            data.pending += 1;
            // By the time this wait returns, the waker (helper callback or
            // `release_thread`) has already accounted the token in `used`, so
            // no further bookkeeping is needed here.
            // NOTE(review): no wake-condition re-check loop — this relies on
            // the condvar not waking spuriously; confirm this guarantee holds
            // for the `Condvar` implementation in use (parking_lot here).
            self.wake_pending.wait(&mut data);
        }
    }

    /// Release the calling thread's token, either handing it to a waiting
    /// thread or returning it to the jobserver.
    pub fn release_thread(&self) {
        let mut data = self.data.lock();
        if data.pending > 0 {
            // Give the token to a waiting thread.
            // `used` stays the same: our token transfers directly to the
            // woken thread.
            data.pending -= 1;
            self.wake_pending.notify_one();
        } else {
            data.used -= 1;
            // Release the token unless it's the last one in the process
            if data.used > 0 {
                // Drop the lock before the (potentially blocking) raw
                // release; best-effort, errors ignored.
                drop(data);
                self.client.release_raw().ok();
            }
        }
    }
}
|