diff options
Diffstat (limited to 'src/libnative/io/pipe_windows.rs')
| -rw-r--r-- | src/libnative/io/pipe_windows.rs | 94 |
1 files changed, 71 insertions, 23 deletions
diff --git a/src/libnative/io/pipe_windows.rs b/src/libnative/io/pipe_windows.rs index 4d01230cbd9..95afa11f4a9 100644 --- a/src/libnative/io/pipe_windows.rs +++ b/src/libnative/io/pipe_windows.rs @@ -169,23 +169,30 @@ unsafe fn pipe(name: *const u16, init: bool) -> libc::HANDLE { } pub fn await(handle: libc::HANDLE, deadline: u64, - overlapped: &mut libc::OVERLAPPED) -> bool { - if deadline == 0 { return true } + events: &[libc::HANDLE]) -> IoResult<uint> { + use libc::consts::os::extra::{WAIT_FAILED, WAIT_TIMEOUT, WAIT_OBJECT_0}; // If we've got a timeout, use WaitForSingleObject in tandem with CancelIo // to figure out if we should indeed get the result. - let now = ::io::timer::now(); - let timeout = deadline < now || unsafe { - let ms = (deadline - now) as libc::DWORD; - let r = libc::WaitForSingleObject(overlapped.hEvent, - ms); - r != libc::WAIT_OBJECT_0 - }; - if timeout { - unsafe { let _ = c::CancelIo(handle); } - false + let ms = if deadline == 0 { + libc::INFINITE as u64 } else { - true + let now = ::io::timer::now(); + if deadline < now {0} else {deadline - now} + }; + let ret = unsafe { + c::WaitForMultipleObjects(events.len() as libc::DWORD, + events.as_ptr(), + libc::FALSE, + ms as libc::DWORD) + }; + match ret { + WAIT_FAILED => Err(super::last_error()), + WAIT_TIMEOUT => unsafe { + let _ = c::CancelIo(handle); + Err(util::timeout("operation timed out")) + }, + n => Ok((n - WAIT_OBJECT_0) as uint) } } @@ -390,8 +397,8 @@ impl rtio::RtioPipe for UnixStream { drop(guard); loop { // Process a timeout if one is pending - let succeeded = await(self.handle(), self.read_deadline, - &mut overlapped); + let wait_succeeded = await(self.handle(), self.read_deadline, + [overlapped.hEvent]); let ret = unsafe { libc::GetOverlappedResult(self.handle(), @@ -408,7 +415,7 @@ impl rtio::RtioPipe for UnixStream { // If the reading half is now closed, then we're done. If we woke up // because the writing half was closed, keep trying. - if !succeeded { + if wait_succeeded.is_err() { return Err(util::timeout("read timed out")) } if self.read_closed() { @@ -458,8 +465,8 @@ impl rtio::RtioPipe for UnixStream { }) } // Process a timeout if one is pending - let succeeded = await(self.handle(), self.write_deadline, - &mut overlapped); + let wait_succeeded = await(self.handle(), self.write_deadline, + [overlapped.hEvent]); let ret = unsafe { libc::GetOverlappedResult(self.handle(), &mut overlapped, @@ -473,7 +480,7 @@ impl rtio::RtioPipe for UnixStream { if os::errno() != libc::ERROR_OPERATION_ABORTED as uint { return Err(super::last_error()) } - if !succeeded { + if !wait_succeeded.is_ok() { let amt = offset + bytes_written as uint; return if amt > 0 { Err(IoError { @@ -577,6 +584,10 @@ impl UnixListener { listener: self, event: try!(Event::new(true, false)), deadline: 0, + inner: Arc::new(AcceptorState { + abort: try!(Event::new(true, false)), + closed: atomic::AtomicBool::new(false), + }), }) } } @@ -597,11 +608,17 @@ impl rtio::RtioUnixListener for UnixListener { } pub struct UnixAcceptor { + inner: Arc<AcceptorState>, listener: UnixListener, event: Event, deadline: u64, } +struct AcceptorState { + abort: Event, + closed: atomic::AtomicBool, +} + impl UnixAcceptor { pub fn native_accept(&mut self) -> IoResult<UnixStream> { // This function has some funky implementation details when working with @@ -638,6 +655,10 @@ impl UnixAcceptor { // using the original server pipe. let handle = self.listener.handle; + // If we've had an artifical call to close_accept, be sure to never + // proceed in accepting new clients in the future + if self.inner.closed.load(atomic::SeqCst) { return Err(util::eof()) } + let name = try!(to_utf16(&self.listener.name)); // Once we've got a "server handle", we need to wait for a client to @@ -652,7 +673,9 @@ impl UnixAcceptor { if err == libc::ERROR_IO_PENDING as libc::DWORD { // Process a timeout if one is pending - let _ = await(handle, self.deadline, &mut overlapped); + let wait_succeeded = await(handle, self.deadline, + [self.inner.abort.handle(), + overlapped.hEvent]); // This will block until the overlapped I/O is completed. The // timeout was previously handled, so this will either block in @@ -665,7 +688,11 @@ impl UnixAcceptor { libc::TRUE) }; if ret == 0 { - err = unsafe { libc::GetLastError() }; + if wait_succeeded.is_ok() { + err = unsafe { libc::GetLastError() }; + } else { + return Err(util::timeout("accept timed out")) + } } else { // we succeeded, bypass the check below err = libc::ERROR_PIPE_CONNECTED as libc::DWORD; @@ -711,11 +738,32 @@ impl rtio::RtioUnixAcceptor for UnixAcceptor { } fn clone(&self) -> Box<rtio::RtioUnixAcceptor + Send> { - fail!() + let name = to_utf16(&self.listener.name).ok().unwrap(); + box UnixAcceptor { + inner: self.inner.clone(), + event: Event::new(true, false).ok().unwrap(), + deadline: 0, + listener: UnixListener { + name: self.listener.name.clone(), + handle: unsafe { + let p = pipe(name.as_ptr(), false) ; + assert!(p != libc::INVALID_HANDLE_VALUE as libc::HANDLE); + p + }, + }, + } as Box<rtio::RtioUnixAcceptor + Send> } fn close_accept(&mut self) -> IoResult<()> { - fail!() + self.inner.closed.store(true, atomic::SeqCst); + let ret = unsafe { + c::SetEvent(self.inner.abort.handle()) + }; + if ret == 0 { + Err(super::last_error()) + } else { + Ok(()) + } } } |
