about summary refs log tree commit diff
path: root/src/libstd
diff options
context:
space:
mode:
authorAlex Crichton <alex@alexcrichton.com>2014-04-25 20:47:49 -0700
committerAlex Crichton <alex@alexcrichton.com>2014-05-07 23:27:01 -0700
commite27f27c8588f5cfa0cd9dfbbdf7609ea2d6818ec (patch)
tree25f32daa865ea84163e5f6681f15e876ffb74305 /src/libstd
parente0fcb4eb3d516017c7c2fa8d17e7b8b82bdc065b (diff)
downloadrust-e27f27c8588f5cfa0cd9dfbbdf7609ea2d6818ec.tar.gz
rust-e27f27c8588f5cfa0cd9dfbbdf7609ea2d6818ec.zip
std: Add I/O timeouts to networking objects
These timeouts all follow the same pattern as established by the timeouts on
acceptors. There are three methods: set_timeout, set_read_timeout, and
set_write_timeout. Each of these sets a point in the future after which
operations will time out.

Timeouts with cloned objects are a little trickier. Each object is viewed as
having its own timeout, unaffected by other objects' timeouts. Additionally,
timeouts do not propagate when a stream is cloned or when a cloned stream has
its timeouts modified.

This commit is just the public interface which will be exposed for timeouts, the
implementation will come in later commits.
Diffstat (limited to 'src/libstd')
-rw-r--r--src/libstd/io/mod.rs14
-rw-r--r--src/libstd/io/net/tcp.rs178
-rw-r--r--src/libstd/io/net/udp.rs74
-rw-r--r--src/libstd/io/net/unix.rs153
-rw-r--r--src/libstd/rt/rtio.rs9
-rw-r--r--src/libstd/rt/task.rs6
6 files changed, 419 insertions, 15 deletions
diff --git a/src/libstd/io/mod.rs b/src/libstd/io/mod.rs
index e2fde98a77c..ea3e0219a5b 100644
--- a/src/libstd/io/mod.rs
+++ b/src/libstd/io/mod.rs
@@ -434,6 +434,17 @@ pub enum IoErrorKind {
     InvalidInput,
     /// The I/O operation's timeout expired, causing it to be canceled.
     TimedOut,
+    /// This write operation failed to write all of its data.
+    ///
+    /// Normally the write() method on a Writer guarantees that all of its data
+    /// has been written, but some operations may be terminated after only
+    /// partially writing some data. An example of this is a timed out write
+    /// which successfully wrote a known number of bytes, but bailed out after
+    /// doing so.
+    ///
+    /// The payload contained as part of this variant is the number of bytes
+    /// which are known to have been successfully written.
+    ShortWrite(uint),
 }
 
 /// A trait for objects which are byte-oriented streams. Readers are defined by
