about summary refs log tree commit diff
diff options
context:
space:
mode:
authorbors <bors@rust-lang.org>2014-04-19 00:56:30 -0700
committerbors <bors@rust-lang.org>2014-04-19 00:56:30 -0700
commit158e0c86fe6c4db2d1dde8da156ce58aaf319dc4 (patch)
tree453907889bbb6913dbc8c6686cfab9fd93acfe3c
parent5a2ecb828bec0465a801b8c843f928ddccb2228f (diff)
parent3915e17cd70e2d584726364851d368badb8bf15b (diff)
downloadrust-158e0c86fe6c4db2d1dde8da156ce58aaf319dc4.tar.gz
rust-158e0c86fe6c4db2d1dde8da156ce58aaf319dc4.zip
auto merge of #13604 : alexcrichton/rust/connect-timeout, r=brson
This adds a `TcpStream::connect_timeout` function in order to assist opening
connections with a timeout (cc #13523). There isn't really much design space for
this specific operation (unlike timing out normal blocking reads/writes), so I
am fairly confident that this is the correct interface for this function.

The function is marked #[experimental] because it takes a u64 timeout argument,
and the u64 type is likely to change in the future.
-rw-r--r--src/liblibc/lib.rs15
-rw-r--r--src/libnative/io/c_unix.rs76
-rw-r--r--src/libnative/io/c_win32.rs62
-rw-r--r--src/libnative/io/mod.rs8
-rw-r--r--src/libnative/io/net.rs168
-rw-r--r--src/libnative/io/process.rs13
-rw-r--r--src/libnative/io/timer_unix.rs69
-rw-r--r--src/librustuv/lib.rs1
-rw-r--r--src/librustuv/net.rs89
-rw-r--r--src/librustuv/timer.rs16
-rw-r--r--src/librustuv/uvio.rs4
-rw-r--r--src/libstd/io/mod.rs2
-rw-r--r--src/libstd/io/net/tcp.rs17
-rw-r--r--src/libstd/rt/rtio.rs3
-rw-r--r--src/test/run-pass/tcp-connect-timeouts.rs92
15 files changed, 490 insertions, 145 deletions
diff --git a/src/liblibc/lib.rs b/src/liblibc/lib.rs
index 4ec41e9488a..98613f885cd 100644
--- a/src/liblibc/lib.rs
+++ b/src/liblibc/lib.rs
@@ -87,13 +87,14 @@ pub use types::common::c95::{FILE, c_void, fpos_t};
 pub use types::common::c99::{int8_t, int16_t, int32_t, int64_t};
 pub use types::common::c99::{uint8_t, uint16_t, uint32_t, uint64_t};
 pub use types::common::posix88::{DIR, dirent_t};
+pub use types::os::common::posix01::{timeval};
 pub use types::os::common::bsd44::{addrinfo, in_addr, in6_addr, sockaddr_storage};
 pub use types::os::common::bsd44::{ip_mreq, ip6_mreq, sockaddr, sockaddr_un};
 pub use types::os::common::bsd44::{sa_family_t, sockaddr_in, sockaddr_in6, socklen_t};
 pub use types::os::arch::c95::{c_char, c_double, c_float, c_int, c_uint};
 pub use types::os::arch::c95::{c_long, c_short, c_uchar, c_ulong};
 pub use types::os::arch::c95::{c_ushort, clock_t, ptrdiff_t};
-pub use types::os::arch::c95::{size_t, time_t};
+pub use types::os::arch::c95::{size_t, time_t, suseconds_t};
 pub use types::os::arch::c99::{c_longlong, c_ulonglong};
 pub use types::os::arch::c99::{intptr_t, uintptr_t};
 pub use types::os::arch::posix88::{dev_t, ino_t, mode_t};
@@ -113,7 +114,7 @@ pub use consts::os::posix88::{STDERR_FILENO, STDIN_FILENO, S_IXUSR};
 pub use consts::os::posix88::{STDOUT_FILENO, W_OK, X_OK};
 pub use consts::os::bsd44::{AF_INET, AF_INET6, SOCK_STREAM, SOCK_DGRAM};
 pub use consts::os::bsd44::{IPPROTO_IP, IPPROTO_IPV6, IPPROTO_TCP, TCP_NODELAY};
-pub use consts::os::bsd44::{SOL_SOCKET, SO_KEEPALIVE};
+pub use consts::os::bsd44::{SOL_SOCKET, SO_KEEPALIVE, SO_ERROR};
 pub use consts::os::bsd44::{SO_REUSEADDR, SO_BROADCAST, SHUT_WR, IP_MULTICAST_LOOP};
 pub use consts::os::bsd44::{IP_ADD_MEMBERSHIP, IP_DROP_MEMBERSHIP};
 pub use consts::os::bsd44::{IPV6_ADD_MEMBERSHIP, IPV6_DROP_MEMBERSHIP};
@@ -170,14 +171,13 @@ pub use funcs::bsd43::{shutdown};
 #[cfg(unix)] pub use consts::os::posix88::{ECONNREFUSED, ECONNRESET, EPERM, EPIPE};
 #[cfg(unix)] pub use consts::os::posix88::{ENOTCONN, ECONNABORTED, EADDRNOTAVAIL, EINTR};
 #[cfg(unix)] pub use consts::os::posix88::{EADDRINUSE, ENOENT, EISDIR, EAGAIN, EWOULDBLOCK};
-#[cfg(unix)] pub use consts::os::posix88::{ECANCELED, SIGINT};
+#[cfg(unix)] pub use consts::os::posix88::{ECANCELED, SIGINT, EINPROGRESS};
 #[cfg(unix)] pub use consts::os::posix88::{SIGTERM, SIGKILL, SIGPIPE, PROT_NONE};
 #[cfg(unix)] pub use consts::os::posix01::{SIG_IGN, WNOHANG};
 #[cfg(unix)] pub use consts::os::bsd44::{AF_UNIX};
 
-#[cfg(unix)] pub use types::os::common::posix01::{pthread_t, timespec, timezone, timeval};
+#[cfg(unix)] pub use types::os::common::posix01::{pthread_t, timespec, timezone};
 
-#[cfg(unix)] pub use types::os::arch::c95::{suseconds_t};
 #[cfg(unix)] pub use types::os::arch::posix88::{uid_t, gid_t};
 #[cfg(unix)] pub use types::os::arch::posix01::{pthread_attr_t};
 #[cfg(unix)] pub use types::os::arch::posix01::{stat, utimbuf};
@@ -195,6 +195,7 @@ pub use funcs::bsd43::{shutdown};
 #[cfg(windows)] pub use consts::os::c95::{WSAECONNREFUSED, WSAECONNRESET, WSAEACCES};
 #[cfg(windows)] pub use consts::os::c95::{WSAEWOULDBLOCK, WSAENOTCONN, WSAECONNABORTED};
 #[cfg(windows)] pub use consts::os::c95::{WSAEADDRNOTAVAIL, WSAEADDRINUSE, WSAEINTR};
+#[cfg(windows)] pub use consts::os::c95::{WSAEINPROGRESS};
 #[cfg(windows)] pub use consts::os::extra::{ERROR_INSUFFICIENT_BUFFER};
 #[cfg(windows)] pub use consts::os::extra::{O_BINARY, O_NOINHERIT, PAGE_NOACCESS};
 #[cfg(windows)] pub use consts::os::extra::{PAGE_READONLY, PAGE_READWRITE, PAGE_EXECUTE};
@@ -1708,6 +1709,7 @@ pub mod consts {
             pub static SO_KEEPALIVE: c_int = 8;
             pub static SO_BROADCAST: c_int = 32;
             pub static SO_REUSEADDR: c_int = 4;
+            pub static SO_ERROR: c_int = 0x1007;
 
             pub static SHUT_RD: c_int = 0;
             pub static SHUT_WR: c_int = 1;
@@ -2496,6 +2498,7 @@ pub mod consts {
             pub static SO_KEEPALIVE: c_int = 9;
             pub static SO_BROADCAST: c_int = 6;
             pub static SO_REUSEADDR: c_int = 2;
+            pub static SO_ERROR: c_int = 4;
 
             pub static SHUT_RD: c_int = 0;
             pub static SHUT_WR: c_int = 1;
@@ -2954,6 +2957,7 @@ pub mod consts {
             pub static SO_KEEPALIVE: c_int = 0x0008;
             pub static SO_BROADCAST: c_int = 0x0020;
             pub static SO_REUSEADDR: c_int = 0x0004;
+            pub static SO_ERROR: c_int = 0x1007;
 
             pub static SHUT_RD: c_int = 0;
             pub static SHUT_WR: c_int = 1;
@@ -3340,6 +3344,7 @@ pub mod consts {
             pub static SO_KEEPALIVE: c_int = 0x0008;
             pub static SO_BROADCAST: c_int = 0x0020;
             pub static SO_REUSEADDR: c_int = 0x0004;
+            pub static SO_ERROR: c_int = 0x1007;
 
             pub static SHUT_RD: c_int = 0;
             pub static SHUT_WR: c_int = 1;
diff --git a/src/libnative/io/c_unix.rs b/src/libnative/io/c_unix.rs
new file mode 100644
index 00000000000..e2bf515a1e5
--- /dev/null
+++ b/src/libnative/io/c_unix.rs
@@ -0,0 +1,76 @@
+// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+//! C definitions used by libnative that don't belong in liblibc
+
+pub use self::select::fd_set;
+
+use libc;
+
+#[cfg(target_os = "macos")]
+#[cfg(target_os = "freebsd")]
+pub static FIONBIO: libc::c_ulong = 0x8004667e;
+#[cfg(target_os = "linux")]
+#[cfg(target_os = "android")]
+pub static FIONBIO: libc::c_ulong = 0x5421;
+#[cfg(target_os = "macos")]
+#[cfg(target_os = "freebsd")]
+pub static FIOCLEX: libc::c_ulong = 0x20006601;
+#[cfg(target_os = "linux")]
+#[cfg(target_os = "android")]
+pub static FIOCLEX: libc::c_ulong = 0x5451;
+
+extern {
+    pub fn gettimeofday(timeval: *mut libc::timeval,
+                        tzp: *libc::c_void) -> libc::c_int;
+    pub fn select(nfds: libc::c_int,
+                  readfds: *fd_set,
+                  writefds: *fd_set,
+                  errorfds: *fd_set,
+                  timeout: *libc::timeval) -> libc::c_int;
+    pub fn getsockopt(sockfd: libc::c_int,
+                      level: libc::c_int,
+                      optname: libc::c_int,
+                      optval: *mut libc::c_void,
+                      optlen: *mut libc::socklen_t) -> libc::c_int;
+    pub fn ioctl(fd: libc::c_int, req: libc::c_ulong, ...) -> libc::c_int;
+
+}
+
+#[cfg(target_os = "macos")]
+mod select {
+    pub static FD_SETSIZE: uint = 1024;
+
+    pub struct fd_set {
+        fds_bits: [i32, ..(FD_SETSIZE / 32)]
+    }
+
+    pub fn fd_set(set: &mut fd_set, fd: i32) {
+        set.fds_bits[(fd / 32) as uint] |= 1 << (fd % 32);
+    }
+}
+
+#[cfg(target_os = "android")]
+#[cfg(target_os = "freebsd")]
+#[cfg(target_os = "linux")]
+mod select {
+    use std::uint;
+
+    pub static FD_SETSIZE: uint = 1024;
+
+    pub struct fd_set {
+        fds_bits: [uint, ..(FD_SETSIZE / uint::BITS)]
+    }
+
+    pub fn fd_set(set: &mut fd_set, fd: i32) {
+        let fd = fd as uint;
+        set.fds_bits[fd / uint::BITS] |= 1 << (fd % uint::BITS);
+    }
+}
diff --git a/src/libnative/io/c_win32.rs b/src/libnative/io/c_win32.rs
new file mode 100644
index 00000000000..8d75a673914
--- /dev/null
+++ b/src/libnative/io/c_win32.rs
@@ -0,0 +1,62 @@
+// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+//! C definitions used by libnative that don't belong in liblibc
+
+#![allow(type_overflow)]
+
+use libc;
+
+pub static WSADESCRIPTION_LEN: uint = 256;
+pub static WSASYS_STATUS_LEN: uint = 128;
+pub static FIONBIO: libc::c_long = 0x8004667e;
+static FD_SETSIZE: uint = 64;
+
+pub struct WSADATA {
+    pub wVersion: libc::WORD,
+    pub wHighVersion: libc::WORD,
+    pub szDescription: [u8, ..WSADESCRIPTION_LEN + 1],
+    pub szSystemStatus: [u8, ..WSASYS_STATUS_LEN + 1],
+    pub iMaxSockets: u16,
+    pub iMaxUdpDg: u16,
+    pub lpVendorInfo: *u8,
+}
+
+pub type LPWSADATA = *mut WSADATA;
+
+pub struct fd_set {
+    fd_count: libc::c_uint,
+    fd_array: [libc::SOCKET, ..FD_SETSIZE],
+}
+
+pub fn fd_set(set: &mut fd_set, s: libc::SOCKET) {
+    set.fd_array[set.fd_count as uint] = s;
+    set.fd_count += 1;
+}
+
+#[link(name = "ws2_32")]
+extern "system" {
+    pub fn WSAStartup(wVersionRequested: libc::WORD,
+                      lpWSAData: LPWSADATA) -> libc::c_int;
+    pub fn WSAGetLastError() -> libc::c_int;
+
+    pub fn ioctlsocket(s: libc::SOCKET, cmd: libc::c_long,
+                       argp: *mut libc::c_ulong) -> libc::c_int;
+    pub fn select(nfds: libc::c_int,
+                  readfds: *mut fd_set,
+                  writefds: *mut fd_set,
+                  exceptfds: *mut fd_set,
+                  timeout: *libc::timeval) -> libc::c_int;
+    pub fn getsockopt(sockfd: libc::SOCKET,
+                      level: libc::c_int,
+                      optname: libc::c_int,
+                      optval: *mut libc::c_char,
+                      optlen: *mut libc::c_int) -> libc::c_int;
+}
diff --git a/src/libnative/io/mod.rs b/src/libnative/io/mod.rs
index 78d17bc8d74..19cb5c5f1d4 100644
--- a/src/libnative/io/mod.rs
+++ b/src/libnative/io/mod.rs
@@ -71,6 +71,9 @@ pub mod pipe;
 #[path = "pipe_win32.rs"]
 pub mod pipe;
 
+#[cfg(unix)]    #[path = "c_unix.rs"]  mod c;
+#[cfg(windows)] #[path = "c_win32.rs"] mod c;
+
 mod timer_helper;
 
 pub type IoResult<T> = Result<T, IoError>;
@@ -161,8 +164,9 @@ impl IoFactory {
 
 impl rtio::IoFactory for IoFactory {
     // networking
-    fn tcp_connect(&mut self, addr: SocketAddr) -> IoResult<~RtioTcpStream:Send> {
-        net::TcpStream::connect(addr).map(|s| ~s as ~RtioTcpStream:Send)
+    fn tcp_connect(&mut self, addr: SocketAddr,
+                   timeout: Option<u64>) -> IoResult<~RtioTcpStream:Send> {
+        net::TcpStream::connect(addr, timeout).map(|s| ~s as ~RtioTcpStream:Send)
     }
     fn tcp_bind(&mut self, addr: SocketAddr) -> IoResult<~RtioTcpListener:Send> {
         net::TcpListener::bind(addr).map(|s| ~s as ~RtioTcpListener:Send)
diff --git a/src/libnative/io/net.rs b/src/libnative/io/net.rs
index 2e64b82a84a..be597761b1a 100644
--- a/src/libnative/io/net.rs
+++ b/src/libnative/io/net.rs
@@ -8,15 +8,17 @@
 // option. This file may not be copied, modified, or distributed
 // except according to those terms.
 
+use libc;
 use std::cast;
 use std::io::net::ip;
 use std::io;
-use libc;
 use std::mem;
+use std::ptr;
 use std::rt::rtio;
 use std::sync::arc::UnsafeArc;
 
 use super::{IoResult, retry, keep_going};
+use super::c;
 
 ////////////////////////////////////////////////////////////////////////////////
 // sockaddr and misc bindings
@@ -115,12 +117,26 @@ fn setsockopt<T>(fd: sock_t, opt: libc::c_int, val: libc::c_int,
     }
 }
 
+fn getsockopt<T: Copy>(fd: sock_t, opt: libc::c_int,
+                       val: libc::c_int) -> IoResult<T> {
+    unsafe {
+        let mut slot: T = mem::init();
+        let mut len = mem::size_of::<T>() as libc::socklen_t;
+        let ret = c::getsockopt(fd, opt, val,
+                                &mut slot as *mut _ as *mut _,
+                                &mut len);
+        if ret != 0 {
+            Err(last_error())
+        } else {
+            assert!(len as uint == mem::size_of::<T>());
+            Ok(slot)
+        }
+    }
+}
+
 #[cfg(windows)]
 fn last_error() -> io::IoError {
-    extern "system" {
-        fn WSAGetLastError() -> libc::c_int;
-    }
-    io::IoError::from_errno(unsafe { WSAGetLastError() } as uint, true)
+    io::IoError::from_errno(unsafe { c::WSAGetLastError() } as uint, true)
 }
 
 #[cfg(not(windows))]
@@ -197,24 +213,6 @@ pub fn init() {}
 
 #[cfg(windows)]
 pub fn init() {
-    static WSADESCRIPTION_LEN: uint = 256;
-    static WSASYS_STATUS_LEN: uint = 128;
-    struct WSADATA {
-        wVersion: libc::WORD,
-        wHighVersion: libc::WORD,
-        szDescription: [u8, ..WSADESCRIPTION_LEN + 1],
-        szSystemStatus: [u8, ..WSASYS_STATUS_LEN + 1],
-        iMaxSockets: u16,
-        iMaxUdpDg: u16,
-        lpVendorInfo: *u8,
-    }
-    type LPWSADATA = *mut WSADATA;
-
-    #[link(name = "ws2_32")]
-    extern "system" {
-        fn WSAStartup(wVersionRequested: libc::WORD,
-                       lpWSAData: LPWSADATA) -> libc::c_int;
-    }
 
     unsafe {
         use std::unstable::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
@@ -223,9 +221,9 @@ pub fn init() {
 
         let _guard = LOCK.lock();
         if !INITIALIZED {
-            let mut data: WSADATA = mem::init();
-            let ret = WSAStartup(0x202,      // version 2.2
-                                 &mut data);
+            let mut data: c::WSADATA = mem::init();
+            let ret = c::WSAStartup(0x202,      // version 2.2
+                                    &mut data);
             assert_eq!(ret, 0);
             INITIALIZED = true;
         }
@@ -245,22 +243,118 @@ struct Inner {
 }
 
 impl TcpStream {
-    pub fn connect(addr: ip::SocketAddr) -> IoResult<TcpStream> {
-        unsafe {
-            socket(addr, libc::SOCK_STREAM).and_then(|fd| {
-                let (addr, len) = addr_to_sockaddr(addr);
-                let addrp = &addr as *libc::sockaddr_storage;
-                let inner = Inner { fd: fd };
-                let ret = TcpStream { inner: UnsafeArc::new(inner) };
-                match retry(|| {
-                    libc::connect(fd, addrp as *libc::sockaddr,
-                                  len as libc::socklen_t)
-                }) {
+    pub fn connect(addr: ip::SocketAddr,
+                   timeout: Option<u64>) -> IoResult<TcpStream> {
+        let fd = try!(socket(addr, libc::SOCK_STREAM));
+        let (addr, len) = addr_to_sockaddr(addr);
+        let inner = Inner { fd: fd };
+        let ret = TcpStream { inner: UnsafeArc::new(inner) };
+
+        let len = len as libc::socklen_t;
+        let addrp = &addr as *_ as *libc::sockaddr;
+        match timeout {
+            Some(timeout) => {
+                try!(TcpStream::connect_timeout(fd, addrp, len, timeout));
+                Ok(ret)
+            },
+            None => {
+                match retry(|| unsafe { libc::connect(fd, addrp, len) }) {
                     -1 => Err(last_error()),
                     _ => Ok(ret),
                 }
+            }
+        }
+    }
+
+    // See http://developerweb.net/viewtopic.php?id=3196 for where this is
+    // derived from.
+    fn connect_timeout(fd: sock_t,
+                       addrp: *libc::sockaddr,
+                       len: libc::socklen_t,
+                       timeout: u64) -> IoResult<()> {
+        use std::os;
+        #[cfg(unix)]    use INPROGRESS = libc::EINPROGRESS;
+        #[cfg(windows)] use INPROGRESS = libc::WSAEINPROGRESS;
+        #[cfg(unix)]    use WOULDBLOCK = libc::EWOULDBLOCK;
+        #[cfg(windows)] use WOULDBLOCK = libc::WSAEWOULDBLOCK;
+
+        // Make sure the call to connect() doesn't block
+        try!(set_nonblocking(fd, true));
+
+        let ret = match unsafe { libc::connect(fd, addrp, len) } {
+            // If the connection is in progress, then we need to wait for it to
+            // finish (with a timeout). The current strategy for doing this is
+            // to use select() with a timeout.
+            -1 if os::errno() as int == INPROGRESS as int ||
+                  os::errno() as int == WOULDBLOCK as int => {
+                let mut set: c::fd_set = unsafe { mem::init() };
+                c::fd_set(&mut set, fd);
+                match await(fd, &mut set, timeout) {
+                    0 => Err(io::IoError {
+                        kind: io::TimedOut,
+                        desc: "connection timed out",
+                        detail: None,
+                    }),
+                    -1 => Err(last_error()),
+                    _ => {
+                        let err: libc::c_int = try!(
+                            getsockopt(fd, libc::SOL_SOCKET, libc::SO_ERROR));
+                        if err == 0 {
+                            Ok(())
+                        } else {
+                            Err(io::IoError::from_errno(err as uint, true))
+                        }
+                    }
+                }
+            }
+
+            -1 => Err(last_error()),
+            _ => Ok(()),
+        };
+
+        // be sure to turn blocking I/O back on
+        try!(set_nonblocking(fd, false));
+        return ret;
+
+        #[cfg(unix)]
+        fn set_nonblocking(fd: sock_t, nb: bool) -> IoResult<()> {
+            let set = nb as libc::c_int;
+            super::mkerr_libc(retry(|| unsafe { c::ioctl(fd, c::FIONBIO, &set) }))
+        }
+        #[cfg(windows)]
+        fn set_nonblocking(fd: sock_t, nb: bool) -> IoResult<()> {
+            let mut set = nb as libc::c_ulong;
+            if unsafe { c::ioctlsocket(fd, c::FIONBIO, &mut set) != 0 } {
+                Err(last_error())
+            } else {
+                Ok(())
+            }
+        }
+
+        #[cfg(unix)]
+        fn await(fd: sock_t, set: &mut c::fd_set, timeout: u64) -> libc::c_int {
+            let start = ::io::timer::now();
+            retry(|| unsafe {
+                // Recalculate the timeout each iteration (it is generally
+                // undefined what the value of the 'tv' is after select
+                // returns EINTR).
+                let timeout = timeout - (::io::timer::now() - start);
+                let tv = libc::timeval {
+                    tv_sec: (timeout / 1000) as libc::time_t,
+                    tv_usec: ((timeout % 1000) * 1000) as libc::suseconds_t,
+                };
+                c::select(fd + 1, ptr::null(), set as *mut _ as *_,
+                          ptr::null(), &tv)
             })
         }
+        #[cfg(windows)]
+        fn await(_fd: sock_t, set: &mut c::fd_set, timeout: u64) -> libc::c_int {
+            let tv = libc::timeval {
+                tv_sec: (timeout / 1000) as libc::time_t,
+                tv_usec: ((timeout % 1000) * 1000) as libc::suseconds_t,
+            };
+            unsafe { c::select(1, ptr::mut_null(), set, ptr::mut_null(), &tv) }
+        }
     }
 
     pub fn fd(&self) -> sock_t {
diff --git a/src/libnative/io/process.rs b/src/libnative/io/process.rs
index a29a5b631c6..efdab990d18 100644
--- a/src/libnative/io/process.rs
+++ b/src/libnative/io/process.rs
@@ -454,7 +454,7 @@ fn spawn_process_os(config: p::ProcessConfig,
                     err_fd: c_int) -> IoResult<SpawnProcessResult> {
     use libc::funcs::posix88::unistd::{fork, dup2, close, chdir, execvp};
     use libc::funcs::bsd44::getdtablesize;
-    use libc::c_ulong;
+    use io::c;
 
     mod rustrt {
         extern {
@@ -475,16 +475,7 @@ fn spawn_process_os(config: p::ProcessConfig,
     }
 
     unsafe fn set_cloexec(fd: c_int) {
-        extern { fn ioctl(fd: c_int, req: c_ulong) -> c_int; }
-
-        #[cfg(target_os = "macos")]
-        #[cfg(target_os = "freebsd")]
-        static FIOCLEX: c_ulong = 0x20006601;
-        #[cfg(target_os = "linux")]
-        #[cfg(target_os = "android")]
-        static FIOCLEX: c_ulong = 0x5451;
-
-        let ret = ioctl(fd, FIOCLEX);
+        let ret = c::ioctl(fd, c::FIOCLEX);
         assert_eq!(ret, 0);
     }
 
diff --git a/src/libnative/io/timer_unix.rs b/src/libnative/io/timer_unix.rs
index 0a38a6ff0be..e5d4a6bb02b 100644
--- a/src/libnative/io/timer_unix.rs
+++ b/src/libnative/io/timer_unix.rs
@@ -53,8 +53,9 @@ use std::ptr;
 use std::rt::rtio;
 use std::sync::atomics;
 
-use io::file::FileDesc;
 use io::IoResult;
+use io::c;
+use io::file::FileDesc;
 use io::timer_helper;
 
 pub struct Timer {
@@ -84,16 +85,16 @@ pub enum Req {
 }
 
 // returns the current time (in milliseconds)
-fn now() -> u64 {
+pub fn now() -> u64 {
     unsafe {
         let mut now: libc::timeval = mem::init();
-        assert_eq!(imp::gettimeofday(&mut now, ptr::null()), 0);
+        assert_eq!(c::gettimeofday(&mut now, ptr::null()), 0);
         return (now.tv_sec as u64) * 1000 + (now.tv_usec as u64) / 1000;
     }
 }
 
 fn helper(input: libc::c_int, messages: Receiver<Req>) {
-    let mut set: imp::fd_set = unsafe { mem::init() };
+    let mut set: c::fd_set = unsafe { mem::init() };
 
     let mut fd = FileDesc::new(input, true);
     let mut timeout: libc::timeval = unsafe { mem::init() };
@@ -150,9 +151,9 @@ fn helper(input: libc::c_int, messages: Receiver<Req>) {
             &timeout as *libc::timeval
         };
 
-        imp::fd_set(&mut set, input);
+        c::fd_set(&mut set, input);
         match unsafe {
-            imp::select(input + 1, &set, ptr::null(), ptr::null(), timeout)
+            c::select(input + 1, &set, ptr::null(), ptr::null(), timeout)
         } {
             // timed out
             0 => signal(&mut active, &mut dead),
@@ -283,59 +284,3 @@ impl Drop for Timer {
         self.inner = Some(self.inner());
     }
 }
-
-#[cfg(target_os = "macos")]
-mod imp {
-    use libc;
-
-    pub static FD_SETSIZE: uint = 1024;
-
-    pub struct fd_set {
-        fds_bits: [i32, ..(FD_SETSIZE / 32)]
-    }
-
-    pub fn fd_set(set: &mut fd_set, fd: i32) {
-        set.fds_bits[(fd / 32) as uint] |= 1 << (fd % 32);
-    }
-
-    extern {
-        pub fn select(nfds: libc::c_int,
-                      readfds: *fd_set,
-                      writefds: *fd_set,
-                      errorfds: *fd_set,
-                      timeout: *libc::timeval) -> libc::c_int;
-
-        pub fn gettimeofday(timeval: *mut libc::timeval,
-                            tzp: *libc::c_void) -> libc::c_int;
-    }
-}
-
-#[cfg(target_os = "android")]
-#[cfg(target_os = "freebsd")]
-#[cfg(target_os = "linux")]
-mod imp {
-    use libc;
-    use std::uint;
-
-    pub static FD_SETSIZE: uint = 1024;
-
-    pub struct fd_set {
-        fds_bits: [uint, ..(FD_SETSIZE / uint::BITS)]
-    }
-
-    pub fn fd_set(set: &mut fd_set, fd: i32) {
-        let fd = fd as uint;
-        set.fds_bits[fd / uint::BITS] |= 1 << (fd % uint::BITS);
-    }
-
-    extern {
-        pub fn select(nfds: libc::c_int,
-                      readfds: *fd_set,
-                      writefds: *fd_set,
-                      errorfds: *fd_set,
-                      timeout: *libc::timeval) -> libc::c_int;
-
-        pub fn gettimeofday(timeval: *mut libc::timeval,
-                            tzp: *libc::c_void) -> libc::c_int;
-    }
-}
diff --git a/src/librustuv/lib.rs b/src/librustuv/lib.rs
index f30c04b405b..4f1ca0b02d3 100644
--- a/src/librustuv/lib.rs
+++ b/src/librustuv/lib.rs
@@ -412,6 +412,7 @@ pub fn uv_error_to_io_error(uverr: UvError) -> IoError {
             uvll::EPIPE => io::BrokenPipe,
             uvll::ECONNABORTED => io::ConnectionAborted,
             uvll::EADDRNOTAVAIL => io::ConnectionRefused,
+            uvll::ECANCELED => io::TimedOut,
             err => {
                 uvdebug!("uverr.code {}", err as int);
                 // FIXME: Need to map remaining uv error types
diff --git a/src/librustuv/net.rs b/src/librustuv/net.rs
index 73454aaf13f..cbda25485c7 100644
--- a/src/librustuv/net.rs
+++ b/src/librustuv/net.rs
@@ -25,6 +25,7 @@ use stream::StreamWatcher;
 use super::{Loop, Request, UvError, Buf, status_to_io_result,
             uv_error_to_io_error, UvHandle, slice_to_uv_buf,
             wait_until_woken_after, wakeup};
+use timer::TimerWatcher;
 use uvio::UvIoFactory;
 use uvll;
 
@@ -198,10 +199,14 @@ impl TcpWatcher {
         }
     }
 
-    pub fn connect(io: &mut UvIoFactory, address: ip::SocketAddr)
-        -> Result<TcpWatcher, UvError>
-    {
-        struct Ctx { status: c_int, task: Option<BlockedTask> }
+    pub fn connect(io: &mut UvIoFactory,
+                   address: ip::SocketAddr,
+                   timeout: Option<u64>) -> Result<TcpWatcher, UvError> {
+        struct Ctx {
+            status: c_int,
+            task: Option<BlockedTask>,
+            timer: Option<~TimerWatcher>,
+        }
 
         let tcp = TcpWatcher::new(io);
         let (addr, _len) = addr_to_sockaddr(address);
@@ -215,24 +220,72 @@ impl TcpWatcher {
         return match result {
             0 => {
                 req.defuse(); // uv callback now owns this request
-                let mut cx = Ctx { status: 0, task: None };
+                let mut cx = Ctx { status: -1, task: None, timer: None };
+                match timeout {
+                    Some(t) => {
+                        let mut timer = TimerWatcher::new(io);
+                        timer.start(timer_cb, t, 0);
+                        cx.timer = Some(timer);
+                    }
+                    None => {}
+                }
                 wait_until_woken_after(&mut cx.task, &io.loop_, || {
-                    req.set_data(&cx);
+                    let data = &cx as *_;
+                    match cx.timer {
+                        Some(ref mut timer) => unsafe { timer.set_data(data) },
+                        None => {}
+                    }
+                    req.set_data(data);
                 });
+                // Make sure an erroneously fired callback doesn't have access
+                // to the context any more.
+                req.set_data(0 as *int);
+
+                // If we failed because of a timeout, drop the TcpWatcher as
+                // soon as possible because it's data is now set to null and we
+                // want to cancel the callback ASAP.
                 match cx.status {
                     0 => Ok(tcp),
-                    n => Err(UvError(n)),
+                    n => { drop(tcp); Err(UvError(n)) }
                 }
             }
             n => Err(UvError(n))
         };
 
+        extern fn timer_cb(handle: *uvll::uv_timer_t, status: c_int) {
+            // Don't close the corresponding tcp request, just wake up the task
+            // and let RAII take care of the pending watcher.
+            assert_eq!(status, 0);
+            let cx: &mut Ctx = unsafe {
+                &mut *(uvll::get_data_for_uv_handle(handle) as *mut Ctx)
+            };
+            cx.status = uvll::ECANCELED;
+            wakeup(&mut cx.task);
+        }
+
         extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) {
+            // This callback can be invoked with ECANCELED if the watcher is
+            // closed by the timeout callback. In that case we just want to free
+            // the request and be along our merry way.
             let req = Request::wrap(req);
-            assert!(status != uvll::ECANCELED);
+            if status == uvll::ECANCELED { return }
+
             let cx: &mut Ctx = unsafe { req.get_data() };
             cx.status = status;
-            wakeup(&mut cx.task);
+            match cx.timer {
+                Some(ref mut t) => t.stop(),
+                None => {}
+            }
+            // Note that the timer callback doesn't cancel the connect request
+            // (that's the job of uv_close()), so it's possible for this
+            // callback to get triggered after the timeout callback fires, but
+            // before the task wakes up. In that case, we did indeed
+            // successfully connect, but we don't need to wake someone up. We
+            // updated the status above (correctly so), and the task will pick
+            // up on this when it wakes up.
+            if cx.task.is_some() {
+                wakeup(&mut cx.task);
+            }
         }
     }
 }
@@ -741,7 +794,7 @@ mod test {
 
     #[test]
     fn connect_close_ip4() {
-        match TcpWatcher::connect(local_loop(), next_test_ip4()) {
+        match TcpWatcher::connect(local_loop(), next_test_ip4(), None) {
             Ok(..) => fail!(),
             Err(e) => assert_eq!(e.name(), "ECONNREFUSED".to_owned()),
         }
@@ -749,7 +802,7 @@ mod test {
 
     #[test]
     fn connect_close_ip6() {
-        match TcpWatcher::connect(local_loop(), next_test_ip6()) {
+        match TcpWatcher::connect(local_loop(), next_test_ip6(), None) {
             Ok(..) => fail!(),
             Err(e) => assert_eq!(e.name(), "ECONNREFUSED".to_owned()),
         }
@@ -799,7 +852,7 @@ mod test {
         });
 
         rx.recv();
-        let mut w = match TcpWatcher::connect(local_loop(), addr) {
+        let mut w = match TcpWatcher::connect(local_loop(), addr, None) {
             Ok(w) => w, Err(e) => fail!("{:?}", e)
         };
         match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) {
@@ -835,7 +888,7 @@ mod test {
         });
 
         rx.recv();
-        let mut w = match TcpWatcher::connect(local_loop(), addr) {
+        let mut w = match TcpWatcher::connect(local_loop(), addr, None) {
             Ok(w) => w, Err(e) => fail!("{:?}", e)
         };
         match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) {
@@ -928,7 +981,7 @@ mod test {
         });
 
         rx.recv();
-        let mut stream = TcpWatcher::connect(local_loop(), addr).unwrap();
+        let mut stream = TcpWatcher::connect(local_loop(), addr, None).unwrap();
         let mut buf = [0, .. 2048];
         let mut total_bytes_read = 0;
         while total_bytes_read < MAX {
@@ -1036,7 +1089,7 @@ mod test {
 
         spawn(proc() {
             let rx = rx.recv();
-            let mut stream = TcpWatcher::connect(local_loop(), addr).unwrap();
+            let mut stream = TcpWatcher::connect(local_loop(), addr, None).unwrap();
             stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
             stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
             rx.recv();
@@ -1088,9 +1141,9 @@ mod test {
             }
         });
 
-        let mut stream = TcpWatcher::connect(local_loop(), addr);
+        let mut stream = TcpWatcher::connect(local_loop(), addr, None);
         while stream.is_err() {
-            stream = TcpWatcher::connect(local_loop(), addr);
+            stream = TcpWatcher::connect(local_loop(), addr, None);
         }
         stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
     }
@@ -1115,7 +1168,7 @@ mod test {
             drop(w.accept().unwrap());
         });
         rx.recv();
-        let _w = TcpWatcher::connect(local_loop(), addr).unwrap();
+        let _w = TcpWatcher::connect(local_loop(), addr, None).unwrap();
         fail!();
     }
 
diff --git a/src/librustuv/timer.rs b/src/librustuv/timer.rs
index 58008002837..3710d97827f 100644
--- a/src/librustuv/timer.rs
+++ b/src/librustuv/timer.rs
@@ -48,15 +48,19 @@ impl TimerWatcher {
         return me.install();
     }
 
-    fn start(&mut self, msecs: u64, period: u64) {
+    pub fn start(&mut self, f: uvll::uv_timer_cb, msecs: u64, period: u64) {
         assert_eq!(unsafe {
-            uvll::uv_timer_start(self.handle, timer_cb, msecs, period)
+            uvll::uv_timer_start(self.handle, f, msecs, period)
         }, 0)
     }
 
-    fn stop(&mut self) {
+    pub fn stop(&mut self) {
         assert_eq!(unsafe { uvll::uv_timer_stop(self.handle) }, 0)
     }
+
+    pub unsafe fn set_data<T>(&mut self, data: *T) {
+        uvll::set_data_for_uv_handle(self.handle, data);
+    }
 }
 
 impl HomingIO for TimerWatcher {
@@ -92,7 +96,7 @@ impl RtioTimer for TimerWatcher {
 
         self.action = Some(WakeTask);
         wait_until_woken_after(&mut self.blocker, &self.uv_loop(), || {
-            self.start(msecs, 0);
+            self.start(timer_cb, msecs, 0);
         });
         self.stop();
     }
@@ -106,7 +110,7 @@ impl RtioTimer for TimerWatcher {
             let _m = self.fire_homing_missile();
             self.id += 1;
             self.stop();
-            self.start(msecs, 0);
+            self.start(timer_cb, msecs, 0);
             mem::replace(&mut self.action, Some(SendOnce(tx)))
         };
 
@@ -122,7 +126,7 @@ impl RtioTimer for TimerWatcher {
             let _m = self.fire_homing_missile();
             self.id += 1;
             self.stop();
-            self.start(msecs, msecs);
+            self.start(timer_cb, msecs, msecs);
             mem::replace(&mut self.action, Some(SendMany(tx, self.id)))
         };
 
diff --git a/src/librustuv/uvio.rs b/src/librustuv/uvio.rs
index 55456bb548e..3769a1b8d6d 100644
--- a/src/librustuv/uvio.rs
+++ b/src/librustuv/uvio.rs
@@ -143,10 +143,10 @@ impl IoFactory for UvIoFactory {
     // Connect to an address and return a new stream
     // NB: This blocks the task waiting on the connection.
     // It would probably be better to return a future
-    fn tcp_connect(&mut self, addr: SocketAddr)
+    fn tcp_connect(&mut self, addr: SocketAddr, timeout: Option<u64>)
         -> Result<~rtio::RtioTcpStream:Send, IoError>
     {
-        match TcpWatcher::connect(self, addr) {
+        match TcpWatcher::connect(self, addr, timeout) {
             Ok(t) => Ok(~t as ~rtio::RtioTcpStream:Send),
             Err(e) => Err(uv_error_to_io_error(e)),
         }
diff --git a/src/libstd/io/mod.rs b/src/libstd/io/mod.rs
index d8267e472bd..9c163523abe 100644
--- a/src/libstd/io/mod.rs
+++ b/src/libstd/io/mod.rs
@@ -430,6 +430,8 @@ pub enum IoErrorKind {
     IoUnavailable,
     /// A parameter was incorrect in a way that caused an I/O error not part of this list.
     InvalidInput,
+    /// The I/O operation's timeout expired, causing it to be canceled.
+    TimedOut,
 }
 
 /// A trait for objects which are byte-oriented streams. Readers are defined by
diff --git a/src/libstd/io/net/tcp.rs b/src/libstd/io/net/tcp.rs
index 49e6bcff8eb..4f1e6bd7418 100644
--- a/src/libstd/io/net/tcp.rs
+++ b/src/libstd/io/net/tcp.rs
@@ -22,6 +22,7 @@ use io::IoResult;
 use io::net::ip::SocketAddr;
 use io::{Reader, Writer, Listener, Acceptor};
 use kinds::Send;
+use option::{None, Some};
 use rt::rtio::{IoFactory, LocalIo, RtioSocket, RtioTcpListener};
 use rt::rtio::{RtioTcpAcceptor, RtioTcpStream};
 
@@ -57,7 +58,21 @@ impl TcpStream {
     /// If no error is encountered, then `Ok(stream)` is returned.
     pub fn connect(addr: SocketAddr) -> IoResult<TcpStream> {
         LocalIo::maybe_raise(|io| {
-            io.tcp_connect(addr).map(TcpStream::new)
+            io.tcp_connect(addr, None).map(TcpStream::new)
+        })
+    }
+
+    /// Creates a TCP connection to a remote socket address, timing out after
+    /// the specified number of milliseconds.
+    ///
+    /// This is the same as the `connect` method, except that if the timeout
+    /// specified (in milliseconds) elapses before a connection is made an error
+    /// will be returned. The error's kind will be `TimedOut`.
+    #[experimental = "the timeout argument may eventually change types"]
+    pub fn connect_timeout(addr: SocketAddr,
+                           timeout_ms: u64) -> IoResult<TcpStream> {
+        LocalIo::maybe_raise(|io| {
+            io.tcp_connect(addr, Some(timeout_ms)).map(TcpStream::new)
         })
     }
 
diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs
index cc8356d2b9a..0f3fc9c21ce 100644
--- a/src/libstd/rt/rtio.rs
+++ b/src/libstd/rt/rtio.rs
@@ -146,7 +146,8 @@ impl<'a> LocalIo<'a> {
 
 pub trait IoFactory {
     // networking
-    fn tcp_connect(&mut self, addr: SocketAddr) -> IoResult<~RtioTcpStream:Send>;
+    fn tcp_connect(&mut self, addr: SocketAddr,
+                   timeout: Option<u64>) -> IoResult<~RtioTcpStream:Send>;
     fn tcp_bind(&mut self, addr: SocketAddr) -> IoResult<~RtioTcpListener:Send>;
     fn udp_bind(&mut self, addr: SocketAddr) -> IoResult<~RtioUdpSocket:Send>;
     fn unix_bind(&mut self, path: &CString)
diff --git a/src/test/run-pass/tcp-connect-timeouts.rs b/src/test/run-pass/tcp-connect-timeouts.rs
new file mode 100644
index 00000000000..26f9b2ea6b7
--- /dev/null
+++ b/src/test/run-pass/tcp-connect-timeouts.rs
@@ -0,0 +1,92 @@
+// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+// ignore-pretty
+// compile-flags:--test
+// exec-env:RUST_TEST_TASKS=1
+
+// Tests for the connect_timeout() function on a TcpStream. This runs with only
+// one test task to ensure that errors are timeouts, not file descriptor
+// exhaustion.
+
+#![feature(macro_rules, globs)]
+#![allow(experimental)]
+
+extern crate native;
+extern crate green;
+extern crate rustuv;
+
+#[cfg(test)] #[start]
+fn start(argc: int, argv: **u8) -> int {
+    green::start(argc, argv, rustuv::event_loop, __test::main)
+}
+
+macro_rules! iotest (
+    { fn $name:ident() $b:block $($a:attr)* } => (
+        mod $name {
+            #![allow(unused_imports)]
+
+            use std::io::*;
+            use std::io::net::tcp::*;
+            use std::io::test::*;
+            use std::io;
+
+            fn f() $b
+
+            $($a)* #[test] fn green() { f() }
+            $($a)* #[test] fn native() {
+                use native;
+                let (tx, rx) = channel();
+                native::task::spawn(proc() { tx.send(f()) });
+                rx.recv();
+            }
+        }
+    )
+)
+
+iotest!(fn eventual_timeout() {
+    use native;
+    let addr = next_test_ip4();
+
+    // Use a native task to receive connections because it turns out libuv is
+    // really good at accepting connections and will likely run out of file
+    // descriptors before timing out.
+    let (tx1, rx1) = channel();
+    let (_tx2, rx2) = channel::<()>();
+    native::task::spawn(proc() {
+        let _l = TcpListener::bind(addr).unwrap().listen();
+        tx1.send(());
+        let _ = rx2.recv_opt();
+    });
+    rx1.recv();
+
+    let mut v = Vec::new();
+    for _ in range(0, 10000) {
+        match TcpStream::connect_timeout(addr, 100) {
+            Ok(e) => v.push(e),
+            Err(ref e) if e.kind == io::TimedOut => return,
+            Err(e) => fail!("other error: {}", e),
+        }
+    }
+    fail!("never timed out!");
+})
+
+iotest!(fn timeout_success() {
+    let addr = next_test_ip4();
+    let _l = TcpListener::bind(addr).unwrap().listen();
+
+    assert!(TcpStream::connect_timeout(addr, 1000).is_ok());
+})
+
+iotest!(fn timeout_error() {
+    let addr = next_test_ip4();
+
+    assert!(TcpStream::connect_timeout(addr, 1000).is_err());
+})