diff options
| author | Alex Crichton <alex@alexcrichton.com> | 2013-08-15 14:18:13 -0700 |
|---|---|---|
| committer | Alex Crichton <alex@alexcrichton.com> | 2013-08-27 20:46:43 -0700 |
| commit | b89e1c000e133fb5db3ea5afd0948db6dc088977 (patch) | |
| tree | 6bd105ae723233519d985e61bdef09f4b0e13603 /src/libstd/rt/uv/uvio.rs | |
| parent | ed204257a0c6abc8386879bb631471ec17d8a96a (diff) | |
| download | rust-b89e1c000e133fb5db3ea5afd0948db6dc088977.tar.gz rust-b89e1c000e133fb5db3ea5afd0948db6dc088977.zip | |
Implement process bindings to libuv
Closes #6436
Diffstat (limited to 'src/libstd/rt/uv/uvio.rs')
| -rw-r--r-- | src/libstd/rt/uv/uvio.rs | 294 |
1 files changed, 244 insertions, 50 deletions
diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs index e620ab274b1..80f1aef37ac 100644 --- a/src/libstd/rt/uv/uvio.rs +++ b/src/libstd/rt/uv/uvio.rs @@ -13,7 +13,7 @@ use cast::transmute; use cast; use cell::Cell; use clone::Clone; -use libc::{c_int, c_uint, c_void}; +use libc::{c_int, c_uint, c_void, pid_t}; use ops::Drop; use option::*; use ptr; @@ -22,6 +22,7 @@ use result::*; use rt::io::IoError; use rt::io::net::ip::{SocketAddr, IpAddr}; use rt::io::{standard_error, OtherIoError, SeekStyle, SeekSet, SeekCur, SeekEnd}; +use rt::kill::BlockedTask; use rt::local::Local; use rt::rtio::*; use rt::sched::{Scheduler, SchedHandle}; @@ -148,7 +149,7 @@ fn socket_name<T, U: Watcher + NativeHandle<*T>>(sk: SocketNameKind, }; if r != 0 { - let status = status_to_maybe_uv_error(handle, r); + let status = status_to_maybe_uv_error(r); return Err(uv_error_to_io_error(status.unwrap())); } @@ -591,6 +592,63 @@ impl IoFactory for UvIoFactory { assert!(!result_cell.is_empty()); return result_cell.take(); } + + fn pipe_init(&mut self, ipc: bool) -> Result<~RtioPipeObject, IoError> { + let home = get_handle_to_current_scheduler!(); + Ok(~UvPipeStream { pipe: Pipe::new(self.uv_loop(), ipc), home: home }) + } + + fn spawn(&mut self, + config: &process::Config) -> Result<~RtioProcessObject, IoError> { + // Sadly, we must create the UvProcess before we actually call uv_spawn + // so that the exit_cb can close over it and notify it when the process + // has exited. + let mut ret = ~UvProcess { + process: Process::new(), + home: None, + exit_status: None, + term_signal: None, + exit_error: None, + descheduled: None, + }; + let ret_ptr = unsafe { + *cast::transmute::<&~UvProcess, &*mut UvProcess>(&ret) + }; + + // The purpose of this exit callback is to record the data about the + // exit and then wake up the task which may be waiting for the process + // to exit. This is all performed in the current io-loop, and the + // implementation of UvProcess ensures that reading these fields always + // occurs on the current io-loop. + let exit_cb: ExitCallback = |_, exit_status, term_signal, error| { + unsafe { + assert!((*ret_ptr).exit_status.is_none()); + (*ret_ptr).exit_status = Some(exit_status); + (*ret_ptr).term_signal = Some(term_signal); + (*ret_ptr).exit_error = error; + match (*ret_ptr).descheduled.take() { + Some(task) => { + let scheduler: ~Scheduler = Local::take(); + scheduler.resume_blocked_task_immediately(task); + } + None => {} + } + } + }; + + match ret.process.spawn(self.uv_loop(), config, exit_cb) { + Ok(()) => { + // Only now do we actually get a handle to this scheduler. + ret.home = Some(get_handle_to_current_scheduler!()); + Ok(ret) + } + Err(uverr) => { + // We still need to close the process handle we created, but + // that's taken care for us in the destructor of UvProcess + Err(uv_error_to_io_error(uverr)) + } + } + } } pub struct UvTcpListener { @@ -679,7 +737,7 @@ impl RtioTcpListener for UvTcpListener { uvll::tcp_simultaneous_accepts(self_.watcher().native_handle(), 1 as c_int) }; - match status_to_maybe_uv_error(self_.watcher(), r) { + match status_to_maybe_uv_error(r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -692,7 +750,7 @@ impl RtioTcpListener for UvTcpListener { uvll::tcp_simultaneous_accepts(self_.watcher().native_handle(), 0 as c_int) }; - match status_to_maybe_uv_error(self_.watcher(), r) { + match status_to_maybe_uv_error(r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -700,40 +758,15 @@ impl RtioTcpListener for UvTcpListener { } } -pub struct UvTcpStream { - watcher: TcpWatcher, - home: SchedHandle, -} - -impl HomingIO for UvTcpStream { - fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home } +trait UvStream: HomingIO { + fn as_stream(&mut self) -> StreamWatcher; } -impl Drop for UvTcpStream { - fn drop(&self) { - // XXX need mutable finalizer - let this = unsafe { transmute::<&UvTcpStream, &mut UvTcpStream>(self) }; - do this.home_for_io_with_sched |self_, scheduler| { - do scheduler.deschedule_running_task_and_then |_, task| { - let task_cell = Cell::new(task); - do self_.watcher.as_stream().close { - let scheduler: ~Scheduler = Local::take(); - scheduler.resume_blocked_task_immediately(task_cell.take()); - } - } - } - } -} - -impl RtioSocket for UvTcpStream { - fn socket_name(&mut self) -> Result<SocketAddr, IoError> { - do self.home_for_io |self_| { - socket_name(Tcp, self_.watcher) - } - } -} - -impl RtioTcpStream for UvTcpStream { +// FIXME(#3429) I would rather this be `impl<T: UvStream> RtioStream for T` but +// that has conflicts with other traits that also have methods +// called `read` and `write` +macro_rules! rtiostream(($t:ident) => { +impl RtioStream for $t { fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> { do self.home_for_io_with_sched |self_, scheduler| { let result_cell = Cell::new_empty(); @@ -747,7 +780,7 @@ impl RtioTcpStream for UvTcpStream { let alloc: AllocCallback = |_| unsafe { slice_to_uv_buf(*buf_ptr) }; - let mut watcher = self_.watcher.as_stream(); + let mut watcher = self_.as_stream(); do watcher.read_start(alloc) |mut watcher, nread, _buf, status| { // Stop reading so that no read callbacks are @@ -783,7 +816,7 @@ impl RtioTcpStream for UvTcpStream { do scheduler.deschedule_running_task_and_then |_, task| { let task_cell = Cell::new(task); let buf = unsafe { slice_to_uv_buf(*buf_ptr) }; - let mut watcher = self_.watcher.as_stream(); + let mut watcher = self_.as_stream(); do watcher.write(buf) |_watcher, status| { let result = if status.is_none() { Ok(()) @@ -802,7 +835,85 @@ impl RtioTcpStream for UvTcpStream { result_cell.take() } } +} +}) + +rtiostream!(UvPipeStream) +rtiostream!(UvTcpStream) + +pub struct UvPipeStream { + pipe: Pipe, + home: SchedHandle, +} + +impl UvStream for UvPipeStream { + fn as_stream(&mut self) -> StreamWatcher { self.pipe.as_stream() } +} + +impl HomingIO for UvPipeStream { + fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home } +} + +impl Drop for UvPipeStream { + fn drop(&self) { + // FIXME(#4330): should not need a transmute + let this = unsafe { cast::transmute_mut(self) }; + do this.home_for_io |self_| { + let scheduler: ~Scheduler = Local::take(); + do scheduler.deschedule_running_task_and_then |_, task| { + let task_cell = Cell::new(task); + do self_.pipe.close { + let scheduler: ~Scheduler = Local::take(); + scheduler.resume_blocked_task_immediately(task_cell.take()); + } + } + } + } +} + +impl UvPipeStream { + pub fn uv_pipe(&self) -> Pipe { self.pipe } +} + +pub struct UvTcpStream { + watcher: TcpWatcher, + home: SchedHandle, +} + +impl HomingIO for UvTcpStream { + fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home } +} + +impl Drop for UvTcpStream { + fn drop(&self) { + // FIXME(#4330): should not need a transmute + let this = unsafe { cast::transmute_mut(self) }; + do this.home_for_io |self_| { + let scheduler = Local::take::<Scheduler>(); + do scheduler.deschedule_running_task_and_then |_, task| { + let task_cell = Cell::new(task); + do self_.watcher.as_stream().close { + let scheduler = Local::take::<Scheduler>(); + scheduler.resume_blocked_task_immediately(task_cell.take()); + } + } + } + } +} + +impl UvStream for UvTcpStream { + fn as_stream(&mut self) -> StreamWatcher { self.watcher.as_stream() } +} + +impl RtioSocket for UvTcpStream { + fn socket_name(&mut self) -> Result<SocketAddr, IoError> { + do self.home_for_io |self_| { + socket_name(Tcp, self_.watcher) + } + } +} +impl RtioTcpStream for UvTcpStream { fn peer_name(&mut self) -> Result<SocketAddr, IoError> { do self.home_for_io |self_| { socket_name(TcpPeer, self_.watcher) @@ -813,7 +924,7 @@ impl RtioTcpStream for UvTcpStream { do self.home_for_io |self_| { let r = unsafe { uvll::tcp_nodelay(self_.watcher.native_handle(), 0 as c_int) }; - match status_to_maybe_uv_error(self_.watcher, r) { + match status_to_maybe_uv_error(r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -824,7 +935,7 @@ impl RtioTcpStream for UvTcpStream { do self.home_for_io |self_| { let r = unsafe { uvll::tcp_nodelay(self_.watcher.native_handle(), 1 as c_int) }; - match status_to_maybe_uv_error(self_.watcher, r) { + match status_to_maybe_uv_error(r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -838,7 +949,7 @@ impl RtioTcpStream for UvTcpStream { delay_in_seconds as c_uint) }; - match status_to_maybe_uv_error(self_.watcher, r) { + match status_to_maybe_uv_error(r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -851,7 +962,7 @@ impl RtioTcpStream for UvTcpStream { uvll::tcp_keepalive(self_.watcher.native_handle(), 0 as c_int, 0 as c_uint) }; - match status_to_maybe_uv_error(self_.watcher, r) { + match status_to_maybe_uv_error(r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -963,7 +1074,7 @@ impl RtioUdpSocket for UvUdpSocket { } }; - match status_to_maybe_uv_error(self_.watcher, r) { + match status_to_maybe_uv_error(r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -979,7 +1090,7 @@ impl RtioUdpSocket for UvUdpSocket { } }; - match status_to_maybe_uv_error(self_.watcher, r) { + match status_to_maybe_uv_error(r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -993,7 +1104,7 @@ impl RtioUdpSocket for UvUdpSocket { uvll::udp_set_multicast_loop(self_.watcher.native_handle(), 1 as c_int) }; - match status_to_maybe_uv_error(self_.watcher, r) { + match status_to_maybe_uv_error(r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -1007,7 +1118,7 @@ impl RtioUdpSocket for UvUdpSocket { uvll::udp_set_multicast_loop(self_.watcher.native_handle(), 0 as c_int) }; - match status_to_maybe_uv_error(self_.watcher, r) { + match status_to_maybe_uv_error(r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -1021,7 +1132,7 @@ impl RtioUdpSocket for UvUdpSocket { uvll::udp_set_multicast_ttl(self_.watcher.native_handle(), ttl as c_int) }; - match status_to_maybe_uv_error(self_.watcher, r) { + match status_to_maybe_uv_error(r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -1035,7 +1146,7 @@ impl RtioUdpSocket for UvUdpSocket { uvll::udp_set_ttl(self_.watcher.native_handle(), ttl as c_int) }; - match status_to_maybe_uv_error(self_.watcher, r) { + match status_to_maybe_uv_error(r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -1049,7 +1160,7 @@ impl RtioUdpSocket for UvUdpSocket { uvll::udp_set_broadcast(self_.watcher.native_handle(), 1 as c_int) }; - match status_to_maybe_uv_error(self_.watcher, r) { + match status_to_maybe_uv_error(r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -1063,7 +1174,7 @@ impl RtioUdpSocket for UvUdpSocket { uvll::udp_set_broadcast(self_.watcher.native_handle(), 0 as c_int) }; - match status_to_maybe_uv_error(self_.watcher, r) { + match status_to_maybe_uv_error(r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -1250,6 +1361,89 @@ impl RtioFileStream for UvFileStream { } } +pub struct UvProcess { + process: process::Process, + + // Sadly, this structure must be created before we return it, so in that + // brief interim the `home` is None. + home: Option<SchedHandle>, + + // All None until the process exits (exit_error may stay None) + priv exit_status: Option<int>, + priv term_signal: Option<int>, + priv exit_error: Option<UvError>, + + // Used to store which task to wake up from the exit_cb + priv descheduled: Option<BlockedTask>, +} + +impl HomingIO for UvProcess { + fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.home.get_mut_ref() } +} + +impl Drop for UvProcess { + fn drop(&self) { + // FIXME(#4330): should not need a transmute + let this = unsafe { cast::transmute_mut(self) }; + + let close = |self_: &mut UvProcess| { + let scheduler = Local::take::<Scheduler>(); + do scheduler.deschedule_running_task_and_then |_, task| { + let task = Cell::new(task); + do self_.process.close { + let scheduler: ~Scheduler = Local::take(); + scheduler.resume_blocked_task_immediately(task.take()); + } + } + }; + + // If home is none, then this process never actually successfully + // spawned, so there's no need to switch event loops + if this.home.is_none() { + close(this) + } else { + this.home_for_io(close) + } + } +} + +impl RtioProcess for UvProcess { + fn id(&self) -> pid_t { + self.process.pid() + } + + fn kill(&mut self, signal: int) -> Result<(), IoError> { + do self.home_for_io |self_| { + match self_.process.kill(signal) { + Ok(()) => Ok(()), + Err(uverr) => Err(uv_error_to_io_error(uverr)) + } + } + } + + fn wait(&mut self) -> int { + // Make sure (on the home scheduler) that we have an exit status listed + do self.home_for_io |self_| { + match self_.exit_status { + Some(*) => {} + None => { + // If there's no exit code previously listed, then the + // process's exit callback has yet to be invoked. We just + // need to deschedule ourselves and wait to be reawoken. + let scheduler: ~Scheduler = Local::take(); + do scheduler.deschedule_running_task_and_then |_, task| { + assert!(self_.descheduled.is_none()); + self_.descheduled = Some(task); + } + assert!(self_.exit_status.is_some()); + } + } + } + + self.exit_status.unwrap() + } +} + #[test] fn test_simple_io_no_connect() { do run_in_newsched_task { |
