about summary refs log tree commit diff
diff options
context:
space:
mode:
authorAlex Crichton <alex@alexcrichton.com>2014-04-27 15:45:16 -0700
committerAlex Crichton <alex@alexcrichton.com>2014-05-07 23:29:04 -0700
commitb2c6d6fd3ff303c2e32a3ac0175810581c65b751 (patch)
treeb216794f1e88b1603cb0e019e73b6e39d6101bcb
parent295e0a04ad57c001e854c5f52cecc18335113544 (diff)
downloadrust-b2c6d6fd3ff303c2e32a3ac0175810581c65b751.tar.gz
rust-b2c6d6fd3ff303c2e32a3ac0175810581c65b751.zip
rustuv: Implement timeouts for unix networking
This commit implements the set{,_read,_write}_timeout() methods for the
libuv-based networking I/O objects. The implementation details are commented
thoroughly throughout the implementation.
-rw-r--r--src/librustuv/access.rs24
-rw-r--r--src/librustuv/lib.rs1
-rw-r--r--src/librustuv/net.rs383
-rw-r--r--src/librustuv/pipe.rs64
-rw-r--r--src/librustuv/stream.rs118
-rw-r--r--src/librustuv/timeout.rs394
-rw-r--r--src/librustuv/timer.rs2
-rw-r--r--src/librustuv/tty.rs2
8 files changed, 711 insertions, 277 deletions
diff --git a/src/librustuv/access.rs b/src/librustuv/access.rs
index f96fa1e5be6..433073b43c4 100644
--- a/src/librustuv/access.rs
+++ b/src/librustuv/access.rs
@@ -31,7 +31,7 @@ pub struct Guard<'a> {
 }
 
 struct Inner {
-    queue: Vec<BlockedTask>,
+    queue: Vec<(BlockedTask, uint)>,
     held: bool,
     closed: bool,
 }
@@ -47,16 +47,17 @@ impl Access {
         }
     }
 
-    pub fn grant<'a>(&'a mut self, missile: HomingMissile) -> Guard<'a> {
+    pub fn grant<'a>(&'a mut self, token: uint,
+                     missile: HomingMissile) -> Guard<'a> {
         // This unsafety is actually OK because the homing missile argument
         // guarantees that we're on the same event loop as all the other objects
         // attempting to get access granted.
