diff options
Diffstat (limited to 'src/libstd')
| -rw-r--r-- | src/libstd/io/net/tcp.rs | 181 | ||||
| -rw-r--r-- | src/libstd/io/net/udp.rs | 117 | ||||
| -rw-r--r-- | src/libstd/io/net/unix.rs | 96 | ||||
| -rw-r--r-- | src/libstd/io/pipe.rs | 6 | ||||
| -rw-r--r-- | src/libstd/libc.rs | 49 | ||||
| -rw-r--r-- | src/libstd/option.rs | 1 | ||||
| -rw-r--r-- | src/libstd/rt/rtio.rs | 4 | ||||
| -rw-r--r-- | src/libstd/unstable/mutex.rs | 1 | ||||
| -rw-r--r-- | src/libstd/util.rs | 1 | ||||
| -rw-r--r-- | src/libstd/vec.rs | 2 |
10 files changed, 453 insertions, 5 deletions
diff --git a/src/libstd/io/net/tcp.rs b/src/libstd/io/net/tcp.rs index a0bdc193d98..66ceb03082f 100644 --- a/src/libstd/io/net/tcp.rs +++ b/src/libstd/io/net/tcp.rs @@ -8,11 +8,42 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +//! TCP network connections +//! +//! This module contains the ability to open a TCP stream to a socket address, +//! as well as creating a socket server to accept incoming connections. The +//! destination and binding addresses can either be an IPv4 or IPv6 address. +//! +//! A TCP connection implements the `Reader` and `Writer` traits, while the TCP +//! listener (socket server) implements the `Listener` and `Acceptor` traits. + +#[deny(missing_doc)]; + +use clone::Clone; use io::net::ip::SocketAddr; -use io::{Reader, Writer, Listener, Acceptor, IoResult}; +use io::{Reader, Writer, Listener, Acceptor}; +use io::IoResult; use rt::rtio::{IoFactory, LocalIo, RtioSocket, RtioTcpListener}; use rt::rtio::{RtioTcpAcceptor, RtioTcpStream}; +/// A structure which represents a TCP stream between a local socket and a +/// remote socket. +/// +/// # Example +/// +/// ```rust +/// # #[allow(unused_must_use)]; +/// use std::io::net::tcp::TcpStream; +/// use std::io::net::ip::{Ipv4Addr, SocketAddr}; +/// +/// let addr = SocketAddr { ip: Ipv4Addr(127, 0, 0, 1), port: 34254 }; +/// let mut stream = TcpStream::connect(addr); +/// +/// stream.write([1]); +/// let mut buf = [0]; +/// stream.read(buf); +/// drop(stream); // close the connection +/// ``` pub struct TcpStream { priv obj: ~RtioTcpStream } @@ -22,21 +53,40 @@ impl TcpStream { TcpStream { obj: s } } + /// Creates a TCP connection to a remote socket address. + /// + /// If no error is encountered, then `Ok(stream)` is returned. pub fn connect(addr: SocketAddr) -> IoResult<TcpStream> { LocalIo::maybe_raise(|io| { io.tcp_connect(addr).map(TcpStream::new) }) } + /// Returns the socket address of the remote peer of this TCP connection. pub fn peer_name(&mut self) -> IoResult<SocketAddr> { self.obj.peer_name() } + /// Returns the socket address of the local half of this TCP connection. pub fn socket_name(&mut self) -> IoResult<SocketAddr> { self.obj.socket_name() } } +impl Clone for TcpStream { + /// Creates a new handle to this TCP stream, allowing for simultaneous reads + /// and writes of this connection. + /// + /// The underlying TCP stream will not be closed until all handles to the + /// stream have been deallocated. All handles will also follow the same + /// stream, but two concurrent reads will not receive the same data. + /// Instead, the first read will receive the first packet received, and the + /// second read will receive the second packet. + fn clone(&self) -> TcpStream { + TcpStream { obj: self.obj.clone() } + } +} + impl Reader for TcpStream { fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> { self.obj.read(buf) } } @@ -45,17 +95,56 @@ impl Writer for TcpStream { fn write(&mut self, buf: &[u8]) -> IoResult<()> { self.obj.write(buf) } } +/// A structure representing a socket server. This listener is used to create a +/// `TcpAcceptor` which can be used to accept sockets on a local port. +/// +/// # Example +/// +/// ```rust +/// # fn main() {} +/// # fn foo() { +/// # #[allow(unused_must_use, dead_code)]; +/// use std::io::net::tcp::TcpListener; +/// use std::io::net::ip::{Ipv4Addr, SocketAddr}; +/// use std::io::{Acceptor, Listener}; +/// +/// let addr = SocketAddr { ip: Ipv4Addr(127, 0, 0, 1), port: 80 }; +/// let listener = TcpListener::bind(addr); +/// +/// // bind the listener to the specified address +/// let mut acceptor = listener.listen(); +/// +/// // accept connections and process them +/// # fn handle_client<T>(_: T) {} +/// for stream in acceptor.incoming() { +/// spawn(proc() { +/// handle_client(stream); +/// }); +/// } +/// +/// // close the socket server +/// drop(acceptor); +/// # } +/// ``` pub struct TcpListener { priv obj: ~RtioTcpListener } impl TcpListener { + /// Creates a new `TcpListener` which will be bound to the specified local + /// socket address. This listener is not ready for accepting connections, + /// `listen` must be called on it before that's possible. + /// + /// Binding with a port number of 0 will request that the OS assigns a port + /// to this listener. The port allocated can be queried via the + /// `socket_name` function. pub fn bind(addr: SocketAddr) -> IoResult<TcpListener> { LocalIo::maybe_raise(|io| { io.tcp_bind(addr).map(|l| TcpListener { obj: l }) }) } + /// Returns the local socket address of this listener. pub fn socket_name(&mut self) -> IoResult<SocketAddr> { self.obj.socket_name() } @@ -67,6 +156,9 @@ impl Listener<TcpStream, TcpAcceptor> for TcpListener { } } +/// The accepting half of a TCP socket server. This structure is created through +/// a `TcpListener`'s `listen` method, and this object can be used to accept new +/// `TcpStream` instances. pub struct TcpAcceptor { priv obj: ~RtioTcpAcceptor } @@ -573,4 +665,91 @@ mod test { } let _listener = TcpListener::bind(addr); }) + + iotest!(fn tcp_clone_smoke() { + let addr = next_test_ip4(); + let mut acceptor = TcpListener::bind(addr).listen(); + + spawn(proc() { + let mut s = TcpStream::connect(addr); + let mut buf = [0, 0]; + assert_eq!(s.read(buf), Ok(1)); + assert_eq!(buf[0], 1); + s.write([2]).unwrap(); + }); + + let mut s1 = acceptor.accept().unwrap(); + let s2 = s1.clone(); + + let (p1, c1) = Chan::new(); + let (p2, c2) = Chan::new(); + spawn(proc() { + let mut s2 = s2; + p1.recv(); + s2.write([1]).unwrap(); + c2.send(()); + }); + c1.send(()); + let mut buf = [0, 0]; + assert_eq!(s1.read(buf), Ok(1)); + p2.recv(); + }) + + iotest!(fn tcp_clone_two_read() { + let addr = next_test_ip6(); + let mut acceptor = TcpListener::bind(addr).listen(); + let (p, c) = SharedChan::new(); + let c2 = c.clone(); + + spawn(proc() { + let mut s = TcpStream::connect(addr); + s.write([1]).unwrap(); + p.recv(); + s.write([2]).unwrap(); + p.recv(); + }); + + let mut s1 = acceptor.accept().unwrap(); + let s2 = s1.clone(); + + let (p, done) = Chan::new(); + spawn(proc() { + let mut s2 = s2; + let mut buf = [0, 0]; + s2.read(buf).unwrap(); + c2.send(()); + done.send(()); + }); + let mut buf = [0, 0]; + s1.read(buf).unwrap(); + c.send(()); + + p.recv(); + }) + + iotest!(fn tcp_clone_two_write() { + let addr = next_test_ip4(); + let mut acceptor = TcpListener::bind(addr).listen(); + + spawn(proc() { + let mut s = TcpStream::connect(addr); + let mut buf = [0, 1]; + s.read(buf).unwrap(); + s.read(buf).unwrap(); + }); + + let mut s1 = acceptor.accept().unwrap(); + let s2 = s1.clone(); + + let (p, done) = Chan::new(); + spawn(proc() { + let mut s2 = s2; + s2.write([1]).unwrap(); + done.send(()); + }); + s1.write([2]).unwrap(); + + p.recv(); + }) } + diff --git a/src/libstd/io/net/udp.rs b/src/libstd/io/net/udp.rs index 0ef62648afc..3c02f563847 100644 --- a/src/libstd/io/net/udp.rs +++ b/src/libstd/io/net/udp.rs @@ -8,6 +8,7 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +use clone::Clone; use result::{Ok, Err}; use io::net::ip::SocketAddr; use io::{Reader, Writer, IoResult}; @@ -41,6 +42,19 @@ impl UdpSocket { } } +impl Clone for UdpSocket { + /// Creates a new handle to this UDP socket, allowing for simultaneous reads + /// and writes of the socket. + /// + /// The underlying UDP socket will not be closed until all handles to the + /// socket have been deallocated. Two concurrent reads will not receive the + /// same data. Instead, the first read will receive the first packet + /// received, and the second read will receive the second packet. + fn clone(&self) -> UdpSocket { + UdpSocket { obj: self.obj.clone() } + } +} + pub struct UdpStream { priv socket: UdpSocket, priv connectedTo: SocketAddr @@ -250,4 +264,107 @@ mod test { iotest!(fn socket_name_ip6() { socket_name(next_test_ip6()); }) + + iotest!(fn udp_clone_smoke() { + let addr1 = next_test_ip4(); + let addr2 = next_test_ip4(); + let mut sock1 = UdpSocket::bind(addr1).unwrap(); + let sock2 = UdpSocket::bind(addr2).unwrap(); + + spawn(proc() { + let mut sock2 = sock2; + let mut buf = [0, 0]; + assert_eq!(sock2.recvfrom(buf), Ok((1, addr1))); + assert_eq!(buf[0], 1); + sock2.sendto([2], addr1).unwrap(); + }); + + let sock3 = sock1.clone(); + + let (p1, c1) = Chan::new(); + let (p2, c2) = Chan::new(); + spawn(proc() { + let mut sock3 = sock3; + p1.recv(); + sock3.sendto([1], addr2).unwrap(); + c2.send(()); + }); + c1.send(()); + let mut buf = [0, 0]; + assert_eq!(sock1.recvfrom(buf), Ok((1, addr2))); + p2.recv(); + }) + + iotest!(fn udp_clone_two_read() { + let addr1 = next_test_ip4(); + let addr2 = next_test_ip4(); + let mut sock1 = UdpSocket::bind(addr1).unwrap(); + let sock2 = UdpSocket::bind(addr2).unwrap(); + let (p, c) = SharedChan::new(); + let c2 = c.clone(); + + spawn(proc() { + let mut sock2 = sock2; + sock2.sendto([1], addr1).unwrap(); + p.recv(); + sock2.sendto([2], addr1).unwrap(); + p.recv(); + }); + + let sock3 = sock1.clone(); + + let (p, done) = Chan::new(); + spawn(proc() { + let mut sock3 = sock3; + let mut buf = [0, 0]; + sock3.recvfrom(buf).unwrap(); + c2.send(()); + done.send(()); + }); + let mut buf = [0, 0]; + sock1.recvfrom(buf).unwrap(); + c.send(()); + + p.recv(); + }) + + iotest!(fn udp_clone_two_write() { + let addr1 = next_test_ip4(); + let addr2 = next_test_ip4(); + let mut sock1 = UdpSocket::bind(addr1).unwrap(); + let sock2 = UdpSocket::bind(addr2).unwrap(); + + let (p, c) = SharedChan::new(); + + spawn(proc() { + let mut sock2 = sock2; + let mut buf = [0, 1]; + + for _ in p.iter() { + match sock2.recvfrom(buf) { + Ok(..) => {} + Err(e) => fail!("failed receive: {}", e), + } + } + }); + + let sock3 = sock1.clone(); + + let (p, done) = Chan::new(); + let c2 = c.clone(); + spawn(proc() { + let mut sock3 = sock3; + match sock3.sendto([1], addr2) { + Ok(..) => c2.send(()), + Err(..) => {} + } + done.send(()); + }); + match sock1.sendto([2], addr2) { + Ok(..) => c.send(()), + Err(..) => {} + } + + p.recv(); + }) } diff --git a/src/libstd/io/net/unix.rs b/src/libstd/io/net/unix.rs index ce95b987663..3c7db9c8686 100644 --- a/src/libstd/io/net/unix.rs +++ b/src/libstd/io/net/unix.rs @@ -25,6 +25,7 @@ instances as clients. use prelude::*; use c_str::ToCStr; +use clone::Clone; use rt::rtio::{IoFactory, LocalIo, RtioUnixListener}; use rt::rtio::{RtioUnixAcceptor, RtioPipe}; use io::pipe::PipeStream; @@ -62,6 +63,12 @@ impl UnixStream { } } +impl Clone for UnixStream { + fn clone(&self) -> UnixStream { + UnixStream { obj: self.obj.clone() } + } +} + impl Reader for UnixStream { fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> { self.obj.read(buf) } } @@ -228,4 +235,93 @@ mod tests { let _acceptor = UnixListener::bind(&path).listen(); assert!(path.exists()); } + + #[test] + fn unix_clone_smoke() { + let addr = next_test_unix(); + let mut acceptor = UnixListener::bind(&addr).listen(); + + spawn(proc() { + let mut s = UnixStream::connect(&addr); + let mut buf = [0, 0]; + assert_eq!(s.read(buf), Ok(1)); + assert_eq!(buf[0], 1); + s.write([2]).unwrap(); + }); + + let mut s1 = acceptor.accept().unwrap(); + let s2 = s1.clone(); + + let (p1, c1) = Chan::new(); + let (p2, c2) = Chan::new(); + spawn(proc() { + let mut s2 = s2; + p1.recv(); + s2.write([1]).unwrap(); + c2.send(()); + }); + c1.send(()); + let mut buf = [0, 0]; + assert_eq!(s1.read(buf), Ok(1)); + p2.recv(); + } + + #[test] + fn unix_clone_two_read() { + let addr = next_test_unix(); + let mut acceptor = UnixListener::bind(&addr).listen(); + let (p, c) = SharedChan::new(); + let c2 = c.clone(); + + spawn(proc() { + let mut s = UnixStream::connect(&addr); + s.write([1]).unwrap(); + p.recv(); + s.write([2]).unwrap(); + p.recv(); + }); + + let mut s1 = acceptor.accept().unwrap(); + let s2 = s1.clone(); + + let (p, done) = Chan::new(); + spawn(proc() { + let mut s2 = s2; + let mut buf = [0, 0]; + s2.read(buf).unwrap(); + c2.send(()); + done.send(()); + }); + let mut buf = [0, 0]; + s1.read(buf).unwrap(); + c.send(()); + + p.recv(); + } + + #[test] + fn unix_clone_two_write() { + let addr = next_test_unix(); + let mut acceptor = UnixListener::bind(&addr).listen(); + + spawn(proc() { + let mut s = UnixStream::connect(&addr); + let mut buf = [0, 1]; + s.read(buf).unwrap(); + s.read(buf).unwrap(); + }); + + let mut s1 = acceptor.accept().unwrap(); + let s2 = s1.clone(); + + let (p, done) = Chan::new(); + spawn(proc() { + let mut s2 = s2; + s2.write([1]).unwrap(); + done.send(()); + }); + s1.write([2]).unwrap(); + + p.recv(); + } } diff --git a/src/libstd/io/pipe.rs b/src/libstd/io/pipe.rs index ca85707149b..83250bdae73 100644 --- a/src/libstd/io/pipe.rs +++ b/src/libstd/io/pipe.rs @@ -51,6 +51,12 @@ impl PipeStream { } } +impl Clone for PipeStream { + fn clone(&self) -> PipeStream { + PipeStream { obj: self.obj.clone() } + } +} + impl Reader for PipeStream { fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> { self.obj.read(buf) } } diff --git a/src/libstd/libc.rs b/src/libstd/libc.rs index 11a7b5dd191..057d618f444 100644 --- a/src/libstd/libc.rs +++ b/src/libstd/libc.rs @@ -960,6 +960,8 @@ pub mod types { } pub mod extra { use ptr; + use libc::consts::os::extra::{MAX_PROTOCOL_CHAIN, + WSAPROTOCOL_LEN}; use libc::types::common::c95::c_void; use libc::types::os::arch::c95::{c_char, c_int, c_uint, size_t}; use libc::types::os::arch::c95::{c_long, c_ulong}; @@ -1106,6 +1108,47 @@ pub mod types { } pub type LPFILETIME = *mut FILETIME; + + pub struct GUID { + Data1: DWORD, + Data2: DWORD, + Data3: DWORD, + Data4: [BYTE, ..8], + } + + struct WSAPROTOCOLCHAIN { + ChainLen: c_int, + ChainEntries: [DWORD, ..MAX_PROTOCOL_CHAIN], + } + + pub type LPWSAPROTOCOLCHAIN = *mut WSAPROTOCOLCHAIN; + + pub struct WSAPROTOCOL_INFO { + dwServiceFlags1: DWORD, + dwServiceFlags2: DWORD, + dwServiceFlags3: DWORD, + dwServiceFlags4: DWORD, + dwProviderFlags: DWORD, + ProviderId: GUID, + dwCatalogEntryId: DWORD, + ProtocolChain: WSAPROTOCOLCHAIN, + iVersion: c_int, + iAddressFamily: c_int, + iMaxSockAddr: c_int, + iMinSockAddr: c_int, + iSocketType: c_int, + iProtocol: c_int, + iProtocolMaxOffset: c_int, + iNetworkByteOrder: c_int, + iSecurityScheme: c_int, + dwMessageSize: DWORD, + dwProviderReserved: DWORD, + szProtocol: [u8, ..WSAPROTOCOL_LEN+1], + } + + pub type LPWSAPROTOCOL_INFO = *mut WSAPROTOCOL_INFO; + + pub type GROUP = c_uint; } } } @@ -1721,6 +1764,10 @@ pub mod consts { pub static FILE_BEGIN: DWORD = 0; pub static FILE_CURRENT: DWORD = 1; pub static FILE_END: DWORD = 2; + + pub static MAX_PROTOCOL_CHAIN: DWORD = 7; + pub static WSAPROTOCOL_LEN: DWORD = 255; + pub static INVALID_SOCKET: DWORD = !0; } pub mod sysconf { } @@ -4098,6 +4145,8 @@ pub mod funcs { lpFrequency: *mut LARGE_INTEGER) -> BOOL; pub fn QueryPerformanceCounter( lpPerformanceCount: *mut LARGE_INTEGER) -> BOOL; + + pub fn GetCurrentProcessId() -> DWORD; } } diff --git a/src/libstd/option.rs b/src/libstd/option.rs index 39b516aeb12..7bb29fdfacf 100644 --- a/src/libstd/option.rs +++ b/src/libstd/option.rs @@ -480,7 +480,6 @@ mod tests { use iter::range; use str::StrSlice; - use util; use kinds::marker; use vec::ImmutableVector; diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs index 35b1e21df06..8d02048d55c 100644 --- a/src/libstd/rt/rtio.rs +++ b/src/libstd/rt/rtio.rs @@ -203,6 +203,7 @@ pub trait RtioTcpStream : RtioSocket { fn nodelay(&mut self) -> Result<(), IoError>; fn keepalive(&mut self, delay_in_seconds: uint) -> Result<(), IoError>; fn letdie(&mut self) -> Result<(), IoError>; + fn clone(&self) -> ~RtioTcpStream; } pub trait RtioSocket { @@ -224,6 +225,8 @@ pub trait RtioUdpSocket : RtioSocket { fn hear_broadcasts(&mut self) -> Result<(), IoError>; fn ignore_broadcasts(&mut self) -> Result<(), IoError>; + + fn clone(&self) -> ~RtioUdpSocket; } pub trait RtioTimer { @@ -253,6 +256,7 @@ pub trait RtioProcess { pub trait RtioPipe { fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError>; fn write(&mut self, buf: &[u8]) -> Result<(), IoError>; + fn clone(&self) -> ~RtioPipe; } pub trait RtioUnixListener { diff --git a/src/libstd/unstable/mutex.rs b/src/libstd/unstable/mutex.rs index 4804de75687..82957cd93ce 100644 --- a/src/libstd/unstable/mutex.rs +++ b/src/libstd/unstable/mutex.rs @@ -380,7 +380,6 @@ mod test { use super::{Mutex, MUTEX_INIT}; use rt::thread::Thread; - use task; #[test] fn somke_lock() { diff --git a/src/libstd/util.rs b/src/libstd/util.rs index c075f9b4ba8..715a10b9112 100644 --- a/src/libstd/util.rs +++ b/src/libstd/util.rs @@ -69,7 +69,6 @@ impl Void { mod tests { use super::*; use prelude::*; - use mem::size_of; #[test] fn identity_crisis() { diff --git a/src/libstd/vec.rs b/src/libstd/vec.rs index 4a6a4d54ae3..d53c2dceba2 100644 --- a/src/libstd/vec.rs +++ b/src/libstd/vec.rs @@ -4253,7 +4253,7 @@ mod tests { let h = x.mut_last(); assert_eq!(*h.unwrap(), 5); - let mut y: &mut [int] = []; + let y: &mut [int] = []; assert!(y.mut_last().is_none()); } } |
