diff options
| author | Alex Crichton <alex@alexcrichton.com> | 2014-07-11 14:29:15 -0700 |
|---|---|---|
| committer | Alex Crichton <alex@alexcrichton.com> | 2014-08-24 17:08:14 -0700 |
| commit | 110168de2a7b529a7c4839ca1e19c4c42f68be12 (patch) | |
| tree | b590f2a6976d0c44b174c1eb3205a120a8072db0 /src/libstd/io/net/tcp.rs | |
| parent | 6d9b219e6f84325ee32c70a29bf782e7ad54ebc8 (diff) | |
| download | rust-110168de2a7b529a7c4839ca1e19c4c42f68be12.tar.gz rust-110168de2a7b529a7c4839ca1e19c4c42f68be12.zip | |
native: Implement clone/close_accept for unix
This commits implements {Tcp,Unix}Acceptor::{clone,close_accept} methods for
unix. A windows implementation is coming in a later commit.
The clone implementation is based on atomic reference counting (as with all
other clones), and the close_accept implementation is based on selecting on a
self-pipe which signals that a close has been seen.
Diffstat (limited to 'src/libstd/io/net/tcp.rs')
| -rw-r--r-- | src/libstd/io/net/tcp.rs | 132 |
1 files changed, 132 insertions, 0 deletions
diff --git a/src/libstd/io/net/tcp.rs b/src/libstd/io/net/tcp.rs index 7055b9d7a47..ebc3940c16f 100644 --- a/src/libstd/io/net/tcp.rs +++ b/src/libstd/io/net/tcp.rs @@ -442,6 +442,54 @@ impl TcpAcceptor { #[experimental = "the type of the argument and name of this function are \ subject to change"] pub fn set_timeout(&mut self, ms: Option<u64>) { self.obj.set_timeout(ms); } + + /// Closes the accepting capabilities of this acceptor. + /// + /// This function is similar to `TcpStream`'s `close_{read,write}` methods + /// in that it will affect *all* cloned handles of this acceptor's original + /// handle. + /// + /// Once this function succeeds, all future calls to `accept` will return + /// immediately with an error, preventing all future calls to accept. The + /// underlying socket will not be relinquished back to the OS until all + /// acceptors have been deallocated. + /// + /// This is useful for waking up a thread in an accept loop to indicate that + /// it should exit. + /// + /// # Example + /// + /// ``` + /// # #![allow(experimental)] + /// use std::io::TcpListener; + /// use std::io::{Listener, Acceptor, TimedOut}; + /// + /// let mut a = TcpListener::bind("127.0.0.1", 8482).listen().unwrap(); + /// let a2 = a.clone(); + /// + /// spawn(proc() { + /// let mut a2 = a2; + /// for socket in a2.incoming() { + /// match socket { + /// Ok(s) => { /* handle s */ } + /// Err(ref e) if e.kind == EndOfFile => break, // closed + /// Err(e) => fail!("unexpected error: {}", e), + /// } + /// } + /// }); + /// + /// # fn wait_for_sigint() {} + /// // Now that our accept loop is running, wait for the program to be + /// // requested to exit. + /// wait_for_sigint(); + /// + /// // Signal our accept loop to exit + /// assert!(a.close_accept().is_ok()); + /// ``` + #[experimental] + pub fn close_accept(&mut self) -> IoResult<()> { + self.obj.close_accept().map_err(IoError::from_rtio_error) + } } impl Acceptor<TcpStream> for TcpAcceptor { @@ -453,6 +501,25 @@ impl Acceptor<TcpStream> for TcpAcceptor { } } +impl Clone for TcpAcceptor { + /// Creates a new handle to this TCP acceptor, allowing for simultaneous + /// accepts. + /// + /// The underlying TCP acceptor will not be closed until all handles to the + /// acceptor have been deallocated. Incoming connections will be received on + /// at most once acceptor, the same connection will not be accepted twice. + /// + /// The `close_accept` method will shut down *all* acceptors cloned from the + /// same original acceptor, whereas the `set_timeout` method only affects + /// the selector that it is called on. + /// + /// This function is useful for creating a handle to invoke `close_accept` + /// on to wake up any other task blocked in `accept`. + fn clone(&self) -> TcpAcceptor { + TcpAcceptor { obj: self.obj.clone() } + } +} + #[cfg(test)] #[allow(experimental)] mod test { @@ -1411,4 +1478,69 @@ mod test { rxdone.recv(); rxdone.recv(); }) + + iotest!(fn clone_accept_smoke() { + let addr = next_test_ip4(); + let l = TcpListener::bind(addr.ip.to_string().as_slice(), addr.port); + let mut a = l.listen().unwrap(); + let mut a2 = a.clone(); + + spawn(proc() { + let _ = TcpStream::connect(addr.ip.to_string().as_slice(), addr.port); + }); + spawn(proc() { + let _ = TcpStream::connect(addr.ip.to_string().as_slice(), addr.port); + }); + + assert!(a.accept().is_ok()); + assert!(a2.accept().is_ok()); + }) + + iotest!(fn clone_accept_concurrent() { + let addr = next_test_ip4(); + let l = TcpListener::bind(addr.ip.to_string().as_slice(), addr.port); + let a = l.listen().unwrap(); + let a2 = a.clone(); + + let (tx, rx) = channel(); + let tx2 = tx.clone(); + + spawn(proc() { let mut a = a; tx.send(a.accept()) }); + spawn(proc() { let mut a = a2; tx2.send(a.accept()) }); + + spawn(proc() { + let _ = TcpStream::connect(addr.ip.to_string().as_slice(), addr.port); + }); + spawn(proc() { + let _ = TcpStream::connect(addr.ip.to_string().as_slice(), addr.port); + }); + + assert!(rx.recv().is_ok()); + assert!(rx.recv().is_ok()); + }) + + iotest!(fn close_accept_smoke() { + let addr = next_test_ip4(); + let l = TcpListener::bind(addr.ip.to_string().as_slice(), addr.port); + let mut a = l.listen().unwrap(); + + a.close_accept().unwrap(); + assert_eq!(a.accept().err().unwrap().kind, EndOfFile); + }) + + iotest!(fn close_accept_concurrent() { + let addr = next_test_ip4(); + let l = TcpListener::bind(addr.ip.to_string().as_slice(), addr.port); + let a = l.listen().unwrap(); + let mut a2 = a.clone(); + + let (tx, rx) = channel(); + spawn(proc() { + let mut a = a; + tx.send(a.accept()); + }); + a2.close_accept().unwrap(); + + assert_eq!(rx.recv().err().unwrap().kind, EndOfFile); + }) } |
