about summary refs log tree commit diff
path: root/src/libnative
diff options
context:
space:
mode:
Diffstat (limited to 'src/libnative')
-rw-r--r--src/libnative/io/c_win32.rs2
-rw-r--r--src/libnative/io/file_unix.rs13
-rw-r--r--src/libnative/io/file_win32.rs11
-rw-r--r--src/libnative/io/net.rs7
-rw-r--r--src/libnative/io/pipe_unix.rs7
-rw-r--r--src/libnative/io/pipe_win32.rs175
6 files changed, 181 insertions, 34 deletions
diff --git a/src/libnative/io/c_win32.rs b/src/libnative/io/c_win32.rs
index 6c84424e97a..4fdd05a8b42 100644
--- a/src/libnative/io/c_win32.rs
+++ b/src/libnative/io/c_win32.rs
@@ -61,4 +61,6 @@ extern "system" {
                       optlen: *mut libc::c_int) -> libc::c_int;
 
     pub fn CancelIo(hFile: libc::HANDLE) -> libc::BOOL;
+    pub fn CancelIoEx(hFile: libc::HANDLE,
+                      lpOverlapped: libc::LPOVERLAPPED) -> libc::BOOL;
 }
diff --git a/src/libnative/io/file_unix.rs b/src/libnative/io/file_unix.rs
index 2727f9a0b09..84ea0d29434 100644
--- a/src/libnative/io/file_unix.rs
+++ b/src/libnative/io/file_unix.rs
@@ -12,12 +12,12 @@
 
 use libc::{c_int, c_void};
 use libc;
-use std::sync::arc::UnsafeArc;
 use std::c_str::CString;
 use std::io::IoError;
 use std::io;
 use std::mem;
 use std::rt::rtio;
+use std::sync::arc::UnsafeArc;
 
 use io::{IoResult, retry, keep_going};
 
@@ -178,6 +178,17 @@ impl rtio::RtioPipe for FileDesc {
     fn clone(&self) -> Box<rtio::RtioPipe:Send> {
         box FileDesc { inner: self.inner.clone() } as Box<rtio::RtioPipe:Send>
     }
+
+    // Only supported on named pipes currently. Note that this doesn't have an
+    // impact on the std::io primitives, this is never called via
+    // std::io::PipeStream. If the functionality is exposed in the future, then
+    // these methods will need to be implemented.
+    fn close_read(&mut self) -> Result<(), IoError> {
+        Err(io::standard_error(io::InvalidInput))
+    }
+    fn close_write(&mut self) -> Result<(), IoError> {
+        Err(io::standard_error(io::InvalidInput))
+    }
 }
 
 impl rtio::RtioTTY for FileDesc {
diff --git a/src/libnative/io/file_win32.rs b/src/libnative/io/file_win32.rs
index 018907303b8..c2acd91d476 100644
--- a/src/libnative/io/file_win32.rs
+++ b/src/libnative/io/file_win32.rs
@@ -210,6 +210,17 @@ impl rtio::RtioPipe for FileDesc {
     fn clone(&self) -> Box<rtio::RtioPipe:Send> {
         box FileDesc { inner: self.inner.clone() } as Box<rtio::RtioPipe:Send>
     }
+
+    // Only supported on named pipes currently. Note that this doesn't have an
+    // impact on the std::io primitives, this is never called via
+    // std::io::PipeStream. If the functionality is exposed in the future, then
+    // these methods will need to be implemented.
+    fn close_read(&mut self) -> IoResult<()> {
+        Err(io::standard_error(io::InvalidInput))
+    }
+    fn close_write(&mut self) -> IoResult<()> {
+        Err(io::standard_error(io::InvalidInput))
+    }
 }
 
 impl rtio::RtioTTY for FileDesc {
diff --git a/src/libnative/io/net.rs b/src/libnative/io/net.rs
index 880cbaabaf8..a54fe911ae0 100644
--- a/src/libnative/io/net.rs
+++ b/src/libnative/io/net.rs
@@ -357,9 +357,10 @@ impl rtio::RtioTcpStream for TcpStream {
         } as Box<rtio::RtioTcpStream:Send>
     }
     fn close_write(&mut self) -> IoResult<()> {
-        super::mkerr_libc(unsafe {
-            libc::shutdown(self.fd(), libc::SHUT_WR)
-        })
+        super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_WR) })
+    }
+    fn close_read(&mut self) -> IoResult<()> {
+        super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) })
     }
 }
 
diff --git a/src/libnative/io/pipe_unix.rs b/src/libnative/io/pipe_unix.rs
index 65e9c7448c2..94aca1ef748 100644
--- a/src/libnative/io/pipe_unix.rs
+++ b/src/libnative/io/pipe_unix.rs
@@ -149,6 +149,13 @@ impl rtio::RtioPipe for UnixStream {
             inner: self.inner.clone(),
         } as Box<rtio::RtioPipe:Send>
     }
