about summary refs log tree commit diff
path: root/src/libstd/io/net
diff options
context:
space:
mode:
Diffstat (limited to 'src/libstd/io/net')
-rw-r--r--src/libstd/io/net/tcp.rs178
-rw-r--r--src/libstd/io/net/udp.rs74
-rw-r--r--src/libstd/io/net/unix.rs153
3 files changed, 391 insertions, 14 deletions
diff --git a/src/libstd/io/net/tcp.rs b/src/libstd/io/net/tcp.rs
index d07b2e556d6..89141155ae4 100644
--- a/src/libstd/io/net/tcp.rs
+++ b/src/libstd/io/net/tcp.rs
@@ -151,6 +151,69 @@ impl TcpStream {
     /// Note that this method affects all cloned handles associated with this
     /// stream, not just this one handle.
     pub fn close_write(&mut self) -> IoResult<()> { self.obj.close_write() }
+
+    /// Sets a timeout, in milliseconds, for blocking operations on this stream.
+    ///
+    /// This function will set a timeout for all blocking operations (including
+    /// reads and writes) on this stream. The timeout specified is a relative
+    /// time, in milliseconds, into the future after which point operations will
+    /// time out. This means that the timeout must be reset periodically to keep
+    /// it from expiring. Specifying a value of `None` will clear the timeout
+    /// for this stream.
+    ///
+    /// The timeout on this stream is local to this stream only. Setting a
+    /// timeout does not affect any other cloned instances of this stream, nor
+    /// does the timeout propagated to cloned handles of this stream. Setting
+    /// this timeout will override any specific read or write timeouts
+    /// previously set for this stream.
+    ///
+    /// For clarification on the semantics of interrupting a read and a write,
+    /// take a look at `set_read_timeout` and `set_write_timeout`.
+    pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
+        self.obj.set_timeout(timeout_ms)
+    }
+
+    /// Sets the timeout for read operations on this stream.
+    ///
+    /// See documentation in `set_timeout` for the semantics of this read time.
+    /// This will overwrite any previous read timeout set through either this
+    /// function or `set_timeout`.
+    ///
+    /// # Errors
+    ///
+    /// When this timeout expires, if there is no pending read operation, no
+    /// action is taken. Otherwise, the read operation will be scheduled to
+    /// promptly return. If a timeout error is returned, then no data was read
+    /// during the timeout period.
+    pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
+        self.obj.set_read_timeout(timeout_ms)
+    }
+
+    /// Sets the timeout for write operations on this stream.
+    ///
+    /// See documentation in `set_timeout` for the semantics of this write time.
+    /// This will overwrite any previous write timeout set through either this
+    /// function or `set_timeout`.
+    ///
+    /// # Errors
+    ///
+    /// When this timeout expires, if there is no pending write operation, no
+    /// action is taken. Otherwise, the pending write operation will be
+    /// scheduled to promptly return. The actual state of the underlying stream
+    /// is not specified.
+    ///
+    /// The write operation may return an error of type `ShortWrite` which
+    /// indicates that the object is known to have written an exact number of
+    /// bytes successfully during the timeout period, and the remaining bytes
+    /// were never written.
+    ///
+    /// If the write operation returns `TimedOut`, then it the timeout primitive
+    /// does not know how many bytes were written as part of the timeout
+    /// operation. It may be the case that bytes continue to be written in an
+    /// asynchronous fashion after the call to write returns.
+    pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) {
+        self.obj.set_write_timeout(timeout_ms)
+    }
 }
 
 impl Clone for TcpStream {
@@ -892,6 +955,7 @@ mod test {
                 Err(ref e) if e.kind == TimedOut => {}
                 Err(e) => fail!("error: {}", e),
             }
+            ::task::deschedule();
             if i == 1000 { fail!("should have a pending connection") }
         }
         drop(l);
@@ -964,4 +1028,118 @@ mod test {
         // this test will never finish if the child doesn't wake up
         rx.recv();
     })
