about summary refs log tree commit diff
path: root/src/libstd/io/net/unix.rs
diff options
context:
space:
mode:
authorAlex Crichton <alex@alexcrichton.com>2014-04-25 20:47:49 -0700
committerAlex Crichton <alex@alexcrichton.com>2014-05-07 23:27:01 -0700
commite27f27c8588f5cfa0cd9dfbbdf7609ea2d6818ec (patch)
tree25f32daa865ea84163e5f6681f15e876ffb74305 /src/libstd/io/net/unix.rs
parente0fcb4eb3d516017c7c2fa8d17e7b8b82bdc065b (diff)
downloadrust-e27f27c8588f5cfa0cd9dfbbdf7609ea2d6818ec.tar.gz
rust-e27f27c8588f5cfa0cd9dfbbdf7609ea2d6818ec.zip
std: Add I/O timeouts to networking objects
These timeouts all follow the same pattern as established by the timeouts on
acceptors. There are three methods: set_timeout, set_read_timeout, and
set_write_timeout. Each of these sets a point in the future after which
operations will time out.

Timeouts with cloned objects are a little trickier. Each object is viewed as
having its own timeout, unaffected by other objects' timeouts. Additionally,
timeouts do not propagate when a stream is cloned or when a cloned stream has
its timeouts modified.

This commit is just the public interface which will be exposed for timeouts, the
implementation will come in later commits.
Diffstat (limited to 'src/libstd/io/net/unix.rs')
-rw-r--r--src/libstd/io/net/unix.rs153
1 files changed, 139 insertions, 14 deletions
diff --git a/src/libstd/io/net/unix.rs b/src/libstd/io/net/unix.rs
index bbe39885c03..73b05a0b3e7 100644
--- a/src/libstd/io/net/unix.rs
+++ b/src/libstd/io/net/unix.rs
@@ -61,21 +61,11 @@ impl UnixStream {
         })
     }
 
-    /// Connect to a pipe named by `path`. This will attempt to open a
-    /// connection to the underlying socket.
-    ///
-    /// The returned stream will be closed when the object falls out of scope.
-    ///
-    /// # Example
-    ///
-    /// ```rust
-    /// # #![allow(unused_must_use)]
-    /// use std::io::net::unix::UnixStream;
+    /// Connect to a pipe named by `path`, timing out if the specified number of
+    /// milliseconds.
     ///
-    /// let server = Path::new("path/to/my/socket");
-    /// let mut stream = UnixStream::connect(&server);
-    /// stream.write([1, 2, 3]);
-    /// ```
+    /// This function is similar to `connect`, except that if `timeout_ms`
+    /// elapses the function will return an error of kind `TimedOut`.
     #[experimental = "the timeout argument is likely to change types"]
     pub fn connect_timeout<P: ToCStr>(path: &P,
                                       timeout_ms: u64) -> IoResult<UnixStream> {
@@ -103,6 +93,27 @@ impl UnixStream {
     /// Note that this method affects all cloned handles associated with this
     /// stream, not just this one handle.
     pub fn close_write(&mut self) -> IoResult<()> { self.obj.close_write() }
+
+    /// Sets the read/write timeout for this socket.
+    ///
+    /// For more information, see `TcpStream::set_timeout`
+    pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
+        self.obj.set_timeout(timeout_ms)
+    }
+
+    /// Sets the read timeout for this socket.
+    ///
+    /// For more information, see `TcpStream::set_timeout`
+    pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
+        self.obj.set_read_timeout(timeout_ms)
+    }
+
+    /// Sets the write timeout for this socket.
+    ///
+    /// For more information, see `TcpStream::set_timeout`
+    pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) {
+        self.obj.set_write_timeout(timeout_ms)
+    }
 }
 
 impl Clone for UnixStream {
@@ -457,6 +468,7 @@ mod tests {
                 Err(ref e) if e.kind == TimedOut => {}
                 Err(e) => fail!("error: {}", e),
             }
+            ::task::deschedule();
             if i == 1000 { fail!("should have a pending connection") }
         }
         drop(l);
