about summary refs log tree commit diff
path: root/src/libnative
diff options
context:
space:
mode:
authorAlex Crichton <alex@alexcrichton.com>2014-04-21 20:30:07 -0700
committerAlex Crichton <alex@alexcrichton.com>2014-04-23 19:07:31 -0700
commite5d3e5180f667f8850cdd96af60fc5511746b1bd (patch)
treeb960b71ae8310007eabf6e2565173f7081cab267 /src/libnative
parentbb580f1a56138bd5a96ccc95c0f61caab72cf975 (diff)
downloadrust-e5d3e5180f667f8850cdd96af60fc5511746b1bd.tar.gz
rust-e5d3e5180f667f8850cdd96af60fc5511746b1bd.zip
std: Add support for an accept() timeout
This adds experimental support for timeouts when accepting sockets through
`TcpAcceptor::accept`. This does not add a separate `accept_timeout` function,
but rather it adds a `set_timeout` function instead. This second function is
intended to be used as a hard deadline after which all accepts will never block
and fail immediately.

This idea was derived from Go's SetDeadline() methods. We do not currently have
a robust time abstraction in the standard library, so I opted to have the
argument be a relative time in millseconds into the future. I believe a more
appropriate argument type is an absolute time, but this concept does not exist
yet (this is also why the function is marked #[experimental]).

The native support is built on select(), similarly to connect_timeout(), and the
green support is based on channel select and a timer.

cc #13523
Diffstat (limited to 'src/libnative')
-rw-r--r--src/libnative/io/c_win32.rs6
-rw-r--r--src/libnative/io/net.rs83
-rw-r--r--src/libnative/io/timer_win32.rs11
3 files changed, 70 insertions, 30 deletions
diff --git a/src/libnative/io/c_win32.rs b/src/libnative/io/c_win32.rs
index 8d75a673914..dbbb39b3b7b 100644
--- a/src/libnative/io/c_win32.rs
+++ b/src/libnative/io/c_win32.rs
@@ -50,9 +50,9 @@ extern "system" {
     pub fn ioctlsocket(s: libc::SOCKET, cmd: libc::c_long,
                        argp: *mut libc::c_ulong) -> libc::c_int;
     pub fn select(nfds: libc::c_int,
-                  readfds: *mut fd_set,
-                  writefds: *mut fd_set,
-                  exceptfds: *mut fd_set,
+                  readfds: *fd_set,
+                  writefds: *fd_set,
+                  exceptfds: *fd_set,
                   timeout: *libc::timeval) -> libc::c_int;
     pub fn getsockopt(sockfd: libc::SOCKET,
                       level: libc::c_int,
diff --git a/src/libnative/io/net.rs b/src/libnative/io/net.rs
index be597761b1a..93ec23e32ad 100644
--- a/src/libnative/io/net.rs
+++ b/src/libnative/io/net.rs
@@ -13,6 +13,7 @@ use std::cast;
 use std::io::net::ip;
 use std::io;
 use std::mem;
+use std::os;
 use std::ptr;
 use std::rt::rtio;
 use std::sync::arc::UnsafeArc;
@@ -144,6 +145,21 @@ fn last_error() -> io::IoError {
     super::last_error()
 }
 
+fn ms_to_timeval(ms: u64) -> libc::timeval {
+    libc::timeval {
+        tv_sec: (ms / 1000) as libc::time_t,
+        tv_usec: ((ms % 1000) * 1000) as libc::suseconds_t,
+    }
+}
+
+fn timeout(desc: &'static str) -> io::IoError {
+    io::IoError {
+        kind: io::TimedOut,
+        desc: desc,
+        detail: None,
+    }
+}
+
 #[cfg(windows)] unsafe fn close(sock: sock_t) { let _ = libc::closesocket(sock); }
 #[cfg(unix)]    unsafe fn close(sock: sock_t) { let _ = libc::close(sock); }
 
@@ -271,8 +287,7 @@ impl TcpStream {
     fn connect_timeout(fd: sock_t,
                        addrp: *libc::sockaddr,
                        len: libc::socklen_t,
-                       timeout: u64) -> IoResult<()> {
-        use std::os;
+                       timeout_ms: u64) -> IoResult<()> {
         #[cfg(unix)]    use INPROGRESS = libc::EINPROGRESS;
         #[cfg(windows)] use INPROGRESS = libc::WSAEINPROGRESS;
         #[cfg(unix)]    use WOULDBLOCK = libc::EWOULDBLOCK;
@@ -289,12 +304,8 @@ impl TcpStream {
                   os::errno() as int == WOULDBLOCK as int => {
                 let mut set: c::fd_set = unsafe { mem::init() };
                 c::fd_set(&mut set, fd);
-                match await(fd, &mut set, timeout) {
-                    0 => Err(io::IoError {
-                        kind: io::TimedOut,
-                        desc: "connection timed out",
-                        detail: None,
-                    }),
+                match await(fd, &mut set, timeout_ms) {
+                    0 => Err(timeout("connection timed out")),
                     -1 => Err(last_error()),
                     _ => {
                         let err: libc::c_int = try!(
@@ -338,22 +349,14 @@ impl TcpStream {
                 // Recalculate the timeout each iteration (it is generally
                 // undefined what the value of the 'tv' is after select
                 // returns EINTR).
-                let timeout = timeout - (::io::timer::now() - start);
-                let tv = libc::timeval {
-                    tv_sec: (timeout / 1000) as libc::time_t,
-                    tv_usec: ((timeout % 1000) * 1000) as libc::suseconds_t,
-                };
-                c::select(fd + 1, ptr::null(), set as *mut _ as *_,
-                          ptr::null(), &tv)
+                let tv = ms_to_timeval(timeout - (::io::timer::now() - start));
+                c::select(fd + 1, ptr::null(), &*set, ptr::null(), &tv)
             })
         }
         #[cfg(windows)]
         fn await(_fd: sock_t, set: &mut c::fd_set, timeout: u64) -> libc::c_int {
-            let tv = libc::timeval {
-                tv_sec: (timeout / 1000) as libc::time_t,
-                tv_usec: ((timeout % 1000) * 1000) as libc::suseconds_t,
-            };
-            unsafe { c::select(1, ptr::mut_null(), set, ptr::mut_null(), &tv) }
+            let tv = ms_to_timeval(timeout);
+            unsafe { c::select(1, ptr::null(), &*set, ptr::null(), &tv) }
         }
     }
 
@@ -467,7 +470,7 @@ impl Drop for Inner {
 ////////////////////////////////////////////////////////////////////////////////
 
 pub struct TcpListener {
-    inner: UnsafeArc<Inner>,
+    inner: Inner,
 }
 
 impl TcpListener {
@@ -477,7 +480,7 @@ impl TcpListener {
                 let (addr, len) = addr_to_sockaddr(addr);
                 let addrp = &addr as *libc::sockaddr_storage;
                 let inner = Inner { fd: fd };
-                let ret = TcpListener { inner: UnsafeArc::new(inner) };
+                let ret = TcpListener { inner: inner };
                 // On platforms with Berkeley-derived sockets, this allows
                 // to quickly rebind a socket, without needing to wait for
                 // the OS to clean up the previous one.
@@ -498,15 +501,12 @@ impl TcpListener {
         }
     }
 
-    pub fn fd(&self) -> sock_t {
-        // This is just a read-only arc so the unsafety is fine
-        unsafe { (*self.inner.get()).fd }
-    }
+    pub fn fd(&self) -> sock_t { self.inner.fd }
 
     pub fn native_listen(self, backlog: int) -> IoResult<TcpAcceptor> {
         match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } {
             -1 => Err(last_error()),
-            _ => Ok(TcpAcceptor { listener: self })
+            _ => Ok(TcpAcceptor { listener: self, deadline: 0 })
         }
     }
 }
@@ -525,12 +525,16 @@ impl rtio::RtioSocket for TcpListener {
 
 pub struct TcpAcceptor {
     listener: TcpListener,
+    deadline: u64,
 }
 
 impl TcpAcceptor {
     pub fn fd(&self) -> sock_t { self.listener.fd() }
 
     pub fn native_accept(&mut self) -> IoResult<TcpStream> {
+        if self.deadline != 0 {
+            try!(self.accept_deadline());
+        }
         unsafe {
             let mut storage: libc::sockaddr_storage = mem::init();
             let storagep = &mut storage as *mut libc::sockaddr_storage;
@@ -546,6 +550,25 @@ impl TcpAcceptor {
             }
         }
     }
+
+    fn accept_deadline(&mut self) -> IoResult<()> {
+        let mut set: c::fd_set = unsafe { mem::init() };
+        c::fd_set(&mut set, self.fd());
+
+        match retry(|| {
+            // If we're past the deadline, then pass a 0 timeout to select() so
+            // we can poll the status of the socket.
+            let now = ::io::timer::now();
+            let ms = if self.deadline > now {0} else {self.deadline - now};
+            let tv = ms_to_timeval(ms);
+            let n = if cfg!(windows) {1} else {self.fd() as libc::c_int + 1};
+            unsafe { c::select(n, &set, ptr::null(), ptr::null(), &tv) }
+        }) {
+            -1 => Err(last_error()),
+            0 => Err(timeout("accept timed out")),
+            _ => return Ok(()),
+        }
+    }
 }
 
 impl rtio::RtioSocket for TcpAcceptor {
@@ -561,6 +584,12 @@ impl rtio::RtioTcpAcceptor for TcpAcceptor {
 
     fn accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) }
     fn dont_accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) }
+    fn set_timeout(&mut self, timeout: Option<u64>) {
+        self.deadline = match timeout {
+            None => 0,
+            Some(t) => ::io::timer::now() + t,
+        };
+    }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
diff --git a/src/libnative/io/timer_win32.rs b/src/libnative/io/timer_win32.rs
index a15898feb92..588ec367d81 100644
--- a/src/libnative/io/timer_win32.rs
+++ b/src/libnative/io/timer_win32.rs
@@ -89,6 +89,17 @@ fn helper(input: libc::HANDLE, messages: Receiver<Req>) {
     }
 }
 
+// returns the current time (in milliseconds)
+pub fn now() -> u64 {
+    let mut ticks_per_s = 0;
+    assert_eq!(unsafe { libc::QueryPerformanceFrequency(&mut ticks_per_s) }, 1);
+    let ticks_per_s = if ticks_per_s == 0 {1} else {ticks_per_s};
+    let mut ticks = 0;
+    assert_eq!(unsafe { libc::QueryPerformanceCounter(&mut ticks) }, 1);
+
+    return (ticks as u64 * 1000) / (ticks_per_s as u64);
+}
+
 impl Timer {
     pub fn new() -> IoResult<Timer> {
         timer_helper::boot(helper);