+
+    iotest!(fn readwrite_timeouts() {
+        let addr = next_test_ip6();
+        let mut a = TcpListener::bind(addr).listen().unwrap();
+        let (tx, rx) = channel::<()>();
+        spawn(proc() {
+            let mut s = TcpStream::connect(addr).unwrap();
+            rx.recv();
+            assert!(s.write([0]).is_ok());
+            let _ = rx.recv_opt();
+        });
+
+        let mut s = a.accept().unwrap();
+        s.set_timeout(Some(20));
+        assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
+        assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
+
+        s.set_timeout(Some(20));
+        for i in range(0, 1001) {
+            match s.write([0, .. 128 * 1024]) {
+                Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
+                Err(IoError { kind: TimedOut, .. }) => break,
+                Err(e) => fail!("{}", e),
+           }
+           if i == 1000 { fail!("should have filled up?!"); }
+        }
+        assert_eq!(s.write([0]).err().unwrap().kind, TimedOut);
+
+        tx.send(());
+        s.set_timeout(None);
+        assert_eq!(s.read([0, 0]), Ok(1));
+    })
+
+    iotest!(fn read_timeouts() {
+        let addr = next_test_ip6();
+        let mut a = TcpListener::bind(addr).listen().unwrap();
+        let (tx, rx) = channel::<()>();
+        spawn(proc() {
+            let mut s = TcpStream::connect(addr).unwrap();
+            rx.recv();
+            let mut amt = 0;
+            while amt < 100 * 128 * 1024 {
+                match s.read([0, ..128 * 1024]) {
+                    Ok(n) => { amt += n; }
+                    Err(e) => fail!("{}", e),
+                }
+            }
+            let _ = rx.recv_opt();
+        });
+
+        let mut s = a.accept().unwrap();
+        s.set_read_timeout(Some(20));
+        assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
+        assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
+
+        tx.send(());
+        for _ in range(0, 100) {
+            assert!(s.write([0, ..128 * 1024]).is_ok());
+        }
+    })
+
+    iotest!(fn write_timeouts() {
+        let addr = next_test_ip6();
+        let mut a = TcpListener::bind(addr).listen().unwrap();
+        let (tx, rx) = channel::<()>();
+        spawn(proc() {
+            let mut s = TcpStream::connect(addr).unwrap();
+            rx.recv();
+            assert!(s.write([0]).is_ok());
+            let _ = rx.recv_opt();
+        });
+
+        let mut s = a.accept().unwrap();
+        s.set_write_timeout(Some(20));
+        for i in range(0, 1001) {
+            match s.write([0, .. 128 * 1024]) {
+                Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
+                Err(IoError { kind: TimedOut, .. }) => break,
+                Err(e) => fail!("{}", e),
+           }
+           if i == 1000 { fail!("should have filled up?!"); }
+        }
+        assert_eq!(s.write([0]).err().unwrap().kind, TimedOut);
+
+        tx.send(());
+        assert!(s.read([0]).is_ok());
+    })
+
+    iotest!(fn timeout_concurrent_read() {
+        let addr = next_test_ip6();
+        let mut a = TcpListener::bind(addr).listen().unwrap();
+        let (tx, rx) = channel::<()>();
+        spawn(proc() {
+            let mut s = TcpStream::connect(addr).unwrap();
+            rx.recv();
+            assert_eq!(s.write([0]), Ok(()));
+            let _ = rx.recv_opt();
+        });
+
+        let mut s = a.accept().unwrap();
+        let s2 = s.clone();
+        let (tx2, rx2) = channel();
+        spawn(proc() {
+            let mut s2 = s2;
+            assert_eq!(s2.read([0]), Ok(1));
+            tx2.send(());
+        });
+
+        s.set_read_timeout(Some(20));
+        assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
+        tx.send(());
+
+        rx2.recv();
+    })
 }
diff --git a/src/libstd/io/net/udp.rs b/src/libstd/io/net/udp.rs
index b7636493dec..45da872ca11 100644
--- a/src/libstd/io/net/udp.rs
+++ b/src/libstd/io/net/udp.rs
@@ -20,6 +20,7 @@ use io::net::ip::{SocketAddr, IpAddr};
 use io::{Reader, Writer, IoResult};
 use kinds::Send;
 use owned::Box;
+use option::Option;
 use result::{Ok, Err};
 use rt::rtio::{RtioSocket, RtioUdpSocket, IoFactory, LocalIo};
 
@@ -142,6 +143,27 @@ impl UdpSocket {
             self.obj.ignore_broadcasts()
         }
     }
