about summary refs log tree commit diff
path: root/src/libnative/io/pipe_windows.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/libnative/io/pipe_windows.rs')
-rw-r--r--src/libnative/io/pipe_windows.rs94
1 files changed, 71 insertions, 23 deletions
diff --git a/src/libnative/io/pipe_windows.rs b/src/libnative/io/pipe_windows.rs
index 4d01230cbd9..95afa11f4a9 100644
--- a/src/libnative/io/pipe_windows.rs
+++ b/src/libnative/io/pipe_windows.rs
@@ -169,23 +169,30 @@ unsafe fn pipe(name: *const u16, init: bool) -> libc::HANDLE {
 }
 
 pub fn await(handle: libc::HANDLE, deadline: u64,
-             overlapped: &mut libc::OVERLAPPED) -> bool {
-    if deadline == 0 { return true }
+             events: &[libc::HANDLE]) -> IoResult<uint> {
+    use libc::consts::os::extra::{WAIT_FAILED, WAIT_TIMEOUT, WAIT_OBJECT_0};
 
     // If we've got a timeout, use WaitForSingleObject in tandem with CancelIo
     // to figure out if we should indeed get the result.
-    let now = ::io::timer::now();
-    let timeout = deadline < now || unsafe {
-        let ms = (deadline - now) as libc::DWORD;
-        let r = libc::WaitForSingleObject(overlapped.hEvent,
-                                          ms);
-        r != libc::WAIT_OBJECT_0
-    };
-    if timeout {
-        unsafe { let _ = c::CancelIo(handle); }
-        false
+    let ms = if deadline == 0 {
+        libc::INFINITE as u64
     } else {
-        true
+        let now = ::io::timer::now();
+        if deadline < now {0} else {deadline - now}
+    };
+    let ret = unsafe {
+        c::WaitForMultipleObjects(events.len() as libc::DWORD,
+                                  events.as_ptr(),
+                                  libc::FALSE,
+                                  ms as libc::DWORD)
+    };
+    match ret {
+        WAIT_FAILED => Err(super::last_error()),
+        WAIT_TIMEOUT => unsafe {
+            let _ = c::CancelIo(handle);
+            Err(util::timeout("operation timed out"))
+        },
+        n => Ok((n - WAIT_OBJECT_0) as uint)
     }
 }
 
@@ -390,8 +397,8 @@ impl rtio::RtioPipe for UnixStream {
         drop(guard);
         loop {
             // Process a timeout if one is pending
-            let succeeded = await(self.handle(), self.read_deadline,
-                                  &mut overlapped);
+            let wait_succeeded = await(self.handle(), self.read_deadline,
+                                       [overlapped.hEvent]);
 
             let ret = unsafe {
                 libc::GetOverlappedResult(self.handle(),
@@ -408,7 +415,7 @@ impl rtio::RtioPipe for UnixStream {
 
             // If the reading half is now closed, then we're done. If we woke up
             // because the writing half was closed, keep trying.
-            if !succeeded {
+            if wait_succeeded.is_err() {
                 return Err(util::timeout("read timed out"))
             }
             if self.read_closed() {
@@ -458,8 +465,8 @@ impl rtio::RtioPipe for UnixStream {
                     })
                 }
                 // Process a timeout if one is pending
-                let succeeded = await(self.handle(), self.write_deadline,
-                                      &mut overlapped);
+                let wait_succeeded = await(self.handle(), self.write_deadline,
+                                           [overlapped.hEvent]);
                 let ret = unsafe {
                     libc::GetOverlappedResult(self.handle(),
                                               &mut overlapped,
@@ -473,7 +480,7 @@ impl rtio::RtioPipe for UnixStream {
                     if os::errno() != libc::ERROR_OPERATION_ABORTED as uint {
                         return Err(super::last_error())
                     }
-                    if !succeeded {
+                    if !wait_succeeded.is_ok() {
                         let amt = offset + bytes_written as uint;
                         return if amt > 0 {
                             Err(IoError {
@@ -577,6 +584,10 @@ impl UnixListener {
             listener: self,
             event: try!(Event::new(true, false)),
             deadline: 0,
+            inner: Arc::new(AcceptorState {
+                abort: try!(Event::new(true, false)),
+                closed: atomic::AtomicBool::new(false),
+            }),
         })
     }
 }
@@ -597,11 +608,17 @@ impl rtio::RtioUnixListener for UnixListener {
 }
 
 pub struct UnixAcceptor {
+    inner: Arc<AcceptorState>,
     listener: UnixListener,
     event: Event,
     deadline: u64,
 }
 
+struct AcceptorState {
+    abort: Event,
+    closed: atomic::AtomicBool,
+}
+
 impl UnixAcceptor {
     pub fn native_accept(&mut self) -> IoResult<UnixStream> {
         // This function has some funky implementation details when working with
@@ -638,6 +655,10 @@ impl UnixAcceptor {
         // using the original server pipe.
         let handle = self.listener.handle;
 
+        // If we've had an artifical call to close_accept, be sure to never
+        // proceed in accepting new clients in the future
+        if self.inner.closed.load(atomic::SeqCst) { return Err(util::eof()) }
+
         let name = try!(to_utf16(&self.listener.name));
 
         // Once we've got a "server handle", we need to wait for a client to
@@ -652,7 +673,9 @@ impl UnixAcceptor {
 
             if err == libc::ERROR_IO_PENDING as libc::DWORD {
                 // Process a timeout if one is pending
-                let _ = await(handle, self.deadline, &mut overlapped);
+                let wait_succeeded = await(handle, self.deadline,
+                                           [self.inner.abort.handle(),
+                                            overlapped.hEvent]);
 
                 // This will block until the overlapped I/O is completed. The
                 // timeout was previously handled, so this will either block in
@@ -665,7 +688,11 @@ impl UnixAcceptor {
                                               libc::TRUE)
                 };
                 if ret == 0 {
-                    err = unsafe { libc::GetLastError() };
+                    if wait_succeeded.is_ok() {
+                        err = unsafe { libc::GetLastError() };
+                    } else {
+                        return Err(util::timeout("accept timed out"))
+                    }
                 } else {
                     // we succeeded, bypass the check below
                     err = libc::ERROR_PIPE_CONNECTED as libc::DWORD;
@@ -711,11 +738,32 @@ impl rtio::RtioUnixAcceptor for UnixAcceptor {
     }
 
     fn clone(&self) -> Box<rtio::RtioUnixAcceptor + Send> {
-        fail!()
+        let name = to_utf16(&self.listener.name).ok().unwrap();
+        box UnixAcceptor {
+            inner: self.inner.clone(),
+            event: Event::new(true, false).ok().unwrap(),
+            deadline: 0,
+            listener: UnixListener {
+                name: self.listener.name.clone(),
+                handle: unsafe {
+                    let p = pipe(name.as_ptr(), false) ;
+                    assert!(p != libc::INVALID_HANDLE_VALUE as libc::HANDLE);
+                    p
+                },
+            },
+        } as Box<rtio::RtioUnixAcceptor + Send>
     }
 
     fn close_accept(&mut self) -> IoResult<()> {
-        fail!()
+        self.inner.closed.store(true, atomic::SeqCst);
+        let ret = unsafe {
+            c::SetEvent(self.inner.abort.handle())
+        };
+        if ret == 0 {
+            Err(super::last_error())
+        } else {
+            Ok(())
+        }
     }
 }