diff options
Diffstat (limited to 'src/libstd/rt')
| -rw-r--r-- | src/libstd/rt/io/file.rs | 3 | ||||
| -rw-r--r-- | src/libstd/rt/io/mod.rs | 3 | ||||
| -rw-r--r-- | src/libstd/rt/io/net/tcp.rs | 8 | ||||
| -rw-r--r-- | src/libstd/rt/io/pipe.rs | 77 | ||||
| -rw-r--r-- | src/libstd/rt/rtio.rs | 22 | ||||
| -rw-r--r-- | src/libstd/rt/uv/async.rs | 2 | ||||
| -rw-r--r-- | src/libstd/rt/uv/file.rs | 25 | ||||
| -rw-r--r-- | src/libstd/rt/uv/idle.rs | 4 | ||||
| -rw-r--r-- | src/libstd/rt/uv/mod.rs | 65 | ||||
| -rw-r--r-- | src/libstd/rt/uv/net.rs | 17 | ||||
| -rw-r--r-- | src/libstd/rt/uv/pipe.rs | 66 | ||||
| -rw-r--r-- | src/libstd/rt/uv/process.rs | 264 | ||||
| -rw-r--r-- | src/libstd/rt/uv/timer.rs | 2 | ||||
| -rw-r--r-- | src/libstd/rt/uv/uvio.rs | 294 | ||||
| -rw-r--r-- | src/libstd/rt/uv/uvll.rs | 162 |
15 files changed, 169 insertions, 845 deletions
diff --git a/src/libstd/rt/io/file.rs b/src/libstd/rt/io/file.rs index 534e308a1a6..f4e9c4d7c11 100644 --- a/src/libstd/rt/io/file.rs +++ b/src/libstd/rt/io/file.rs @@ -71,6 +71,9 @@ pub struct FileStream { last_nread: int, } +impl FileStream { +} + impl Reader for FileStream { fn read(&mut self, buf: &mut [u8]) -> Option<uint> { match self.fd.read(buf) { diff --git a/src/libstd/rt/io/mod.rs b/src/libstd/rt/io/mod.rs index 038fca9a1ad..116d240308a 100644 --- a/src/libstd/rt/io/mod.rs +++ b/src/libstd/rt/io/mod.rs @@ -268,9 +268,6 @@ pub use self::extensions::WriterByteConversions; /// Synchronous, non-blocking file I/O. pub mod file; -/// Synchronous, in-memory I/O. -pub mod pipe; - /// Synchronous, non-blocking network I/O. pub mod net { pub mod tcp; diff --git a/src/libstd/rt/io/net/tcp.rs b/src/libstd/rt/io/net/tcp.rs index dc7135f4a61..9be5540de48 100644 --- a/src/libstd/rt/io/net/tcp.rs +++ b/src/libstd/rt/io/net/tcp.rs @@ -16,7 +16,7 @@ use rt::io::{io_error, read_error, EndOfFile}; use rt::rtio::{IoFactory, IoFactoryObject, RtioSocket, RtioTcpListener, RtioTcpListenerObject, RtioTcpStream, - RtioTcpStreamObject, RtioStream}; + RtioTcpStreamObject}; use rt::local::Local; pub struct TcpStream(~RtioTcpStreamObject); @@ -69,7 +69,7 @@ impl TcpStream { impl Reader for TcpStream { fn read(&mut self, buf: &mut [u8]) -> Option<uint> { - match (***self).read(buf) { + match (**self).read(buf) { Ok(read) => Some(read), Err(ioerr) => { // EOF is indicated by returning None @@ -86,7 +86,7 @@ impl Reader for TcpStream { impl Writer for TcpStream { fn write(&mut self, buf: &[u8]) { - match (***self).write(buf) { + match (**self).write(buf) { Ok(_) => (), Err(ioerr) => io_error::cond.raise(ioerr), } @@ -166,7 +166,7 @@ mod test { do run_in_newsched_task { let mut called = false; do io_error::cond.trap(|e| { - assert_eq!(e.kind, ConnectionRefused); + assert!(e.kind == ConnectionRefused); called = true; }).inside { let addr = SocketAddr { ip: Ipv4Addr(0, 0, 0, 0), port: 1 }; diff --git a/src/libstd/rt/io/pipe.rs b/src/libstd/rt/io/pipe.rs deleted file mode 100644 index 02b3d0fe57d..00000000000 --- a/src/libstd/rt/io/pipe.rs +++ /dev/null @@ -1,77 +0,0 @@ -// Copyright 2013 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or -// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license -// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -//! Synchronous, in-memory pipes. -//! -//! Currently these aren't particularly useful, there only exists bindings -//! enough so that pipes can be created to child processes. - -use prelude::*; -use super::{Reader, Writer}; -use rt::io::{io_error, read_error, EndOfFile}; -use rt::local::Local; -use rt::rtio::{RtioPipeObject, RtioStream, IoFactoryObject, IoFactory}; -use rt::uv::pipe; - -pub struct PipeStream(~RtioPipeObject); - -impl PipeStream { - /// Creates a new pipe initialized, but not bound to any particular - /// source/destination - pub fn new() -> Option<PipeStream> { - let pipe = unsafe { - let io: *mut IoFactoryObject = Local::unsafe_borrow(); - (*io).pipe_init(false) - }; - match pipe { - Ok(p) => Some(PipeStream(p)), - Err(ioerr) => { - io_error::cond.raise(ioerr); - None - } - } - } - - /// Extracts the underlying libuv pipe to be bound to another source. - pub fn uv_pipe(&self) -> pipe::Pipe { - // Did someone say multiple layers of indirection? - (**self).uv_pipe() - } -} - -impl Reader for PipeStream { - fn read(&mut self, buf: &mut [u8]) -> Option<uint> { - match (***self).read(buf) { - Ok(read) => Some(read), - Err(ioerr) => { - // EOF is indicated by returning None - if ioerr.kind != EndOfFile { - read_error::cond.raise(ioerr); - } - return None; - } - } - } - - fn eof(&mut self) -> bool { fail!() } -} - -impl Writer for PipeStream { - fn write(&mut self, buf: &[u8]) { - match (***self).write(buf) { - Ok(_) => (), - Err(ioerr) => { - io_error::cond.raise(ioerr); - } - } - } - - fn flush(&mut self) { fail!() } -} diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs index 1a7ef6ea309..1788b7a04e3 100644 --- a/src/libstd/rt/rtio.rs +++ b/src/libstd/rt/rtio.rs @@ -8,14 +8,12 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use libc; use option::*; use result::*; use libc::c_int; use rt::io::IoError; use super::io::net::ip::{IpAddr, SocketAddr}; -use rt::uv; use rt::uv::uvio; use path::Path; use super::io::support::PathLike; @@ -32,9 +30,6 @@ pub type RtioTcpListenerObject = uvio::UvTcpListener; pub type RtioUdpSocketObject = uvio::UvUdpSocket; pub type RtioTimerObject = uvio::UvTimer; pub type PausibleIdleCallback = uvio::UvPausibleIdleCallback; -pub type RtioPipeObject = uvio::UvPipeStream; -pub type RtioProcessObject = uvio::UvProcess; -pub type RtioProcessConfig<'self> = uv::process::Config<'self>; pub trait EventLoop { fn run(&mut self); @@ -77,13 +72,6 @@ pub trait IoFactory { fn fs_open<P: PathLike>(&mut self, path: &P, fm: FileMode, fa: FileAccess) -> Result<~RtioFileStream, IoError>; fn fs_unlink<P: PathLike>(&mut self, path: &P) -> Result<(), IoError>; - fn pipe_init(&mut self, ipc: bool) -> Result<~RtioPipeObject, IoError>; - fn spawn(&mut self, config: &RtioProcessConfig) -> Result<~RtioProcessObject, IoError>; -} - -pub trait RtioStream { - fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError>; - fn write(&mut self, buf: &[u8]) -> Result<(), IoError>; } pub trait RtioTcpListener : RtioSocket { @@ -92,7 +80,9 @@ pub trait RtioTcpListener : RtioSocket { fn dont_accept_simultaneously(&mut self) -> Result<(), IoError>; } -pub trait RtioTcpStream : RtioSocket + RtioStream { +pub trait RtioTcpStream : RtioSocket { + fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError>; + fn write(&mut self, buf: &[u8]) -> Result<(), IoError>; fn peer_name(&mut self) -> Result<SocketAddr, IoError>; fn control_congestion(&mut self) -> Result<(), IoError>; fn nodelay(&mut self) -> Result<(), IoError>; @@ -134,9 +124,3 @@ pub trait RtioFileStream { fn tell(&self) -> Result<u64, IoError>; fn flush(&mut self) -> Result<(), IoError>; } - -pub trait RtioProcess { - fn id(&self) -> libc::pid_t; - fn kill(&mut self, signal: int) -> Result<(), IoError>; - fn wait(&mut self) -> int; -} diff --git a/src/libstd/rt/uv/async.rs b/src/libstd/rt/uv/async.rs index ff7bb9dd03a..d0ca38317cb 100644 --- a/src/libstd/rt/uv/async.rs +++ b/src/libstd/rt/uv/async.rs @@ -34,7 +34,7 @@ impl AsyncWatcher { extern fn async_cb(handle: *uvll::uv_async_t, status: c_int) { let mut watcher: AsyncWatcher = NativeHandle::from_native_handle(handle); - let status = status_to_maybe_uv_error(status); + let status = status_to_maybe_uv_error(watcher, status); let data = watcher.get_watcher_data(); let cb = data.async_cb.get_ref(); (*cb)(watcher, status); diff --git a/src/libstd/rt/uv/file.rs b/src/libstd/rt/uv/file.rs index 5c77181d7eb..405dfe0a7f0 100644 --- a/src/libstd/rt/uv/file.rs +++ b/src/libstd/rt/uv/file.rs @@ -11,8 +11,8 @@ use prelude::*; use ptr::null; use libc::c_void; -use rt::uv::{Request, NativeHandle, Loop, FsCallback, Buf, UvError}; -use rt::uv::status_to_maybe_uv_error; +use rt::uv::{Request, NativeHandle, Loop, FsCallback, Buf, + status_to_maybe_uv_error_with_loop, UvError}; use rt::uv::uvll; use rt::uv::uvll::*; use super::super::io::support::PathLike; @@ -62,7 +62,7 @@ impl FsRequest { pub fn open_sync<P: PathLike>(loop_: &Loop, path: &P, flags: int, mode: int) -> Result<int, UvError> { let result = FsRequest::open_common(loop_, path, flags, mode, None); - sync_cleanup(result) + sync_cleanup(loop_, result) } fn unlink_common<P: PathLike>(loop_: &Loop, path: &P, cb: Option<FsCallback>) -> int { @@ -83,11 +83,11 @@ impl FsRequest { } pub fn unlink<P: PathLike>(loop_: &Loop, path: &P, cb: FsCallback) { let result = FsRequest::unlink_common(loop_, path, Some(cb)); - sync_cleanup(result); + sync_cleanup(loop_, result); } pub fn unlink_sync<P: PathLike>(loop_: &Loop, path: &P) -> Result<int, UvError> { let result = FsRequest::unlink_common(loop_, path, None); - sync_cleanup(result) + sync_cleanup(loop_, result) } pub fn install_req_data(&self, cb: Option<FsCallback>) { @@ -139,8 +139,9 @@ impl NativeHandle<*uvll::uv_fs_t> for FsRequest { match self { &FsRequest(ptr) => ptr } } } - fn sync_cleanup(result: int) -> Result<int, UvError> { - match status_to_maybe_uv_error(result as i32) { + fn sync_cleanup(loop_: &Loop, result: int) + -> Result<int, UvError> { + match status_to_maybe_uv_error_with_loop(loop_.native_handle(), result as i32) { Some(err) => Err(err), None => Ok(result) } @@ -183,7 +184,7 @@ impl FileDescriptor { pub fn write_sync(&mut self, loop_: &Loop, buf: Buf, offset: i64) -> Result<int, UvError> { let result = self.write_common(loop_, buf, offset, None); - sync_cleanup(result) + sync_cleanup(loop_, result) } fn read_common(&mut self, loop_: &Loop, buf: Buf, @@ -211,7 +212,7 @@ impl FileDescriptor { pub fn read_sync(&mut self, loop_: &Loop, buf: Buf, offset: i64) -> Result<int, UvError> { let result = self.read_common(loop_, buf, offset, None); - sync_cleanup(result) + sync_cleanup(loop_, result) } fn close_common(self, loop_: &Loop, cb: Option<FsCallback>) -> int { @@ -233,11 +234,12 @@ impl FileDescriptor { } pub fn close_sync(self, loop_: &Loop) -> Result<int, UvError> { let result = self.close_common(loop_, None); - sync_cleanup(result) + sync_cleanup(loop_, result) } } extern fn compl_cb(req: *uv_fs_t) { let mut req: FsRequest = NativeHandle::from_native_handle(req); + let loop_ = req.get_loop(); // pull the user cb out of the req data let cb = { let data = req.get_req_data(); @@ -248,7 +250,8 @@ extern fn compl_cb(req: *uv_fs_t) { // in uv_fs_open calls, the result will be the fd in the // case of success, otherwise it's -1 indicating an error let result = req.get_result(); - let status = status_to_maybe_uv_error(result); + let status = status_to_maybe_uv_error_with_loop( + loop_.native_handle(), result); // we have a req and status, call the user cb.. // only giving the user a ref to the FsRequest, as we // have to clean it up, afterwards (and they aren't really diff --git a/src/libstd/rt/uv/idle.rs b/src/libstd/rt/uv/idle.rs index 8cbcd7b77c0..a21146620ca 100644 --- a/src/libstd/rt/uv/idle.rs +++ b/src/libstd/rt/uv/idle.rs @@ -43,7 +43,7 @@ impl IdleWatcher { let mut idle_watcher: IdleWatcher = NativeHandle::from_native_handle(handle); let data = idle_watcher.get_watcher_data(); let cb: &IdleCallback = data.idle_cb.get_ref(); - let status = status_to_maybe_uv_error(status); + let status = status_to_maybe_uv_error(idle_watcher, status); (*cb)(idle_watcher, status); } } @@ -57,7 +57,7 @@ impl IdleWatcher { let mut idle_watcher: IdleWatcher = NativeHandle::from_native_handle(handle); let data = idle_watcher.get_watcher_data(); let cb: &IdleCallback = data.idle_cb.get_ref(); - let status = status_to_maybe_uv_error(status); + let status = status_to_maybe_uv_error(idle_watcher, status); (*cb)(idle_watcher, status); } } diff --git a/src/libstd/rt/uv/mod.rs b/src/libstd/rt/uv/mod.rs index 700b80c7398..75b9a5ac553 100644 --- a/src/libstd/rt/uv/mod.rs +++ b/src/libstd/rt/uv/mod.rs @@ -58,8 +58,6 @@ pub use self::net::{StreamWatcher, TcpWatcher, UdpWatcher}; pub use self::idle::IdleWatcher; pub use self::timer::TimerWatcher; pub use self::async::AsyncWatcher; -pub use self::process::Process; -pub use self::pipe::Pipe; /// The implementation of `rtio` for libuv pub mod uvio; @@ -72,8 +70,6 @@ pub mod net; pub mod idle; pub mod timer; pub mod async; -pub mod process; -pub mod pipe; /// XXX: Loop(*handle) is buggy with destructors. Normal structs /// with dtors may not be destructured, but tuple structs can, @@ -130,8 +126,6 @@ pub type NullCallback = ~fn(); pub type IdleCallback = ~fn(IdleWatcher, Option<UvError>); pub type ConnectionCallback = ~fn(StreamWatcher, Option<UvError>); pub type FsCallback = ~fn(&mut FsRequest, Option<UvError>); -// first int is exit_status, second is term_signal -pub type ExitCallback = ~fn(Process, int, int, Option<UvError>); pub type TimerCallback = ~fn(TimerWatcher, Option<UvError>); pub type AsyncCallback = ~fn(AsyncWatcher, Option<UvError>); pub type UdpReceiveCallback = ~fn(UdpWatcher, int, Buf, SocketAddr, uint, Option<UvError>); @@ -149,8 +143,7 @@ struct WatcherData { timer_cb: Option<TimerCallback>, async_cb: Option<AsyncCallback>, udp_recv_cb: Option<UdpReceiveCallback>, - udp_send_cb: Option<UdpSendCallback>, - exit_cb: Option<ExitCallback>, + udp_send_cb: Option<UdpSendCallback> } pub trait WatcherInterop { @@ -182,8 +175,7 @@ impl<H, W: Watcher + NativeHandle<*H>> WatcherInterop for W { timer_cb: None, async_cb: None, udp_recv_cb: None, - udp_send_cb: None, - exit_cb: None, + udp_send_cb: None }; let data = transmute::<~WatcherData, *c_void>(data); uvll::set_data_for_uv_handle(self.native_handle(), data); @@ -210,12 +202,12 @@ impl<H, W: Watcher + NativeHandle<*H>> WatcherInterop for W { // XXX: Need to define the error constants like EOF so they can be // compared to the UvError type -pub struct UvError(c_int); +pub struct UvError(uvll::uv_err_t); impl UvError { pub fn name(&self) -> ~str { unsafe { - let inner = match self { &UvError(a) => a }; + let inner = match self { &UvError(ref a) => a }; let name_str = uvll::err_name(inner); assert!(name_str.is_not_null()); from_c_str(name_str) @@ -224,7 +216,7 @@ impl UvError { pub fn desc(&self) -> ~str { unsafe { - let inner = match self { &UvError(a) => a }; + let inner = match self { &UvError(ref a) => a }; let desc_str = uvll::strerror(inner); assert!(desc_str.is_not_null()); from_c_str(desc_str) @@ -232,7 +224,7 @@ impl UvError { } pub fn is_eof(&self) -> bool { - **self == uvll::EOF + self.code == uvll::EOF } } @@ -244,10 +236,18 @@ impl ToStr for UvError { #[test] fn error_smoke_test() { - let err: UvError = UvError(uvll::EOF); + let err = uvll::uv_err_t { code: 1, sys_errno_: 1 }; + let err: UvError = UvError(err); assert_eq!(err.to_str(), ~"EOF: end of file"); } +pub fn last_uv_error<H, W: Watcher + NativeHandle<*H>>(watcher: &W) -> UvError { + unsafe { + let loop_ = watcher.event_loop(); + UvError(uvll::last_error(loop_.native_handle())) + } +} + pub fn uv_error_to_io_error(uverr: UvError) -> IoError { unsafe { // Importing error constants @@ -255,10 +255,10 @@ pub fn uv_error_to_io_error(uverr: UvError) -> IoError { use rt::io::*; // uv error descriptions are static - let c_desc = uvll::strerror(*uverr); + let c_desc = uvll::strerror(&*uverr); let desc = str::raw::c_str_to_static_slice(c_desc); - let kind = match *uverr { + let kind = match uverr.code { UNKNOWN => OtherIoError, OK => OtherIoError, EOF => EndOfFile, @@ -266,8 +266,8 @@ pub fn uv_error_to_io_error(uverr: UvError) -> IoError { ECONNREFUSED => ConnectionRefused, ECONNRESET => ConnectionReset, EPIPE => BrokenPipe, - err => { - rtdebug!("uverr.code %d", err as int); + _ => { + rtdebug!("uverr.code %u", uverr.code as uint); // XXX: Need to map remaining uv error types OtherIoError } @@ -281,12 +281,31 @@ pub fn uv_error_to_io_error(uverr: UvError) -> IoError { } } -/// Convert a callback status to a UvError -pub fn status_to_maybe_uv_error(status: c_int) -> Option<UvError> { - if status >= 0 { +/// Given a uv handle, convert a callback status to a UvError +pub fn status_to_maybe_uv_error_with_loop( + loop_: *uvll::uv_loop_t, + status: c_int) -> Option<UvError> { + if status != -1 { + None + } else { + unsafe { + rtdebug!("loop: %x", loop_ as uint); + let err = uvll::last_error(loop_); + Some(UvError(err)) + } + } +} +/// Given a uv handle, convert a callback status to a UvError +pub fn status_to_maybe_uv_error<T, U: Watcher + NativeHandle<*T>>(handle: U, + status: c_int) -> Option<UvError> { + if status != -1 { None } else { - Some(UvError(status)) + unsafe { + rtdebug!("handle: %x", handle.native_handle() as uint); + let loop_ = uvll::get_loop_for_uv_handle(handle.native_handle()); + status_to_maybe_uv_error_with_loop(loop_, status) + } } } diff --git a/src/libstd/rt/uv/net.rs b/src/libstd/rt/uv/net.rs index 1581b017087..e8d0296e543 100644 --- a/src/libstd/rt/uv/net.rs +++ b/src/libstd/rt/uv/net.rs @@ -16,6 +16,7 @@ use rt::uv::{AllocCallback, ConnectionCallback, ReadCallback, UdpReceiveCallback use rt::uv::{Loop, Watcher, Request, UvError, Buf, NativeHandle, NullCallback, status_to_maybe_uv_error}; use rt::io::net::ip::{SocketAddr, Ipv4Addr, Ipv6Addr}; +use rt::uv::last_uv_error; use vec; use str; use from_str::{FromStr}; @@ -136,7 +137,7 @@ impl StreamWatcher { rtdebug!("buf len: %d", buf.len as int); let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream); let cb = stream_watcher.get_watcher_data().read_cb.get_ref(); - let status = status_to_maybe_uv_error(nread as c_int); + let status = status_to_maybe_uv_error(stream_watcher, nread as c_int); (*cb)(stream_watcher, nread as int, buf, status); } } @@ -166,7 +167,7 @@ impl StreamWatcher { let mut stream_watcher = write_request.stream(); write_request.delete(); let cb = stream_watcher.get_watcher_data().write_cb.take_unwrap(); - let status = status_to_maybe_uv_error(status); + let status = status_to_maybe_uv_error(stream_watcher, status); cb(stream_watcher, status); } } @@ -231,7 +232,7 @@ impl TcpWatcher { }; match result { 0 => Ok(()), - _ => Err(UvError(result)), + _ => Err(last_uv_error(self)), } } } @@ -259,7 +260,7 @@ impl TcpWatcher { let mut stream_watcher = connect_request.stream(); connect_request.delete(); let cb = stream_watcher.get_watcher_data().connect_cb.take_unwrap(); - let status = status_to_maybe_uv_error(status); + let status = status_to_maybe_uv_error(stream_watcher, status); cb(stream_watcher, status); } } @@ -282,7 +283,7 @@ impl TcpWatcher { rtdebug!("connection_cb"); let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle); let cb = stream_watcher.get_watcher_data().connect_cb.get_ref(); - let status = status_to_maybe_uv_error(status); + let status = status_to_maybe_uv_error(stream_watcher, status); (*cb)(stream_watcher, status); } } @@ -326,7 +327,7 @@ impl UdpWatcher { }; match result { 0 => Ok(()), - _ => Err(UvError(result)), + _ => Err(last_uv_error(self)), } } } @@ -359,7 +360,7 @@ impl UdpWatcher { rtdebug!("buf len: %d", buf.len as int); let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle); let cb = udp_watcher.get_watcher_data().udp_recv_cb.get_ref(); - let status = status_to_maybe_uv_error(nread as c_int); + let status = status_to_maybe_uv_error(udp_watcher, nread as c_int); let addr = uv_socket_addr_to_socket_addr(sockaddr_to_UvSocketAddr(addr)); (*cb)(udp_watcher, nread as int, buf, addr, flags as uint, status); } @@ -394,7 +395,7 @@ impl UdpWatcher { let mut udp_watcher = send_request.handle(); send_request.delete(); let cb = udp_watcher.get_watcher_data().udp_send_cb.take_unwrap(); - let status = status_to_maybe_uv_error(status); + let status = status_to_maybe_uv_error(udp_watcher, status); cb(udp_watcher, status); } } diff --git a/src/libstd/rt/uv/pipe.rs b/src/libstd/rt/uv/pipe.rs deleted file mode 100644 index 1147c731a60..00000000000 --- a/src/libstd/rt/uv/pipe.rs +++ /dev/null @@ -1,66 +0,0 @@ -// Copyright 2013 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or -// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license -// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -use prelude::*; -use libc; - -use rt::uv; -use rt::uv::net; -use rt::uv::uvll; - -pub struct Pipe(*uvll::uv_pipe_t); - -impl uv::Watcher for Pipe {} - -impl Pipe { - pub fn new(loop_: &uv::Loop, ipc: bool) -> Pipe { - unsafe { - let handle = uvll::malloc_handle(uvll::UV_NAMED_PIPE); - assert!(handle.is_not_null()); - let ipc = ipc as libc::c_int; - assert_eq!(uvll::pipe_init(loop_.native_handle(), handle, ipc), 0); - let mut ret: Pipe = - uv::NativeHandle::from_native_handle(handle); - ret.install_watcher_data(); - ret - } - } - - pub fn as_stream(&self) -> net::StreamWatcher { - net::StreamWatcher(**self as *uvll::uv_stream_t) - } - - pub fn close(self, cb: uv::NullCallback) { - { - let mut this = self; - let data = this.get_watcher_data(); - assert!(data.close_cb.is_none()); - data.close_cb = Some(cb); - } - - unsafe { uvll::close(self.native_handle(), close_cb); } - - extern fn close_cb(handle: *uvll::uv_pipe_t) { - let mut process: Pipe = uv::NativeHandle::from_native_handle(handle); - process.get_watcher_data().close_cb.take_unwrap()(); - process.drop_watcher_data(); - unsafe { uvll::free_handle(handle as *libc::c_void) } - } - } -} - -impl uv::NativeHandle<*uvll::uv_pipe_t> for Pipe { - fn from_native_handle(handle: *uvll::uv_pipe_t) -> Pipe { - Pipe(handle) - } - fn native_handle(&self) -> *uvll::uv_pipe_t { - match self { &Pipe(ptr) => ptr } - } -} diff --git a/src/libstd/rt/uv/process.rs b/src/libstd/rt/uv/process.rs deleted file mode 100644 index a02cf67ec26..00000000000 --- a/src/libstd/rt/uv/process.rs +++ /dev/null @@ -1,264 +0,0 @@ -// Copyright 2013 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or -// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license -// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -use prelude::*; -use libc; -use ptr; -use vec; -use cell::Cell; - -use rt::uv; -use rt::uv::net; -use rt::uv::pipe; -use rt::uv::uvll; - -/// A process wraps the handle of the underlying uv_process_t. -pub struct Process(*uvll::uv_process_t); - -/// This configuration describes how a new process should be spawned. This is -/// translated to libuv's own configuration -pub struct Config<'self> { - /// Path to the program to run - program: &'self str, - - /// Arguments to pass to the program (doesn't include the program itself) - args: &'self [~str], - - /// Optional environment to specify for the program. If this is None, then - /// it will inherit the current process's environment. - env: Option<&'self [(~str, ~str)]>, - - /// Optional working directory for the new process. If this is None, then - /// the current directory of the running process is inherited. - cwd: Option<&'self str>, - - /// Any number of streams/file descriptors/pipes may be attached to this - /// process. This list enumerates the file descriptors and such for the - /// process to be spawned, and the file descriptors inherited will start at - /// 0 and go to the length of this array. - /// - /// Standard file descriptors are: - /// - /// 0 - stdin - /// 1 - stdout - /// 2 - stderr - io: &'self [StdioContainer] -} - -/// Describes what to do with a standard io stream for a child process. -pub enum StdioContainer { - /// This stream will be ignored. This is the equivalent of attaching the - /// stream to `/dev/null` - Ignored, - - /// The specified file descriptor is inherited for the stream which it is - /// specified for. - InheritFd(libc::c_int), - - /// The specified libuv stream is inherited for the corresponding file - /// descriptor it is assigned to. - InheritStream(net::StreamWatcher), - - /// Creates a pipe for the specified file descriptor which will be directed - /// into the previously-initialized pipe passed in. - /// - /// The first boolean argument is whether the pipe is readable, and the - /// second is whether it is writable. These properties are from the view of - /// the *child* process, not the parent process. - CreatePipe(pipe::Pipe, bool /* readable */, bool /* writable */), -} - -impl uv::Watcher for Process {} - -impl Process { - /// Creates a new process, ready to spawn inside an event loop - pub fn new() -> Process { - let handle = unsafe { uvll::malloc_handle(uvll::UV_PROCESS) }; - assert!(handle.is_not_null()); - let mut ret: Process = uv::NativeHandle::from_native_handle(handle); - ret.install_watcher_data(); - return ret; - } - - /// Spawn a new process inside the specified event loop. - /// - /// The `config` variable will be passed down to libuv, and the `exit_cb` - /// will be run only once, when the process exits. - /// - /// Returns either the corresponding process object or an error which - /// occurred. - pub fn spawn(&mut self, loop_: &uv::Loop, config: &Config, - exit_cb: uv::ExitCallback) -> Result<(), uv::UvError> { - let cwd = config.cwd.map_move(|s| s.to_c_str()); - - extern fn on_exit(p: *uvll::uv_process_t, - exit_status: libc::c_int, - term_signal: libc::c_int) { - let mut p: Process = uv::NativeHandle::from_native_handle(p); - let err = match exit_status { - 0 => None, - _ => uv::status_to_maybe_uv_error(-1) - }; - p.get_watcher_data().exit_cb.take_unwrap()(p, - exit_status as int, - term_signal as int, - err); - } - - let mut stdio = vec::with_capacity::<uvll::uv_stdio_container_t>( - config.io.len()); - unsafe { - vec::raw::set_len(&mut stdio, config.io.len()); - for (slot, &other) in stdio.iter().zip(config.io.iter()) { - set_stdio(slot as *uvll::uv_stdio_container_t, other); - } - } - - let exit_cb = Cell::new(exit_cb); - do with_argv(config.program, config.args) |argv| { - do with_env(config.env) |envp| { - let options = uvll::uv_process_options_t { - exit_cb: on_exit, - file: unsafe { *argv }, - args: argv, - env: envp, - cwd: match cwd { - Some(ref cwd) => cwd.with_ref(|p| p), - None => ptr::null(), - }, - flags: 0, - stdio_count: stdio.len() as libc::c_int, - stdio: stdio.as_imm_buf(|p, _| p), - uid: 0, - gid: 0, - }; - - match unsafe { - uvll::spawn(loop_.native_handle(), **self, options) - } { - 0 => { - (*self).get_watcher_data().exit_cb = Some(exit_cb.take()); - Ok(()) - } - err => Err(uv::UvError(err)) - } - } - } - } - - /// Sends a signal to this process. - /// - /// This is a wrapper around `uv_process_kill` - pub fn kill(&self, signum: int) -> Result<(), uv::UvError> { - match unsafe { - uvll::process_kill(self.native_handle(), signum as libc::c_int) - } { - 0 => Ok(()), - err => Err(uv::UvError(err)) - } - } - - /// Returns the process id of a spawned process - pub fn pid(&self) -> libc::pid_t { - unsafe { uvll::process_pid(**self) as libc::pid_t } - } - - /// Closes this handle, invoking the specified callback once closed - pub fn close(self, cb: uv::NullCallback) { - { - let mut this = self; - let data = this.get_watcher_data(); - assert!(data.close_cb.is_none()); - data.close_cb = Some(cb); - } - - unsafe { uvll::close(self.native_handle(), close_cb); } - - extern fn close_cb(handle: *uvll::uv_process_t) { - let mut process: Process = uv::NativeHandle::from_native_handle(handle); - process.get_watcher_data().close_cb.take_unwrap()(); - process.drop_watcher_data(); - unsafe { uvll::free_handle(handle as *libc::c_void) } - } - } -} - -unsafe fn set_stdio(dst: *uvll::uv_stdio_container_t, io: StdioContainer) { - match io { - Ignored => { uvll::set_stdio_container_flags(dst, uvll::STDIO_IGNORE); } - InheritFd(fd) => { - uvll::set_stdio_container_flags(dst, uvll::STDIO_INHERIT_FD); - uvll::set_stdio_container_fd(dst, fd); - } - InheritStream(stream) => { - uvll::set_stdio_container_flags(dst, uvll::STDIO_INHERIT_STREAM); - uvll::set_stdio_container_stream(dst, stream.native_handle()); - } - CreatePipe(pipe, readable, writable) => { - let mut flags = uvll::STDIO_CREATE_PIPE as libc::c_int; - if readable { - flags |= uvll::STDIO_READABLE_PIPE as libc::c_int; - } - if writable { - flags |= uvll::STDIO_WRITABLE_PIPE as libc::c_int; - } - uvll::set_stdio_container_flags(dst, flags); - uvll::set_stdio_container_stream(dst, - pipe.as_stream().native_handle()); - } - } -} - -/// Converts the program and arguments to the argv array expected by libuv -fn with_argv<T>(prog: &str, args: &[~str], f: &fn(**libc::c_char) -> T) -> T { - // First, allocation space to put all the C-strings (we need to have - // ownership of them somewhere - let mut c_strs = vec::with_capacity(args.len() + 1); - c_strs.push(prog.to_c_str()); - for arg in args.iter() { - c_strs.push(arg.to_c_str()); - } - - // Next, create the char** array - let mut c_args = vec::with_capacity(c_strs.len() + 1); - for s in c_strs.iter() { - c_args.push(s.with_ref(|p| p)); - } - c_args.push(ptr::null()); - c_args.as_imm_buf(|buf, _| f(buf)) -} - -/// Converts the environment to the env array expected by libuv -fn with_env<T>(env: Option<&[(~str, ~str)]>, f: &fn(**libc::c_char) -> T) -> T { - let env = match env { - Some(s) => s, - None => { return f(ptr::null()); } - }; - // As with argv, create some temporary storage and then the actual array - let mut envp = vec::with_capacity(env.len()); - for &(ref key, ref value) in env.iter() { - envp.push(fmt!("%s=%s", *key, *value).to_c_str()); - } - let mut c_envp = vec::with_capacity(envp.len() + 1); - for s in envp.iter() { - c_envp.push(s.with_ref(|p| p)); - } - c_envp.push(ptr::null()); - c_envp.as_imm_buf(|buf, _| f(buf)) -} - -impl uv::NativeHandle<*uvll::uv_process_t> for Process { - fn from_native_handle(handle: *uvll::uv_process_t) -> Process { - Process(handle) - } - fn native_handle(&self) -> *uvll::uv_process_t { - match self { &Process(ptr) => ptr } - } -} diff --git a/src/libstd/rt/uv/timer.rs b/src/libstd/rt/uv/timer.rs index 7b09cf2eb0e..eaa5e77a6da 100644 --- a/src/libstd/rt/uv/timer.rs +++ b/src/libstd/rt/uv/timer.rs @@ -43,7 +43,7 @@ impl TimerWatcher { let mut watcher: TimerWatcher = NativeHandle::from_native_handle(handle); let data = watcher.get_watcher_data(); let cb = data.timer_cb.get_ref(); - let status = status_to_maybe_uv_error(status); + let status = status_to_maybe_uv_error(watcher, status); (*cb)(watcher, status); } } diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs index c771f93cef5..e620ab274b1 100644 --- a/src/libstd/rt/uv/uvio.rs +++ b/src/libstd/rt/uv/uvio.rs @@ -13,7 +13,7 @@ use cast::transmute; use cast; use cell::Cell; use clone::Clone; -use libc::{c_int, c_uint, c_void, pid_t}; +use libc::{c_int, c_uint, c_void}; use ops::Drop; use option::*; use ptr; @@ -22,7 +22,6 @@ use result::*; use rt::io::IoError; use rt::io::net::ip::{SocketAddr, IpAddr}; use rt::io::{standard_error, OtherIoError, SeekStyle, SeekSet, SeekCur, SeekEnd}; -use rt::kill::BlockedTask; use rt::local::Local; use rt::rtio::*; use rt::sched::{Scheduler, SchedHandle}; @@ -149,7 +148,7 @@ fn socket_name<T, U: Watcher + NativeHandle<*T>>(sk: SocketNameKind, }; if r != 0 { - let status = status_to_maybe_uv_error(r); + let status = status_to_maybe_uv_error(handle, r); return Err(uv_error_to_io_error(status.unwrap())); } @@ -592,63 +591,6 @@ impl IoFactory for UvIoFactory { assert!(!result_cell.is_empty()); return result_cell.take(); } - - fn pipe_init(&mut self, ipc: bool) -> Result<~RtioPipeObject, IoError> { - let home = get_handle_to_current_scheduler!(); - Ok(~UvPipeStream { pipe: Pipe::new(self.uv_loop(), ipc), home: home }) - } - - fn spawn(&mut self, - config: &process::Config) -> Result<~RtioProcessObject, IoError> { - // Sadly, we must create the UvProcess before we actually call uv_spawn - // so that the exit_cb can close over it and notify it when the process - // has exited. - let mut ret = ~UvProcess { - process: Process::new(), - home: None, - exit_status: None, - term_signal: None, - exit_error: None, - descheduled: None, - }; - let ret_ptr = unsafe { - *cast::transmute::<&~UvProcess, &*mut UvProcess>(&ret) - }; - - // The purpose of this exit callback is to record the data about the - // exit and then wake up the task which may be waiting for the process - // to exit. This is all performed in the current io-loop, and the - // implementation of UvProcess ensures that reading these fields always - // occurs on the current io-loop. - let exit_cb: ExitCallback = |_, exit_status, term_signal, error| { - unsafe { - assert!((*ret_ptr).exit_status.is_none()); - (*ret_ptr).exit_status = Some(exit_status); - (*ret_ptr).term_signal = Some(term_signal); - (*ret_ptr).exit_error = error; - match (*ret_ptr).descheduled.take() { - Some(task) => { - let scheduler: ~Scheduler = Local::take(); - scheduler.resume_blocked_task_immediately(task); - } - None => {} - } - } - }; - - match ret.process.spawn(self.uv_loop(), config, exit_cb) { - Ok(()) => { - // Only now do we actually get a handle to this scheduler. - ret.home = Some(get_handle_to_current_scheduler!()); - Ok(ret) - } - Err(uverr) => { - // We still need to close the process handle we created, but - // that's taken care for us in the destructor of UvProcess - Err(uv_error_to_io_error(uverr)) - } - } - } } pub struct UvTcpListener { @@ -737,7 +679,7 @@ impl RtioTcpListener for UvTcpListener { uvll::tcp_simultaneous_accepts(self_.watcher().native_handle(), 1 as c_int) }; - match status_to_maybe_uv_error(r) { + match status_to_maybe_uv_error(self_.watcher(), r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -750,7 +692,7 @@ impl RtioTcpListener for UvTcpListener { uvll::tcp_simultaneous_accepts(self_.watcher().native_handle(), 0 as c_int) }; - match status_to_maybe_uv_error(r) { + match status_to_maybe_uv_error(self_.watcher(), r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -758,15 +700,40 @@ impl RtioTcpListener for UvTcpListener { } } -trait UvStream: HomingIO { - fn as_stream(&mut self) -> StreamWatcher; +pub struct UvTcpStream { + watcher: TcpWatcher, + home: SchedHandle, +} + +impl HomingIO for UvTcpStream { + fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home } } -// FIXME(#3429) I would rather this be `impl<T: UvStream> RtioStream for T` but -// that has conflicts with other traits that also have methods -// called `read` and `write` -macro_rules! rtiostream(($t:ident) => { -impl RtioStream for $t { +impl Drop for UvTcpStream { + fn drop(&self) { + // XXX need mutable finalizer + let this = unsafe { transmute::<&UvTcpStream, &mut UvTcpStream>(self) }; + do this.home_for_io_with_sched |self_, scheduler| { + do scheduler.deschedule_running_task_and_then |_, task| { + let task_cell = Cell::new(task); + do self_.watcher.as_stream().close { + let scheduler: ~Scheduler = Local::take(); + scheduler.resume_blocked_task_immediately(task_cell.take()); + } + } + } + } +} + +impl RtioSocket for UvTcpStream { + fn socket_name(&mut self) -> Result<SocketAddr, IoError> { + do self.home_for_io |self_| { + socket_name(Tcp, self_.watcher) + } + } +} + +impl RtioTcpStream for UvTcpStream { fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> { do self.home_for_io_with_sched |self_, scheduler| { let result_cell = Cell::new_empty(); @@ -780,7 +747,7 @@ impl RtioStream for $t { let alloc: AllocCallback = |_| unsafe { slice_to_uv_buf(*buf_ptr) }; - let mut watcher = self_.as_stream(); + let mut watcher = self_.watcher.as_stream(); do watcher.read_start(alloc) |mut watcher, nread, _buf, status| { // Stop reading so that no read callbacks are @@ -816,7 +783,7 @@ impl RtioStream for $t { do scheduler.deschedule_running_task_and_then |_, task| { let task_cell = Cell::new(task); let buf = unsafe { slice_to_uv_buf(*buf_ptr) }; - let mut watcher = self_.as_stream(); + let mut watcher = self_.watcher.as_stream(); do watcher.write(buf) |_watcher, status| { let result = if status.is_none() { Ok(()) @@ -835,85 +802,7 @@ impl RtioStream for $t { result_cell.take() } } -} -}) - -rtiostream!(UvPipeStream) -rtiostream!(UvTcpStream) - -pub struct UvPipeStream { - pipe: Pipe, - home: SchedHandle, -} - -impl UvStream for UvPipeStream { - fn as_stream(&mut self) -> StreamWatcher { self.pipe.as_stream() } -} - -impl HomingIO for UvPipeStream { - fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home } -} - -impl Drop for UvPipeStream { - fn drop(&self) { - // FIXME(#4330): should not need a transmute - let this = unsafe { cast::transmute_mut(self) }; - do this.home_for_io |self_| { - let scheduler: ~Scheduler = Local::take(); - do scheduler.deschedule_running_task_and_then |_, task| { - let task_cell = Cell::new(task); - do self_.pipe.close { - let scheduler: ~Scheduler = Local::take(); - scheduler.resume_blocked_task_immediately(task_cell.take()); - } - } - } - } -} - -impl UvPipeStream { - pub fn uv_pipe(&self) -> Pipe { self.pipe } -} -pub struct UvTcpStream { - watcher: TcpWatcher, - home: SchedHandle, -} - -impl HomingIO for UvTcpStream { - fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home } -} - -impl Drop for UvTcpStream { - fn drop(&self) { - // FIXME(#4330): should not need a transmute - let this = unsafe { cast::transmute_mut(self) }; - do this.home_for_io |self_| { - let scheduler: ~Scheduler = Local::take(); - do scheduler.deschedule_running_task_and_then |_, task| { - let task_cell = Cell::new(task); - do self_.watcher.as_stream().close { - let scheduler: ~Scheduler = Local::take(); - scheduler.resume_blocked_task_immediately(task_cell.take()); - } - } - } - } -} - -impl UvStream for UvTcpStream { - fn as_stream(&mut self) -> StreamWatcher { self.watcher.as_stream() } -} - -impl RtioSocket for UvTcpStream { - fn socket_name(&mut self) -> Result<SocketAddr, IoError> { - do self.home_for_io |self_| { - socket_name(Tcp, self_.watcher) - } - } -} - -impl RtioTcpStream for UvTcpStream { fn peer_name(&mut self) -> Result<SocketAddr, IoError> { do self.home_for_io |self_| { socket_name(TcpPeer, self_.watcher) @@ -924,7 +813,7 @@ impl RtioTcpStream for UvTcpStream { do self.home_for_io |self_| { let r = unsafe { uvll::tcp_nodelay(self_.watcher.native_handle(), 0 as c_int) }; - match status_to_maybe_uv_error(r) { + match status_to_maybe_uv_error(self_.watcher, r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -935,7 +824,7 @@ impl RtioTcpStream for UvTcpStream { do self.home_for_io |self_| { let r = unsafe { uvll::tcp_nodelay(self_.watcher.native_handle(), 1 as c_int) }; - match status_to_maybe_uv_error(r) { + match status_to_maybe_uv_error(self_.watcher, r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -949,7 +838,7 @@ impl RtioTcpStream for UvTcpStream { delay_in_seconds as c_uint) }; - match status_to_maybe_uv_error(r) { + match status_to_maybe_uv_error(self_.watcher, r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -962,7 +851,7 @@ impl RtioTcpStream for UvTcpStream { uvll::tcp_keepalive(self_.watcher.native_handle(), 0 as c_int, 0 as c_uint) }; - match status_to_maybe_uv_error(r) { + match status_to_maybe_uv_error(self_.watcher, r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -1074,7 +963,7 @@ impl RtioUdpSocket for UvUdpSocket { } }; - match status_to_maybe_uv_error(r) { + match status_to_maybe_uv_error(self_.watcher, r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -1090,7 +979,7 @@ impl RtioUdpSocket for UvUdpSocket { } }; - match status_to_maybe_uv_error(r) { + match status_to_maybe_uv_error(self_.watcher, r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -1104,7 +993,7 @@ impl RtioUdpSocket for UvUdpSocket { uvll::udp_set_multicast_loop(self_.watcher.native_handle(), 1 as c_int) }; - match status_to_maybe_uv_error(r) { + match status_to_maybe_uv_error(self_.watcher, r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -1118,7 +1007,7 @@ impl RtioUdpSocket for UvUdpSocket { uvll::udp_set_multicast_loop(self_.watcher.native_handle(), 0 as c_int) }; - match status_to_maybe_uv_error(r) { + match status_to_maybe_uv_error(self_.watcher, r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -1132,7 +1021,7 @@ impl RtioUdpSocket for UvUdpSocket { uvll::udp_set_multicast_ttl(self_.watcher.native_handle(), ttl as c_int) }; - match status_to_maybe_uv_error(r) { + match status_to_maybe_uv_error(self_.watcher, r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -1146,7 +1035,7 @@ impl RtioUdpSocket for UvUdpSocket { uvll::udp_set_ttl(self_.watcher.native_handle(), ttl as c_int) }; - match status_to_maybe_uv_error(r) { + match status_to_maybe_uv_error(self_.watcher, r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -1160,7 +1049,7 @@ impl RtioUdpSocket for UvUdpSocket { uvll::udp_set_broadcast(self_.watcher.native_handle(), 1 as c_int) }; - match status_to_maybe_uv_error(r) { + match status_to_maybe_uv_error(self_.watcher, r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -1174,7 +1063,7 @@ impl RtioUdpSocket for UvUdpSocket { uvll::udp_set_broadcast(self_.watcher.native_handle(), 0 as c_int) }; - match status_to_maybe_uv_error(r) { + match status_to_maybe_uv_error(self_.watcher, r) { Some(err) => Err(uv_error_to_io_error(err)), None => Ok(()) } @@ -1361,89 +1250,6 @@ impl RtioFileStream for UvFileStream { } } -pub struct UvProcess { - process: process::Process, - - // Sadly, this structure must be created before we return it, so in that - // brief interim the `home` is None. - home: Option<SchedHandle>, - - // All None until the process exits (exit_error may stay None) - priv exit_status: Option<int>, - priv term_signal: Option<int>, - priv exit_error: Option<UvError>, - - // Used to store which task to wake up from the exit_cb - priv descheduled: Option<BlockedTask>, -} - -impl HomingIO for UvProcess { - fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.home.get_mut_ref() } -} - -impl Drop for UvProcess { - fn drop(&self) { - // FIXME(#4330): should not need a transmute - let this = unsafe { cast::transmute_mut(self) }; - - let close = |self_: &mut UvProcess| { - let scheduler: ~Scheduler = Local::take(); - do scheduler.deschedule_running_task_and_then |_, task| { - let task = Cell::new(task); - do self_.process.close { - let scheduler: ~Scheduler = Local::take(); - scheduler.resume_blocked_task_immediately(task.take()); - } - } - }; - - // If home is none, then this process never actually successfully - // spawned, so there's no need to switch event loops - if this.home.is_none() { - close(this) - } else { - this.home_for_io(close) - } - } -} - -impl RtioProcess for UvProcess { - fn id(&self) -> pid_t { - self.process.pid() - } - - fn kill(&mut self, signal: int) -> Result<(), IoError> { - do self.home_for_io |self_| { - match self_.process.kill(signal) { - Ok(()) => Ok(()), - Err(uverr) => Err(uv_error_to_io_error(uverr)) - } - } - } - - fn wait(&mut self) -> int { - // Make sure (on the home scheduler) that we have an exit status listed - do self.home_for_io |self_| { - match self_.exit_status { - Some(*) => {} - None => { - // If there's no exit code previously listed, then the - // process's exit callback has yet to be invoked. We just - // need to deschedule ourselves and wait to be reawoken. - let scheduler: ~Scheduler = Local::take(); - do scheduler.deschedule_running_task_and_then |_, task| { - assert!(self_.descheduled.is_none()); - self_.descheduled = Some(task); - } - assert!(self_.exit_status.is_some()); - } - } - } - - self.exit_status.unwrap() - } -} - #[test] fn test_simple_io_no_connect() { do run_in_newsched_task { diff --git a/src/libstd/rt/uv/uvll.rs b/src/libstd/rt/uv/uvll.rs index 24e070ca239..1e189e90885 100644 --- a/src/libstd/rt/uv/uvll.rs +++ b/src/libstd/rt/uv/uvll.rs @@ -37,74 +37,28 @@ use libc::{malloc, free}; use libc; use prelude::*; use ptr; +use str; use vec; -pub use self::errors::*; - +pub static UNKNOWN: c_int = -1; pub static OK: c_int = 0; -pub static EOF: c_int = -4095; -pub static UNKNOWN: c_int = -4094; - -// uv-errno.h redefines error codes for windows, but not for unix... - -#[cfg(windows)] -pub mod errors { - use libc::c_int; +pub static EOF: c_int = 1; +pub static EADDRINFO: c_int = 2; +pub static EACCES: c_int = 3; +pub static ECONNREFUSED: c_int = 12; +pub static ECONNRESET: c_int = 13; +pub static EPIPE: c_int = 36; - pub static EACCES: c_int = -4093; - pub static ECONNREFUSED: c_int = -4079; - pub static ECONNRESET: c_int = -4078; - pub static EPIPE: c_int = -4048; +pub struct uv_err_t { + code: c_int, + sys_errno_: c_int } -#[cfg(not(windows))] -pub mod errors { - use libc; - use libc::c_int; - - pub static EACCES: c_int = -libc::EACCES; - pub static ECONNREFUSED: c_int = -libc::ECONNREFUSED; - pub static ECONNRESET: c_int = -libc::ECONNRESET; - pub static EPIPE: c_int = -libc::EPIPE; -} - -pub static PROCESS_SETUID: c_int = 1 << 0; -pub static PROCESS_SETGID: c_int = 1 << 1; -pub static PROCESS_WINDOWS_VERBATIM_ARGUMENTS: c_int = 1 << 2; -pub static PROCESS_DETACHED: c_int = 1 << 3; -pub static PROCESS_WINDOWS_HIDE: c_int = 1 << 4; - -pub static STDIO_IGNORE: c_int = 0x00; -pub static STDIO_CREATE_PIPE: c_int = 0x01; -pub static STDIO_INHERIT_FD: c_int = 0x02; -pub static STDIO_INHERIT_STREAM: c_int = 0x04; -pub static STDIO_READABLE_PIPE: c_int = 0x10; -pub static STDIO_WRITABLE_PIPE: c_int = 0x20; pub struct uv_buf_t { base: *u8, len: libc::size_t, } -pub struct uv_process_options_t { - exit_cb: uv_exit_cb, - file: *libc::c_char, - args: **libc::c_char, - env: **libc::c_char, - cwd: *libc::c_char, - flags: libc::c_uint, - stdio_count: libc::c_int, - stdio: *uv_stdio_container_t, - uid: uv_uid_t, - gid: uv_gid_t, -} - -// These fields are private because they must be interfaced with through the -// functions below. -pub struct uv_stdio_container_t { - priv flags: libc::c_int, - priv stream: *uv_stream_t, -} - pub type uv_handle_t = c_void; pub type uv_loop_t = c_void; pub type uv_idle_t = c_void; @@ -118,8 +72,6 @@ pub type uv_timer_t = c_void; pub type uv_stream_t = c_void; pub type uv_fs_t = c_void; pub type uv_udp_send_t = c_void; -pub type uv_process_t = c_void; -pub type uv_pipe_t = c_void; #[cfg(stage0)] pub type uv_idle_cb = *u8; @@ -145,8 +97,6 @@ pub type uv_connection_cb = *u8; pub type uv_timer_cb = *u8; #[cfg(stage0)] pub type uv_write_cb = *u8; -#[cfg(stage0)] -pub type uv_exit_cb = *u8; #[cfg(not(stage0))] pub type uv_idle_cb = extern "C" fn(handle: *uv_idle_t, @@ -187,21 +137,12 @@ pub type uv_timer_cb = extern "C" fn(handle: *uv_timer_t, #[cfg(not(stage0))] pub type uv_write_cb = extern "C" fn(handle: *uv_write_t, status: c_int); -#[cfg(not(stage0))] -pub type uv_exit_cb = extern "C" fn(handle: *uv_process_t, - exit_status: c_int, - term_signal: c_int); pub type sockaddr = c_void; pub type sockaddr_in = c_void; pub type sockaddr_in6 = c_void; pub type sockaddr_storage = c_void; -#[cfg(unix)] pub type uv_uid_t = libc::types::os::arch::posix88::uid_t; -#[cfg(unix)] pub type uv_gid_t = libc::types::os::arch::posix88::gid_t; -#[cfg(windows)] pub type uv_uid_t = libc::c_uchar; -#[cfg(windows)] pub type uv_gid_t = libc::c_uchar; - #[deriving(Eq)] pub enum uv_handle_type { UV_UNKNOWN_HANDLE, @@ -546,12 +487,20 @@ pub unsafe fn read_stop(stream: *uv_stream_t) -> c_int { return rust_uv_read_stop(stream as *c_void); } -pub unsafe fn strerror(err: c_int) -> *c_char { +pub unsafe fn last_error(loop_handle: *c_void) -> uv_err_t { #[fixed_stack_segment]; #[inline(never)]; + + return rust_uv_last_error(loop_handle); +} + +pub unsafe fn strerror(err: *uv_err_t) -> *c_char { + #[fixed_stack_segment]; #[inline(never)]; + return rust_uv_strerror(err); } -pub unsafe fn err_name(err: c_int) -> *c_char { +pub unsafe fn err_name(err: *uv_err_t) -> *c_char { #[fixed_stack_segment]; #[inline(never)]; + return rust_uv_err_name(err); } @@ -705,45 +654,6 @@ pub unsafe fn fs_req_cleanup(req: *uv_fs_t) { rust_uv_fs_req_cleanup(req); } -pub unsafe fn spawn(loop_ptr: *c_void, result: *uv_process_t, - options: uv_process_options_t) -> c_int { - #[fixed_stack_segment]; #[inline(never)]; - return rust_uv_spawn(loop_ptr, result, options); -} - -pub unsafe fn process_kill(p: *uv_process_t, signum: c_int) -> c_int { - #[fixed_stack_segment]; #[inline(never)]; - return rust_uv_process_kill(p, signum); -} - -pub unsafe fn process_pid(p: *uv_process_t) -> c_int { - #[fixed_stack_segment]; #[inline(never)]; - return rust_uv_process_pid(p); -} - -pub unsafe fn set_stdio_container_flags(c: *uv_stdio_container_t, - flags: libc::c_int) { - #[fixed_stack_segment]; #[inline(never)]; - rust_set_stdio_container_flags(c, flags); -} - -pub unsafe fn set_stdio_container_fd(c: *uv_stdio_container_t, - fd: libc::c_int) { - #[fixed_stack_segment]; #[inline(never)]; - rust_set_stdio_container_fd(c, fd); -} - -pub unsafe fn set_stdio_container_stream(c: *uv_stdio_container_t, - stream: *uv_stream_t) { - #[fixed_stack_segment]; #[inline(never)]; - rust_set_stdio_container_stream(c, stream); -} - -pub unsafe fn pipe_init(loop_ptr: *c_void, p: *uv_pipe_t, ipc: c_int) -> c_int { - #[fixed_stack_segment]; #[inline(never)]; - rust_uv_pipe_init(loop_ptr, p, ipc) -} - // data access helpers pub unsafe fn get_result_from_fs_req(req: *uv_fs_t) -> c_int { #[fixed_stack_segment]; #[inline(never)]; @@ -810,6 +720,22 @@ pub unsafe fn get_len_from_buf(buf: uv_buf_t) -> size_t { return rust_uv_get_len_from_buf(buf); } +pub unsafe fn get_last_err_info(uv_loop: *c_void) -> ~str { + let err = last_error(uv_loop); + let err_ptr = ptr::to_unsafe_ptr(&err); + let err_name = str::raw::from_c_str(err_name(err_ptr)); + let err_msg = str::raw::from_c_str(strerror(err_ptr)); + return fmt!("LIBUV ERROR: name: %s msg: %s", + err_name, err_msg); +} + +pub unsafe fn get_last_err_data(uv_loop: *c_void) -> uv_err_data { + let err = last_error(uv_loop); + let err_ptr = ptr::to_unsafe_ptr(&err); + let err_name = str::raw::from_c_str(err_name(err_ptr)); + let err_msg = str::raw::from_c_str(strerror(err_ptr)); + uv_err_data { err_name: err_name, err_msg: err_msg } +} pub struct uv_err_data { err_name: ~str, @@ -842,8 +768,9 @@ extern { cb: uv_async_cb) -> c_int; fn rust_uv_tcp_init(loop_handle: *c_void, handle_ptr: *uv_tcp_t) -> c_int; fn rust_uv_buf_init(out_buf: *uv_buf_t, base: *u8, len: size_t); - fn rust_uv_strerror(err: c_int) -> *c_char; - fn rust_uv_err_name(err: c_int) -> *c_char; + fn rust_uv_last_error(loop_handle: *c_void) -> uv_err_t; + fn rust_uv_strerror(err: *uv_err_t) -> *c_char; + fn rust_uv_err_name(err: *uv_err_t) -> *c_char; fn rust_uv_ip4_addrp(ip: *u8, port: c_int) -> *sockaddr_in; fn rust_uv_ip6_addrp(ip: *u8, port: c_int) -> *sockaddr_in6; fn rust_uv_free_ip4_addr(addr: *sockaddr_in); @@ -929,13 +856,4 @@ extern { fn rust_uv_set_data_for_req(req: *c_void, data: *c_void); fn rust_uv_get_base_from_buf(buf: uv_buf_t) -> *u8; fn rust_uv_get_len_from_buf(buf: uv_buf_t) -> size_t; - fn rust_uv_spawn(loop_ptr: *c_void, outptr: *uv_process_t, - options: uv_process_options_t) -> c_int; - fn rust_uv_process_kill(p: *uv_process_t, signum: c_int) -> c_int; - fn rust_uv_process_pid(p: *uv_process_t) -> c_int; - fn rust_set_stdio_container_flags(c: *uv_stdio_container_t, flags: c_int); - fn rust_set_stdio_container_fd(c: *uv_stdio_container_t, fd: c_int); - fn rust_set_stdio_container_stream(c: *uv_stdio_container_t, - stream: *uv_stream_t); - fn rust_uv_pipe_init(loop_ptr: *c_void, p: *uv_pipe_t, ipc: c_int) -> c_int; } |