+
+    /// Sets the read/write timeout for this socket.
+    ///
+    /// For more information, see `TcpStream::set_timeout`
+    pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
+        self.obj.set_timeout(timeout_ms)
+    }
+
+    /// Sets the read timeout for this socket.
+    ///
+    /// For more information, see `TcpStream::set_timeout`
+    pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
+        self.obj.set_read_timeout(timeout_ms)
+    }
+
+    /// Sets the write timeout for this socket.
+    ///
+    /// For more information, see `TcpStream::set_timeout`
+    pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) {
+        self.obj.set_write_timeout(timeout_ms)
+    }
 }
 
 impl Clone for UdpSocket {
@@ -485,4 +507,56 @@ mod test {
         rx.recv();
         serv_rx.recv();
     })
+
+    iotest!(fn recvfrom_timeout() {
+        let addr1 = next_test_ip4();
+        let addr2 = next_test_ip4();
+        let mut a = UdpSocket::bind(addr1).unwrap();
+
+        let (tx, rx) = channel();
+        let (tx2, rx2) = channel();
+        spawn(proc() {
+            let mut a = UdpSocket::bind(addr2).unwrap();
+            assert_eq!(a.recvfrom([0]), Ok((1, addr1)));
+            assert_eq!(a.sendto([0], addr1), Ok(()));
+            rx.recv();
+            assert_eq!(a.sendto([0], addr1), Ok(()));
+
+            tx2.send(());
+        });
+
+        // Make sure that reads time out, but writes can continue
+        a.set_read_timeout(Some(20));
+        assert_eq!(a.recvfrom([0]).err().unwrap().kind, TimedOut);
+        assert_eq!(a.recvfrom([0]).err().unwrap().kind, TimedOut);
+        assert_eq!(a.sendto([0], addr2), Ok(()));
+
+        // Cloned handles should be able to block
+        let mut a2 = a.clone();
+        assert_eq!(a2.recvfrom([0]), Ok((1, addr2)));
+
+        // Clearing the timeout should allow for receiving
+        a.set_timeout(None);
+        tx.send(());
+        assert_eq!(a2.recvfrom([0]), Ok((1, addr2)));
+
+        // Make sure the child didn't die
+        rx2.recv();
+    })
+
+    iotest!(fn sendto_timeout() {
+        let addr1 = next_test_ip4();
+        let addr2 = next_test_ip4();
+        let mut a = UdpSocket::bind(addr1).unwrap();
+        let _b = UdpSocket::bind(addr2).unwrap();
+
+        a.set_write_timeout(Some(1000));
+        for _ in range(0, 100) {
+            match a.sendto([0, ..4*1024], addr2) {
+                Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
+                Err(IoError { kind: TimedOut, .. }) => break,
+                Err(e) => fail!("other error: {}", e),
+            }
+        }
+    })
 }
diff --git a/src/libstd/io/net/unix.rs b/src/libstd/io/net/unix.rs
index bbe39885c03..73b05a0b3e7 100644
--- a/src/libstd/io/net/unix.rs
+++ b/src/libstd/io/net/unix.rs
@@ -61,21 +61,11 @@ impl UnixStream {
         })
     }
 
-    /// Connect to a pipe named by `path`. This will attempt to open a
-    /// connection to the underlying socket.
-    ///
-    /// The returned stream will be closed when the object falls out of scope.
-    ///
-    /// # Example
-    ///
-    /// ```rust
-    /// # #![allow(unused_must_use)]
-    /// use std::io::net::unix::UnixStream;
+    /// Connect to a pipe named by `path`, timing out if the specified number of
+    /// milliseconds.
     ///
-    /// let server = Path::new("path/to/my/socket");
-    /// let mut stream = UnixStream::connect(&server);
-    /// stream.write([1, 2, 3]);
-    /// ```
+    /// This function is similar to `connect`, except that if `timeout_ms`
+    /// elapses the function will return an error of kind `TimedOut`.
     #[experimental = "the timeout argument is likely to change types"]
     pub fn connect_timeout<P: ToCStr>(path: &P,
                                       timeout_ms: u64) -> IoResult<UnixStream> {
@@ -103,6 +93,27 @@ impl UnixStream {
     /// Note that this method affects all cloned handles associated with this
     /// stream, not just this one handle.
     pub fn close_write(&mut self) -> IoResult<()> { self.obj.close_write() }
+
+    /// Sets the read/write timeout for this socket.
+    ///
+    /// For more information, see `TcpStream::set_timeout`
+    pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
+        self.obj.set_timeout(timeout_ms)
+    }
+
+    /// Sets the read timeout for this socket.
+    ///
+    /// For more information, see `TcpStream::set_timeout`
+    pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
+        self.obj.set_read_timeout(timeout_ms)
+    }
+
+    /// Sets the write timeout for this socket.
+    ///
+    /// For more information, see `TcpStream::set_timeout`
+    pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) {
+        self.obj.set_write_timeout(timeout_ms)
+    }
 }
 
 impl Clone for UnixStream {
@@ -457,6 +468,7 @@ mod tests {
                 Err(ref e) if e.kind == TimedOut => {}
                 Err(e) => fail!("error: {}", e),
             }
+            ::task::deschedule();
             if i == 1000 { fail!("should have a pending connection") }
         }
         drop(l);
