diff options
Diffstat (limited to 'src/libstd/io')
| -rw-r--r-- | src/libstd/io/mod.rs | 14 | ||||
| -rw-r--r-- | src/libstd/io/net/tcp.rs | 178 | ||||
| -rw-r--r-- | src/libstd/io/net/udp.rs | 74 | ||||
| -rw-r--r-- | src/libstd/io/net/unix.rs | 153 |
4 files changed, 404 insertions, 15 deletions
diff --git a/src/libstd/io/mod.rs b/src/libstd/io/mod.rs index e2fde98a77c..ea3e0219a5b 100644 --- a/src/libstd/io/mod.rs +++ b/src/libstd/io/mod.rs @@ -434,6 +434,17 @@ pub enum IoErrorKind { InvalidInput, /// The I/O operation's timeout expired, causing it to be canceled. TimedOut, + /// This write operation failed to write all of its data. + /// + /// Normally the write() method on a Writer guarantees that all of its data + /// has been written, but some operations may be terminated after only + /// partially writing some data. An example of this is a timed out write + /// which successfully wrote a known number of bytes, but bailed out after + /// doing so. + /// + /// The payload contained as part of this variant is the number of bytes + /// which are known to have been successfully written. + ShortWrite(uint), } /// A trait for objects which are byte-oriented streams. Readers are defined by @@ -1429,7 +1440,8 @@ pub fn standard_error(kind: IoErrorKind) -> IoError { PathDoesntExist => "no such file", MismatchedFileTypeForOperation => "mismatched file type", ResourceUnavailable => "resource unavailable", - TimedOut => "operation timed out" + TimedOut => "operation timed out", + ShortWrite(..) => "short write", }; IoError { kind: kind, diff --git a/src/libstd/io/net/tcp.rs b/src/libstd/io/net/tcp.rs index d07b2e556d6..89141155ae4 100644 --- a/src/libstd/io/net/tcp.rs +++ b/src/libstd/io/net/tcp.rs @@ -151,6 +151,69 @@ impl TcpStream { /// Note that this method affects all cloned handles associated with this /// stream, not just this one handle. pub fn close_write(&mut self) -> IoResult<()> { self.obj.close_write() } + + /// Sets a timeout, in milliseconds, for blocking operations on this stream. + /// + /// This function will set a timeout for all blocking operations (including + /// reads and writes) on this stream. The timeout specified is a relative + /// time, in milliseconds, into the future after which point operations will + /// time out. This means that the timeout must be reset periodically to keep + /// it from expiring. Specifying a value of `None` will clear the timeout + /// for this stream. + /// + /// The timeout on this stream is local to this stream only. Setting a + /// timeout does not affect any other cloned instances of this stream, nor + /// does the timeout propagated to cloned handles of this stream. Setting + /// this timeout will override any specific read or write timeouts + /// previously set for this stream. + /// + /// For clarification on the semantics of interrupting a read and a write, + /// take a look at `set_read_timeout` and `set_write_timeout`. + pub fn set_timeout(&mut self, timeout_ms: Option<u64>) { + self.obj.set_timeout(timeout_ms) + } + + /// Sets the timeout for read operations on this stream. + /// + /// See documentation in `set_timeout` for the semantics of this read time. + /// This will overwrite any previous read timeout set through either this + /// function or `set_timeout`. + /// + /// # Errors + /// + /// When this timeout expires, if there is no pending read operation, no + /// action is taken. Otherwise, the read operation will be scheduled to + /// promptly return. If a timeout error is returned, then no data was read + /// during the timeout period. + pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) { + self.obj.set_read_timeout(timeout_ms) + } + + /// Sets the timeout for write operations on this stream. + /// + /// See documentation in `set_timeout` for the semantics of this write time. + /// This will overwrite any previous write timeout set through either this + /// function or `set_timeout`. + /// + /// # Errors + /// + /// When this timeout expires, if there is no pending write operation, no + /// action is taken. Otherwise, the pending write operation will be + /// scheduled to promptly return. The actual state of the underlying stream + /// is not specified. + /// + /// The write operation may return an error of type `ShortWrite` which + /// indicates that the object is known to have written an exact number of + /// bytes successfully during the timeout period, and the remaining bytes + /// were never written. + /// + /// If the write operation returns `TimedOut`, then it the timeout primitive + /// does not know how many bytes were written as part of the timeout + /// operation. It may be the case that bytes continue to be written in an + /// asynchronous fashion after the call to write returns. + pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) { + self.obj.set_write_timeout(timeout_ms) + } } impl Clone for TcpStream { @@ -892,6 +955,7 @@ mod test { Err(ref e) if e.kind == TimedOut => {} Err(e) => fail!("error: {}", e), } + ::task::deschedule(); if i == 1000 { fail!("should have a pending connection") } } drop(l); @@ -964,4 +1028,118 @@ mod test { // this test will never finish if the child doesn't wake up rx.recv(); }) + + iotest!(fn readwrite_timeouts() { + let addr = next_test_ip6(); + let mut a = TcpListener::bind(addr).listen().unwrap(); + let (tx, rx) = channel::<()>(); + spawn(proc() { + let mut s = TcpStream::connect(addr).unwrap(); + rx.recv(); + assert!(s.write([0]).is_ok()); + let _ = rx.recv_opt(); + }); + + let mut s = a.accept().unwrap(); + s.set_timeout(Some(20)); + assert_eq!(s.read([0]).err().unwrap().kind, TimedOut); + assert_eq!(s.read([0]).err().unwrap().kind, TimedOut); + + s.set_timeout(Some(20)); + for i in range(0, 1001) { + match s.write([0, .. 128 * 1024]) { + Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {}, + Err(IoError { kind: TimedOut, .. }) => break, + Err(e) => fail!("{}", e), + } + if i == 1000 { fail!("should have filled up?!"); } + } + assert_eq!(s.write([0]).err().unwrap().kind, TimedOut); + + tx.send(()); + s.set_timeout(None); + assert_eq!(s.read([0, 0]), Ok(1)); + }) + + iotest!(fn read_timeouts() { + let addr = next_test_ip6(); + let mut a = TcpListener::bind(addr).listen().unwrap(); + let (tx, rx) = channel::<()>(); + spawn(proc() { + let mut s = TcpStream::connect(addr).unwrap(); + rx.recv(); + let mut amt = 0; + while amt < 100 * 128 * 1024 { + match s.read([0, ..128 * 1024]) { + Ok(n) => { amt += n; } + Err(e) => fail!("{}", e), + } + } + let _ = rx.recv_opt(); + }); + + let mut s = a.accept().unwrap(); + s.set_read_timeout(Some(20)); + assert_eq!(s.read([0]).err().unwrap().kind, TimedOut); + assert_eq!(s.read([0]).err().unwrap().kind, TimedOut); + + tx.send(()); + for _ in range(0, 100) { + assert!(s.write([0, ..128 * 1024]).is_ok()); + } + }) + + iotest!(fn write_timeouts() { + let addr = next_test_ip6(); + let mut a = TcpListener::bind(addr).listen().unwrap(); + let (tx, rx) = channel::<()>(); + spawn(proc() { + let mut s = TcpStream::connect(addr).unwrap(); + rx.recv(); + assert!(s.write([0]).is_ok()); + let _ = rx.recv_opt(); + }); + + let mut s = a.accept().unwrap(); + s.set_write_timeout(Some(20)); + for i in range(0, 1001) { + match s.write([0, .. 128 * 1024]) { + Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {}, + Err(IoError { kind: TimedOut, .. }) => break, + Err(e) => fail!("{}", e), + } + if i == 1000 { fail!("should have filled up?!"); } + } + assert_eq!(s.write([0]).err().unwrap().kind, TimedOut); + + tx.send(()); + assert!(s.read([0]).is_ok()); + }) + + iotest!(fn timeout_concurrent_read() { + let addr = next_test_ip6(); + let mut a = TcpListener::bind(addr).listen().unwrap(); + let (tx, rx) = channel::<()>(); + spawn(proc() { + let mut s = TcpStream::connect(addr).unwrap(); + rx.recv(); + assert_eq!(s.write([0]), Ok(())); + let _ = rx.recv_opt(); + }); + + let mut s = a.accept().unwrap(); + let s2 = s.clone(); + let (tx2, rx2) = channel(); + spawn(proc() { + let mut s2 = s2; + assert_eq!(s2.read([0]), Ok(1)); + tx2.send(()); + }); + + s.set_read_timeout(Some(20)); + assert_eq!(s.read([0]).err().unwrap().kind, TimedOut); + tx.send(()); + + rx2.recv(); + }) } diff --git a/src/libstd/io/net/udp.rs b/src/libstd/io/net/udp.rs index b7636493dec..45da872ca11 100644 --- a/src/libstd/io/net/udp.rs +++ b/src/libstd/io/net/udp.rs @@ -20,6 +20,7 @@ use io::net::ip::{SocketAddr, IpAddr}; use io::{Reader, Writer, IoResult}; use kinds::Send; use owned::Box; +use option::Option; use result::{Ok, Err}; use rt::rtio::{RtioSocket, RtioUdpSocket, IoFactory, LocalIo}; @@ -142,6 +143,27 @@ impl UdpSocket { self.obj.ignore_broadcasts() } } + + /// Sets the read/write timeout for this socket. + /// + /// For more information, see `TcpStream::set_timeout` + pub fn set_timeout(&mut self, timeout_ms: Option<u64>) { + self.obj.set_timeout(timeout_ms) + } + + /// Sets the read timeout for this socket. + /// + /// For more information, see `TcpStream::set_timeout` + pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) { + self.obj.set_read_timeout(timeout_ms) + } + + /// Sets the write timeout for this socket. + /// + /// For more information, see `TcpStream::set_timeout` + pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) { + self.obj.set_write_timeout(timeout_ms) + } } impl Clone for UdpSocket { @@ -485,4 +507,56 @@ mod test { rx.recv(); serv_rx.recv(); }) + + iotest!(fn recvfrom_timeout() { + let addr1 = next_test_ip4(); + let addr2 = next_test_ip4(); + let mut a = UdpSocket::bind(addr1).unwrap(); + + let (tx, rx) = channel(); + let (tx2, rx2) = channel(); + spawn(proc() { + let mut a = UdpSocket::bind(addr2).unwrap(); + assert_eq!(a.recvfrom([0]), Ok((1, addr1))); + assert_eq!(a.sendto([0], addr1), Ok(())); + rx.recv(); + assert_eq!(a.sendto([0], addr1), Ok(())); + + tx2.send(()); + }); + + // Make sure that reads time out, but writes can continue + a.set_read_timeout(Some(20)); + assert_eq!(a.recvfrom([0]).err().unwrap().kind, TimedOut); + assert_eq!(a.recvfrom([0]).err().unwrap().kind, TimedOut); + assert_eq!(a.sendto([0], addr2), Ok(())); + + // Cloned handles should be able to block + let mut a2 = a.clone(); + assert_eq!(a2.recvfrom([0]), Ok((1, addr2))); + + // Clearing the timeout should allow for receiving + a.set_timeout(None); + tx.send(()); + assert_eq!(a2.recvfrom([0]), Ok((1, addr2))); + + // Make sure the child didn't die + rx2.recv(); + }) + + iotest!(fn sendto_timeout() { + let addr1 = next_test_ip4(); + let addr2 = next_test_ip4(); + let mut a = UdpSocket::bind(addr1).unwrap(); + let _b = UdpSocket::bind(addr2).unwrap(); + + a.set_write_timeout(Some(1000)); + for _ in range(0, 100) { + match a.sendto([0, ..4*1024], addr2) { + Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {}, + Err(IoError { kind: TimedOut, .. }) => break, + Err(e) => fail!("other error: {}", e), + } + } + }) } diff --git a/src/libstd/io/net/unix.rs b/src/libstd/io/net/unix.rs index bbe39885c03..73b05a0b3e7 100644 --- a/src/libstd/io/net/unix.rs +++ b/src/libstd/io/net/unix.rs @@ -61,21 +61,11 @@ impl UnixStream { }) } - /// Connect to a pipe named by `path`. This will attempt to open a - /// connection to the underlying socket. - /// - /// The returned stream will be closed when the object falls out of scope. - /// - /// # Example - /// - /// ```rust - /// # #![allow(unused_must_use)] - /// use std::io::net::unix::UnixStream; + /// Connect to a pipe named by `path`, timing out if the specified number of + /// milliseconds. /// - /// let server = Path::new("path/to/my/socket"); - /// let mut stream = UnixStream::connect(&server); - /// stream.write([1, 2, 3]); - /// ``` + /// This function is similar to `connect`, except that if `timeout_ms` + /// elapses the function will return an error of kind `TimedOut`. #[experimental = "the timeout argument is likely to change types"] pub fn connect_timeout<P: ToCStr>(path: &P, timeout_ms: u64) -> IoResult<UnixStream> { @@ -103,6 +93,27 @@ impl UnixStream { /// Note that this method affects all cloned handles associated with this /// stream, not just this one handle. pub fn close_write(&mut self) -> IoResult<()> { self.obj.close_write() } + + /// Sets the read/write timeout for this socket. + /// + /// For more information, see `TcpStream::set_timeout` + pub fn set_timeout(&mut self, timeout_ms: Option<u64>) { + self.obj.set_timeout(timeout_ms) + } + + /// Sets the read timeout for this socket. + /// + /// For more information, see `TcpStream::set_timeout` + pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) { + self.obj.set_read_timeout(timeout_ms) + } + + /// Sets the write timeout for this socket. + /// + /// For more information, see `TcpStream::set_timeout` + pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) { + self.obj.set_write_timeout(timeout_ms) + } } impl Clone for UnixStream { @@ -457,6 +468,7 @@ mod tests { Err(ref e) if e.kind == TimedOut => {} Err(e) => fail!("error: {}", e), } + ::task::deschedule(); if i == 1000 { fail!("should have a pending connection") } } drop(l); @@ -541,4 +553,117 @@ mod tests { // this test will never finish if the child doesn't wake up rx.recv(); }) + + iotest!(fn readwrite_timeouts() { + let addr = next_test_unix(); + let mut a = UnixListener::bind(&addr).listen().unwrap(); + let (tx, rx) = channel::<()>(); + spawn(proc() { + let mut s = UnixStream::connect(&addr).unwrap(); + rx.recv(); + assert!(s.write([0]).is_ok()); + let _ = rx.recv_opt(); + }); + + let mut s = a.accept().unwrap(); + s.set_timeout(Some(20)); + assert_eq!(s.read([0]).err().unwrap().kind, TimedOut); + assert_eq!(s.read([0]).err().unwrap().kind, TimedOut); + + s.set_timeout(Some(20)); + for i in range(0, 1001) { + match s.write([0, .. 128 * 1024]) { + Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {}, + Err(IoError { kind: TimedOut, .. }) => break, + Err(e) => fail!("{}", e), + } + if i == 1000 { fail!("should have filled up?!"); } + } + assert_eq!(s.write([0]).err().unwrap().kind, TimedOut); + + tx.send(()); + s.set_timeout(None); + assert_eq!(s.read([0, 0]), Ok(1)); + }) + + iotest!(fn read_timeouts() { + let addr = next_test_unix(); + let mut a = UnixListener::bind(&addr).listen().unwrap(); + let (tx, rx) = channel::<()>(); + spawn(proc() { + let mut s = UnixStream::connect(&addr).unwrap(); + rx.recv(); + let mut amt = 0; + while amt < 100 * 128 * 1024 { + match s.read([0, ..128 * 1024]) { + Ok(n) => { amt += n; } + Err(e) => fail!("{}", e), + } + } + let _ = rx.recv_opt(); + }); + + let mut s = a.accept().unwrap(); + s.set_read_timeout(Some(20)); + assert_eq!(s.read([0]).err().unwrap().kind, TimedOut); + assert_eq!(s.read([0]).err().unwrap().kind, TimedOut); + + tx.send(()); + for _ in range(0, 100) { + assert!(s.write([0, ..128 * 1024]).is_ok()); + } + }) + + iotest!(fn write_timeouts() { + let addr = next_test_unix(); + let mut a = UnixListener::bind(&addr).listen().unwrap(); + let (tx, rx) = channel::<()>(); + spawn(proc() { + let mut s = UnixStream::connect(&addr).unwrap(); + rx.recv(); + assert!(s.write([0]).is_ok()); + let _ = rx.recv_opt(); + }); + + let mut s = a.accept().unwrap(); + s.set_write_timeout(Some(20)); + for i in range(0, 1001) { + match s.write([0, .. 128 * 1024]) { + Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {}, + Err(IoError { kind: TimedOut, .. }) => break, + Err(e) => fail!("{}", e), + } + if i == 1000 { fail!("should have filled up?!"); } + } + + tx.send(()); + assert!(s.read([0]).is_ok()); + }) + + iotest!(fn timeout_concurrent_read() { + let addr = next_test_unix(); + let mut a = UnixListener::bind(&addr).listen().unwrap(); + let (tx, rx) = channel::<()>(); + spawn(proc() { + let mut s = UnixStream::connect(&addr).unwrap(); + rx.recv(); + assert!(s.write([0]).is_ok()); + let _ = rx.recv_opt(); + }); + + let mut s = a.accept().unwrap(); + let s2 = s.clone(); + let (tx2, rx2) = channel(); + spawn(proc() { + let mut s2 = s2; + assert!(s2.read([0]).is_ok()); + tx2.send(()); + }); + + s.set_read_timeout(Some(20)); + assert_eq!(s.read([0]).err().unwrap().kind, TimedOut); + tx.send(()); + + rx2.recv(); + }) } |