@@ -541,4 +553,117 @@ mod tests {
         // this test will never finish if the child doesn't wake up
         rx.recv();
     })
+
+    iotest!(fn readwrite_timeouts() {
+        let addr = next_test_unix();
+        let mut a = UnixListener::bind(&addr).listen().unwrap();
+        let (tx, rx) = channel::<()>();
+        spawn(proc() {
+            let mut s = UnixStream::connect(&addr).unwrap();
+            rx.recv();
+            assert!(s.write([0]).is_ok());
+            let _ = rx.recv_opt();
+        });
+
+        let mut s = a.accept().unwrap();
+        s.set_timeout(Some(20));
+        assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
+        assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
+
+        s.set_timeout(Some(20));
+        for i in range(0, 1001) {
+            match s.write([0, .. 128 * 1024]) {
+                Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
+                Err(IoError { kind: TimedOut, .. }) => break,
+                Err(e) => fail!("{}", e),
+           }
+           if i == 1000 { fail!("should have filled up?!"); }
+        }
+        assert_eq!(s.write([0]).err().unwrap().kind, TimedOut);
+
+        tx.send(());
+        s.set_timeout(None);
+        assert_eq!(s.read([0, 0]), Ok(1));
+    })
+
+    iotest!(fn read_timeouts() {
+        let addr = next_test_unix();
+        let mut a = UnixListener::bind(&addr).listen().unwrap();
+        let (tx, rx) = channel::<()>();
+        spawn(proc() {
+            let mut s = UnixStream::connect(&addr).unwrap();
+            rx.recv();
+            let mut amt = 0;
+            while amt < 100 * 128 * 1024 {
+                match s.read([0, ..128 * 1024]) {
+                    Ok(n) => { amt += n; }
+                    Err(e) => fail!("{}", e),
+                }
+            }
+            let _ = rx.recv_opt();
+        });
+
+        let mut s = a.accept().unwrap();
+        s.set_read_timeout(Some(20));
+        assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
+        assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
+
+        tx.send(());
+        for _ in range(0, 100) {
+            assert!(s.write([0, ..128 * 1024]).is_ok());
+        }
+    })
+
+    iotest!(fn write_timeouts() {
+        let addr = next_test_unix();
+        let mut a = UnixListener::bind(&addr).listen().unwrap();
+        let (tx, rx) = channel::<()>();
+        spawn(proc() {
+            let mut s = UnixStream::connect(&addr).unwrap();
+            rx.recv();
+            assert!(s.write([0]).is_ok());
+            let _ = rx.recv_opt();
+        });
+
+        let mut s = a.accept().unwrap();
+        s.set_write_timeout(Some(20));
+        for i in range(0, 1001) {
+            match s.write([0, .. 128 * 1024]) {
+                Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
+                Err(IoError { kind: TimedOut, .. }) => break,
+                Err(e) => fail!("{}", e),
+           }
+           if i == 1000 { fail!("should have filled up?!"); }
+        }
+
+        tx.send(());
+        assert!(s.read([0]).is_ok());
+    })
+
+    iotest!(fn timeout_concurrent_read() {
+        let addr = next_test_unix();
+        let mut a = UnixListener::bind(&addr).listen().unwrap();
+        let (tx, rx) = channel::<()>();
+        spawn(proc() {
+            let mut s = UnixStream::connect(&addr).unwrap();
+            rx.recv();
+            assert!(s.write([0]).is_ok());
+            let _ = rx.recv_opt();
+        });
+
+        let mut s = a.accept().unwrap();
+        let s2 = s.clone();
+        let (tx2, rx2) = channel();
+        spawn(proc() {
+            let mut s2 = s2;
+            assert!(s2.read([0]).is_ok());
+            tx2.send(());
+        });
+
+        s.set_read_timeout(Some(20));
+        assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
+        tx.send(());
+
+        rx2.recv();
+    })
 }