about summary refs log tree commit diff
path: root/src/libstd/io/net/tcp.rs
diff options
context:
space:
mode:
authorAlex Crichton <alex@alexcrichton.com>2014-07-11 14:29:15 -0700
committerAlex Crichton <alex@alexcrichton.com>2014-08-24 17:08:14 -0700
commit110168de2a7b529a7c4839ca1e19c4c42f68be12 (patch)
treeb590f2a6976d0c44b174c1eb3205a120a8072db0 /src/libstd/io/net/tcp.rs
parent6d9b219e6f84325ee32c70a29bf782e7ad54ebc8 (diff)
downloadrust-110168de2a7b529a7c4839ca1e19c4c42f68be12.tar.gz
rust-110168de2a7b529a7c4839ca1e19c4c42f68be12.zip
native: Implement clone/close_accept for unix
This commits implements {Tcp,Unix}Acceptor::{clone,close_accept} methods for
unix. A windows implementation is coming in a later commit.

The clone implementation is based on atomic reference counting (as with all
other clones), and the close_accept implementation is based on selecting on a
self-pipe which signals that a close has been seen.
Diffstat (limited to 'src/libstd/io/net/tcp.rs')
-rw-r--r--src/libstd/io/net/tcp.rs132
1 files changed, 132 insertions, 0 deletions
diff --git a/src/libstd/io/net/tcp.rs b/src/libstd/io/net/tcp.rs
index 7055b9d7a47..ebc3940c16f 100644
--- a/src/libstd/io/net/tcp.rs
+++ b/src/libstd/io/net/tcp.rs
@@ -442,6 +442,54 @@ impl TcpAcceptor {
     #[experimental = "the type of the argument and name of this function are \
                       subject to change"]
     pub fn set_timeout(&mut self, ms: Option<u64>) { self.obj.set_timeout(ms); }
+
+    /// Closes the accepting capabilities of this acceptor.
+    ///
+    /// This function is similar to `TcpStream`'s `close_{read,write}` methods
+    /// in that it will affect *all* cloned handles of this acceptor's original
+    /// handle.
+    ///
+    /// Once this function succeeds, all future calls to `accept` will return
+    /// immediately with an error, preventing all future calls to accept. The
+    /// underlying socket will not be relinquished back to the OS until all
+    /// acceptors have been deallocated.
+    ///
+    /// This is useful for waking up a thread in an accept loop to indicate that
+    /// it should exit.
+    ///
+    /// # Example
+    ///
+    /// ```
+    /// # #![allow(experimental)]
+    /// use std::io::TcpListener;
+    /// use std::io::{Listener, Acceptor, TimedOut};
+    ///
+    /// let mut a = TcpListener::bind("127.0.0.1", 8482).listen().unwrap();
+    /// let a2 = a.clone();
+    ///
+    /// spawn(proc() {
+    ///     let mut a2 = a2;
+    ///     for socket in a2.incoming() {
+    ///         match socket {
+    ///             Ok(s) => { /* handle s */ }
+    ///             Err(ref e) if e.kind == EndOfFile => break, // closed
+    ///             Err(e) => fail!("unexpected error: {}", e),
+    ///         }
+    ///     }
+    /// });
+    ///
+    /// # fn wait_for_sigint() {}
+    /// // Now that our accept loop is running, wait for the program to be
+    /// // requested to exit.
+    /// wait_for_sigint();
+    ///
+    /// // Signal our accept loop to exit
+    /// assert!(a.close_accept().is_ok());
+    /// ```
+    #[experimental]
+    pub fn close_accept(&mut self) -> IoResult<()> {
+        self.obj.close_accept().map_err(IoError::from_rtio_error)
+    }
 }
 
 impl Acceptor<TcpStream> for TcpAcceptor {
@@ -453,6 +501,25 @@ impl Acceptor<TcpStream> for TcpAcceptor {
     }
 }
 
+impl Clone for TcpAcceptor {
+    /// Creates a new handle to this TCP acceptor, allowing for simultaneous
+    /// accepts.
+    ///
+    /// The underlying TCP acceptor will not be closed until all handles to the
+    /// acceptor have been deallocated. Incoming connections will be received on
+    /// at most once acceptor, the same connection will not be accepted twice.
+    ///
+    /// The `close_accept` method will shut down *all* acceptors cloned from the
+    /// same original acceptor, whereas the `set_timeout` method only affects
+    /// the selector that it is called on.
+    ///
+    /// This function is useful for creating a handle to invoke `close_accept`
+    /// on to wake up any other task blocked in `accept`.
+    fn clone(&self) -> TcpAcceptor {
+        TcpAcceptor { obj: self.obj.clone() }
+    }
+}
+
 #[cfg(test)]
 #[allow(experimental)]
 mod test {
@@ -1411,4 +1478,69 @@ mod test {
         rxdone.recv();
         rxdone.recv();
     })
+
+    iotest!(fn clone_accept_smoke() {
+        let addr = next_test_ip4();
+        let l = TcpListener::bind(addr.ip.to_string().as_slice(), addr.port);
+        let mut a = l.listen().unwrap();
+        let mut a2 = a.clone();
+
+        spawn(proc() {
+            let _ = TcpStream::connect(addr.ip.to_string().as_slice(), addr.port);
+        });
+        spawn(proc() {
+            let _ = TcpStream::connect(addr.ip.to_string().as_slice(), addr.port);
+        });
+
+        assert!(a.accept().is_ok());
+        assert!(a2.accept().is_ok());
+    })
+
+    iotest!(fn clone_accept_concurrent() {
+        let addr = next_test_ip4();
+        let l = TcpListener::bind(addr.ip.to_string().as_slice(), addr.port);
+        let a = l.listen().unwrap();
+        let a2 = a.clone();
+
+        let (tx, rx) = channel();
+        let tx2 = tx.clone();
+
+        spawn(proc() { let mut a = a; tx.send(a.accept()) });
+        spawn(proc() { let mut a = a2; tx2.send(a.accept()) });
+
+        spawn(proc() {
+            let _ = TcpStream::connect(addr.ip.to_string().as_slice(), addr.port);
+        });
+        spawn(proc() {
+            let _ = TcpStream::connect(addr.ip.to_string().as_slice(), addr.port);
+        });
+
+        assert!(rx.recv().is_ok());
+        assert!(rx.recv().is_ok());
+    })
+
+    iotest!(fn close_accept_smoke() {
+        let addr = next_test_ip4();
+        let l = TcpListener::bind(addr.ip.to_string().as_slice(), addr.port);
+        let mut a = l.listen().unwrap();
+
+        a.close_accept().unwrap();
+        assert_eq!(a.accept().err().unwrap().kind, EndOfFile);
+    })
+
+    iotest!(fn close_accept_concurrent() {
+        let addr = next_test_ip4();
+        let l = TcpListener::bind(addr.ip.to_string().as_slice(), addr.port);
+        let a = l.listen().unwrap();
+        let mut a2 = a.clone();
+
+        let (tx, rx) = channel();
+        spawn(proc() {
+            let mut a = a;
+            tx.send(a.accept());
+        });
+        a2.close_accept().unwrap();
+
+        assert_eq!(rx.recv().err().unwrap().kind, EndOfFile);
+    })
 }