about summary refs log tree commit diff
path: root/src/libnative
diff options
context:
space:
mode:
authorAlex Crichton <alex@alexcrichton.com>2014-04-27 18:11:49 -0700
committerAlex Crichton <alex@alexcrichton.com>2014-05-07 23:29:35 -0700
commit8e9530218124a277ae1febbc338c4de6f88711dd (patch)
tree24a7f9bf12e9559152a665810137fee4bd25c842 /src/libnative
parentb2c6d6fd3ff303c2e32a3ac0175810581c65b751 (diff)
downloadrust-8e9530218124a277ae1febbc338c4de6f88711dd.tar.gz
rust-8e9530218124a277ae1febbc338c4de6f88711dd.zip
native: Implement timeouts for windows pipes
This is the last remaining networkig object to implement timeouts for. This
takes advantage of the CancelIo function and the already existing asynchronous
I/O functionality of pipes.
Diffstat (limited to 'src/libnative')
-rw-r--r--src/libnative/io/net.rs2
-rw-r--r--src/libnative/io/pipe_win32.rs88
2 files changed, 69 insertions, 21 deletions
diff --git a/src/libnative/io/net.rs b/src/libnative/io/net.rs
index 06105b46244..63d57756e5d 100644
--- a/src/libnative/io/net.rs
+++ b/src/libnative/io/net.rs
@@ -893,7 +893,5 @@ pub fn write<T>(fd: sock_t,
         Err(last_error())
     } else {
         Ok(written)
->>>>>>> native: Implement timeouts for unix networking
->>>>>>> native: Implement timeouts for unix networking
     }
 }
diff --git a/src/libnative/io/pipe_win32.rs b/src/libnative/io/pipe_win32.rs
index 8050123cedc..af80c7174f2 100644
--- a/src/libnative/io/pipe_win32.rs
+++ b/src/libnative/io/pipe_win32.rs
@@ -169,6 +169,27 @@ unsafe fn pipe(name: *u16, init: bool) -> libc::HANDLE {
     )
 }
 
+pub fn await(handle: libc::HANDLE, deadline: u64,
+             overlapped: &mut libc::OVERLAPPED) -> bool {
+    if deadline == 0 { return true }
+
+    // If we've got a timeout, use WaitForSingleObject in tandem with CancelIo
+    // to figure out if we should indeed get the result.
+    let now = ::io::timer::now();
+    let timeout = deadline < now || unsafe {
+        let ms = (deadline - now) as libc::DWORD;
+        let r = libc::WaitForSingleObject(overlapped.hEvent,
+                                          ms);
+        r != libc::WAIT_OBJECT_0
+    };
+    if timeout {
+        unsafe { let _ = c::CancelIo(handle); }
+        false
+    } else {
+        true
+    }
+}
+
 ////////////////////////////////////////////////////////////////////////////////
 // Unix Streams
 ////////////////////////////////////////////////////////////////////////////////
@@ -177,6 +198,8 @@ pub struct UnixStream {
     inner: UnsafeArc<Inner>,
     write: Option<Event>,
     read: Option<Event>,
+    read_deadline: u64,
+    write_deadline: u64,
 }
 
 impl UnixStream {
@@ -253,6 +276,8 @@ impl UnixStream {
                                 inner: UnsafeArc::new(inner),
                                 read: None,
                                 write: None,
+                                read_deadline: 0,
+                                write_deadline: 0,
                             })
                         }
                     }
