diff options
Diffstat (limited to 'src/libnative')
| -rw-r--r-- | src/libnative/io/c_win32.rs | 2 | ||||
| -rw-r--r-- | src/libnative/io/file_unix.rs | 13 | ||||
| -rw-r--r-- | src/libnative/io/file_win32.rs | 11 | ||||
| -rw-r--r-- | src/libnative/io/net.rs | 7 | ||||
| -rw-r--r-- | src/libnative/io/pipe_unix.rs | 7 | ||||
| -rw-r--r-- | src/libnative/io/pipe_win32.rs | 175 |
6 files changed, 181 insertions, 34 deletions
diff --git a/src/libnative/io/c_win32.rs b/src/libnative/io/c_win32.rs index 6c84424e97a..4fdd05a8b42 100644 --- a/src/libnative/io/c_win32.rs +++ b/src/libnative/io/c_win32.rs @@ -61,4 +61,6 @@ extern "system" { optlen: *mut libc::c_int) -> libc::c_int; pub fn CancelIo(hFile: libc::HANDLE) -> libc::BOOL; + pub fn CancelIoEx(hFile: libc::HANDLE, + lpOverlapped: libc::LPOVERLAPPED) -> libc::BOOL; } diff --git a/src/libnative/io/file_unix.rs b/src/libnative/io/file_unix.rs index 2727f9a0b09..84ea0d29434 100644 --- a/src/libnative/io/file_unix.rs +++ b/src/libnative/io/file_unix.rs @@ -12,12 +12,12 @@ use libc::{c_int, c_void}; use libc; -use std::sync::arc::UnsafeArc; use std::c_str::CString; use std::io::IoError; use std::io; use std::mem; use std::rt::rtio; +use std::sync::arc::UnsafeArc; use io::{IoResult, retry, keep_going}; @@ -178,6 +178,17 @@ impl rtio::RtioPipe for FileDesc { fn clone(&self) -> Box<rtio::RtioPipe:Send> { box FileDesc { inner: self.inner.clone() } as Box<rtio::RtioPipe:Send> } + + // Only supported on named pipes currently. Note that this doesn't have an + // impact on the std::io primitives, this is never called via + // std::io::PipeStream. If the functionality is exposed in the future, then + // these methods will need to be implemented. + fn close_read(&mut self) -> Result<(), IoError> { + Err(io::standard_error(io::InvalidInput)) + } + fn close_write(&mut self) -> Result<(), IoError> { + Err(io::standard_error(io::InvalidInput)) + } } impl rtio::RtioTTY for FileDesc { diff --git a/src/libnative/io/file_win32.rs b/src/libnative/io/file_win32.rs index 018907303b8..c2acd91d476 100644 --- a/src/libnative/io/file_win32.rs +++ b/src/libnative/io/file_win32.rs @@ -210,6 +210,17 @@ impl rtio::RtioPipe for FileDesc { fn clone(&self) -> Box<rtio::RtioPipe:Send> { box FileDesc { inner: self.inner.clone() } as Box<rtio::RtioPipe:Send> } + + // Only supported on named pipes currently. Note that this doesn't have an + // impact on the std::io primitives, this is never called via + // std::io::PipeStream. If the functionality is exposed in the future, then + // these methods will need to be implemented. + fn close_read(&mut self) -> IoResult<()> { + Err(io::standard_error(io::InvalidInput)) + } + fn close_write(&mut self) -> IoResult<()> { + Err(io::standard_error(io::InvalidInput)) + } } impl rtio::RtioTTY for FileDesc { diff --git a/src/libnative/io/net.rs b/src/libnative/io/net.rs index 880cbaabaf8..a54fe911ae0 100644 --- a/src/libnative/io/net.rs +++ b/src/libnative/io/net.rs @@ -357,9 +357,10 @@ impl rtio::RtioTcpStream for TcpStream { } as Box<rtio::RtioTcpStream:Send> } fn close_write(&mut self) -> IoResult<()> { - super::mkerr_libc(unsafe { - libc::shutdown(self.fd(), libc::SHUT_WR) - }) + super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_WR) }) + } + fn close_read(&mut self) -> IoResult<()> { + super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) }) } } diff --git a/src/libnative/io/pipe_unix.rs b/src/libnative/io/pipe_unix.rs index 65e9c7448c2..94aca1ef748 100644 --- a/src/libnative/io/pipe_unix.rs +++ b/src/libnative/io/pipe_unix.rs @@ -149,6 +149,13 @@ impl rtio::RtioPipe for UnixStream { inner: self.inner.clone(), } as Box<rtio::RtioPipe:Send> } + + fn close_write(&mut self) -> IoResult<()> { + super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_WR) }) + } + fn close_read(&mut self) -> IoResult<()> { + super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) }) + } } //////////////////////////////////////////////////////////////////////////////// diff --git a/src/libnative/io/pipe_win32.rs b/src/libnative/io/pipe_win32.rs index f1239285434..8050123cedc 100644 --- a/src/libnative/io/pipe_win32.rs +++ b/src/libnative/io/pipe_win32.rs @@ -84,13 +84,17 @@ //! the test suite passing (the suite is in libstd), and that's good enough for //! me! -use std::c_str::CString; use libc; +use std::c_str::CString; +use std::intrinsics; +use std::io; use std::os::win32::as_utf16_p; +use std::os; use std::ptr; use std::rt::rtio; use std::sync::arc::UnsafeArc; -use std::intrinsics; +use std::sync::atomics; +use std::unstable::mutex; use super::IoResult; use super::c; @@ -124,6 +128,20 @@ impl Drop for Event { struct Inner { handle: libc::HANDLE, + lock: mutex::NativeMutex, + read_closed: atomics::AtomicBool, + write_closed: atomics::AtomicBool, +} + +impl Inner { + fn new(handle: libc::HANDLE) -> Inner { + Inner { + handle: handle, + lock: unsafe { mutex::NativeMutex::new() }, + read_closed: atomics::AtomicBool::new(false), + write_closed: atomics::AtomicBool::new(false), + } + } } impl Drop for Inner { @@ -218,7 +236,7 @@ impl UnixStream { loop { match UnixStream::try_connect(p) { Some(handle) => { - let inner = Inner { handle: handle }; + let inner = Inner::new(handle); let mut mode = libc::PIPE_TYPE_BYTE | libc::PIPE_READMODE_BYTE | libc::PIPE_WAIT; @@ -275,6 +293,24 @@ impl UnixStream { } fn handle(&self) -> libc::HANDLE { unsafe { (*self.inner.get()).handle } } + + fn read_closed(&self) -> bool { + unsafe { (*self.inner.get()).read_closed.load(atomics::SeqCst) } + } + + fn write_closed(&self) -> bool { + unsafe { (*self.inner.get()).write_closed.load(atomics::SeqCst) } + } + + fn cancel_io(&self) -> IoResult<()> { + match unsafe { c::CancelIoEx(self.handle(), ptr::mut_null()) } { + 0 if os::errno() == libc::ERROR_NOT_FOUND as uint => { + Ok(()) + } + 0 => Err(super::last_error()), + _ => Ok(()) + } + } } impl rtio::RtioPipe for UnixStream { @@ -287,6 +323,18 @@ impl rtio::RtioPipe for UnixStream { let mut overlapped: libc::OVERLAPPED = unsafe { intrinsics::init() }; overlapped.hEvent = self.read.get_ref().handle(); + // Pre-flight check to see if the reading half has been closed. This + // must be done before issuing the ReadFile request, but after we + // acquire the lock. + // + // See comments in close_read() about why this lock is necessary. + let guard = unsafe { (*self.inner.get()).lock.lock() }; + if self.read_closed() { + return Err(io::standard_error(io::EndOfFile)) + } + + // Issue a nonblocking requests, succeeding quickly if it happened to + // succeed. let ret = unsafe { libc::ReadFile(self.handle(), buf.as_ptr() as libc::LPVOID, @@ -294,24 +342,41 @@ impl rtio::RtioPipe for UnixStream { &mut bytes_read, &mut overlapped) }; - if ret == 0 { - let err = unsafe { libc::GetLastError() }; - if err == libc::ERROR_IO_PENDING as libc::DWORD { - let ret = unsafe { - libc::GetOverlappedResult(self.handle(), - &mut overlapped, - &mut bytes_read, - libc::TRUE) - }; - if ret == 0 { - return Err(super::last_error()) - } - } else { + if ret != 0 { return Ok(bytes_read as uint) } + + // If our errno doesn't say that the I/O is pending, then we hit some + // legitimate error and reeturn immediately. + if os::errno() != libc::ERROR_IO_PENDING as uint { + return Err(super::last_error()) + } + + // Now that we've issued a successful nonblocking request, we need to + // wait for it to finish. This can all be done outside the lock because + // we'll see any invocation of CancelIoEx. We also call this in a loop + // because we're woken up if the writing half is closed, we just need to + // realize that the reading half wasn't closed and we go right back to + // sleep. + drop(guard); + loop { + let ret = unsafe { + libc::GetOverlappedResult(self.handle(), + &mut overlapped, + &mut bytes_read, + libc::TRUE) + }; + // If we succeeded, or we failed for some reason other than + // CancelIoEx, return immediately + if ret != 0 { return Ok(bytes_read as uint) } + if os::errno() != libc::ERROR_OPERATION_ABORTED as uint { return Err(super::last_error()) } - } - Ok(bytes_read as uint) + // If the reading half is now closed, then we're done. If we woke up + // because the writing half was closed, keep trying. + if self.read_closed() { + return Err(io::standard_error(io::EndOfFile)) + } + } } fn write(&mut self, buf: &[u8]) -> IoResult<()> { @@ -325,6 +390,17 @@ impl rtio::RtioPipe for UnixStream { while offset < buf.len() { let mut bytes_written = 0; + + // This sequence below is quite similar to the one found in read(). + // Some careful looping is done to ensure that if close_write() is + // invoked we bail out early, and if close_read() is invoked we keep + // going after we woke up. + // + // See comments in close_read() about why this lock is necessary. + let guard = unsafe { (*self.inner.get()).lock.lock() }; + if self.write_closed() { + return Err(io::standard_error(io::BrokenPipe)) + } let ret = unsafe { libc::WriteFile(self.handle(), buf.slice_from(offset).as_ptr() as libc::LPVOID, @@ -332,20 +408,29 @@ impl rtio::RtioPipe for UnixStream { &mut bytes_written, &mut overlapped) }; + drop(guard); + if ret == 0 { - let err = unsafe { libc::GetLastError() }; - if err == libc::ERROR_IO_PENDING as libc::DWORD { - let ret = unsafe { - libc::GetOverlappedResult(self.handle(), - &mut overlapped, - &mut bytes_written, - libc::TRUE) - }; - if ret == 0 { + if os::errno() != libc::ERROR_IO_PENDING as uint { + return Err(super::last_error()) + } + let ret = unsafe { + libc::GetOverlappedResult(self.handle(), + &mut overlapped, + &mut bytes_written, + libc::TRUE) + }; + // If we weren't aborted, this was a legit error, if we were + // aborted, then check to see if the write half was actually + // closed or whether we woke up from the read half closing. + if ret == 0 { + if os::errno() != libc::ERROR_OPERATION_ABORTED as uint { return Err(super::last_error()) } - } else { - return Err(super::last_error()) + if self.write_closed() { + return Err(io::standard_error(io::BrokenPipe)) + } + continue; // retry } } offset += bytes_written as uint; @@ -360,6 +445,36 @@ impl rtio::RtioPipe for UnixStream { write: None, } as Box<rtio::RtioPipe:Send> } + + fn close_read(&mut self) -> IoResult<()> { + // On windows, there's no actual shutdown() method for pipes, so we're + // forced to emulate the behavior manually at the application level. To + // do this, we need to both cancel any pending requests, as well as + // prevent all future requests from succeeding. These two operations are + // not atomic with respect to one another, so we must use a lock to do + // so. + // + // The read() code looks like: + // + // 1. Make sure the pipe is still open + // 2. Submit a read request + // 3. Wait for the read request to finish + // + // The race this lock is preventing is if another thread invokes + // close_read() between steps 1 and 2. By atomically executing steps 1 + // and 2 with a lock with respect to close_read(), we're guaranteed that + // no thread will erroneously sit in a read forever. + let _guard = unsafe { (*self.inner.get()).lock.lock() }; + unsafe { (*self.inner.get()).read_closed.store(true, atomics::SeqCst) } + self.cancel_io() + } + + fn close_write(&mut self) -> IoResult<()> { + // see comments in close_read() for why this lock is necessary + let _guard = unsafe { (*self.inner.get()).lock.lock() }; + unsafe { (*self.inner.get()).write_closed.store(true, atomics::SeqCst) } + self.cancel_io() + } } //////////////////////////////////////////////////////////////////////////////// @@ -520,7 +635,7 @@ impl UnixAcceptor { // Transfer ownership of our handle into this stream Ok(UnixStream { - inner: UnsafeArc::new(Inner { handle: handle }), + inner: UnsafeArc::new(Inner::new(handle)), read: None, write: None, }) |