@@ -1429,7 +1440,8 @@ pub fn standard_error(kind: IoErrorKind) -> IoError {
         PathDoesntExist => "no such file",
         MismatchedFileTypeForOperation => "mismatched file type",
         ResourceUnavailable => "resource unavailable",
-        TimedOut => "operation timed out"
+        TimedOut => "operation timed out",
+        ShortWrite(..) => "short write",
     };
     IoError {
         kind: kind,
diff --git a/src/libstd/io/net/tcp.rs b/src/libstd/io/net/tcp.rs
index d07b2e556d6..89141155ae4 100644
--- a/src/libstd/io/net/tcp.rs
+++ b/src/libstd/io/net/tcp.rs
@@ -151,6 +151,69 @@ impl TcpStream {
     /// Note that this method affects all cloned handles associated with this
     /// stream, not just this one handle.
     pub fn close_write(&mut self) -> IoResult<()> { self.obj.close_write() }
+
+    /// Sets a timeout, in milliseconds, for blocking operations on this stream.
+    ///
+    /// This function will set a timeout for all blocking operations (including
+    /// reads and writes) on this stream. The timeout specified is a relative
+    /// time, in milliseconds, into the future after which point operations will
+    /// time out. This means that the timeout must be reset periodically to keep
+    /// it from expiring. Specifying a value of `None` will clear the timeout
+    /// for this stream.
+    ///
+    /// The timeout on this stream is local to this stream only. Setting a
+    /// timeout does not affect any other cloned instances of this stream, nor
+    /// does the timeout propagated to cloned handles of this stream. Setting
+    /// this timeout will override any specific read or write timeouts
+    /// previously set for this stream.
+    ///
+    /// For clarification on the semantics of interrupting a read and a write,
+    /// take a look at `set_read_timeout` and `set_write_timeout`.
+    pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
+        self.obj.set_timeout(timeout_ms)
+    }
+
+    /// Sets the timeout for read operations on this stream.
+    ///
+    /// See documentation in `set_timeout` for the semantics of this read time.
+    /// This will overwrite any previous read timeout set through either this
+    /// function or `set_timeout`.
+    ///
+    /// # Errors
+    ///
+    /// When this timeout expires, if there is no pending read operation, no
+    /// action is taken. Otherwise, the read operation will be scheduled to
+    /// promptly return. If a timeout error is returned, then no data was read
+    /// during the timeout period.
+    pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
+        self.obj.set_read_timeout(timeout_ms)
+    }
+
+    /// Sets the timeout for write operations on this stream.
+    ///
+    /// See documentation in `set_timeout` for the semantics of this write time.
+    /// This will overwrite any previous write timeout set through either this
+    /// function or `set_timeout`.
+    ///
+    /// # Errors
+    ///
+    /// When this timeout expires, if there is no pending write operation, no
+    /// action is taken. Otherwise, the pending write operation will be
+    /// scheduled to promptly return. The actual state of the underlying stream
+    /// is not specified.
+    ///
+    /// The write operation may return an error of type `ShortWrite` which
+    /// indicates that the object is known to have written an exact number of
+    /// bytes successfully during the timeout period, and the remaining bytes
+    /// were never written.
+    ///
+    /// If the write operation returns `TimedOut`, then it the timeout primitive
+    /// does not know how many bytes were written as part of the timeout
+    /// operation. It may be the case that bytes continue to be written in an
+    /// asynchronous fashion after the call to write returns.
+    pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) {
+        self.obj.set_write_timeout(timeout_ms)
+    }
 }
 
 impl Clone for TcpStream {
@@ -892,6 +955,7 @@ mod test {
                 Err(ref e) if e.kind == TimedOut => {}
                 Err(e) => fail!("error: {}", e),
             }
+            ::task::deschedule();
             if i == 1000 { fail!("should have a pending connection") }
         }
         drop(l);
@@ -964,4 +1028,118 @@ mod test {
         // this test will never finish if the child doesn't wake up
         rx.recv();
     })
+
+    iotest!(fn readwrite_timeouts() {
+        let addr = next_test_ip6();
+        let mut a = TcpListener::bind(addr).listen().unwrap();
+        let (tx, rx) = channel::<()>();
+        spawn(proc() {
+            let mut s = TcpStream::connect(addr).unwrap();
+            rx.recv();
+            assert!(s.write([0]).is_ok());
+            let _ = rx.recv_opt();
+        });
+
+        let mut s = a.accept().unwrap();
+        s.set_timeout(Some(20));
+        assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
+        assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
+
+        s.set_timeout(Some(20));
+        for i in range(0, 1001) {
+            match s.write([0, .. 128 * 1024]) {
+                Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
+                Err(IoError { kind: TimedOut, .. }) => break,
+                Err(e) => fail!("{}", e),
+           }
+           if i == 1000 { fail!("should have filled up?!"); }
+        }
+        assert_eq!(s.write([0]).err().unwrap().kind, TimedOut);
+
+        tx.send(());
+        s.set_timeout(None);
+        assert_eq!(s.read([0, 0]), Ok(1));
+    })
+
+    iotest!(fn read_timeouts() {
+        let addr = next_test_ip6();
+        let mut a = TcpListener::bind(addr).listen().unwrap();
+        let (tx, rx) = channel::<()>();
+        spawn(proc() {
+            let mut s = TcpStream::connect(addr).unwrap();
+            rx.recv();
+            let mut amt = 0;
+            while amt < 100 * 128 * 1024 {
+                match s.read([0, ..128 * 1024]) {
+                    Ok(n) => { amt += n; }
+                    Err(e) => fail!("{}", e),
+                }
+            }
+            let _ = rx.recv_opt();
+        });
+
+        let mut s = a.accept().unwrap();
+        s.set_read_timeout(Some(20));
+        assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
+        assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
+
+        tx.send(());
+        for _ in range(0, 100) {
+            assert!(s.write([0, ..128 * 1024]).is_ok());
+        }
+    })
+
+    iotest!(fn write_timeouts() {
+        let addr = next_test_ip6();
+        let mut a = TcpListener::bind(addr).listen().unwrap();
+        let (tx, rx) = channel::<()>();
+        spawn(proc() {
+            let mut s = TcpStream::connect(addr).unwrap();
+            rx.recv();
+            assert!(s.write([0]).is_ok());
+            let _ = rx.recv_opt();
+        });
+
+        let mut s = a.accept().unwrap();
+        s.set_write_timeout(Some(20));
+        for i in range(0, 1001) {
+            match s.write([0, .. 128 * 1024]) {
+                Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
+                Err(IoError { kind: TimedOut, .. }) => break,
+                Err(e) => fail!("{}", e),
+           }
+           if i == 1000 { fail!("should have filled up?!"); }
+        }
+        assert_eq!(s.write([0]).err().unwrap().kind, TimedOut);
+
+        tx.send(());
+        assert!(s.read([0]).is_ok());
+    })
+
+    iotest!(fn timeout_concurrent_read() {
+        let addr = next_test_ip6();
+        let mut a = TcpListener::bind(addr).listen().unwrap();
+        let (tx, rx) = channel::<()>();
+        spawn(proc() {
+            let mut s = TcpStream::connect(addr).unwrap();
+            rx.recv();
+            assert_eq!(s.write([0]), Ok(()));
+            let _ = rx.recv_opt();
+        });
+
+        let mut s = a.accept().unwrap();
+        let s2 = s.clone();
+        let (tx2, rx2) = channel();
+        spawn(proc() {
+            let mut s2 = s2;
+            assert_eq!(s2.read([0]), Ok(1));
+            tx2.send(());
+        });
+
+        s.set_read_timeout(Some(20));
+        assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
+        tx.send(());
+
+        rx2.recv();
+    })
 }
