// Copyright 2015 The Rust Project Developers. See the COPYRIGHT // file at the top-level directory of this distribution and at // http://rust-lang.org/COPYRIGHT. // // Licensed under the Apache License, Version 2.0 or the MIT license // , at your // option. This file may not be copied, modified, or distributed // except according to those terms. use prelude::v1::*; use io::prelude::*; use fmt; use io; use net::{ToSocketAddrs, SocketAddr, Shutdown}; use sys_common::net as net_imp; use sys_common::{AsInner, FromInner, IntoInner}; use time::Duration; /// A structure which represents a TCP stream between a local socket and a /// remote socket. /// /// The socket will be closed when the value is dropped. /// /// # Examples /// /// ```no_run /// use std::io::prelude::*; /// use std::net::TcpStream; /// /// { /// let mut stream = TcpStream::connect("127.0.0.1:34254").unwrap(); /// /// // ignore the Result /// let _ = stream.write(&[1]); /// let _ = stream.read(&mut [0; 128]); // ignore here too /// } // the stream is closed here /// ``` #[stable(feature = "rust1", since = "1.0.0")] pub struct TcpStream(net_imp::TcpStream); /// A structure representing a socket server. /// /// # Examples /// /// ```no_run /// use std::net::{TcpListener, TcpStream}; /// use std::thread; /// /// let listener = TcpListener::bind("127.0.0.1:80").unwrap(); /// /// fn handle_client(stream: TcpStream) { /// // ... /// } /// /// // accept connections and process them, spawning a new thread for each one /// for stream in listener.incoming() { /// match stream { /// Ok(stream) => { /// thread::spawn(move|| { /// // connection succeeded /// handle_client(stream) /// }); /// } /// Err(e) => { /* connection failed */ } /// } /// } /// /// // close the socket server /// drop(listener); /// ``` #[stable(feature = "rust1", since = "1.0.0")] pub struct TcpListener(net_imp::TcpListener); /// An infinite iterator over the connections from a `TcpListener`. /// /// This iterator will infinitely yield `Some` of the accepted connections. It /// is equivalent to calling `accept` in a loop. /// /// This `struct` is created by the [`incoming`] method on [`TcpListener`]. /// /// [`incoming`]: struct.TcpListener.html#method.incoming /// [`TcpListener`]: struct.TcpListener.html #[stable(feature = "rust1", since = "1.0.0")] pub struct Incoming<'a> { listener: &'a TcpListener } impl TcpStream { /// Opens a TCP connection to a remote host. /// /// `addr` is an address of the remote host. Anything which implements /// `ToSocketAddrs` trait can be supplied for the address; see this trait /// documentation for concrete examples. /// In case `ToSocketAddrs::to_socket_addrs()` returns more than one entry, /// then the first valid and reachable address is used. #[stable(feature = "rust1", since = "1.0.0")] pub fn connect(addr: A) -> io::Result { super::each_addr(addr, net_imp::TcpStream::connect).map(TcpStream) } /// Returns the socket address of the remote peer of this TCP connection. #[stable(feature = "rust1", since = "1.0.0")] pub fn peer_addr(&self) -> io::Result { self.0.peer_addr() } /// Returns the socket address of the local half of this TCP connection. #[stable(feature = "rust1", since = "1.0.0")] pub fn local_addr(&self) -> io::Result { self.0.socket_addr() } /// Shuts down the read, write, or both halves of this connection. /// /// This function will cause all pending and future I/O on the specified /// portions to return immediately with an appropriate value (see the /// documentation of `Shutdown`). #[stable(feature = "rust1", since = "1.0.0")] pub fn shutdown(&self, how: Shutdown) -> io::Result<()> { self.0.shutdown(how) } /// Creates a new independently owned handle to the underlying socket. /// /// The returned `TcpStream` is a reference to the same stream that this /// object references. Both handles will read and write the same stream of /// data, and options set on one stream will be propagated to the other /// stream. #[stable(feature = "rust1", since = "1.0.0")] pub fn try_clone(&self) -> io::Result { self.0.duplicate().map(TcpStream) } /// Sets the read timeout to the timeout specified. /// /// If the value specified is `None`, then `read` calls will block /// indefinitely. It is an error to pass the zero `Duration` to this /// method. /// /// # Note /// /// Platforms may return a different error code whenever a read times out as /// a result of setting this option. For example Unix typically returns an /// error of the kind `WouldBlock`, but Windows may return `TimedOut`. #[stable(feature = "socket_timeout", since = "1.4.0")] pub fn set_read_timeout(&self, dur: Option) -> io::Result<()> { self.0.set_read_timeout(dur) } /// Sets the write timeout to the timeout specified. /// /// If the value specified is `None`, then `write` calls will block /// indefinitely. It is an error to pass the zero `Duration` to this /// method. /// /// # Note /// /// Platforms may return a different error code whenever a write times out /// as a result of setting this option. For example Unix typically returns /// an error of the kind `WouldBlock`, but Windows may return `TimedOut`. #[stable(feature = "socket_timeout", since = "1.4.0")] pub fn set_write_timeout(&self, dur: Option) -> io::Result<()> { self.0.set_write_timeout(dur) } /// Returns the read timeout of this socket. /// /// If the timeout is `None`, then `read` calls will block indefinitely. /// /// # Note /// /// Some platforms do not provide access to the current timeout. #[stable(feature = "socket_timeout", since = "1.4.0")] pub fn read_timeout(&self) -> io::Result> { self.0.read_timeout() } /// Returns the write timeout of this socket. /// /// If the timeout is `None`, then `write` calls will block indefinitely. /// /// # Note /// /// Some platforms do not provide access to the current timeout. #[stable(feature = "socket_timeout", since = "1.4.0")] pub fn write_timeout(&self) -> io::Result> { self.0.write_timeout() } /// Sets the value of the `TCP_NODELAY` option on this socket. /// /// If set, this option disables the Nagle algorithm. This means that /// segments are always sent as soon as possible, even if there is only a /// small amount of data. When not set, data is buffered until there is a /// sufficient amount to send out, thereby avoiding the frequent sending of /// small packets. #[stable(feature = "net2_mutators", since = "1.9.0")] pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> { self.0.set_nodelay(nodelay) } /// Gets the value of the `TCP_NODELAY` option on this socket. /// /// For more information about this option, see [`set_nodelay`][link]. /// /// [link]: #method.set_nodelay #[stable(feature = "net2_mutators", since = "1.9.0")] pub fn nodelay(&self) -> io::Result { self.0.nodelay() } /// Sets the value for the `IP_TTL` option on this socket. /// /// This value sets the time-to-live field that is used in every packet sent /// from this socket. #[stable(feature = "net2_mutators", since = "1.9.0")] pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { self.0.set_ttl(ttl) } /// Gets the value of the `IP_TTL` option for this socket. /// /// For more information about this option, see [`set_ttl`][link]. /// /// [link]: #method.set_ttl #[stable(feature = "net2_mutators", since = "1.9.0")] pub fn ttl(&self) -> io::Result { self.0.ttl() } /// Get the value of the `SO_ERROR` option on this socket. /// /// This will retrieve the stored error in the underlying socket, clearing /// the field in the process. This can be useful for checking errors between /// calls. #[stable(feature = "net2_mutators", since = "1.9.0")] pub fn take_error(&self) -> io::Result> { self.0.take_error() } /// Moves this TCP stream into or out of nonblocking mode. /// /// On Unix this corresponds to calling fcntl, and on Windows this /// corresponds to calling ioctlsocket. #[stable(feature = "net2_mutators", since = "1.9.0")] pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> { self.0.set_nonblocking(nonblocking) } } #[stable(feature = "rust1", since = "1.0.0")] impl Read for TcpStream { fn read(&mut self, buf: &mut [u8]) -> io::Result { self.0.read(buf) } fn read_to_end(&mut self, buf: &mut Vec) -> io::Result { self.0.read_to_end(buf) } } #[stable(feature = "rust1", since = "1.0.0")] impl Write for TcpStream { fn write(&mut self, buf: &[u8]) -> io::Result { self.0.write(buf) } fn flush(&mut self) -> io::Result<()> { Ok(()) } } #[stable(feature = "rust1", since = "1.0.0")] impl<'a> Read for &'a TcpStream { fn read(&mut self, buf: &mut [u8]) -> io::Result { self.0.read(buf) } fn read_to_end(&mut self, buf: &mut Vec) -> io::Result { self.0.read_to_end(buf) } } #[stable(feature = "rust1", since = "1.0.0")] impl<'a> Write for &'a TcpStream { fn write(&mut self, buf: &[u8]) -> io::Result { self.0.write(buf) } fn flush(&mut self) -> io::Result<()> { Ok(()) } } impl AsInner for TcpStream { fn as_inner(&self) -> &net_imp::TcpStream { &self.0 } } impl FromInner for TcpStream { fn from_inner(inner: net_imp::TcpStream) -> TcpStream { TcpStream(inner) } } impl IntoInner for TcpStream { fn into_inner(self) -> net_imp::TcpStream { self.0 } } #[stable(feature = "rust1", since = "1.0.0")] impl fmt::Debug for TcpStream { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { self.0.fmt(f) } } impl TcpListener { /// Creates a new `TcpListener` which will be bound to the specified /// address. /// /// The returned listener is ready for accepting connections. /// /// Binding with a port number of 0 will request that the OS assigns a port /// to this listener. The port allocated can be queried via the /// `local_addr` method. /// /// The address type can be any implementor of `ToSocketAddrs` trait. See /// its documentation for concrete examples. #[stable(feature = "rust1", since = "1.0.0")] pub fn bind(addr: A) -> io::Result { super::each_addr(addr, net_imp::TcpListener::bind).map(TcpListener) } /// Returns the local socket address of this listener. #[stable(feature = "rust1", since = "1.0.0")] pub fn local_addr(&self) -> io::Result { self.0.socket_addr() } /// Creates a new independently owned handle to the underlying socket. /// /// The returned `TcpListener` is a reference to the same socket that this /// object references. Both handles can be used to accept incoming /// connections and options set on one listener will affect the other. #[stable(feature = "rust1", since = "1.0.0")] pub fn try_clone(&self) -> io::Result { self.0.duplicate().map(TcpListener) } /// Accept a new incoming connection from this listener. /// /// This function will block the calling thread until a new TCP connection /// is established. When established, the corresponding `TcpStream` and the /// remote peer's address will be returned. #[stable(feature = "rust1", since = "1.0.0")] pub fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> { self.0.accept().map(|(a, b)| (TcpStream(a), b)) } /// Returns an iterator over the connections being received on this /// listener. /// /// The returned iterator will never return `None` and will also not yield /// the peer's `SocketAddr` structure. #[stable(feature = "rust1", since = "1.0.0")] pub fn incoming(&self) -> Incoming { Incoming { listener: self } } /// Sets the value for the `IP_TTL` option on this socket. /// /// This value sets the time-to-live field that is used in every packet sent /// from this socket. #[stable(feature = "net2_mutators", since = "1.9.0")] pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { self.0.set_ttl(ttl) } /// Gets the value of the `IP_TTL` option for this socket. /// /// For more information about this option, see [`set_ttl`][link]. /// /// [link]: #method.set_ttl #[stable(feature = "net2_mutators", since = "1.9.0")] pub fn ttl(&self) -> io::Result { self.0.ttl() } /// Sets the value for the `IPV6_V6ONLY` option on this socket. /// /// If this is set to `true` then the socket is restricted to sending and /// receiving IPv6 packets only. In this case two IPv4 and IPv6 applications /// can bind the same port at the same time. /// /// If this is set to `false` then the socket can be used to send and /// receive packets from an IPv4-mapped IPv6 address. #[stable(feature = "net2_mutators", since = "1.9.0")] pub fn set_only_v6(&self, only_v6: bool) -> io::Result<()> { self.0.set_only_v6(only_v6) } /// Gets the value of the `IPV6_V6ONLY` option for this socket. /// /// For more information about this option, see [`set_only_v6`][link]. /// /// [link]: #method.set_only_v6 #[stable(feature = "net2_mutators", since = "1.9.0")] pub fn only_v6(&self) -> io::Result { self.0.only_v6() } /// Get the value of the `SO_ERROR` option on this socket. /// /// This will retrieve the stored error in the underlying socket, clearing /// the field in the process. This can be useful for checking errors between /// calls. #[stable(feature = "net2_mutators", since = "1.9.0")] pub fn take_error(&self) -> io::Result> { self.0.take_error() } /// Moves this TCP stream into or out of nonblocking mode. /// /// On Unix this corresponds to calling fcntl, and on Windows this /// corresponds to calling ioctlsocket. #[stable(feature = "net2_mutators", since = "1.9.0")] pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> { self.0.set_nonblocking(nonblocking) } } #[stable(feature = "rust1", since = "1.0.0")] impl<'a> Iterator for Incoming<'a> { type Item = io::Result; fn next(&mut self) -> Option> { Some(self.listener.accept().map(|p| p.0)) } } impl AsInner for TcpListener { fn as_inner(&self) -> &net_imp::TcpListener { &self.0 } } impl FromInner for TcpListener { fn from_inner(inner: net_imp::TcpListener) -> TcpListener { TcpListener(inner) } } impl IntoInner for TcpListener { fn into_inner(self) -> net_imp::TcpListener { self.0 } } #[stable(feature = "rust1", since = "1.0.0")] impl fmt::Debug for TcpListener { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { self.0.fmt(f) } } #[cfg(test)] mod tests { use prelude::v1::*; use io::ErrorKind; use io::prelude::*; use net::*; use net::test::{next_test_ip4, next_test_ip6}; use sync::mpsc::channel; use sys_common::AsInner; use time::{Instant, Duration}; use thread; fn each_ip(f: &mut FnMut(SocketAddr)) { f(next_test_ip4()); f(next_test_ip6()); } macro_rules! t { ($e:expr) => { match $e { Ok(t) => t, Err(e) => panic!("received error for `{}`: {}", stringify!($e), e), } } } #[test] fn bind_error() { match TcpListener::bind("1.1.1.1:9999") { Ok(..) => panic!(), Err(e) => assert_eq!(e.kind(), ErrorKind::AddrNotAvailable), } } #[test] fn connect_error() { match TcpStream::connect("0.0.0.0:1") { Ok(..) => panic!(), Err(e) => assert!(e.kind() == ErrorKind::ConnectionRefused || e.kind() == ErrorKind::InvalidInput || e.kind() == ErrorKind::AddrInUse || e.kind() == ErrorKind::AddrNotAvailable, "bad error: {} {:?}", e, e.kind()), } } #[test] fn listen_localhost() { let socket_addr = next_test_ip4(); let listener = t!(TcpListener::bind(&socket_addr)); let _t = thread::spawn(move || { let mut stream = t!(TcpStream::connect(&("localhost", socket_addr.port()))); t!(stream.write(&[144])); }); let mut stream = t!(listener.accept()).0; let mut buf = [0]; t!(stream.read(&mut buf)); assert!(buf[0] == 144); } #[test] fn connect_loopback() { each_ip(&mut |addr| { let acceptor = t!(TcpListener::bind(&addr)); let _t = thread::spawn(move|| { let host = match addr { SocketAddr::V4(..) => "127.0.0.1", SocketAddr::V6(..) => "::1", }; let mut stream = t!(TcpStream::connect(&(host, addr.port()))); t!(stream.write(&[66])); }); let mut stream = t!(acceptor.accept()).0; let mut buf = [0]; t!(stream.read(&mut buf)); assert!(buf[0] == 66); }) } #[test] fn smoke_test() { each_ip(&mut |addr| { let acceptor = t!(TcpListener::bind(&addr)); let (tx, rx) = channel(); let _t = thread::spawn(move|| { let mut stream = t!(TcpStream::connect(&addr)); t!(stream.write(&[99])); tx.send(t!(stream.local_addr())).unwrap(); }); let (mut stream, addr) = t!(acceptor.accept()); let mut buf = [0]; t!(stream.read(&mut buf)); assert!(buf[0] == 99); assert_eq!(addr, t!(rx.recv())); }) } #[test] fn read_eof() { each_ip(&mut |addr| { let acceptor = t!(TcpListener::bind(&addr)); let _t = thread::spawn(move|| { let _stream = t!(TcpStream::connect(&addr)); // Close }); let mut stream = t!(acceptor.accept()).0; let mut buf = [0]; let nread = t!(stream.read(&mut buf)); assert_eq!(nread, 0); let nread = t!(stream.read(&mut buf)); assert_eq!(nread, 0); }) } #[test] fn write_close() { each_ip(&mut |addr| { let acceptor = t!(TcpListener::bind(&addr)); let (tx, rx) = channel(); let _t = thread::spawn(move|| { drop(t!(TcpStream::connect(&addr))); tx.send(()).unwrap(); }); let mut stream = t!(acceptor.accept()).0; rx.recv().unwrap(); let buf = [0]; match stream.write(&buf) { Ok(..) => {} Err(e) => { assert!(e.kind() == ErrorKind::ConnectionReset || e.kind() == ErrorKind::BrokenPipe || e.kind() == ErrorKind::ConnectionAborted, "unknown error: {}", e); } } }) } #[test] fn multiple_connect_serial() { each_ip(&mut |addr| { let max = 10; let acceptor = t!(TcpListener::bind(&addr)); let _t = thread::spawn(move|| { for _ in 0..max { let mut stream = t!(TcpStream::connect(&addr)); t!(stream.write(&[99])); } }); for stream in acceptor.incoming().take(max) { let mut stream = t!(stream); let mut buf = [0]; t!(stream.read(&mut buf)); assert_eq!(buf[0], 99); } }) } #[test] fn multiple_connect_interleaved_greedy_schedule() { const MAX: usize = 10; each_ip(&mut |addr| { let acceptor = t!(TcpListener::bind(&addr)); let _t = thread::spawn(move|| { let acceptor = acceptor; for (i, stream) in acceptor.incoming().enumerate().take(MAX) { // Start another thread to handle the connection let _t = thread::spawn(move|| { let mut stream = t!(stream); let mut buf = [0]; t!(stream.read(&mut buf)); assert!(buf[0] == i as u8); }); } }); connect(0, addr); }); fn connect(i: usize, addr: SocketAddr) { if i == MAX { return } let t = thread::spawn(move|| { let mut stream = t!(TcpStream::connect(&addr)); // Connect again before writing connect(i + 1, addr); t!(stream.write(&[i as u8])); }); t.join().ok().unwrap(); } } #[test] fn multiple_connect_interleaved_lazy_schedule() { const MAX: usize = 10; each_ip(&mut |addr| { let acceptor = t!(TcpListener::bind(&addr)); let _t = thread::spawn(move|| { for stream in acceptor.incoming().take(MAX) { // Start another thread to handle the connection let _t = thread::spawn(move|| { let mut stream = t!(stream); let mut buf = [0]; t!(stream.read(&mut buf)); assert!(buf[0] == 99); }); } }); connect(0, addr); }); fn connect(i: usize, addr: SocketAddr) { if i == MAX { return } let t = thread::spawn(move|| { let mut stream = t!(TcpStream::connect(&addr)); connect(i + 1, addr); t!(stream.write(&[99])); }); t.join().ok().unwrap(); } } #[test] fn socket_and_peer_name() { each_ip(&mut |addr| { let listener = t!(TcpListener::bind(&addr)); let so_name = t!(listener.local_addr()); assert_eq!(addr, so_name); let _t = thread::spawn(move|| { t!(listener.accept()); }); let stream = t!(TcpStream::connect(&addr)); assert_eq!(addr, t!(stream.peer_addr())); }) } #[test] fn partial_read() { each_ip(&mut |addr| { let (tx, rx) = channel(); let srv = t!(TcpListener::bind(&addr)); let _t = thread::spawn(move|| { let mut cl = t!(srv.accept()).0; cl.write(&[10]).unwrap(); let mut b = [0]; t!(cl.read(&mut b)); tx.send(()).unwrap(); }); let mut c = t!(TcpStream::connect(&addr)); let mut b = [0; 10]; assert_eq!(c.read(&mut b).unwrap(), 1); t!(c.write(&[1])); rx.recv().unwrap(); }) } #[test] fn double_bind() { each_ip(&mut |addr| { let _listener = t!(TcpListener::bind(&addr)); match TcpListener::bind(&addr) { Ok(..) => panic!(), Err(e) => { assert!(e.kind() == ErrorKind::ConnectionRefused || e.kind() == ErrorKind::Other || e.kind() == ErrorKind::AddrInUse, "unknown error: {} {:?}", e, e.kind()); } } }) } #[test] fn fast_rebind() { each_ip(&mut |addr| { let acceptor = t!(TcpListener::bind(&addr)); let _t = thread::spawn(move|| { t!(TcpStream::connect(&addr)); }); t!(acceptor.accept()); drop(acceptor); t!(TcpListener::bind(&addr)); }); } #[test] fn tcp_clone_smoke() { each_ip(&mut |addr| { let acceptor = t!(TcpListener::bind(&addr)); let _t = thread::spawn(move|| { let mut s = t!(TcpStream::connect(&addr)); let mut buf = [0, 0]; assert_eq!(s.read(&mut buf).unwrap(), 1); assert_eq!(buf[0], 1); t!(s.write(&[2])); }); let mut s1 = t!(acceptor.accept()).0; let s2 = t!(s1.try_clone()); let (tx1, rx1) = channel(); let (tx2, rx2) = channel(); let _t = thread::spawn(move|| { let mut s2 = s2; rx1.recv().unwrap(); t!(s2.write(&[1])); tx2.send(()).unwrap(); }); tx1.send(()).unwrap(); let mut buf = [0, 0]; assert_eq!(s1.read(&mut buf).unwrap(), 1); rx2.recv().unwrap(); }) } #[test] fn tcp_clone_two_read() { each_ip(&mut |addr| { let acceptor = t!(TcpListener::bind(&addr)); let (tx1, rx) = channel(); let tx2 = tx1.clone(); let _t = thread::spawn(move|| { let mut s = t!(TcpStream::connect(&addr)); t!(s.write(&[1])); rx.recv().unwrap(); t!(s.write(&[2])); rx.recv().unwrap(); }); let mut s1 = t!(acceptor.accept()).0; let s2 = t!(s1.try_clone()); let (done, rx) = channel(); let _t = thread::spawn(move|| { let mut s2 = s2; let mut buf = [0, 0]; t!(s2.read(&mut buf)); tx2.send(()).unwrap(); done.send(()).unwrap(); }); let mut buf = [0, 0]; t!(s1.read(&mut buf)); tx1.send(()).unwrap(); rx.recv().unwrap(); }) } #[test] fn tcp_clone_two_write() { each_ip(&mut |addr| { let acceptor = t!(TcpListener::bind(&addr)); let _t = thread::spawn(move|| { let mut s = t!(TcpStream::connect(&addr)); let mut buf = [0, 1]; t!(s.read(&mut buf)); t!(s.read(&mut buf)); }); let mut s1 = t!(acceptor.accept()).0; let s2 = t!(s1.try_clone()); let (done, rx) = channel(); let _t = thread::spawn(move|| { let mut s2 = s2; t!(s2.write(&[1])); done.send(()).unwrap(); }); t!(s1.write(&[2])); rx.recv().unwrap(); }) } #[test] fn shutdown_smoke() { each_ip(&mut |addr| { let a = t!(TcpListener::bind(&addr)); let _t = thread::spawn(move|| { let mut c = t!(a.accept()).0; let mut b = [0]; assert_eq!(c.read(&mut b).unwrap(), 0); t!(c.write(&[1])); }); let mut s = t!(TcpStream::connect(&addr)); t!(s.shutdown(Shutdown::Write)); assert!(s.write(&[1]).is_err()); let mut b = [0, 0]; assert_eq!(t!(s.read(&mut b)), 1); assert_eq!(b[0], 1); }) } #[test] fn close_readwrite_smoke() { each_ip(&mut |addr| { let a = t!(TcpListener::bind(&addr)); let (tx, rx) = channel::<()>(); let _t = thread::spawn(move|| { let _s = t!(a.accept()); let _ = rx.recv(); }); let mut b = [0]; let mut s = t!(TcpStream::connect(&addr)); let mut s2 = t!(s.try_clone()); // closing should prevent reads/writes t!(s.shutdown(Shutdown::Write)); assert!(s.write(&[0]).is_err()); t!(s.shutdown(Shutdown::Read)); assert_eq!(s.read(&mut b).unwrap(), 0); // closing should affect previous handles assert!(s2.write(&[0]).is_err()); assert_eq!(s2.read(&mut b).unwrap(), 0); // closing should affect new handles let mut s3 = t!(s.try_clone()); assert!(s3.write(&[0]).is_err()); assert_eq!(s3.read(&mut b).unwrap(), 0); // make sure these don't die let _ = s2.shutdown(Shutdown::Read); let _ = s2.shutdown(Shutdown::Write); let _ = s3.shutdown(Shutdown::Read); let _ = s3.shutdown(Shutdown::Write); drop(tx); }) } #[test] fn close_read_wakes_up() { each_ip(&mut |addr| { let a = t!(TcpListener::bind(&addr)); let (tx1, rx) = channel::<()>(); let _t = thread::spawn(move|| { let _s = t!(a.accept()); let _ = rx.recv(); }); let s = t!(TcpStream::connect(&addr)); let s2 = t!(s.try_clone()); let (tx, rx) = channel(); let _t = thread::spawn(move|| { let mut s2 = s2; assert_eq!(t!(s2.read(&mut [0])), 0); tx.send(()).unwrap(); }); // this should wake up the child thread t!(s.shutdown(Shutdown::Read)); // this test will never finish if the child doesn't wake up rx.recv().unwrap(); drop(tx1); }) } #[test] fn clone_while_reading() { each_ip(&mut |addr| { let accept = t!(TcpListener::bind(&addr)); // Enqueue a thread to write to a socket let (tx, rx) = channel(); let (txdone, rxdone) = channel(); let txdone2 = txdone.clone(); let _t = thread::spawn(move|| { let mut tcp = t!(TcpStream::connect(&addr)); rx.recv().unwrap(); t!(tcp.write(&[0])); txdone2.send(()).unwrap(); }); // Spawn off a reading clone let tcp = t!(accept.accept()).0; let tcp2 = t!(tcp.try_clone()); let txdone3 = txdone.clone(); let _t = thread::spawn(move|| { let mut tcp2 = tcp2; t!(tcp2.read(&mut [0])); txdone3.send(()).unwrap(); }); // Try to ensure that the reading clone is indeed reading for _ in 0..50 { thread::yield_now(); } // clone the handle again while it's reading, then let it finish the // read. let _ = t!(tcp.try_clone()); tx.send(()).unwrap(); rxdone.recv().unwrap(); rxdone.recv().unwrap(); }) } #[test] fn clone_accept_smoke() { each_ip(&mut |addr| { let a = t!(TcpListener::bind(&addr)); let a2 = t!(a.try_clone()); let _t = thread::spawn(move|| { let _ = TcpStream::connect(&addr); }); let _t = thread::spawn(move|| { let _ = TcpStream::connect(&addr); }); t!(a.accept()); t!(a2.accept()); }) } #[test] fn clone_accept_concurrent() { each_ip(&mut |addr| { let a = t!(TcpListener::bind(&addr)); let a2 = t!(a.try_clone()); let (tx, rx) = channel(); let tx2 = tx.clone(); let _t = thread::spawn(move|| { tx.send(t!(a.accept())).unwrap(); }); let _t = thread::spawn(move|| { tx2.send(t!(a2.accept())).unwrap(); }); let _t = thread::spawn(move|| { let _ = TcpStream::connect(&addr); }); let _t = thread::spawn(move|| { let _ = TcpStream::connect(&addr); }); rx.recv().unwrap(); rx.recv().unwrap(); }) } #[test] fn debug() { let name = if cfg!(windows) {"socket"} else {"fd"}; let socket_addr = next_test_ip4(); let listener = t!(TcpListener::bind(&socket_addr)); let listener_inner = listener.0.socket().as_inner(); let compare = format!("TcpListener {{ addr: {:?}, {}: {:?} }}", socket_addr, name, listener_inner); assert_eq!(format!("{:?}", listener), compare); let stream = t!(TcpStream::connect(&("localhost", socket_addr.port()))); let stream_inner = stream.0.socket().as_inner(); let compare = format!("TcpStream {{ addr: {:?}, \ peer: {:?}, {}: {:?} }}", stream.local_addr().unwrap(), stream.peer_addr().unwrap(), name, stream_inner); assert_eq!(format!("{:?}", stream), compare); } // FIXME: re-enabled bitrig/openbsd tests once their socket timeout code // no longer has rounding errors. #[cfg_attr(any(target_os = "bitrig", target_os = "netbsd", target_os = "openbsd"), ignore)] #[test] fn timeouts() { let addr = next_test_ip4(); let listener = t!(TcpListener::bind(&addr)); let stream = t!(TcpStream::connect(&("localhost", addr.port()))); let dur = Duration::new(15410, 0); assert_eq!(None, t!(stream.read_timeout())); t!(stream.set_read_timeout(Some(dur))); assert_eq!(Some(dur), t!(stream.read_timeout())); assert_eq!(None, t!(stream.write_timeout())); t!(stream.set_write_timeout(Some(dur))); assert_eq!(Some(dur), t!(stream.write_timeout())); t!(stream.set_read_timeout(None)); assert_eq!(None, t!(stream.read_timeout())); t!(stream.set_write_timeout(None)); assert_eq!(None, t!(stream.write_timeout())); drop(listener); } #[test] fn test_read_timeout() { let addr = next_test_ip4(); let listener = t!(TcpListener::bind(&addr)); let mut stream = t!(TcpStream::connect(&("localhost", addr.port()))); t!(stream.set_read_timeout(Some(Duration::from_millis(1000)))); let mut buf = [0; 10]; let start = Instant::now(); let kind = stream.read(&mut buf).err().expect("expected error").kind(); assert!(kind == ErrorKind::WouldBlock || kind == ErrorKind::TimedOut); assert!(start.elapsed() > Duration::from_millis(400)); drop(listener); } #[test] fn test_read_with_timeout() { let addr = next_test_ip4(); let listener = t!(TcpListener::bind(&addr)); let mut stream = t!(TcpStream::connect(&("localhost", addr.port()))); t!(stream.set_read_timeout(Some(Duration::from_millis(1000)))); let mut other_end = t!(listener.accept()).0; t!(other_end.write_all(b"hello world")); let mut buf = [0; 11]; t!(stream.read(&mut buf)); assert_eq!(b"hello world", &buf[..]); let start = Instant::now(); let kind = stream.read(&mut buf).err().expect("expected error").kind(); assert!(kind == ErrorKind::WouldBlock || kind == ErrorKind::TimedOut); assert!(start.elapsed() > Duration::from_millis(400)); drop(listener); } #[test] fn nodelay() { let addr = next_test_ip4(); let _listener = t!(TcpListener::bind(&addr)); let stream = t!(TcpStream::connect(&("localhost", addr.port()))); assert_eq!(false, t!(stream.nodelay())); t!(stream.set_nodelay(true)); assert_eq!(true, t!(stream.nodelay())); t!(stream.set_nodelay(false)); assert_eq!(false, t!(stream.nodelay())); } #[test] fn ttl() { let ttl = 100; let addr = next_test_ip4(); let listener = t!(TcpListener::bind(&addr)); t!(listener.set_ttl(ttl)); assert_eq!(ttl, t!(listener.ttl())); let stream = t!(TcpStream::connect(&("localhost", addr.port()))); t!(stream.set_ttl(ttl)); assert_eq!(ttl, t!(stream.ttl())); } #[test] fn set_nonblocking() { let addr = next_test_ip4(); let listener = t!(TcpListener::bind(&addr)); t!(listener.set_nonblocking(true)); t!(listener.set_nonblocking(false)); let mut stream = t!(TcpStream::connect(&("localhost", addr.port()))); t!(stream.set_nonblocking(false)); t!(stream.set_nonblocking(true)); let mut buf = [0]; match stream.read(&mut buf) { Ok(_) => panic!("expected error"), Err(ref e) if e.kind() == ErrorKind::WouldBlock => {} Err(e) => panic!("unexpected error {}", e), } } }