about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--src/liblibc/lib.rs6
-rw-r--r--src/libnative/io/c_win32.rs2
-rw-r--r--src/libnative/io/file_unix.rs13
-rw-r--r--src/libnative/io/file_win32.rs11
-rw-r--r--src/libnative/io/net.rs7
-rw-r--r--src/libnative/io/pipe_unix.rs7
-rw-r--r--src/libnative/io/pipe_win32.rs175
-rw-r--r--src/librustuv/access.rs19
-rw-r--r--src/librustuv/net.rs100
-rw-r--r--src/librustuv/pipe.rs38
-rw-r--r--src/librustuv/stream.rs35
-rw-r--r--src/libstd/io/net/tcp.rs113
-rw-r--r--src/libstd/io/net/unix.rs102
-rw-r--r--src/libstd/rt/rtio.rs4
14 files changed, 534 insertions, 98 deletions
diff --git a/src/liblibc/lib.rs b/src/liblibc/lib.rs
index a4593c1cb5a..5f67be22068 100644
--- a/src/liblibc/lib.rs
+++ b/src/liblibc/lib.rs
@@ -118,7 +118,7 @@ pub use consts::os::bsd44::{SOL_SOCKET, SO_KEEPALIVE, SO_ERROR};
 pub use consts::os::bsd44::{SO_REUSEADDR, SO_BROADCAST, SHUT_WR, IP_MULTICAST_LOOP};
 pub use consts::os::bsd44::{IP_ADD_MEMBERSHIP, IP_DROP_MEMBERSHIP};
 pub use consts::os::bsd44::{IPV6_ADD_MEMBERSHIP, IPV6_DROP_MEMBERSHIP};
-pub use consts::os::bsd44::{IP_MULTICAST_TTL, IP_TTL};
+pub use consts::os::bsd44::{IP_MULTICAST_TTL, IP_TTL, SHUT_RD};
 
 pub use funcs::c95::ctype::{isalnum, isalpha, iscntrl, isdigit};
 pub use funcs::c95::ctype::{islower, isprint, ispunct, isspace};
@@ -226,6 +226,8 @@ pub use funcs::bsd43::{shutdown};
 #[cfg(windows)] pub use consts::os::extra::{FILE_WRITE_ATTRIBUTES, FILE_READ_ATTRIBUTES};
 #[cfg(windows)] pub use consts::os::extra::{ERROR_PIPE_BUSY, ERROR_IO_PENDING};
 #[cfg(windows)] pub use consts::os::extra::{ERROR_PIPE_CONNECTED, WAIT_OBJECT_0};
+#[cfg(windows)] pub use consts::os::extra::{ERROR_NOT_FOUND};
+#[cfg(windows)] pub use consts::os::extra::{ERROR_OPERATION_ABORTED};
 #[cfg(windows)] pub use types::os::common::bsd44::{SOCKET};
 #[cfg(windows)] pub use types::os::common::posix01::{stat, utimbuf};
 #[cfg(windows)] pub use types::os::arch::extra::{HANDLE, BOOL, LPSECURITY_ATTRIBUTES};
@@ -1740,8 +1742,10 @@ pub mod consts {
             pub static ERROR_NO_DATA: c_int = 232;
             pub static ERROR_INVALID_ADDRESS : c_int = 487;
             pub static ERROR_PIPE_CONNECTED: c_int = 535;
+            pub static ERROR_OPERATION_ABORTED: c_int = 995;
             pub static ERROR_IO_PENDING: c_int = 997;
             pub static ERROR_FILE_INVALID : c_int = 1006;
+            pub static ERROR_NOT_FOUND: c_int = 1168;
             pub static INVALID_HANDLE_VALUE : c_int = -1;
 
             pub static DELETE : DWORD = 0x00010000;
diff --git a/src/libnative/io/c_win32.rs b/src/libnative/io/c_win32.rs
index 6c84424e97a..4fdd05a8b42 100644
--- a/src/libnative/io/c_win32.rs
+++ b/src/libnative/io/c_win32.rs
@@ -61,4 +61,6 @@ extern "system" {
                       optlen: *mut libc::c_int) -> libc::c_int;
 
     pub fn CancelIo(hFile: libc::HANDLE) -> libc::BOOL;
+    pub fn CancelIoEx(hFile: libc::HANDLE,
+                      lpOverlapped: libc::LPOVERLAPPED) -> libc::BOOL;
 }
diff --git a/src/libnative/io/file_unix.rs b/src/libnative/io/file_unix.rs
index 2727f9a0b09..84ea0d29434 100644
--- a/src/libnative/io/file_unix.rs
+++ b/src/libnative/io/file_unix.rs
@@ -12,12 +12,12 @@
 
 use libc::{c_int, c_void};
 use libc;
-use std::sync::arc::UnsafeArc;
 use std::c_str::CString;
 use std::io::IoError;
 use std::io;
 use std::mem;
 use std::rt::rtio;
+use std::sync::arc::UnsafeArc;
 
 use io::{IoResult, retry, keep_going};
 