diff --git a/src/libstd/io/net/udp.rs b/src/libstd/io/net/udp.rs
index b7636493dec..45da872ca11 100644
--- a/src/libstd/io/net/udp.rs
+++ b/src/libstd/io/net/udp.rs
@@ -20,6 +20,7 @@ use io::net::ip::{SocketAddr, IpAddr};
 use io::{Reader, Writer, IoResult};
 use kinds::Send;
 use owned::Box;
+use option::Option;
 use result::{Ok, Err};
 use rt::rtio::{RtioSocket, RtioUdpSocket, IoFactory, LocalIo};
 
@@ -142,6 +143,27 @@ impl UdpSocket {
             self.obj.ignore_broadcasts()
         }
     }
+
+    /// Sets the read/write timeout for this socket.
+    ///
+    /// For more information, see `TcpStream::set_timeout`
+    pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
+        self.obj.set_timeout(timeout_ms)
+    }
+
+    /// Sets the read timeout for this socket.
+    ///
+    /// For more information, see `TcpStream::set_timeout`
+    pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
+        self.obj.set_read_timeout(timeout_ms)
+    }
+
+    /// Sets the write timeout for this socket.
+    ///
+    /// For more information, see `TcpStream::set_timeout`
+    pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) {
+        self.obj.set_write_timeout(timeout_ms)
+    }
 }
 
 impl Clone for UdpSocket {
@@ -485,4 +507,56 @@ mod test {
         rx.recv();
         serv_rx.recv();
     })
+
+    iotest!(fn recvfrom_timeout() {
+        let addr1 = next_test_ip4();
+        let addr2 = next_test_ip4();
+        let mut a = UdpSocket::bind(addr1).unwrap();
+
+        let (tx, rx) = channel();
+        let (tx2, rx2) = channel();
+        spawn(proc() {
+            let mut a = UdpSocket::bind(addr2).unwrap();
+            assert_eq!(a.recvfrom([0]), Ok((1, addr1)));
+            assert_eq!(a.sendto([0], addr1), Ok(()));
+            rx.recv();
+            assert_eq!(a.sendto([0], addr1), Ok(()));
+
+            tx2.send(());
+        });
+
+        // Make sure that reads time out, but writes can continue
+        a.set_read_timeout(Some(20));
+        assert_eq!(a.recvfrom([0]).err().unwrap().kind, TimedOut);
+        assert_eq!(a.recvfrom([0]).err().unwrap().kind, TimedOut);
+        assert_eq!(a.sendto([0], addr2), Ok(()));
+
+        // Cloned handles should be able to block
+        let mut a2 = a.clone();
+        assert_eq!(a2.recvfrom([0]), Ok((1, addr2)));
+
+        // Clearing the timeout should allow for receiving
+        a.set_timeout(None);
+        tx.send(());
+        assert_eq!(a2.recvfrom([0]), Ok((1, addr2)));
+
+        // Make sure the child didn't die
+        rx2.recv();
+    })
+
+    iotest!(fn sendto_timeout() {
+        let addr1 = next_test_ip4();
+        let addr2 = next_test_ip4();
+        let mut a = UdpSocket::bind(addr1).unwrap();
+        let _b = UdpSocket::bind(addr2).unwrap();
+
+        a.set_write_timeout(Some(1000));
+        for _ in range(0, 100) {
+            match a.sendto([0, ..4*1024], addr2) {
+                Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
+                Err(IoError { kind: TimedOut, .. }) => break,
+                Err(e) => fail!("other error: {}", e),
+            }
+        }
+    })
 }
