diff options
| author | Aaron Turon <aturon@mozilla.com> | 2014-10-10 10:11:49 -0700 |
|---|---|---|
| committer | Aaron Turon <aturon@mozilla.com> | 2014-11-08 20:40:38 -0800 |
| commit | d34b1b0ca9bf5e0d7cd30952f5de0ab09ed57b41 (patch) | |
| tree | bdb9af03a1b73d4edc9ae5e6193a010c9b2b4edc /src/libnative | |
| parent | 0c1e1ff1e300868a29405a334e65eae690df971d (diff) | |
| download | rust-d34b1b0ca9bf5e0d7cd30952f5de0ab09ed57b41.tar.gz rust-d34b1b0ca9bf5e0d7cd30952f5de0ab09ed57b41.zip | |
Runtime removal: refactor pipes and networking
This patch continues the runtime removal by moving pipe and networking-related code into `sys`. Because this eliminates APIs in `libnative` and `librustrt`, it is a: [breaking-change] This functionality is likely to be available publicly, in some form, from `std` in the future.
Diffstat (limited to 'src/libnative')
| -rw-r--r-- | src/libnative/io/mod.rs | 54 | ||||
| -rw-r--r-- | src/libnative/io/net.rs | 1103 | ||||
| -rw-r--r-- | src/libnative/io/pipe_unix.rs | 339 | ||||
| -rw-r--r-- | src/libnative/io/pipe_windows.rs | 769 |
4 files changed, 0 insertions, 2265 deletions
diff --git a/src/libnative/io/mod.rs b/src/libnative/io/mod.rs index baf58b83dcd..2a76bc29f7c 100644 --- a/src/libnative/io/mod.rs +++ b/src/libnative/io/mod.rs @@ -35,8 +35,6 @@ pub use self::process::Process; mod helper_thread; // Native I/O implementations -pub mod addrinfo; -pub mod net; pub mod process; mod util; @@ -53,14 +51,6 @@ pub mod timer; #[path = "timer_windows.rs"] pub mod timer; -#[cfg(unix)] -#[path = "pipe_unix.rs"] -pub mod pipe; - -#[cfg(windows)] -#[path = "pipe_windows.rs"] -pub mod pipe; - #[cfg(windows)] #[path = "tty_windows.rs"] mod tty; @@ -126,52 +116,11 @@ pub struct IoFactory { impl IoFactory { pub fn new() -> IoFactory { - net::init(); IoFactory { _cannot_construct_outside_of_this_module: () } } } impl rtio::IoFactory for IoFactory { - // networking - fn tcp_connect(&mut self, addr: rtio::SocketAddr, - timeout: Option<u64>) - -> IoResult<Box<rtio::RtioTcpStream + Send>> - { - net::TcpStream::connect(addr, timeout).map(|s| { - box s as Box<rtio::RtioTcpStream + Send> - }) - } - fn tcp_bind(&mut self, addr: rtio::SocketAddr) - -> IoResult<Box<rtio::RtioTcpListener + Send>> { - net::TcpListener::bind(addr).map(|s| { - box s as Box<rtio::RtioTcpListener + Send> - }) - } - fn udp_bind(&mut self, addr: rtio::SocketAddr) - -> IoResult<Box<rtio::RtioUdpSocket + Send>> { - net::UdpSocket::bind(addr).map(|u| { - box u as Box<rtio::RtioUdpSocket + Send> - }) - } - fn unix_bind(&mut self, path: &CString) - -> IoResult<Box<rtio::RtioUnixListener + Send>> { - pipe::UnixListener::bind(path).map(|s| { - box s as Box<rtio::RtioUnixListener + Send> - }) - } - fn unix_connect(&mut self, path: &CString, - timeout: Option<u64>) -> IoResult<Box<rtio::RtioPipe + Send>> { - pipe::UnixStream::connect(path, timeout).map(|s| { - box s as Box<rtio::RtioPipe + Send> - }) - } - fn get_host_addresses(&mut self, host: Option<&str>, servname: Option<&str>, - hint: Option<rtio::AddrinfoHint>) - -> IoResult<Vec<rtio::AddrinfoInfo>> - { - addrinfo::GetAddrInfoRequest::run(host, servname, hint) - } - // misc fn timer_init(&mut self) -> IoResult<Box<rtio::RtioTimer + Send>> { timer::Timer::new().map(|t| box t as Box<rtio::RtioTimer + Send>) @@ -189,9 +138,6 @@ impl rtio::IoFactory for IoFactory { fn kill(&mut self, pid: libc::pid_t, signum: int) -> IoResult<()> { process::Process::kill(pid, signum) } - fn pipe_open(&mut self, fd: c_int) -> IoResult<Box<rtio::RtioPipe + Send>> { - Ok(box file::FileDesc::new(fd, true) as Box<rtio::RtioPipe + Send>) - } #[cfg(unix)] fn tty_open(&mut self, fd: c_int, _readable: bool) -> IoResult<Box<rtio::RtioTTY + Send>> { diff --git a/src/libnative/io/net.rs b/src/libnative/io/net.rs deleted file mode 100644 index a4b97a3eb84..00000000000 --- a/src/libnative/io/net.rs +++ /dev/null @@ -1,1103 +0,0 @@ -// Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or -// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license -// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -use alloc::arc::Arc; -use libc; -use std::mem; -use std::ptr; -use std::rt::mutex; -use std::rt::rtio::{mod, IoResult, IoError}; -use std::sync::atomic; - -use super::{retry, keep_going}; -use super::c; -use super::util; - -#[cfg(unix)] use super::process; -#[cfg(unix)] use super::file::FileDesc; - -pub use self::os::{init, sock_t, last_error}; - -//////////////////////////////////////////////////////////////////////////////// -// sockaddr and misc bindings -//////////////////////////////////////////////////////////////////////////////// - -pub fn htons(u: u16) -> u16 { - u.to_be() -} -pub fn ntohs(u: u16) -> u16 { - Int::from_be(u) -} - -enum InAddr { - In4Addr(libc::in_addr), - In6Addr(libc::in6_addr), -} - -fn ip_to_inaddr(ip: rtio::IpAddr) -> InAddr { - match ip { - rtio::Ipv4Addr(a, b, c, d) => { - let ip = (a as u32 << 24) | - (b as u32 << 16) | - (c as u32 << 8) | - (d as u32 << 0); - In4Addr(libc::in_addr { - s_addr: Int::from_be(ip) - }) - } - rtio::Ipv6Addr(a, b, c, d, e, f, g, h) => { - In6Addr(libc::in6_addr { - s6_addr: [ - htons(a), - htons(b), - htons(c), - htons(d), - htons(e), - htons(f), - htons(g), - htons(h), - ] - }) - } - } -} - -fn addr_to_sockaddr(addr: rtio::SocketAddr, - storage: &mut libc::sockaddr_storage) - -> libc::socklen_t { - unsafe { - let len = match ip_to_inaddr(addr.ip) { - In4Addr(inaddr) => { - let storage = storage as *mut _ as *mut libc::sockaddr_in; - (*storage).sin_family = libc::AF_INET as libc::sa_family_t; - (*storage).sin_port = htons(addr.port); - (*storage).sin_addr = inaddr; - mem::size_of::<libc::sockaddr_in>() - } - In6Addr(inaddr) => { - let storage = storage as *mut _ as *mut libc::sockaddr_in6; - (*storage).sin6_family = libc::AF_INET6 as libc::sa_family_t; - (*storage).sin6_port = htons(addr.port); - (*storage).sin6_addr = inaddr; - mem::size_of::<libc::sockaddr_in6>() - } - }; - return len as libc::socklen_t; - } -} - -fn socket(addr: rtio::SocketAddr, ty: libc::c_int) -> IoResult<sock_t> { - unsafe { - let fam = match addr.ip { - rtio::Ipv4Addr(..) => libc::AF_INET, - rtio::Ipv6Addr(..) => libc::AF_INET6, - }; - match libc::socket(fam, ty, 0) { - -1 => Err(os::last_error()), - fd => Ok(fd), - } - } -} - -fn setsockopt<T>(fd: sock_t, opt: libc::c_int, val: libc::c_int, - payload: T) -> IoResult<()> { - unsafe { - let payload = &payload as *const T as *const libc::c_void; - let ret = libc::setsockopt(fd, opt, val, - payload, - mem::size_of::<T>() as libc::socklen_t); - if ret != 0 { - Err(os::last_error()) - } else { - Ok(()) - } - } -} - -pub fn getsockopt<T: Copy>(fd: sock_t, opt: libc::c_int, - val: libc::c_int) -> IoResult<T> { - unsafe { - let mut slot: T = mem::zeroed(); - let mut len = mem::size_of::<T>() as libc::socklen_t; - let ret = c::getsockopt(fd, opt, val, - &mut slot as *mut _ as *mut _, - &mut len); - if ret != 0 { - Err(os::last_error()) - } else { - assert!(len as uint == mem::size_of::<T>()); - Ok(slot) - } - } -} - -fn sockname(fd: sock_t, - f: unsafe extern "system" fn(sock_t, *mut libc::sockaddr, - *mut libc::socklen_t) -> libc::c_int) - -> IoResult<rtio::SocketAddr> -{ - let mut storage: libc::sockaddr_storage = unsafe { mem::zeroed() }; - let mut len = mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t; - unsafe { - let storage = &mut storage as *mut libc::sockaddr_storage; - let ret = f(fd, - storage as *mut libc::sockaddr, - &mut len as *mut libc::socklen_t); - if ret != 0 { - return Err(os::last_error()) - } - } - return sockaddr_to_addr(&storage, len as uint); -} - -pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage, - len: uint) -> IoResult<rtio::SocketAddr> { - match storage.ss_family as libc::c_int { - libc::AF_INET => { - assert!(len as uint >= mem::size_of::<libc::sockaddr_in>()); - let storage: &libc::sockaddr_in = unsafe { - mem::transmute(storage) - }; - let ip = (storage.sin_addr.s_addr as u32).to_be(); - let a = (ip >> 24) as u8; - let b = (ip >> 16) as u8; - let c = (ip >> 8) as u8; - let d = (ip >> 0) as u8; - Ok(rtio::SocketAddr { - ip: rtio::Ipv4Addr(a, b, c, d), - port: ntohs(storage.sin_port), - }) - } - libc::AF_INET6 => { - assert!(len as uint >= mem::size_of::<libc::sockaddr_in6>()); - let storage: &libc::sockaddr_in6 = unsafe { - mem::transmute(storage) - }; - let a = ntohs(storage.sin6_addr.s6_addr[0]); - let b = ntohs(storage.sin6_addr.s6_addr[1]); - let c = ntohs(storage.sin6_addr.s6_addr[2]); - let d = ntohs(storage.sin6_addr.s6_addr[3]); - let e = ntohs(storage.sin6_addr.s6_addr[4]); - let f = ntohs(storage.sin6_addr.s6_addr[5]); - let g = ntohs(storage.sin6_addr.s6_addr[6]); - let h = ntohs(storage.sin6_addr.s6_addr[7]); - Ok(rtio::SocketAddr { - ip: rtio::Ipv6Addr(a, b, c, d, e, f, g, h), - port: ntohs(storage.sin6_port), - }) - } - _ => { - #[cfg(unix)] use libc::EINVAL as ERROR; - #[cfg(windows)] use libc::WSAEINVAL as ERROR; - Err(IoError { - code: ERROR as uint, - extra: 0, - detail: None, - }) - } - } -} - -//////////////////////////////////////////////////////////////////////////////// -// TCP streams -//////////////////////////////////////////////////////////////////////////////// - -pub struct TcpStream { - inner: Arc<Inner>, - read_deadline: u64, - write_deadline: u64, -} - -struct Inner { - fd: sock_t, - - // Unused on Linux, where this lock is not necessary. - #[allow(dead_code)] - lock: mutex::NativeMutex -} - -pub struct Guard<'a> { - pub fd: sock_t, - pub guard: mutex::LockGuard<'a>, -} - -impl Inner { - fn new(fd: sock_t) -> Inner { - Inner { fd: fd, lock: unsafe { mutex::NativeMutex::new() } } - } -} - -impl TcpStream { - pub fn connect(addr: rtio::SocketAddr, - timeout: Option<u64>) -> IoResult<TcpStream> { - let fd = try!(socket(addr, libc::SOCK_STREAM)); - let ret = TcpStream::new(Inner::new(fd)); - - let mut storage = unsafe { mem::zeroed() }; - let len = addr_to_sockaddr(addr, &mut storage); - let addrp = &storage as *const _ as *const libc::sockaddr; - - match timeout { - Some(timeout) => { - try!(util::connect_timeout(fd, addrp, len, timeout)); - Ok(ret) - }, - None => { - match retry(|| unsafe { libc::connect(fd, addrp, len) }) { - -1 => Err(os::last_error()), - _ => Ok(ret), - } - } - } - } - - fn new(inner: Inner) -> TcpStream { - TcpStream { - inner: Arc::new(inner), - read_deadline: 0, - write_deadline: 0, - } - } - - pub fn fd(&self) -> sock_t { self.inner.fd } - - fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> { - setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_NODELAY, - nodelay as libc::c_int) - } - - fn set_keepalive(&mut self, seconds: Option<uint>) -> IoResult<()> { - let ret = setsockopt(self.fd(), libc::SOL_SOCKET, libc::SO_KEEPALIVE, - seconds.is_some() as libc::c_int); - match seconds { - Some(n) => ret.and_then(|()| self.set_tcp_keepalive(n)), - None => ret, - } - } - - #[cfg(any(target_os = "macos", target_os = "ios"))] - fn set_tcp_keepalive(&mut self, seconds: uint) -> IoResult<()> { - setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_KEEPALIVE, - seconds as libc::c_int) - } - #[cfg(any(target_os = "freebsd", target_os = "dragonfly"))] - fn set_tcp_keepalive(&mut self, seconds: uint) -> IoResult<()> { - setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_KEEPIDLE, - seconds as libc::c_int) - } - #[cfg(not(any(target_os = "macos", - target_os = "ios", - target_os = "freebsd", - target_os = "dragonfly")))] - fn set_tcp_keepalive(&mut self, _seconds: uint) -> IoResult<()> { - Ok(()) - } - - #[cfg(target_os = "linux")] - fn lock_nonblocking(&self) {} - - #[cfg(not(target_os = "linux"))] - fn lock_nonblocking<'a>(&'a self) -> Guard<'a> { - let ret = Guard { - fd: self.fd(), - guard: unsafe { self.inner.lock.lock() }, - }; - assert!(util::set_nonblocking(self.fd(), true).is_ok()); - ret - } -} - -#[cfg(windows)] type wrlen = libc::c_int; -#[cfg(not(windows))] type wrlen = libc::size_t; - -impl rtio::RtioTcpStream for TcpStream { - fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> { - let fd = self.fd(); - let dolock = || self.lock_nonblocking(); - let doread = |nb| unsafe { - let flags = if nb {c::MSG_DONTWAIT} else {0}; - libc::recv(fd, - buf.as_mut_ptr() as *mut libc::c_void, - buf.len() as wrlen, - flags) as libc::c_int - }; - read(fd, self.read_deadline, dolock, doread) - } - - fn write(&mut self, buf: &[u8]) -> IoResult<()> { - let fd = self.fd(); - let dolock = || self.lock_nonblocking(); - let dowrite = |nb: bool, buf: *const u8, len: uint| unsafe { - let flags = if nb {c::MSG_DONTWAIT} else {0}; - libc::send(fd, - buf as *const _, - len as wrlen, - flags) as i64 - }; - match write(fd, self.write_deadline, buf, true, dolock, dowrite) { - Ok(_) => Ok(()), - Err(e) => Err(e) - } - } - fn peer_name(&mut self) -> IoResult<rtio::SocketAddr> { - sockname(self.fd(), libc::getpeername) - } - fn control_congestion(&mut self) -> IoResult<()> { - self.set_nodelay(false) - } - fn nodelay(&mut self) -> IoResult<()> { - self.set_nodelay(true) - } - fn keepalive(&mut self, delay_in_seconds: uint) -> IoResult<()> { - self.set_keepalive(Some(delay_in_seconds)) - } - fn letdie(&mut self) -> IoResult<()> { - self.set_keepalive(None) - } - - fn clone(&self) -> Box<rtio::RtioTcpStream + Send> { - box TcpStream { - inner: self.inner.clone(), - read_deadline: 0, - write_deadline: 0, - } as Box<rtio::RtioTcpStream + Send> - } - - fn close_write(&mut self) -> IoResult<()> { - super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_WR) }) - } - fn close_read(&mut self) -> IoResult<()> { - super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) }) - } - - fn set_timeout(&mut self, timeout: Option<u64>) { - let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0); - self.read_deadline = deadline; - self.write_deadline = deadline; - } - fn set_read_timeout(&mut self, timeout: Option<u64>) { - self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0); - } - fn set_write_timeout(&mut self, timeout: Option<u64>) { - self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0); - } -} - -impl rtio::RtioSocket for TcpStream { - fn socket_name(&mut self) -> IoResult<rtio::SocketAddr> { - sockname(self.fd(), libc::getsockname) - } -} - -impl Drop for Inner { - fn drop(&mut self) { unsafe { os::close(self.fd); } } -} - -#[unsafe_destructor] -impl<'a> Drop for Guard<'a> { - fn drop(&mut self) { - assert!(util::set_nonblocking(self.fd, false).is_ok()); - } -} - -//////////////////////////////////////////////////////////////////////////////// -// TCP listeners -//////////////////////////////////////////////////////////////////////////////// - -pub struct TcpListener { - inner: Inner, -} - -impl TcpListener { - pub fn bind(addr: rtio::SocketAddr) -> IoResult<TcpListener> { - let fd = try!(socket(addr, libc::SOCK_STREAM)); - let ret = TcpListener { inner: Inner::new(fd) }; - - let mut storage = unsafe { mem::zeroed() }; - let len = addr_to_sockaddr(addr, &mut storage); - let addrp = &storage as *const _ as *const libc::sockaddr; - - // On platforms with Berkeley-derived sockets, this allows - // to quickly rebind a socket, without needing to wait for - // the OS to clean up the previous one. - if cfg!(unix) { - try!(setsockopt(fd, libc::SOL_SOCKET, libc::SO_REUSEADDR, - 1 as libc::c_int)); - } - - match unsafe { libc::bind(fd, addrp, len) } { - -1 => Err(os::last_error()), - _ => Ok(ret), - } - } - - pub fn fd(&self) -> sock_t { self.inner.fd } - - pub fn native_listen(self, backlog: int) -> IoResult<TcpAcceptor> { - match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } { - -1 => Err(os::last_error()), - - #[cfg(unix)] - _ => { - let (reader, writer) = try!(process::pipe()); - try!(util::set_nonblocking(reader.fd(), true)); - try!(util::set_nonblocking(writer.fd(), true)); - try!(util::set_nonblocking(self.fd(), true)); - Ok(TcpAcceptor { - inner: Arc::new(AcceptorInner { - listener: self, - reader: reader, - writer: writer, - closed: atomic::AtomicBool::new(false), - }), - deadline: 0, - }) - } - - #[cfg(windows)] - _ => { - let accept = try!(os::Event::new()); - let ret = unsafe { - c::WSAEventSelect(self.fd(), accept.handle(), c::FD_ACCEPT) - }; - if ret != 0 { - return Err(os::last_error()) - } - Ok(TcpAcceptor { - inner: Arc::new(AcceptorInner { - listener: self, - abort: try!(os::Event::new()), - accept: accept, - closed: atomic::AtomicBool::new(false), - }), - deadline: 0, - }) - } - } - } -} - -impl rtio::RtioTcpListener for TcpListener { - fn listen(self: Box<TcpListener>) - -> IoResult<Box<rtio::RtioTcpAcceptor + Send>> { - self.native_listen(128).map(|a| { - box a as Box<rtio::RtioTcpAcceptor + Send> - }) - } -} - -impl rtio::RtioSocket for TcpListener { - fn socket_name(&mut self) -> IoResult<rtio::SocketAddr> { - sockname(self.fd(), libc::getsockname) - } -} - -pub struct TcpAcceptor { - inner: Arc<AcceptorInner>, - deadline: u64, -} - -#[cfg(unix)] -struct AcceptorInner { - listener: TcpListener, - reader: FileDesc, - writer: FileDesc, - closed: atomic::AtomicBool, -} - -#[cfg(windows)] -struct AcceptorInner { - listener: TcpListener, - abort: os::Event, - accept: os::Event, - closed: atomic::AtomicBool, -} - -impl TcpAcceptor { - pub fn fd(&self) -> sock_t { self.inner.listener.fd() } - - #[cfg(unix)] - pub fn native_accept(&mut self) -> IoResult<TcpStream> { - // In implementing accept, the two main concerns are dealing with - // close_accept() and timeouts. The unix implementation is based on a - // nonblocking accept plus a call to select(). Windows ends up having - // an entirely separate implementation than unix, which is explained - // below. - // - // To implement timeouts, all blocking is done via select() instead of - // accept() by putting the socket in non-blocking mode. Because - // select() takes a timeout argument, we just pass through the timeout - // to select(). - // - // To implement close_accept(), we have a self-pipe to ourselves which - // is passed to select() along with the socket being accepted on. The - // self-pipe is never written to unless close_accept() is called. - let deadline = if self.deadline == 0 {None} else {Some(self.deadline)}; - - while !self.inner.closed.load(atomic::SeqCst) { - match retry(|| unsafe { - libc::accept(self.fd(), ptr::null_mut(), ptr::null_mut()) - }) { - -1 if util::wouldblock() => {} - -1 => return Err(os::last_error()), - fd => return Ok(TcpStream::new(Inner::new(fd as sock_t))), - } - try!(util::await([self.fd(), self.inner.reader.fd()], - deadline, util::Readable)); - } - - Err(util::eof()) - } - - #[cfg(windows)] - pub fn native_accept(&mut self) -> IoResult<TcpStream> { - // Unlink unix, windows cannot invoke `select` on arbitrary file - // descriptors like pipes, only sockets. Consequently, windows cannot - // use the same implementation as unix for accept() when close_accept() - // is considered. - // - // In order to implement close_accept() and timeouts, windows uses - // event handles. An acceptor-specific abort event is created which - // will only get set in close_accept(), and it will never be un-set. - // Additionally, another acceptor-specific event is associated with the - // FD_ACCEPT network event. - // - // These two events are then passed to WaitForMultipleEvents to see - // which one triggers first, and the timeout passed to this function is - // the local timeout for the acceptor. - // - // If the wait times out, then the accept timed out. If the wait - // succeeds with the abort event, then we were closed, and if the wait - // succeeds otherwise, then we do a nonblocking poll via `accept` to - // see if we can accept a connection. The connection is candidate to be - // stolen, so we do all of this in a loop as well. - let events = [self.inner.abort.handle(), self.inner.accept.handle()]; - - while !self.inner.closed.load(atomic::SeqCst) { - let ms = if self.deadline == 0 { - c::WSA_INFINITE as u64 - } else { - let now = ::io::timer::now(); - if self.deadline < now {0} else {self.deadline - now} - }; - let ret = unsafe { - c::WSAWaitForMultipleEvents(2, events.as_ptr(), libc::FALSE, - ms as libc::DWORD, libc::FALSE) - }; - match ret { - c::WSA_WAIT_TIMEOUT => { - return Err(util::timeout("accept timed out")) - } - c::WSA_WAIT_FAILED => return Err(os::last_error()), - c::WSA_WAIT_EVENT_0 => break, - n => assert_eq!(n, c::WSA_WAIT_EVENT_0 + 1), - } - - let mut wsaevents: c::WSANETWORKEVENTS = unsafe { mem::zeroed() }; - let ret = unsafe { - c::WSAEnumNetworkEvents(self.fd(), events[1], &mut wsaevents) - }; - if ret != 0 { return Err(os::last_error()) } - - if wsaevents.lNetworkEvents & c::FD_ACCEPT == 0 { continue } - match unsafe { - libc::accept(self.fd(), ptr::null_mut(), ptr::null_mut()) - } { - -1 if util::wouldblock() => {} - -1 => return Err(os::last_error()), - - // Accepted sockets inherit the same properties as the caller, - // so we need to deregister our event and switch the socket back - // to blocking mode - fd => { - let stream = TcpStream::new(Inner::new(fd)); - let ret = unsafe { - c::WSAEventSelect(fd, events[1], 0) - }; - if ret != 0 { return Err(os::last_error()) } - try!(util::set_nonblocking(fd, false)); - return Ok(stream) - } - } - } - - Err(util::eof()) - } -} - -impl rtio::RtioSocket for TcpAcceptor { - fn socket_name(&mut self) -> IoResult<rtio::SocketAddr> { - sockname(self.fd(), libc::getsockname) - } -} - -impl rtio::RtioTcpAcceptor for TcpAcceptor { - fn accept(&mut self) -> IoResult<Box<rtio::RtioTcpStream + Send>> { - self.native_accept().map(|s| box s as Box<rtio::RtioTcpStream + Send>) - } - - fn accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) } - fn dont_accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) } - fn set_timeout(&mut self, timeout: Option<u64>) { - self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0); - } - - fn clone(&self) -> Box<rtio::RtioTcpAcceptor + Send> { - box TcpAcceptor { - inner: self.inner.clone(), - deadline: 0, - } as Box<rtio::RtioTcpAcceptor + Send> - } - - #[cfg(unix)] - fn close_accept(&mut self) -> IoResult<()> { - self.inner.closed.store(true, atomic::SeqCst); - let mut fd = FileDesc::new(self.inner.writer.fd(), false); - match fd.inner_write([0]) { - Ok(..) => Ok(()), - Err(..) if util::wouldblock() => Ok(()), - Err(e) => Err(e), - } - } - - #[cfg(windows)] - fn close_accept(&mut self) -> IoResult<()> { - self.inner.closed.store(true, atomic::SeqCst); - let ret = unsafe { c::WSASetEvent(self.inner.abort.handle()) }; - if ret == libc::TRUE { - Ok(()) - } else { - Err(os::last_error()) - } - } -} - -//////////////////////////////////////////////////////////////////////////////// -// UDP -//////////////////////////////////////////////////////////////////////////////// - -pub struct UdpSocket { - inner: Arc<Inner>, - read_deadline: u64, - write_deadline: u64, -} - -impl UdpSocket { - pub fn bind(addr: rtio::SocketAddr) -> IoResult<UdpSocket> { - let fd = try!(socket(addr, libc::SOCK_DGRAM)); - let ret = UdpSocket { - inner: Arc::new(Inner::new(fd)), - read_deadline: 0, - write_deadline: 0, - }; - - let mut storage = unsafe { mem::zeroed() }; - let len = addr_to_sockaddr(addr, &mut storage); - let addrp = &storage as *const _ as *const libc::sockaddr; - - match unsafe { libc::bind(fd, addrp, len) } { - -1 => Err(os::last_error()), - _ => Ok(ret), - } - } - - pub fn fd(&self) -> sock_t { self.inner.fd } - - pub fn set_broadcast(&mut self, on: bool) -> IoResult<()> { - setsockopt(self.fd(), libc::SOL_SOCKET, libc::SO_BROADCAST, - on as libc::c_int) - } - - pub fn set_multicast_loop(&mut self, on: bool) -> IoResult<()> { - setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_MULTICAST_LOOP, - on as libc::c_int) - } - - pub fn set_membership(&mut self, addr: rtio::IpAddr, - opt: libc::c_int) -> IoResult<()> { - match ip_to_inaddr(addr) { - In4Addr(addr) => { - let mreq = libc::ip_mreq { - imr_multiaddr: addr, - // interface == INADDR_ANY - imr_interface: libc::in_addr { s_addr: 0x0 }, - }; - setsockopt(self.fd(), libc::IPPROTO_IP, opt, mreq) - } - In6Addr(addr) => { - let mreq = libc::ip6_mreq { - ipv6mr_multiaddr: addr, - ipv6mr_interface: 0, - }; - setsockopt(self.fd(), libc::IPPROTO_IPV6, opt, mreq) - } - } - } - - #[cfg(target_os = "linux")] - fn lock_nonblocking(&self) {} - - #[cfg(not(target_os = "linux"))] - fn lock_nonblocking<'a>(&'a self) -> Guard<'a> { - let ret = Guard { - fd: self.fd(), - guard: unsafe { self.inner.lock.lock() }, - }; - assert!(util::set_nonblocking(self.fd(), true).is_ok()); - ret - } -} - -impl rtio::RtioSocket for UdpSocket { - fn socket_name(&mut self) -> IoResult<rtio::SocketAddr> { - sockname(self.fd(), libc::getsockname) - } -} - -#[cfg(windows)] type msglen_t = libc::c_int; -#[cfg(unix)] type msglen_t = libc::size_t; - -impl rtio::RtioUdpSocket for UdpSocket { - fn recv_from(&mut self, buf: &mut [u8]) -> IoResult<(uint, rtio::SocketAddr)> { - let fd = self.fd(); - let mut storage: libc::sockaddr_storage = unsafe { mem::zeroed() }; - let storagep = &mut storage as *mut _ as *mut libc::sockaddr; - let mut addrlen: libc::socklen_t = - mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t; - - let dolock = || self.lock_nonblocking(); - let n = try!(read(fd, self.read_deadline, dolock, |nb| unsafe { - let flags = if nb {c::MSG_DONTWAIT} else {0}; - libc::recvfrom(fd, - buf.as_mut_ptr() as *mut libc::c_void, - buf.len() as msglen_t, - flags, - storagep, - &mut addrlen) as libc::c_int - })); - sockaddr_to_addr(&storage, addrlen as uint).and_then(|addr| { - Ok((n as uint, addr)) - }) - } - - fn send_to(&mut self, buf: &[u8], dst: rtio::SocketAddr) -> IoResult<()> { - let mut storage = unsafe { mem::zeroed() }; - let dstlen = addr_to_sockaddr(dst, &mut storage); - let dstp = &storage as *const _ as *const libc::sockaddr; - - let fd = self.fd(); - let dolock = || self.lock_nonblocking(); - let dowrite = |nb, buf: *const u8, len: uint| unsafe { - let flags = if nb {c::MSG_DONTWAIT} else {0}; - libc::sendto(fd, - buf as *const libc::c_void, - len as msglen_t, - flags, - dstp, - dstlen) as i64 - }; - - let n = try!(write(fd, self.write_deadline, buf, false, dolock, dowrite)); - if n != buf.len() { - Err(util::short_write(n, "couldn't send entire packet at once")) - } else { - Ok(()) - } - } - - fn join_multicast(&mut self, multi: rtio::IpAddr) -> IoResult<()> { - match multi { - rtio::Ipv4Addr(..) => { - self.set_membership(multi, libc::IP_ADD_MEMBERSHIP) - } - rtio::Ipv6Addr(..) => { - self.set_membership(multi, libc::IPV6_ADD_MEMBERSHIP) - } - } - } - fn leave_multicast(&mut self, multi: rtio::IpAddr) -> IoResult<()> { - match multi { - rtio::Ipv4Addr(..) => { - self.set_membership(multi, libc::IP_DROP_MEMBERSHIP) - } - rtio::Ipv6Addr(..) => { - self.set_membership(multi, libc::IPV6_DROP_MEMBERSHIP) - } - } - } - - fn loop_multicast_locally(&mut self) -> IoResult<()> { - self.set_multicast_loop(true) - } - fn dont_loop_multicast_locally(&mut self) -> IoResult<()> { - self.set_multicast_loop(false) - } - - fn multicast_time_to_live(&mut self, ttl: int) -> IoResult<()> { - setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_MULTICAST_TTL, - ttl as libc::c_int) - } - fn time_to_live(&mut self, ttl: int) -> IoResult<()> { - setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_TTL, ttl as libc::c_int) - } - - fn hear_broadcasts(&mut self) -> IoResult<()> { - self.set_broadcast(true) - } - fn ignore_broadcasts(&mut self) -> IoResult<()> { - self.set_broadcast(false) - } - - fn clone(&self) -> Box<rtio::RtioUdpSocket + Send> { - box UdpSocket { - inner: self.inner.clone(), - read_deadline: 0, - write_deadline: 0, - } as Box<rtio::RtioUdpSocket + Send> - } - - fn set_timeout(&mut self, timeout: Option<u64>) { - let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0); - self.read_deadline = deadline; - self.write_deadline = deadline; - } - fn set_read_timeout(&mut self, timeout: Option<u64>) { - self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0); - } - fn set_write_timeout(&mut self, timeout: Option<u64>) { - self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0); - } -} - -//////////////////////////////////////////////////////////////////////////////// -// Timeout helpers -// -// The read/write functions below are the helpers for reading/writing a socket -// with a possible deadline specified. This is generally viewed as a timed out -// I/O operation. -// -// From the application's perspective, timeouts apply to the I/O object, not to -// the underlying file descriptor (it's one timeout per object). This means that -// we can't use the SO_RCVTIMEO and corresponding send timeout option. -// -// The next idea to implement timeouts would be to use nonblocking I/O. An -// invocation of select() would wait (with a timeout) for a socket to be ready. -// Once its ready, we can perform the operation. Note that the operation *must* -// be nonblocking, even though select() says the socket is ready. This is -// because some other thread could have come and stolen our data (handles can be -// cloned). -// -// To implement nonblocking I/O, the first option we have is to use the -// O_NONBLOCK flag. Remember though that this is a global setting, affecting all -// I/O objects, so this was initially viewed as unwise. -// -// It turns out that there's this nifty MSG_DONTWAIT flag which can be passed to -// send/recv, but the niftiness wears off once you realize it only works well on -// Linux [1] [2]. This means that it's pretty easy to get a nonblocking -// operation on Linux (no flag fiddling, no affecting other objects), but not on -// other platforms. -// -// To work around this constraint on other platforms, we end up using the -// original strategy of flipping the O_NONBLOCK flag. As mentioned before, this -// could cause other objects' blocking operations to suddenly become -// nonblocking. To get around this, a "blocking operation" which returns EAGAIN -// falls back to using the same code path as nonblocking operations, but with an -// infinite timeout (select + send/recv). This helps emulate blocking -// reads/writes despite the underlying descriptor being nonblocking, as well as -// optimizing the fast path of just hitting one syscall in the good case. -// -// As a final caveat, this implementation uses a mutex so only one thread is -// doing a nonblocking operation at at time. This is the operation that comes -// after the select() (at which point we think the socket is ready). This is -// done for sanity to ensure that the state of the O_NONBLOCK flag is what we -// expect (wouldn't want someone turning it on when it should be off!). All -// operations performed in the lock are *nonblocking* to avoid holding the mutex -// forever. -// -// So, in summary, Linux uses MSG_DONTWAIT and doesn't need mutexes, everyone -// else uses O_NONBLOCK and mutexes with some trickery to make sure blocking -// reads/writes are still blocking. -// -// Fun, fun! -// -// [1] http://twistedmatrix.com/pipermail/twisted-commits/2012-April/034692.html -// [2] http://stackoverflow.com/questions/19819198/does-send-msg-dontwait - -pub fn read<T>(fd: sock_t, - deadline: u64, - lock: || -> T, - read: |bool| -> libc::c_int) -> IoResult<uint> { - let mut ret = -1; - if deadline == 0 { - ret = retry(|| read(false)); - } - - if deadline != 0 || (ret == -1 && util::wouldblock()) { - let deadline = match deadline { - 0 => None, - n => Some(n), - }; - loop { - // With a timeout, first we wait for the socket to become - // readable using select(), specifying the relevant timeout for - // our previously set deadline. - try!(util::await([fd], deadline, util::Readable)); - - // At this point, we're still within the timeout, and we've - // determined that the socket is readable (as returned by - // select). We must still read the socket in *nonblocking* mode - // because some other thread could come steal our data. If we - // fail to read some data, we retry (hence the outer loop) and - // wait for the socket to become readable again. - let _guard = lock(); - match retry(|| read(deadline.is_some())) { - -1 if util::wouldblock() => {} - -1 => return Err(os::last_error()), - n => { ret = n; break } - } - } - } - - match ret { - 0 => Err(util::eof()), - n if n < 0 => Err(os::last_error()), - n => Ok(n as uint) - } -} - -pub fn write<T>(fd: sock_t, - deadline: u64, - buf: &[u8], - write_everything: bool, - lock: || -> T, - write: |bool, *const u8, uint| -> i64) -> IoResult<uint> { - let mut ret = -1; - let mut written = 0; - if deadline == 0 { - if write_everything { - ret = keep_going(buf, |inner, len| { - written = buf.len() - len; - write(false, inner, len) - }); - } else { - ret = retry(|| { write(false, buf.as_ptr(), buf.len()) }); - if ret > 0 { written = ret as uint; } - } - } - - if deadline != 0 || (ret == -1 && util::wouldblock()) { - let deadline = match deadline { - 0 => None, - n => Some(n), - }; - while written < buf.len() && (write_everything || written == 0) { - // As with read(), first wait for the socket to be ready for - // the I/O operation. - match util::await([fd], deadline, util::Writable) { - Err(ref e) if e.code == libc::EOF as uint && written > 0 => { - assert!(deadline.is_some()); - return Err(util::short_write(written, "short write")) - } - Err(e) => return Err(e), - Ok(()) => {} - } - - // Also as with read(), we use MSG_DONTWAIT to guard ourselves - // against unforeseen circumstances. - let _guard = lock(); - let ptr = buf[written..].as_ptr(); - let len = buf.len() - written; - match retry(|| write(deadline.is_some(), ptr, len)) { - -1 if util::wouldblock() => {} - -1 => return Err(os::last_error()), - n => { written += n as uint; } - } - } - ret = 0; - } - if ret < 0 { - Err(os::last_error()) - } else { - Ok(written) - } -} - -#[cfg(windows)] -mod os { - use libc; - use std::mem; - use std::rt::rtio::{IoError, IoResult}; - - use io::c; - - pub type sock_t = libc::SOCKET; - pub struct Event(c::WSAEVENT); - - impl Event { - pub fn new() -> IoResult<Event> { - let event = unsafe { c::WSACreateEvent() }; - if event == c::WSA_INVALID_EVENT { - Err(last_error()) - } else { - Ok(Event(event)) - } - } - - pub fn handle(&self) -> c::WSAEVENT { let Event(handle) = *self; handle } - } - - impl Drop for Event { - fn drop(&mut self) { - unsafe { let _ = c::WSACloseEvent(self.handle()); } - } - } - - pub fn init() { - unsafe { - use std::rt::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT}; - static mut INITIALIZED: bool = false; - static LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT; - - let _guard = LOCK.lock(); - if !INITIALIZED { - let mut data: c::WSADATA = mem::zeroed(); - let ret = c::WSAStartup(0x202, // version 2.2 - &mut data); - assert_eq!(ret, 0); - INITIALIZED = true; - } - } - } - - pub fn last_error() -> IoError { - use std::os; - let code = unsafe { c::WSAGetLastError() as uint }; - IoError { - code: code, - extra: 0, - detail: Some(os::error_string(code)), - } - } - - pub unsafe fn close(sock: sock_t) { let _ = libc::closesocket(sock); } -} - -#[cfg(unix)] -mod os { - use libc; - use std::rt::rtio::IoError; - use io; - - pub type sock_t = io::file::fd_t; - - pub fn init() {} - pub fn last_error() -> IoError { io::last_error() } - pub unsafe fn close(sock: sock_t) { let _ = libc::close(sock); } -} diff --git a/src/libnative/io/pipe_unix.rs b/src/libnative/io/pipe_unix.rs deleted file mode 100644 index 48f31615339..00000000000 --- a/src/libnative/io/pipe_unix.rs +++ /dev/null @@ -1,339 +0,0 @@ -// Copyright 2014 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or -// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license -// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -use alloc::arc::Arc; -use libc; -use std::c_str::CString; -use std::mem; -use std::rt::mutex; -use std::rt::rtio; -use std::rt::rtio::{IoResult, IoError}; -use std::sync::atomic; - -use super::retry; -use super::net; -use super::util; -use super::c; -use super::process; -use super::file::{fd_t, FileDesc}; - -fn unix_socket(ty: libc::c_int) -> IoResult<fd_t> { - match unsafe { libc::socket(libc::AF_UNIX, ty, 0) } { - -1 => Err(super::last_error()), - fd => Ok(fd) - } -} - -fn addr_to_sockaddr_un(addr: &CString, - storage: &mut libc::sockaddr_storage) - -> IoResult<libc::socklen_t> { - // the sun_path length is limited to SUN_LEN (with null) - assert!(mem::size_of::<libc::sockaddr_storage>() >= - mem::size_of::<libc::sockaddr_un>()); - let s = unsafe { &mut *(storage as *mut _ as *mut libc::sockaddr_un) }; - - let len = addr.len(); - if len > s.sun_path.len() - 1 { - #[cfg(unix)] use libc::EINVAL as ERROR; - #[cfg(windows)] use libc::WSAEINVAL as ERROR; - return Err(IoError { - code: ERROR as uint, - extra: 0, - detail: Some("path must be smaller than SUN_LEN".to_string()), - }) - } - s.sun_family = libc::AF_UNIX as libc::sa_family_t; - for (slot, value) in s.sun_path.iter_mut().zip(addr.iter()) { - *slot = value; - } - - // count the null terminator - let len = mem::size_of::<libc::sa_family_t>() + len + 1; - return Ok(len as libc::socklen_t); -} - -struct Inner { - fd: fd_t, - - // Unused on Linux, where this lock is not necessary. - #[allow(dead_code)] - lock: mutex::NativeMutex -} - -impl Inner { - fn new(fd: fd_t) -> Inner { - Inner { fd: fd, lock: unsafe { mutex::NativeMutex::new() } } - } -} - -impl Drop for Inner { - fn drop(&mut self) { unsafe { let _ = libc::close(self.fd); } } -} - -fn connect(addr: &CString, ty: libc::c_int, - timeout: Option<u64>) -> IoResult<Inner> { - let mut storage = unsafe { mem::zeroed() }; - let len = try!(addr_to_sockaddr_un(addr, &mut storage)); - let inner = Inner::new(try!(unix_socket(ty))); - let addrp = &storage as *const _ as *const libc::sockaddr; - - match timeout { - None => { - match retry(|| unsafe { libc::connect(inner.fd, addrp, len) }) { - -1 => Err(super::last_error()), - _ => Ok(inner) - } - } - Some(timeout_ms) => { - try!(util::connect_timeout(inner.fd, addrp, len, timeout_ms)); - Ok(inner) - } - } -} - -fn bind(addr: &CString, ty: libc::c_int) -> IoResult<Inner> { - let mut storage = unsafe { mem::zeroed() }; - let len = try!(addr_to_sockaddr_un(addr, &mut storage)); - let inner = Inner::new(try!(unix_socket(ty))); - let addrp = &storage as *const _ as *const libc::sockaddr; - match unsafe { - libc::bind(inner.fd, addrp, len) - } { - -1 => Err(super::last_error()), - _ => Ok(inner) - } -} - -//////////////////////////////////////////////////////////////////////////////// -// Unix Streams -//////////////////////////////////////////////////////////////////////////////// - -pub struct UnixStream { - inner: Arc<Inner>, - read_deadline: u64, - write_deadline: u64, -} - -impl UnixStream { - pub fn connect(addr: &CString, - timeout: Option<u64>) -> IoResult<UnixStream> { - connect(addr, libc::SOCK_STREAM, timeout).map(|inner| { - UnixStream::new(Arc::new(inner)) - }) - } - - fn new(inner: Arc<Inner>) -> UnixStream { - UnixStream { - inner: inner, - read_deadline: 0, - write_deadline: 0, - } - } - - fn fd(&self) -> fd_t { self.inner.fd } - - #[cfg(target_os = "linux")] - fn lock_nonblocking(&self) {} - - #[cfg(not(target_os = "linux"))] - fn lock_nonblocking<'a>(&'a self) -> net::Guard<'a> { - let ret = net::Guard { - fd: self.fd(), - guard: unsafe { self.inner.lock.lock() }, - }; - assert!(util::set_nonblocking(self.fd(), true).is_ok()); - ret - } -} - -impl rtio::RtioPipe for UnixStream { - fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> { - let fd = self.fd(); - let dolock = || self.lock_nonblocking(); - let doread = |nb| unsafe { - let flags = if nb {c::MSG_DONTWAIT} else {0}; - libc::recv(fd, - buf.as_mut_ptr() as *mut libc::c_void, - buf.len() as libc::size_t, - flags) as libc::c_int - }; - net::read(fd, self.read_deadline, dolock, doread) - } - - fn write(&mut self, buf: &[u8]) -> IoResult<()> { - let fd = self.fd(); - let dolock = || self.lock_nonblocking(); - let dowrite = |nb: bool, buf: *const u8, len: uint| unsafe { - let flags = if nb {c::MSG_DONTWAIT} else {0}; - libc::send(fd, - buf as *const _, - len as libc::size_t, - flags) as i64 - }; - match net::write(fd, self.write_deadline, buf, true, dolock, dowrite) { - Ok(_) => Ok(()), - Err(e) => Err(e) - } - } - - fn clone(&self) -> Box<rtio::RtioPipe + Send> { - box UnixStream::new(self.inner.clone()) as Box<rtio::RtioPipe + Send> - } - - fn close_write(&mut self) -> IoResult<()> { - super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_WR) }) - } - fn close_read(&mut self) -> IoResult<()> { - super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) }) - } - fn set_timeout(&mut self, timeout: Option<u64>) { - let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0); - self.read_deadline = deadline; - self.write_deadline = deadline; - } - fn set_read_timeout(&mut self, timeout: Option<u64>) { - self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0); - } - fn set_write_timeout(&mut self, timeout: Option<u64>) { - self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0); - } -} - -//////////////////////////////////////////////////////////////////////////////// -// Unix Listener -//////////////////////////////////////////////////////////////////////////////// - -pub struct UnixListener { - inner: Inner, - path: CString, -} - -impl UnixListener { - pub fn bind(addr: &CString) -> IoResult<UnixListener> { - bind(addr, libc::SOCK_STREAM).map(|fd| { - UnixListener { inner: fd, path: addr.clone() } - }) - } - - fn fd(&self) -> fd_t { self.inner.fd } - - pub fn native_listen(self, backlog: int) -> IoResult<UnixAcceptor> { - match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } { - -1 => Err(super::last_error()), - - #[cfg(unix)] - _ => { - let (reader, writer) = try!(process::pipe()); - try!(util::set_nonblocking(reader.fd(), true)); - try!(util::set_nonblocking(writer.fd(), true)); - try!(util::set_nonblocking(self.fd(), true)); - Ok(UnixAcceptor { - inner: Arc::new(AcceptorInner { - listener: self, - reader: reader, - writer: writer, - closed: atomic::AtomicBool::new(false), - }), - deadline: 0, - }) - } - } - } -} - -impl rtio::RtioUnixListener for UnixListener { - fn listen(self: Box<UnixListener>) - -> IoResult<Box<rtio::RtioUnixAcceptor + Send>> { - self.native_listen(128).map(|a| { - box a as Box<rtio::RtioUnixAcceptor + Send> - }) - } -} - -pub struct UnixAcceptor { - inner: Arc<AcceptorInner>, - deadline: u64, -} - -#[cfg(unix)] -struct AcceptorInner { - listener: UnixListener, - reader: FileDesc, - writer: FileDesc, - closed: atomic::AtomicBool, -} - -impl UnixAcceptor { - fn fd(&self) -> fd_t { self.inner.listener.fd() } - - pub fn native_accept(&mut self) -> IoResult<UnixStream> { - let deadline = if self.deadline == 0 {None} else {Some(self.deadline)}; - - while !self.inner.closed.load(atomic::SeqCst) { - unsafe { - let mut storage: libc::sockaddr_storage = mem::zeroed(); - let storagep = &mut storage as *mut libc::sockaddr_storage; - let size = mem::size_of::<libc::sockaddr_storage>(); - let mut size = size as libc::socklen_t; - match retry(|| { - libc::accept(self.fd(), - storagep as *mut libc::sockaddr, - &mut size as *mut libc::socklen_t) as libc::c_int - }) { - -1 if util::wouldblock() => {} - -1 => return Err(super::last_error()), - fd => return Ok(UnixStream::new(Arc::new(Inner::new(fd)))), - } - } - try!(util::await([self.fd(), self.inner.reader.fd()], - deadline, util::Readable)); - } - - Err(util::eof()) - } -} - -impl rtio::RtioUnixAcceptor for UnixAcceptor { - fn accept(&mut self) -> IoResult<Box<rtio::RtioPipe + Send>> { - self.native_accept().map(|s| box s as Box<rtio::RtioPipe + Send>) - } - fn set_timeout(&mut self, timeout: Option<u64>) { - self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0); - } - - fn clone(&self) -> Box<rtio::RtioUnixAcceptor + Send> { - box UnixAcceptor { - inner: self.inner.clone(), - deadline: 0, - } as Box<rtio::RtioUnixAcceptor + Send> - } - - #[cfg(unix)] - fn close_accept(&mut self) -> IoResult<()> { - self.inner.closed.store(true, atomic::SeqCst); - let mut fd = FileDesc::new(self.inner.writer.fd(), false); - match fd.inner_write([0]) { - Ok(..) => Ok(()), - Err(..) if util::wouldblock() => Ok(()), - Err(e) => Err(e), - } - } -} - -impl Drop for UnixListener { - fn drop(&mut self) { - // Unlink the path to the socket to ensure that it doesn't linger. We're - // careful to unlink the path before we close the file descriptor to - // prevent races where we unlink someone else's path. - unsafe { - let _ = libc::unlink(self.path.as_ptr()); - } - } -} diff --git a/src/libnative/io/pipe_windows.rs b/src/libnative/io/pipe_windows.rs deleted file mode 100644 index f764470f37d..00000000000 --- a/src/libnative/io/pipe_windows.rs +++ /dev/null @@ -1,769 +0,0 @@ -// Copyright 2014 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or -// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license -// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -//! Named pipes implementation for windows -//! -//! If are unfortunate enough to be reading this code, I would like to first -//! apologize. This was my first encounter with windows named pipes, and it -//! didn't exactly turn out very cleanly. If you, too, are new to named pipes, -//! read on as I'll try to explain some fun things that I ran into. -//! -//! # Unix pipes vs Named pipes -//! -//! As with everything else, named pipes on windows are pretty different from -//! unix pipes on unix. On unix, you use one "server pipe" to accept new client -//! pipes. So long as this server pipe is active, new children pipes can -//! connect. On windows, you instead have a number of "server pipes", and each -//! of these server pipes can throughout their lifetime be attached to a client -//! or not. Once attached to a client, a server pipe may then disconnect at a -//! later date. -//! -//! # Accepting clients -//! -//! As with most other I/O interfaces, our Listener/Acceptor/Stream interfaces -//! are built around the unix flavors. This means that we have one "server -//! pipe" to which many clients can connect. In order to make this compatible -//! with the windows model, each connected client consumes ownership of a server -//! pipe, and then a new server pipe is created for the next client. -//! -//! Note that the server pipes attached to clients are never given back to the -//! listener for recycling. This could possibly be implemented with a channel so -//! the listener half can re-use server pipes, but for now I err'd on the simple -//! side of things. Each stream accepted by a listener will destroy the server -//! pipe after the stream is dropped. -//! -//! This model ends up having a small race or two, and you can find more details -//! on the `native_accept` method. -//! -//! # Simultaneous reads and writes -//! -//! In testing, I found that two simultaneous writes and two simultaneous reads -//! on a pipe ended up working out just fine, but problems were encountered when -//! a read was executed simultaneously with a write. After some googling around, -//! it sounded like named pipes just weren't built for this kind of interaction, -//! and the suggested solution was to use overlapped I/O. -//! -//! I don't really know what overlapped I/O is, but my basic understanding after -//! reading about it is that you have an external Event which is used to signal -//! I/O completion, passed around in some OVERLAPPED structures. As to what this -//! is, I'm not exactly sure. -//! -//! This problem implies that all named pipes are created with the -//! FILE_FLAG_OVERLAPPED option. This means that all of their I/O is -//! asynchronous. Each I/O operation has an associated OVERLAPPED structure, and -//! inside of this structure is a HANDLE from CreateEvent. After the I/O is -//! determined to be pending (may complete in the future), the -//! GetOverlappedResult function is used to block on the event, waiting for the -//! I/O to finish. -//! -//! This scheme ended up working well enough. There were two snags that I ran -//! into, however: -//! -//! * Each UnixStream instance needs its own read/write events to wait on. These -//! can't be shared among clones of the same stream because the documentation -//! states that it unsets the event when the I/O is started (would possibly -//! corrupt other events simultaneously waiting). For convenience's sake, -//! these events are lazily initialized. -//! -//! * Each server pipe needs to be created with FILE_FLAG_OVERLAPPED in addition -//! to all pipes created through `connect`. Notably this means that the -//! ConnectNamedPipe function is nonblocking, implying that the Listener needs -//! to have yet another event to do the actual blocking. -//! -//! # Conclusion -//! -//! The conclusion here is that I probably don't know the best way to work with -//! windows named pipes, but the solution here seems to work well enough to get -//! the test suite passing (the suite is in libstd), and that's good enough for -//! me! - -use alloc::arc::Arc; -use libc; -use std::c_str::CString; -use std::mem; -use std::os; -use std::ptr; -use std::rt::rtio; -use std::rt::rtio::{IoResult, IoError}; -use std::sync::atomic; -use std::rt::mutex; - -use super::c; -use super::util; -use super::file::to_utf16; - -struct Event(libc::HANDLE); - -impl Event { - fn new(manual_reset: bool, initial_state: bool) -> IoResult<Event> { - let event = unsafe { - libc::CreateEventW(ptr::null_mut(), - manual_reset as libc::BOOL, - initial_state as libc::BOOL, - ptr::null()) - }; - if event as uint == 0 { - Err(super::last_error()) - } else { - Ok(Event(event)) - } - } - - fn handle(&self) -> libc::HANDLE { let Event(handle) = *self; handle } -} - -impl Drop for Event { - fn drop(&mut self) { - unsafe { let _ = libc::CloseHandle(self.handle()); } - } -} - -struct Inner { - handle: libc::HANDLE, - lock: mutex::NativeMutex, - read_closed: atomic::AtomicBool, - write_closed: atomic::AtomicBool, -} - -impl Inner { - fn new(handle: libc::HANDLE) -> Inner { - Inner { - handle: handle, - lock: unsafe { mutex::NativeMutex::new() }, - read_closed: atomic::AtomicBool::new(false), - write_closed: atomic::AtomicBool::new(false), - } - } -} - -impl Drop for Inner { - fn drop(&mut self) { - unsafe { - let _ = libc::FlushFileBuffers(self.handle); - let _ = libc::CloseHandle(self.handle); - } - } -} - -unsafe fn pipe(name: *const u16, init: bool) -> libc::HANDLE { - libc::CreateNamedPipeW( - name, - libc::PIPE_ACCESS_DUPLEX | - if init {libc::FILE_FLAG_FIRST_PIPE_INSTANCE} else {0} | - libc::FILE_FLAG_OVERLAPPED, - libc::PIPE_TYPE_BYTE | libc::PIPE_READMODE_BYTE | - libc::PIPE_WAIT, - libc::PIPE_UNLIMITED_INSTANCES, - 65536, - 65536, - 0, - ptr::null_mut() - ) -} - -pub fn await(handle: libc::HANDLE, deadline: u64, - events: &[libc::HANDLE]) -> IoResult<uint> { - use libc::consts::os::extra::{WAIT_FAILED, WAIT_TIMEOUT, WAIT_OBJECT_0}; - - // If we've got a timeout, use WaitForSingleObject in tandem with CancelIo - // to figure out if we should indeed get the result. - let ms = if deadline == 0 { - libc::INFINITE as u64 - } else { - let now = ::io::timer::now(); - if deadline < now {0} else {deadline - now} - }; - let ret = unsafe { - c::WaitForMultipleObjects(events.len() as libc::DWORD, - events.as_ptr(), - libc::FALSE, - ms as libc::DWORD) - }; - match ret { - WAIT_FAILED => Err(super::last_error()), - WAIT_TIMEOUT => unsafe { - let _ = c::CancelIo(handle); - Err(util::timeout("operation timed out")) - }, - n => Ok((n - WAIT_OBJECT_0) as uint) - } -} - -fn epipe() -> IoError { - IoError { - code: libc::ERROR_BROKEN_PIPE as uint, - extra: 0, - detail: None, - } -} - -//////////////////////////////////////////////////////////////////////////////// -// Unix Streams -//////////////////////////////////////////////////////////////////////////////// - -pub struct UnixStream { - inner: Arc<Inner>, - write: Option<Event>, - read: Option<Event>, - read_deadline: u64, - write_deadline: u64, -} - -impl UnixStream { - fn try_connect(p: *const u16) -> Option<libc::HANDLE> { - // Note that most of this is lifted from the libuv implementation. - // The idea is that if we fail to open a pipe in read/write mode - // that we try afterwards in just read or just write - let mut result = unsafe { - libc::CreateFileW(p, - libc::GENERIC_READ | libc::GENERIC_WRITE, - 0, - ptr::null_mut(), - libc::OPEN_EXISTING, - libc::FILE_FLAG_OVERLAPPED, - ptr::null_mut()) - }; - if result != libc::INVALID_HANDLE_VALUE { - return Some(result) - } - - let err = unsafe { libc::GetLastError() }; - if err == libc::ERROR_ACCESS_DENIED as libc::DWORD { - result = unsafe { - libc::CreateFileW(p, - libc::GENERIC_READ | libc::FILE_WRITE_ATTRIBUTES, - 0, - ptr::null_mut(), - libc::OPEN_EXISTING, - libc::FILE_FLAG_OVERLAPPED, - ptr::null_mut()) - }; - if result != libc::INVALID_HANDLE_VALUE { - return Some(result) - } - } - let err = unsafe { libc::GetLastError() }; - if err == libc::ERROR_ACCESS_DENIED as libc::DWORD { - result = unsafe { - libc::CreateFileW(p, - libc::GENERIC_WRITE | libc::FILE_READ_ATTRIBUTES, - 0, - ptr::null_mut(), - libc::OPEN_EXISTING, - libc::FILE_FLAG_OVERLAPPED, - ptr::null_mut()) - }; - if result != libc::INVALID_HANDLE_VALUE { - return Some(result) - } - } - None - } - - pub fn connect(addr: &CString, timeout: Option<u64>) -> IoResult<UnixStream> { - let addr = try!(to_utf16(addr)); - let start = ::io::timer::now(); - loop { - match UnixStream::try_connect(addr.as_ptr()) { - Some(handle) => { - let inner = Inner::new(handle); - let mut mode = libc::PIPE_TYPE_BYTE | - libc::PIPE_READMODE_BYTE | - libc::PIPE_WAIT; - let ret = unsafe { - libc::SetNamedPipeHandleState(inner.handle, - &mut mode, - ptr::null_mut(), - ptr::null_mut()) - }; - return if ret == 0 { - Err(super::last_error()) - } else { - Ok(UnixStream { - inner: Arc::new(inner), - read: None, - write: None, - read_deadline: 0, - write_deadline: 0, - }) - } - } - None => {} - } - - // On windows, if you fail to connect, you may need to call the - // `WaitNamedPipe` function, and this is indicated with an error - // code of ERROR_PIPE_BUSY. - let code = unsafe { libc::GetLastError() }; - if code as int != libc::ERROR_PIPE_BUSY as int { - return Err(super::last_error()) - } - - match timeout { - Some(timeout) => { - let now = ::io::timer::now(); - let timed_out = (now - start) >= timeout || unsafe { - let ms = (timeout - (now - start)) as libc::DWORD; - libc::WaitNamedPipeW(addr.as_ptr(), ms) == 0 - }; - if timed_out { - return Err(util::timeout("connect timed out")) - } - } - - // An example I found on Microsoft's website used 20 - // seconds, libuv uses 30 seconds, hence we make the - // obvious choice of waiting for 25 seconds. - None => { - if unsafe { libc::WaitNamedPipeW(addr.as_ptr(), 25000) } == 0 { - return Err(super::last_error()) - } - } - } - } - } - - fn handle(&self) -> libc::HANDLE { self.inner.handle } - - fn read_closed(&self) -> bool { - self.inner.read_closed.load(atomic::SeqCst) - } - - fn write_closed(&self) -> bool { - self.inner.write_closed.load(atomic::SeqCst) - } - - fn cancel_io(&self) -> IoResult<()> { - match unsafe { c::CancelIoEx(self.handle(), ptr::null_mut()) } { - 0 if os::errno() == libc::ERROR_NOT_FOUND as uint => { - Ok(()) - } - 0 => Err(super::last_error()), - _ => Ok(()) - } - } -} - -impl rtio::RtioPipe for UnixStream { - fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> { - if self.read.is_none() { - self.read = Some(try!(Event::new(true, false))); - } - - let mut bytes_read = 0; - let mut overlapped: libc::OVERLAPPED = unsafe { mem::zeroed() }; - overlapped.hEvent = self.read.as_ref().unwrap().handle(); - - // Pre-flight check to see if the reading half has been closed. This - // must be done before issuing the ReadFile request, but after we - // acquire the lock. - // - // See comments in close_read() about why this lock is necessary. - let guard = unsafe { self.inner.lock.lock() }; - if self.read_closed() { - return Err(util::eof()) - } - - // Issue a nonblocking requests, succeeding quickly if it happened to - // succeed. - let ret = unsafe { - libc::ReadFile(self.handle(), - buf.as_ptr() as libc::LPVOID, - buf.len() as libc::DWORD, - &mut bytes_read, - &mut overlapped) - }; - if ret != 0 { return Ok(bytes_read as uint) } - - // If our errno doesn't say that the I/O is pending, then we hit some - // legitimate error and return immediately. - if os::errno() != libc::ERROR_IO_PENDING as uint { - return Err(super::last_error()) - } - - // Now that we've issued a successful nonblocking request, we need to - // wait for it to finish. This can all be done outside the lock because - // we'll see any invocation of CancelIoEx. We also call this in a loop - // because we're woken up if the writing half is closed, we just need to - // realize that the reading half wasn't closed and we go right back to - // sleep. - drop(guard); - loop { - // Process a timeout if one is pending - let wait_succeeded = await(self.handle(), self.read_deadline, - [overlapped.hEvent]); - - let ret = unsafe { - libc::GetOverlappedResult(self.handle(), - &mut overlapped, - &mut bytes_read, - libc::TRUE) - }; - // If we succeeded, or we failed for some reason other than - // CancelIoEx, return immediately - if ret != 0 { return Ok(bytes_read as uint) } - if os::errno() != libc::ERROR_OPERATION_ABORTED as uint { - return Err(super::last_error()) - } - - // If the reading half is now closed, then we're done. If we woke up - // because the writing half was closed, keep trying. - if wait_succeeded.is_err() { - return Err(util::timeout("read timed out")) - } - if self.read_closed() { - return Err(util::eof()) - } - } - } - - fn write(&mut self, buf: &[u8]) -> IoResult<()> { - if self.write.is_none() { - self.write = Some(try!(Event::new(true, false))); - } - - let mut offset = 0; - let mut overlapped: libc::OVERLAPPED = unsafe { mem::zeroed() }; - overlapped.hEvent = self.write.as_ref().unwrap().handle(); - - while offset < buf.len() { - let mut bytes_written = 0; - - // This sequence below is quite similar to the one found in read(). - // Some careful looping is done to ensure that if close_write() is - // invoked we bail out early, and if close_read() is invoked we keep - // going after we woke up. - // - // See comments in close_read() about why this lock is necessary. - let guard = unsafe { self.inner.lock.lock() }; - if self.write_closed() { - return Err(epipe()) - } - let ret = unsafe { - libc::WriteFile(self.handle(), - buf[offset..].as_ptr() as libc::LPVOID, - (buf.len() - offset) as libc::DWORD, - &mut bytes_written, - &mut overlapped) - }; - let err = os::errno(); - drop(guard); - - if ret == 0 { - if err != libc::ERROR_IO_PENDING as uint { - return Err(IoError { - code: err as uint, - extra: 0, - detail: Some(os::error_string(err as uint)), - }) - } - // Process a timeout if one is pending - let wait_succeeded = await(self.handle(), self.write_deadline, - [overlapped.hEvent]); - let ret = unsafe { - libc::GetOverlappedResult(self.handle(), - &mut overlapped, - &mut bytes_written, - libc::TRUE) - }; - // If we weren't aborted, this was a legit error, if we were - // aborted, then check to see if the write half was actually - // closed or whether we woke up from the read half closing. - if ret == 0 { - if os::errno() != libc::ERROR_OPERATION_ABORTED as uint { - return Err(super::last_error()) - } - if !wait_succeeded.is_ok() { - let amt = offset + bytes_written as uint; - return if amt > 0 { - Err(IoError { - code: libc::ERROR_OPERATION_ABORTED as uint, - extra: amt, - detail: Some("short write during write".to_string()), - }) - } else { - Err(util::timeout("write timed out")) - } - } - if self.write_closed() { - return Err(epipe()) - } - continue // retry - } - } - offset += bytes_written as uint; - } - Ok(()) - } - - fn clone(&self) -> Box<rtio::RtioPipe + Send> { - box UnixStream { - inner: self.inner.clone(), - read: None, - write: None, - read_deadline: 0, - write_deadline: 0, - } as Box<rtio::RtioPipe + Send> - } - - fn close_read(&mut self) -> IoResult<()> { - // On windows, there's no actual shutdown() method for pipes, so we're - // forced to emulate the behavior manually at the application level. To - // do this, we need to both cancel any pending requests, as well as - // prevent all future requests from succeeding. These two operations are - // not atomic with respect to one another, so we must use a lock to do - // so. - // - // The read() code looks like: - // - // 1. Make sure the pipe is still open - // 2. Submit a read request - // 3. Wait for the read request to finish - // - // The race this lock is preventing is if another thread invokes - // close_read() between steps 1 and 2. By atomically executing steps 1 - // and 2 with a lock with respect to close_read(), we're guaranteed that - // no thread will erroneously sit in a read forever. - let _guard = unsafe { self.inner.lock.lock() }; - self.inner.read_closed.store(true, atomic::SeqCst); - self.cancel_io() - } - - fn close_write(&mut self) -> IoResult<()> { - // see comments in close_read() for why this lock is necessary - let _guard = unsafe { self.inner.lock.lock() }; - self.inner.write_closed.store(true, atomic::SeqCst); - self.cancel_io() - } - - fn set_timeout(&mut self, timeout: Option<u64>) { - let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0); - self.read_deadline = deadline; - self.write_deadline = deadline; - } - fn set_read_timeout(&mut self, timeout: Option<u64>) { - self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0); - } - fn set_write_timeout(&mut self, timeout: Option<u64>) { - self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0); - } -} - -//////////////////////////////////////////////////////////////////////////////// -// Unix Listener -//////////////////////////////////////////////////////////////////////////////// - -pub struct UnixListener { - handle: libc::HANDLE, - name: CString, -} - -impl UnixListener { - pub fn bind(addr: &CString) -> IoResult<UnixListener> { - // Although we technically don't need the pipe until much later, we - // create the initial handle up front to test the validity of the name - // and such. - let addr_v = try!(to_utf16(addr)); - let ret = unsafe { pipe(addr_v.as_ptr(), true) }; - if ret == libc::INVALID_HANDLE_VALUE { - Err(super::last_error()) - } else { - Ok(UnixListener { handle: ret, name: addr.clone() }) - } - } - - pub fn native_listen(self) -> IoResult<UnixAcceptor> { - Ok(UnixAcceptor { - listener: self, - event: try!(Event::new(true, false)), - deadline: 0, - inner: Arc::new(AcceptorState { - abort: try!(Event::new(true, false)), - closed: atomic::AtomicBool::new(false), - }), - }) - } -} - -impl Drop for UnixListener { - fn drop(&mut self) { - unsafe { let _ = libc::CloseHandle(self.handle); } - } -} - -impl rtio::RtioUnixListener for UnixListener { - fn listen(self: Box<UnixListener>) - -> IoResult<Box<rtio::RtioUnixAcceptor + Send>> { - self.native_listen().map(|a| { - box a as Box<rtio::RtioUnixAcceptor + Send> - }) - } -} - -pub struct UnixAcceptor { - inner: Arc<AcceptorState>, - listener: UnixListener, - event: Event, - deadline: u64, -} - -struct AcceptorState { - abort: Event, - closed: atomic::AtomicBool, -} - -impl UnixAcceptor { - pub fn native_accept(&mut self) -> IoResult<UnixStream> { - // This function has some funky implementation details when working with - // unix pipes. On windows, each server named pipe handle can be - // connected to a one or zero clients. To the best of my knowledge, a - // named server is considered active and present if there exists at - // least one server named pipe for it. - // - // The model of this function is to take the current known server - // handle, connect a client to it, and then transfer ownership to the - // UnixStream instance. The next time accept() is invoked, it'll need a - // different server handle to connect a client to. - // - // Note that there is a possible race here. Once our server pipe is - // handed off to a `UnixStream` object, the stream could be closed, - // meaning that there would be no active server pipes, hence even though - // we have a valid `UnixAcceptor`, no one can connect to it. For this - // reason, we generate the next accept call's server pipe at the end of - // this function call. - // - // This provides us an invariant that we always have at least one server - // connection open at a time, meaning that all connects to this acceptor - // should succeed while this is active. - // - // The actual implementation of doing this is a little tricky. Once a - // server pipe is created, a client can connect to it at any time. I - // assume that which server a client connects to is nondeterministic, so - // we also need to guarantee that the only server able to be connected - // to is the one that we're calling ConnectNamedPipe on. This means that - // we have to create the second server pipe *after* we've already - // accepted a connection. In order to at least somewhat gracefully - // handle errors, this means that if the second server pipe creation - // fails that we disconnect the connected client and then just keep - // using the original server pipe. - let handle = self.listener.handle; - - // If we've had an artificial call to close_accept, be sure to never - // proceed in accepting new clients in the future - if self.inner.closed.load(atomic::SeqCst) { return Err(util::eof()) } - - let name = try!(to_utf16(&self.listener.name)); - - // Once we've got a "server handle", we need to wait for a client to - // connect. The ConnectNamedPipe function will block this thread until - // someone on the other end connects. This function can "fail" if a - // client connects after we created the pipe but before we got down - // here. Thanks windows. - let mut overlapped: libc::OVERLAPPED = unsafe { mem::zeroed() }; - overlapped.hEvent = self.event.handle(); - if unsafe { libc::ConnectNamedPipe(handle, &mut overlapped) == 0 } { - let mut err = unsafe { libc::GetLastError() }; - - if err == libc::ERROR_IO_PENDING as libc::DWORD { - // Process a timeout if one is pending - let wait_succeeded = await(handle, self.deadline, - [self.inner.abort.handle(), - overlapped.hEvent]); - - // This will block until the overlapped I/O is completed. The - // timeout was previously handled, so this will either block in - // the normal case or succeed very quickly in the timeout case. - let ret = unsafe { - let mut transfer = 0; - libc::GetOverlappedResult(handle, - &mut overlapped, - &mut transfer, - libc::TRUE) - }; - if ret == 0 { - if wait_succeeded.is_ok() { - err = unsafe { libc::GetLastError() }; - } else { - return Err(util::timeout("accept timed out")) - } - } else { - // we succeeded, bypass the check below - err = libc::ERROR_PIPE_CONNECTED as libc::DWORD; - } - } - if err != libc::ERROR_PIPE_CONNECTED as libc::DWORD { - return Err(super::last_error()) - } - } - - // Now that we've got a connected client to our handle, we need to - // create a second server pipe. If this fails, we disconnect the - // connected client and return an error (see comments above). - let new_handle = unsafe { pipe(name.as_ptr(), false) }; - if new_handle == libc::INVALID_HANDLE_VALUE { - let ret = Err(super::last_error()); - // If our disconnection fails, then there's not really a whole lot - // that we can do, so panic - let err = unsafe { libc::DisconnectNamedPipe(handle) }; - assert!(err != 0); - return ret; - } else { - self.listener.handle = new_handle; - } - - // Transfer ownership of our handle into this stream - Ok(UnixStream { - inner: Arc::new(Inner::new(handle)), - read: None, - write: None, - read_deadline: 0, - write_deadline: 0, - }) - } -} - -impl rtio::RtioUnixAcceptor for UnixAcceptor { - fn accept(&mut self) -> IoResult<Box<rtio::RtioPipe + Send>> { - self.native_accept().map(|s| box s as Box<rtio::RtioPipe + Send>) - } - fn set_timeout(&mut self, timeout: Option<u64>) { - self.deadline = timeout.map(|i| i + ::io::timer::now()).unwrap_or(0); - } - - fn clone(&self) -> Box<rtio::RtioUnixAcceptor + Send> { - let name = to_utf16(&self.listener.name).ok().unwrap(); - box UnixAcceptor { - inner: self.inner.clone(), - event: Event::new(true, false).ok().unwrap(), - deadline: 0, - listener: UnixListener { - name: self.listener.name.clone(), - handle: unsafe { - let p = pipe(name.as_ptr(), false) ; - assert!(p != libc::INVALID_HANDLE_VALUE as libc::HANDLE); - p - }, - }, - } as Box<rtio::RtioUnixAcceptor + Send> - } - - fn close_accept(&mut self) -> IoResult<()> { - self.inner.closed.store(true, atomic::SeqCst); - let ret = unsafe { - c::SetEvent(self.inner.abort.handle()) - }; - if ret == 0 { - Err(super::last_error()) - } else { - Ok(()) - } - } -} - |
