From ec9ade938e9e4aa710f4351e48a8fda1037352aa Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Thu, 24 Apr 2014 18:48:21 -0700 Subject: std: Add close_{read,write}() methods to I/O Two new methods were added to TcpStream and UnixStream: fn close_read(&mut self) -> IoResult<()>; fn close_write(&mut self) -> IoResult<()>; These two methods map to shutdown()'s behavior (the system call on unix), closing the reading or writing half of a duplex stream. These methods are primarily added to allow waking up a pending read in another task. By closing the reading half of a connection, all pending readers will be woken up and will return with EndOfFile. The close_write() method was added for symmetry with close_read(), and I imagine that it will be quite useful at some point. Implementation-wise, librustuv got the short end of the stick this time. The native versions just delegate to the shutdown() syscall (easy). The uv versions can leverage uv_shutdown() for tcp/unix streams, but only for closing the writing half. Closing the reading half is done through some careful dancing to wake up a pending reader. As usual, windows likes to be different from unix. The windows implementation uses shutdown() for sockets, but shutdown() is not available for named pipes. Instead, CancelIoEx was used with same fancy synchronization to make sure everyone knows what's up. cc #11165 --- src/libstd/io/net/tcp.rs | 113 ++++++++++++++++++++++++++++++++++++++++++++-- src/libstd/io/net/unix.rs | 102 ++++++++++++++++++++++++++++++++++++----- src/libstd/rt/rtio.rs | 4 ++ 3 files changed, 205 insertions(+), 14 deletions(-) (limited to 'src/libstd') diff --git a/src/libstd/io/net/tcp.rs b/src/libstd/io/net/tcp.rs index a2cd69da5ae..d07b2e556d6 100644 --- a/src/libstd/io/net/tcp.rs +++ b/src/libstd/io/net/tcp.rs @@ -32,7 +32,7 @@ use rt::rtio::{RtioTcpAcceptor, RtioTcpStream}; /// /// # Example /// -/// ```rust +/// ```no_run /// # #![allow(unused_must_use)] /// use std::io::net::tcp::TcpStream; /// use std::io::net::ip::{Ipv4Addr, SocketAddr}; @@ -109,6 +109,48 @@ impl TcpStream { None => self.obj.letdie(), } } + + /// Closes the reading half of this connection. + /// + /// This method will close the reading portion of this connection, causing + /// all pending and future reads to immediately return with an error. + /// + /// # Example + /// + /// ```no_run + /// # #![allow(unused_must_use)] + /// use std::io::timer; + /// use std::io::net::tcp::TcpStream; + /// use std::io::net::ip::{Ipv4Addr, SocketAddr}; + /// + /// let addr = SocketAddr { ip: Ipv4Addr(127, 0, 0, 1), port: 34254 }; + /// let mut stream = TcpStream::connect(addr).unwrap(); + /// let stream2 = stream.clone(); + /// + /// spawn(proc() { + /// // close this stream after one second + /// timer::sleep(1000); + /// let mut stream = stream2; + /// stream.close_read(); + /// }); + /// + /// // wait for some data, will get canceled after one second + /// let mut buf = [0]; + /// stream.read(buf); + /// ``` + /// + /// Note that this method affects all cloned handles associated with this + /// stream, not just this one handle. + pub fn close_read(&mut self) -> IoResult<()> { self.obj.close_read() } + + /// Closes the writing half of this connection. + /// + /// This method will close the writing portion of this connection, causing + /// all future writes to immediately return with an error. + /// + /// Note that this method affects all cloned handles associated with this + /// stream, not just this one handle. + pub fn close_write(&mut self) -> IoResult<()> { self.obj.close_write() } } impl Clone for TcpStream { @@ -839,7 +881,11 @@ mod test { // Also make sure that even though the timeout is expired that we will // continue to receive any pending connections. - let l = TcpStream::connect(addr).unwrap(); + let (tx, rx) = channel(); + spawn(proc() { + tx.send(TcpStream::connect(addr).unwrap()); + }); + let l = rx.recv(); for i in range(0, 1001) { match a.accept() { Ok(..) => break, @@ -853,8 +899,69 @@ mod test { // Unset the timeout and make sure that this always blocks. a.set_timeout(None); spawn(proc() { - drop(TcpStream::connect(addr)); + drop(TcpStream::connect(addr).unwrap()); }); a.accept().unwrap(); }) + + iotest!(fn close_readwrite_smoke() { + let addr = next_test_ip4(); + let a = TcpListener::bind(addr).listen().unwrap(); + let (_tx, rx) = channel::<()>(); + spawn(proc() { + let mut a = a; + let _s = a.accept().unwrap(); + let _ = rx.recv_opt(); + }); + + let mut b = [0]; + let mut s = TcpStream::connect(addr).unwrap(); + let mut s2 = s.clone(); + + // closing should prevent reads/writes + s.close_write().unwrap(); + assert!(s.write([0]).is_err()); + s.close_read().unwrap(); + assert!(s.read(b).is_err()); + + // closing should affect previous handles + assert!(s2.write([0]).is_err()); + assert!(s2.read(b).is_err()); + + // closing should affect new handles + let mut s3 = s.clone(); + assert!(s3.write([0]).is_err()); + assert!(s3.read(b).is_err()); + + // make sure these don't die + let _ = s2.close_read(); + let _ = s2.close_write(); + let _ = s3.close_read(); + let _ = s3.close_write(); + }) + + iotest!(fn close_read_wakes_up() { + let addr = next_test_ip4(); + let a = TcpListener::bind(addr).listen().unwrap(); + let (_tx, rx) = channel::<()>(); + spawn(proc() { + let mut a = a; + let _s = a.accept().unwrap(); + let _ = rx.recv_opt(); + }); + + let mut s = TcpStream::connect(addr).unwrap(); + let s2 = s.clone(); + let (tx, rx) = channel(); + spawn(proc() { + let mut s2 = s2; + assert!(s2.read([0]).is_err()); + tx.send(()); + }); + // this should wake up the child task + s.close_read().unwrap(); + + // this test will never finish if the child doesn't wake up + rx.recv(); + }) } diff --git a/src/libstd/io/net/unix.rs b/src/libstd/io/net/unix.rs index f6e985dc278..bbe39885c03 100644 --- a/src/libstd/io/net/unix.rs +++ b/src/libstd/io/net/unix.rs @@ -28,7 +28,6 @@ use prelude::*; use c_str::ToCStr; use clone::Clone; -use io::pipe::PipeStream; use io::{Listener, Acceptor, Reader, Writer, IoResult}; use kinds::Send; use owned::Box; @@ -37,14 +36,10 @@ use rt::rtio::{RtioUnixAcceptor, RtioPipe}; /// A stream which communicates over a named pipe. pub struct UnixStream { - obj: PipeStream, + obj: Box, } impl UnixStream { - fn new(obj: Box) -> UnixStream { - UnixStream { obj: PipeStream::new(obj) } - } - /// Connect to a pipe named by `path`. This will attempt to open a /// connection to the underlying socket. /// @@ -62,7 +57,7 @@ impl UnixStream { /// ``` pub fn connect(path: &P) -> IoResult { LocalIo::maybe_raise(|io| { - io.unix_connect(&path.to_c_str(), None).map(UnixStream::new) + io.unix_connect(&path.to_c_str(), None).map(|p| UnixStream { obj: p }) }) } @@ -86,9 +81,28 @@ impl UnixStream { timeout_ms: u64) -> IoResult { LocalIo::maybe_raise(|io| { let s = io.unix_connect(&path.to_c_str(), Some(timeout_ms)); - s.map(UnixStream::new) + s.map(|p| UnixStream { obj: p }) }) } + + + /// Closes the reading half of this connection. + /// + /// This method will close the reading portion of this connection, causing + /// all pending and future reads to immediately return with an error. + /// + /// Note that this method affects all cloned handles associated with this + /// stream, not just this one handle. + pub fn close_read(&mut self) -> IoResult<()> { self.obj.close_read() } + + /// Closes the writing half of this connection. + /// + /// This method will close the writing portion of this connection, causing + /// all pending and future writes to immediately return with an error. + /// + /// Note that this method affects all cloned handles associated with this + /// stream, not just this one handle. + pub fn close_write(&mut self) -> IoResult<()> { self.obj.close_write() } } impl Clone for UnixStream { @@ -174,7 +188,7 @@ impl UnixAcceptor { impl Acceptor for UnixAcceptor { fn accept(&mut self) -> IoResult { - self.obj.accept().map(UnixStream::new) + self.obj.accept().map(|s| UnixStream { obj: s }) } } @@ -431,7 +445,12 @@ mod tests { // Also make sure that even though the timeout is expired that we will // continue to receive any pending connections. - let l = UnixStream::connect(&addr).unwrap(); + let (tx, rx) = channel(); + let addr2 = addr.clone(); + spawn(proc() { + tx.send(UnixStream::connect(&addr2).unwrap()); + }); + let l = rx.recv(); for i in range(0, 1001) { match a.accept() { Ok(..) => break, @@ -446,7 +465,7 @@ mod tests { a.set_timeout(None); let addr2 = addr.clone(); spawn(proc() { - drop(UnixStream::connect(&addr2)); + drop(UnixStream::connect(&addr2).unwrap()); }); a.accept().unwrap(); }) @@ -461,4 +480,65 @@ mod tests { let _a = UnixListener::bind(&addr).unwrap().listen().unwrap(); assert!(UnixStream::connect_timeout(&addr, 100).is_ok()); }) + + iotest!(fn close_readwrite_smoke() { + let addr = next_test_unix(); + let a = UnixListener::bind(&addr).listen().unwrap(); + let (_tx, rx) = channel::<()>(); + spawn(proc() { + let mut a = a; + let _s = a.accept().unwrap(); + let _ = rx.recv_opt(); + }); + + let mut b = [0]; + let mut s = UnixStream::connect(&addr).unwrap(); + let mut s2 = s.clone(); + + // closing should prevent reads/writes + s.close_write().unwrap(); + assert!(s.write([0]).is_err()); + s.close_read().unwrap(); + assert!(s.read(b).is_err()); + + // closing should affect previous handles + assert!(s2.write([0]).is_err()); + assert!(s2.read(b).is_err()); + + // closing should affect new handles + let mut s3 = s.clone(); + assert!(s3.write([0]).is_err()); + assert!(s3.read(b).is_err()); + + // make sure these don't die + let _ = s2.close_read(); + let _ = s2.close_write(); + let _ = s3.close_read(); + let _ = s3.close_write(); + }) + + iotest!(fn close_read_wakes_up() { + let addr = next_test_unix(); + let a = UnixListener::bind(&addr).listen().unwrap(); + let (_tx, rx) = channel::<()>(); + spawn(proc() { + let mut a = a; + let _s = a.accept().unwrap(); + let _ = rx.recv_opt(); + }); + + let mut s = UnixStream::connect(&addr).unwrap(); + let s2 = s.clone(); + let (tx, rx) = channel(); + spawn(proc() { + let mut s2 = s2; + assert!(s2.read([0]).is_err()); + tx.send(()); + }); + // this should wake up the child task + s.close_read().unwrap(); + + // this test will never finish if the child doesn't wake up + rx.recv(); + }) } diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs index fe9f4932a2a..c5afe7887ad 100644 --- a/src/libstd/rt/rtio.rs +++ b/src/libstd/rt/rtio.rs @@ -221,6 +221,7 @@ pub trait RtioTcpStream : RtioSocket { fn letdie(&mut self) -> IoResult<()>; fn clone(&self) -> Box; fn close_write(&mut self) -> IoResult<()>; + fn close_read(&mut self) -> IoResult<()>; } pub trait RtioSocket { @@ -274,6 +275,9 @@ pub trait RtioPipe { fn read(&mut self, buf: &mut [u8]) -> IoResult; fn write(&mut self, buf: &[u8]) -> IoResult<()>; fn clone(&self) -> Box; + + fn close_write(&mut self) -> IoResult<()>; + fn close_read(&mut self) -> IoResult<()>; } pub trait RtioUnixListener { -- cgit 1.4.1-3-g733a5