diff --git a/src/libstd/io/net/unix.rs b/src/libstd/io/net/unix.rs
index bbe39885c03..73b05a0b3e7 100644
--- a/src/libstd/io/net/unix.rs
+++ b/src/libstd/io/net/unix.rs
@@ -61,21 +61,11 @@ impl UnixStream {
         })
     }
 
-    /// Connect to a pipe named by `path`. This will attempt to open a
-    /// connection to the underlying socket.
-    ///
-    /// The returned stream will be closed when the object falls out of scope.
-    ///
-    /// # Example
-    ///
-    /// ```rust
-    /// # #![allow(unused_must_use)]
-    /// use std::io::net::unix::UnixStream;
+    /// Connect to a pipe named by `path`, timing out if the specified number of
+    /// milliseconds.
     ///
-    /// let server = Path::new("path/to/my/socket");
-    /// let mut stream = UnixStream::connect(&server);
-    /// stream.write([1, 2, 3]);
-    /// ```
+    /// This function is similar to `connect`, except that if `timeout_ms`
+    /// elapses the function will return an error of kind `TimedOut`.
     #[experimental = "the timeout argument is likely to change types"]
     pub fn connect_timeout<P: ToCStr>(path: &P,
                                       timeout_ms: u64) -> IoResult<UnixStream> {
@@ -103,6 +93,27 @@ impl UnixStream {
     /// Note that this method affects all cloned handles associated with this
     /// stream, not just this one handle.
     pub fn close_write(&mut self) -> IoResult<()> { self.obj.close_write() }
+
+    /// Sets the read/write timeout for this socket.
+    ///
+    /// For more information, see `TcpStream::set_timeout`
+    pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
+        self.obj.set_timeout(timeout_ms)
+    }
+
+    /// Sets the read timeout for this socket.
+    ///
+    /// For more information, see `TcpStream::set_timeout`
+    pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
+        self.obj.set_read_timeout(timeout_ms)
+    }
+
+    /// Sets the write timeout for this socket.
+    ///
+    /// For more information, see `TcpStream::set_timeout`
+    pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) {
+        self.obj.set_write_timeout(timeout_ms)
+    }
 }
 
 impl Clone for UnixStream {
@@ -457,6 +468,7 @@ mod tests {
                 Err(ref e) if e.kind == TimedOut => {}
                 Err(e) => fail!("error: {}", e),
             }
+            ::task::deschedule();
             if i == 1000 { fail!("should have a pending connection") }
         }
         drop(l);
@@ -541,4 +553,117 @@ mod tests {
         // this test will never finish if the child doesn't wake up
         rx.recv();
     })