@@ -178,6 +178,17 @@ impl rtio::RtioPipe for FileDesc {
     fn clone(&self) -> Box<rtio::RtioPipe:Send> {
         box FileDesc { inner: self.inner.clone() } as Box<rtio::RtioPipe:Send>
     }
+
+    // Only supported on named pipes currently. Note that this doesn't have an
+    // impact on the std::io primitives, this is never called via
+    // std::io::PipeStream. If the functionality is exposed in the future, then
+    // these methods will need to be implemented.
+    fn close_read(&mut self) -> Result<(), IoError> {
+        Err(io::standard_error(io::InvalidInput))
+    }
+    fn close_write(&mut self) -> Result<(), IoError> {
+        Err(io::standard_error(io::InvalidInput))
+    }
 }
 
 impl rtio::RtioTTY for FileDesc {
diff --git a/src/libnative/io/file_win32.rs b/src/libnative/io/file_win32.rs
index 018907303b8..c2acd91d476 100644
--- a/src/libnative/io/file_win32.rs
+++ b/src/libnative/io/file_win32.rs
@@ -210,6 +210,17 @@ impl rtio::RtioPipe for FileDesc {
     fn clone(&self) -> Box<rtio::RtioPipe:Send> {
         box FileDesc { inner: self.inner.clone() } as Box<rtio::RtioPipe:Send>
     }
+
+    // Only supported on named pipes currently. Note that this doesn't have an
+    // impact on the std::io primitives, this is never called via
+    // std::io::PipeStream. If the functionality is exposed in the future, then
+    // these methods will need to be implemented.
+    fn close_read(&mut self) -> IoResult<()> {
+        Err(io::standard_error(io::InvalidInput))
+    }
+    fn close_write(&mut self) -> IoResult<()> {
+        Err(io::standard_error(io::InvalidInput))
+    }
 }
 
 impl rtio::RtioTTY for FileDesc {
diff --git a/src/libnative/io/net.rs b/src/libnative/io/net.rs
index 880cbaabaf8..a54fe911ae0 100644
--- a/src/libnative/io/net.rs
+++ b/src/libnative/io/net.rs
@@ -357,9 +357,10 @@ impl rtio::RtioTcpStream for TcpStream {
         } as Box<rtio::RtioTcpStream:Send>
     }
     fn close_write(&mut self) -> IoResult<()> {
-        super::mkerr_libc(unsafe {
-            libc::shutdown(self.fd(), libc::SHUT_WR)
-        })
+        super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_WR) })
+    }
+    fn close_read(&mut self) -> IoResult<()> {
+        super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) })
     }
 }
 
diff --git a/src/libnative/io/pipe_unix.rs b/src/libnative/io/pipe_unix.rs
index 65e9c7448c2..94aca1ef748 100644
--- a/src/libnative/io/pipe_unix.rs
+++ b/src/libnative/io/pipe_unix.rs
@@ -149,6 +149,13 @@ impl rtio::RtioPipe for UnixStream {
             inner: self.inner.clone(),
         } as Box<rtio::RtioPipe:Send>
     }
+
+    fn close_write(&mut self) -> IoResult<()> {
+        super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_WR) })
+    }
+    fn close_read(&mut self) -> IoResult<()> {
+        super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) })
+    }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
diff --git a/src/libnative/io/pipe_win32.rs b/src/libnative/io/pipe_win32.rs
index f1239285434..8050123cedc 100644
--- a/src/libnative/io/pipe_win32.rs
+++ b/src/libnative/io/pipe_win32.rs
@@ -84,13 +84,17 @@
 //! the test suite passing (the suite is in libstd), and that's good enough for
 //! me!
 
-use std::c_str::CString;
 use libc;
+use std::c_str::CString;
+use std::intrinsics;
+use std::io;
 use std::os::win32::as_utf16_p;
+use std::os;
 use std::ptr;
 use std::rt::rtio;
 use std::sync::arc::UnsafeArc;
-use std::intrinsics;
+use std::sync::atomics;
+use std::unstable::mutex;
 
 use super::IoResult;
 use super::c;
