diff options
| author | Alex Crichton <alex@alexcrichton.com> | 2014-06-04 00:00:59 -0700 |
|---|---|---|
| committer | Alex Crichton <alex@alexcrichton.com> | 2014-06-06 22:19:57 -0700 |
| commit | 550c347d7b9fbeb10dcf039f0e7e0c16de83dc4a (patch) | |
| tree | e8fd9224bc7abaa698f89bc6a8c7c716208af01b | |
| parent | 51348b068b88d71426d9de93762575cd2bb9a5f5 (diff) | |
| download | rust-550c347d7b9fbeb10dcf039f0e7e0c16de83dc4a.tar.gz rust-550c347d7b9fbeb10dcf039f0e7e0c16de83dc4a.zip | |
rustuv: Deal with the rtio changes
| -rw-r--r-- | src/librustuv/addrinfo.rs | 68 | ||||
| -rw-r--r-- | src/librustuv/async.rs | 30 | ||||
| -rw-r--r-- | src/librustuv/file.rs | 108 | ||||
| -rw-r--r-- | src/librustuv/homing.rs | 13 | ||||
| -rw-r--r-- | src/librustuv/lib.rs | 90 | ||||
| -rw-r--r-- | src/librustuv/net.rs | 151 | ||||
| -rw-r--r-- | src/librustuv/pipe.rs | 49 | ||||
| -rw-r--r-- | src/librustuv/process.rs | 27 | ||||
| -rw-r--r-- | src/librustuv/queue.rs | 2 | ||||
| -rw-r--r-- | src/librustuv/signal.rs | 38 | ||||
| -rw-r--r-- | src/librustuv/timeout.rs | 2 | ||||
| -rw-r--r-- | src/librustuv/timer.rs | 172 | ||||
| -rw-r--r-- | src/librustuv/tty.rs | 11 | ||||
| -rw-r--r-- | src/librustuv/uvio.rs | 104 |
14 files changed, 319 insertions, 546 deletions
diff --git a/src/librustuv/addrinfo.rs b/src/librustuv/addrinfo.rs index 1e18f2ea9ec..daca3005f12 100644 --- a/src/librustuv/addrinfo.rs +++ b/src/librustuv/addrinfo.rs @@ -8,12 +8,12 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use ai = std::io::net::addrinfo; use libc::c_int; use libc; use std::mem; use std::ptr::null; use std::rt::task::BlockedTask; +use std::rt::rtio; use net; use super::{Loop, UvError, Request, wait_until_woken_after, wakeup}; @@ -33,7 +33,9 @@ pub struct GetAddrInfoRequest; impl GetAddrInfoRequest { pub fn run(loop_: &Loop, node: Option<&str>, service: Option<&str>, - hints: Option<ai::Hint>) -> Result<Vec<ai::Info>, UvError> { + hints: Option<rtio::AddrinfoHint>) + -> Result<Vec<rtio::AddrinfoInfo>, UvError> + { assert!(node.is_some() || service.is_some()); let (_c_node, c_node_ptr) = match node { Some(n) => { @@ -54,20 +56,11 @@ impl GetAddrInfoRequest { }; let hint = hints.map(|hint| { - let mut flags = 0; - each_ai_flag(|cval, aival| { - if hint.flags & (aival as uint) != 0 { - flags |= cval as i32; - } - }); - let socktype = 0; - let protocol = 0; - libc::addrinfo { - ai_flags: flags, + ai_flags: 0, ai_family: hint.family as c_int, - ai_socktype: socktype, - ai_protocol: protocol, + ai_socktype: 0, + ai_protocol: 0, ai_addrlen: 0, ai_canonname: null(), ai_addr: null(), @@ -119,22 +112,8 @@ impl Drop for Addrinfo { } } -fn each_ai_flag(_f: |c_int, ai::Flag|) { - /* FIXME: do we really want to support these? - unsafe { - f(uvll::rust_AI_ADDRCONFIG(), ai::AddrConfig); - f(uvll::rust_AI_ALL(), ai::All); - f(uvll::rust_AI_CANONNAME(), ai::CanonName); - f(uvll::rust_AI_NUMERICHOST(), ai::NumericHost); - f(uvll::rust_AI_NUMERICSERV(), ai::NumericServ); - f(uvll::rust_AI_PASSIVE(), ai::Passive); - f(uvll::rust_AI_V4MAPPED(), ai::V4Mapped); - } - */ -} - // Traverse the addrinfo linked list, producing a vector of Rust socket addresses -pub fn accum_addrinfo(addr: &Addrinfo) -> Vec<ai::Info> { +pub fn accum_addrinfo(addr: &Addrinfo) -> Vec<rtio::AddrinfoInfo> { unsafe { let mut addr = addr.handle; @@ -143,35 +122,12 @@ pub fn accum_addrinfo(addr: &Addrinfo) -> Vec<ai::Info> { let rustaddr = net::sockaddr_to_addr(mem::transmute((*addr).ai_addr), (*addr).ai_addrlen as uint); - let mut flags = 0; - each_ai_flag(|cval, aival| { - if (*addr).ai_flags & cval != 0 { - flags |= aival as uint; - } - }); - - /* FIXME: do we really want to support these - let protocol = match (*addr).ai_protocol { - p if p == uvll::rust_IPPROTO_UDP() => Some(ai::UDP), - p if p == uvll::rust_IPPROTO_TCP() => Some(ai::TCP), - _ => None, - }; - let socktype = match (*addr).ai_socktype { - p if p == uvll::rust_SOCK_STREAM() => Some(ai::Stream), - p if p == uvll::rust_SOCK_DGRAM() => Some(ai::Datagram), - p if p == uvll::rust_SOCK_RAW() => Some(ai::Raw), - _ => None, - }; - */ - let protocol = None; - let socktype = None; - - addrs.push(ai::Info { + addrs.push(rtio::AddrinfoInfo { address: rustaddr, family: (*addr).ai_family as uint, - socktype: socktype, - protocol: protocol, - flags: flags, + socktype: 0, + protocol: 0, + flags: 0, }); if (*addr).ai_next.is_not_null() { addr = (*addr).ai_next; diff --git a/src/librustuv/async.rs b/src/librustuv/async.rs index 7a16baaa9f2..5167ce5aff2 100644 --- a/src/librustuv/async.rs +++ b/src/librustuv/async.rs @@ -8,9 +8,10 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +use alloc::arc::Arc; use std::mem; +use std::rt::exclusive::Exclusive; use std::rt::rtio::{Callback, RemoteCallback}; -use std::unstable::sync::Exclusive; use uvll; use super::{Loop, UvHandle}; @@ -22,12 +23,12 @@ pub struct AsyncWatcher { // A flag to tell the callback to exit, set from the dtor. This is // almost never contested - only in rare races with the dtor. - exit_flag: Exclusive<bool> + exit_flag: Arc<Exclusive<bool>>, } struct Payload { callback: Box<Callback:Send>, - exit_flag: Exclusive<bool>, + exit_flag: Arc<Exclusive<bool>>, } impl AsyncWatcher { @@ -36,7 +37,7 @@ impl AsyncWatcher { assert_eq!(unsafe { uvll::uv_async_init(loop_.handle, handle, async_cb) }, 0); - let flag = Exclusive::new(false); + let flag = Arc::new(Exclusive::new(false)); let payload = box Payload { callback: cb, exit_flag: flag.clone() }; unsafe { let payload: *u8 = mem::transmute(payload); @@ -80,9 +81,7 @@ extern fn async_cb(handle: *uvll::uv_async_t) { // could be called in the other thread, missing the final // callback while still destroying the handle. - let should_exit = unsafe { - payload.exit_flag.with_imm(|&should_exit| should_exit) - }; + let should_exit = unsafe { *payload.exit_flag.lock() }; payload.callback.call(); @@ -108,16 +107,13 @@ impl RemoteCallback for AsyncWatcher { impl Drop for AsyncWatcher { fn drop(&mut self) { - unsafe { - self.exit_flag.with(|should_exit| { - // NB: These two things need to happen atomically. Otherwise - // the event handler could wake up due to a *previous* - // signal and see the exit flag, destroying the handle - // before the final send. - *should_exit = true; - uvll::uv_async_send(self.handle) - }) - } + let mut should_exit = unsafe { self.exit_flag.lock() }; + // NB: These two things need to happen atomically. Otherwise + // the event handler could wake up due to a *previous* + // signal and see the exit flag, destroying the handle + // before the final send. + *should_exit = true; + unsafe { uvll::uv_async_send(self.handle) } } } diff --git a/src/librustuv/file.rs b/src/librustuv/file.rs index acf3cbdc1ae..4b1343045de 100644 --- a/src/librustuv/file.rs +++ b/src/librustuv/file.rs @@ -12,9 +12,9 @@ use libc::{c_int, c_char, c_void, ssize_t}; use libc; use std::c_str::CString; use std::c_str; -use std::io::{FileStat, IoError}; -use std::io; use std::mem; +use std::os; +use std::rt::rtio::{IoResult, IoError}; use std::rt::rtio; use std::rt::task::BlockedTask; @@ -56,21 +56,23 @@ impl FsRequest { }) } - pub fn lstat(loop_: &Loop, path: &CString) -> Result<FileStat, UvError> { + pub fn lstat(loop_: &Loop, path: &CString) + -> Result<rtio::FileStat, UvError> + { execute(|req, cb| unsafe { uvll::uv_fs_lstat(loop_.handle, req, path.with_ref(|p| p), cb) }).map(|req| req.mkstat()) } - pub fn stat(loop_: &Loop, path: &CString) -> Result<FileStat, UvError> { + pub fn stat(loop_: &Loop, path: &CString) -> Result<rtio::FileStat, UvError> { execute(|req, cb| unsafe { uvll::uv_fs_stat(loop_.handle, req, path.with_ref(|p| p), cb) }).map(|req| req.mkstat()) } - pub fn fstat(loop_: &Loop, fd: c_int) -> Result<FileStat, UvError> { + pub fn fstat(loop_: &Loop, fd: c_int) -> Result<rtio::FileStat, UvError> { execute(|req, cb| unsafe { uvll::uv_fs_fstat(loop_.handle, req, fd, cb) }).map(|req| req.mkstat()) @@ -269,40 +271,30 @@ impl FsRequest { unsafe { uvll::get_ptr_from_fs_req(self.req) } } - pub fn mkstat(&self) -> FileStat { + pub fn mkstat(&self) -> rtio::FileStat { let stat = self.get_stat(); fn to_msec(stat: uvll::uv_timespec_t) -> u64 { // Be sure to cast to u64 first to prevent overflowing if the tv_sec // field is a 32-bit integer. (stat.tv_sec as u64) * 1000 + (stat.tv_nsec as u64) / 1000000 } - let kind = match (stat.st_mode as c_int) & libc::S_IFMT { - libc::S_IFREG => io::TypeFile, - libc::S_IFDIR => io::TypeDirectory, - libc::S_IFIFO => io::TypeNamedPipe, - libc::S_IFBLK => io::TypeBlockSpecial, - libc::S_IFLNK => io::TypeSymlink, - _ => io::TypeUnknown, - }; - FileStat { + rtio::FileStat { size: stat.st_size as u64, - kind: kind, - perm: io::FilePermission::from_bits_truncate(stat.st_mode as u32), + kind: stat.st_mode as u64, + perm: stat.st_mode as u64, created: to_msec(stat.st_birthtim), modified: to_msec(stat.st_mtim), accessed: to_msec(stat.st_atim), - unstable: io::UnstableFileStat { - device: stat.st_dev as u64, - inode: stat.st_ino as u64, - rdev: stat.st_rdev as u64, - nlink: stat.st_nlink as u64, - uid: stat.st_uid as u64, - gid: stat.st_gid as u64, - blksize: stat.st_blksize as u64, - blocks: stat.st_blocks as u64, - flags: stat.st_flags as u64, - gen: stat.st_gen as u64, - } + device: stat.st_dev as u64, + inode: stat.st_ino as u64, + rdev: stat.st_rdev as u64, + nlink: stat.st_nlink as u64, + uid: stat.st_uid as u64, + gid: stat.st_gid as u64, + blksize: stat.st_blksize as u64, + blocks: stat.st_blocks as u64, + flags: stat.st_flags as u64, + gen: stat.st_gen as u64, } } } @@ -369,29 +361,26 @@ impl FileWatcher { } } - fn base_read(&mut self, buf: &mut [u8], offset: i64) -> Result<int, IoError> { + fn base_read(&mut self, buf: &mut [u8], offset: i64) -> IoResult<int> { let _m = self.fire_homing_missile(); let r = FsRequest::read(&self.loop_, self.fd, buf, offset); r.map_err(uv_error_to_io_error) } - fn base_write(&mut self, buf: &[u8], offset: i64) -> Result<(), IoError> { + fn base_write(&mut self, buf: &[u8], offset: i64) -> IoResult<()> { let _m = self.fire_homing_missile(); let r = FsRequest::write(&self.loop_, self.fd, buf, offset); r.map_err(uv_error_to_io_error) } - fn seek_common(&self, pos: i64, whence: c_int) -> - Result<u64, IoError>{ - unsafe { - match libc::lseek(self.fd, pos as libc::off_t, whence) { - -1 => { - Err(IoError { - kind: io::OtherIoError, - desc: "Failed to lseek.", - detail: None - }) - }, - n => Ok(n as u64) - } + fn seek_common(&self, pos: i64, whence: c_int) -> IoResult<u64>{ + match unsafe { libc::lseek(self.fd, pos as libc::off_t, whence) } { + -1 => { + Err(IoError { + code: os::errno() as uint, + extra: 0, + detail: None, + }) + }, + n => Ok(n as u64) } } } @@ -425,47 +414,47 @@ impl Drop for FileWatcher { } impl rtio::RtioFileStream for FileWatcher { - fn read(&mut self, buf: &mut [u8]) -> Result<int, IoError> { + fn read(&mut self, buf: &mut [u8]) -> IoResult<int> { self.base_read(buf, -1) } - fn write(&mut self, buf: &[u8]) -> Result<(), IoError> { + fn write(&mut self, buf: &[u8]) -> IoResult<()> { self.base_write(buf, -1) } - fn pread(&mut self, buf: &mut [u8], offset: u64) -> Result<int, IoError> { + fn pread(&mut self, buf: &mut [u8], offset: u64) -> IoResult<int> { self.base_read(buf, offset as i64) } - fn pwrite(&mut self, buf: &[u8], offset: u64) -> Result<(), IoError> { + fn pwrite(&mut self, buf: &[u8], offset: u64) -> IoResult<()> { self.base_write(buf, offset as i64) } - fn seek(&mut self, pos: i64, whence: io::SeekStyle) -> Result<u64, IoError> { + fn seek(&mut self, pos: i64, whence: rtio::SeekStyle) -> IoResult<u64> { use libc::{SEEK_SET, SEEK_CUR, SEEK_END}; let whence = match whence { - io::SeekSet => SEEK_SET, - io::SeekCur => SEEK_CUR, - io::SeekEnd => SEEK_END + rtio::SeekSet => SEEK_SET, + rtio::SeekCur => SEEK_CUR, + rtio::SeekEnd => SEEK_END }; self.seek_common(pos, whence) } - fn tell(&self) -> Result<u64, IoError> { + fn tell(&self) -> IoResult<u64> { use libc::SEEK_CUR; self.seek_common(0, SEEK_CUR) } - fn fsync(&mut self) -> Result<(), IoError> { + fn fsync(&mut self) -> IoResult<()> { let _m = self.fire_homing_missile(); FsRequest::fsync(&self.loop_, self.fd).map_err(uv_error_to_io_error) } - fn datasync(&mut self) -> Result<(), IoError> { + fn datasync(&mut self) -> IoResult<()> { let _m = self.fire_homing_missile(); FsRequest::datasync(&self.loop_, self.fd).map_err(uv_error_to_io_error) } - fn truncate(&mut self, offset: i64) -> Result<(), IoError> { + fn truncate(&mut self, offset: i64) -> IoResult<()> { let _m = self.fire_homing_missile(); let r = FsRequest::truncate(&self.loop_, self.fd, offset); r.map_err(uv_error_to_io_error) } - fn fstat(&mut self) -> Result<FileStat, IoError> { + fn fstat(&mut self) -> IoResult<rtio::FileStat> { let _m = self.fire_homing_missile(); FsRequest::fstat(&self.loop_, self.fd).map_err(uv_error_to_io_error) } @@ -475,7 +464,6 @@ impl rtio::RtioFileStream for FileWatcher { mod test { use libc::c_int; use libc::{O_CREAT, O_RDWR, O_RDONLY, S_IWUSR, S_IRUSR}; - use std::io; use std::str; use super::FsRequest; use super::super::Loop; @@ -562,10 +550,6 @@ mod test { let result = FsRequest::mkdir(l(), path, mode); assert!(result.is_ok()); - let result = FsRequest::stat(l(), path); - assert!(result.is_ok()); - assert!(result.unwrap().kind == io::TypeDirectory); - let result = FsRequest::rmdir(l(), path); assert!(result.is_ok()); diff --git a/src/librustuv/homing.rs b/src/librustuv/homing.rs index 4f565de6791..644ac4e45f6 100644 --- a/src/librustuv/homing.rs +++ b/src/librustuv/homing.rs @@ -153,8 +153,7 @@ mod test { use green::sched; use green::{SchedPool, PoolConfig}; use std::rt::rtio::RtioUdpSocket; - use std::io::test::next_test_ip4; - use std::task::TaskOpts; + use std::rt::task::TaskOpts; use net::UdpWatcher; use super::super::local_loop; @@ -172,7 +171,7 @@ mod test { }); pool.spawn(TaskOpts::new(), proc() { - let listener = UdpWatcher::bind(local_loop(), next_test_ip4()); + let listener = UdpWatcher::bind(local_loop(), ::next_test_ip4()); tx.send(listener.unwrap()); }); @@ -193,18 +192,18 @@ mod test { }); pool.spawn(TaskOpts::new(), proc() { - let addr1 = next_test_ip4(); - let addr2 = next_test_ip4(); + let addr1 = ::next_test_ip4(); + let addr2 = ::next_test_ip4(); let listener = UdpWatcher::bind(local_loop(), addr2); tx.send((listener.unwrap(), addr1)); let mut listener = UdpWatcher::bind(local_loop(), addr1).unwrap(); - listener.sendto([1, 2, 3, 4], addr2).unwrap(); + listener.sendto([1, 2, 3, 4], addr2).ok().unwrap(); }); let task = pool.task(TaskOpts::new(), proc() { let (mut watcher, addr) = rx.recv(); let mut buf = [0, ..10]; - assert_eq!(watcher.recvfrom(buf).unwrap(), (4, addr)); + assert!(watcher.recvfrom(buf).ok().unwrap() == (4, addr)); }); pool.spawn_sched().send(sched::TaskFromFriend(task)); diff --git a/src/librustuv/lib.rs b/src/librustuv/lib.rs index 20893b9e84c..cbbb961e048 100644 --- a/src/librustuv/lib.rs +++ b/src/librustuv/lib.rs @@ -51,16 +51,14 @@ extern crate alloc; use libc::{c_int, c_void}; use std::fmt; -use std::io::IoError; -use std::io; use std::mem; use std::ptr::null; use std::ptr; use std::rt::local::Local; use std::rt::rtio; +use std::rt::rtio::{IoResult, IoError}; use std::rt::task::{BlockedTask, Task}; use std::str::raw::from_c_str; -use std::str; use std::task; pub use self::async::AsyncWatcher; @@ -391,40 +389,39 @@ fn error_smoke_test() { assert_eq!(err.to_str(), "EOF: end of file".to_string()); } +#[cfg(unix)] pub fn uv_error_to_io_error(uverr: UvError) -> IoError { - unsafe { - // Importing error constants - - // uv error descriptions are static - let UvError(errcode) = uverr; - let c_desc = uvll::uv_strerror(errcode); - let desc = str::raw::c_str_to_static_slice(c_desc); - - let kind = match errcode { - uvll::UNKNOWN => io::OtherIoError, - uvll::OK => io::OtherIoError, - uvll::EOF => io::EndOfFile, - uvll::EACCES => io::PermissionDenied, - uvll::ECONNREFUSED => io::ConnectionRefused, - uvll::ECONNRESET => io::ConnectionReset, - uvll::ENOTCONN => io::NotConnected, - uvll::ENOENT => io::FileNotFound, - uvll::EPIPE => io::BrokenPipe, - uvll::ECONNABORTED => io::ConnectionAborted, - uvll::EADDRNOTAVAIL => io::ConnectionRefused, - uvll::ECANCELED => io::TimedOut, + let UvError(errcode) = uverr; + IoError { + code: if errcode == uvll::EOF {libc::EOF as uint} else {-errcode as uint}, + extra: 0, + detail: Some(uverr.desc()), + } +} + +#[cfg(windows)] +pub fn uv_error_to_io_error(uverr: UvError) -> IoError { + let UvError(errcode) = uverr; + IoError { + code: match errcode { + uvll::EOF => io::EOF, + uvll::EACCES => io::ERROR_ACCESS_DENIED, + uvll::ECONNREFUSED => io::WSAECONNREFUSED, + uvll::ECONNRESET => io::WSAECONNRESET, + uvll::ENOTCONN => io::WSAENOTCONN, + uvll::ENOENT => io::ERROR_NOT_FOUND, + uvll::EPIPE => io::ERROR_BROKEN_PIPE, + uvll::ECONNABORTED => io::WSAECONNABORTED, + uvll::EADDRNOTAVAIL => io::WSAEADDRNOTAVAIL, + uvll::ECANCELED => libc::ERROR_OPERATION_ABORTED, err => { uvdebug!("uverr.code {}", err as int); // FIXME: Need to map remaining uv error types - io::OtherIoError + -1 } - }; - - IoError { - kind: kind, - desc: desc, - detail: None - } + }, + extra: 0, + detail: Some(uverr.desc()), } } @@ -437,7 +434,7 @@ pub fn status_to_maybe_uv_error(status: c_int) -> Option<UvError> { } } -pub fn status_to_io_result(status: c_int) -> Result<(), IoError> { +pub fn status_to_io_result(status: c_int) -> IoResult<()> { if status >= 0 {Ok(())} else {Err(uv_error_to_io_error(UvError(status)))} } @@ -472,6 +469,33 @@ fn local_loop() -> &'static mut uvio::UvIoFactory { } #[cfg(test)] +fn next_test_ip4() -> std::rt::rtio::SocketAddr { + use std::io; + use std::rt::rtio; + + let io::net::ip::SocketAddr { ip, port } = io::test::next_test_ip4(); + let ip = match ip { + io::net::ip::Ipv4Addr(a, b, c, d) => rtio::Ipv4Addr(a, b, c, d), + _ => unreachable!(), + }; + rtio::SocketAddr { ip: ip, port: port } +} + +#[cfg(test)] +fn next_test_ip6() -> std::rt::rtio::SocketAddr { + use std::io; + use std::rt::rtio; + + let io::net::ip::SocketAddr { ip, port } = io::test::next_test_ip6(); + let ip = match ip { + io::net::ip::Ipv6Addr(a, b, c, d, e, f, g, h) => + rtio::Ipv6Addr(a, b, c, d, e, f, g, h), + _ => unreachable!(), + }; + rtio::SocketAddr { ip: ip, port: port } +} + +#[cfg(test)] mod test { use std::mem::transmute; use std::rt::thread::Thread; diff --git a/src/librustuv/net.rs b/src/librustuv/net.rs index 2f35e48b847..e7bdc25a1fd 100644 --- a/src/librustuv/net.rs +++ b/src/librustuv/net.rs @@ -10,12 +10,10 @@ use libc::{size_t, ssize_t, c_int, c_void, c_uint}; use libc; -use std::io; -use std::io::IoError; -use std::io::net::ip; use std::mem; use std::ptr; use std::rt::rtio; +use std::rt::rtio::IoError; use std::rt::task::BlockedTask; use homing::{HomingIO, HomeHandle}; @@ -36,7 +34,7 @@ pub fn htons(u: u16) -> u16 { mem::to_be16(u) } pub fn ntohs(u: u16) -> u16 { mem::from_be16(u) } pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage, - len: uint) -> ip::SocketAddr { + len: uint) -> rtio::SocketAddr { match storage.ss_family as c_int { libc::AF_INET => { assert!(len as uint >= mem::size_of::<libc::sockaddr_in>()); @@ -48,8 +46,8 @@ pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage, let b = (ip >> 16) as u8; let c = (ip >> 8) as u8; let d = (ip >> 0) as u8; - ip::SocketAddr { - ip: ip::Ipv4Addr(a, b, c, d), + rtio::SocketAddr { + ip: rtio::Ipv4Addr(a, b, c, d), port: ntohs(storage.sin_port), } } @@ -66,8 +64,8 @@ pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage, let f = ntohs(storage.sin6_addr.s6_addr[5]); let g = ntohs(storage.sin6_addr.s6_addr[6]); let h = ntohs(storage.sin6_addr.s6_addr[7]); - ip::SocketAddr { - ip: ip::Ipv6Addr(a, b, c, d, e, f, g, h), + rtio::SocketAddr { + ip: rtio::Ipv6Addr(a, b, c, d, e, f, g, h), port: ntohs(storage.sin6_port), } } @@ -77,11 +75,11 @@ pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage, } } -fn addr_to_sockaddr(addr: ip::SocketAddr) -> (libc::sockaddr_storage, uint) { +fn addr_to_sockaddr(addr: rtio::SocketAddr) -> (libc::sockaddr_storage, uint) { unsafe { let mut storage: libc::sockaddr_storage = mem::zeroed(); let len = match addr.ip { - ip::Ipv4Addr(a, b, c, d) => { + rtio::Ipv4Addr(a, b, c, d) => { let ip = (a as u32 << 24) | (b as u32 << 16) | (c as u32 << 8) | @@ -95,7 +93,7 @@ fn addr_to_sockaddr(addr: ip::SocketAddr) -> (libc::sockaddr_storage, uint) { }; mem::size_of::<libc::sockaddr_in>() } - ip::Ipv6Addr(a, b, c, d, e, f, g, h) => { + rtio::Ipv6Addr(a, b, c, d, e, f, g, h) => { let storage: &mut libc::sockaddr_in6 = mem::transmute(&mut storage); storage.sin6_family = libc::AF_INET6 as libc::sa_family_t; @@ -126,7 +124,7 @@ enum SocketNameKind { } fn socket_name(sk: SocketNameKind, - handle: *c_void) -> Result<ip::SocketAddr, IoError> { + handle: *c_void) -> Result<rtio::SocketAddr, IoError> { let getsockname = match sk { TcpPeer => uvll::uv_tcp_getpeername, Tcp => uvll::uv_tcp_getsockname, @@ -201,7 +199,7 @@ impl TcpWatcher { } pub fn connect(io: &mut UvIoFactory, - address: ip::SocketAddr, + address: rtio::SocketAddr, timeout: Option<u64>) -> Result<TcpWatcher, UvError> { let tcp = TcpWatcher::new(io); let cx = ConnectCtx { status: -1, task: None, timer: None }; @@ -218,7 +216,7 @@ impl HomingIO for TcpWatcher { } impl rtio::RtioSocket for TcpWatcher { - fn socket_name(&mut self) -> Result<ip::SocketAddr, IoError> { + fn socket_name(&mut self) -> Result<rtio::SocketAddr, IoError> { let _m = self.fire_homing_missile(); socket_name(Tcp, self.handle) } @@ -231,7 +229,7 @@ impl rtio::RtioTcpStream for TcpWatcher { // see comments in close_read about this check if guard.access.is_closed() { - return Err(io::standard_error(io::EndOfFile)) + return Err(uv_error_to_io_error(UvError(uvll::EOF))) } self.stream.read(buf).map_err(uv_error_to_io_error) @@ -243,7 +241,7 @@ impl rtio::RtioTcpStream for TcpWatcher { self.stream.write(buf, guard.can_timeout).map_err(uv_error_to_io_error) } - fn peer_name(&mut self) -> Result<ip::SocketAddr, IoError> { + fn peer_name(&mut self) -> Result<rtio::SocketAddr, IoError> { let _m = self.fire_homing_missile(); socket_name(TcpPeer, self.handle) } @@ -350,7 +348,7 @@ impl Drop for TcpWatcher { // TCP listeners (unbound servers) impl TcpListener { - pub fn bind(io: &mut UvIoFactory, address: ip::SocketAddr) + pub fn bind(io: &mut UvIoFactory, address: rtio::SocketAddr) -> Result<Box<TcpListener>, UvError> { let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) }; assert_eq!(unsafe { @@ -385,7 +383,7 @@ impl UvHandle<uvll::uv_tcp_t> for TcpListener { } impl rtio::RtioSocket for TcpListener { - fn socket_name(&mut self) -> Result<ip::SocketAddr, IoError> { + fn socket_name(&mut self) -> Result<rtio::SocketAddr, IoError> { let _m = self.fire_homing_missile(); socket_name(Tcp, self.handle) } @@ -439,7 +437,7 @@ impl HomingIO for TcpAcceptor { } impl rtio::RtioSocket for TcpAcceptor { - fn socket_name(&mut self) -> Result<ip::SocketAddr, IoError> { + fn socket_name(&mut self) -> Result<rtio::SocketAddr, IoError> { let _m = self.fire_homing_missile(); socket_name(Tcp, self.listener.handle) } @@ -492,7 +490,7 @@ pub struct UdpWatcher { struct UdpRecvCtx { task: Option<BlockedTask>, buf: Option<Buf>, - result: Option<(ssize_t, Option<ip::SocketAddr>)>, + result: Option<(ssize_t, Option<rtio::SocketAddr>)>, } struct UdpSendCtx { @@ -502,7 +500,7 @@ struct UdpSendCtx { } impl UdpWatcher { - pub fn bind(io: &mut UvIoFactory, address: ip::SocketAddr) + pub fn bind(io: &mut UvIoFactory, address: rtio::SocketAddr) -> Result<UdpWatcher, UvError> { let udp = UdpWatcher { handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) }, @@ -536,7 +534,7 @@ impl HomingIO for UdpWatcher { } impl rtio::RtioSocket for UdpWatcher { - fn socket_name(&mut self) -> Result<ip::SocketAddr, IoError> { + fn socket_name(&mut self) -> Result<rtio::SocketAddr, IoError> { let _m = self.fire_homing_missile(); socket_name(Udp, self.handle) } @@ -544,7 +542,7 @@ impl rtio::RtioSocket for UdpWatcher { impl rtio::RtioUdpSocket for UdpWatcher { fn recvfrom(&mut self, buf: &mut [u8]) - -> Result<(uint, ip::SocketAddr), IoError> + -> Result<(uint, rtio::SocketAddr), IoError> { let loop_ = self.uv_loop(); let m = self.fire_homing_missile(); @@ -609,7 +607,7 @@ impl rtio::RtioUdpSocket for UdpWatcher { } } - fn sendto(&mut self, buf: &[u8], dst: ip::SocketAddr) -> Result<(), IoError> { + fn sendto(&mut self, buf: &[u8], dst: rtio::SocketAddr) -> Result<(), IoError> { let m = self.fire_homing_missile(); let loop_ = self.uv_loop(); let guard = try!(self.write_access.grant(m)); @@ -675,7 +673,7 @@ impl rtio::RtioUdpSocket for UdpWatcher { } } - fn join_multicast(&mut self, multi: ip::IpAddr) -> Result<(), IoError> { + fn join_multicast(&mut self, multi: rtio::IpAddr) -> Result<(), IoError> { let _m = self.fire_homing_missile(); status_to_io_result(unsafe { multi.to_str().with_c_str(|m_addr| { @@ -686,7 +684,7 @@ impl rtio::RtioUdpSocket for UdpWatcher { }) } - fn leave_multicast(&mut self, multi: ip::IpAddr) -> Result<(), IoError> { + fn leave_multicast(&mut self, multi: rtio::IpAddr) -> Result<(), IoError> { let _m = self.fire_homing_missile(); status_to_io_result(unsafe { multi.to_str().with_c_str(|m_addr| { @@ -843,14 +841,13 @@ pub fn shutdown(handle: *uvll::uv_stream_t, loop_: &Loop) -> Result<(), IoError> mod test { use std::rt::rtio::{RtioTcpStream, RtioTcpListener, RtioTcpAcceptor, RtioUdpSocket}; - use std::io::test::{next_test_ip4, next_test_ip6}; use super::{UdpWatcher, TcpWatcher, TcpListener}; use super::super::local_loop; #[test] fn connect_close_ip4() { - match TcpWatcher::connect(local_loop(), next_test_ip4(), None) { + match TcpWatcher::connect(local_loop(), ::next_test_ip4(), None) { Ok(..) => fail!(), Err(e) => assert_eq!(e.name(), "ECONNREFUSED".to_string()), } @@ -858,7 +855,7 @@ mod test { #[test] fn connect_close_ip6() { - match TcpWatcher::connect(local_loop(), next_test_ip6(), None) { + match TcpWatcher::connect(local_loop(), ::next_test_ip6(), None) { Ok(..) => fail!(), Err(e) => assert_eq!(e.name(), "ECONNREFUSED".to_string()), } @@ -866,7 +863,7 @@ mod test { #[test] fn udp_bind_close_ip4() { - match UdpWatcher::bind(local_loop(), next_test_ip4()) { + match UdpWatcher::bind(local_loop(), ::next_test_ip4()) { Ok(..) => {} Err(..) => fail!() } @@ -874,7 +871,7 @@ mod test { #[test] fn udp_bind_close_ip6() { - match UdpWatcher::bind(local_loop(), next_test_ip6()) { + match UdpWatcher::bind(local_loop(), ::next_test_ip6()) { Ok(..) => {} Err(..) => fail!() } @@ -883,7 +880,7 @@ mod test { #[test] fn listen_ip4() { let (tx, rx) = channel(); - let addr = next_test_ip4(); + let addr = ::next_test_ip4(); spawn(proc() { let w = match TcpListener::bind(local_loop(), addr) { @@ -919,7 +916,7 @@ mod test { #[test] fn listen_ip6() { let (tx, rx) = channel(); - let addr = next_test_ip6(); + let addr = ::next_test_ip6(); spawn(proc() { let w = match TcpListener::bind(local_loop(), addr) { @@ -955,8 +952,8 @@ mod test { #[test] fn udp_recv_ip4() { let (tx, rx) = channel(); - let client = next_test_ip4(); - let server = next_test_ip4(); + let client = ::next_test_ip4(); + let server = ::next_test_ip4(); spawn(proc() { match UdpWatcher::bind(local_loop(), server) { @@ -964,7 +961,7 @@ mod test { tx.send(()); let mut buf = [0u8, ..10]; match w.recvfrom(buf) { - Ok((10, addr)) => assert_eq!(addr, client), + Ok((10, addr)) => assert!(addr == client), e => fail!("{:?}", e), } for i in range(0, 10u8) { @@ -987,8 +984,8 @@ mod test { #[test] fn udp_recv_ip6() { let (tx, rx) = channel(); - let client = next_test_ip6(); - let server = next_test_ip6(); + let client = ::next_test_ip6(); + let server = ::next_test_ip6(); spawn(proc() { match UdpWatcher::bind(local_loop(), server) { @@ -996,7 +993,7 @@ mod test { tx.send(()); let mut buf = [0u8, ..10]; match w.recvfrom(buf) { - Ok((10, addr)) => assert_eq!(addr, client), + Ok((10, addr)) => assert!(addr == client), e => fail!("{:?}", e), } for i in range(0, 10u8) { @@ -1018,15 +1015,15 @@ mod test { #[test] fn test_read_read_read() { - let addr = next_test_ip4(); + let addr = ::next_test_ip4(); static MAX: uint = 5000; let (tx, rx) = channel(); spawn(proc() { let listener = TcpListener::bind(local_loop(), addr).unwrap(); - let mut acceptor = listener.listen().unwrap(); + let mut acceptor = listener.listen().ok().unwrap(); tx.send(()); - let mut stream = acceptor.accept().unwrap(); + let mut stream = acceptor.accept().ok().unwrap(); let buf = [1, .. 2048]; let mut total_bytes_written = 0; while total_bytes_written < MAX { @@ -1041,7 +1038,7 @@ mod test { let mut buf = [0, .. 2048]; let mut total_bytes_read = 0; while total_bytes_read < MAX { - let nread = stream.read(buf).unwrap(); + let nread = stream.read(buf).ok().unwrap(); total_bytes_read += nread; for i in range(0u, nread) { assert_eq!(buf[i], 1); @@ -1053,8 +1050,8 @@ mod test { #[test] #[ignore(cfg(windows))] // FIXME(#10102) server never sees second packet fn test_udp_twice() { - let server_addr = next_test_ip4(); - let client_addr = next_test_ip4(); + let server_addr = ::next_test_ip4(); + let client_addr = ::next_test_ip4(); let (tx, rx) = channel(); spawn(proc() { @@ -1068,22 +1065,22 @@ mod test { tx.send(()); let mut buf1 = [0]; let mut buf2 = [0]; - let (nread1, src1) = server.recvfrom(buf1).unwrap(); - let (nread2, src2) = server.recvfrom(buf2).unwrap(); + let (nread1, src1) = server.recvfrom(buf1).ok().unwrap(); + let (nread2, src2) = server.recvfrom(buf2).ok().unwrap(); assert_eq!(nread1, 1); assert_eq!(nread2, 1); - assert_eq!(src1, client_addr); - assert_eq!(src2, client_addr); + assert!(src1 == client_addr); + assert!(src2 == client_addr); assert_eq!(buf1[0], 1); assert_eq!(buf2[0], 2); } #[test] fn test_udp_many_read() { - let server_out_addr = next_test_ip4(); - let server_in_addr = next_test_ip4(); - let client_out_addr = next_test_ip4(); - let client_in_addr = next_test_ip4(); + let server_out_addr = ::next_test_ip4(); + let server_in_addr = ::next_test_ip4(); + let client_out_addr = ::next_test_ip4(); + let client_in_addr = ::next_test_ip4(); static MAX: uint = 500_000; let (tx1, rx1) = channel::<()>(); @@ -1106,9 +1103,9 @@ mod test { // check if the client has received enough let res = server_in.recvfrom(buf); assert!(res.is_ok()); - let (nread, src) = res.unwrap(); + let (nread, src) = res.ok().unwrap(); assert_eq!(nread, 1); - assert_eq!(src, client_out_addr); + assert!(src == client_out_addr); } assert!(total_bytes_sent >= MAX); }); @@ -1127,8 +1124,8 @@ mod test { // wait for data let res = client_in.recvfrom(buf); assert!(res.is_ok()); - let (nread, src) = res.unwrap(); - assert_eq!(src, server_out_addr); + let (nread, src) = res.ok().unwrap(); + assert!(src == server_out_addr); total_bytes_recv += nread; for i in range(0u, nread) { assert_eq!(buf[i], 1); @@ -1140,25 +1137,25 @@ mod test { #[test] fn test_read_and_block() { - let addr = next_test_ip4(); + let addr = ::next_test_ip4(); let (tx, rx) = channel::<Receiver<()>>(); spawn(proc() { let rx = rx.recv(); let mut stream = TcpWatcher::connect(local_loop(), addr, None).unwrap(); - stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap(); - stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap(); + stream.write([0, 1, 2, 3, 4, 5, 6, 7]).ok().unwrap(); + stream.write([0, 1, 2, 3, 4, 5, 6, 7]).ok().unwrap(); rx.recv(); - stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap(); - stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap(); + stream.write([0, 1, 2, 3, 4, 5, 6, 7]).ok().unwrap(); + stream.write([0, 1, 2, 3, 4, 5, 6, 7]).ok().unwrap(); rx.recv(); }); let listener = TcpListener::bind(local_loop(), addr).unwrap(); - let mut acceptor = listener.listen().unwrap(); + let mut acceptor = listener.listen().ok().unwrap(); let (tx2, rx2) = channel(); tx.send(rx2); - let mut stream = acceptor.accept().unwrap(); + let mut stream = acceptor.accept().ok().unwrap(); let mut buf = [0, .. 2048]; let expected = 32; @@ -1166,7 +1163,7 @@ mod test { let mut reads = 0; while current < expected { - let nread = stream.read(buf).unwrap(); + let nread = stream.read(buf).ok().unwrap(); for i in range(0u, nread) { let val = buf[i] as uint; assert_eq!(val, current % 8); @@ -1183,14 +1180,14 @@ mod test { #[test] fn test_simple_tcp_server_and_client_on_diff_threads() { - let addr = next_test_ip4(); + let addr = ::next_test_ip4(); spawn(proc() { let listener = TcpListener::bind(local_loop(), addr).unwrap(); - let mut acceptor = listener.listen().unwrap(); - let mut stream = acceptor.accept().unwrap(); + let mut acceptor = listener.listen().ok().unwrap(); + let mut stream = acceptor.accept().ok().unwrap(); let mut buf = [0, .. 2048]; - let nread = stream.read(buf).unwrap(); + let nread = stream.read(buf).ok().unwrap(); assert_eq!(nread, 8); for i in range(0u, nread) { assert_eq!(buf[i], i as u8); @@ -1201,27 +1198,27 @@ mod test { while stream.is_err() { stream = TcpWatcher::connect(local_loop(), addr, None); } - stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap(); + stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]).ok().unwrap(); } #[should_fail] #[test] fn tcp_listener_fail_cleanup() { - let addr = next_test_ip4(); + let addr = ::next_test_ip4(); let w = TcpListener::bind(local_loop(), addr).unwrap(); - let _w = w.listen().unwrap(); + let _w = w.listen().ok().unwrap(); fail!(); } #[should_fail] #[test] fn tcp_stream_fail_cleanup() { let (tx, rx) = channel(); - let addr = next_test_ip4(); + let addr = ::next_test_ip4(); spawn(proc() { let w = TcpListener::bind(local_loop(), addr).unwrap(); - let mut w = w.listen().unwrap(); + let mut w = w.listen().ok().unwrap(); tx.send(()); - drop(w.accept().unwrap()); + drop(w.accept().ok().unwrap()); }); rx.recv(); let _w = TcpWatcher::connect(local_loop(), addr, None).unwrap(); @@ -1230,14 +1227,14 @@ mod test { #[should_fail] #[test] fn udp_listener_fail_cleanup() { - let addr = next_test_ip4(); + let addr = ::next_test_ip4(); let _w = UdpWatcher::bind(local_loop(), addr).unwrap(); fail!(); } #[should_fail] #[test] fn udp_fail_other_task() { - let addr = next_test_ip4(); + let addr = ::next_test_ip4(); let (tx, rx) = channel(); // force the handle to be created on a different scheduler, failure in diff --git a/src/librustuv/pipe.rs b/src/librustuv/pipe.rs index cf3d4f672e6..e5c134b6b92 100644 --- a/src/librustuv/pipe.rs +++ b/src/librustuv/pipe.rs @@ -10,10 +10,9 @@ use libc; use std::c_str::CString; -use std::io::IoError; -use std::io; use std::mem; -use std::rt::rtio::{RtioPipe, RtioUnixListener, RtioUnixAcceptor}; +use std::rt::rtio; +use std::rt::rtio::IoResult; use std::rt::task::BlockedTask; use homing::{HomingIO, HomeHandle}; @@ -39,8 +38,8 @@ pub struct PipeWatcher { pub struct PipeListener { home: HomeHandle, pipe: *uvll::uv_pipe_t, - outgoing: Sender<Result<Box<RtioPipe:Send>, IoError>>, - incoming: Receiver<Result<Box<RtioPipe:Send>, IoError>>, + outgoing: Sender<IoResult<Box<rtio::RtioPipe:Send>>>, + incoming: Receiver<IoResult<Box<rtio::RtioPipe:Send>>>, } pub struct PipeAcceptor { @@ -111,26 +110,26 @@ impl PipeWatcher { } } -impl RtioPipe for PipeWatcher { - fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> { +impl rtio::RtioPipe for PipeWatcher { + fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> { let m = self.fire_homing_missile(); let guard = try!(self.read_access.grant(m)); // see comments in close_read about this check if guard.access.is_closed() { - return Err(io::standard_error(io::EndOfFile)) + return Err(uv_error_to_io_error(UvError(uvll::EOF))) } self.stream.read(buf).map_err(uv_error_to_io_error) } - fn write(&mut self, buf: &[u8]) -> Result<(), IoError> { + fn write(&mut self, buf: &[u8]) -> IoResult<()> { let m = self.fire_homing_missile(); let guard = try!(self.write_access.grant(m)); self.stream.write(buf, guard.can_timeout).map_err(uv_error_to_io_error) } - fn clone(&self) -> Box<RtioPipe:Send> { + fn clone(&self) -> Box<rtio::RtioPipe:Send> { box PipeWatcher { stream: StreamWatcher::new(self.stream.handle), defused: false, @@ -138,10 +137,10 @@ impl RtioPipe for PipeWatcher { refcount: self.refcount.clone(), read_access: self.read_access.clone(), write_access: self.write_access.clone(), - } as Box<RtioPipe:Send> + } as Box<rtio::RtioPipe:Send> } - fn close_read(&mut self) -> Result<(), IoError> { + fn close_read(&mut self) -> IoResult<()> { // The current uv_shutdown method only shuts the writing half of the // connection, and no method is provided to shut down the reading half // of the connection. With a lack of method, we emulate shutting down @@ -168,7 +167,7 @@ impl RtioPipe for PipeWatcher { Ok(()) } - fn close_write(&mut self) -> Result<(), IoError> { + fn close_write(&mut self) -> IoResult<()> { let _m = self.fire_homing_missile(); net::shutdown(self.stream.handle, &self.uv_loop()) } @@ -248,8 +247,8 @@ impl PipeListener { } } -impl RtioUnixListener for PipeListener { - fn listen(~self) -> Result<Box<RtioUnixAcceptor:Send>, IoError> { +impl rtio::RtioUnixListener for PipeListener { + fn listen(~self) -> IoResult<Box<rtio::RtioUnixAcceptor:Send>> { // create the acceptor object from ourselves let mut acceptor = box PipeAcceptor { listener: self, @@ -259,7 +258,7 @@ impl RtioUnixListener for PipeListener { let _m = acceptor.fire_homing_missile(); // FIXME: the 128 backlog should be configurable match unsafe { uvll::uv_listen(acceptor.listener.pipe, 128, listen_cb) } { - 0 => Ok(acceptor as Box<RtioUnixAcceptor:Send>), + 0 => Ok(acceptor as Box<rtio::RtioUnixAcceptor:Send>), n => Err(uv_error_to_io_error(UvError(n))), } } @@ -284,7 +283,7 @@ extern fn listen_cb(server: *uvll::uv_stream_t, status: libc::c_int) { }); let client = PipeWatcher::new_home(&loop_, pipe.home().clone(), false); assert_eq!(unsafe { uvll::uv_accept(server, client.handle()) }, 0); - Ok(box client as Box<RtioPipe:Send>) + Ok(box client as Box<rtio::RtioPipe:Send>) } n => Err(uv_error_to_io_error(UvError(n))) }; @@ -300,8 +299,8 @@ impl Drop for PipeListener { // PipeAcceptor implementation and traits -impl RtioUnixAcceptor for PipeAcceptor { - fn accept(&mut self) -> Result<Box<RtioPipe:Send>, IoError> { +impl rtio::RtioUnixAcceptor for PipeAcceptor { + fn accept(&mut self) -> IoResult<Box<rtio::RtioPipe:Send>> { self.timeout.accept(&self.listener.incoming) } @@ -366,11 +365,11 @@ mod tests { spawn(proc() { let p = PipeListener::bind(local_loop(), &path2.to_c_str()).unwrap(); - let mut p = p.listen().unwrap(); + let mut p = p.listen().ok().unwrap(); tx.send(()); - let mut client = p.accept().unwrap(); + let mut client = p.accept().ok().unwrap(); let mut buf = [0]; - assert!(client.read(buf).unwrap() == 1); + assert!(client.read(buf).ok().unwrap() == 1); assert_eq!(buf[0], 1); assert!(client.write([2]).is_ok()); }); @@ -378,7 +377,7 @@ mod tests { let mut c = PipeWatcher::connect(local_loop(), &path.to_c_str(), None).unwrap(); assert!(c.write([1]).is_ok()); let mut buf = [0]; - assert!(c.read(buf).unwrap() == 1); + assert!(c.read(buf).ok().unwrap() == 1); assert_eq!(buf[0], 2); } @@ -390,9 +389,9 @@ mod tests { spawn(proc() { let p = PipeListener::bind(local_loop(), &path2.to_c_str()).unwrap(); - let mut p = p.listen().unwrap(); + let mut p = p.listen().ok().unwrap(); tx.send(()); - drop(p.accept().unwrap()); + drop(p.accept().ok().unwrap()); }); rx.recv(); let _c = PipeWatcher::connect(local_loop(), &path.to_c_str(), None).unwrap(); diff --git a/src/librustuv/process.rs b/src/librustuv/process.rs index f6fcf3e4816..aa87582da26 100644 --- a/src/librustuv/process.rs +++ b/src/librustuv/process.rs @@ -10,11 +10,10 @@ use libc::c_int; use libc; -use std::io::IoError; -use std::io::process; use std::ptr; use std::c_str::CString; -use std::rt::rtio::{ProcessConfig, RtioProcess}; +use std::rt::rtio; +use std::rt::rtio::IoResult; use std::rt::task::BlockedTask; use homing::{HomingIO, HomeHandle}; @@ -33,7 +32,7 @@ pub struct Process { to_wake: Option<BlockedTask>, /// Collected from the exit_cb - exit_status: Option<process::ProcessExit>, + exit_status: Option<rtio::ProcessExit>, /// Lazily initialized timeout timer timer: Option<Box<TimerWatcher>>, @@ -51,7 +50,7 @@ impl Process { /// /// Returns either the corresponding process object or an error which /// occurred. - pub fn spawn(io_loop: &mut UvIoFactory, cfg: ProcessConfig) + pub fn spawn(io_loop: &mut UvIoFactory, cfg: rtio::ProcessConfig) -> Result<(Box<Process>, Vec<Option<PipeWatcher>>), UvError> { let mut io = vec![cfg.stdin, cfg.stdout, cfg.stderr]; for slot in cfg.extra_io.iter() { @@ -137,8 +136,8 @@ extern fn on_exit(handle: *uvll::uv_process_t, assert!(p.exit_status.is_none()); p.exit_status = Some(match term_signal { - 0 => process::ExitStatus(exit_status as int), - n => process::ExitSignal(n as int), + 0 => rtio::ExitStatus(exit_status as int), + n => rtio::ExitSignal(n as int), }); if p.to_wake.is_none() { return } @@ -146,19 +145,19 @@ extern fn on_exit(handle: *uvll::uv_process_t, } unsafe fn set_stdio(dst: *uvll::uv_stdio_container_t, - io: &process::StdioContainer, + io: &rtio::StdioContainer, io_loop: &mut UvIoFactory) -> Option<PipeWatcher> { match *io { - process::Ignored => { + rtio::Ignored => { uvll::set_stdio_container_flags(dst, uvll::STDIO_IGNORE); None } - process::InheritFd(fd) => { + rtio::InheritFd(fd) => { uvll::set_stdio_container_flags(dst, uvll::STDIO_INHERIT_FD); uvll::set_stdio_container_fd(dst, fd); None } - process::CreatePipe(readable, writable) => { + rtio::CreatePipe(readable, writable) => { let mut flags = uvll::STDIO_CREATE_PIPE as libc::c_int; if readable { flags |= uvll::STDIO_READABLE_PIPE as libc::c_int; @@ -231,12 +230,12 @@ impl UvHandle<uvll::uv_process_t> for Process { fn uv_handle(&self) -> *uvll::uv_process_t { self.handle } } -impl RtioProcess for Process { +impl rtio::RtioProcess for Process { fn id(&self) -> libc::pid_t { unsafe { uvll::process_pid(self.handle) as libc::pid_t } } - fn kill(&mut self, signal: int) -> Result<(), IoError> { + fn kill(&mut self, signal: int) -> IoResult<()> { let _m = self.fire_homing_missile(); match unsafe { uvll::uv_process_kill(self.handle, signal as libc::c_int) @@ -246,7 +245,7 @@ impl RtioProcess for Process { } } - fn wait(&mut self) -> Result<process::ProcessExit, IoError> { + fn wait(&mut self) -> IoResult<rtio::ProcessExit> { // Make sure (on the home scheduler) that we have an exit status listed let _m = self.fire_homing_missile(); match self.exit_status { diff --git a/src/librustuv/queue.rs b/src/librustuv/queue.rs index 98ae865cb1d..a3694bfe9c2 100644 --- a/src/librustuv/queue.rs +++ b/src/librustuv/queue.rs @@ -23,8 +23,8 @@ use alloc::arc::Arc; use libc::c_void; use std::mem; +use std::rt::mutex::NativeMutex; use std::rt::task::BlockedTask; -use std::unstable::mutex::NativeMutex; use mpsc = std::sync::mpsc_queue; use async::AsyncWatcher; diff --git a/src/librustuv/signal.rs b/src/librustuv/signal.rs index b2e1c752012..fd0b6acb8ae 100644 --- a/src/librustuv/signal.rs +++ b/src/librustuv/signal.rs @@ -9,8 +9,7 @@ // except according to those terms. use libc::c_int; -use std::io::signal::Signum; -use std::rt::rtio::RtioSignal; +use std::rt::rtio::{RtioSignal, Callback}; use homing::{HomingIO, HomeHandle}; use super::{UvError, UvHandle}; @@ -21,18 +20,16 @@ pub struct SignalWatcher { handle: *uvll::uv_signal_t, home: HomeHandle, - channel: Sender<Signum>, - signal: Signum, + cb: Box<Callback:Send>, } impl SignalWatcher { - pub fn new(io: &mut UvIoFactory, signum: Signum, channel: Sender<Signum>) + pub fn new(io: &mut UvIoFactory, signum: int, cb: Box<Callback:Send>) -> Result<Box<SignalWatcher>, UvError> { let s = box SignalWatcher { handle: UvHandle::alloc(None::<SignalWatcher>, uvll::UV_SIGNAL), home: io.make_handle(), - channel: channel, - signal: signum, + cb: cb, }; assert_eq!(unsafe { uvll::uv_signal_init(io.uv_loop(), s.handle) @@ -48,10 +45,9 @@ impl SignalWatcher { } } -extern fn signal_cb(handle: *uvll::uv_signal_t, signum: c_int) { +extern fn signal_cb(handle: *uvll::uv_signal_t, _signum: c_int) { let s: &mut SignalWatcher = unsafe { UvHandle::from_uv_handle(&handle) }; - assert_eq!(signum as int, s.signal as int); - let _ = s.channel.send_opt(s.signal); + let _ = s.cb.call(); } impl HomingIO for SignalWatcher { @@ -70,25 +66,3 @@ impl Drop for SignalWatcher { self.close(); } } - -#[cfg(test)] -mod test { - use super::super::local_loop; - use std::io::signal; - use super::SignalWatcher; - - #[test] - fn closing_channel_during_drop_doesnt_kill_everything() { - // see issue #10375, relates to timers as well. - let (tx, rx) = channel(); - let _signal = SignalWatcher::new(local_loop(), signal::Interrupt, - tx); - - spawn(proc() { - let _ = rx.recv_opt(); - }); - - // when we drop the SignalWatcher we're going to destroy the channel, - // which must wake up the task on the other end - } -} diff --git a/src/librustuv/timeout.rs b/src/librustuv/timeout.rs index 15add60b59c..1c191d476ed 100644 --- a/src/librustuv/timeout.rs +++ b/src/librustuv/timeout.rs @@ -9,9 +9,9 @@ // except according to those terms. use libc::c_int; -use std::io::IoResult; use std::mem; use std::rt::task::BlockedTask; +use std::rt::rtio::IoResult; use access; use homing::{HomeHandle, HomingMissile, HomingIO}; diff --git a/src/librustuv/timer.rs b/src/librustuv/timer.rs index 525539f8b36..b940774323a 100644 --- a/src/librustuv/timer.rs +++ b/src/librustuv/timer.rs @@ -9,7 +9,7 @@ // except according to those terms. use std::mem; -use std::rt::rtio::RtioTimer; +use std::rt::rtio::{RtioTimer, Callback}; use std::rt::task::BlockedTask; use homing::{HomeHandle, HomingIO}; @@ -27,8 +27,8 @@ pub struct TimerWatcher { pub enum NextAction { WakeTask, - SendOnce(Sender<()>), - SendMany(Sender<()>, uint), + CallOnce(Box<Callback:Send>), + CallMany(Box<Callback:Send>, uint), } impl TimerWatcher { @@ -103,9 +103,7 @@ impl RtioTimer for TimerWatcher { self.stop(); } - fn oneshot(&mut self, msecs: u64) -> Receiver<()> { - let (tx, rx) = channel(); - + fn oneshot(&mut self, msecs: u64, cb: Box<Callback:Send>) { // similarly to the destructor, we must drop the previous action outside // of the homing missile let _prev_action = { @@ -113,15 +111,11 @@ impl RtioTimer for TimerWatcher { self.id += 1; self.stop(); self.start(timer_cb, msecs, 0); - mem::replace(&mut self.action, Some(SendOnce(tx))) + mem::replace(&mut self.action, Some(CallOnce(cb))) }; - - return rx; } - fn period(&mut self, msecs: u64) -> Receiver<()> { - let (tx, rx) = channel(); - + fn period(&mut self, msecs: u64, cb: Box<Callback:Send>) { // similarly to the destructor, we must drop the previous action outside // of the homing missile let _prev_action = { @@ -129,10 +123,8 @@ impl RtioTimer for TimerWatcher { self.id += 1; self.stop(); self.start(timer_cb, msecs, msecs); - mem::replace(&mut self.action, Some(SendMany(tx, self.id))) + mem::replace(&mut self.action, Some(CallMany(cb, self.id))) }; - - return rx; } } @@ -145,9 +137,9 @@ extern fn timer_cb(handle: *uvll::uv_timer_t) { let task = timer.blocker.take_unwrap(); let _ = task.wake().map(|t| t.reawaken()); } - SendOnce(chan) => { let _ = chan.send_opt(()); } - SendMany(chan, id) => { - let _ = chan.send_opt(()); + CallOnce(mut cb) => { cb.call() } + CallMany(mut cb, id) => { + cb.call(); // Note that the above operation could have performed some form of // scheduling. This means that the timer may have decided to insert @@ -158,7 +150,7 @@ extern fn timer_cb(handle: *uvll::uv_timer_t) { // for you. We're guaranteed to all be running on the same thread, // so there's no need for any synchronization here. if timer.id == id { - timer.action = Some(SendMany(chan, id)); + timer.action = Some(CallMany(cb, id)); } } } @@ -179,145 +171,3 @@ impl Drop for TimerWatcher { }; } } - -#[cfg(test)] -mod test { - use std::rt::rtio::RtioTimer; - use super::super::local_loop; - use super::TimerWatcher; - - #[test] - fn oneshot() { - let mut timer = TimerWatcher::new(local_loop()); - let port = timer.oneshot(1); - port.recv(); - let port = timer.oneshot(1); - port.recv(); - } - - #[test] - fn override() { - let mut timer = TimerWatcher::new(local_loop()); - let oport = timer.oneshot(1); - let pport = timer.period(1); - timer.sleep(1); - assert_eq!(oport.recv_opt(), Err(())); - assert_eq!(pport.recv_opt(), Err(())); - timer.oneshot(1).recv(); - } - - #[test] - fn period() { - let mut timer = TimerWatcher::new(local_loop()); - let port = timer.period(1); - port.recv(); - port.recv(); - let port2 = timer.period(1); - port2.recv(); - port2.recv(); - } - - #[test] - fn sleep() { - let mut timer = TimerWatcher::new(local_loop()); - timer.sleep(1); - timer.sleep(1); - } - - #[test] #[should_fail] - fn oneshot_fail() { - let mut timer = TimerWatcher::new(local_loop()); - let _port = timer.oneshot(1); - fail!(); - } - - #[test] #[should_fail] - fn period_fail() { - let mut timer = TimerWatcher::new(local_loop()); - let _port = timer.period(1); - fail!(); - } - - #[test] #[should_fail] - fn normal_fail() { - let _timer = TimerWatcher::new(local_loop()); - fail!(); - } - - #[test] - fn closing_channel_during_drop_doesnt_kill_everything() { - // see issue #10375 - let mut timer = TimerWatcher::new(local_loop()); - let timer_port = timer.period(1000); - - spawn(proc() { - let _ = timer_port.recv_opt(); - }); - - // when we drop the TimerWatcher we're going to destroy the channel, - // which must wake up the task on the other end - } - - #[test] - fn reset_doesnt_switch_tasks() { - // similar test to the one above. - let mut timer = TimerWatcher::new(local_loop()); - let timer_port = timer.period(1000); - - spawn(proc() { - let _ = timer_port.recv_opt(); - }); - - drop(timer.oneshot(1)); - } - #[test] - fn reset_doesnt_switch_tasks2() { - // similar test to the one above. - let mut timer = TimerWatcher::new(local_loop()); - let timer_port = timer.period(1000); - - spawn(proc() { - let _ = timer_port.recv_opt(); - }); - - timer.sleep(1); - } - - #[test] - fn sender_goes_away_oneshot() { - let port = { - let mut timer = TimerWatcher::new(local_loop()); - timer.oneshot(1000) - }; - assert_eq!(port.recv_opt(), Err(())); - } - - #[test] - fn sender_goes_away_period() { - let port = { - let mut timer = TimerWatcher::new(local_loop()); - timer.period(1000) - }; - assert_eq!(port.recv_opt(), Err(())); - } - - #[test] - fn receiver_goes_away_oneshot() { - let mut timer1 = TimerWatcher::new(local_loop()); - drop(timer1.oneshot(1)); - let mut timer2 = TimerWatcher::new(local_loop()); - // while sleeping, the prevous timer should fire and not have its - // callback do something terrible. - timer2.sleep(2); - } - - #[test] - fn receiver_goes_away_period() { - let mut timer1 = TimerWatcher::new(local_loop()); - drop(timer1.period(1)); - let mut timer2 = TimerWatcher::new(local_loop()); - // while sleeping, the prevous timer should fire and not have its - // callback do something terrible. - timer2.sleep(2); - } -} diff --git a/src/librustuv/tty.rs b/src/librustuv/tty.rs index f70c3b4c1bd..828a3d0c63b 100644 --- a/src/librustuv/tty.rs +++ b/src/librustuv/tty.rs @@ -9,9 +9,8 @@ // except according to those terms. use libc; -use std::io::IoError; use std::ptr; -use std::rt::rtio::RtioTTY; +use std::rt::rtio::{RtioTTY, IoResult}; use homing::{HomingIO, HomeHandle}; use stream::StreamWatcher; @@ -80,17 +79,17 @@ impl TtyWatcher { } impl RtioTTY for TtyWatcher { - fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> { + fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> { let _m = self.fire_homing_missile(); self.stream.read(buf).map_err(uv_error_to_io_error) } - fn write(&mut self, buf: &[u8]) -> Result<(), IoError> { + fn write(&mut self, buf: &[u8]) -> IoResult<()> { let _m = self.fire_homing_missile(); self.stream.write(buf, false).map_err(uv_error_to_io_error) } - fn set_raw(&mut self, raw: bool) -> Result<(), IoError> { + fn set_raw(&mut self, raw: bool) -> IoResult<()> { let raw = raw as libc::c_int; let _m = self.fire_homing_missile(); match unsafe { uvll::uv_tty_set_mode(self.tty, raw) } { @@ -100,7 +99,7 @@ impl RtioTTY for TtyWatcher { } #[allow(unused_mut)] - fn get_winsize(&mut self) -> Result<(int, int), IoError> { + fn get_winsize(&mut self) -> IoResult<(int, int)> { let mut width: libc::c_int = 0; let mut height: libc::c_int = 0; let widthptr: *libc::c_int = &width; diff --git a/src/librustuv/uvio.rs b/src/librustuv/uvio.rs index c55b5f64f9d..cf2a2d73d4d 100644 --- a/src/librustuv/uvio.rs +++ b/src/librustuv/uvio.rs @@ -11,20 +11,13 @@ //! The implementation of `rtio` for libuv use std::c_str::CString; -use std::io::IoError; -use std::io::net::ip::SocketAddr; -use std::io::signal::Signum; -use std::io::{FileMode, FileAccess, Open, Append, Truncate, Read, Write, - ReadWrite, FileStat}; -use std::io; use std::mem; use libc::c_int; use libc::{O_CREAT, O_APPEND, O_TRUNC, O_RDWR, O_RDONLY, O_WRONLY, S_IRUSR, S_IWUSR}; use libc; use std::rt::rtio; -use std::rt::rtio::{ProcessConfig, IoFactory, EventLoop}; -use ai = std::io::net::addrinfo; +use std::rt::rtio::{ProcessConfig, IoFactory, EventLoop, IoResult}; #[cfg(test)] use std::rt::thread::Thread; @@ -147,36 +140,38 @@ impl IoFactory for UvIoFactory { // Connect to an address and return a new stream // NB: This blocks the task waiting on the connection. // It would probably be better to return a future - fn tcp_connect(&mut self, addr: SocketAddr, timeout: Option<u64>) - -> Result<Box<rtio::RtioTcpStream:Send>, IoError> { + fn tcp_connect(&mut self, addr: rtio::SocketAddr, timeout: Option<u64>) + -> IoResult<Box<rtio::RtioTcpStream:Send>> { match TcpWatcher::connect(self, addr, timeout) { Ok(t) => Ok(box t as Box<rtio::RtioTcpStream:Send>), Err(e) => Err(uv_error_to_io_error(e)), } } - fn tcp_bind(&mut self, addr: SocketAddr) - -> Result<Box<rtio::RtioTcpListener:Send>, IoError> { + fn tcp_bind(&mut self, addr: rtio::SocketAddr) + -> IoResult<Box<rtio::RtioTcpListener:Send>> { match TcpListener::bind(self, addr) { Ok(t) => Ok(t as Box<rtio::RtioTcpListener:Send>), Err(e) => Err(uv_error_to_io_error(e)), } } - fn udp_bind(&mut self, addr: SocketAddr) - -> Result<Box<rtio::RtioUdpSocket:Send>, IoError> { + fn udp_bind(&mut self, addr: rtio::SocketAddr) + -> IoResult<Box<rtio::RtioUdpSocket:Send>> { match UdpWatcher::bind(self, addr) { Ok(u) => Ok(box u as Box<rtio::RtioUdpSocket:Send>), Err(e) => Err(uv_error_to_io_error(e)), } } - fn timer_init(&mut self) -> Result<Box<rtio::RtioTimer:Send>, IoError> { + fn timer_init(&mut self) -> IoResult<Box<rtio::RtioTimer:Send>> { Ok(TimerWatcher::new(self) as Box<rtio::RtioTimer:Send>) } fn get_host_addresses(&mut self, host: Option<&str>, servname: Option<&str>, - hint: Option<ai::Hint>) -> Result<Vec<ai::Info>, IoError> { + hint: Option<rtio::AddrinfoHint>) + -> IoResult<Vec<rtio::AddrinfoInfo>> + { let r = GetAddrInfoRequest::run(&self.loop_, host, servname, hint); r.map_err(uv_error_to_io_error) } @@ -187,20 +182,22 @@ impl IoFactory for UvIoFactory { Box<rtio::RtioFileStream:Send> } - fn fs_open(&mut self, path: &CString, fm: FileMode, fa: FileAccess) - -> Result<Box<rtio::RtioFileStream:Send>, IoError> { + fn fs_open(&mut self, path: &CString, fm: rtio::FileMode, + fa: rtio::FileAccess) + -> IoResult<Box<rtio::RtioFileStream:Send>> + { let flags = match fm { - io::Open => 0, - io::Append => libc::O_APPEND, - io::Truncate => libc::O_TRUNC, + rtio::Open => 0, + rtio::Append => libc::O_APPEND, + rtio::Truncate => libc::O_TRUNC, }; // Opening with a write permission must silently create the file. let (flags, mode) = match fa { - io::Read => (flags | libc::O_RDONLY, 0), - io::Write => (flags | libc::O_WRONLY | libc::O_CREAT, - libc::S_IRUSR | libc::S_IWUSR), - io::ReadWrite => (flags | libc::O_RDWR | libc::O_CREAT, - libc::S_IRUSR | libc::S_IWUSR), + rtio::Read => (flags | libc::O_RDONLY, 0), + rtio::Write => (flags | libc::O_WRONLY | libc::O_CREAT, + libc::S_IRUSR | libc::S_IWUSR), + rtio::ReadWrite => (flags | libc::O_RDWR | libc::O_CREAT, + libc::S_IRUSR | libc::S_IWUSR), }; match FsRequest::open(self, path, flags as int, mode as int) { @@ -209,69 +206,66 @@ impl IoFactory for UvIoFactory { } } - fn fs_unlink(&mut self, path: &CString) -> Result<(), IoError> { + fn fs_unlink(&mut self, path: &CString) -> IoResult<()> { let r = FsRequest::unlink(&self.loop_, path); r.map_err(uv_error_to_io_error) } - fn fs_lstat(&mut self, path: &CString) -> Result<FileStat, IoError> { + fn fs_lstat(&mut self, path: &CString) -> IoResult<rtio::FileStat> { let r = FsRequest::lstat(&self.loop_, path); r.map_err(uv_error_to_io_error) } - fn fs_stat(&mut self, path: &CString) -> Result<FileStat, IoError> { + fn fs_stat(&mut self, path: &CString) -> IoResult<rtio::FileStat> { let r = FsRequest::stat(&self.loop_, path); r.map_err(uv_error_to_io_error) } - fn fs_mkdir(&mut self, path: &CString, - perm: io::FilePermission) -> Result<(), IoError> { - let r = FsRequest::mkdir(&self.loop_, path, perm.bits() as c_int); + fn fs_mkdir(&mut self, path: &CString, perm: uint) -> IoResult<()> { + let r = FsRequest::mkdir(&self.loop_, path, perm as c_int); r.map_err(uv_error_to_io_error) } - fn fs_rmdir(&mut self, path: &CString) -> Result<(), IoError> { + fn fs_rmdir(&mut self, path: &CString) -> IoResult<()> { let r = FsRequest::rmdir(&self.loop_, path); r.map_err(uv_error_to_io_error) } - fn fs_rename(&mut self, path: &CString, to: &CString) -> Result<(), IoError> { + fn fs_rename(&mut self, path: &CString, to: &CString) -> IoResult<()> { let r = FsRequest::rename(&self.loop_, path, to); r.map_err(uv_error_to_io_error) } - fn fs_chmod(&mut self, path: &CString, - perm: io::FilePermission) -> Result<(), IoError> { - let r = FsRequest::chmod(&self.loop_, path, perm.bits() as c_int); + fn fs_chmod(&mut self, path: &CString, perm: uint) -> IoResult<()> { + let r = FsRequest::chmod(&self.loop_, path, perm as c_int); r.map_err(uv_error_to_io_error) } fn fs_readdir(&mut self, path: &CString, flags: c_int) - -> Result<Vec<CString>, IoError> + -> IoResult<Vec<CString>> { let r = FsRequest::readdir(&self.loop_, path, flags); r.map_err(uv_error_to_io_error) } - fn fs_link(&mut self, src: &CString, dst: &CString) -> Result<(), IoError> { + fn fs_link(&mut self, src: &CString, dst: &CString) -> IoResult<()> { let r = FsRequest::link(&self.loop_, src, dst); r.map_err(uv_error_to_io_error) } - fn fs_symlink(&mut self, src: &CString, dst: &CString) -> Result<(), IoError> { + fn fs_symlink(&mut self, src: &CString, dst: &CString) -> IoResult<()> { let r = FsRequest::symlink(&self.loop_, src, dst); r.map_err(uv_error_to_io_error) } - fn fs_chown(&mut self, path: &CString, uid: int, gid: int) -> Result<(), IoError> { + fn fs_chown(&mut self, path: &CString, uid: int, gid: int) -> IoResult<()> { let r = FsRequest::chown(&self.loop_, path, uid, gid); r.map_err(uv_error_to_io_error) } - fn fs_readlink(&mut self, path: &CString) -> Result<CString, IoError> { + fn fs_readlink(&mut self, path: &CString) -> IoResult<CString> { let r = FsRequest::readlink(&self.loop_, path); r.map_err(uv_error_to_io_error) } fn fs_utime(&mut self, path: &CString, atime: u64, mtime: u64) - -> Result<(), IoError> + -> IoResult<()> { let r = FsRequest::utime(&self.loop_, path, atime, mtime); r.map_err(uv_error_to_io_error) } fn spawn(&mut self, cfg: ProcessConfig) - -> Result<(Box<rtio::RtioProcess:Send>, - Vec<Option<Box<rtio::RtioPipe:Send>>>), - IoError> + -> IoResult<(Box<rtio::RtioProcess:Send>, + Vec<Option<Box<rtio::RtioPipe:Send>>>)> { match Process::spawn(self, cfg) { Ok((p, io)) => { @@ -284,12 +278,12 @@ impl IoFactory for UvIoFactory { } } - fn kill(&mut self, pid: libc::pid_t, signum: int) -> Result<(), IoError> { + fn kill(&mut self, pid: libc::pid_t, signum: int) -> IoResult<()> { Process::kill(pid, signum).map_err(uv_error_to_io_error) } fn unix_bind(&mut self, path: &CString) - -> Result<Box<rtio::RtioUnixListener:Send>, IoError> { + -> IoResult<Box<rtio::RtioUnixListener:Send>> { match PipeListener::bind(self, path) { Ok(p) => Ok(p as Box<rtio::RtioUnixListener:Send>), Err(e) => Err(uv_error_to_io_error(e)), @@ -297,7 +291,7 @@ impl IoFactory for UvIoFactory { } fn unix_connect(&mut self, path: &CString, timeout: Option<u64>) - -> Result<Box<rtio::RtioPipe:Send>, IoError> { + -> IoResult<Box<rtio::RtioPipe:Send>> { match PipeWatcher::connect(self, path, timeout) { Ok(p) => Ok(box p as Box<rtio::RtioPipe:Send>), Err(e) => Err(uv_error_to_io_error(e)), @@ -305,7 +299,7 @@ impl IoFactory for UvIoFactory { } fn tty_open(&mut self, fd: c_int, readable: bool) - -> Result<Box<rtio::RtioTTY:Send>, IoError> { + -> IoResult<Box<rtio::RtioTTY:Send>> { match TtyWatcher::new(self, fd, readable) { Ok(tty) => Ok(box tty as Box<rtio::RtioTTY:Send>), Err(e) => Err(uv_error_to_io_error(e)) @@ -313,16 +307,18 @@ impl IoFactory for UvIoFactory { } fn pipe_open(&mut self, fd: c_int) - -> Result<Box<rtio::RtioPipe:Send>, IoError> { + -> IoResult<Box<rtio::RtioPipe:Send>> + { match PipeWatcher::open(self, fd) { Ok(s) => Ok(box s as Box<rtio::RtioPipe:Send>), Err(e) => Err(uv_error_to_io_error(e)) } } - fn signal(&mut self, signum: Signum, channel: Sender<Signum>) - -> Result<Box<rtio::RtioSignal:Send>, IoError> { - match SignalWatcher::new(self, signum, channel) { + fn signal(&mut self, signum: int, cb: Box<rtio::Callback:Send>) + -> IoResult<Box<rtio::RtioSignal:Send>> + { + match SignalWatcher::new(self, signum, cb) { Ok(s) => Ok(s as Box<rtio::RtioSignal:Send>), Err(e) => Err(uv_error_to_io_error(e)), } |
