diff options
| author | Alex Crichton <alex@alexcrichton.com> | 2014-05-05 16:58:42 -0700 |
|---|---|---|
| committer | Alex Crichton <alex@alexcrichton.com> | 2014-05-13 17:27:42 -0700 |
| commit | f09592a5d154177f0c9d739c9fe60742ec4cd951 (patch) | |
| tree | 79540b4f5f7e125a260b9ca00f25e796241425f9 /src/libstd | |
| parent | 9f7caed2024268f6de16f99b6696d191f3ca3228 (diff) | |
| download | rust-f09592a5d154177f0c9d739c9fe60742ec4cd951.tar.gz rust-f09592a5d154177f0c9d739c9fe60742ec4cd951.zip | |
io: Implement process wait timeouts
This implements set_timeout() for std::io::Process which will affect wait() operations on the process. This follows the same pattern as the rest of the timeouts emerging in std::io::net. The implementation was super easy for everything except libnative on unix (backwards from usual!), which required a good bit of signal handling. There's a doc comment explaining the strategy in libnative. Internally, this also required refactoring the "helper thread" implementation used by libnative to allow for an extra helper thread (not just the timer). This is a breaking change in terms of the io::Process API. It is now possible for wait() to fail, and subsequently wait_with_output(). These two functions now return IoResult<T> due to the fact that they can time out. Additionally, the wait_with_output() function has moved from taking `&mut self` to taking `self`. If a timeout occurs while waiting with output, the semantics are undesirable in almost all cases if attempting to re-wait on the process. Equivalent functionality can still be achieved by dealing with the output handles manually. [breaking-change] cc #13523
Diffstat (limited to 'src/libstd')
| -rw-r--r-- | src/libstd/io/process.rs | 190 | ||||
| -rw-r--r-- | src/libstd/rt/rtio.rs | 3 |
2 files changed, 133 insertions, 60 deletions
diff --git a/src/libstd/io/process.rs b/src/libstd/io/process.rs index 529fd25dc50..349cac723ff 100644 --- a/src/libstd/io/process.rs +++ b/src/libstd/io/process.rs @@ -10,6 +10,8 @@ //! Bindings for executing child processes +#![allow(experimental)] + use prelude::*; use fmt; @@ -50,7 +52,7 @@ use rt::rtio::{RtioProcess, IoFactory, LocalIo}; /// }; /// /// let contents = child.stdout.get_mut_ref().read_to_end(); -/// assert!(child.wait().success()); +/// assert!(child.wait().unwrap().success()); /// ``` pub struct Process { handle: Box<RtioProcess:Send>, @@ -284,7 +286,7 @@ impl Process { /// println!("stderr: {}", str::from_utf8_lossy(output.error.as_slice())); /// ``` pub fn output(prog: &str, args: &[~str]) -> IoResult<ProcessOutput> { - Process::new(prog, args).map(|mut p| p.wait_with_output()) + Process::new(prog, args).and_then(|p| p.wait_with_output()) } /// Executes a child process and collects its exit status. This will block @@ -303,7 +305,7 @@ impl Process { /// println!("process exited with: {}", status); /// ``` pub fn status(prog: &str, args: &[~str]) -> IoResult<ProcessExit> { - Process::new(prog, args).map(|mut p| p.wait()) + Process::new(prog, args).and_then(|mut p| p.wait()) } /// Creates a new process with the specified configuration. @@ -378,17 +380,72 @@ impl Process { /// after it has been called at least once. /// /// The stdin handle to the child process will be closed before waiting. - pub fn wait(&mut self) -> ProcessExit { + /// + /// # Errors + /// + /// This function can fail if a timeout was previously specified via + /// `set_timeout` and the timeout expires before the child exits. + pub fn wait(&mut self) -> IoResult<ProcessExit> { drop(self.stdin.take()); self.handle.wait() } + /// Sets a timeout, in milliseconds, for future calls to wait(). + /// + /// The argument specified is a relative distance into the future, in + /// milliseconds, after which any call to wait() will return immediately + /// with a timeout error, and all future calls to wait() will not block. + /// + /// A value of `None` will clear any previous timeout, and a value of `Some` + /// will override any previously set timeout. + /// + /// # Example + /// + /// ```no_run + /// # #![allow(experimental)] + /// use std::io::process::{Process, ProcessExit}; + /// use std::io::IoResult; + /// + /// fn run_gracefully(prog: &str) -> IoResult<ProcessExit> { + /// let mut p = try!(Process::new("long-running-process", [])); + /// + /// // give the process 10 seconds to finish completely + /// p.set_timeout(Some(10_000)); + /// match p.wait() { + /// Ok(status) => return Ok(status), + /// Err(..) => {} + /// } + /// + /// // Attempt to exit gracefully, but don't wait for it too long + /// try!(p.signal_exit()); + /// p.set_timeout(Some(1_000)); + /// match p.wait() { + /// Ok(status) => return Ok(status), + /// Err(..) => {} + /// } + /// + /// // Well, we did our best, forcefully kill the process + /// try!(p.signal_kill()); + /// p.set_timeout(None); + /// p.wait() + /// } + /// ``` + #[experimental = "the type of the timeout is likely to change"] + pub fn set_timeout(&mut self, timeout_ms: Option<u64>) { + self.handle.set_timeout(timeout_ms) + } + /// Simultaneously wait for the child to exit and collect all remaining /// output on the stdout/stderr handles, returning a `ProcessOutput` /// instance. /// /// The stdin handle to the child is closed before waiting. - pub fn wait_with_output(&mut self) -> ProcessOutput { + /// + /// # Errors + /// + /// This function can fail for any of the same reasons that `wait()` can + /// fail. + pub fn wait_with_output(mut self) -> IoResult<ProcessOutput> { drop(self.stdin.take()); fn read(stream: Option<io::PipeStream>) -> Receiver<IoResult<Vec<u8>>> { let (tx, rx) = channel(); @@ -404,11 +461,13 @@ impl Process { let stdout = read(self.stdout.take()); let stderr = read(self.stderr.take()); - let status = self.wait(); + let status = try!(self.wait()); - ProcessOutput { status: status, - output: stdout.recv().ok().unwrap_or(Vec::new()), - error: stderr.recv().ok().unwrap_or(Vec::new()) } + Ok(ProcessOutput { + status: status, + output: stdout.recv().ok().unwrap_or(Vec::new()), + error: stderr.recv().ok().unwrap_or(Vec::new()), + }) } } @@ -421,7 +480,8 @@ impl Drop for Process { drop(self.stderr.take()); drop(mem::replace(&mut self.extra_io, Vec::new())); - self.wait(); + self.set_timeout(None); + let _ = self.wait().unwrap(); } } @@ -441,7 +501,7 @@ mod tests { let p = Process::configure(args); assert!(p.is_ok()); let mut p = p.unwrap(); - assert!(p.wait().success()); + assert!(p.wait().unwrap().success()); }) #[cfg(not(target_os="android"))] @@ -465,7 +525,7 @@ mod tests { let p = Process::configure(args); assert!(p.is_ok()); let mut p = p.unwrap(); - assert!(p.wait().matches_exit_status(1)); + assert!(p.wait().unwrap().matches_exit_status(1)); drop(p.wait().clone()); }) @@ -479,7 +539,7 @@ mod tests { let p = Process::configure(args); assert!(p.is_ok()); let mut p = p.unwrap(); - match p.wait() { + match p.wait().unwrap() { process::ExitSignal(1) => {}, result => fail!("not terminated by signal 1 (instead, {})", result), } @@ -495,7 +555,7 @@ mod tests { let mut p = p.unwrap(); assert!(p.stdout.is_some()); let ret = read_all(p.stdout.get_mut_ref() as &mut Reader); - assert!(p.wait().success()); + assert!(p.wait().unwrap().success()); return ret; } @@ -536,7 +596,7 @@ mod tests { p.stdin.get_mut_ref().write("foobar".as_bytes()).unwrap(); drop(p.stdin.take()); let out = read_all(p.stdout.get_mut_ref() as &mut Reader); - assert!(p.wait().success()); + assert!(p.wait().unwrap().success()); assert_eq!(out, "foobar\n".to_owned()); }) @@ -548,7 +608,7 @@ mod tests { .. ProcessConfig::new() }; let mut p = Process::configure(args).unwrap(); - assert!(p.wait().success()); + assert!(p.wait().unwrap().success()); }) #[cfg(windows)] @@ -572,7 +632,7 @@ mod tests { .. ProcessConfig::new() }; let mut p = Process::configure(args).unwrap(); - assert!(p.wait().success()); + assert!(p.wait().unwrap().success()); }) #[cfg(unix, not(target_os="android"))] @@ -635,21 +695,21 @@ mod tests { #[cfg(not(target_os="android"))] iotest!(fn test_finish_once() { let mut prog = Process::new("false", []).unwrap(); - assert!(prog.wait().matches_exit_status(1)); + assert!(prog.wait().unwrap().matches_exit_status(1)); }) #[cfg(not(target_os="android"))] iotest!(fn test_finish_twice() { let mut prog = Process::new("false", []).unwrap(); - assert!(prog.wait().matches_exit_status(1)); - assert!(prog.wait().matches_exit_status(1)); + assert!(prog.wait().unwrap().matches_exit_status(1)); + assert!(prog.wait().unwrap().matches_exit_status(1)); }) #[cfg(not(target_os="android"))] iotest!(fn test_wait_with_output_once() { - let mut prog = Process::new("echo", ["hello".to_owned()]).unwrap(); - let ProcessOutput {status, output, error} = prog.wait_with_output(); + let prog = Process::new("echo", ["hello".to_owned()]).unwrap(); + let ProcessOutput {status, output, error} = prog.wait_with_output().unwrap(); let output_str = str::from_utf8(output.as_slice()).unwrap(); assert!(status.success()); @@ -660,30 +720,6 @@ mod tests { } }) - #[cfg(not(target_os="android"))] - iotest!(fn test_wait_with_output_twice() { - let mut prog = Process::new("echo", ["hello".to_owned()]).unwrap(); - let ProcessOutput {status, output, error} = prog.wait_with_output(); - - let output_str = str::from_utf8(output.as_slice()).unwrap(); - - assert!(status.success()); - assert_eq!(output_str.trim().to_owned(), "hello".to_owned()); - // FIXME #7224 - if !running_on_valgrind() { - assert_eq!(error, Vec::new()); - } - - let ProcessOutput {status, output, error} = prog.wait_with_output(); - - assert!(status.success()); - assert_eq!(output, Vec::new()); - // FIXME #7224 - if !running_on_valgrind() { - assert_eq!(error, Vec::new()); - } - }) - #[cfg(unix,not(target_os="android"))] pub fn run_pwd(dir: Option<&Path>) -> Process { Process::configure(ProcessConfig { @@ -714,9 +750,10 @@ mod tests { iotest!(fn test_keep_current_working_dir() { use os; - let mut prog = run_pwd(None); + let prog = run_pwd(None); - let output = str::from_utf8(prog.wait_with_output().output.as_slice()).unwrap().to_owned(); + let output = str::from_utf8(prog.wait_with_output().unwrap() + .output.as_slice()).unwrap().to_owned(); let parent_dir = os::getcwd(); let child_dir = Path::new(output.trim()); @@ -732,9 +769,10 @@ mod tests { // test changing to the parent of os::getcwd() because we know // the path exists (and os::getcwd() is not expected to be root) let parent_dir = os::getcwd().dir_path(); - let mut prog = run_pwd(Some(&parent_dir)); + let prog = run_pwd(Some(&parent_dir)); - let output = str::from_utf8(prog.wait_with_output().output.as_slice()).unwrap().to_owned(); + let output = str::from_utf8(prog.wait_with_output().unwrap() + .output.as_slice()).unwrap().to_owned(); let child_dir = Path::new(output.trim()); let parent_stat = parent_dir.stat().unwrap(); @@ -777,8 +815,9 @@ mod tests { use os; if running_on_valgrind() { return; } - let mut prog = run_env(None); - let output = str::from_utf8(prog.wait_with_output().output.as_slice()).unwrap().to_owned(); + let prog = run_env(None); + let output = str::from_utf8(prog.wait_with_output().unwrap() + .output.as_slice()).unwrap().to_owned(); let r = os::env(); for &(ref k, ref v) in r.iter() { @@ -791,8 +830,10 @@ mod tests { use os; if running_on_valgrind() { return; } - let mut prog = run_env(None); - let output = str::from_utf8(prog.wait_with_output().output.as_slice()).unwrap().to_owned(); + let prog = run_env(None); + let output = str::from_utf8(prog.wait_with_output() + .unwrap().output.as_slice()) + .unwrap().to_owned(); let r = os::env(); for &(ref k, ref v) in r.iter() { @@ -807,8 +848,8 @@ mod tests { iotest!(fn test_add_to_env() { let new_env = box [("RUN_TEST_NEW_ENV".to_owned(), "123".to_owned())]; - let mut prog = run_env(Some(new_env)); - let result = prog.wait_with_output(); + let prog = run_env(Some(new_env)); + let result = prog.wait_with_output().unwrap(); let output = str::from_utf8_lossy(result.output.as_slice()).into_owned(); assert!(output.contains("RUN_TEST_NEW_ENV=123"), @@ -830,14 +871,14 @@ mod tests { iotest!(fn test_kill() { let mut p = sleeper(); Process::kill(p.id(), PleaseExitSignal).unwrap(); - assert!(!p.wait().success()); + assert!(!p.wait().unwrap().success()); }) iotest!(fn test_exists() { let mut p = sleeper(); assert!(Process::kill(p.id(), 0).is_ok()); p.signal_kill().unwrap(); - assert!(!p.wait().success()); + assert!(!p.wait().unwrap().success()); }) iotest!(fn test_zero() { @@ -845,11 +886,42 @@ mod tests { p.signal_kill().unwrap(); for _ in range(0, 20) { if p.signal(0).is_err() { - assert!(!p.wait().success()); + assert!(!p.wait().unwrap().success()); return } timer::sleep(100); } fail!("never saw the child go away"); }) + + iotest!(fn wait_timeout() { + let mut p = sleeper(); + p.set_timeout(Some(10)); + assert_eq!(p.wait().err().unwrap().kind, TimedOut); + assert_eq!(p.wait().err().unwrap().kind, TimedOut); + p.signal_kill().unwrap(); + p.set_timeout(None); + assert!(p.wait().is_ok()); + }) + + iotest!(fn wait_timeout2() { + let (tx, rx) = channel(); + let tx2 = tx.clone(); + spawn(proc() { + let mut p = sleeper(); + p.set_timeout(Some(10)); + assert_eq!(p.wait().err().unwrap().kind, TimedOut); + p.signal_kill().unwrap(); + tx.send(()); + }); + spawn(proc() { + let mut p = sleeper(); + p.set_timeout(Some(10)); + assert_eq!(p.wait().err().unwrap().kind, TimedOut); + p.signal_kill().unwrap(); + tx2.send(()); + }); + rx.recv(); + rx.recv(); + }) } diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs index d23d327d558..90f97e59caa 100644 --- a/src/libstd/rt/rtio.rs +++ b/src/libstd/rt/rtio.rs @@ -275,7 +275,8 @@ pub trait RtioFileStream { pub trait RtioProcess { fn id(&self) -> libc::pid_t; fn kill(&mut self, signal: int) -> IoResult<()>; - fn wait(&mut self) -> ProcessExit; + fn wait(&mut self) -> IoResult<ProcessExit>; + fn set_timeout(&mut self, timeout: Option<u64>); } pub trait RtioPipe { |