@@ -124,6 +128,20 @@ impl Drop for Event {
 
 struct Inner {
     handle: libc::HANDLE,
+    lock: mutex::NativeMutex,
+    read_closed: atomics::AtomicBool,
+    write_closed: atomics::AtomicBool,
+}
+
+impl Inner {
+    fn new(handle: libc::HANDLE) -> Inner {
+        Inner {
+            handle: handle,
+            lock: unsafe { mutex::NativeMutex::new() },
+            read_closed: atomics::AtomicBool::new(false),
+            write_closed: atomics::AtomicBool::new(false),
+        }
+    }
 }
 
 impl Drop for Inner {
@@ -218,7 +236,7 @@ impl UnixStream {
             loop {
                 match UnixStream::try_connect(p) {
                     Some(handle) => {
-                        let inner = Inner { handle: handle };
+                        let inner = Inner::new(handle);
                         let mut mode = libc::PIPE_TYPE_BYTE |
                                        libc::PIPE_READMODE_BYTE |
                                        libc::PIPE_WAIT;
@@ -275,6 +293,24 @@ impl UnixStream {
     }
 
     fn handle(&self) -> libc::HANDLE { unsafe { (*self.inner.get()).handle } }
+
+    fn read_closed(&self) -> bool {
+        unsafe { (*self.inner.get()).read_closed.load(atomics::SeqCst) }
+    }
+
+    fn write_closed(&self) -> bool {
+        unsafe { (*self.inner.get()).write_closed.load(atomics::SeqCst) }
+    }
+
+    fn cancel_io(&self) -> IoResult<()> {
+        match unsafe { c::CancelIoEx(self.handle(), ptr::mut_null()) } {
+            0 if os::errno() == libc::ERROR_NOT_FOUND as uint => {
+                Ok(())
+            }
+            0 => Err(super::last_error()),
+            _ => Ok(())
+        }
+    }
 }
 
 impl rtio::RtioPipe for UnixStream {
@@ -287,6 +323,18 @@ impl rtio::RtioPipe for UnixStream {
         let mut overlapped: libc::OVERLAPPED = unsafe { intrinsics::init() };
         overlapped.hEvent = self.read.get_ref().handle();
 
+        // Pre-flight check to see if the reading half has been closed. This
+        // must be done before issuing the ReadFile request, but after we
+        // acquire the lock.
+        //
+        // See comments in close_read() about why this lock is necessary.
+        let guard = unsafe { (*self.inner.get()).lock.lock() };
+        if self.read_closed() {
+            return Err(io::standard_error(io::EndOfFile))
+        }
+
+        // Issue a nonblocking requests, succeeding quickly if it happened to
+        // succeed.
         let ret = unsafe {
             libc::ReadFile(self.handle(),
                            buf.as_ptr() as libc::LPVOID,
@@ -294,24 +342,41 @@ impl rtio::RtioPipe for UnixStream {
                            &mut bytes_read,
                            &mut overlapped)
         };
-        if ret == 0 {
-            let err = unsafe { libc::GetLastError() };
-            if err == libc::ERROR_IO_PENDING as libc::DWORD {
-                let ret = unsafe {
-                    libc::GetOverlappedResult(self.handle(),
-                                              &mut overlapped,
-                                              &mut bytes_read,
-                                              libc::TRUE)
-                };
-                if ret == 0 {
-                    return Err(super::last_error())
-                }
-            } else {
+        if ret != 0 { return Ok(bytes_read as uint) }
+
+        // If our errno doesn't say that the I/O is pending, then we hit some
+        // legitimate error and reeturn immediately.
+        if os::errno() != libc::ERROR_IO_PENDING as uint {
+            return Err(super::last_error())
+        }
+
+        // Now that we've issued a successful nonblocking request, we need to
+        // wait for it to finish. This can all be done outside the lock because
+        // we'll see any invocation of CancelIoEx. We also call this in a loop
+        // because we're woken up if the writing half is closed, we just need to
+        // realize that the reading half wasn't closed and we go right back to
+        // sleep.
+        drop(guard);
+        loop {
+            let ret = unsafe {
+                libc::GetOverlappedResult(self.handle(),
+                                          &mut overlapped,
+                                          &mut bytes_read,
+                                          libc::TRUE)
+            };
+            // If we succeeded, or we failed for some reason other than
+            // CancelIoEx, return immediately
+            if ret != 0 { return Ok(bytes_read as uint) }
+            if os::errno() != libc::ERROR_OPERATION_ABORTED as uint {
                 return Err(super::last_error())
             }
-        }
 
-        Ok(bytes_read as uint)
+            // If the reading half is now closed, then we're done. If we woke up
+            // because the writing half was closed, keep trying.
+            if self.read_closed() {
+                return Err(io::standard_error(io::EndOfFile))
+            }
+        }
     }
 
     fn write(&mut self, buf: &[u8]) -> IoResult<()> {
@@ -325,6 +390,17 @@ impl rtio::RtioPipe for UnixStream {
 
         while offset < buf.len() {
             let mut bytes_written = 0;
+
+            // This sequence below is quite similar to the one found in read().
+            // Some careful looping is done to ensure that if close_write() is
+            // invoked we bail out early, and if close_read() is invoked we keep
+            // going after we woke up.
+            //
+            // See comments in close_read() about why this lock is necessary.
+            let guard = unsafe { (*self.inner.get()).lock.lock() };
+            if self.write_closed() {
+                return Err(io::standard_error(io::BrokenPipe))
+            }
             let ret = unsafe {
                 libc::WriteFile(self.handle(),
                                 buf.slice_from(offset).as_ptr() as libc::LPVOID,
@@ -332,20 +408,29 @@ impl rtio::RtioPipe for UnixStream {
                                 &mut bytes_written,
                                 &mut overlapped)
             };
+            drop(guard);
+
             if ret == 0 {
-                let err = unsafe { libc::GetLastError() };
-                if err == libc::ERROR_IO_PENDING as libc::DWORD {
-                    let ret = unsafe {
-                        libc::GetOverlappedResult(self.handle(),
-                                                  &mut overlapped,
-                                                  &mut bytes_written,
-                                                  libc::TRUE)
-                    };
-                    if ret == 0 {
+                if os::errno() != libc::ERROR_IO_PENDING as uint {
+                    return Err(super::last_error())
+                }
+                let ret = unsafe {
+                    libc::GetOverlappedResult(self.handle(),
+                                              &mut overlapped,
+                                              &mut bytes_written,
+                                              libc::TRUE)
+                };
+                // If we weren't aborted, this was a legit error, if we were
+                // aborted, then check to see if the write half was actually
+                // closed or whether we woke up from the read half closing.
+                if ret == 0 {
+                    if os::errno() != libc::ERROR_OPERATION_ABORTED as uint {
                         return Err(super::last_error())
                     }
-                } else {
-                    return Err(super::last_error())
+                    if self.write_closed() {
+                        return Err(io::standard_error(io::BrokenPipe))
+                    }
+                    continue; // retry
                 }
             }
             offset += bytes_written as uint;
@@ -360,6 +445,36 @@ impl rtio::RtioPipe for UnixStream {
             write: None,
         } as Box<rtio::RtioPipe:Send>
     }
+
+    fn close_read(&mut self) -> IoResult<()> {
+        // On windows, there's no actual shutdown() method for pipes, so we're
+        // forced to emulate the behavior manually at the application level. To
+        // do this, we need to both cancel any pending requests, as well as
+        // prevent all future requests from succeeding. These two operations are
+        // not atomic with respect to one another, so we must use a lock to do
+        // so.
+        //
+        // The read() code looks like:
+        //
+        //      1. Make sure the pipe is still open
+        //      2. Submit a read request
+        //      3. Wait for the read request to finish
+        //
+        // The race this lock is preventing is if another thread invokes
+        // close_read() between steps 1 and 2. By atomically executing steps 1
+        // and 2 with a lock with respect to close_read(), we're guaranteed that
+        // no thread will erroneously sit in a read forever.
+        let _guard = unsafe { (*self.inner.get()).lock.lock() };
+        unsafe { (*self.inner.get()).read_closed.store(true, atomics::SeqCst) }
+        self.cancel_io()
+    }
+
+    fn close_write(&mut self) -> IoResult<()> {
+        // see comments in close_read() for why this lock is necessary
+        let _guard = unsafe { (*self.inner.get()).lock.lock() };
+        unsafe { (*self.inner.get()).write_closed.store(true, atomics::SeqCst) }
+        self.cancel_io()
+    }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -520,7 +635,7 @@ impl UnixAcceptor {
 
         // Transfer ownership of our handle into this stream
         Ok(UnixStream {
-            inner: UnsafeArc::new(Inner { handle: handle }),
+            inner: UnsafeArc::new(Inner::new(handle)),
             read: None,
             write: None,
         })
diff --git a/src/librustuv/access.rs b/src/librustuv/access.rs
index fbacf1ca314..f96fa1e5be6 100644
--- a/src/librustuv/access.rs
+++ b/src/librustuv/access.rs
@@ -33,6 +33,7 @@ pub struct Guard<'a> {
 struct Inner {
     queue: Vec<BlockedTask>,
     held: bool,
+    closed: bool,
 }
 
 impl Access {
@@ -41,6 +42,7 @@ impl Access {
             inner: UnsafeArc::new(Inner {
                 queue: vec![],
                 held: false,
+                closed: false,
             })
         }
     }
@@ -64,6 +66,15 @@ impl Access {
 
         Guard { access: self, missile: Some(missile) }
     }
+
+    pub fn close(&self, _missile: &HomingMissile) {
+        // This unsafety is OK because with a homing missile we're guaranteed to
+        // be the only task looking at the `closed` flag (and are therefore
+        // allowed to modify it). Additionally, no atomics are necessary because
+        // everyone's running on the same thread and has already done the
+        // necessary synchronization to be running on this thread.
+        unsafe { (*self.inner.get()).closed = true; }
+    }
 }
 
 impl Clone for Access {
@@ -72,6 +83,14 @@ impl Clone for Access {
     }
 }
 
+impl<'a> Guard<'a> {
+    pub fn is_closed(&self) -> bool {
+        // See above for why this unsafety is ok, it just applies to the read
+        // instead of the write.
+        unsafe { (*self.access.inner.get()).closed }
+    }
+}
+
 #[unsafe_destructor]
 impl<'a> Drop for Guard<'a> {
     fn drop(&mut self) {
diff --git a/src/librustuv/net.rs b/src/librustuv/net.rs
index a2701a57ca9..0ddf50921fd 100644
--- a/src/librustuv/net.rs
+++ b/src/librustuv/net.rs
@@ -11,6 +11,7 @@
 use libc::{size_t, ssize_t, c_int, c_void, c_uint};
 use libc;
 use std::cast;
+use std::io;
 use std::io::{IoError, IoResult};
 use std::io::net::ip;
 use std::mem;
@@ -411,7 +412,13 @@ impl rtio::RtioSocket for TcpWatcher {
 impl rtio::RtioTcpStream for TcpWatcher {
     fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
         let m = self.fire_homing_missile();
-        let _g = self.read_access.grant(m);
+        let access = self.read_access.grant(m);
+
+        // see comments in close_read about this check
+        if access.is_closed() {
+            return Err(io::standard_error(io::EndOfFile))
+        }
+
         self.stream.read(buf).map_err(uv_error_to_io_error)
     }
 
@@ -466,36 +473,17 @@ impl rtio::RtioTcpStream for TcpWatcher {
         } as Box<rtio::RtioTcpStream:Send>
     }
 
-    fn close_write(&mut self) -> Result<(), IoError> {
-        struct Ctx {
-            slot: Option<BlockedTask>,
-            status: c_int,
-        }
-        let mut req = Request::new(uvll::UV_SHUTDOWN);
-
-        return match unsafe {
-            uvll::uv_shutdown(req.handle, self.handle, shutdown_cb)
-        } {
-            0 => {
-                req.defuse(); // uv callback now owns this request
-                let mut cx = Ctx { slot: None, status: 0 };
-
-                wait_until_woken_after(&mut cx.slot, &self.uv_loop(), || {
-                    req.set_data(&cx);
-                });
-
-                status_to_io_result(cx.status)
-            }
-            n => Err(uv_error_to_io_error(UvError(n)))
-        };
+    fn close_read(&mut self) -> Result<(), IoError> {
+        // see comments in PipeWatcher::close_read
+        let m = self.fire_homing_missile();
+        self.read_access.close(&m);
+        self.stream.cancel_read(m);
+        Ok(())
+    }
 
-        extern fn shutdown_cb(req: *uvll::uv_shutdown_t, status: libc::c_int) {
-            let req = Request::wrap(req);
-            assert!(status != uvll::ECANCELED);
-            let cx: &mut Ctx = unsafe { req.get_data() };
-            cx.status = status;
-            wakeup(&mut cx.slot);
-        }
+    fn close_write(&mut self) -> Result<(), IoError> {
+        let _m = self.fire_homing_missile();
+        shutdown(self.handle, &self.uv_loop())
     }
 }
 
@@ -704,7 +692,7 @@ impl rtio::RtioUdpSocket for UdpWatcher {
         let m = self.fire_homing_missile();
         let _g = self.read_access.grant(m);
 
-        let a = match unsafe {
+        return match unsafe {
             uvll::uv_udp_recv_start(self.handle, alloc_cb, recv_cb)
         } {
             0 => {
@@ -725,14 +713,12 @@ impl rtio::RtioUdpSocket for UdpWatcher {
             }
             n => Err(uv_error_to_io_error(UvError(n)))
         };
-        return a;
 
         extern fn alloc_cb(handle: *uvll::uv_udp_t,
                            _suggested_size: size_t,
                            buf: *mut Buf) {
             unsafe {
-                let cx: &mut Ctx =
-                    cast::transmute(uvll::get_data_for_uv_handle(handle));
+                let cx = &mut *(uvll::get_data_for_uv_handle(handle) as *mut Ctx);
                 *buf = cx.buf.take().expect("recv alloc_cb called more than once")
             }
         }
@@ -740,8 +726,8 @@ impl rtio::RtioUdpSocket for UdpWatcher {
         extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, buf: *Buf,
                           addr: *libc::sockaddr, _flags: c_uint) {
             assert!(nread != uvll::ECANCELED as ssize_t);
-            let cx: &mut Ctx = unsafe {
-                cast::transmute(uvll::get_data_for_uv_handle(handle))
+            let cx = unsafe {
+                &mut *(uvll::get_data_for_uv_handle(handle) as *mut Ctx)
             };
 
             // When there's no data to read the recv callback can be a no-op.
@@ -752,13 +738,7 @@ impl rtio::RtioUdpSocket for UdpWatcher {
                 return
             }
 
-            unsafe {
-                assert_eq!(uvll::uv_udp_recv_stop(handle), 0)
-            }
-
-            let cx: &mut Ctx = unsafe {
-                cast::transmute(uvll::get_data_for_uv_handle(handle))
-            };
+            unsafe { assert_eq!(uvll::uv_udp_recv_stop(handle), 0) }
             let addr = if addr == ptr::null() {
                 None
             } else {
@@ -900,6 +880,40 @@ impl Drop for UdpWatcher {
     }
 }
 
+////////////////////////////////////////////////////////////////////////////////
+// Shutdown helper
+////////////////////////////////////////////////////////////////////////////////
+
+pub fn shutdown(handle: *uvll::uv_stream_t, loop_: &Loop) -> Result<(), IoError> {
+    struct Ctx {
+        slot: Option<BlockedTask>,
+        status: c_int,
+    }
+    let mut req = Request::new(uvll::UV_SHUTDOWN);
+
+    return match unsafe { uvll::uv_shutdown(req.handle, handle, shutdown_cb) } {
+        0 => {
+            req.defuse(); // uv callback now owns this request
+            let mut cx = Ctx { slot: None, status: 0 };
+
+            wait_until_woken_after(&mut cx.slot, loop_, || {
+                req.set_data(&cx);
+            });
+
+            status_to_io_result(cx.status)
+        }
+        n => Err(uv_error_to_io_error(UvError(n)))
+    };
+
+    extern fn shutdown_cb(req: *uvll::uv_shutdown_t, status: libc::c_int) {
+        let req = Request::wrap(req);
+        assert!(status != uvll::ECANCELED);
+        let cx: &mut Ctx = unsafe { req.get_data() };
+        cx.status = status;
+        wakeup(&mut cx.slot);
+    }
+}
+
 #[cfg(test)]
 mod test {
     use std::rt::rtio::{RtioTcpStream, RtioTcpListener, RtioTcpAcceptor,
diff --git a/src/librustuv/pipe.rs b/src/librustuv/pipe.rs
index 0edc13afcf5..7fec4051761 100644
--- a/src/librustuv/pipe.rs
+++ b/src/librustuv/pipe.rs
@@ -11,6 +11,7 @@
 use libc;
 use std::c_str::CString;
 use std::io::IoError;
+use std::io;
 use std::rt::rtio::{RtioPipe, RtioUnixListener, RtioUnixAcceptor};
 
 use access::Access;
@@ -111,7 +112,13 @@ impl PipeWatcher {
 impl RtioPipe for PipeWatcher {
     fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
         let m = self.fire_homing_missile();
-        let _g = self.read_access.grant(m);
+        let access = self.read_access.grant(m);
+
+        // see comments in close_read about this check
+        if access.is_closed() {
+            return Err(io::standard_error(io::EndOfFile))
+        }
+
         self.stream.read(buf).map_err(uv_error_to_io_error)
     }
 
@@ -131,6 +138,35 @@ impl RtioPipe for PipeWatcher {
             write_access: self.write_access.clone(),
         } as Box<RtioPipe:Send>
     }
+
+    fn close_read(&mut self) -> Result<(), IoError> {
+        // The current uv_shutdown method only shuts the writing half of the
+        // connection, and no method is provided to shut down the reading half
+        // of the connection. With a lack of method, we emulate shutting down
+        // the reading half of the connection by manually returning early from
+        // all future calls to `read`.
+        //
+        // Note that we must be careful to ensure that *all* cloned handles see
+        // the closing of the read half, so we stored the "is closed" bit in the
+        // Access struct, not in our own personal watcher. Additionally, the
+        // homing missile is used as a locking mechanism to ensure there is no
+        // contention over this bit.
+        //
+        // To shutdown the read half, we must first flag the access as being
+        // closed, and then afterwards we cease any pending read. Note that this
+        // ordering is crucial because we could in theory be rescheduled during
+        // the uv_read_stop which means that another read invocation could leak
+        // in before we set the flag.
+        let m = self.fire_homing_missile();
+        self.read_access.close(&m);
+        self.stream.cancel_read(m);
+        Ok(())
+    }
+
+    fn close_write(&mut self) -> Result<(), IoError> {
+        let _m = self.fire_homing_missile();
+        net::shutdown(self.stream.handle, &self.uv_loop())
+    }
 }
 
 impl HomingIO for PipeWatcher {
diff --git a/src/librustuv/stream.rs b/src/librustuv/stream.rs
index 1fb61c15b83..a1b606709d8 100644
--- a/src/librustuv/stream.rs
+++ b/src/librustuv/stream.rs
@@ -14,6 +14,7 @@ use std::ptr;
 use std::rt::task::BlockedTask;
 
 use Loop;
+use homing::HomingMissile;
 use super::{UvError, Buf, slice_to_uv_buf, Request, wait_until_woken_after,
             ForbidUnwind, wakeup};
 use uvll;
@@ -57,6 +58,7 @@ impl StreamWatcher {
     // Wrappers should ensure to always reset the field to an appropriate value
     // if they rely on the field to perform an action.
     pub fn new(stream: *uvll::uv_stream_t) -> StreamWatcher {
+        unsafe { uvll::set_data_for_uv_handle(stream, 0 as *int) }
         StreamWatcher {
             handle: stream,
             last_write_req: None,
@@ -70,7 +72,9 @@ impl StreamWatcher {
 
         let mut rcx = ReadContext {
             buf: Some(slice_to_uv_buf(buf)),
-            result: 0,
+            // if the read is canceled, we'll see eof, otherwise this will get
+            // overwritten
+            result: uvll::EOF as ssize_t,
             task: None,
         };
         // When reading a TTY stream on windows, libuv will invoke alloc_cb
@@ -78,13 +82,11 @@ impl StreamWatcher {
         // we must be ready for this to happen (by setting the data in the uv
         // handle). In theory this otherwise doesn't need to happen until after
         // the read is succesfully started.
-        unsafe {
-            uvll::set_data_for_uv_handle(self.handle, &rcx)
-        }
+        unsafe { uvll::set_data_for_uv_handle(self.handle, &rcx) }
 
         // Send off the read request, but don't block until we're sure that the
         // read request is queued.
-        match unsafe {
+        let ret = match unsafe {
             uvll::uv_read_start(self.handle, alloc_cb, read_cb)
         } {
             0 => {
@@ -96,6 +98,29 @@ impl StreamWatcher {
                 }
             }
             n => Err(UvError(n))
+        };
+        // Make sure a read cancellation sees that there's no pending read
+        unsafe { uvll::set_data_for_uv_handle(self.handle, 0 as *int) }
+        return ret;
+    }
+
+    pub fn cancel_read(&mut self, m: HomingMissile) {
+        // When we invoke uv_read_stop, it cancels the read and alloc
+        // callbacks. We need to manually wake up a pending task (if one was
+        // present). Note that we wake up the task *outside* the homing missile
+        // to ensure that we don't switch schedulers when we're not supposed to.
+        assert_eq!(unsafe { uvll::uv_read_stop(self.handle) }, 0);
+        let data = unsafe {
+            let data = uvll::get_data_for_uv_handle(self.handle);
+            if data.is_null() { return }
+            uvll::set_data_for_uv_handle(self.handle, 0 as *int);
+            &mut *(data as *mut ReadContext)
+        };
+        let task = data.task.take();
+        drop(m);
+        match task {
+            Some(task) => { let _ = task.wake().map(|t| t.reawaken()); }
+            None => {}
         }
     }
 
diff --git a/src/libstd/io/net/tcp.rs b/src/libstd/io/net/tcp.rs
index a2cd69da5ae..d07b2e556d6 100644
--- a/src/libstd/io/net/tcp.rs
+++ b/src/libstd/io/net/tcp.rs
@@ -32,7 +32,7 @@ use rt::rtio::{RtioTcpAcceptor, RtioTcpStream};
 ///
 /// # Example
 ///
-/// ```rust
+/// ```no_run
 /// # #![allow(unused_must_use)]
 /// use std::io::net::tcp::TcpStream;
 /// use std::io::net::ip::{Ipv4Addr, SocketAddr};
@@ -109,6 +109,48 @@ impl TcpStream {
             None => self.obj.letdie(),
         }
     }
+
+    /// Closes the reading half of this connection.
+    ///
+    /// This method will close the reading portion of this connection, causing
+    /// all pending and future reads to immediately return with an error.
+    ///
+    /// # Example
+    ///
+    /// ```no_run
+    /// # #![allow(unused_must_use)]
+    /// use std::io::timer;
+    /// use std::io::net::tcp::TcpStream;
+    /// use std::io::net::ip::{Ipv4Addr, SocketAddr};
+    ///
+    /// let addr = SocketAddr { ip: Ipv4Addr(127, 0, 0, 1), port: 34254 };
+    /// let mut stream = TcpStream::connect(addr).unwrap();
+    /// let stream2 = stream.clone();
+    ///
+    /// spawn(proc() {
+    ///     // close this stream after one second
+    ///     timer::sleep(1000);
+    ///     let mut stream = stream2;
+    ///     stream.close_read();
+    /// });
+    ///
+    /// // wait for some data, will get canceled after one second
+    /// let mut buf = [0];
+    /// stream.read(buf);
+    /// ```
+    ///
+    /// Note that this method affects all cloned handles associated with this
+    /// stream, not just this one handle.
+    pub fn close_read(&mut self) -> IoResult<()> { self.obj.close_read() }
+
+    /// Closes the writing half of this connection.
+    ///
+    /// This method will close the writing portion of this connection, causing
+    /// all future writes to immediately return with an error.
+    ///
+    /// Note that this method affects all cloned handles associated with this
+    /// stream, not just this one handle.
+    pub fn close_write(&mut self) -> IoResult<()> { self.obj.close_write() }
 }
 
 impl Clone for TcpStream {
@@ -839,7 +881,11 @@ mod test {
 
         // Also make sure that even though the timeout is expired that we will
         // continue to receive any pending connections.
-        let l = TcpStream::connect(addr).unwrap();
+        let (tx, rx) = channel();
+        spawn(proc() {
+            tx.send(TcpStream::connect(addr).unwrap());
+        });
+        let l = rx.recv();
         for i in range(0, 1001) {
             match a.accept() {
                 Ok(..) => break,
@@ -853,8 +899,69 @@ mod test {
         // Unset the timeout and make sure that this always blocks.
         a.set_timeout(None);
         spawn(proc() {
-            drop(TcpStream::connect(addr));
+            drop(TcpStream::connect(addr).unwrap());
         });
         a.accept().unwrap();
     })
+
+    iotest!(fn close_readwrite_smoke() {
+        let addr = next_test_ip4();
+        let a = TcpListener::bind(addr).listen().unwrap();
+        let (_tx, rx) = channel::<()>();
+        spawn(proc() {
+            let mut a = a;
+            let _s = a.accept().unwrap();
+            let _ = rx.recv_opt();
+        });
+
+        let mut b = [0];
+        let mut s = TcpStream::connect(addr).unwrap();
+        let mut s2 = s.clone();
+
+        // closing should prevent reads/writes
+        s.close_write().unwrap();
+        assert!(s.write([0]).is_err());
+        s.close_read().unwrap();
+        assert!(s.read(b).is_err());
+
+        // closing should affect previous handles
+        assert!(s2.write([0]).is_err());
+        assert!(s2.read(b).is_err());
+
+        // closing should affect new handles
+        let mut s3 = s.clone();
+        assert!(s3.write([0]).is_err());
+        assert!(s3.read(b).is_err());
+
+        // make sure these don't die
+        let _ = s2.close_read();
+        let _ = s2.close_write();
+        let _ = s3.close_read();
+        let _ = s3.close_write();
+    })
+
+    iotest!(fn close_read_wakes_up() {
+        let addr = next_test_ip4();
+        let a = TcpListener::bind(addr).listen().unwrap();
+        let (_tx, rx) = channel::<()>();
+        spawn(proc() {
+            let mut a = a;
+            let _s = a.accept().unwrap();
+            let _ = rx.recv_opt();
+        });
+
+        let mut s = TcpStream::connect(addr).unwrap();
+        let s2 = s.clone();
+        let (tx, rx) = channel();
+        spawn(proc() {
+            let mut s2 = s2;
+            assert!(s2.read([0]).is_err());
+            tx.send(());
+        });
+        // this should wake up the child task
+        s.close_read().unwrap();
+
+        // this test will never finish if the child doesn't wake up
+        rx.recv();
+    })
 }
diff --git a/src/libstd/io/net/unix.rs b/src/libstd/io/net/unix.rs
index f6e985dc278..bbe39885c03 100644
--- a/src/libstd/io/net/unix.rs
+++ b/src/libstd/io/net/unix.rs
@@ -28,7 +28,6 @@ use prelude::*;
 
 use c_str::ToCStr;
 use clone::Clone;
-use io::pipe::PipeStream;
 use io::{Listener, Acceptor, Reader, Writer, IoResult};
 use kinds::Send;
 use owned::Box;
@@ -37,14 +36,10 @@ use rt::rtio::{RtioUnixAcceptor, RtioPipe};
 
 /// A stream which communicates over a named pipe.
 pub struct UnixStream {
-    obj: PipeStream,
+    obj: Box<RtioPipe:Send>,
 }
 
 impl UnixStream {
-    fn new(obj: Box<RtioPipe:Send>) -> UnixStream {
-        UnixStream { obj: PipeStream::new(obj) }
-    }
-
     /// Connect to a pipe named by `path`. This will attempt to open a
     /// connection to the underlying socket.
     ///
@@ -62,7 +57,7 @@ impl UnixStream {
     /// ```
     pub fn connect<P: ToCStr>(path: &P) -> IoResult<UnixStream> {
         LocalIo::maybe_raise(|io| {
-            io.unix_connect(&path.to_c_str(), None).map(UnixStream::new)
+            io.unix_connect(&path.to_c_str(), None).map(|p| UnixStream { obj: p })
         })
     }
 
@@ -86,9 +81,28 @@ impl UnixStream {
                                       timeout_ms: u64) -> IoResult<UnixStream> {
         LocalIo::maybe_raise(|io| {
             let s = io.unix_connect(&path.to_c_str(), Some(timeout_ms));
-            s.map(UnixStream::new)
+            s.map(|p| UnixStream { obj: p })
         })
     }
+
+
+    /// Closes the reading half of this connection.
+    ///
+    /// This method will close the reading portion of this connection, causing
+    /// all pending and future reads to immediately return with an error.
+    ///
+    /// Note that this method affects all cloned handles associated with this
+    /// stream, not just this one handle.
+    pub fn close_read(&mut self) -> IoResult<()> { self.obj.close_read() }
+
+    /// Closes the writing half of this connection.
+    ///
+    /// This method will close the writing portion of this connection, causing
+    /// all pending and future writes to immediately return with an error.
+    ///
+    /// Note that this method affects all cloned handles associated with this
+    /// stream, not just this one handle.
+    pub fn close_write(&mut self) -> IoResult<()> { self.obj.close_write() }
 }
 
 impl Clone for UnixStream {
@@ -174,7 +188,7 @@ impl UnixAcceptor {
 
 impl Acceptor<UnixStream> for UnixAcceptor {
     fn accept(&mut self) -> IoResult<UnixStream> {
-        self.obj.accept().map(UnixStream::new)
+        self.obj.accept().map(|s| UnixStream { obj: s })
     }
 }
 
@@ -431,7 +445,12 @@ mod tests {
 
         // Also make sure that even though the timeout is expired that we will
         // continue to receive any pending connections.
-        let l = UnixStream::connect(&addr).unwrap();
+        let (tx, rx) = channel();
+        let addr2 = addr.clone();
+        spawn(proc() {
+            tx.send(UnixStream::connect(&addr2).unwrap());
+        });
+        let l = rx.recv();
         for i in range(0, 1001) {
             match a.accept() {
                 Ok(..) => break,
@@ -446,7 +465,7 @@ mod tests {
         a.set_timeout(None);
         let addr2 = addr.clone();
         spawn(proc() {
-            drop(UnixStream::connect(&addr2));
+            drop(UnixStream::connect(&addr2).unwrap());
         });
         a.accept().unwrap();
     })
@@ -461,4 +480,65 @@ mod tests {
         let _a = UnixListener::bind(&addr).unwrap().listen().unwrap();
         assert!(UnixStream::connect_timeout(&addr, 100).is_ok());
     })
+
+    iotest!(fn close_readwrite_smoke() {
+        let addr = next_test_unix();
+        let a = UnixListener::bind(&addr).listen().unwrap();
+        let (_tx, rx) = channel::<()>();
+        spawn(proc() {
+            let mut a = a;
+            let _s = a.accept().unwrap();
+            let _ = rx.recv_opt();
+        });
+
+        let mut b = [0];
+        let mut s = UnixStream::connect(&addr).unwrap();
+        let mut s2 = s.clone();
+
+        // closing should prevent reads/writes
+        s.close_write().unwrap();
+        assert!(s.write([0]).is_err());
+        s.close_read().unwrap();
+        assert!(s.read(b).is_err());
+
+        // closing should affect previous handles
+        assert!(s2.write([0]).is_err());
+        assert!(s2.read(b).is_err());
+
+        // closing should affect new handles
+        let mut s3 = s.clone();
+        assert!(s3.write([0]).is_err());
+        assert!(s3.read(b).is_err());
+
+        // make sure these don't die
+        let _ = s2.close_read();
+        let _ = s2.close_write();
+        let _ = s3.close_read();
+        let _ = s3.close_write();
+    })
+
+    iotest!(fn close_read_wakes_up() {
+        let addr = next_test_unix();
+        let a = UnixListener::bind(&addr).listen().unwrap();
+        let (_tx, rx) = channel::<()>();
+        spawn(proc() {
+            let mut a = a;
+            let _s = a.accept().unwrap();
+            let _ = rx.recv_opt();
+        });
+
+        let mut s = UnixStream::connect(&addr).unwrap();
+        let s2 = s.clone();
+        let (tx, rx) = channel();
+        spawn(proc() {
+            let mut s2 = s2;
+            assert!(s2.read([0]).is_err());
+            tx.send(());
+        });
+        // this should wake up the child task
+        s.close_read().unwrap();
+
+        // this test will never finish if the child doesn't wake up
+        rx.recv();
+    })
 }
diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs
index fe9f4932a2a..c5afe7887ad 100644
--- a/src/libstd/rt/rtio.rs
+++ b/src/libstd/rt/rtio.rs
@@ -221,6 +221,7 @@ pub trait RtioTcpStream : RtioSocket {
     fn letdie(&mut self) -> IoResult<()>;
     fn clone(&self) -> Box<RtioTcpStream:Send>;
     fn close_write(&mut self) -> IoResult<()>;
+    fn close_read(&mut self) -> IoResult<()>;
 }
 
 pub trait RtioSocket {
@@ -274,6 +275,9 @@ pub trait RtioPipe {
     fn read(&mut self, buf: &mut [u8]) -> IoResult<uint>;
     fn write(&mut self, buf: &[u8]) -> IoResult<()>;
     fn clone(&self) -> Box<RtioPipe:Send>;
+
+    fn close_write(&mut self) -> IoResult<()>;
+    fn close_read(&mut self) -> IoResult<()>;
 }
 
 pub trait RtioUnixListener {