about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--src/liblibc/lib.rs2
-rw-r--r--src/libnative/io/c_win32.rs2
-rw-r--r--src/libnative/io/mod.rs6
-rw-r--r--src/libnative/io/net.rs128
-rw-r--r--src/libnative/io/pipe_unix.rs59
-rw-r--r--src/libnative/io/pipe_win32.rs56
-rw-r--r--src/libnative/io/util.rs136
-rw-r--r--src/librustuv/net.rs332
-rw-r--r--src/librustuv/pipe.rs59
-rw-r--r--src/librustuv/uvio.rs5
-rw-r--r--src/libstd/io/net/unix.rs91
-rw-r--r--src/libstd/rt/rtio.rs4
12 files changed, 531 insertions, 349 deletions
diff --git a/src/liblibc/lib.rs b/src/liblibc/lib.rs
index 98613f885cd..bebf95a4a3b 100644
--- a/src/liblibc/lib.rs
+++ b/src/liblibc/lib.rs
@@ -225,7 +225,7 @@ pub use funcs::bsd43::{shutdown};
 #[cfg(windows)] pub use consts::os::extra::{PIPE_UNLIMITED_INSTANCES, ERROR_ACCESS_DENIED};
 #[cfg(windows)] pub use consts::os::extra::{FILE_WRITE_ATTRIBUTES, FILE_READ_ATTRIBUTES};
 #[cfg(windows)] pub use consts::os::extra::{ERROR_PIPE_BUSY, ERROR_IO_PENDING};
-#[cfg(windows)] pub use consts::os::extra::{ERROR_PIPE_CONNECTED};
+#[cfg(windows)] pub use consts::os::extra::{ERROR_PIPE_CONNECTED, WAIT_OBJECT_0};
 #[cfg(windows)] pub use types::os::common::bsd44::{SOCKET};
 #[cfg(windows)] pub use types::os::common::posix01::{stat, utimbuf};
 #[cfg(windows)] pub use types::os::arch::extra::{HANDLE, BOOL, LPSECURITY_ATTRIBUTES};
diff --git a/src/libnative/io/c_win32.rs b/src/libnative/io/c_win32.rs
index dbbb39b3b7b..6c84424e97a 100644
--- a/src/libnative/io/c_win32.rs
+++ b/src/libnative/io/c_win32.rs
@@ -59,4 +59,6 @@ extern "system" {
                       optname: libc::c_int,
                       optval: *mut libc::c_char,
                       optlen: *mut libc::c_int) -> libc::c_int;
+
+    pub fn CancelIo(hFile: libc::HANDLE) -> libc::BOOL;
 }
diff --git a/src/libnative/io/mod.rs b/src/libnative/io/mod.rs
index 19cb5c5f1d4..944766e8fd0 100644
--- a/src/libnative/io/mod.rs
+++ b/src/libnative/io/mod.rs
@@ -44,6 +44,7 @@ pub use self::process::Process;
 pub mod addrinfo;
 pub mod net;
 pub mod process;
+mod util;
 
 #[cfg(unix)]
 #[path = "file_unix.rs"]
