diff options
| author | Alex Crichton <alex@alexcrichton.com> | 2014-04-25 20:47:49 -0700 |
|---|---|---|
| committer | Alex Crichton <alex@alexcrichton.com> | 2014-05-07 23:27:01 -0700 |
| commit | e27f27c8588f5cfa0cd9dfbbdf7609ea2d6818ec (patch) | |
| tree | 25f32daa865ea84163e5f6681f15e876ffb74305 /src/libstd/io/net/unix.rs | |
| parent | e0fcb4eb3d516017c7c2fa8d17e7b8b82bdc065b (diff) | |
| download | rust-e27f27c8588f5cfa0cd9dfbbdf7609ea2d6818ec.tar.gz rust-e27f27c8588f5cfa0cd9dfbbdf7609ea2d6818ec.zip | |
std: Add I/O timeouts to networking objects
These timeouts all follow the same pattern as established by the timeouts on acceptors. There are three methods: set_timeout, set_read_timeout, and set_write_timeout. Each of these sets a point in the future after which operations will time out. Timeouts with cloned objects are a little trickier. Each object is viewed as having its own timeout, unaffected by other objects' timeouts. Additionally, timeouts do not propagate when a stream is cloned or when a cloned stream has its timeouts modified. This commit is just the public interface which will be exposed for timeouts, the implementation will come in later commits.
Diffstat (limited to 'src/libstd/io/net/unix.rs')
| -rw-r--r-- | src/libstd/io/net/unix.rs | 153 |
1 files changed, 139 insertions, 14 deletions
diff --git a/src/libstd/io/net/unix.rs b/src/libstd/io/net/unix.rs index bbe39885c03..73b05a0b3e7 100644 --- a/src/libstd/io/net/unix.rs +++ b/src/libstd/io/net/unix.rs @@ -61,21 +61,11 @@ impl UnixStream { }) } - /// Connect to a pipe named by `path`. This will attempt to open a - /// connection to the underlying socket. - /// - /// The returned stream will be closed when the object falls out of scope. - /// - /// # Example - /// - /// ```rust - /// # #![allow(unused_must_use)] - /// use std::io::net::unix::UnixStream; + /// Connect to a pipe named by `path`, timing out if the specified number of + /// milliseconds. /// - /// let server = Path::new("path/to/my/socket"); - /// let mut stream = UnixStream::connect(&server); - /// stream.write([1, 2, 3]); - /// ``` + /// This function is similar to `connect`, except that if `timeout_ms` + /// elapses the function will return an error of kind `TimedOut`. #[experimental = "the timeout argument is likely to change types"] pub fn connect_timeout<P: ToCStr>(path: &P, timeout_ms: u64) -> IoResult<UnixStream> { @@ -103,6 +93,27 @@ impl UnixStream { /// Note that this method affects all cloned handles associated with this /// stream, not just this one handle. pub fn close_write(&mut self) -> IoResult<()> { self.obj.close_write() } + + /// Sets the read/write timeout for this socket. + /// + /// For more information, see `TcpStream::set_timeout` + pub fn set_timeout(&mut self, timeout_ms: Option<u64>) { + self.obj.set_timeout(timeout_ms) + } + + /// Sets the read timeout for this socket. + /// + /// For more information, see `TcpStream::set_timeout` + pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) { + self.obj.set_read_timeout(timeout_ms) + } + + /// Sets the write timeout for this socket. + /// + /// For more information, see `TcpStream::set_timeout` + pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) { + self.obj.set_write_timeout(timeout_ms) + } } impl Clone for UnixStream { @@ -457,6 +468,7 @@ mod tests { Err(ref e) if e.kind == TimedOut => {} Err(e) => fail!("error: {}", e), } + ::task::deschedule(); if i == 1000 { fail!("should have a pending connection") } } drop(l); @@ -541,4 +553,117 @@ mod tests { // this test will never finish if the child doesn't wake up rx.recv(); }) + + iotest!(fn readwrite_timeouts() { + let addr = next_test_unix(); + let mut a = UnixListener::bind(&addr).listen().unwrap(); + let (tx, rx) = channel::<()>(); + spawn(proc() { + let mut s = UnixStream::connect(&addr).unwrap(); + rx.recv(); + assert!(s.write([0]).is_ok()); + let _ = rx.recv_opt(); + }); + + let mut s = a.accept().unwrap(); + s.set_timeout(Some(20)); + assert_eq!(s.read([0]).err().unwrap().kind, TimedOut); + assert_eq!(s.read([0]).err().unwrap().kind, TimedOut); + + s.set_timeout(Some(20)); + for i in range(0, 1001) { + match s.write([0, .. 128 * 1024]) { + Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {}, + Err(IoError { kind: TimedOut, .. }) => break, + Err(e) => fail!("{}", e), + } + if i == 1000 { fail!("should have filled up?!"); } + } + assert_eq!(s.write([0]).err().unwrap().kind, TimedOut); + + tx.send(()); + s.set_timeout(None); + assert_eq!(s.read([0, 0]), Ok(1)); + }) + + iotest!(fn read_timeouts() { + let addr = next_test_unix(); + let mut a = UnixListener::bind(&addr).listen().unwrap(); + let (tx, rx) = channel::<()>(); + spawn(proc() { + let mut s = UnixStream::connect(&addr).unwrap(); + rx.recv(); + let mut amt = 0; + while amt < 100 * 128 * 1024 { + match s.read([0, ..128 * 1024]) { + Ok(n) => { amt += n; } + Err(e) => fail!("{}", e), + } + } + let _ = rx.recv_opt(); + }); + + let mut s = a.accept().unwrap(); + s.set_read_timeout(Some(20)); + assert_eq!(s.read([0]).err().unwrap().kind, TimedOut); + assert_eq!(s.read([0]).err().unwrap().kind, TimedOut); + + tx.send(()); + for _ in range(0, 100) { + assert!(s.write([0, ..128 * 1024]).is_ok()); + } + }) + + iotest!(fn write_timeouts() { + let addr = next_test_unix(); + let mut a = UnixListener::bind(&addr).listen().unwrap(); + let (tx, rx) = channel::<()>(); + spawn(proc() { + let mut s = UnixStream::connect(&addr).unwrap(); + rx.recv(); + assert!(s.write([0]).is_ok()); + let _ = rx.recv_opt(); + }); + + let mut s = a.accept().unwrap(); + s.set_write_timeout(Some(20)); + for i in range(0, 1001) { + match s.write([0, .. 128 * 1024]) { + Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {}, + Err(IoError { kind: TimedOut, .. }) => break, + Err(e) => fail!("{}", e), + } + if i == 1000 { fail!("should have filled up?!"); } + } + + tx.send(()); + assert!(s.read([0]).is_ok()); + }) + + iotest!(fn timeout_concurrent_read() { + let addr = next_test_unix(); + let mut a = UnixListener::bind(&addr).listen().unwrap(); + let (tx, rx) = channel::<()>(); + spawn(proc() { + let mut s = UnixStream::connect(&addr).unwrap(); + rx.recv(); + assert!(s.write([0]).is_ok()); + let _ = rx.recv_opt(); + }); + + let mut s = a.accept().unwrap(); + let s2 = s.clone(); + let (tx2, rx2) = channel(); + spawn(proc() { + let mut s2 = s2; + assert!(s2.read([0]).is_ok()); + tx2.send(()); + }); + + s.set_read_timeout(Some(20)); + assert_eq!(s.read([0]).err().unwrap().kind, TimedOut); + tx.send(()); + + rx2.recv(); + }) } |
