1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
|
// NB: transitionary, de-mode-ing.
#[forbid(deprecated_mode)];
#[forbid(deprecated_pattern)];
#[doc(hidden)];
export chan_from_global_ptr, weaken_task;
use compare_and_swap = rustrt::rust_compare_and_swap_ptr;
use task::TaskBuilder;
#[allow(non_camel_case_types)] // runtime type
type rust_port_id = uint;
extern mod rustrt {
fn rust_compare_and_swap_ptr(address: *libc::uintptr_t,
oldval: libc::uintptr_t,
newval: libc::uintptr_t) -> bool;
fn rust_task_weaken(ch: rust_port_id);
fn rust_task_unweaken(ch: rust_port_id);
}
type GlobalPtr = *libc::uintptr_t;
/**
* Atomically gets a channel from a pointer to a pointer-sized memory location
* or, if no channel exists creates and installs a new channel and sets up a
* new task to receive from it.
*/
unsafe fn chan_from_global_ptr<T: Send>(
global: GlobalPtr,
task_fn: fn() -> task::TaskBuilder,
+f: fn~(comm::Port<T>)
) -> comm::Chan<T> {
enum Msg {
Proceed,
Abort
}
log(debug,~"ENTERING chan_from_global_ptr, before is_prob_zero check");
let is_probably_zero = *global == 0u;
log(debug,~"after is_prob_zero check");
if is_probably_zero {
log(debug,~"is probably zero...");
// There's no global channel. We must make it
let (setup_po, setup_ch) = do task_fn().spawn_conversation
|setup_po, setup_ch| {
let po = comm::Port::<T>();
let ch = comm::Chan(po);
comm::send(setup_ch, ch);
// Wait to hear if we are the official instance of
// this global task
match comm::recv::<Msg>(setup_po) {
Proceed => f(po),
Abort => ()
}
};
log(debug,~"before setup recv..");
// This is the proposed global channel
let ch = comm::recv(setup_po);
// 0 is our sentinal value. It is not a valid channel
assert unsafe::reinterpret_cast(&ch) != 0u;
// Install the channel
log(debug,~"BEFORE COMPARE AND SWAP");
let swapped = compare_and_swap(
global, 0u, unsafe::reinterpret_cast(&ch));
log(debug,fmt!("AFTER .. swapped? %?", swapped));
if swapped {
// Success!
comm::send(setup_ch, Proceed);
ch
} else {
// Somebody else got in before we did
comm::send(setup_ch, Abort);
unsafe::reinterpret_cast(&*global)
}
} else {
log(debug, ~"global != 0");
unsafe::reinterpret_cast(&*global)
}
}
#[test]
fn test_from_global_chan1() {
// This is unreadable, right?
// The global channel
let globchan = 0u;
let globchanp = ptr::addr_of(globchan);
// Create the global channel, attached to a new task
let ch = unsafe {
do chan_from_global_ptr(globchanp, task::task) |po| {
let ch = comm::recv(po);
comm::send(ch, true);
let ch = comm::recv(po);
comm::send(ch, true);
}
};
// Talk to it
let po = comm::Port();
comm::send(ch, comm::Chan(po));
assert comm::recv(po) == true;
// This one just reuses the previous channel
let ch = unsafe {
do chan_from_global_ptr(globchanp, task::task) |po| {
let ch = comm::recv(po);
comm::send(ch, false);
}
};
// Talk to the original global task
let po = comm::Port();
comm::send(ch, comm::Chan(po));
assert comm::recv(po) == true;
}
#[test]
fn test_from_global_chan2() {
for iter::repeat(100u) {
// The global channel
let globchan = 0u;
let globchanp = ptr::addr_of(globchan);
let resultpo = comm::Port();
let resultch = comm::Chan(resultpo);
// Spawn a bunch of tasks that all want to compete to
// create the global channel
for uint::range(0u, 10u) |i| {
do task::spawn {
let ch = unsafe {
do chan_from_global_ptr(
globchanp, task::task) |po| {
for uint::range(0u, 10u) |_j| {
let ch = comm::recv(po);
comm::send(ch, {i});
}
}
};
let po = comm::Port();
comm::send(ch, comm::Chan(po));
// We are The winner if our version of the
// task was installed
let winner = comm::recv(po);
comm::send(resultch, winner == i);
}
}
// There should be only one winner
let mut winners = 0u;
for uint::range(0u, 10u) |_i| {
let res = comm::recv(resultpo);
if res { winners += 1u };
}
assert winners == 1u;
}
}
/**
* Convert the current task to a 'weak' task temporarily
*
* As a weak task it will not be counted towards the runtime's set
* of live tasks. When there are no more outstanding live (non-weak) tasks
* the runtime will send an exit message on the provided channel.
*
* This function is super-unsafe. Do not use.
*
* # Safety notes
*
* * Weak tasks must either die on their own or exit upon receipt of
* the exit message. Failure to do so will cause the runtime to never
* exit
* * Tasks must not call `weaken_task` multiple times. This will
* break the kernel's accounting of live tasks.
* * Weak tasks must not be supervised. A supervised task keeps
* a reference to its parent, so the parent will not die.
*/
unsafe fn weaken_task(f: fn(comm::Port<()>)) {
let po = comm::Port();
let ch = comm::Chan(po);
unsafe {
rustrt::rust_task_weaken(unsafe::reinterpret_cast(&ch));
}
let _unweaken = Unweaken(ch);
f(po);
struct Unweaken {
ch: comm::Chan<()>,
drop unsafe {
rustrt::rust_task_unweaken(unsafe::reinterpret_cast(&self.ch));
}
}
fn Unweaken(ch: comm::Chan<()>) -> Unweaken {
Unweaken {
ch: ch
}
}
}
#[test]
fn test_weaken_task_then_unweaken() {
do task::try {
unsafe {
do weaken_task |_po| {
}
}
};
}
#[test]
fn test_weaken_task_wait() {
do task::spawn_unlinked {
unsafe {
do weaken_task |po| {
comm::recv(po);
}
}
}
}
#[test]
fn test_weaken_task_stress() {
// Create a bunch of weak tasks
for iter::repeat(100u) {
do task::spawn {
unsafe {
do weaken_task |_po| {
}
}
}
do task::spawn_unlinked {
unsafe {
do weaken_task |po| {
// Wait for it to tell us to die
comm::recv(po);
}
}
}
}
}
#[test]
#[ignore(cfg(windows))]
fn test_weaken_task_fail() {
let res = do task::try {
unsafe {
do weaken_task |_po| {
fail;
}
}
};
assert result::is_err(res);
}
|