diff options
| author | Alex Crichton <alex@alexcrichton.com> | 2014-04-22 18:38:59 -0700 |
|---|---|---|
| committer | Alex Crichton <alex@alexcrichton.com> | 2014-04-24 16:24:09 -0700 |
| commit | 6328f7c199a1697aaee7e5fe2b397c457e6c311a (patch) | |
| tree | 21b8afe559302ed90be9dbb725c700513d9cfe7b /src/libnative | |
| parent | 67ee480936947aa5b1953b7b6e48a0c7a191501e (diff) | |
| download | rust-6328f7c199a1697aaee7e5fe2b397c457e6c311a.tar.gz rust-6328f7c199a1697aaee7e5fe2b397c457e6c311a.zip | |
std: Add timeouts to unix connect/accept
This adds support for connecting to a unix socket with a timeout (a named pipe on windows), and accepting a connection with a timeout. The goal is to bring unix pipes/named sockets back in line with TCP support for timeouts. Similarly to the TCP sockets, all methods are marked #[experimental] due to uncertainty about the type of the timeout argument. This internally involved a good bit of refactoring to share as much code as possible between TCP servers and pipe servers, but the core implementation did not change drastically as part of this commit. cc #13523
Diffstat (limited to 'src/libnative')
| -rw-r--r-- | src/libnative/io/c_win32.rs | 2 | ||||
| -rw-r--r-- | src/libnative/io/mod.rs | 6 | ||||
| -rw-r--r-- | src/libnative/io/net.rs | 128 | ||||
| -rw-r--r-- | src/libnative/io/pipe_unix.rs | 59 | ||||
| -rw-r--r-- | src/libnative/io/pipe_win32.rs | 56 | ||||
| -rw-r--r-- | src/libnative/io/util.rs | 136 |
6 files changed, 228 insertions, 159 deletions
diff --git a/src/libnative/io/c_win32.rs b/src/libnative/io/c_win32.rs index dbbb39b3b7b..6c84424e97a 100644 --- a/src/libnative/io/c_win32.rs +++ b/src/libnative/io/c_win32.rs @@ -59,4 +59,6 @@ extern "system" { optname: libc::c_int, optval: *mut libc::c_char, optlen: *mut libc::c_int) -> libc::c_int; + + pub fn CancelIo(hFile: libc::HANDLE) -> libc::BOOL; } diff --git a/src/libnative/io/mod.rs b/src/libnative/io/mod.rs index 19cb5c5f1d4..944766e8fd0 100644 --- a/src/libnative/io/mod.rs +++ b/src/libnative/io/mod.rs @@ -44,6 +44,7 @@ pub use self::process::Process; pub mod addrinfo; pub mod net; pub mod process; +mod util; #[cfg(unix)] #[path = "file_unix.rs"] @@ -177,8 +178,9 @@ impl rtio::IoFactory for IoFactory { fn unix_bind(&mut self, path: &CString) -> IoResult<~RtioUnixListener:Send> { pipe::UnixListener::bind(path).map(|s| ~s as ~RtioUnixListener:Send) } - fn unix_connect(&mut self, path: &CString) -> IoResult<~RtioPipe:Send> { - pipe::UnixStream::connect(path).map(|s| ~s as ~RtioPipe:Send) + fn unix_connect(&mut self, path: &CString, + timeout: Option<u64>) -> IoResult<~RtioPipe:Send> { + pipe::UnixStream::connect(path, timeout).map(|s| ~s as ~RtioPipe:Send) } fn get_host_addresses(&mut self, host: Option<&str>, servname: Option<&str>, hint: Option<ai::Hint>) -> IoResult<~[ai::Info]> { diff --git a/src/libnative/io/net.rs b/src/libnative/io/net.rs index 93ec23e32ad..cc41da846b2 100644 --- a/src/libnative/io/net.rs +++ b/src/libnative/io/net.rs @@ -13,13 +13,12 @@ use std::cast; use std::io::net::ip; use std::io; use std::mem; -use std::os; -use std::ptr; use std::rt::rtio; use std::sync::arc::UnsafeArc; use super::{IoResult, retry, keep_going}; use super::c; +use super::util; //////////////////////////////////////////////////////////////////////////////// // sockaddr and misc bindings @@ -118,8 +117,8 @@ fn setsockopt<T>(fd: sock_t, opt: libc::c_int, val: libc::c_int, } } -fn getsockopt<T: Copy>(fd: sock_t, opt: libc::c_int, - val: libc::c_int) -> IoResult<T> { +pub fn getsockopt<T: Copy>(fd: sock_t, opt: libc::c_int, + val: libc::c_int) -> IoResult<T> { unsafe { let mut slot: T = mem::init(); let mut len = mem::size_of::<T>() as libc::socklen_t; @@ -145,21 +144,6 @@ fn last_error() -> io::IoError { super::last_error() } -fn ms_to_timeval(ms: u64) -> libc::timeval { - libc::timeval { - tv_sec: (ms / 1000) as libc::time_t, - tv_usec: ((ms % 1000) * 1000) as libc::suseconds_t, - } -} - -fn timeout(desc: &'static str) -> io::IoError { - io::IoError { - kind: io::TimedOut, - desc: desc, - detail: None, - } -} - #[cfg(windows)] unsafe fn close(sock: sock_t) { let _ = libc::closesocket(sock); } #[cfg(unix)] unsafe fn close(sock: sock_t) { let _ = libc::close(sock); } @@ -270,7 +254,7 @@ impl TcpStream { let addrp = &addr as *_ as *libc::sockaddr; match timeout { Some(timeout) => { - try!(TcpStream::connect_timeout(fd, addrp, len, timeout)); + try!(util::connect_timeout(fd, addrp, len, timeout)); Ok(ret) }, None => { @@ -282,84 +266,6 @@ impl TcpStream { } } - // See http://developerweb.net/viewtopic.php?id=3196 for where this is - // derived from. - fn connect_timeout(fd: sock_t, - addrp: *libc::sockaddr, - len: libc::socklen_t, - timeout_ms: u64) -> IoResult<()> { - #[cfg(unix)] use INPROGRESS = libc::EINPROGRESS; - #[cfg(windows)] use INPROGRESS = libc::WSAEINPROGRESS; - #[cfg(unix)] use WOULDBLOCK = libc::EWOULDBLOCK; - #[cfg(windows)] use WOULDBLOCK = libc::WSAEWOULDBLOCK; - - // Make sure the call to connect() doesn't block - try!(set_nonblocking(fd, true)); - - let ret = match unsafe { libc::connect(fd, addrp, len) } { - // If the connection is in progress, then we need to wait for it to - // finish (with a timeout). The current strategy for doing this is - // to use select() with a timeout. - -1 if os::errno() as int == INPROGRESS as int || - os::errno() as int == WOULDBLOCK as int => { - let mut set: c::fd_set = unsafe { mem::init() }; - c::fd_set(&mut set, fd); - match await(fd, &mut set, timeout_ms) { - 0 => Err(timeout("connection timed out")), - -1 => Err(last_error()), - _ => { - let err: libc::c_int = try!( - getsockopt(fd, libc::SOL_SOCKET, libc::SO_ERROR)); - if err == 0 { - Ok(()) - } else { - Err(io::IoError::from_errno(err as uint, true)) - } - } - } - } - - -1 => Err(last_error()), - _ => Ok(()), - }; - - // be sure to turn blocking I/O back on - try!(set_nonblocking(fd, false)); - return ret; - - #[cfg(unix)] - fn set_nonblocking(fd: sock_t, nb: bool) -> IoResult<()> { - let set = nb as libc::c_int; - super::mkerr_libc(retry(|| unsafe { c::ioctl(fd, c::FIONBIO, &set) })) - } - #[cfg(windows)] - fn set_nonblocking(fd: sock_t, nb: bool) -> IoResult<()> { - let mut set = nb as libc::c_ulong; - if unsafe { c::ioctlsocket(fd, c::FIONBIO, &mut set) != 0 } { - Err(last_error()) - } else { - Ok(()) - } - } - - #[cfg(unix)] - fn await(fd: sock_t, set: &mut c::fd_set, timeout: u64) -> libc::c_int { - let start = ::io::timer::now(); - retry(|| unsafe { - // Recalculate the timeout each iteration (it is generally - // undefined what the value of the 'tv' is after select - // returns EINTR). - let tv = ms_to_timeval(timeout - (::io::timer::now() - start)); - c::select(fd + 1, ptr::null(), &*set, ptr::null(), &tv) - }) - } - #[cfg(windows)] - fn await(_fd: sock_t, set: &mut c::fd_set, timeout: u64) -> libc::c_int { - let tv = ms_to_timeval(timeout); - unsafe { c::select(1, ptr::null(), &*set, ptr::null(), &tv) } - } - } - pub fn fd(&self) -> sock_t { // This unsafety is fine because it's just a read-only arc unsafe { (*self.inner.get()).fd } @@ -533,7 +439,7 @@ impl TcpAcceptor { pub fn native_accept(&mut self) -> IoResult<TcpStream> { if self.deadline != 0 { - try!(self.accept_deadline()); + try!(util::accept_deadline(self.fd(), self.deadline)); } unsafe { let mut storage: libc::sockaddr_storage = mem::init(); @@ -550,25 +456,6 @@ impl TcpAcceptor { } } } - - fn accept_deadline(&mut self) -> IoResult<()> { - let mut set: c::fd_set = unsafe { mem::init() }; - c::fd_set(&mut set, self.fd()); - - match retry(|| { - // If we're past the deadline, then pass a 0 timeout to select() so - // we can poll the status of the socket. - let now = ::io::timer::now(); - let ms = if self.deadline > now {0} else {self.deadline - now}; - let tv = ms_to_timeval(ms); - let n = if cfg!(windows) {1} else {self.fd() as libc::c_int + 1}; - unsafe { c::select(n, &set, ptr::null(), ptr::null(), &tv) } - }) { - -1 => Err(last_error()), - 0 => Err(timeout("accept timed out")), - _ => return Ok(()), - } - } } impl rtio::RtioSocket for TcpAcceptor { @@ -585,10 +472,7 @@ impl rtio::RtioTcpAcceptor for TcpAcceptor { fn accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) } fn dont_accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) } fn set_timeout(&mut self, timeout: Option<u64>) { - self.deadline = match timeout { - None => 0, - Some(t) => ::io::timer::now() + t, - }; + self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0); } } diff --git a/src/libnative/io/pipe_unix.rs b/src/libnative/io/pipe_unix.rs index 5d13a6b5fc5..190cae05d43 100644 --- a/src/libnative/io/pipe_unix.rs +++ b/src/libnative/io/pipe_unix.rs @@ -8,16 +8,17 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +use libc; use std::c_str::CString; use std::cast; +use std::intrinsics; use std::io; -use libc; use std::mem; use std::rt::rtio; use std::sync::arc::UnsafeArc; -use std::intrinsics; use super::{IoResult, retry, keep_going}; +use super::util; use super::file::fd_t; fn unix_socket(ty: libc::c_int) -> IoResult<fd_t> { @@ -52,22 +53,6 @@ fn addr_to_sockaddr_un(addr: &CString) -> IoResult<(libc::sockaddr_storage, uint return Ok((storage, len)); } -fn sockaddr_to_unix(storage: &libc::sockaddr_storage, - len: uint) -> IoResult<CString> { - match storage.ss_family as libc::c_int { - libc::AF_UNIX => { - assert!(len as uint <= mem::size_of::<libc::sockaddr_un>()); - let storage: &libc::sockaddr_un = unsafe { - cast::transmute(storage) - }; - unsafe { - Ok(CString::new(storage.sun_path.as_ptr(), false).clone()) - } - } - _ => Err(io::standard_error(io::InvalidInput)) - } -} - struct Inner { fd: fd_t, } @@ -76,16 +61,24 @@ impl Drop for Inner { fn drop(&mut self) { unsafe { let _ = libc::close(self.fd); } } } -fn connect(addr: &CString, ty: libc::c_int) -> IoResult<Inner> { +fn connect(addr: &CString, ty: libc::c_int, + timeout: Option<u64>) -> IoResult<Inner> { let (addr, len) = try!(addr_to_sockaddr_un(addr)); let inner = Inner { fd: try!(unix_socket(ty)) }; - let addrp = &addr as *libc::sockaddr_storage; - match retry(|| unsafe { - libc::connect(inner.fd, addrp as *libc::sockaddr, - len as libc::socklen_t) - }) { - -1 => Err(super::last_error()), - _ => Ok(inner) + let addrp = &addr as *_ as *libc::sockaddr; + let len = len as libc::socklen_t; + + match timeout { + None => { + match retry(|| unsafe { libc::connect(inner.fd, addrp, len) }) { + -1 => Err(super::last_error()), + _ => Ok(inner) + } + } + Some(timeout_ms) => { + try!(util::connect_timeout(inner.fd, addrp, len, timeout_ms)); + Ok(inner) + } } } @@ -110,8 +103,9 @@ pub struct UnixStream { } impl UnixStream { - pub fn connect(addr: &CString) -> IoResult<UnixStream> { - connect(addr, libc::SOCK_STREAM).map(|inner| { + pub fn connect(addr: &CString, + timeout: Option<u64>) -> IoResult<UnixStream> { + connect(addr, libc::SOCK_STREAM, timeout).map(|inner| { UnixStream { inner: UnsafeArc::new(inner) } }) } @@ -176,7 +170,7 @@ impl UnixListener { pub fn native_listen(self, backlog: int) -> IoResult<UnixAcceptor> { match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } { -1 => Err(super::last_error()), - _ => Ok(UnixAcceptor { listener: self }) + _ => Ok(UnixAcceptor { listener: self, deadline: 0 }) } } } @@ -189,12 +183,16 @@ impl rtio::RtioUnixListener for UnixListener { pub struct UnixAcceptor { listener: UnixListener, + deadline: u64, } impl UnixAcceptor { fn fd(&self) -> fd_t { self.listener.fd() } pub fn native_accept(&mut self) -> IoResult<UnixStream> { + if self.deadline != 0 { + try!(util::accept_deadline(self.fd(), self.deadline)); + } let mut storage: libc::sockaddr_storage = unsafe { intrinsics::init() }; let storagep = &mut storage as *mut libc::sockaddr_storage; let size = mem::size_of::<libc::sockaddr_storage>(); @@ -214,6 +212,9 @@ impl rtio::RtioUnixAcceptor for UnixAcceptor { fn accept(&mut self) -> IoResult<~rtio::RtioPipe:Send> { self.native_accept().map(|s| ~s as ~rtio::RtioPipe:Send) } + fn set_timeout(&mut self, timeout: Option<u64>) { + self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0); + } } impl Drop for UnixListener { diff --git a/src/libnative/io/pipe_win32.rs b/src/libnative/io/pipe_win32.rs index 84b3d887c04..a4f09ded0ac 100644 --- a/src/libnative/io/pipe_win32.rs +++ b/src/libnative/io/pipe_win32.rs @@ -93,6 +93,8 @@ use std::sync::arc::UnsafeArc; use std::intrinsics; use super::IoResult; +use super::c; +use super::util; struct Event(libc::HANDLE); @@ -210,8 +212,9 @@ impl UnixStream { None } - pub fn connect(addr: &CString) -> IoResult<UnixStream> { + pub fn connect(addr: &CString, timeout: Option<u64>) -> IoResult<UnixStream> { as_utf16_p(addr.as_str().unwrap(), |p| { + let start = ::io::timer::now(); loop { match UnixStream::try_connect(p) { Some(handle) => { @@ -246,11 +249,26 @@ impl UnixStream { return Err(super::last_error()) } - // An example I found on microsoft's website used 20 seconds, - // libuv uses 30 seconds, hence we make the obvious choice of - // waiting for 25 seconds. - if unsafe { libc::WaitNamedPipeW(p, 25000) } == 0 { - return Err(super::last_error()) + match timeout { + Some(timeout) => { + let now = ::io::timer::now(); + let timed_out = (now - start) >= timeout || unsafe { + let ms = (timeout - (now - start)) as libc::DWORD; + libc::WaitNamedPipeW(p, ms) == 0 + }; + if timed_out { + return Err(util::timeout("connect timed out")) + } + } + + // An example I found on microsoft's website used 20 + // seconds, libuv uses 30 seconds, hence we make the + // obvious choice of waiting for 25 seconds. + None => { + if unsafe { libc::WaitNamedPipeW(p, 25000) } == 0 { + return Err(super::last_error()) + } + } } } }) @@ -372,6 +390,7 @@ impl UnixListener { Ok(UnixAcceptor { listener: self, event: try!(Event::new(true, false)), + deadline: 0, }) } } @@ -391,6 +410,7 @@ impl rtio::RtioUnixListener for UnixListener { pub struct UnixAcceptor { listener: UnixListener, event: Event, + deadline: u64, } impl UnixAcceptor { @@ -438,7 +458,28 @@ impl UnixAcceptor { overlapped.hEvent = self.event.handle(); if unsafe { libc::ConnectNamedPipe(handle, &mut overlapped) == 0 } { let mut err = unsafe { libc::GetLastError() }; + if err == libc::ERROR_IO_PENDING as libc::DWORD { + // If we've got a timeout, use WaitForSingleObject in tandem + // with CancelIo to figure out if we should indeed get the + // result. + if self.deadline != 0 { + let now = ::io::timer::now(); + let timeout = self.deadline < now || unsafe { + let ms = (self.deadline - now) as libc::DWORD; + let r = libc::WaitForSingleObject(overlapped.hEvent, + ms); + r != libc::WAIT_OBJECT_0 + }; + if timeout { + unsafe { let _ = c::CancelIo(handle); } + return Err(util::timeout("accept timed out")) + } + } + + // This will block until the overlapped I/O is completed. The + // timeout was previously handled, so this will either block in + // the normal case or succeed very quickly in the timeout case. let ret = unsafe { let mut transfer = 0; libc::GetOverlappedResult(handle, @@ -488,5 +529,8 @@ impl rtio::RtioUnixAcceptor for UnixAcceptor { fn accept(&mut self) -> IoResult<~rtio::RtioPipe:Send> { self.native_accept().map(|s| ~s as ~rtio::RtioPipe:Send) } + fn set_timeout(&mut self, timeout: Option<u64>) { + self.deadline = timeout.map(|i| i + ::io::timer::now()).unwrap_or(0); + } } diff --git a/src/libnative/io/util.rs b/src/libnative/io/util.rs new file mode 100644 index 00000000000..0aaac8f8ad8 --- /dev/null +++ b/src/libnative/io/util.rs @@ -0,0 +1,136 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use libc; +use std::io::IoResult; +use std::io; +use std::mem; +use std::ptr; + +use super::c; +use super::net; +use super::{retry, last_error}; + +pub fn timeout(desc: &'static str) -> io::IoError { + io::IoError { + kind: io::TimedOut, + desc: desc, + detail: None, + } +} + +pub fn ms_to_timeval(ms: u64) -> libc::timeval { + libc::timeval { + tv_sec: (ms / 1000) as libc::time_t, + tv_usec: ((ms % 1000) * 1000) as libc::suseconds_t, + } +} + +// See http://developerweb.net/viewtopic.php?id=3196 for where this is +// derived from. +pub fn connect_timeout(fd: net::sock_t, + addrp: *libc::sockaddr, + len: libc::socklen_t, + timeout_ms: u64) -> IoResult<()> { + use std::os; + #[cfg(unix)] use INPROGRESS = libc::EINPROGRESS; + #[cfg(windows)] use INPROGRESS = libc::WSAEINPROGRESS; + #[cfg(unix)] use WOULDBLOCK = libc::EWOULDBLOCK; + #[cfg(windows)] use WOULDBLOCK = libc::WSAEWOULDBLOCK; + + // Make sure the call to connect() doesn't block + try!(set_nonblocking(fd, true)); + + let ret = match unsafe { libc::connect(fd, addrp, len) } { + // If the connection is in progress, then we need to wait for it to + // finish (with a timeout). The current strategy for doing this is + // to use select() with a timeout. + -1 if os::errno() as int == INPROGRESS as int || + os::errno() as int == WOULDBLOCK as int => { + let mut set: c::fd_set = unsafe { mem::init() }; + c::fd_set(&mut set, fd); + match await(fd, &mut set, timeout_ms) { + 0 => Err(timeout("connection timed out")), + -1 => Err(last_error()), + _ => { + let err: libc::c_int = try!( + net::getsockopt(fd, libc::SOL_SOCKET, libc::SO_ERROR)); + if err == 0 { + Ok(()) + } else { + Err(io::IoError::from_errno(err as uint, true)) + } + } + } + } + + -1 => Err(last_error()), + _ => Ok(()), + }; + + // be sure to turn blocking I/O back on + try!(set_nonblocking(fd, false)); + return ret; + + #[cfg(unix)] + fn set_nonblocking(fd: net::sock_t, nb: bool) -> IoResult<()> { + let set = nb as libc::c_int; + super::mkerr_libc(retry(|| unsafe { c::ioctl(fd, c::FIONBIO, &set) })) + } + + #[cfg(windows)] + fn set_nonblocking(fd: net::sock_t, nb: bool) -> IoResult<()> { + let mut set = nb as libc::c_ulong; + if unsafe { c::ioctlsocket(fd, c::FIONBIO, &mut set) != 0 } { + Err(last_error()) + } else { + Ok(()) + } + } + + #[cfg(unix)] + fn await(fd: net::sock_t, set: &mut c::fd_set, + timeout: u64) -> libc::c_int { + let start = ::io::timer::now(); + retry(|| unsafe { + // Recalculate the timeout each iteration (it is generally + // undefined what the value of the 'tv' is after select + // returns EINTR). + let tv = ms_to_timeval(timeout - (::io::timer::now() - start)); + c::select(fd + 1, ptr::null(), set as *mut _ as *_, + ptr::null(), &tv) + }) + } + #[cfg(windows)] + fn await(_fd: net::sock_t, set: &mut c::fd_set, + timeout: u64) -> libc::c_int { + let tv = ms_to_timeval(timeout); + unsafe { c::select(1, ptr::null(), &*set, ptr::null(), &tv) } + } +} + +pub fn accept_deadline(fd: net::sock_t, deadline: u64) -> IoResult<()> { + let mut set: c::fd_set = unsafe { mem::init() }; + c::fd_set(&mut set, fd); + + match retry(|| { + // If we're past the deadline, then pass a 0 timeout to select() so + // we can poll the status of the socket. + let now = ::io::timer::now(); + let ms = if deadline < now {0} else {deadline - now}; + let tv = ms_to_timeval(ms); + let n = if cfg!(windows) {1} else {fd as libc::c_int + 1}; + unsafe { c::select(n, &set, ptr::null(), ptr::null(), &tv) } + }) { + -1 => Err(last_error()), + 0 => Err(timeout("accept timed out")), + _ => return Ok(()), + } +} |