-        let inner: &mut Inner = unsafe { cast::transmute(self.inner.get()) };
+        let inner: &mut Inner = unsafe { &mut *self.inner.get() };
 
         if inner.held {
             let t: Box<Task> = Local::take();
             t.deschedule(1, |task| {
-                inner.queue.push(task);
+                inner.queue.push((task, token));
                 Ok(())
             });
             assert!(inner.held);
@@ -75,6 +76,17 @@ impl Access {
         // necessary synchronization to be running on this thread.
         unsafe { (*self.inner.get()).closed = true; }
     }
+
+    // Dequeue a blocked task with a specified token. This is unsafe because it
+    // is only safe to invoke while on the home event loop, and there is no
+    // guarantee that this i being invoked on the home event loop.
+    pub unsafe fn dequeue(&mut self, token: uint) -> Option<BlockedTask> {
+        let inner: &mut Inner = &mut *self.inner.get();
+        match inner.queue.iter().position(|&(_, t)| t == token) {
+            Some(i) => Some(inner.queue.remove(i).unwrap().val0()),
+            None => None,
+        }
+    }
 }
 
 impl Clone for Access {
@@ -111,9 +123,9 @@ impl<'a> Drop for Guard<'a> {
             // scheduled on this scheduler. Because we might be woken up on some
             // other scheduler, we drop our homing missile before we reawaken
             // the task.
-            Some(task) => {
+            Some((task, _)) => {
                 drop(self.missile.take());
-                let _ = task.wake().map(|t| t.reawaken());
+                task.reawaken();
             }
             None => { inner.held = false; }
         }
diff --git a/src/librustuv/lib.rs b/src/librustuv/lib.rs
index 84d4b6b4702..968029a6edc 100644
--- a/src/librustuv/lib.rs
+++ b/src/librustuv/lib.rs
@@ -84,6 +84,7 @@ fn start(argc: int, argv: **u8) -> int {
 mod macros;
 
 mod access;
+mod timeout;
 mod homing;
 mod queue;
 mod rc;
diff --git a/src/librustuv/net.rs b/src/librustuv/net.rs
index 0ddf50921fd..84220cd7a30 100644
--- a/src/librustuv/net.rs
+++ b/src/librustuv/net.rs
@@ -12,21 +12,20 @@ use libc::{size_t, ssize_t, c_int, c_void, c_uint};
 use libc;
 use std::cast;
 use std::io;
-use std::io::{IoError, IoResult};
+use std::io::IoError;
 use std::io::net::ip;
 use std::mem;
 use std::ptr;
 use std::rt::rtio;
 use std::rt::task::BlockedTask;
 
-use access::Access;
 use homing::{HomingIO, HomeHandle};
 use rc::Refcount;
 use stream::StreamWatcher;
 use super::{Loop, Request, UvError, Buf, status_to_io_result,
             uv_error_to_io_error, UvHandle, slice_to_uv_buf,
             wait_until_woken_after, wakeup};
-use timer::TimerWatcher;
+use timeout::{AccessTimeout, AcceptTimeout, ConnectCtx};
 use uvio::UvIoFactory;
 use uvll;
 
@@ -146,190 +145,6 @@ fn socket_name(sk: SocketNameKind,
         n => Err(uv_error_to_io_error(UvError(n)))
     }
 }
-////////////////////////////////////////////////////////////////////////////////
-// Helpers for handling timeouts, shared for pipes/tcp
-////////////////////////////////////////////////////////////////////////////////
-
-pub struct ConnectCtx {
-    pub status: c_int,
-    pub task: Option<BlockedTask>,
-    pub timer: Option<Box<TimerWatcher>>,
-}
-
-pub struct AcceptTimeout {
-    timer: Option<TimerWatcher>,
-    timeout_tx: Option<Sender<()>>,
-    timeout_rx: Option<Receiver<()>>,
-}
-
-impl ConnectCtx {
-    pub fn connect<T>(
-        mut self, obj: T, timeout: Option<u64>, io: &mut UvIoFactory,
-        f: |&Request, &T, uvll::uv_connect_cb| -> libc::c_int
-    ) -> Result<T, UvError> {
-        let mut req = Request::new(uvll::UV_CONNECT);
-        let r = f(&req, &obj, connect_cb);
-        return match r {
-            0 => {
-                req.defuse(); // uv callback now owns this request
-                match timeout {
-                    Some(t) => {
-                        let mut timer = TimerWatcher::new(io);
-                        timer.start(timer_cb, t, 0);
-                        self.timer = Some(timer);
-                    }
-                    None => {}
-                }
-                wait_until_woken_after(&mut self.task, &io.loop_, || {
-                    let data = &self as *_;
-                    match self.timer {
-                        Some(ref mut timer) => unsafe { timer.set_data(data) },
-                        None => {}
-                    }
-                    req.set_data(data);
-                });
-                // Make sure an erroneously fired callback doesn't have access
-                // to the context any more.
-                req.set_data(0 as *int);
-
-                // If we failed because of a timeout, drop the TcpWatcher as
-                // soon as possible because it's data is now set to null and we
-                // want to cancel the callback ASAP.
-                match self.status {
-                    0 => Ok(obj),
-                    n => { drop(obj); Err(UvError(n)) }
-                }
-            }
-            n => Err(UvError(n))
-        };
-
-        extern fn timer_cb(handle: *uvll::uv_timer_t) {
-            // Don't close the corresponding tcp request, just wake up the task
-            // and let RAII take care of the pending watcher.
-            let cx: &mut ConnectCtx = unsafe {
-                &mut *(uvll::get_data_for_uv_handle(handle) as *mut ConnectCtx)
-            };
-            cx.status = uvll::ECANCELED;
-            wakeup(&mut cx.task);
-        }
-
-        extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) {
-            // This callback can be invoked with ECANCELED if the watcher is
-            // closed by the timeout callback. In that case we just want to free
-            // the request and be along our merry way.
-            let req = Request::wrap(req);
-            if status == uvll::ECANCELED { return }
-
-            // Apparently on windows when the handle is closed this callback may
-            // not be invoked with ECANCELED but rather another error code.
-            // Either ways, if the data is null, then our timeout has expired
-            // and there's nothing we can do.
-            let data = unsafe { uvll::get_data_for_req(req.handle) };
-            if data.is_null() { return }
-
-            let cx: &mut ConnectCtx = unsafe { &mut *(data as *mut ConnectCtx) };
-            cx.status = status;
-            match cx.timer {
-                Some(ref mut t) => t.stop(),
-                None => {}
-            }
-            // Note that the timer callback doesn't cancel the connect request
-            // (that's the job of uv_close()), so it's possible for this
-            // callback to get triggered after the timeout callback fires, but
-            // before the task wakes up. In that case, we did indeed
-            // successfully connect, but we don't need to wake someone up. We
-            // updated the status above (correctly so), and the task will pick
-            // up on this when it wakes up.
-            if cx.task.is_some() {
-                wakeup(&mut cx.task);
-            }
-        }
-    }
-}
-
-impl AcceptTimeout {
-    pub fn new() -> AcceptTimeout {
-        AcceptTimeout { timer: None, timeout_tx: None, timeout_rx: None }
-    }
-
-    pub fn accept<T: Send>(&mut self, c: &Receiver<IoResult<T>>) -> IoResult<T> {
-        match self.timeout_rx {
-            None => c.recv(),
-            Some(ref rx) => {
-                use std::comm::Select;
-
-                // Poll the incoming channel first (don't rely on the order of
-                // select just yet). If someone's pending then we should return
-                // them immediately.
-                match c.try_recv() {
-                    Ok(data) => return data,
-                    Err(..) => {}
-                }
-
-                // Use select to figure out which channel gets ready first. We
-                // do some custom handling of select to ensure that we never
-                // actually drain the timeout channel (we'll keep seeing the
-                // timeout message in the future).
-                let s = Select::new();
-                let mut timeout = s.handle(rx);
-                let mut data = s.handle(c);
-                unsafe {
-                    timeout.add();
-                    data.add();
-                }
-                if s.wait() == timeout.id() {
-                    Err(uv_error_to_io_error(UvError(uvll::ECANCELED)))
-                } else {
-                    c.recv()
-                }
-            }
-        }
-    }
-
-    pub fn clear(&mut self) {
-        // Clear any previous timeout by dropping the timer and transmission
-        // channels
-        drop((self.timer.take(),
-              self.timeout_tx.take(),
-              self.timeout_rx.take()))
-    }
-
-    pub fn set_timeout<U, T: UvHandle<U> + HomingIO>(
-        &mut self, ms: u64, t: &mut T
-    ) {
-        // If we have a timeout, lazily initialize the timer which will be used
-        // to fire when the timeout runs out.
-        if self.timer.is_none() {
-            let _m = t.fire_homing_missile();
-            let loop_ = Loop::wrap(unsafe {
-                uvll::get_loop_for_uv_handle(t.uv_handle())
-            });
-            let mut timer = TimerWatcher::new_home(&loop_, t.home().clone());
-            unsafe {
-                timer.set_data(self as *mut _ as *AcceptTimeout);
-            }
-            self.timer = Some(timer);
-        }
-
-        // Once we've got a timer, stop any previous timeout, reset it for the
-        // current one, and install some new channels to send/receive data on
-        let timer = self.timer.get_mut_ref();
-        timer.stop();
-        timer.start(timer_cb, ms, 0);
-        let (tx, rx) = channel();
-        self.timeout_tx = Some(tx);
-        self.timeout_rx = Some(rx);
-
-        extern fn timer_cb(timer: *uvll::uv_timer_t) {
-            let acceptor: &mut AcceptTimeout = unsafe {
-                &mut *(uvll::get_data_for_uv_handle(timer) as *mut AcceptTimeout)
-            };
-            // This send can never fail because if this timer is active then the
-            // receiving channel is guaranteed to be alive
-            acceptor.timeout_tx.get_ref().send(());
-        }
-    }
-}
 
 ////////////////////////////////////////////////////////////////////////////////
 /// TCP implementation
@@ -345,8 +160,8 @@ pub struct TcpWatcher {
     // stream object, so we use these access guards in order to arbitrate among
     // multiple concurrent reads and writes. Note that libuv *can* read and
     // write simultaneously, it just can't read and read simultaneously.
-    read_access: Access,
-    write_access: Access,
+    read_access: AccessTimeout,
+    write_access: AccessTimeout,
 }
 
 pub struct TcpListener {
@@ -380,8 +195,8 @@ impl TcpWatcher {
             handle: handle,
             stream: StreamWatcher::new(handle),
             refcount: Refcount::new(),
-            read_access: Access::new(),
-            write_access: Access::new(),
+            read_access: AccessTimeout::new(),
+            write_access: AccessTimeout::new(),
         }
     }
 
@@ -412,10 +227,10 @@ impl rtio::RtioSocket for TcpWatcher {
 impl rtio::RtioTcpStream for TcpWatcher {
     fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
         let m = self.fire_homing_missile();
-        let access = self.read_access.grant(m);
+        let guard = try!(self.read_access.grant(m));
 
         // see comments in close_read about this check
-        if access.is_closed() {
+        if guard.access.is_closed() {
             return Err(io::standard_error(io::EndOfFile))
         }
 
@@ -424,8 +239,8 @@ impl rtio::RtioTcpStream for TcpWatcher {
 
     fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
         let m = self.fire_homing_missile();
-        let _g = self.write_access.grant(m);
-        self.stream.write(buf).map_err(uv_error_to_io_error)
+        let guard = try!(self.write_access.grant(m));
+        self.stream.write(buf, guard.can_timeout).map_err(uv_error_to_io_error)
     }
 
     fn peer_name(&mut self) -> Result<ip::SocketAddr, IoError> {
@@ -468,16 +283,19 @@ impl rtio::RtioTcpStream for TcpWatcher {
             stream: StreamWatcher::new(self.handle),
             home: self.home.clone(),
             refcount: self.refcount.clone(),
-            write_access: self.write_access.clone(),
             read_access: self.read_access.clone(),
+            write_access: self.write_access.clone(),
         } as Box<rtio::RtioTcpStream:Send>
     }
 
     fn close_read(&mut self) -> Result<(), IoError> {
         // see comments in PipeWatcher::close_read
-        let m = self.fire_homing_missile();
-        self.read_access.close(&m);
-        self.stream.cancel_read(m);
+        let task = {
+            let m = self.fire_homing_missile();
+            self.read_access.access.close(&m);
+    self.stream.cancel_read(uvll::EOF as libc::ssize_t)
+        };
+        let _ = task.map(|t| t.reawaken());
         Ok(())
     }
 
@@ -485,6 +303,35 @@ impl rtio::RtioTcpStream for TcpWatcher {
         let _m = self.fire_homing_missile();
         shutdown(self.handle, &self.uv_loop())
     }
+
+    fn set_timeout(&mut self, timeout: Option<u64>) {
+        self.set_read_timeout(timeout);
+        self.set_write_timeout(timeout);
+    }
+
+    fn set_read_timeout(&mut self, ms: Option<u64>) {
+        let _m = self.fire_homing_missile();
+        let loop_ = self.uv_loop();
+        self.read_access.set_timeout(ms, &self.home, &loop_, cancel_read,
+                                     &self.stream as *_ as uint);
+
+        fn cancel_read(stream: uint) -> Option<BlockedTask> {
+            let stream: &mut StreamWatcher = unsafe { cast::transmute(stream) };
+            stream.cancel_read(uvll::ECANCELED as ssize_t)
+        }
+    }
+
+    fn set_write_timeout(&mut self, ms: Option<u64>) {
+        let _m = self.fire_homing_missile();
+        let loop_ = self.uv_loop();
+        self.write_access.set_timeout(ms, &self.home, &loop_, cancel_write,
+                                      &self.stream as *_ as uint);
+
+        fn cancel_write(stream: uint) -> Option<BlockedTask> {
+            let stream: &mut StreamWatcher = unsafe { cast::transmute(stream) };
+            stream.cancel_write()
+        }
+    }
 }
 
 impl UvHandle<uvll::uv_tcp_t> for TcpWatcher {
@@ -618,6 +465,7 @@ impl rtio::RtioTcpAcceptor for TcpAcceptor {
     }
 
     fn set_timeout(&mut self, ms: Option<u64>) {
+        let _m = self.fire_homing_missile();
         match ms {
             None => self.timeout.clear(),
             Some(ms) => self.timeout.set_timeout(ms, &mut *self.listener),
@@ -635,8 +483,22 @@ pub struct UdpWatcher {
 
     // See above for what these fields are
     refcount: Refcount,
-    read_access: Access,
-    write_access: Access,
+    read_access: AccessTimeout,
+    write_access: AccessTimeout,
+
+    blocked_sender: Option<BlockedTask>,
+}
+
+struct UdpRecvCtx {
+    task: Option<BlockedTask>,
+    buf: Option<Buf>,
+    result: Option<(ssize_t, Option<ip::SocketAddr>)>,
+}
+
+struct UdpSendCtx {
+    result: c_int,
+    data: Option<Vec<u8>>,
+    udp: *mut UdpWatcher,
 }
 
 impl UdpWatcher {
@@ -646,8 +508,9 @@ impl UdpWatcher {
             handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) },
             home: io.make_handle(),
             refcount: Refcount::new(),
-            read_access: Access::new(),
-            write_access: Access::new(),
+            read_access: AccessTimeout::new(),
+            write_access: AccessTimeout::new(),
+            blocked_sender: None,
         };
         assert_eq!(unsafe {
             uvll::uv_udp_init(io.uv_loop(), udp.handle)
@@ -683,20 +546,15 @@ impl rtio::RtioUdpSocket for UdpWatcher {
     fn recvfrom(&mut self, buf: &mut [u8])
         -> Result<(uint, ip::SocketAddr), IoError>
     {
-        struct Ctx {
-            task: Option<BlockedTask>,
-            buf: Option<Buf>,
-            result: Option<(ssize_t, Option<ip::SocketAddr>)>,
-        }
         let loop_ = self.uv_loop();
         let m = self.fire_homing_missile();
-        let _g = self.read_access.grant(m);
+        let _guard = try!(self.read_access.grant(m));
 
         return match unsafe {
             uvll::uv_udp_recv_start(self.handle, alloc_cb, recv_cb)
         } {
             0 => {
-                let mut cx = Ctx {
+                let mut cx = UdpRecvCtx {
                     task: None,
                     buf: Some(slice_to_uv_buf(buf)),
                     result: None,
@@ -718,7 +576,8 @@ impl rtio::RtioUdpSocket for UdpWatcher {
                            _suggested_size: size_t,
                            buf: *mut Buf) {
             unsafe {
-                let cx = &mut *(uvll::get_data_for_uv_handle(handle) as *mut Ctx);
+                let cx = uvll::get_data_for_uv_handle(handle);
+                let cx = &mut *(cx as *mut UdpRecvCtx);
                 *buf = cx.buf.take().expect("recv alloc_cb called more than once")
             }
         }
@@ -727,7 +586,7 @@ impl rtio::RtioUdpSocket for UdpWatcher {
                           addr: *libc::sockaddr, _flags: c_uint) {
             assert!(nread != uvll::ECANCELED as ssize_t);
             let cx = unsafe {
-                &mut *(uvll::get_data_for_uv_handle(handle) as *mut Ctx)
+                &mut *(uvll::get_data_for_uv_handle(handle) as *mut UdpRecvCtx)
             };
 
             // When there's no data to read the recv callback can be a no-op.
@@ -751,42 +610,68 @@ impl rtio::RtioUdpSocket for UdpWatcher {
     }
 
     fn sendto(&mut self, buf: &[u8], dst: ip::SocketAddr) -> Result<(), IoError> {
-        struct Ctx { task: Option<BlockedTask>, result: c_int }
-
         let m = self.fire_homing_missile();
         let loop_ = self.uv_loop();
-        let _g = self.write_access.grant(m);
+        let guard = try!(self.write_access.grant(m));
 
         let mut req = Request::new(uvll::UV_UDP_SEND);
-        let buf = slice_to_uv_buf(buf);
         let (addr, _len) = addr_to_sockaddr(dst);
-        let result = unsafe {
-            let addr_p = &addr as *libc::sockaddr_storage;
-            uvll::uv_udp_send(req.handle, self.handle, [buf],
-                              addr_p as *libc::sockaddr, send_cb)
+        let addr_p = &addr as *_ as *libc::sockaddr;
+
+        // see comments in StreamWatcher::write for why we may allocate a buffer
+        // here.
+        let data = if guard.can_timeout {Some(Vec::from_slice(buf))} else {None};
+        let uv_buf = if guard.can_timeout {
+            slice_to_uv_buf(data.get_ref().as_slice())
+        } else {
+            slice_to_uv_buf(buf)
         };
 
-        return match result {
+        return match unsafe {
+            uvll::uv_udp_send(req.handle, self.handle, [uv_buf], addr_p, send_cb)
+        } {
             0 => {
                 req.defuse(); // uv callback now owns this request
-                let mut cx = Ctx { task: None, result: 0 };
-                wait_until_woken_after(&mut cx.task, &loop_, || {
+                let mut cx = UdpSendCtx {
+                    result: uvll::ECANCELED, data: data, udp: self as *mut _
+                };
+                wait_until_woken_after(&mut self.blocked_sender, &loop_, || {
                     req.set_data(&cx);
                 });
-                match cx.result {
-                    0 => Ok(()),
-                    n => Err(uv_error_to_io_error(UvError(n)))
+
+                if cx.result != uvll::ECANCELED {
+                    return match cx.result {
+                        0 => Ok(()),
+                        n => Err(uv_error_to_io_error(UvError(n)))
+                    }
                 }
+                let new_cx = ~UdpSendCtx {
+                    result: 0,
+                    udp: 0 as *mut UdpWatcher,
+                    data: cx.data.take(),
+                };
+                unsafe {
+                    req.set_data(&*new_cx);
+                    cast::forget(new_cx);
+                }
+                Err(uv_error_to_io_error(UvError(cx.result)))
             }
             n => Err(uv_error_to_io_error(UvError(n)))
         };
 
+        // This function is the same as stream::write_cb, but adapted for udp
+        // instead of streams.
         extern fn send_cb(req: *uvll::uv_udp_send_t, status: c_int) {
             let req = Request::wrap(req);
-            assert!(status != uvll::ECANCELED);
-            let cx: &mut Ctx = unsafe { req.get_data() };
+            let cx: &mut UdpSendCtx = unsafe { req.get_data() };
             cx.result = status;
-            wakeup(&mut cx.task);
+
+            if cx.udp as uint != 0 {
+                let udp: &mut UdpWatcher = unsafe { &mut *cx.udp };
+                wakeup(&mut udp.blocked_sender);
+            } else {
+                let _cx: ~UdpSendCtx = unsafe { cast::transmute(cx) };
+            }
         }
     }
 
@@ -866,8 +751,48 @@ impl rtio::RtioUdpSocket for UdpWatcher {
             refcount: self.refcount.clone(),
             write_access: self.write_access.clone(),
             read_access: self.read_access.clone(),
+            blocked_sender: None,
         } as Box<rtio::RtioUdpSocket:Send>
     }
+
+    fn set_timeout(&mut self, timeout: Option<u64>) {
+        self.set_read_timeout(timeout);
+        self.set_write_timeout(timeout);
+    }
+
+    fn set_read_timeout(&mut self, ms: Option<u64>) {
+        let _m = self.fire_homing_missile();
+        let loop_ = self.uv_loop();
+        self.read_access.set_timeout(ms, &self.home, &loop_, cancel_read,
+                                     self.handle as uint);
+
+        fn cancel_read(stream: uint) -> Option<BlockedTask> {
+            // This method is quite similar to StreamWatcher::cancel_read, see
+            // there for more information
+            let handle = stream as *uvll::uv_udp_t;
+            assert_eq!(unsafe { uvll::uv_udp_recv_stop(handle) }, 0);
+            let data = unsafe {
+                let data = uvll::get_data_for_uv_handle(handle);
+                if data.is_null() { return None }
+                uvll::set_data_for_uv_handle(handle, 0 as *int);
+                &mut *(data as *mut UdpRecvCtx)
+            };
+            data.result = Some((uvll::ECANCELED as ssize_t, None));
+            data.task.take()
+        }
+    }
+
+    fn set_write_timeout(&mut self, ms: Option<u64>) {
+        let _m = self.fire_homing_missile();
+        let loop_ = self.uv_loop();
+        self.write_access.set_timeout(ms, &self.home, &loop_, cancel_write,
+                                      self as *mut _ as uint);
+
+        fn cancel_write(stream: uint) -> Option<BlockedTask> {
+            let stream: &mut UdpWatcher = unsafe { cast::transmute(stream) };
+            stream.blocked_sender.take()
+        }
+    }
 }
 
 impl Drop for UdpWatcher {
diff --git a/src/librustuv/pipe.rs b/src/librustuv/pipe.rs
index 7fec4051761..76bf92bd555 100644
--- a/src/librustuv/pipe.rs
+++ b/src/librustuv/pipe.rs
@@ -10,16 +10,18 @@
 
 use libc;
 use std::c_str::CString;
+use std::cast;
 use std::io::IoError;
 use std::io;
 use std::rt::rtio::{RtioPipe, RtioUnixListener, RtioUnixAcceptor};
+use std::rt::task::BlockedTask;
 
-use access::Access;
 use homing::{HomingIO, HomeHandle};
 use net;
 use rc::Refcount;
 use stream::StreamWatcher;
 use super::{Loop, UvError, UvHandle, uv_error_to_io_error};
+use timeout::{AcceptTimeout, ConnectCtx, AccessTimeout};
 use uvio::UvIoFactory;
 use uvll;
 
@@ -30,8 +32,8 @@ pub struct PipeWatcher {
     refcount: Refcount,
 
     // see comments in TcpWatcher for why these exist
-    write_access: Access,
-    read_access: Access,
+    write_access: AccessTimeout,
+    read_access: AccessTimeout,
 }
 
 pub struct PipeListener {
@@ -43,7 +45,7 @@ pub struct PipeListener {
 
 pub struct PipeAcceptor {
     listener: Box<PipeListener>,
-    timeout: net::AcceptTimeout,
+    timeout: AcceptTimeout,
 }
 
 // PipeWatcher implementation and traits
@@ -70,8 +72,8 @@ impl PipeWatcher {
             home: home,
             defused: false,
             refcount: Refcount::new(),
-            read_access: Access::new(),
-            write_access: Access::new(),
+            read_access: AccessTimeout::new(),
+            write_access: AccessTimeout::new(),
         }
     }
 
@@ -89,7 +91,7 @@ impl PipeWatcher {
         -> Result<PipeWatcher, UvError>
     {
         let pipe = PipeWatcher::new(io, false);
-        let cx = net::ConnectCtx { status: -1, task: None, timer: None };
+        let cx = ConnectCtx { status: -1, task: None, timer: None };
         cx.connect(pipe, timeout, io, |req, pipe, cb| {
             unsafe {
                 uvll::uv_pipe_connect(req.handle, pipe.handle(),
@@ -112,10 +114,10 @@ impl PipeWatcher {
 impl RtioPipe for PipeWatcher {
     fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
         let m = self.fire_homing_missile();
-        let access = self.read_access.grant(m);
+        let guard = try!(self.read_access.grant(m));
 
         // see comments in close_read about this check
-        if access.is_closed() {
+        if guard.access.is_closed() {
             return Err(io::standard_error(io::EndOfFile))
         }
 
@@ -124,8 +126,8 @@ impl RtioPipe for PipeWatcher {
 
     fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
         let m = self.fire_homing_missile();
-        let _g = self.write_access.grant(m);
-        self.stream.write(buf).map_err(uv_error_to_io_error)
+        let guard = try!(self.write_access.grant(m));
+        self.stream.write(buf, guard.can_timeout).map_err(uv_error_to_io_error)
     }
 
     fn clone(&self) -> Box<RtioPipe:Send> {
@@ -157,9 +159,12 @@ impl RtioPipe for PipeWatcher {
         // ordering is crucial because we could in theory be rescheduled during
         // the uv_read_stop which means that another read invocation could leak
         // in before we set the flag.
-        let m = self.fire_homing_missile();
-        self.read_access.close(&m);
-        self.stream.cancel_read(m);
+        let task = {
+            let m = self.fire_homing_missile();
+            self.read_access.access.close(&m);
+            self.stream.cancel_read(uvll::EOF as libc::ssize_t)
+        };
+        let _ = task.map(|t| t.reawaken());
         Ok(())
     }
 
@@ -167,6 +172,35 @@ impl RtioPipe for PipeWatcher {
         let _m = self.fire_homing_missile();
         net::shutdown(self.stream.handle, &self.uv_loop())
     }
+
+    fn set_timeout(&mut self, timeout: Option<u64>) {
+        self.set_read_timeout(timeout);
+        self.set_write_timeout(timeout);
+    }
+
+    fn set_read_timeout(&mut self, ms: Option<u64>) {
+        let _m = self.fire_homing_missile();
+        let loop_ = self.uv_loop();
+        self.read_access.set_timeout(ms, &self.home, &loop_, cancel_read,
+                                     &self.stream as *_ as uint);
+
+        fn cancel_read(stream: uint) -> Option<BlockedTask> {
+            let stream: &mut StreamWatcher = unsafe { cast::transmute(stream) };
+            stream.cancel_read(uvll::ECANCELED as libc::ssize_t)
+        }
+    }
+
+    fn set_write_timeout(&mut self, ms: Option<u64>) {
+        let _m = self.fire_homing_missile();
+        let loop_ = self.uv_loop();
+        self.write_access.set_timeout(ms, &self.home, &loop_, cancel_write,
+                                      &self.stream as *_ as uint);
+
+        fn cancel_write(stream: uint) -> Option<BlockedTask> {
+            let stream: &mut StreamWatcher = unsafe { cast::transmute(stream) };
+            stream.cancel_write()
+        }
+    }
 }
 
 impl HomingIO for PipeWatcher {
@@ -219,7 +253,7 @@ impl RtioUnixListener for PipeListener {
         // create the acceptor object from ourselves
         let mut acceptor = box PipeAcceptor {
             listener: self,
-            timeout: net::AcceptTimeout::new(),
+            timeout: AcceptTimeout::new(),
         };
 
         let _m = acceptor.fire_homing_missile();
diff --git a/src/librustuv/stream.rs b/src/librustuv/stream.rs
index a1b606709d8..0b067372583 100644
--- a/src/librustuv/stream.rs
+++ b/src/librustuv/stream.rs
@@ -14,7 +14,6 @@ use std::ptr;
 use std::rt::task::BlockedTask;
 
 use Loop;
-use homing::HomingMissile;
 use super::{UvError, Buf, slice_to_uv_buf, Request, wait_until_woken_after,
             ForbidUnwind, wakeup};
 use uvll;
@@ -31,6 +30,8 @@ pub struct StreamWatcher {
     // structure, but currently we don't have mappings for all the structures
     // defined in libuv, so we're foced to malloc this.
     last_write_req: Option<Request>,
+
+    blocked_writer: Option<BlockedTask>,
 }
 
 struct ReadContext {
@@ -41,7 +42,8 @@ struct ReadContext {
 
 struct WriteContext {
     result: c_int,
-    task: Option<BlockedTask>,
+    stream: *mut StreamWatcher,
+    data: Option<Vec<u8>>,
 }
 
 impl StreamWatcher {
@@ -62,6 +64,7 @@ impl StreamWatcher {
         StreamWatcher {
             handle: stream,
             last_write_req: None,
+            blocked_writer: None,
         }
     }
 
@@ -74,7 +77,7 @@ impl StreamWatcher {
             buf: Some(slice_to_uv_buf(buf)),
             // if the read is canceled, we'll see eof, otherwise this will get
             // overwritten
-            result: uvll::EOF as ssize_t,
+            result: 0,
             task: None,
         };
         // When reading a TTY stream on windows, libuv will invoke alloc_cb
@@ -104,27 +107,22 @@ impl StreamWatcher {
         return ret;
     }
 
-    pub fn cancel_read(&mut self, m: HomingMissile) {
+    pub fn cancel_read(&mut self, reason: ssize_t) -> Option<BlockedTask> {
         // When we invoke uv_read_stop, it cancels the read and alloc
         // callbacks. We need to manually wake up a pending task (if one was
-        // present). Note that we wake up the task *outside* the homing missile
-        // to ensure that we don't switch schedulers when we're not supposed to.
+        // present).
         assert_eq!(unsafe { uvll::uv_read_stop(self.handle) }, 0);
         let data = unsafe {
             let data = uvll::get_data_for_uv_handle(self.handle);
-            if data.is_null() { return }
+            if data.is_null() { return None }
             uvll::set_data_for_uv_handle(self.handle, 0 as *int);
             &mut *(data as *mut ReadContext)
         };
-        let task = data.task.take();
-        drop(m);
-        match task {
-            Some(task) => { let _ = task.wake().map(|t| t.reawaken()); }
-            None => {}
-        }
+        data.result = reason;
+        data.task.take()
     }
 
-    pub fn write(&mut self, buf: &[u8]) -> Result<(), UvError> {
+    pub fn write(&mut self, buf: &[u8], may_timeout: bool) -> Result<(), UvError> {
         // The ownership of the write request is dubious if this function
         // unwinds. I believe that if the write_cb fails to re-schedule the task
         // then the write request will be leaked.
@@ -137,30 +135,94 @@ impl StreamWatcher {
         };
         req.set_data(ptr::null::<()>());
 
+        // And here's where timeouts get a little interesting. Currently, libuv
+        // does not support canceling an in-flight write request. Consequently,
+        // when a write timeout expires, there's not much we can do other than
+        // detach the sleeping task from the write request itself. Semantically,
+        // this means that the write request will complete asynchronously, but
+        // the calling task will return error (because the write timed out).
+        //
+        // There is special wording in the documentation of set_write_timeout()
+        // indicating that this is a plausible failure scenario, and this
+        // function is why that wording exists.
+        //
+        // Implementation-wise, we must be careful when passing a buffer down to
+        // libuv. Most of this implementation avoids allocations becuase of the
+        // blocking guarantee (all stack local variables are valid for the
+        // entire read/write request). If our write request can be timed out,
+        // however, we must heap allocate the data and pass that to the libuv
+        // functions instead. The reason for this is that if we time out and
+        // return, there's no guarantee that `buf` is a valid buffer any more.
+        //
+        // To do this, the write context has an optionally owned vector of
+        // bytes.
+        let data = if may_timeout {Some(Vec::from_slice(buf))} else {None};
+        let uv_buf = if may_timeout {
+            slice_to_uv_buf(data.get_ref().as_slice())
+        } else {
+            slice_to_uv_buf(buf)
+        };
+
         // Send off the request, but be careful to not block until we're sure
         // that the write reqeust is queued. If the reqeust couldn't be queued,
         // then we should return immediately with an error.
         match unsafe {
-            uvll::uv_write(req.handle, self.handle, [slice_to_uv_buf(buf)],
-                           write_cb)
+            uvll::uv_write(req.handle, self.handle, [uv_buf], write_cb)
         } {
             0 => {
-                let mut wcx = WriteContext { result: 0, task: None, };
+                let mut wcx = WriteContext {
+                    result: uvll::ECANCELED,
+                    stream: self as *mut _,
+                    data: data,
+                };
                 req.defuse(); // uv callback now owns this request
 
                 let loop_ = unsafe { uvll::get_loop_for_uv_handle(self.handle) };
-                wait_until_woken_after(&mut wcx.task, &Loop::wrap(loop_), || {
+                wait_until_woken_after(&mut self.blocked_writer,
+                                       &Loop::wrap(loop_), || {
                     req.set_data(&wcx);
                 });
-                self.last_write_req = Some(Request::wrap(req.handle));
-                match wcx.result {
-                    0 => Ok(()),
-                    n => Err(UvError(n)),
+
+                if wcx.result != uvll::ECANCELED {
+                    self.last_write_req = Some(Request::wrap(req.handle));
+                    return match wcx.result {
+                        0 => Ok(()),
+                        n => Err(UvError(n)),
+                    }
                 }
+
+                // This is the second case where canceling an in-flight write
+                // gets interesting. If we've been canceled (no one reset our
+                // result), then someone still needs to free the request, and
+                // someone still needs to free the allocate buffer.
+                //
+                // To take care of this, we swap out the stack-allocated write
+                // context for a heap-allocated context, transferring ownership
+                // of everything to the write_cb. Libuv guarantees that this
+                // callback will be invoked at some point, and the callback will
+                // be responsible for deallocating these resources.
+                //
+                // Note that we don't cache this write request back in the
+                // stream watcher because we no longer have ownership of it, and
+                // we never will.
+                let new_wcx = ~WriteContext {
+                    result: 0,
+                    stream: 0 as *mut StreamWatcher,
+                    data: wcx.data.take(),
+                };
+                unsafe {
+                    req.set_data(&*new_wcx);
+                    cast::forget(new_wcx);
+                }
+                Err(UvError(wcx.result))
             }
             n => Err(UvError(n)),
         }
     }
+
+    pub fn cancel_write(&mut self) -> Option<BlockedTask> {
+        self.blocked_writer.take()
+    }
 }
 
 // This allocation callback expects to be invoked once and only once. It will
@@ -198,12 +260,18 @@ extern fn read_cb(handle: *uvll::uv_stream_t, nread: ssize_t, _buf: *Buf) {
 // away the error code as a result.
 extern fn write_cb(req: *uvll::uv_write_t, status: c_int) {
     let mut req = Request::wrap(req);
-    assert!(status != uvll::ECANCELED);
     // Remember to not free the request because it is re-used between writes on
     // the same stream.
     let wcx: &mut WriteContext = unsafe { req.get_data() };
     wcx.result = status;
-    req.defuse();
 
-    wakeup(&mut wcx.task);
+    // If the stream is present, we haven't timed out, otherwise we acquire
+    // ownership of everything and then deallocate it all at once.
+    if wcx.stream as uint != 0 {
+        req.defuse();
+        let stream: &mut StreamWatcher = unsafe { &mut *wcx.stream };
+        wakeup(&mut stream.blocked_writer);
+    } else {
+        let _wcx: ~WriteContext = unsafe { cast::transmute(wcx) };
+    }
 }
diff --git a/src/librustuv/timeout.rs b/src/librustuv/timeout.rs
new file mode 100644
index 00000000000..47c9d9335fe
--- /dev/null
+++ b/src/librustuv/timeout.rs
@@ -0,0 +1,394 @@
+// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use libc::c_int;
+use std::cast;
+use std::io::IoResult;
+use std::mem;
+use std::rt::task::BlockedTask;
+
+use access;
+use homing::{HomeHandle, HomingMissile, HomingIO};
+use timer::TimerWatcher;
+use uvll;
+use uvio::UvIoFactory;
+use {Loop, UvError, uv_error_to_io_error, Request, wakeup};
+use {UvHandle, wait_until_woken_after};
+
+/// Managment of a timeout when gaining access to a portion of a duplex stream.
+pub struct AccessTimeout {
+    state: TimeoutState,
+    timer: Option<~TimerWatcher>,
+    pub access: access::Access,
+}
+
+pub struct Guard<'a> {
+    state: &'a mut TimeoutState,
+    pub access: access::Guard<'a>,
+    pub can_timeout: bool,
+}
+
+#[deriving(Eq)]
+enum TimeoutState {
+    NoTimeout,
+    TimeoutPending(ClientState),
+    TimedOut,
+}
+
+#[deriving(Eq)]
+enum ClientState {
+    NoWaiter,
+    AccessPending,
+    RequestPending,
+}
+
+struct TimerContext {
+    timeout: *mut AccessTimeout,
+    callback: fn(uint) -> Option<BlockedTask>,
+    payload: uint,
+}
+
+impl AccessTimeout {
+    pub fn new() -> AccessTimeout {
+        AccessTimeout {
+            state: NoTimeout,
+            timer: None,
+            access: access::Access::new(),
+        }
+    }
+
+    /// Grants access to half of a duplex stream, timing out if necessary.
+    ///
+    /// On success, Ok(Guard) is returned and access has been granted to the
+    /// stream. If a timeout occurs, then Err is returned with an appropriate
+    /// error.
+    pub fn grant<'a>(&'a mut self, m: HomingMissile) -> IoResult<Guard<'a>> {
+        // First, flag that we're attempting to acquire access. This will allow
+        // us to cancel the pending grant if we timeout out while waiting for a
+        // grant.
+        match self.state {
+            NoTimeout => {},
+            TimeoutPending(ref mut client) => *client = AccessPending,
+            TimedOut => return Err(uv_error_to_io_error(UvError(uvll::ECANCELED)))
+        }
+        let access = self.access.grant(self as *mut _ as uint, m);
+
+        // After acquiring the grant, we need to flag ourselves as having a
+        // pending request so the timeout knows to cancel the request.
+        let can_timeout = match self.state {
+            NoTimeout => false,
+            TimeoutPending(ref mut client) => { *client = RequestPending; true }
+            TimedOut => return Err(uv_error_to_io_error(UvError(uvll::ECANCELED)))
+        };
+
+        Ok(Guard {
+            access: access,
+            state: &mut self.state,
+            can_timeout: can_timeout
+        })
+    }
+
+    /// Sets the pending timeout to the value specified.
+    ///
+    /// The home/loop variables are used to construct a timer if one has not
+    /// been previously constructed.
+    ///
+    /// The callback will be invoked if the timeout elapses, and the data of
+    /// the time will be set to `data`.
+    pub fn set_timeout(&mut self, ms: Option<u64>,
+                       home: &HomeHandle,
+                       loop_: &Loop,
+                       cb: fn(uint) -> Option<BlockedTask>,
+                       data: uint) {
+        self.state = NoTimeout;
+        let ms = match ms {
+            Some(ms) => ms,
+            None => return match self.timer {
+                Some(ref mut t) => t.stop(),
+                None => {}
+            }
+        };
+
+        // If we have a timeout, lazily initialize the timer which will be used
+        // to fire when the timeout runs out.
+        if self.timer.is_none() {
+            let mut timer = ~TimerWatcher::new_home(loop_, home.clone());
+            let cx = ~TimerContext {
+                timeout: self as *mut _,
+                callback: cb,
+                payload: data,
+            };
+            unsafe {
+                timer.set_data(&*cx);
+                cast::forget(cx);
+            }
+            self.timer = Some(timer);
+        }
+
+        let timer = self.timer.get_mut_ref();
+        unsafe {
+            let cx = uvll::get_data_for_uv_handle(timer.handle);
+            let cx = cx as *mut TimerContext;
+            (*cx).callback = cb;
+            (*cx).payload = data;
+        }
+        timer.stop();
+        timer.start(timer_cb, ms, 0);
+        self.state = TimeoutPending(NoWaiter);
+
+        extern fn timer_cb(timer: *uvll::uv_timer_t) {
+            let cx: &TimerContext = unsafe {
+                &*(uvll::get_data_for_uv_handle(timer) as *TimerContext)
+            };
+            let me = unsafe { &mut *cx.timeout };
+
+            match mem::replace(&mut me.state, TimedOut) {
+                TimedOut | NoTimeout => unreachable!(),
+                TimeoutPending(NoWaiter) => {}
+                TimeoutPending(AccessPending) => {
+                    match unsafe { me.access.dequeue(me as *mut _ as uint) } {
+                        Some(task) => task.reawaken(),
+                        None => unreachable!(),
+                    }
+                }
+                TimeoutPending(RequestPending) => {
+                    match (cx.callback)(cx.payload) {
+                        Some(task) => task.reawaken(),
+                        None => unreachable!(),
+                    }
+                }
+            }
+        }
+    }
+}
+
+impl Clone for AccessTimeout {
+    fn clone(&self) -> AccessTimeout {
+        AccessTimeout {
+            access: self.access.clone(),
+            state: NoTimeout,
+            timer: None,
+        }
+    }
+}
+
+#[unsafe_destructor]
+impl<'a> Drop for Guard<'a> {
+    fn drop(&mut self) {
+        match *self.state {
+            TimeoutPending(NoWaiter) | TimeoutPending(AccessPending) =>
+                unreachable!(),
+
+            NoTimeout | TimedOut => {}
+            TimeoutPending(RequestPending) => {
+                *self.state = TimeoutPending(NoWaiter);
+            }
+        }
+    }
+}
+
+impl Drop for AccessTimeout {
+    fn drop(&mut self) {
+        match self.timer {
+            Some(ref timer) => unsafe {
+                let data = uvll::get_data_for_uv_handle(timer.handle);
+                let _data: ~TimerContext = cast::transmute(data);
+            },
+            None => {}
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Connect timeouts
+////////////////////////////////////////////////////////////////////////////////
+
+pub struct ConnectCtx {
+    pub status: c_int,
+    pub task: Option<BlockedTask>,
+    pub timer: Option<~TimerWatcher>,
+}
+
+pub struct AcceptTimeout {
+    timer: Option<TimerWatcher>,
+    timeout_tx: Option<Sender<()>>,
+    timeout_rx: Option<Receiver<()>>,
+}
+
+impl ConnectCtx {
+    pub fn connect<T>(
+        mut self, obj: T, timeout: Option<u64>, io: &mut UvIoFactory,
+        f: |&Request, &T, uvll::uv_connect_cb| -> c_int
+    ) -> Result<T, UvError> {
+        let mut req = Request::new(uvll::UV_CONNECT);
+        let r = f(&req, &obj, connect_cb);
+        return match r {
+            0 => {
+                req.defuse(); // uv callback now owns this request
+                match timeout {
+                    Some(t) => {
+                        let mut timer = TimerWatcher::new(io);
+                        timer.start(timer_cb, t, 0);
+                        self.timer = Some(timer);
+                    }
+                    None => {}
+                }
+                wait_until_woken_after(&mut self.task, &io.loop_, || {
+                    let data = &self as *_;
+                    match self.timer {
+                        Some(ref mut timer) => unsafe { timer.set_data(data) },
+                        None => {}
+                    }
+                    req.set_data(data);
+                });
+                // Make sure an erroneously fired callback doesn't have access
+                // to the context any more.
+                req.set_data(0 as *int);
+
+                // If we failed because of a timeout, drop the TcpWatcher as
+                // soon as possible because it's data is now set to null and we
+                // want to cancel the callback ASAP.
+                match self.status {
+                    0 => Ok(obj),
+                    n => { drop(obj); Err(UvError(n)) }
+                }
+            }
+            n => Err(UvError(n))
+        };
+
+        extern fn timer_cb(handle: *uvll::uv_timer_t) {
+            // Don't close the corresponding tcp request, just wake up the task
+            // and let RAII take care of the pending watcher.
+            let cx: &mut ConnectCtx = unsafe {
+                &mut *(uvll::get_data_for_uv_handle(handle) as *mut ConnectCtx)
+            };
+            cx.status = uvll::ECANCELED;
+            wakeup(&mut cx.task);
+        }
+
+        extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) {
+            // This callback can be invoked with ECANCELED if the watcher is
+            // closed by the timeout callback. In that case we just want to free
+            // the request and be along our merry way.
+            let req = Request::wrap(req);
+            if status == uvll::ECANCELED { return }
+
+            // Apparently on windows when the handle is closed this callback may
+            // not be invoked with ECANCELED but rather another error code.
+            // Either ways, if the data is null, then our timeout has expired
+            // and there's nothing we can do.
+            let data = unsafe { uvll::get_data_for_req(req.handle) };
+            if data.is_null() { return }
+
+            let cx: &mut ConnectCtx = unsafe { &mut *(data as *mut ConnectCtx) };
+            cx.status = status;
+            match cx.timer {
+                Some(ref mut t) => t.stop(),
+                None => {}
+            }
+            // Note that the timer callback doesn't cancel the connect request
+            // (that's the job of uv_close()), so it's possible for this
+            // callback to get triggered after the timeout callback fires, but
+            // before the task wakes up. In that case, we did indeed
+            // successfully connect, but we don't need to wake someone up. We
+            // updated the status above (correctly so), and the task will pick
+            // up on this when it wakes up.
+            if cx.task.is_some() {
+                wakeup(&mut cx.task);
+            }
+        }
+    }
+}
+
+impl AcceptTimeout {
+    pub fn new() -> AcceptTimeout {
+        AcceptTimeout { timer: None, timeout_tx: None, timeout_rx: None }
+    }
+
+    pub fn accept<T: Send>(&mut self, c: &Receiver<IoResult<T>>) -> IoResult<T> {
+        match self.timeout_rx {
+            None => c.recv(),
+            Some(ref rx) => {
+                use std::comm::Select;
+
+                // Poll the incoming channel first (don't rely on the order of
+                // select just yet). If someone's pending then we should return
+                // them immediately.
+                match c.try_recv() {
+                    Ok(data) => return data,
+                    Err(..) => {}
+                }
+
+                // Use select to figure out which channel gets ready first. We
+                // do some custom handling of select to ensure that we never
+                // actually drain the timeout channel (we'll keep seeing the
+                // timeout message in the future).
+                let s = Select::new();
+                let mut timeout = s.handle(rx);
+                let mut data = s.handle(c);
+                unsafe {
+                    timeout.add();
+                    data.add();
+                }
+                if s.wait() == timeout.id() {
+                    Err(uv_error_to_io_error(UvError(uvll::ECANCELED)))
+                } else {
+                    c.recv()
+                }
+            }
+        }
+    }
+
+    pub fn clear(&mut self) {
+        match self.timeout_rx {
+            Some(ref t) => { let _ = t.try_recv(); }
+            None => {}
+        }
+        match self.timer {
+            Some(ref mut t) => t.stop(),
+            None => {}
+        }
+    }
+
+    pub fn set_timeout<U, T: UvHandle<U> + HomingIO>(
+        &mut self, ms: u64, t: &mut T
+    ) {
+        // If we have a timeout, lazily initialize the timer which will be used
+        // to fire when the timeout runs out.
+        if self.timer.is_none() {
+            let loop_ = Loop::wrap(unsafe {
+                uvll::get_loop_for_uv_handle(t.uv_handle())
+            });
+            let mut timer = TimerWatcher::new_home(&loop_, t.home().clone());
+            unsafe {
+                timer.set_data(self as *mut _ as *AcceptTimeout);
+            }
+            self.timer = Some(timer);
+        }
+
+        // Once we've got a timer, stop any previous timeout, reset it for the
+        // current one, and install some new channels to send/receive data on
+        let timer = self.timer.get_mut_ref();
+        timer.stop();
+        timer.start(timer_cb, ms, 0);
+        let (tx, rx) = channel();
+        self.timeout_tx = Some(tx);
+        self.timeout_rx = Some(rx);
+
+        extern fn timer_cb(timer: *uvll::uv_timer_t) {
+            let acceptor: &mut AcceptTimeout = unsafe {
+                &mut *(uvll::get_data_for_uv_handle(timer) as *mut AcceptTimeout)
+            };
+            // This send can never fail because if this timer is active then the
+            // receiving channel is guaranteed to be alive
+            acceptor.timeout_tx.get_ref().send(());
+        }
+    }
+}
diff --git a/src/librustuv/timer.rs b/src/librustuv/timer.rs
index 216eb600130..525539f8b36 100644
--- a/src/librustuv/timer.rs
+++ b/src/librustuv/timer.rs
@@ -18,7 +18,7 @@ use uvio::UvIoFactory;
 use uvll;
 
 pub struct TimerWatcher {
-    handle: *uvll::uv_timer_t,
+    pub handle: *uvll::uv_timer_t,
     home: HomeHandle,
     action: Option<NextAction>,
     blocker: Option<BlockedTask>,
diff --git a/src/librustuv/tty.rs b/src/librustuv/tty.rs
index 4f3e12b6974..f70c3b4c1bd 100644
--- a/src/librustuv/tty.rs
+++ b/src/librustuv/tty.rs
@@ -87,7 +87,7 @@ impl RtioTTY for TtyWatcher {
 
     fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
         let _m = self.fire_homing_missile();
-        self.stream.write(buf).map_err(uv_error_to_io_error)
+        self.stream.write(buf, false).map_err(uv_error_to_io_error)
     }
 
     fn set_raw(&mut self, raw: bool) -> Result<(), IoError> {