diff options
| author | Alex Crichton <alex@alexcrichton.com> | 2014-01-22 19:32:16 -0800 |
|---|---|---|
| committer | Alex Crichton <alex@alexcrichton.com> | 2014-02-05 11:43:49 -0800 |
| commit | 56080c476712e478ffe4ef8d6d727c0e3d21cfd0 (patch) | |
| tree | 24bdce82bbf3122bf4bd8c0a66e307b667c6184f /src/libstd | |
| parent | ef53b7a97c58f65ac6967dfc6d30a4354afa34a3 (diff) | |
| download | rust-56080c476712e478ffe4ef8d6d727c0e3d21cfd0.tar.gz rust-56080c476712e478ffe4ef8d6d727c0e3d21cfd0.zip | |
Implement clone() for TCP/UDP/Unix sockets
This is part of the overall strategy I would like to take when approaching
issue #11165. The only two I/O objects that reasonably want to be "split" are
the network stream objects. Everything else can be "split" by just creating
another version.
The initial idea I had was the literally split the object into a reader and a
writer half, but that would just introduce lots of clutter with extra interfaces
that were a little unnnecssary, or it would return a ~Reader and a ~Writer which
means you couldn't access things like the remote peer name or local socket name.
The solution I found to be nicer was to just clone the stream itself. The clone
is just a clone of the handle, nothing fancy going on at the kernel level.
Conceptually I found this very easy to wrap my head around (everything else
supports clone()), and it solved the "split" problem at the same time.
The cloning support is pretty specific per platform/lib combination:
* native/win32 - uses some specific WSA apis to clone the SOCKET handle
* native/unix - uses dup() to get another file descriptor
* green/all - This is where things get interesting. When we support full clones
of a handle, this implies that we're allowing simultaneous writes
and reads to happen. It turns out that libuv doesn't support two
simultaneous reads or writes of the same object. It does support
*one* read and *one* write at the same time, however. Some extra
infrastructure was added to just block concurrent writers/readers
until the previous read/write operation was completed.
I've added tests to the tcp/unix modules to make sure that this functionality is
supported everywhere.
Diffstat (limited to 'src/libstd')
| -rw-r--r-- | src/libstd/io/net/tcp.rs | 181 | ||||
| -rw-r--r-- | src/libstd/io/net/udp.rs | 117 | ||||
| -rw-r--r-- | src/libstd/io/net/unix.rs | 96 | ||||
| -rw-r--r-- | src/libstd/io/pipe.rs | 6 | ||||
| -rw-r--r-- | src/libstd/libc.rs | 49 | ||||
| -rw-r--r-- | src/libstd/option.rs | 1 | ||||
| -rw-r--r-- | src/libstd/rt/rtio.rs | 4 | ||||
| -rw-r--r-- | src/libstd/unstable/mutex.rs | 1 | ||||
| -rw-r--r-- | src/libstd/util.rs | 1 | ||||
| -rw-r--r-- | src/libstd/vec.rs | 2 |
10 files changed, 453 insertions, 5 deletions
diff --git a/src/libstd/io/net/tcp.rs b/src/libstd/io/net/tcp.rs index a0bdc193d98..66ceb03082f 100644 --- a/src/libstd/io/net/tcp.rs +++ b/src/libstd/io/net/tcp.rs @@ -8,11 +8,42 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +//! TCP network connections +//! +//! This module contains the ability to open a TCP stream to a socket address, +//! as well as creating a socket server to accept incoming connections. The +//! destination and binding addresses can either be an IPv4 or IPv6 address. +//! +//! A TCP connection implements the `Reader` and `Writer` traits, while the TCP +//! listener (socket server) implements the `Listener` and `Acceptor` traits. + +#[deny(missing_doc)]; + +use clone::Clone; use io::net::ip::SocketAddr; -use io::{Reader, Writer, Listener, Acceptor, IoResult}; +use io::{Reader, Writer, Listener, Acceptor}; +use io::IoResult; use rt::rtio::{IoFactory, LocalIo, RtioSocket, RtioTcpListener}; use rt::rtio::{RtioTcpAcceptor, RtioTcpStream}; +/// A structure which represents a TCP stream between a local socket and a +/// remote socket. +/// +/// # Example +/// +/// ```rust +/// # #[allow(unused_must_use)]; +/// use std::io::net::tcp::TcpStream; +/// use std::io::net::ip::{Ipv4Addr, SocketAddr}; +/// +/// let addr = SocketAddr { ip: Ipv4Addr(127, 0, 0, 1), port: 34254 }; +/// let mut stream = TcpStream::connect(addr); +/// +/// stream.write([1]); +/// let mut buf = [0]; +/// stream.read(buf); +/// drop(stream); // close the connection +/// ``` pub struct TcpStream { priv obj: ~RtioTcpStream } @@ -22,21 +53,40 @@ impl TcpStream { TcpStream { obj: s } } + /// Creates a TCP connection to a remote socket address. + /// + /// If no error is encountered, then `Ok(stream)` is returned. pub fn connect(addr: SocketAddr) -> IoResult<TcpStream> { LocalIo::maybe_raise(|io| { io.tcp_connect(addr).map(TcpStream::new) }) } + /// Returns the socket address of the remote peer of this TCP connection. pub fn peer_name(&mut self) -> IoResult<SocketAddr> { self.obj.peer_name() } + /// Returns the socket address of the local half of this TCP connection. pub fn socket_name(&mut self) -> IoResult<SocketAddr> { self.obj.socket_name() } } +impl Clone for TcpStream { + /// Creates a new handle to this TCP stream, allowing for simultaneous reads + /// and writes of this connection. + /// + /// The underlying TCP stream will not be closed until all handles to the + /// stream have been deallocated. All handles will also follow the same + /// stream, but two concurrent reads will not receive the same data. + /// Instead, the first read will receive the first packet received, and the + /// second read will receive the second packet. + fn clone(&self) -> TcpStream { + TcpStream { obj: self.obj.clone() } + } +} + impl Reader for TcpStream { fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> { self.obj.read(buf) } } @@ -45,17 +95,56 @@ impl Writer for TcpStream { fn write(&mut self, buf: &[u8]) -> IoResult<()> { self.obj.write(buf) } } +/// A structure representing a socket server. This listener is used to create a +/// `TcpAcceptor` which can be used to accept sockets on a local port. +/// +/// # Example +/// +/// ```rust +/// # fn main() {} +/// # fn foo() { +/// # #[allow(unused_must_use, dead_code)]; +/// use std::io::net::tcp::TcpListener; +/// use std::io::net::ip::{Ipv4Addr, SocketAddr}; +/// use std::io::{Acceptor, Listener}; +/// +/// let addr = SocketAddr { ip: Ipv4Addr(127, 0, 0, 1), port: 80 }; +/// let listener = TcpListener::bind(addr); +/// +/// // bind the listener to the specified address +/// let mut acceptor = listener.listen(); +/// +/// // accept connections and process them +/// # fn handle_client<T>(_: T) {} +/// for stream in acceptor.incoming() { +/// spawn(proc() { +/// handle_client(stream); +/// }); +/// } +/// +/// // close the socket server +/// drop(acceptor); +/// # } +/// ``` pub struct TcpListener { priv obj: ~RtioTcpListener } impl TcpListener { + /// Creates a new `TcpListener` which will be bound to the specified local + /// socket address. This listener is not ready for accepting connections, + /// `listen` must be called on it before that's possible. + /// + /// Binding with a port number of 0 will request that the OS assigns a port + /// to this listener. The port allocated can be queried via the + /// `socket_name` function. pub fn bind(addr: SocketAddr) -> IoResult<TcpListener> { LocalIo::maybe_raise(|io| { io.tcp_bind(addr).map(|l| TcpListener { obj: l }) }) } + /// Returns the local socket address of this listener. pub fn socket_name(&mut self) -> IoResult<SocketAddr> { self.obj.socket_name() } @@ -67,6 +156,9 @@ impl Listener<TcpStream, TcpAcceptor> for TcpListener { } } +/// The accepting half of a TCP socket server. This structure is created through +/// a `TcpListener`'s `listen` method, and this object can be used to accept new +/// `TcpStream` instances. pub struct TcpAcceptor { priv obj: ~RtioTcpAcceptor } @@ -573,4 +665,91 @@ mod test { } let _listener = TcpListener::bind(addr); }) + + iotest!(fn tcp_clone_smoke() { + let addr = next_test_ip4(); + let mut acceptor = TcpListener::bind(addr).listen(); + + spawn(proc() { + let mut s = TcpStream::connect(addr); + let mut buf = [0, 0]; + assert_eq!(s.read(buf), Ok(1)); + assert_eq!(buf[0], 1); + s.write([2]).unwrap(); + }); + + let mut s1 = acceptor.accept().unwrap(); + let s2 = s1.clone(); + + let (p1, c1) = Chan::new(); + let (p2, c2) = Chan::new(); + spawn(proc() { + let mut s2 = s2; + p1.recv(); + s2.write([1]).unwrap(); + c2.send(()); + }); + c1.send(()); + let mut buf = [0, 0]; + assert_eq!(s1.read(buf), Ok(1)); + p2.recv(); + }) + + iotest!(fn tcp_clone_two_read() { + let addr = next_test_ip6(); + let mut acceptor = TcpListener::bind(addr).listen(); + let (p, c) = SharedChan::new(); + let c2 = c.clone(); + + spawn(proc() { + let mut s = TcpStream::connect(addr); + s.write([1]).unwrap(); + p.recv(); + s.write([2]).unwrap(); + p.recv(); + }); + + let mut s1 = acceptor.accept().unwrap(); + let s2 = s1.clone(); + + let (p, done) = Chan::new(); + spawn(proc() { + let mut s2 = s2; + let mut buf = [0, 0]; + s2.read(buf).unwrap(); + c2.send(()); + done.send(()); + }); + let mut buf = [0, 0]; + s1.read(buf).unwrap(); + c.send(()); + + p.recv(); + }) + + iotest!(fn tcp_clone_two_write() { + let addr = next_test_ip4(); + let mut acceptor = TcpListener::bind(addr).listen(); + + spawn(proc() { + let mut s = TcpStream::connect(addr); + let mut buf = [0, 1]; + s.read(buf).unwrap(); + s.read(buf).unwrap(); + }); + + let mut s1 = acceptor.accept().unwrap(); + let s2 = s1.clone(); + + let (p, done) = Chan::new(); + spawn(proc() { + let mut s2 = s2; + s2.write([1]).unwrap(); + done.send(()); + }); + s1.write([2]).unwrap(); + + p.recv(); + }) } + diff --git a/src/libstd/io/net/udp.rs b/src/libstd/io/net/udp.rs index 0ef62648afc..3c02f563847 100644 --- a/src/libstd/io/net/udp.rs +++ b/src/libstd/io/net/udp.rs @@ -8,6 +8,7 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +use clone::Clone; use result::{Ok, Err}; use io::net::ip::SocketAddr; use io::{Reader, Writer, IoResult}; @@ -41,6 +42,19 @@ impl UdpSocket { } } +impl Clone for UdpSocket { + /// Creates a new handle to this UDP socket, allowing for simultaneous reads + /// and writes of the socket. + /// + /// The underlying UDP socket will not be closed until all handles to the + /// socket have been deallocated. Two concurrent reads will not receive the + /// same data. Instead, the first read will receive the first packet + /// received, and the second read will receive the second packet. + fn clone(&self) -> UdpSocket { + UdpSocket { obj: self.obj.clone() } + } +} + pub struct UdpStream { priv socket: UdpSocket, priv connectedTo: SocketAddr @@ -250,4 +264,107 @@ mod test { iotest!(fn socket_name_ip6() { socket_name(next_test_ip6()); }) + + iotest!(fn udp_clone_smoke() { + let addr1 = next_test_ip4(); + let addr2 = next_test_ip4(); + let mut sock1 = UdpSocket::bind(addr1).unwrap(); + let sock2 = UdpSocket::bind(addr2).unwrap(); + + spawn(proc() { + let mut sock2 = sock2; + let mut buf = [0, 0]; + assert_eq!(sock2.recvfrom(buf), Ok((1, addr1))); + assert_eq!(buf[0], 1); + sock2.sendto([2], addr1).unwrap(); + }); + + let sock3 = sock1.clone(); + + let (p1, c1) = Chan::new(); + let (p2, c2) = Chan::new(); + spawn(proc() { + let mut sock3 = sock3; + p1.recv(); + sock3.sendto([1], addr2).unwrap(); + c2.send(()); + }); + c1.send(()); + let mut buf = [0, 0]; + assert_eq!(sock1.recvfrom(buf), Ok((1, addr2))); + p2.recv(); + }) + + iotest!(fn udp_clone_two_read() { + let addr1 = next_test_ip4(); + let addr2 = next_test_ip4(); + let mut sock1 = UdpSocket::bind(addr1).unwrap(); + let sock2 = UdpSocket::bind(addr2).unwrap(); + let (p, c) = SharedChan::new(); + let c2 = c.clone(); + + spawn(proc() { + let mut sock2 = sock2; + sock2.sendto([1], addr1).unwrap(); + p.recv(); + sock2.sendto([2], addr1).unwrap(); + p.recv(); + }); + + let sock3 = sock1.clone(); + + let (p, done) = Chan::new(); + spawn(proc() { + let mut sock3 = sock3; + let mut buf = [0, 0]; + sock3.recvfrom(buf).unwrap(); + c2.send(()); + done.send(()); + }); + let mut buf = [0, 0]; + sock1.recvfrom(buf).unwrap(); + c.send(()); + + p.recv(); + }) + + iotest!(fn udp_clone_two_write() { + let addr1 = next_test_ip4(); + let addr2 = next_test_ip4(); + let mut sock1 = UdpSocket::bind(addr1).unwrap(); + let sock2 = UdpSocket::bind(addr2).unwrap(); + + let (p, c) = SharedChan::new(); + + spawn(proc() { + let mut sock2 = sock2; + let mut buf = [0, 1]; + + for _ in p.iter() { + match sock2.recvfrom(buf) { + Ok(..) => {} + Err(e) => fail!("failed receive: {}", e), + } + } + }); + + let sock3 = sock1.clone(); + + let (p, done) = Chan::new(); + let c2 = c.clone(); + spawn(proc() { + let mut sock3 = sock3; + match sock3.sendto([1], addr2) { + Ok(..) => c2.send(()), + Err(..) => {} + } + done.send(()); + }); + match sock1.sendto([2], addr2) { + Ok(..) => c.send(()), + Err(..) => {} + } + + p.recv(); + }) } diff --git a/src/libstd/io/net/unix.rs b/src/libstd/io/net/unix.rs index ce95b987663..3c7db9c8686 100644 --- a/src/libstd/io/net/unix.rs +++ b/src/libstd/io/net/unix.rs @@ -25,6 +25,7 @@ instances as clients. use prelude::*; use c_str::ToCStr; +use clone::Clone; use rt::rtio::{IoFactory, LocalIo, RtioUnixListener}; use rt::rtio::{RtioUnixAcceptor, RtioPipe}; use io::pipe::PipeStream; @@ -62,6 +63,12 @@ impl UnixStream { } } +impl Clone for UnixStream { + fn clone(&self) -> UnixStream { + UnixStream { obj: self.obj.clone() } + } +} + impl Reader for UnixStream { fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> { self.obj.read(buf) } } @@ -228,4 +235,93 @@ mod tests { let _acceptor = UnixListener::bind(&path).listen(); assert!(path.exists()); } + + #[test] + fn unix_clone_smoke() { + let addr = next_test_unix(); + let mut acceptor = UnixListener::bind(&addr).listen(); + + spawn(proc() { + let mut s = UnixStream::connect(&addr); + let mut buf = [0, 0]; + assert_eq!(s.read(buf), Ok(1)); + assert_eq!(buf[0], 1); + s.write([2]).unwrap(); + }); + + let mut s1 = acceptor.accept().unwrap(); + let s2 = s1.clone(); + + let (p1, c1) = Chan::new(); + let (p2, c2) = Chan::new(); + spawn(proc() { + let mut s2 = s2; + p1.recv(); + s2.write([1]).unwrap(); + c2.send(()); + }); + c1.send(()); + let mut buf = [0, 0]; + assert_eq!(s1.read(buf), Ok(1)); + p2.recv(); + } + + #[test] + fn unix_clone_two_read() { + let addr = next_test_unix(); + let mut acceptor = UnixListener::bind(&addr).listen(); + let (p, c) = SharedChan::new(); + let c2 = c.clone(); + + spawn(proc() { + let mut s = UnixStream::connect(&addr); + s.write([1]).unwrap(); + p.recv(); + s.write([2]).unwrap(); + p.recv(); + }); + + let mut s1 = acceptor.accept().unwrap(); + let s2 = s1.clone(); + + let (p, done) = Chan::new(); + spawn(proc() { + let mut s2 = s2; + let mut buf = [0, 0]; + s2.read(buf).unwrap(); + c2.send(()); + done.send(()); + }); + let mut buf = [0, 0]; + s1.read(buf).unwrap(); + c.send(()); + + p.recv(); + } + + #[test] + fn unix_clone_two_write() { + let addr = next_test_unix(); + let mut acceptor = UnixListener::bind(&addr).listen(); + + spawn(proc() { + let mut s = UnixStream::connect(&addr); + let mut buf = [0, 1]; + s.read(buf).unwrap(); + s.read(buf).unwrap(); + }); + + let mut s1 = acceptor.accept().unwrap(); + let s2 = s1.clone(); + + let (p, done) = Chan::new(); + spawn(proc() { + let mut s2 = s2; + s2.write([1]).unwrap(); + done.send(()); + }); + s1.write([2]).unwrap(); + + p.recv(); + } } diff --git a/src/libstd/io/pipe.rs b/src/libstd/io/pipe.rs index ca85707149b..83250bdae73 100644 --- a/src/libstd/io/pipe.rs +++ b/src/libstd/io/pipe.rs @@ -51,6 +51,12 @@ impl PipeStream { } } +impl Clone for PipeStream { + fn clone(&self) -> PipeStream { + PipeStream { obj: self.obj.clone() } + } +} + impl Reader for PipeStream { fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> { self.obj.read(buf) } } diff --git a/src/libstd/libc.rs b/src/libstd/libc.rs index 11a7b5dd191..057d618f444 100644 --- a/src/libstd/libc.rs +++ b/src/libstd/libc.rs @@ -960,6 +960,8 @@ pub mod types { } pub mod extra { use ptr; + use libc::consts::os::extra::{MAX_PROTOCOL_CHAIN, + WSAPROTOCOL_LEN}; use libc::types::common::c95::c_void; use libc::types::os::arch::c95::{c_char, c_int, c_uint, size_t}; use libc::types::os::arch::c95::{c_long, c_ulong}; @@ -1106,6 +1108,47 @@ pub mod types { } pub type LPFILETIME = *mut FILETIME; + + pub struct GUID { + Data1: DWORD, + Data2: DWORD, + Data3: DWORD, + Data4: [BYTE, ..8], + } + + struct WSAPROTOCOLCHAIN { + ChainLen: c_int, + ChainEntries: [DWORD, ..MAX_PROTOCOL_CHAIN], + } + + pub type LPWSAPROTOCOLCHAIN = *mut WSAPROTOCOLCHAIN; + + pub struct WSAPROTOCOL_INFO { + dwServiceFlags1: DWORD, + dwServiceFlags2: DWORD, + dwServiceFlags3: DWORD, + dwServiceFlags4: DWORD, + dwProviderFlags: DWORD, + ProviderId: GUID, + dwCatalogEntryId: DWORD, + ProtocolChain: WSAPROTOCOLCHAIN, + iVersion: c_int, + iAddressFamily: c_int, + iMaxSockAddr: c_int, + iMinSockAddr: c_int, + iSocketType: c_int, + iProtocol: c_int, + iProtocolMaxOffset: c_int, + iNetworkByteOrder: c_int, + iSecurityScheme: c_int, + dwMessageSize: DWORD, + dwProviderReserved: DWORD, + szProtocol: [u8, ..WSAPROTOCOL_LEN+1], + } + + pub type LPWSAPROTOCOL_INFO = *mut WSAPROTOCOL_INFO; + + pub type GROUP = c_uint; } } } @@ -1721,6 +1764,10 @@ pub mod consts { pub static FILE_BEGIN: DWORD = 0; pub static FILE_CURRENT: DWORD = 1; pub static FILE_END: DWORD = 2; + + pub static MAX_PROTOCOL_CHAIN: DWORD = 7; + pub static WSAPROTOCOL_LEN: DWORD = 255; + pub static INVALID_SOCKET: DWORD = !0; } pub mod sysconf { } @@ -4098,6 +4145,8 @@ pub mod funcs { lpFrequency: *mut LARGE_INTEGER) -> BOOL; pub fn QueryPerformanceCounter( lpPerformanceCount: *mut LARGE_INTEGER) -> BOOL; + + pub fn GetCurrentProcessId() -> DWORD; } } diff --git a/src/libstd/option.rs b/src/libstd/option.rs index 39b516aeb12..7bb29fdfacf 100644 --- a/src/libstd/option.rs +++ b/src/libstd/option.rs @@ -480,7 +480,6 @@ mod tests { use iter::range; use str::StrSlice; - use util; use kinds::marker; use vec::ImmutableVector; diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs index 35b1e21df06..8d02048d55c 100644 --- a/src/libstd/rt/rtio.rs +++ b/src/libstd/rt/rtio.rs @@ -203,6 +203,7 @@ pub trait RtioTcpStream : RtioSocket { fn nodelay(&mut self) -> Result<(), IoError>; fn keepalive(&mut self, delay_in_seconds: uint) -> Result<(), IoError>; fn letdie(&mut self) -> Result<(), IoError>; + fn clone(&self) -> ~RtioTcpStream; } pub trait RtioSocket { @@ -224,6 +225,8 @@ pub trait RtioUdpSocket : RtioSocket { fn hear_broadcasts(&mut self) -> Result<(), IoError>; fn ignore_broadcasts(&mut self) -> Result<(), IoError>; + + fn clone(&self) -> ~RtioUdpSocket; } pub trait RtioTimer { @@ -253,6 +256,7 @@ pub trait RtioProcess { pub trait RtioPipe { fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError>; fn write(&mut self, buf: &[u8]) -> Result<(), IoError>; + fn clone(&self) -> ~RtioPipe; } pub trait RtioUnixListener { diff --git a/src/libstd/unstable/mutex.rs b/src/libstd/unstable/mutex.rs index 4804de75687..82957cd93ce 100644 --- a/src/libstd/unstable/mutex.rs +++ b/src/libstd/unstable/mutex.rs @@ -380,7 +380,6 @@ mod test { use super::{Mutex, MUTEX_INIT}; use rt::thread::Thread; - use task; #[test] fn somke_lock() { diff --git a/src/libstd/util.rs b/src/libstd/util.rs index c075f9b4ba8..715a10b9112 100644 --- a/src/libstd/util.rs +++ b/src/libstd/util.rs @@ -69,7 +69,6 @@ impl Void { mod tests { use super::*; use prelude::*; - use mem::size_of; #[test] fn identity_crisis() { diff --git a/src/libstd/vec.rs b/src/libstd/vec.rs index 4a6a4d54ae3..d53c2dceba2 100644 --- a/src/libstd/vec.rs +++ b/src/libstd/vec.rs @@ -4253,7 +4253,7 @@ mod tests { let h = x.mut_last(); assert_eq!(*h.unwrap(), 5); - let mut y: &mut [int] = []; + let y: &mut [int] = []; assert!(y.mut_last().is_none()); } } |
