diff options
| author | Alex Crichton <alex@alexcrichton.com> | 2014-04-24 18:48:21 -0700 |
|---|---|---|
| committer | Alex Crichton <alex@alexcrichton.com> | 2014-05-07 17:18:07 -0700 |
| commit | ec9ade938e9e4aa710f4351e48a8fda1037352aa (patch) | |
| tree | 0446a1bf50f2243bba01f7b5a6a964966b31e82c /src/libnative | |
| parent | ef6daf9935da103f1b915a5c9904794da79b0b60 (diff) | |
| download | rust-ec9ade938e9e4aa710f4351e48a8fda1037352aa.tar.gz rust-ec9ade938e9e4aa710f4351e48a8fda1037352aa.zip | |
std: Add close_{read,write}() methods to I/O
Two new methods were added to TcpStream and UnixStream:
fn close_read(&mut self) -> IoResult<()>;
fn close_write(&mut self) -> IoResult<()>;
These two methods map to shutdown()'s behavior (the system call on unix),
closing the reading or writing half of a duplex stream. These methods are
primarily added to allow waking up a pending read in another task. By closing
the reading half of a connection, all pending readers will be woken up and will
return with EndOfFile. The close_write() method was added for symmetry with
close_read(), and I imagine that it will be quite useful at some point.
Implementation-wise, librustuv got the short end of the stick this time. The
native versions just delegate to the shutdown() syscall (easy). The uv versions
can leverage uv_shutdown() for tcp/unix streams, but only for closing the
writing half. Closing the reading half is done through some careful dancing to
wake up a pending reader.
As usual, windows likes to be different from unix. The windows implementation
uses shutdown() for sockets, but shutdown() is not available for named pipes.
Instead, CancelIoEx was used with same fancy synchronization to make sure
everyone knows what's up.
cc #11165
Diffstat (limited to 'src/libnative')
| -rw-r--r-- | src/libnative/io/c_win32.rs | 2 | ||||
| -rw-r--r-- | src/libnative/io/file_unix.rs | 13 | ||||
| -rw-r--r-- | src/libnative/io/file_win32.rs | 11 | ||||
| -rw-r--r-- | src/libnative/io/net.rs | 7 | ||||
| -rw-r--r-- | src/libnative/io/pipe_unix.rs | 7 | ||||
| -rw-r--r-- | src/libnative/io/pipe_win32.rs | 175 |
6 files changed, 181 insertions, 34 deletions
diff --git a/src/libnative/io/c_win32.rs b/src/libnative/io/c_win32.rs index 6c84424e97a..4fdd05a8b42 100644 --- a/src/libnative/io/c_win32.rs +++ b/src/libnative/io/c_win32.rs @@ -61,4 +61,6 @@ extern "system" { optlen: *mut libc::c_int) -> libc::c_int; pub fn CancelIo(hFile: libc::HANDLE) -> libc::BOOL; + pub fn CancelIoEx(hFile: libc::HANDLE, + lpOverlapped: libc::LPOVERLAPPED) -> libc::BOOL; } diff --git a/src/libnative/io/file_unix.rs b/src/libnative/io/file_unix.rs index 2727f9a0b09..84ea0d29434 100644 --- a/src/libnative/io/file_unix.rs +++ b/src/libnative/io/file_unix.rs @@ -12,12 +12,12 @@ use libc::{c_int, c_void}; use libc; -use std::sync::arc::UnsafeArc; use std::c_str::CString; use std::io::IoError; use std::io; use std::mem; use std::rt::rtio; +use std::sync::arc::UnsafeArc; use io::{IoResult, retry, keep_going}; @@ -178,6 +178,17 @@ impl rtio::RtioPipe for FileDesc { fn clone(&self) -> Box<rtio::RtioPipe:Send> { box FileDesc { inner: self.inner.clone() } as Box<rtio::RtioPipe:Send> } + + // Only supported on named pipes currently. Note that this doesn't have an + // impact on the std::io primitives, this is never called via + // std::io::PipeStream. If the functionality is exposed in the future, then + // these methods will need to be implemented. + fn close_read(&mut self) -> Result<(), IoError> { + Err(io::standard_error(io::InvalidInput)) + } + fn close_write(&mut self) -> Result<(), IoError> { + Err(io::standard_error(io::InvalidInput)) + } } impl rtio::RtioTTY for FileDesc { diff --git a/src/libnative/io/file_win32.rs b/src/libnative/io/file_win32.rs index 018907303b8..c2acd91d476 100644 --- a/src/libnative/io/file_win32.rs +++ b/src/libnative/io/file_win32.rs @@ -210,6 +210,17 @@ impl rtio::RtioPipe for FileDesc { fn clone(&self) -> Box<rtio::RtioPipe:Send> { box FileDesc { inner: self.inner.clone() } as Box<rtio::RtioPipe:Send> } + + // Only supported on named pipes currently. Note that this doesn't have an + // impact on the std::io primitives, this is never called via + // std::io::PipeStream. If the functionality is exposed in the future, then + // these methods will need to be implemented. + fn close_read(&mut self) -> IoResult<()> { + Err(io::standard_error(io::InvalidInput)) + } + fn close_write(&mut self) -> IoResult<()> { + Err(io::standard_error(io::InvalidInput)) + } } impl rtio::RtioTTY for FileDesc { diff --git a/src/libnative/io/net.rs b/src/libnative/io/net.rs index 880cbaabaf8..a54fe911ae0 100644 --- a/src/libnative/io/net.rs +++ b/src/libnative/io/net.rs @@ -357,9 +357,10 @@ impl rtio::RtioTcpStream for TcpStream { } as Box<rtio::RtioTcpStream:Send> } fn close_write(&mut self) -> IoResult<()> { - super::mkerr_libc(unsafe { - libc::shutdown(self.fd(), libc::SHUT_WR) - }) + super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_WR) }) + } + fn close_read(&mut self) -> IoResult<()> { + super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) }) } } diff --git a/src/libnative/io/pipe_unix.rs b/src/libnative/io/pipe_unix.rs index 65e9c7448c2..94aca1ef748 100644 --- a/src/libnative/io/pipe_unix.rs +++ b/src/libnative/io/pipe_unix.rs @@ -149,6 +149,13 @@ impl rtio::RtioPipe for UnixStream { inner: self.inner.clone(), } as Box<rtio::RtioPipe:Send> } + + fn close_write(&mut self) -> IoResult<()> { + super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_WR) }) + } + fn close_read(&mut self) -> IoResult<()> { + super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) }) + } } //////////////////////////////////////////////////////////////////////////////// diff --git a/src/libnative/io/pipe_win32.rs b/src/libnative/io/pipe_win32.rs index f1239285434..8050123cedc 100644 --- a/src/libnative/io/pipe_win32.rs +++ b/src/libnative/io/pipe_win32.rs @@ -84,13 +84,17 @@ //! the test suite passing (the suite is in libstd), and that's good enough for //! me! -use std::c_str::CString; use libc; +use std::c_str::CString; +use std::intrinsics; +use std::io; use std::os::win32::as_utf16_p; +use std::os; use std::ptr; use std::rt::rtio; use std::sync::arc::UnsafeArc; -use std::intrinsics; +use std::sync::atomics; +use std::unstable::mutex; use super::IoResult; use super::c; @@ -124,6 +128,20 @@ impl Drop for Event { struct Inner { handle: libc::HANDLE, + lock: mutex::NativeMutex, + read_closed: atomics::AtomicBool, + write_closed: atomics::AtomicBool, +} + +impl Inner { + fn new(handle: libc::HANDLE) -> Inner { + Inner { + handle: handle, + lock: unsafe { mutex::NativeMutex::new() }, + read_closed: atomics::AtomicBool::new(false), + write_closed: atomics::AtomicBool::new(false), + } + } } impl Drop for Inner { @@ -218,7 +236,7 @@ impl UnixStream { loop { match UnixStream::try_connect(p) { Some(handle) => { - let inner = Inner { handle: handle }; + let inner = Inner::new(handle); let mut mode = libc::PIPE_TYPE_BYTE | libc::PIPE_READMODE_BYTE | libc::PIPE_WAIT; @@ -275,6 +293,24 @@ impl UnixStream { } fn handle(&self) -> libc::HANDLE { unsafe { (*self.inner.get()).handle } } + + fn read_closed(&self) -> bool { + unsafe { (*self.inner.get()).read_closed.load(atomics::SeqCst) } + } + + fn write_closed(&self) -> bool { + unsafe { (*self.inner.get()).write_closed.load(atomics::SeqCst) } + } + + fn cancel_io(&self) -> IoResult<()> { + match unsafe { c::CancelIoEx(self.handle(), ptr::mut_null()) } { + 0 if os::errno() == libc::ERROR_NOT_FOUND as uint => { + Ok(()) + } + 0 => Err(super::last_error()), + _ => Ok(()) + } + } } impl rtio::RtioPipe for UnixStream { @@ -287,6 +323,18 @@ impl rtio::RtioPipe for UnixStream { let mut overlapped: libc::OVERLAPPED = unsafe { intrinsics::init() }; overlapped.hEvent = self.read.get_ref().handle(); + // Pre-flight check to see if the reading half has been closed. This + // must be done before issuing the ReadFile request, but after we + // acquire the lock. + // + // See comments in close_read() about why this lock is necessary. + let guard = unsafe { (*self.inner.get()).lock.lock() }; + if self.read_closed() { + return Err(io::standard_error(io::EndOfFile)) + } + + // Issue a nonblocking requests, succeeding quickly if it happened to + // succeed. let ret = unsafe { libc::ReadFile(self.handle(), buf.as_ptr() as libc::LPVOID, @@ -294,24 +342,41 @@ impl rtio::RtioPipe for UnixStream { &mut bytes_read, &mut overlapped) }; - if ret == 0 { - let err = unsafe { libc::GetLastError() }; - if err == libc::ERROR_IO_PENDING as libc::DWORD { - let ret = unsafe { - libc::GetOverlappedResult(self.handle(), - &mut overlapped, - &mut bytes_read, - libc::TRUE) - }; - if ret == 0 { - return Err(super::last_error()) - } - } else { + if ret != 0 { return Ok(bytes_read as uint) } + + // If our errno doesn't say that the I/O is pending, then we hit some + // legitimate error and reeturn immediately. + if os::errno() != libc::ERROR_IO_PENDING as uint { + return Err(super::last_error()) + } + + // Now that we've issued a successful nonblocking request, we need to + // wait for it to finish. This can all be done outside the lock because + // we'll see any invocation of CancelIoEx. We also call this in a loop + // because we're woken up if the writing half is closed, we just need to + // realize that the reading half wasn't closed and we go right back to + // sleep. + drop(guard); + loop { + let ret = unsafe { + libc::GetOverlappedResult(self.handle(), + &mut overlapped, + &mut bytes_read, + libc::TRUE) + }; + // If we succeeded, or we failed for some reason other than + // CancelIoEx, return immediately + if ret != 0 { return Ok(bytes_read as uint) } + if os::errno() != libc::ERROR_OPERATION_ABORTED as uint { return Err(super::last_error()) } - } - Ok(bytes_read as uint) + // If the reading half is now closed, then we're done. If we woke up + // because the writing half was closed, keep trying. + if self.read_closed() { + return Err(io::standard_error(io::EndOfFile)) + } + } } fn write(&mut self, buf: &[u8]) -> IoResult<()> { @@ -325,6 +390,17 @@ impl rtio::RtioPipe for UnixStream { while offset < buf.len() { let mut bytes_written = 0; + + // This sequence below is quite similar to the one found in read(). + // Some careful looping is done to ensure that if close_write() is + // invoked we bail out early, and if close_read() is invoked we keep + // going after we woke up. + // + // See comments in close_read() about why this lock is necessary. + let guard = unsafe { (*self.inner.get()).lock.lock() }; + if self.write_closed() { + return Err(io::standard_error(io::BrokenPipe)) + } let ret = unsafe { libc::WriteFile(self.handle(), buf.slice_from(offset).as_ptr() as libc::LPVOID, @@ -332,20 +408,29 @@ impl rtio::RtioPipe for UnixStream { &mut bytes_written, &mut overlapped) }; + drop(guard); + if ret == 0 { - let err = unsafe { libc::GetLastError() }; - if err == libc::ERROR_IO_PENDING as libc::DWORD { - let ret = unsafe { - libc::GetOverlappedResult(self.handle(), - &mut overlapped, - &mut bytes_written, - libc::TRUE) - }; - if ret == 0 { + if os::errno() != libc::ERROR_IO_PENDING as uint { + return Err(super::last_error()) + } + let ret = unsafe { + libc::GetOverlappedResult(self.handle(), + &mut overlapped, + &mut bytes_written, + libc::TRUE) + }; + // If we weren't aborted, this was a legit error, if we were + // aborted, then check to see if the write half was actually + // closed or whether we woke up from the read half closing. + if ret == 0 { + if os::errno() != libc::ERROR_OPERATION_ABORTED as uint { return Err(super::last_error()) } - } else { - return Err(super::last_error()) + if self.write_closed() { + return Err(io::standard_error(io::BrokenPipe)) + } + continue; // retry } } offset += bytes_written as uint; @@ -360,6 +445,36 @@ impl rtio::RtioPipe for UnixStream { write: None, } as Box<rtio::RtioPipe:Send> } + + fn close_read(&mut self) -> IoResult<()> { + // On windows, there's no actual shutdown() method for pipes, so we're + // forced to emulate the behavior manually at the application level. To + // do this, we need to both cancel any pending requests, as well as + // prevent all future requests from succeeding. These two operations are + // not atomic with respect to one another, so we must use a lock to do + // so. + // + // The read() code looks like: + // + // 1. Make sure the pipe is still open + // 2. Submit a read request + // 3. Wait for the read request to finish + // + // The race this lock is preventing is if another thread invokes + // close_read() between steps 1 and 2. By atomically executing steps 1 + // and 2 with a lock with respect to close_read(), we're guaranteed that + // no thread will erroneously sit in a read forever. + let _guard = unsafe { (*self.inner.get()).lock.lock() }; + unsafe { (*self.inner.get()).read_closed.store(true, atomics::SeqCst) } + self.cancel_io() + } + + fn close_write(&mut self) -> IoResult<()> { + // see comments in close_read() for why this lock is necessary + let _guard = unsafe { (*self.inner.get()).lock.lock() }; + unsafe { (*self.inner.get()).write_closed.store(true, atomics::SeqCst) } + self.cancel_io() + } } //////////////////////////////////////////////////////////////////////////////// @@ -520,7 +635,7 @@ impl UnixAcceptor { // Transfer ownership of our handle into this stream Ok(UnixStream { - inner: UnsafeArc::new(Inner { handle: handle }), + inner: UnsafeArc::new(Inner::new(handle)), read: None, write: None, }) |