@@ -177,8 +178,9 @@ impl rtio::IoFactory for IoFactory {
     fn unix_bind(&mut self, path: &CString) -> IoResult<~RtioUnixListener:Send> {
         pipe::UnixListener::bind(path).map(|s| ~s as ~RtioUnixListener:Send)
     }
-    fn unix_connect(&mut self, path: &CString) -> IoResult<~RtioPipe:Send> {
-        pipe::UnixStream::connect(path).map(|s| ~s as ~RtioPipe:Send)
+    fn unix_connect(&mut self, path: &CString,
+                    timeout: Option<u64>) -> IoResult<~RtioPipe:Send> {
+        pipe::UnixStream::connect(path, timeout).map(|s| ~s as ~RtioPipe:Send)
     }
     fn get_host_addresses(&mut self, host: Option<&str>, servname: Option<&str>,
                           hint: Option<ai::Hint>) -> IoResult<~[ai::Info]> {
diff --git a/src/libnative/io/net.rs b/src/libnative/io/net.rs
index 93ec23e32ad..cc41da846b2 100644
--- a/src/libnative/io/net.rs
+++ b/src/libnative/io/net.rs
@@ -13,13 +13,12 @@ use std::cast;
 use std::io::net::ip;
 use std::io;
 use std::mem;
-use std::os;
-use std::ptr;
 use std::rt::rtio;
 use std::sync::arc::UnsafeArc;
 
 use super::{IoResult, retry, keep_going};
 use super::c;
+use super::util;
 
 ////////////////////////////////////////////////////////////////////////////////
 // sockaddr and misc bindings
@@ -118,8 +117,8 @@ fn setsockopt<T>(fd: sock_t, opt: libc::c_int, val: libc::c_int,
     }
 }
 
-fn getsockopt<T: Copy>(fd: sock_t, opt: libc::c_int,
-                       val: libc::c_int) -> IoResult<T> {
+pub fn getsockopt<T: Copy>(fd: sock_t, opt: libc::c_int,
+                           val: libc::c_int) -> IoResult<T> {
     unsafe {
         let mut slot: T = mem::init();
         let mut len = mem::size_of::<T>() as libc::socklen_t;
@@ -145,21 +144,6 @@ fn last_error() -> io::IoError {
     super::last_error()
 }
 
-fn ms_to_timeval(ms: u64) -> libc::timeval {
-    libc::timeval {
-        tv_sec: (ms / 1000) as libc::time_t,
-        tv_usec: ((ms % 1000) * 1000) as libc::suseconds_t,
-    }
-}
-
-fn timeout(desc: &'static str) -> io::IoError {
-    io::IoError {
-        kind: io::TimedOut,
-        desc: desc,
-        detail: None,
-    }
-}
-
 #[cfg(windows)] unsafe fn close(sock: sock_t) { let _ = libc::closesocket(sock); }
 #[cfg(unix)]    unsafe fn close(sock: sock_t) { let _ = libc::close(sock); }
 
@@ -270,7 +254,7 @@ impl TcpStream {
         let addrp = &addr as *_ as *libc::sockaddr;
         match timeout {
             Some(timeout) => {
-                try!(TcpStream::connect_timeout(fd, addrp, len, timeout));
+                try!(util::connect_timeout(fd, addrp, len, timeout));
                 Ok(ret)
             },
             None => {
@@ -282,84 +266,6 @@ impl TcpStream {
         }
     }
 
-    // See http://developerweb.net/viewtopic.php?id=3196 for where this is
-    // derived from.
-    fn connect_timeout(fd: sock_t,
-                       addrp: *libc::sockaddr,
-                       len: libc::socklen_t,
-                       timeout_ms: u64) -> IoResult<()> {
-        #[cfg(unix)]    use INPROGRESS = libc::EINPROGRESS;
-        #[cfg(windows)] use INPROGRESS = libc::WSAEINPROGRESS;
-        #[cfg(unix)]    use WOULDBLOCK = libc::EWOULDBLOCK;
-        #[cfg(windows)] use WOULDBLOCK = libc::WSAEWOULDBLOCK;
-
-        // Make sure the call to connect() doesn't block
-        try!(set_nonblocking(fd, true));
-
-        let ret = match unsafe { libc::connect(fd, addrp, len) } {
-            // If the connection is in progress, then we need to wait for it to
-            // finish (with a timeout). The current strategy for doing this is
-            // to use select() with a timeout.
-            -1 if os::errno() as int == INPROGRESS as int ||
-                  os::errno() as int == WOULDBLOCK as int => {
-                let mut set: c::fd_set = unsafe { mem::init() };
-                c::fd_set(&mut set, fd);
-                match await(fd, &mut set, timeout_ms) {
-                    0 => Err(timeout("connection timed out")),
-                    -1 => Err(last_error()),
-                    _ => {
-                        let err: libc::c_int = try!(
-                            getsockopt(fd, libc::SOL_SOCKET, libc::SO_ERROR));
-                        if err == 0 {
-                            Ok(())
-                        } else {
-                            Err(io::IoError::from_errno(err as uint, true))
-                        }
-                    }
-                }
-            }
-
-            -1 => Err(last_error()),
-            _ => Ok(()),
-        };
-
-        // be sure to turn blocking I/O back on
-        try!(set_nonblocking(fd, false));
-        return ret;
-
-        #[cfg(unix)]
-        fn set_nonblocking(fd: sock_t, nb: bool) -> IoResult<()> {
-            let set = nb as libc::c_int;
-            super::mkerr_libc(retry(|| unsafe { c::ioctl(fd, c::FIONBIO, &set) }))
-        }
-        #[cfg(windows)]
-        fn set_nonblocking(fd: sock_t, nb: bool) -> IoResult<()> {
-            let mut set = nb as libc::c_ulong;
-            if unsafe { c::ioctlsocket(fd, c::FIONBIO, &mut set) != 0 } {
-                Err(last_error())
-            } else {
-                Ok(())
-            }
-        }
-
-        #[cfg(unix)]
-        fn await(fd: sock_t, set: &mut c::fd_set, timeout: u64) -> libc::c_int {
-            let start = ::io::timer::now();
-            retry(|| unsafe {
-                // Recalculate the timeout each iteration (it is generally
-                // undefined what the value of the 'tv' is after select
-                // returns EINTR).
-                let tv = ms_to_timeval(timeout - (::io::timer::now() - start));
-                c::select(fd + 1, ptr::null(), &*set, ptr::null(), &tv)
-            })
-        }
-        #[cfg(windows)]
-        fn await(_fd: sock_t, set: &mut c::fd_set, timeout: u64) -> libc::c_int {
-            let tv = ms_to_timeval(timeout);
-            unsafe { c::select(1, ptr::null(), &*set, ptr::null(), &tv) }
-        }
-    }
-
     pub fn fd(&self) -> sock_t {
         // This unsafety is fine because it's just a read-only arc
         unsafe { (*self.inner.get()).fd }
@@ -533,7 +439,7 @@ impl TcpAcceptor {
 
     pub fn native_accept(&mut self) -> IoResult<TcpStream> {
         if self.deadline != 0 {
-            try!(self.accept_deadline());
+            try!(util::accept_deadline(self.fd(), self.deadline));
         }
         unsafe {
             let mut storage: libc::sockaddr_storage = mem::init();
@@ -550,25 +456,6 @@ impl TcpAcceptor {
             }
         }
     }
-
-    fn accept_deadline(&mut self) -> IoResult<()> {
-        let mut set: c::fd_set = unsafe { mem::init() };
-        c::fd_set(&mut set, self.fd());
-
-        match retry(|| {
-            // If we're past the deadline, then pass a 0 timeout to select() so
-            // we can poll the status of the socket.
-            let now = ::io::timer::now();
-            let ms = if self.deadline > now {0} else {self.deadline - now};
-            let tv = ms_to_timeval(ms);
-            let n = if cfg!(windows) {1} else {self.fd() as libc::c_int + 1};
-            unsafe { c::select(n, &set, ptr::null(), ptr::null(), &tv) }
-        }) {
-            -1 => Err(last_error()),
-            0 => Err(timeout("accept timed out")),
-            _ => return Ok(()),
-        }
-    }
 }
 
 impl rtio::RtioSocket for TcpAcceptor {
@@ -585,10 +472,7 @@ impl rtio::RtioTcpAcceptor for TcpAcceptor {
     fn accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) }
     fn dont_accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) }
     fn set_timeout(&mut self, timeout: Option<u64>) {
-        self.deadline = match timeout {
-            None => 0,
-            Some(t) => ::io::timer::now() + t,
-        };
+        self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
     }
 }
 
diff --git a/src/libnative/io/pipe_unix.rs b/src/libnative/io/pipe_unix.rs
index 5d13a6b5fc5..190cae05d43 100644
--- a/src/libnative/io/pipe_unix.rs
+++ b/src/libnative/io/pipe_unix.rs
@@ -8,16 +8,17 @@
 // option. This file may not be copied, modified, or distributed
 // except according to those terms.
 
+use libc;
 use std::c_str::CString;
 use std::cast;
+use std::intrinsics;
 use std::io;
-use libc;
 use std::mem;
 use std::rt::rtio;
 use std::sync::arc::UnsafeArc;
-use std::intrinsics;
 
 use super::{IoResult, retry, keep_going};
+use super::util;
 use super::file::fd_t;
 
 fn unix_socket(ty: libc::c_int) -> IoResult<fd_t> {
@@ -52,22 +53,6 @@ fn addr_to_sockaddr_un(addr: &CString) -> IoResult<(libc::sockaddr_storage, uint
     return Ok((storage, len));
 }
 
-fn sockaddr_to_unix(storage: &libc::sockaddr_storage,
-                    len: uint) -> IoResult<CString> {
-    match storage.ss_family as libc::c_int {
-        libc::AF_UNIX => {
-            assert!(len as uint <= mem::size_of::<libc::sockaddr_un>());
-            let storage: &libc::sockaddr_un = unsafe {
-                cast::transmute(storage)
-            };
-            unsafe {
-                Ok(CString::new(storage.sun_path.as_ptr(), false).clone())
-            }
-        }
-        _ => Err(io::standard_error(io::InvalidInput))
-    }
-}
-
 struct Inner {
     fd: fd_t,
 }
@@ -76,16 +61,24 @@ impl Drop for Inner {
     fn drop(&mut self) { unsafe { let _ = libc::close(self.fd); } }
 }
 
-fn connect(addr: &CString, ty: libc::c_int) -> IoResult<Inner> {
+fn connect(addr: &CString, ty: libc::c_int,
+           timeout: Option<u64>) -> IoResult<Inner> {
     let (addr, len) = try!(addr_to_sockaddr_un(addr));
     let inner = Inner { fd: try!(unix_socket(ty)) };
-    let addrp = &addr as *libc::sockaddr_storage;
-    match retry(|| unsafe {
-        libc::connect(inner.fd, addrp as *libc::sockaddr,
-                      len as libc::socklen_t)
-    }) {
-        -1 => Err(super::last_error()),
-        _  => Ok(inner)
+    let addrp = &addr as *_ as *libc::sockaddr;
+    let len = len as libc::socklen_t;
+
+    match timeout {
+        None => {
+            match retry(|| unsafe { libc::connect(inner.fd, addrp, len) }) {
+                -1 => Err(super::last_error()),
+                _  => Ok(inner)
+            }
+        }
+        Some(timeout_ms) => {
+            try!(util::connect_timeout(inner.fd, addrp, len, timeout_ms));
+            Ok(inner)
+        }
     }
 }
 
@@ -110,8 +103,9 @@ pub struct UnixStream {
 }
 
 impl UnixStream {
-    pub fn connect(addr: &CString) -> IoResult<UnixStream> {
-        connect(addr, libc::SOCK_STREAM).map(|inner| {
+    pub fn connect(addr: &CString,
+                   timeout: Option<u64>) -> IoResult<UnixStream> {
+        connect(addr, libc::SOCK_STREAM, timeout).map(|inner| {
             UnixStream { inner: UnsafeArc::new(inner) }
         })
     }
@@ -176,7 +170,7 @@ impl UnixListener {
     pub fn native_listen(self, backlog: int) -> IoResult<UnixAcceptor> {
         match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } {
             -1 => Err(super::last_error()),
-            _ => Ok(UnixAcceptor { listener: self })
+            _ => Ok(UnixAcceptor { listener: self, deadline: 0 })
         }
     }
 }
@@ -189,12 +183,16 @@ impl rtio::RtioUnixListener for UnixListener {
 
 pub struct UnixAcceptor {
     listener: UnixListener,
+    deadline: u64,
 }
 
 impl UnixAcceptor {
     fn fd(&self) -> fd_t { self.listener.fd() }
 
     pub fn native_accept(&mut self) -> IoResult<UnixStream> {
+        if self.deadline != 0 {
+            try!(util::accept_deadline(self.fd(), self.deadline));
+        }
         let mut storage: libc::sockaddr_storage = unsafe { intrinsics::init() };
         let storagep = &mut storage as *mut libc::sockaddr_storage;
         let size = mem::size_of::<libc::sockaddr_storage>();
@@ -214,6 +212,9 @@ impl rtio::RtioUnixAcceptor for UnixAcceptor {
     fn accept(&mut self) -> IoResult<~rtio::RtioPipe:Send> {
         self.native_accept().map(|s| ~s as ~rtio::RtioPipe:Send)
     }
+    fn set_timeout(&mut self, timeout: Option<u64>) {
+        self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
+    }
 }
 
 impl Drop for UnixListener {
diff --git a/src/libnative/io/pipe_win32.rs b/src/libnative/io/pipe_win32.rs
index 84b3d887c04..a4f09ded0ac 100644
--- a/src/libnative/io/pipe_win32.rs
+++ b/src/libnative/io/pipe_win32.rs
@@ -93,6 +93,8 @@ use std::sync::arc::UnsafeArc;
 use std::intrinsics;
 
 use super::IoResult;
+use super::c;
+use super::util;
 
 struct Event(libc::HANDLE);
 
@@ -210,8 +212,9 @@ impl UnixStream {
         None
     }
 
-    pub fn connect(addr: &CString) -> IoResult<UnixStream> {
+    pub fn connect(addr: &CString, timeout: Option<u64>) -> IoResult<UnixStream> {
         as_utf16_p(addr.as_str().unwrap(), |p| {
+            let start = ::io::timer::now();
             loop {
                 match UnixStream::try_connect(p) {
                     Some(handle) => {
@@ -246,11 +249,26 @@ impl UnixStream {
                     return Err(super::last_error())
                 }
 
-                // An example I found on microsoft's website used 20 seconds,
-                // libuv uses 30 seconds, hence we make the obvious choice of
-                // waiting for 25 seconds.
-                if unsafe { libc::WaitNamedPipeW(p, 25000) } == 0 {
-                    return Err(super::last_error())
+                match timeout {
+                    Some(timeout) => {
+                        let now = ::io::timer::now();
+                        let timed_out = (now - start) >= timeout || unsafe {
+                            let ms = (timeout - (now - start)) as libc::DWORD;
+                            libc::WaitNamedPipeW(p, ms) == 0
+                        };
+                        if timed_out {
+                            return Err(util::timeout("connect timed out"))
+                        }
+                    }
+
+                    // An example I found on microsoft's website used 20
+                    // seconds, libuv uses 30 seconds, hence we make the
+                    // obvious choice of waiting for 25 seconds.
+                    None => {
+                        if unsafe { libc::WaitNamedPipeW(p, 25000) } == 0 {
+                            return Err(super::last_error())
+                        }
+                    }
                 }
             }
         })
@@ -372,6 +390,7 @@ impl UnixListener {
         Ok(UnixAcceptor {
             listener: self,
             event: try!(Event::new(true, false)),
+            deadline: 0,
         })
     }
 }
@@ -391,6 +410,7 @@ impl rtio::RtioUnixListener for UnixListener {
 pub struct UnixAcceptor {
     listener: UnixListener,
     event: Event,
+    deadline: u64,
 }
 
 impl UnixAcceptor {
@@ -438,7 +458,28 @@ impl UnixAcceptor {
         overlapped.hEvent = self.event.handle();
         if unsafe { libc::ConnectNamedPipe(handle, &mut overlapped) == 0 } {
             let mut err = unsafe { libc::GetLastError() };
+
             if err == libc::ERROR_IO_PENDING as libc::DWORD {
+                // If we've got a timeout, use WaitForSingleObject in tandem
+                // with CancelIo to figure out if we should indeed get the
+                // result.
+                if self.deadline != 0 {
+                    let now = ::io::timer::now();
+                    let timeout = self.deadline < now || unsafe {
+                        let ms = (self.deadline - now) as libc::DWORD;
+                        let r = libc::WaitForSingleObject(overlapped.hEvent,
+                                                          ms);
+                        r != libc::WAIT_OBJECT_0
+                    };
+                    if timeout {
+                        unsafe { let _ = c::CancelIo(handle); }
+                        return Err(util::timeout("accept timed out"))
+                    }
+                }
+
+                // This will block until the overlapped I/O is completed. The
+                // timeout was previously handled, so this will either block in
+                // the normal case or succeed very quickly in the timeout case.
                 let ret = unsafe {
                     let mut transfer = 0;
                     libc::GetOverlappedResult(handle,
@@ -488,5 +529,8 @@ impl rtio::RtioUnixAcceptor for UnixAcceptor {
     fn accept(&mut self) -> IoResult<~rtio::RtioPipe:Send> {
         self.native_accept().map(|s| ~s as ~rtio::RtioPipe:Send)
     }
+    fn set_timeout(&mut self, timeout: Option<u64>) {
+        self.deadline = timeout.map(|i| i + ::io::timer::now()).unwrap_or(0);
+    }
 }
 
diff --git a/src/libnative/io/util.rs b/src/libnative/io/util.rs
new file mode 100644
index 00000000000..0aaac8f8ad8
--- /dev/null
+++ b/src/libnative/io/util.rs
@@ -0,0 +1,136 @@
+// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use libc;
+use std::io::IoResult;
+use std::io;
+use std::mem;
+use std::ptr;
+
+use super::c;
+use super::net;
+use super::{retry, last_error};
+
+pub fn timeout(desc: &'static str) -> io::IoError {
+    io::IoError {
+        kind: io::TimedOut,
+        desc: desc,
+        detail: None,
+    }
+}
+
+pub fn ms_to_timeval(ms: u64) -> libc::timeval {
+    libc::timeval {
+        tv_sec: (ms / 1000) as libc::time_t,
+        tv_usec: ((ms % 1000) * 1000) as libc::suseconds_t,
+    }
+}
+
+// See http://developerweb.net/viewtopic.php?id=3196 for where this is
+// derived from.
+pub fn connect_timeout(fd: net::sock_t,
+                       addrp: *libc::sockaddr,
+                       len: libc::socklen_t,
+                       timeout_ms: u64) -> IoResult<()> {
+    use std::os;
+    #[cfg(unix)]    use INPROGRESS = libc::EINPROGRESS;
+    #[cfg(windows)] use INPROGRESS = libc::WSAEINPROGRESS;
+    #[cfg(unix)]    use WOULDBLOCK = libc::EWOULDBLOCK;
+    #[cfg(windows)] use WOULDBLOCK = libc::WSAEWOULDBLOCK;
+
+    // Make sure the call to connect() doesn't block
+    try!(set_nonblocking(fd, true));
+
+    let ret = match unsafe { libc::connect(fd, addrp, len) } {
+        // If the connection is in progress, then we need to wait for it to
+        // finish (with a timeout). The current strategy for doing this is
+        // to use select() with a timeout.
+        -1 if os::errno() as int == INPROGRESS as int ||
+              os::errno() as int == WOULDBLOCK as int => {
+            let mut set: c::fd_set = unsafe { mem::init() };
+            c::fd_set(&mut set, fd);
+            match await(fd, &mut set, timeout_ms) {
+                0 => Err(timeout("connection timed out")),
+                -1 => Err(last_error()),
+                _ => {
+                    let err: libc::c_int = try!(
+                        net::getsockopt(fd, libc::SOL_SOCKET, libc::SO_ERROR));
+                    if err == 0 {
+                        Ok(())
+                    } else {
+                        Err(io::IoError::from_errno(err as uint, true))
+                    }
+                }
+            }
+        }
+
+        -1 => Err(last_error()),
+        _ => Ok(()),
+    };
+
+    // be sure to turn blocking I/O back on
+    try!(set_nonblocking(fd, false));
+    return ret;
+
+    #[cfg(unix)]
+    fn set_nonblocking(fd: net::sock_t, nb: bool) -> IoResult<()> {
+        let set = nb as libc::c_int;
+        super::mkerr_libc(retry(|| unsafe { c::ioctl(fd, c::FIONBIO, &set) }))
+    }
+
+    #[cfg(windows)]
+    fn set_nonblocking(fd: net::sock_t, nb: bool) -> IoResult<()> {
+        let mut set = nb as libc::c_ulong;
+        if unsafe { c::ioctlsocket(fd, c::FIONBIO, &mut set) != 0 } {
+            Err(last_error())
+        } else {
+            Ok(())
+        }
+    }
+
+    #[cfg(unix)]
+    fn await(fd: net::sock_t, set: &mut c::fd_set,
+             timeout: u64) -> libc::c_int {
+        let start = ::io::timer::now();
+        retry(|| unsafe {
+            // Recalculate the timeout each iteration (it is generally
+            // undefined what the value of the 'tv' is after select
+            // returns EINTR).
+            let tv = ms_to_timeval(timeout - (::io::timer::now() - start));
+            c::select(fd + 1, ptr::null(), set as *mut _ as *_,
+                      ptr::null(), &tv)
+        })
+    }
+    #[cfg(windows)]
+    fn await(_fd: net::sock_t, set: &mut c::fd_set,
+             timeout: u64) -> libc::c_int {
+        let tv = ms_to_timeval(timeout);
+        unsafe { c::select(1, ptr::null(), &*set, ptr::null(), &tv) }
+    }
+}
+
+pub fn accept_deadline(fd: net::sock_t, deadline: u64) -> IoResult<()> {
+    let mut set: c::fd_set = unsafe { mem::init() };
+    c::fd_set(&mut set, fd);
+
+    match retry(|| {
+        // If we're past the deadline, then pass a 0 timeout to select() so
+        // we can poll the status of the socket.
+        let now = ::io::timer::now();
+        let ms = if deadline < now {0} else {deadline - now};
+        let tv = ms_to_timeval(ms);
+        let n = if cfg!(windows) {1} else {fd as libc::c_int + 1};
+        unsafe { c::select(n, &set, ptr::null(), ptr::null(), &tv) }
+    }) {
+        -1 => Err(last_error()),
+        0 => Err(timeout("accept timed out")),
+        _ => return Ok(()),
+    }
+}
diff --git a/src/librustuv/net.rs b/src/librustuv/net.rs
index 27a06911939..470a343b84e 100644
--- a/src/librustuv/net.rs
+++ b/src/librustuv/net.rs
@@ -9,7 +9,7 @@
 // except according to those terms.
 
 use std::cast;
-use std::io::IoError;
+use std::io::{IoError, IoResult};
 use std::io::net::ip;
 use libc::{size_t, ssize_t, c_int, c_void, c_uint};
 use libc;
@@ -145,96 +145,43 @@ fn socket_name(sk: SocketNameKind,
         n => Err(uv_error_to_io_error(UvError(n)))
     }
 }
-
 ////////////////////////////////////////////////////////////////////////////////
-/// TCP implementation
+// Helpers for handling timeouts, shared for pipes/tcp
 ////////////////////////////////////////////////////////////////////////////////
 
-pub struct TcpWatcher {
-    handle: *uvll::uv_tcp_t,
-    stream: StreamWatcher,
-    home: HomeHandle,
-    refcount: Refcount,
-
-    // libuv can't support concurrent reads and concurrent writes of the same
-    // stream object, so we use these access guards in order to arbitrate among
-    // multiple concurrent reads and writes. Note that libuv *can* read and
-    // write simultaneously, it just can't read and read simultaneously.
-    read_access: Access,
-    write_access: Access,
-}
-
-pub struct TcpListener {
-    home: HomeHandle,
-    handle: *uvll::uv_pipe_t,
-    closing_task: Option<BlockedTask>,
-    outgoing: Sender<Result<~rtio::RtioTcpStream:Send, IoError>>,
-    incoming: Receiver<Result<~rtio::RtioTcpStream:Send, IoError>>,
+pub struct ConnectCtx {
+    pub status: c_int,
+    pub task: Option<BlockedTask>,
+    pub timer: Option<~TimerWatcher>,
 }
 
-pub struct TcpAcceptor {
-    listener: ~TcpListener,
+pub struct AcceptTimeout {
     timer: Option<TimerWatcher>,
     timeout_tx: Option<Sender<()>>,
     timeout_rx: Option<Receiver<()>>,
 }
 
-// TCP watchers (clients/streams)
-
-impl TcpWatcher {
-    pub fn new(io: &mut UvIoFactory) -> TcpWatcher {
-        let handle = io.make_handle();
-        TcpWatcher::new_home(&io.loop_, handle)
-    }
-
-    fn new_home(loop_: &Loop, home: HomeHandle) -> TcpWatcher {
-        let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
-        assert_eq!(unsafe {
-            uvll::uv_tcp_init(loop_.handle, handle)
-        }, 0);
-        TcpWatcher {
-            home: home,
-            handle: handle,
-            stream: StreamWatcher::new(handle),
-            refcount: Refcount::new(),
-            read_access: Access::new(),
-            write_access: Access::new(),
-        }
-    }
-
-    pub fn connect(io: &mut UvIoFactory,
-                   address: ip::SocketAddr,
-                   timeout: Option<u64>) -> Result<TcpWatcher, UvError> {
-        struct Ctx {
-            status: c_int,
-            task: Option<BlockedTask>,
-            timer: Option<~TimerWatcher>,
-        }
-
-        let tcp = TcpWatcher::new(io);
-        let (addr, _len) = addr_to_sockaddr(address);
+impl ConnectCtx {
+    pub fn connect<T>(
+        mut self, obj: T, timeout: Option<u64>, io: &mut UvIoFactory,
+        f: |&Request, &T, uvll::uv_connect_cb| -> libc::c_int
+    ) -> Result<T, UvError> {
         let mut req = Request::new(uvll::UV_CONNECT);
-        let result = unsafe {
-            let addr_p = &addr as *libc::sockaddr_storage;
-            uvll::uv_tcp_connect(req.handle, tcp.handle,
-                                 addr_p as *libc::sockaddr,
-                                 connect_cb)
-        };
-        return match result {
+        let r = f(&req, &obj, connect_cb);
+        return match r {
             0 => {
                 req.defuse(); // uv callback now owns this request
-                let mut cx = Ctx { status: -1, task: None, timer: None };
                 match timeout {
                     Some(t) => {
                         let mut timer = TimerWatcher::new(io);
                         timer.start(timer_cb, t, 0);
-                        cx.timer = Some(timer);
+                        self.timer = Some(timer);
                     }
                     None => {}
                 }
-                wait_until_woken_after(&mut cx.task, &io.loop_, || {
-                    let data = &cx as *_;
-                    match cx.timer {
+                wait_until_woken_after(&mut self.task, &io.loop_, || {
+                    let data = &self as *_;
+                    match self.timer {
                         Some(ref mut timer) => unsafe { timer.set_data(data) },
                         None => {}
                     }
@@ -247,9 +194,9 @@ impl TcpWatcher {
                 // If we failed because of a timeout, drop the TcpWatcher as
                 // soon as possible because it's data is now set to null and we
                 // want to cancel the callback ASAP.
-                match cx.status {
-                    0 => Ok(tcp),
-                    n => { drop(tcp); Err(UvError(n)) }
+                match self.status {
+                    0 => Ok(obj),
+                    n => { drop(obj); Err(UvError(n)) }
                 }
             }
             n => Err(UvError(n))
@@ -258,8 +205,8 @@ impl TcpWatcher {
         extern fn timer_cb(handle: *uvll::uv_timer_t) {
             // Don't close the corresponding tcp request, just wake up the task
             // and let RAII take care of the pending watcher.
-            let cx: &mut Ctx = unsafe {
-                &mut *(uvll::get_data_for_uv_handle(handle) as *mut Ctx)
+            let cx: &mut ConnectCtx = unsafe {
+                &mut *(uvll::get_data_for_uv_handle(handle) as *mut ConnectCtx)
             };
             cx.status = uvll::ECANCELED;
             wakeup(&mut cx.task);
@@ -279,7 +226,7 @@ impl TcpWatcher {
             let data = unsafe { uvll::get_data_for_req(req.handle) };
             if data.is_null() { return }
 
-            let cx: &mut Ctx = unsafe { &mut *(data as *mut Ctx) };
+            let cx: &mut ConnectCtx = unsafe { &mut *(data as *mut ConnectCtx) };
             cx.status = status;
             match cx.timer {
                 Some(ref mut t) => t.stop(),
@@ -299,6 +246,157 @@ impl TcpWatcher {
     }
 }
 
+impl AcceptTimeout {
+    pub fn new() -> AcceptTimeout {
+        AcceptTimeout { timer: None, timeout_tx: None, timeout_rx: None }
+    }
+
+    pub fn accept<T: Send>(&mut self, c: &Receiver<IoResult<T>>) -> IoResult<T> {
+        match self.timeout_rx {
+            None => c.recv(),
+            Some(ref rx) => {
+                use std::comm::Select;
+
+                // Poll the incoming channel first (don't rely on the order of
+                // select just yet). If someone's pending then we should return
+                // them immediately.
+                match c.try_recv() {
+                    Ok(data) => return data,
+                    Err(..) => {}
+                }
+
+                // Use select to figure out which channel gets ready first. We
+                // do some custom handling of select to ensure that we never
+                // actually drain the timeout channel (we'll keep seeing the
+                // timeout message in the future).
+                let s = Select::new();
+                let mut timeout = s.handle(rx);
+                let mut data = s.handle(c);
+                unsafe {
+                    timeout.add();
+                    data.add();
+                }
+                if s.wait() == timeout.id() {
+                    Err(uv_error_to_io_error(UvError(uvll::ECANCELED)))
+                } else {
+                    c.recv()
+                }
+            }
+        }
+    }
+
+    pub fn clear(&mut self) {
+        // Clear any previous timeout by dropping the timer and transmission
+        // channels
+        drop((self.timer.take(),
+              self.timeout_tx.take(),
+              self.timeout_rx.take()))
+    }
+
+    pub fn set_timeout<U, T: UvHandle<U> + HomingIO>(
+        &mut self, ms: u64, t: &mut T
+    ) {
+        // If we have a timeout, lazily initialize the timer which will be used
+        // to fire when the timeout runs out.
+        if self.timer.is_none() {
+            let _m = t.fire_homing_missile();
+            let loop_ = Loop::wrap(unsafe {
+                uvll::get_loop_for_uv_handle(t.uv_handle())
+            });
+            let mut timer = TimerWatcher::new_home(&loop_, t.home().clone());
+            unsafe {
+                timer.set_data(self as *mut _ as *AcceptTimeout);
+            }
+            self.timer = Some(timer);
+        }
+
+        // Once we've got a timer, stop any previous timeout, reset it for the
+        // current one, and install some new channels to send/receive data on
+        let timer = self.timer.get_mut_ref();
+        timer.stop();
+        timer.start(timer_cb, ms, 0);
+        let (tx, rx) = channel();
+        self.timeout_tx = Some(tx);
+        self.timeout_rx = Some(rx);
+
+        extern fn timer_cb(timer: *uvll::uv_timer_t) {
+            let acceptor: &mut AcceptTimeout = unsafe {
+                &mut *(uvll::get_data_for_uv_handle(timer) as *mut AcceptTimeout)
+            };
+            // This send can never fail because if this timer is active then the
+            // receiving channel is guaranteed to be alive
+            acceptor.timeout_tx.get_ref().send(());
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/// TCP implementation
+////////////////////////////////////////////////////////////////////////////////
+
+pub struct TcpWatcher {
+    handle: *uvll::uv_tcp_t,
+    stream: StreamWatcher,
+    home: HomeHandle,
+    refcount: Refcount,
+
+    // libuv can't support concurrent reads and concurrent writes of the same
+    // stream object, so we use these access guards in order to arbitrate among
+    // multiple concurrent reads and writes. Note that libuv *can* read and
+    // write simultaneously, it just can't read and read simultaneously.
+    read_access: Access,
+    write_access: Access,
+}
+
+pub struct TcpListener {
+    home: HomeHandle,
+    handle: *uvll::uv_pipe_t,
+    closing_task: Option<BlockedTask>,
+    outgoing: Sender<Result<~rtio::RtioTcpStream:Send, IoError>>,
+    incoming: Receiver<Result<~rtio::RtioTcpStream:Send, IoError>>,
+}
+
+pub struct TcpAcceptor {
+    listener: ~TcpListener,
+    timeout: AcceptTimeout,
+}
+
+// TCP watchers (clients/streams)
+
+impl TcpWatcher {
+    pub fn new(io: &mut UvIoFactory) -> TcpWatcher {
+        let handle = io.make_handle();
+        TcpWatcher::new_home(&io.loop_, handle)
+    }
+
+    fn new_home(loop_: &Loop, home: HomeHandle) -> TcpWatcher {
+        let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
+        assert_eq!(unsafe {
+            uvll::uv_tcp_init(loop_.handle, handle)
+        }, 0);
+        TcpWatcher {
+            home: home,
+            handle: handle,
+            stream: StreamWatcher::new(handle),
+            refcount: Refcount::new(),
+            read_access: Access::new(),
+            write_access: Access::new(),
+        }
+    }
+
+    pub fn connect(io: &mut UvIoFactory,
+                   address: ip::SocketAddr,
+                   timeout: Option<u64>) -> Result<TcpWatcher, UvError> {
+        let tcp = TcpWatcher::new(io);
+        let cx = ConnectCtx { status: -1, task: None, timer: None };
+        let (addr, _len) = addr_to_sockaddr(address);
+        let addr_p = &addr as *_ as *libc::sockaddr;
+        cx.connect(tcp, timeout, io, |req, tcp, cb| {
+            unsafe { uvll::uv_tcp_connect(req.handle, tcp.handle, addr_p, cb) }
+        })
+    }
+}
+
 impl HomingIO for TcpWatcher {
     fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
 }
@@ -463,9 +561,7 @@ impl rtio::RtioTcpListener for TcpListener {
         // create the acceptor object from ourselves
         let mut acceptor = ~TcpAcceptor {
             listener: self,
-            timer: None,
-            timeout_tx: None,
-            timeout_rx: None,
+            timeout: AcceptTimeout::new(),
         };
 
         let _m = acceptor.fire_homing_missile();
@@ -516,37 +612,7 @@ impl rtio::RtioSocket for TcpAcceptor {
 
 impl rtio::RtioTcpAcceptor for TcpAcceptor {
     fn accept(&mut self) -> Result<~rtio::RtioTcpStream:Send, IoError> {
-        match self.timeout_rx {
-            None => self.listener.incoming.recv(),
-            Some(ref rx) => {
-                use std::comm::Select;
-
-                // Poll the incoming channel first (don't rely on the order of
-                // select just yet). If someone's pending then we should return
-                // them immediately.
-                match self.listener.incoming.try_recv() {
-                    Ok(data) => return data,
-                    Err(..) => {}
-                }
-
-                // Use select to figure out which channel gets ready first. We
-                // do some custom handling of select to ensure that we never
-                // actually drain the timeout channel (we'll keep seeing the
-                // timeout message in the future).
-                let s = Select::new();
-                let mut timeout = s.handle(rx);
-                let mut data = s.handle(&self.listener.incoming);
-                unsafe {
-                    timeout.add();
-                    data.add();
-                }
-                if s.wait() == timeout.id() {
-                    Err(uv_error_to_io_error(UvError(uvll::ECANCELED)))
-                } else {
-                    self.listener.incoming.recv()
-                }
-            }
-        }
+        self.timeout.accept(&self.listener.incoming)
     }
 
     fn accept_simultaneously(&mut self) -> Result<(), IoError> {
@@ -564,47 +630,9 @@ impl rtio::RtioTcpAcceptor for TcpAcceptor {
     }
 
     fn set_timeout(&mut self, ms: Option<u64>) {
-        // First, if the timeout is none, clear any previous timeout by dropping
-        // the timer and transmission channels
-        let ms = match ms {
-            None => {
-                return drop((self.timer.take(),
-                             self.timeout_tx.take(),
-                             self.timeout_rx.take()))
-            }
-            Some(ms) => ms,
-        };
-
-        // If we have a timeout, lazily initialize the timer which will be used
-        // to fire when the timeout runs out.
-        if self.timer.is_none() {
-            let _m = self.fire_homing_missile();
-            let loop_ = Loop::wrap(unsafe {
-                uvll::get_loop_for_uv_handle(self.listener.handle)
-            });
-            let mut timer = TimerWatcher::new_home(&loop_, self.home().clone());
-            unsafe {
-                timer.set_data(self as *mut _ as *TcpAcceptor);
-            }
-            self.timer = Some(timer);
-        }
-
-        // Once we've got a timer, stop any previous timeout, reset it for the
-        // current one, and install some new channels to send/receive data on
-        let timer = self.timer.get_mut_ref();
-        timer.stop();
-        timer.start(timer_cb, ms, 0);
-        let (tx, rx) = channel();
-        self.timeout_tx = Some(tx);
-        self.timeout_rx = Some(rx);
-
-        extern fn timer_cb(timer: *uvll::uv_timer_t) {
-            let acceptor: &mut TcpAcceptor = unsafe {
-                &mut *(uvll::get_data_for_uv_handle(timer) as *mut TcpAcceptor)
-            };
-            // This send can never fail because if this timer is active then the
-            // receiving channel is guaranteed to be alive
-            acceptor.timeout_tx.get_ref().send(());
+        match ms {
+            None => self.timeout.clear(),
+            Some(ms) => self.timeout.set_timeout(ms, &mut *self.listener),
         }
     }
 }
diff --git a/src/librustuv/pipe.rs b/src/librustuv/pipe.rs
index 6ee684ff9bd..7277be1616b 100644
--- a/src/librustuv/pipe.rs
+++ b/src/librustuv/pipe.rs
@@ -12,14 +12,13 @@ use std::c_str::CString;
 use std::io::IoError;
 use libc;
 use std::rt::rtio::{RtioPipe, RtioUnixListener, RtioUnixAcceptor};
-use std::rt::task::BlockedTask;
 
 use access::Access;
 use homing::{HomingIO, HomeHandle};
+use net;
 use rc::Refcount;
 use stream::StreamWatcher;
-use super::{Loop, UvError, UvHandle, Request, uv_error_to_io_error,
-            wait_until_woken_after, wakeup};
+use super::{Loop, UvError, UvHandle, uv_error_to_io_error};
 use uvio::UvIoFactory;
 use uvll;
 
@@ -43,6 +42,7 @@ pub struct PipeListener {
 
 pub struct PipeAcceptor {
     listener: ~PipeListener,
+    timeout: net::AcceptTimeout,
 }
 
 // PipeWatcher implementation and traits
@@ -84,36 +84,18 @@ impl PipeWatcher {
         }
     }
 
-    pub fn connect(io: &mut UvIoFactory, name: &CString)
+    pub fn connect(io: &mut UvIoFactory, name: &CString, timeout: Option<u64>)
         -> Result<PipeWatcher, UvError>
     {
-        struct Ctx { task: Option<BlockedTask>, result: libc::c_int, }
-        let mut cx = Ctx { task: None, result: 0 };
-        let mut req = Request::new(uvll::UV_CONNECT);
         let pipe = PipeWatcher::new(io, false);
-
-        wait_until_woken_after(&mut cx.task, &io.loop_, || {
+        let cx = net::ConnectCtx { status: -1, task: None, timer: None };
+        cx.connect(pipe, timeout, io, |req, pipe, cb| {
             unsafe {
-                uvll::uv_pipe_connect(req.handle,
-                                      pipe.handle(),
-                                      name.with_ref(|p| p),
-                                      connect_cb)
+                uvll::uv_pipe_connect(req.handle, pipe.handle(),
+                                      name.with_ref(|p| p), cb)
             }
-            req.set_data(&cx);
-            req.defuse(); // uv callback now owns this request
-        });
-        return match cx.result {
-            0 => Ok(pipe),
-            n => Err(UvError(n))
-        };
-
-        extern fn connect_cb(req: *uvll::uv_connect_t, status: libc::c_int) {;
-            let req = Request::wrap(req);
-            assert!(status != uvll::ECANCELED);
-            let cx: &mut Ctx = unsafe { req.get_data() };
-            cx.result = status;
-            wakeup(&mut cx.task);
-        }
+            0
+        })
     }
 
     pub fn handle(&self) -> *uvll::uv_pipe_t { self.stream.handle }
@@ -199,7 +181,10 @@ impl PipeListener {
 impl RtioUnixListener for PipeListener {
     fn listen(~self) -> Result<~RtioUnixAcceptor:Send, IoError> {
         // create the acceptor object from ourselves
-        let mut acceptor = ~PipeAcceptor { listener: self };
+        let mut acceptor = ~PipeAcceptor {
+            listener: self,
+            timeout: net::AcceptTimeout::new(),
+        };
 
         let _m = acceptor.fire_homing_missile();
         // FIXME: the 128 backlog should be configurable
@@ -247,7 +232,14 @@ impl Drop for PipeListener {
 
 impl RtioUnixAcceptor for PipeAcceptor {
     fn accept(&mut self) -> Result<~RtioPipe:Send, IoError> {
-        self.listener.incoming.recv()
+        self.timeout.accept(&self.listener.incoming)
+    }
+
+    fn set_timeout(&mut self, timeout_ms: Option<u64>) {
+        match timeout_ms {
+            None => self.timeout.clear(),
+            Some(ms) => self.timeout.set_timeout(ms, &mut *self.listener),
+        }
     }
 }
 
@@ -265,7 +257,8 @@ mod tests {
 
     #[test]
     fn connect_err() {
-        match PipeWatcher::connect(local_loop(), &"path/to/nowhere".to_c_str()) {
+        match PipeWatcher::connect(local_loop(), &"path/to/nowhere".to_c_str(),
+                                   None) {
             Ok(..) => fail!(),
             Err(..) => {}
         }
@@ -312,7 +305,7 @@ mod tests {
             assert!(client.write([2]).is_ok());
         });
         rx.recv();
-        let mut c = PipeWatcher::connect(local_loop(), &path.to_c_str()).unwrap();
+        let mut c = PipeWatcher::connect(local_loop(), &path.to_c_str(), None).unwrap();
         assert!(c.write([1]).is_ok());
         let mut buf = [0];
         assert!(c.read(buf).unwrap() == 1);
@@ -332,7 +325,7 @@ mod tests {
             drop(p.accept().unwrap());
         });
         rx.recv();
-        let _c = PipeWatcher::connect(local_loop(), &path.to_c_str()).unwrap();
+        let _c = PipeWatcher::connect(local_loop(), &path.to_c_str(), None).unwrap();
         fail!()
 
     }
diff --git a/src/librustuv/uvio.rs b/src/librustuv/uvio.rs
index 3127a01d70e..81d7ac6601e 100644
--- a/src/librustuv/uvio.rs
+++ b/src/librustuv/uvio.rs
@@ -291,8 +291,9 @@ impl IoFactory for UvIoFactory {
         }
     }
 
-    fn unix_connect(&mut self, path: &CString) -> Result<~rtio::RtioPipe:Send, IoError> {
-        match PipeWatcher::connect(self, path) {
+    fn unix_connect(&mut self, path: &CString,
+                    timeout: Option<u64>) -> Result<~rtio::RtioPipe:Send, IoError> {
+        match PipeWatcher::connect(self, path, timeout) {
             Ok(p) => Ok(~p as ~rtio::RtioPipe:Send),
             Err(e) => Err(uv_error_to_io_error(e)),
         }
diff --git a/src/libstd/io/net/unix.rs b/src/libstd/io/net/unix.rs
index bf568177020..b75b797e974 100644
--- a/src/libstd/io/net/unix.rs
+++ b/src/libstd/io/net/unix.rs
@@ -61,7 +61,31 @@ impl UnixStream {
     /// ```
     pub fn connect<P: ToCStr>(path: &P) -> IoResult<UnixStream> {
         LocalIo::maybe_raise(|io| {
-            io.unix_connect(&path.to_c_str()).map(UnixStream::new)
+            io.unix_connect(&path.to_c_str(), None).map(UnixStream::new)
+        })
+    }
+
+    /// Connect to a pipe named by `path`. This will attempt to open a
+    /// connection to the underlying socket.
+    ///
+    /// The returned stream will be closed when the object falls out of scope.
+    ///
+    /// # Example
+    ///
+    /// ```rust
+    /// # #![allow(unused_must_use)]
+    /// use std::io::net::unix::UnixStream;
+    ///
+    /// let server = Path::new("path/to/my/socket");
+    /// let mut stream = UnixStream::connect(&server);
+    /// stream.write([1, 2, 3]);
+    /// ```
+    #[experimental = "the timeout argument is likely to change types"]
+    pub fn connect_timeout<P: ToCStr>(path: &P,
+                                      timeout_ms: u64) -> IoResult<UnixStream> {
+        LocalIo::maybe_raise(|io| {
+            let s = io.unix_connect(&path.to_c_str(), Some(timeout_ms));
+            s.map(UnixStream::new)
         })
     }
 }
@@ -128,6 +152,25 @@ pub struct UnixAcceptor {
     obj: ~RtioUnixAcceptor:Send,
 }
 
+impl UnixAcceptor {
+    /// Sets a timeout for this acceptor, after which accept() will no longer
+    /// block indefinitely.
+    ///
+    /// The argument specified is the amount of time, in milliseconds, into the
+    /// future after which all invocations of accept() will not block (and any
+    /// pending invocation will return). A value of `None` will clear any
+    /// existing timeout.
+    ///
+    /// When using this method, it is likely necessary to reset the timeout as
+    /// appropriate, the timeout specified is specific to this object, not
+    /// specific to the next request.
+    #[experimental = "the name and arguments to this function are likely \
+                      to change"]
+    pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
+        self.obj.set_timeout(timeout_ms)
+    }
+}
+
 impl Acceptor<UnixStream> for UnixAcceptor {
     fn accept(&mut self) -> IoResult<UnixStream> {
         self.obj.accept().map(UnixStream::new)
@@ -135,6 +178,7 @@ impl Acceptor<UnixStream> for UnixAcceptor {
 }
 
 #[cfg(test)]
+#[allow(experimental)]
 mod tests {
     use prelude::*;
     use super::*;
@@ -371,4 +415,49 @@ mod tests {
         drop(l.listen().unwrap());
         assert!(!path.exists());
     } #[cfg(not(windows))])
+
+    iotest!(fn accept_timeout() {
+        let addr = next_test_unix();
+        let mut a = UnixListener::bind(&addr).unwrap().listen().unwrap();
+
+        a.set_timeout(Some(10));
+
+        // Make sure we time out once and future invocations also time out
+        let err = a.accept().err().unwrap();
+        assert_eq!(err.kind, TimedOut);
+        let err = a.accept().err().unwrap();
+        assert_eq!(err.kind, TimedOut);
+
+        // Also make sure that even though the timeout is expired that we will
+        // continue to receive any pending connections.
+        let l = UnixStream::connect(&addr).unwrap();
+        for i in range(0, 1001) {
+            match a.accept() {
+                Ok(..) => break,
+                Err(ref e) if e.kind == TimedOut => {}
+                Err(e) => fail!("error: {}", e),
+            }
+            if i == 1000 { fail!("should have a pending connection") }
+        }
+        drop(l);
+
+        // Unset the timeout and make sure that this always blocks.
+        a.set_timeout(None);
+        let addr2 = addr.clone();
+        spawn(proc() {
+            drop(UnixStream::connect(&addr2));
+        });
+        a.accept().unwrap();
+    })
+
+    iotest!(fn connect_timeout_error() {
+        let addr = next_test_unix();
+        assert!(UnixStream::connect_timeout(&addr, 100).is_err());
+    })
+
+    iotest!(fn connect_timeout_success() {
+        let addr = next_test_unix();
+        let _a = UnixListener::bind(&addr).unwrap().listen().unwrap();
+        assert!(UnixStream::connect_timeout(&addr, 100).is_ok());
+    })
 }
diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs
index 5dd14834669..f3c7fdaf710 100644
--- a/src/libstd/rt/rtio.rs
+++ b/src/libstd/rt/rtio.rs
@@ -152,7 +152,8 @@ pub trait IoFactory {
     fn udp_bind(&mut self, addr: SocketAddr) -> IoResult<~RtioUdpSocket:Send>;
     fn unix_bind(&mut self, path: &CString)
         -> IoResult<~RtioUnixListener:Send>;
-    fn unix_connect(&mut self, path: &CString) -> IoResult<~RtioPipe:Send>;
+    fn unix_connect(&mut self, path: &CString,
+                    timeout: Option<u64>) -> IoResult<~RtioPipe:Send>;
     fn get_host_addresses(&mut self, host: Option<&str>, servname: Option<&str>,
                           hint: Option<ai::Hint>) -> IoResult<~[ai::Info]>;
 
@@ -274,6 +275,7 @@ pub trait RtioUnixListener {
 
 pub trait RtioUnixAcceptor {
     fn accept(&mut self) -> IoResult<~RtioPipe:Send>;
+    fn set_timeout(&mut self, timeout: Option<u64>);
 }
 
 pub trait RtioTTY {