+
+    iotest!(fn readwrite_timeouts() {
+        let addr = next_test_unix();
+        let mut a = UnixListener::bind(&addr).listen().unwrap();
+        let (tx, rx) = channel::<()>();
+        spawn(proc() {
+            let mut s = UnixStream::connect(&addr).unwrap();
+            rx.recv();
+            assert!(s.write([0]).is_ok());
+            let _ = rx.recv_opt();
+        });
+
+        let mut s = a.accept().unwrap();
+        s.set_timeout(Some(20));
+        assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
+        assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
+
+        s.set_timeout(Some(20));
+        for i in range(0, 1001) {
+            match s.write([0, .. 128 * 1024]) {
+                Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
+                Err(IoError { kind: TimedOut, .. }) => break,
+                Err(e) => fail!("{}", e),
+           }
+           if i == 1000 { fail!("should have filled up?!"); }
+        }
+        assert_eq!(s.write([0]).err().unwrap().kind, TimedOut);
+
+        tx.send(());
+        s.set_timeout(None);
+        assert_eq!(s.read([0, 0]), Ok(1));
+    })
+
+    iotest!(fn read_timeouts() {
+        let addr = next_test_unix();
+        let mut a = UnixListener::bind(&addr).listen().unwrap();
+        let (tx, rx) = channel::<()>();
+        spawn(proc() {
+            let mut s = UnixStream::connect(&addr).unwrap();
+            rx.recv();
+            let mut amt = 0;
+            while amt < 100 * 128 * 1024 {
+                match s.read([0, ..128 * 1024]) {
+                    Ok(n) => { amt += n; }
+                    Err(e) => fail!("{}", e),
+                }
+            }
+            let _ = rx.recv_opt();
+        });
+
+        let mut s = a.accept().unwrap();
+        s.set_read_timeout(Some(20));
+        assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
+        assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
+
+        tx.send(());
+        for _ in range(0, 100) {
+            assert!(s.write([0, ..128 * 1024]).is_ok());
+        }
+    })
+
+    iotest!(fn write_timeouts() {
+        let addr = next_test_unix();
+        let mut a = UnixListener::bind(&addr).listen().unwrap();
+        let (tx, rx) = channel::<()>();
+        spawn(proc() {
+            let mut s = UnixStream::connect(&addr).unwrap();
+            rx.recv();
+            assert!(s.write([0]).is_ok());
+            let _ = rx.recv_opt();
+        });
+
+        let mut s = a.accept().unwrap();
+        s.set_write_timeout(Some(20));
+        for i in range(0, 1001) {
+            match s.write([0, .. 128 * 1024]) {
+                Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
+                Err(IoError { kind: TimedOut, .. }) => break,
+                Err(e) => fail!("{}", e),
+           }
+           if i == 1000 { fail!("should have filled up?!"); }
+        }
+
+        tx.send(());
+        assert!(s.read([0]).is_ok());
+    })
+
+    iotest!(fn timeout_concurrent_read() {
+        let addr = next_test_unix();
+        let mut a = UnixListener::bind(&addr).listen().unwrap();
+        let (tx, rx) = channel::<()>();
+        spawn(proc() {
+            let mut s = UnixStream::connect(&addr).unwrap();
+            rx.recv();
+            assert!(s.write([0]).is_ok());
+            let _ = rx.recv_opt();
+        });
+
+        let mut s = a.accept().unwrap();
+        let s2 = s.clone();
+        let (tx2, rx2) = channel();
+        spawn(proc() {
+            let mut s2 = s2;
+            assert!(s2.read([0]).is_ok());
+            tx2.send(());
+        });
+
+        s.set_read_timeout(Some(20));
+        assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
+        tx.send(());
+
+        rx2.recv();
+    })
 }
diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs
index c5afe7887ad..16882624ab7 100644
--- a/src/libstd/rt/rtio.rs
+++ b/src/libstd/rt/rtio.rs
@@ -222,6 +222,9 @@ pub trait RtioTcpStream : RtioSocket {
     fn clone(&self) -> Box<RtioTcpStream:Send>;
     fn close_write(&mut self) -> IoResult<()>;
     fn close_read(&mut self) -> IoResult<()>;
+    fn set_timeout(&mut self, timeout_ms: Option<u64>);
+    fn set_read_timeout(&mut self, timeout_ms: Option<u64>);
+    fn set_write_timeout(&mut self, timeout_ms: Option<u64>);
 }
 
 pub trait RtioSocket {
@@ -245,6 +248,9 @@ pub trait RtioUdpSocket : RtioSocket {
     fn ignore_broadcasts(&mut self) -> IoResult<()>;
 
     fn clone(&self) -> Box<RtioUdpSocket:Send>;
+    fn set_timeout(&mut self, timeout_ms: Option<u64>);
+    fn set_read_timeout(&mut self, timeout_ms: Option<u64>);
+    fn set_write_timeout(&mut self, timeout_ms: Option<u64>);
 }
 
 pub trait RtioTimer {
@@ -278,6 +284,9 @@ pub trait RtioPipe {
 
     fn close_write(&mut self) -> IoResult<()>;
     fn close_read(&mut self) -> IoResult<()>;
+    fn set_timeout(&mut self, timeout_ms: Option<u64>);
+    fn set_read_timeout(&mut self, timeout_ms: Option<u64>);
+    fn set_write_timeout(&mut self, timeout_ms: Option<u64>);
 }
 
 pub trait RtioUnixListener {
diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs
index 909df5618aa..8924ed7cfd2 100644
--- a/src/libstd/rt/task.rs
+++ b/src/libstd/rt/task.rs
@@ -323,6 +323,12 @@ impl BlockedTask {
         }
     }
 
+    /// Reawakens this task if ownership is acquired. If finer-grained control
+    /// is desired, use `wake` instead.
+    pub fn reawaken(self) {
+        self.wake().map(|t| t.reawaken());
+    }
+
     // This assertion has two flavours because the wake involves an atomic op.
     // In the faster version, destructors will fail dramatically instead.
     #[cfg(not(test))] pub fn trash(self) { }