+
+    fn close_write(&mut self) -> IoResult<()> {
+        super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_WR) })
+    }
+    fn close_read(&mut self) -> IoResult<()> {
+        super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) })
+    }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
diff --git a/src/libnative/io/pipe_win32.rs b/src/libnative/io/pipe_win32.rs
index f1239285434..8050123cedc 100644
--- a/src/libnative/io/pipe_win32.rs
+++ b/src/libnative/io/pipe_win32.rs
@@ -84,13 +84,17 @@
 //! the test suite passing (the suite is in libstd), and that's good enough for
 //! me!
 
-use std::c_str::CString;
 use libc;
+use std::c_str::CString;
+use std::intrinsics;
+use std::io;
 use std::os::win32::as_utf16_p;
+use std::os;
 use std::ptr;
 use std::rt::rtio;
 use std::sync::arc::UnsafeArc;
-use std::intrinsics;
+use std::sync::atomics;
+use std::unstable::mutex;
 
 use super::IoResult;
 use super::c;
@@ -124,6 +128,20 @@ impl Drop for Event {
 
 struct Inner {
     handle: libc::HANDLE,
+    lock: mutex::NativeMutex,
+    read_closed: atomics::AtomicBool,
+    write_closed: atomics::AtomicBool,
+}
+
+impl Inner {
+    fn new(handle: libc::HANDLE) -> Inner {
+        Inner {
+            handle: handle,
+            lock: unsafe { mutex::NativeMutex::new() },
+            read_closed: atomics::AtomicBool::new(false),
+            write_closed: atomics::AtomicBool::new(false),
+        }
+    }
 }
 
 impl Drop for Inner {
@@ -218,7 +236,7 @@ impl UnixStream {
             loop {
                 match UnixStream::try_connect(p) {
                     Some(handle) => {
-                        let inner = Inner { handle: handle };
+                        let inner = Inner::new(handle);
                         let mut mode = libc::PIPE_TYPE_BYTE |
                                        libc::PIPE_READMODE_BYTE |
                                        libc::PIPE_WAIT;
@@ -275,6 +293,24 @@ impl UnixStream {
     }
 
     fn handle(&self) -> libc::HANDLE { unsafe { (*self.inner.get()).handle } }
+
+    fn read_closed(&self) -> bool {
+        unsafe { (*self.inner.get()).read_closed.load(atomics::SeqCst) }
+    }
+
+    fn write_closed(&self) -> bool {
+        unsafe { (*self.inner.get()).write_closed.load(atomics::SeqCst) }
+    }
+
+    fn cancel_io(&self) -> IoResult<()> {
+        match unsafe { c::CancelIoEx(self.handle(), ptr::mut_null()) } {
+            0 if os::errno() == libc::ERROR_NOT_FOUND as uint => {
+                Ok(())
+            }
+            0 => Err(super::last_error()),
+            _ => Ok(())
+        }
+    }
 }
 
 impl rtio::RtioPipe for UnixStream {
@@ -287,6 +323,18 @@ impl rtio::RtioPipe for UnixStream {
         let mut overlapped: libc::OVERLAPPED = unsafe { intrinsics::init() };
         overlapped.hEvent = self.read.get_ref().handle();
 
+        // Pre-flight check to see if the reading half has been closed. This
+        // must be done before issuing the ReadFile request, but after we
+        // acquire the lock.
+        //
+        // See comments in close_read() about why this lock is necessary.
+        let guard = unsafe { (*self.inner.get()).lock.lock() };
+        if self.read_closed() {
+            return Err(io::standard_error(io::EndOfFile))
+        }
+
+        // Issue a nonblocking requests, succeeding quickly if it happened to
+        // succeed.
         let ret = unsafe {
             libc::ReadFile(self.handle(),
                            buf.as_ptr() as libc::LPVOID,
@@ -294,24 +342,41 @@ impl rtio::RtioPipe for UnixStream {
                            &mut bytes_read,
                            &mut overlapped)
         };
-        if ret == 0 {
-            let err = unsafe { libc::GetLastError() };
-            if err == libc::ERROR_IO_PENDING as libc::DWORD {
-                let ret = unsafe {
-                    libc::GetOverlappedResult(self.handle(),
-                                              &mut overlapped,
-                                              &mut bytes_read,
-                                              libc::TRUE)
-                };
-                if ret == 0 {
-                    return Err(super::last_error())
-                }
-            } else {
+        if ret != 0 { return Ok(bytes_read as uint) }
+
+        // If our errno doesn't say that the I/O is pending, then we hit some
+        // legitimate error and reeturn immediately.
+        if os::errno() != libc::ERROR_IO_PENDING as uint {
+            return Err(super::last_error())
+        }
+
+        // Now that we've issued a successful nonblocking request, we need to
+        // wait for it to finish. This can all be done outside the lock because
+        // we'll see any invocation of CancelIoEx. We also call this in a loop
+        // because we're woken up if the writing half is closed, we just need to
+        // realize that the reading half wasn't closed and we go right back to
+        // sleep.
+        drop(guard);
+        loop {
+            let ret = unsafe {
+                libc::GetOverlappedResult(self.handle(),
+                                          &mut overlapped,
+                                          &mut bytes_read,
+                                          libc::TRUE)
+            };
+            // If we succeeded, or we failed for some reason other than
+            // CancelIoEx, return immediately
+            if ret != 0 { return Ok(bytes_read as uint) }
+            if os::errno() != libc::ERROR_OPERATION_ABORTED as uint {
                 return Err(super::last_error())
             }
-        }
 
-        Ok(bytes_read as uint)
+            // If the reading half is now closed, then we're done. If we woke up
+            // because the writing half was closed, keep trying.
+            if self.read_closed() {
+                return Err(io::standard_error(io::EndOfFile))
+            }
+        }
     }
 
     fn write(&mut self, buf: &[u8]) -> IoResult<()> {
@@ -325,6 +390,17 @@ impl rtio::RtioPipe for UnixStream {
 
         while offset < buf.len() {
             let mut bytes_written = 0;
+
+            // This sequence below is quite similar to the one found in read().
+            // Some careful looping is done to ensure that if close_write() is
+            // invoked we bail out early, and if close_read() is invoked we keep
+            // going after we woke up.
+            //
+            // See comments in close_read() about why this lock is necessary.
+            let guard = unsafe { (*self.inner.get()).lock.lock() };
+            if self.write_closed() {
+                return Err(io::standard_error(io::BrokenPipe))
+            }
             let ret = unsafe {
                 libc::WriteFile(self.handle(),
                                 buf.slice_from(offset).as_ptr() as libc::LPVOID,
@@ -332,20 +408,29 @@ impl rtio::RtioPipe for UnixStream {
                                 &mut bytes_written,
                                 &mut overlapped)
             };
+            drop(guard);
+
             if ret == 0 {
-                let err = unsafe { libc::GetLastError() };
-                if err == libc::ERROR_IO_PENDING as libc::DWORD {
-                    let ret = unsafe {
-                        libc::GetOverlappedResult(self.handle(),
-                                                  &mut overlapped,
-                                                  &mut bytes_written,
-                                                  libc::TRUE)
-                    };
-                    if ret == 0 {
+                if os::errno() != libc::ERROR_IO_PENDING as uint {
+                    return Err(super::last_error())
+                }
+                let ret = unsafe {
+                    libc::GetOverlappedResult(self.handle(),
+                                              &mut overlapped,
+                                              &mut bytes_written,
+                                              libc::TRUE)
+                };
+                // If we weren't aborted, this was a legit error, if we were
+                // aborted, then check to see if the write half was actually
+                // closed or whether we woke up from the read half closing.
+                if ret == 0 {
+                    if os::errno() != libc::ERROR_OPERATION_ABORTED as uint {
                         return Err(super::last_error())
                     }
-                } else {
-                    return Err(super::last_error())
+                    if self.write_closed() {
+                        return Err(io::standard_error(io::BrokenPipe))
+                    }
+                    continue; // retry
                 }
             }
             offset += bytes_written as uint;
@@ -360,6 +445,36 @@ impl rtio::RtioPipe for UnixStream {
             write: None,
         } as Box<rtio::RtioPipe:Send>
     }
+
+    fn close_read(&mut self) -> IoResult<()> {
+        // On windows, there's no actual shutdown() method for pipes, so we're
+        // forced to emulate the behavior manually at the application level. To
+        // do this, we need to both cancel any pending requests, as well as
+        // prevent all future requests from succeeding. These two operations are
+        // not atomic with respect to one another, so we must use a lock to do
+        // so.
+        //
+        // The read() code looks like:
+        //
+        //      1. Make sure the pipe is still open
+        //      2. Submit a read request
+        //      3. Wait for the read request to finish
+        //
+        // The race this lock is preventing is if another thread invokes
+        // close_read() between steps 1 and 2. By atomically executing steps 1
+        // and 2 with a lock with respect to close_read(), we're guaranteed that
+        // no thread will erroneously sit in a read forever.
+        let _guard = unsafe { (*self.inner.get()).lock.lock() };
+        unsafe { (*self.inner.get()).read_closed.store(true, atomics::SeqCst) }
+        self.cancel_io()
+    }
+
+    fn close_write(&mut self) -> IoResult<()> {
+        // see comments in close_read() for why this lock is necessary
+        let _guard = unsafe { (*self.inner.get()).lock.lock() };
+        unsafe { (*self.inner.get()).write_closed.store(true, atomics::SeqCst) }
+        self.cancel_io()
+    }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -520,7 +635,7 @@ impl UnixAcceptor {
 
         // Transfer ownership of our handle into this stream
         Ok(UnixStream {
-            inner: UnsafeArc::new(Inner { handle: handle }),
+            inner: UnsafeArc::new(Inner::new(handle)),
             read: None,
             write: None,
         })