@@ -358,6 +383,10 @@ impl rtio::RtioPipe for UnixStream {
         // sleep.
         drop(guard);
         loop {
+            // Process a timeout if one is pending
+            let succeeded = await(self.handle(), self.read_deadline,
+                                  &mut overlapped);
+
             let ret = unsafe {
                 libc::GetOverlappedResult(self.handle(),
                                           &mut overlapped,
@@ -373,6 +402,9 @@ impl rtio::RtioPipe for UnixStream {
 
             // If the reading half is now closed, then we're done. If we woke up
             // because the writing half was closed, keep trying.
+            if !succeeded {
+                return Err(io::standard_error(io::TimedOut))
+            }
             if self.read_closed() {
                 return Err(io::standard_error(io::EndOfFile))
             }
@@ -408,12 +440,16 @@ impl rtio::RtioPipe for UnixStream {
                                 &mut bytes_written,
                                 &mut overlapped)
             };
+            let err = os::errno();
             drop(guard);
 
             if ret == 0 {
-                if os::errno() != libc::ERROR_IO_PENDING as uint {
-                    return Err(super::last_error())
+                if err != libc::ERROR_IO_PENDING as uint {
+                    return Err(io::IoError::from_errno(err, true));
                 }
+                // Process a timeout if one is pending
+                let succeeded = await(self.handle(), self.write_deadline,
+                                      &mut overlapped);
                 let ret = unsafe {
                     libc::GetOverlappedResult(self.handle(),
                                               &mut overlapped,
@@ -427,10 +463,22 @@ impl rtio::RtioPipe for UnixStream {
                     if os::errno() != libc::ERROR_OPERATION_ABORTED as uint {
                         return Err(super::last_error())
                     }
+                    if !succeeded {
+                        let amt = offset + bytes_written as uint;
+                        return if amt > 0 {
+                            Err(io::IoError {
+                                kind: io::ShortWrite(amt),
+                                desc: "short write during write",
+                                detail: None,
+                            })
+                        } else {
+                            Err(util::timeout("write timed out"))
+                        }
+                    }
                     if self.write_closed() {
                         return Err(io::standard_error(io::BrokenPipe))
                     }
-                    continue; // retry
+                    continue // retry
                 }
             }
             offset += bytes_written as uint;
@@ -443,6 +491,8 @@ impl rtio::RtioPipe for UnixStream {
             inner: self.inner.clone(),
             read: None,
             write: None,
+            read_deadline: 0,
+            write_deadline: 0,
         } as Box<rtio::RtioPipe:Send>
     }
 
@@ -475,6 +525,18 @@ impl rtio::RtioPipe for UnixStream {
         unsafe { (*self.inner.get()).write_closed.store(true, atomics::SeqCst) }
         self.cancel_io()
     }
+
+    fn set_timeout(&mut self, timeout: Option<u64>) {
+        let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
+        self.read_deadline = deadline;
+        self.write_deadline = deadline;
+    }
+    fn set_read_timeout(&mut self, timeout: Option<u64>) {
+        self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
+    }
+    fn set_write_timeout(&mut self, timeout: Option<u64>) {
+        self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
+    }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -577,22 +639,8 @@ impl UnixAcceptor {
             let mut err = unsafe { libc::GetLastError() };
 
             if err == libc::ERROR_IO_PENDING as libc::DWORD {
-                // If we've got a timeout, use WaitForSingleObject in tandem
-                // with CancelIo to figure out if we should indeed get the
-                // result.
-                if self.deadline != 0 {
-                    let now = ::io::timer::now();
-                    let timeout = self.deadline < now || unsafe {
-                        let ms = (self.deadline - now) as libc::DWORD;
-                        let r = libc::WaitForSingleObject(overlapped.hEvent,
-                                                          ms);
-                        r != libc::WAIT_OBJECT_0
-                    };
-                    if timeout {
-                        unsafe { let _ = c::CancelIo(handle); }
-                        return Err(util::timeout("accept timed out"))
-                    }
-                }
+                // Process a timeout if one is pending
+                let _ = await(handle, self.deadline, &mut overlapped);
 
                 // This will block until the overlapped I/O is completed. The
                 // timeout was previously handled, so this will either block in
@@ -638,6 +686,8 @@ impl UnixAcceptor {
             inner: UnsafeArc::new(Inner::new(handle)),
             read: None,
             write: None,
+            read_deadline: 0,
+            write_deadline: 0,
         })
     }
 }