@@ -541,4 +553,117 @@ mod tests {
         // this test will never finish if the child doesn't wake up
         rx.recv();
     })
+
+    iotest!(fn readwrite_timeouts() {
+        let addr = next_test_unix();
+        let mut a = UnixListener::bind(&addr).listen().unwrap();
+        let (tx, rx) = channel::<()>();
+        spawn(proc() {
+            let mut s = UnixStream::connect(&addr).unwrap();
+            rx.recv();
+            assert!(s.write([0]).is_ok());
+            let _ = rx.recv_opt();
+        });
+
+        let mut s = a.accept().unwrap();
+        s.set_timeout(Some(20));
+        assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
+        assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
+
+        s.set_timeout(Some(20));
+        for i in range(0, 1001) {
+            match s.write([0, .. 128 * 1024]) {
+                Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
+                Err(IoError { kind: TimedOut, .. }) => break,
+                Err(e) => fail!("{}", e),
+           }
+           if i == 1000 { fail!("should have filled up?!"); }
+        }
+        assert_eq!(s.write([0]).err().unwrap().kind, TimedOut);
+
+        tx.send(());
+        s.set_timeout(None);
+        assert_eq!(s.read([0, 0]), Ok(1));
+    })
+
+    iotest!(fn read_timeouts() {
+        let addr = next_test_unix();
+        let mut a = UnixListener::bind(&addr).listen().unwrap();
+        let (tx, rx) = channel::<()>();
+        spawn(proc() {
+            let mut s = UnixStream::connect(&addr).unwrap();
+            rx.recv();
+            let mut amt = 0;
+            while amt < 100 * 128 * 1024 {
+                match s.read([0, ..128 * 1024]) {
+                    Ok(n) => { amt += n; }
+                    Err(e) => fail!("{}", e),
+                }
+            }
+            let _ = rx.recv_opt();
+        });
+
+        let mut s = a.accept().unwrap();
+        s.set_read_timeout(Some(20));
+        assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
+        assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
+
+        tx.send(());
+        for _ in range(0, 100) {
+            assert!(s.write([0, ..128 * 1024]).is_ok());
+        }
+    })
+
+    iotest!(fn write_timeouts() {
+        let addr = next_test_unix();
+        let mut a = UnixListener::bind(&addr).listen().unwrap();
+        let (tx, rx) = channel::<()>();
+        spawn(proc() {
+            let mut s = UnixStream::connect(&addr).unwrap();
+            rx.recv();
+            assert!(s.write([0]).is_ok());
+            let _ = rx.recv_opt();
+        });
+
+        let mut s = a.accept().unwrap();
+        s.set_write_timeout(Some(20));
+        for i in range(0, 1001) {
+            match s.write([0, .. 128 * 1024]) {
+                Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
+                Err(IoError { kind: TimedOut, .. }) => break,
+                Err(e) => fail!("{}", e),
+           }
+           if i == 1000 { fail!("should have filled up?!"); }
+        }
+
+        tx.send(());
+        assert!(s.read([0]).is_ok());
+    })
+
+    iotest!(fn timeout_concurrent_read() {
+        let addr = next_test_unix();
+        let mut a = UnixListener::bind(&addr).listen().unwrap();
+        let (tx, rx) = channel::<()>();
+        spawn(proc() {
+            let mut s = UnixStream::connect(&addr).unwrap();
+            rx.recv();
+            assert!(s.write([0]).is_ok());
+            let _ = rx.recv_opt();
+        });
+
+        let mut s = a.accept().unwrap();
+        let s2 = s.clone();
+        let (tx2, rx2) = channel();
+        spawn(proc() {
+            let mut s2 = s2;
+            assert!(s2.read([0]).is_ok());
+            tx2.send(());
+        });
+
+        s.set_read_timeout(Some(20));
+        assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
+        tx.send(());
+
+        rx2.recv();
+    })
 }