1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
|
// So when running tests in parallel there's a potential race on environment
// variables if we let each task spawn its own children - between the time the
// environment is set and the process is spawned another task could spawn its
// child process. Because of that we have to use a complicated scheme with a
// dedicated server for spawning processes.
import core::comm;
import core::option;
import task;
import std::generic_os::setenv;
import std::generic_os::getenv;
import vec;
import std::os;
import std::run;
import std::io;
import str;
import comm::chan;
import comm::port;
import comm::send;
import comm::recv;
import ctypes::{pid_t, fd_t};
export handle;
export mk;
export from_chan;
export run;
export close;
export reqchan;
type reqchan = chan<request>;
type handle =
{task: option::t<(task::task, port<task::task_notification>)>,
chan: reqchan};
tag request { exec([u8], [u8], [[u8]], chan<response>); stop; }
type response = {pid: pid_t, infd: fd_t,
outfd: fd_t, errfd: fd_t};
fn mk() -> handle {
let setupport = port();
let task = task::spawn_joinable(
chan(setupport),
fn (setupchan: chan<chan<request>>) {
let reqport = port();
let reqchan = chan(reqport);
send(setupchan, reqchan);
worker(reqport);
});
ret {task: option::some(task), chan: recv(setupport)};
}
fn from_chan(ch: reqchan) -> handle { {task: option::none, chan: ch} }
fn close(handle: handle) {
send(handle.chan, stop);
task::join(option::get(handle.task));
}
fn run(handle: handle, lib_path: str, prog: str, args: [str],
input: option::t<str>) -> {status: int, out: str, err: str} {
let p = port();
let ch = chan(p);
send(handle.chan,
exec(str::bytes(lib_path), str::bytes(prog), clone_vecstr(args),
ch));
let resp = recv(p);
writeclose(resp.infd, input);
let output = readclose(resp.outfd);
let errput = readclose(resp.errfd);
let status = run::waitpid(resp.pid);
ret {status: status, out: output, err: errput};
}
fn writeclose(fd: fd_t, s: option::t<str>) {
if option::is_some(s) {
let writer = io::new_writer(io::fd_buf_writer(fd, option::none));
writer.write_str(option::get(s));
}
os::close(fd);
}
fn readclose(fd: fd_t) -> str {
// Copied from run::program_output
let file = os::fd_FILE(fd);
let reader = io::new_reader(io::FILE_buf_reader(file, option::none));
let buf = "";
while !reader.eof() {
let bytes = reader.read_bytes(4096u);
buf += str::unsafe_from_bytes(bytes);
}
os::fclose(file);
ret buf;
}
fn worker(p: port<request>) {
// FIXME (787): If we declare this inside of the while loop and then
// break out of it before it's ever initialized (i.e. we don't run
// any tests), then the cleanups will puke.
let execparms;
while true {
// FIXME: Sending strings across channels seems to still
// leave them refed on the sender's end, which causes problems if
// the receiver's poniters outlive the sender's. Here we clone
// everything and let the originals go out of scope before sending
// a response.
execparms =
{
// FIXME (785): The 'discriminant' of an alt expression has
// the same scope as the alt expression itself, so we have to
// put the entire alt in another block to make sure the exec
// message goes out of scope. Seems like the scoping rules for
// the alt discriminant are wrong.
alt recv(p) {
exec(lib_path, prog, args, respchan) {
{lib_path: str::unsafe_from_bytes(lib_path),
prog: str::unsafe_from_bytes(prog),
args: clone_vecu8str(args),
respchan: respchan}
}
stop. { ret }
}
};
// This is copied from run::start_program
let pipe_in = os::pipe();
let pipe_out = os::pipe();
let pipe_err = os::pipe();
let spawnproc =
bind run::spawn_process(execparms.prog, execparms.args,
pipe_in.in, pipe_out.out, pipe_err.out);
let pid = maybe_with_lib_path(execparms.lib_path, spawnproc);
os::close(pipe_in.in);
os::close(pipe_out.out);
os::close(pipe_err.out);
if pid == -1i32 {
os::close(pipe_in.out);
os::close(pipe_out.in);
os::close(pipe_err.in);
fail;
}
send(execparms.respchan,
{pid: pid,
infd: pipe_in.out,
outfd: pipe_out.in,
errfd: pipe_err.in});
}
}
// Only windows needs to set the library path
#[cfg(target_os = "win32")]
fn maybe_with_lib_path<T>(path: str, f: fn@() -> T) -> T {
with_lib_path(path, f)
}
#[cfg(target_os = "linux")]
#[cfg(target_os = "macos")]
fn maybe_with_lib_path<T>(_path: str, f: fn@() -> T) -> T {
f()
}
fn with_lib_path<T>(path: str, f: fn@() -> T) -> T {
let maybe_oldpath = getenv(util::lib_path_env_var());
append_lib_path(path);
let res = f();
if option::is_some(maybe_oldpath) {
export_lib_path(option::get(maybe_oldpath));
} else {
// FIXME: This should really be unset but we don't have that yet
export_lib_path("");
}
ret res;
}
fn append_lib_path(path: str) { export_lib_path(util::make_new_path(path)); }
fn export_lib_path(path: str) { setenv(util::lib_path_env_var(), path); }
fn clone_vecstr(v: [str]) -> [[u8]] {
let r = [];
for t: str in vec::slice(v, 0u, vec::len(v)) { r += [str::bytes(t)]; }
ret r;
}
fn clone_vecu8str(v: [[u8]]) -> [str] {
let r = [];
for t in vec::slice(v, 0u, vec::len(v)) {
r += [str::unsafe_from_bytes(t)];
}
ret r;
}
|