diff options
Diffstat (limited to 'src/libstd/old_io/net')
| -rw-r--r-- | src/libstd/old_io/net/addrinfo.rs | 137 | ||||
| -rw-r--r-- | src/libstd/old_io/net/ip.rs | 700 | ||||
| -rw-r--r-- | src/libstd/old_io/net/mod.rs | 46 | ||||
| -rw-r--r-- | src/libstd/old_io/net/pipe.rs | 864 | ||||
| -rw-r--r-- | src/libstd/old_io/net/tcp.rs | 1475 | ||||
| -rw-r--r-- | src/libstd/old_io/net/udp.rs | 457 |
6 files changed, 3679 insertions, 0 deletions
diff --git a/src/libstd/old_io/net/addrinfo.rs b/src/libstd/old_io/net/addrinfo.rs new file mode 100644 index 00000000000..9800cc6829e --- /dev/null +++ b/src/libstd/old_io/net/addrinfo.rs @@ -0,0 +1,137 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! Synchronous DNS Resolution +//! +//! Contains the functionality to perform DNS resolution or reverse lookup, +//! in a style related to `getaddrinfo()` and `getnameinfo()`, respectively. + +#![allow(missing_docs)] + +pub use self::SocketType::*; +pub use self::Flag::*; +pub use self::Protocol::*; + +use iter::IteratorExt; +use old_io::{IoResult}; +use old_io::net::ip::{SocketAddr, IpAddr}; +use option::Option; +use option::Option::{Some, None}; +use string::String; +use sys; +use vec::Vec; + +/// Hints to the types of sockets that are desired when looking up hosts +#[derive(Copy, Show)] +pub enum SocketType { + Stream, Datagram, Raw +} + +/// Flags which can be or'd into the `flags` field of a `Hint`. These are used +/// to manipulate how a query is performed. +/// +/// The meaning of each of these flags can be found with `man -s 3 getaddrinfo` +#[derive(Copy, Show)] +pub enum Flag { + AddrConfig, + All, + CanonName, + NumericHost, + NumericServ, + Passive, + V4Mapped, +} + +/// A transport protocol associated with either a hint or a return value of +/// `lookup` +#[derive(Copy, Show)] +pub enum Protocol { + TCP, UDP +} + +/// This structure is used to provide hints when fetching addresses for a +/// remote host to control how the lookup is performed. +/// +/// For details on these fields, see their corresponding definitions via +/// `man -s 3 getaddrinfo` +#[derive(Copy, Show)] +pub struct Hint { + pub family: uint, + pub socktype: Option<SocketType>, + pub protocol: Option<Protocol>, + pub flags: uint, +} + +#[derive(Copy, Show)] +pub struct Info { + pub address: SocketAddr, + pub family: uint, + pub socktype: Option<SocketType>, + pub protocol: Option<Protocol>, + pub flags: uint, +} + +/// Easy name resolution. Given a hostname, returns the list of IP addresses for +/// that hostname. +pub fn get_host_addresses(host: &str) -> IoResult<Vec<IpAddr>> { + lookup(Some(host), None, None).map(|a| a.into_iter().map(|i| i.address.ip).collect()) +} + +/// Reverse name resolution. Given an address, returns the corresponding +/// hostname. +pub fn get_address_name(addr: IpAddr) -> IoResult<String> { + sys::addrinfo::get_address_name(addr) +} + +/// Full-fledged resolution. This function will perform a synchronous call to +/// getaddrinfo, controlled by the parameters +/// +/// # Arguments +/// +/// * hostname - an optional hostname to lookup against +/// * servname - an optional service name, listed in the system services +/// * hint - see the hint structure, and "man -s 3 getaddrinfo", for how this +/// controls lookup +/// +/// FIXME: this is not public because the `Hint` structure is not ready for public +/// consumption just yet. +#[allow(unused_variables)] +fn lookup(hostname: Option<&str>, servname: Option<&str>, hint: Option<Hint>) + -> IoResult<Vec<Info>> { + sys::addrinfo::get_host_addresses(hostname, servname, hint) +} + +// Ignored on android since we cannot give tcp/ip +// permission without help of apk +#[cfg(all(test, not(target_os = "android")))] +mod test { + use prelude::v1::*; + use super::*; + use old_io::net::ip::*; + + #[test] + fn dns_smoke_test() { + let ipaddrs = get_host_addresses("localhost").unwrap(); + let mut found_local = false; + let local_addr = &Ipv4Addr(127, 0, 0, 1); + for addr in ipaddrs.iter() { + found_local = found_local || addr == local_addr; + } + assert!(found_local); + } + + #[ignore] + #[test] + fn issue_10663() { + // Something should happen here, but this certainly shouldn't cause + // everything to die. The actual outcome we don't care too much about. + get_host_addresses("example.com").unwrap(); + } +} diff --git a/src/libstd/old_io/net/ip.rs b/src/libstd/old_io/net/ip.rs new file mode 100644 index 00000000000..26eb068b151 --- /dev/null +++ b/src/libstd/old_io/net/ip.rs @@ -0,0 +1,700 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! Internet Protocol (IP) addresses. +//! +//! This module contains functions useful for parsing, formatting, and +//! manipulating IP addresses. + +#![allow(missing_docs)] + +pub use self::IpAddr::*; + +use boxed::Box; +use fmt; +use old_io::{self, IoResult, IoError}; +use old_io::net; +use iter::{Iterator, IteratorExt}; +use ops::{FnOnce, FnMut}; +use option::Option; +use option::Option::{None, Some}; +use result::Result::{Ok, Err}; +use slice::SliceExt; +use str::{FromStr, StrExt}; +use vec::Vec; + +pub type Port = u16; + +#[derive(Copy, PartialEq, Eq, Clone, Hash, Show)] +pub enum IpAddr { + Ipv4Addr(u8, u8, u8, u8), + Ipv6Addr(u16, u16, u16, u16, u16, u16, u16, u16) +} + +#[stable] +impl fmt::Display for IpAddr { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + match *self { + Ipv4Addr(a, b, c, d) => + write!(fmt, "{}.{}.{}.{}", a, b, c, d), + + // Ipv4 Compatible address + Ipv6Addr(0, 0, 0, 0, 0, 0, g, h) => { + write!(fmt, "::{}.{}.{}.{}", (g >> 8) as u8, g as u8, + (h >> 8) as u8, h as u8) + } + + // Ipv4-Mapped address + Ipv6Addr(0, 0, 0, 0, 0, 0xFFFF, g, h) => { + write!(fmt, "::FFFF:{}.{}.{}.{}", (g >> 8) as u8, g as u8, + (h >> 8) as u8, h as u8) + } + + Ipv6Addr(a, b, c, d, e, f, g, h) => + write!(fmt, "{:x}:{:x}:{:x}:{:x}:{:x}:{:x}:{:x}:{:x}", + a, b, c, d, e, f, g, h) + } + } +} + +#[derive(Copy, PartialEq, Eq, Clone, Hash, Show)] +pub struct SocketAddr { + pub ip: IpAddr, + pub port: Port, +} + +#[stable] +impl fmt::Display for SocketAddr { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self.ip { + Ipv4Addr(..) => write!(f, "{}:{}", self.ip, self.port), + Ipv6Addr(..) => write!(f, "[{}]:{}", self.ip, self.port), + } + } +} + +struct Parser<'a> { + // parsing as ASCII, so can use byte array + s: &'a [u8], + pos: uint, +} + +impl<'a> Parser<'a> { + fn new(s: &'a str) -> Parser<'a> { + Parser { + s: s.as_bytes(), + pos: 0, + } + } + + fn is_eof(&self) -> bool { + self.pos == self.s.len() + } + + // Commit only if parser returns Some + fn read_atomically<T, F>(&mut self, cb: F) -> Option<T> where + F: FnOnce(&mut Parser) -> Option<T>, + { + let pos = self.pos; + let r = cb(self); + if r.is_none() { + self.pos = pos; + } + r + } + + // Commit only if parser read till EOF + fn read_till_eof<T, F>(&mut self, cb: F) -> Option<T> where + F: FnOnce(&mut Parser) -> Option<T>, + { + self.read_atomically(move |p| { + match cb(p) { + Some(x) => if p.is_eof() {Some(x)} else {None}, + None => None, + } + }) + } + + // Return result of first successful parser + fn read_or<T>(&mut self, parsers: &mut [Box<FnMut(&mut Parser) -> Option<T>>]) + -> Option<T> { + for pf in parsers.iter_mut() { + match self.read_atomically(|p: &mut Parser| pf.call_mut((p,))) { + Some(r) => return Some(r), + None => {} + } + } + None + } + + // Apply 3 parsers sequentially + fn read_seq_3<A, B, C, PA, PB, PC>(&mut self, + pa: PA, + pb: PB, + pc: PC) + -> Option<(A, B, C)> where + PA: FnOnce(&mut Parser) -> Option<A>, + PB: FnOnce(&mut Parser) -> Option<B>, + PC: FnOnce(&mut Parser) -> Option<C>, + { + self.read_atomically(move |p| { + let a = pa(p); + let b = if a.is_some() { pb(p) } else { None }; + let c = if b.is_some() { pc(p) } else { None }; + match (a, b, c) { + (Some(a), Some(b), Some(c)) => Some((a, b, c)), + _ => None + } + }) + } + + // Read next char + fn read_char(&mut self) -> Option<char> { + if self.is_eof() { + None + } else { + let r = self.s[self.pos] as char; + self.pos += 1; + Some(r) + } + } + + // Return char and advance iff next char is equal to requested + fn read_given_char(&mut self, c: char) -> Option<char> { + self.read_atomically(|p| { + match p.read_char() { + Some(next) if next == c => Some(next), + _ => None, + } + }) + } + + // Read digit + fn read_digit(&mut self, radix: u8) -> Option<u8> { + fn parse_digit(c: char, radix: u8) -> Option<u8> { + let c = c as u8; + // assuming radix is either 10 or 16 + if c >= b'0' && c <= b'9' { + Some(c - b'0') + } else if radix > 10 && c >= b'a' && c < b'a' + (radix - 10) { + Some(c - b'a' + 10) + } else if radix > 10 && c >= b'A' && c < b'A' + (radix - 10) { + Some(c - b'A' + 10) + } else { + None + } + } + + self.read_atomically(|p| { + p.read_char().and_then(|c| parse_digit(c, radix)) + }) + } + + fn read_number_impl(&mut self, radix: u8, max_digits: u32, upto: u32) -> Option<u32> { + let mut r = 0u32; + let mut digit_count = 0; + loop { + match self.read_digit(radix) { + Some(d) => { + r = r * (radix as u32) + (d as u32); + digit_count += 1; + if digit_count > max_digits || r >= upto { + return None + } + } + None => { + if digit_count == 0 { + return None + } else { + return Some(r) + } + } + }; + } + } + + // Read number, failing if max_digits of number value exceeded + fn read_number(&mut self, radix: u8, max_digits: u32, upto: u32) -> Option<u32> { + self.read_atomically(|p| p.read_number_impl(radix, max_digits, upto)) + } + + fn read_ipv4_addr_impl(&mut self) -> Option<IpAddr> { + let mut bs = [0u8; 4]; + let mut i = 0; + while i < 4 { + if i != 0 && self.read_given_char('.').is_none() { + return None; + } + + let octet = self.read_number(10, 3, 0x100).map(|n| n as u8); + match octet { + Some(d) => bs[i] = d, + None => return None, + }; + i += 1; + } + Some(Ipv4Addr(bs[0], bs[1], bs[2], bs[3])) + } + + // Read IPv4 address + fn read_ipv4_addr(&mut self) -> Option<IpAddr> { + self.read_atomically(|p| p.read_ipv4_addr_impl()) + } + + fn read_ipv6_addr_impl(&mut self) -> Option<IpAddr> { + fn ipv6_addr_from_head_tail(head: &[u16], tail: &[u16]) -> IpAddr { + assert!(head.len() + tail.len() <= 8); + let mut gs = [0u16; 8]; + gs.clone_from_slice(head); + gs[(8 - tail.len()) .. 8].clone_from_slice(tail); + Ipv6Addr(gs[0], gs[1], gs[2], gs[3], gs[4], gs[5], gs[6], gs[7]) + } + + fn read_groups(p: &mut Parser, groups: &mut [u16; 8], limit: uint) -> (uint, bool) { + let mut i = 0; + while i < limit { + if i < limit - 1 { + let ipv4 = p.read_atomically(|p| { + if i == 0 || p.read_given_char(':').is_some() { + p.read_ipv4_addr() + } else { + None + } + }); + match ipv4 { + Some(Ipv4Addr(a, b, c, d)) => { + groups[i + 0] = ((a as u16) << 8) | (b as u16); + groups[i + 1] = ((c as u16) << 8) | (d as u16); + return (i + 2, true); + } + _ => {} + } + } + + let group = p.read_atomically(|p| { + if i == 0 || p.read_given_char(':').is_some() { + p.read_number(16, 4, 0x10000).map(|n| n as u16) + } else { + None + } + }); + match group { + Some(g) => groups[i] = g, + None => return (i, false) + } + i += 1; + } + (i, false) + } + + let mut head = [0u16; 8]; + let (head_size, head_ipv4) = read_groups(self, &mut head, 8); + + if head_size == 8 { + return Some(Ipv6Addr( + head[0], head[1], head[2], head[3], + head[4], head[5], head[6], head[7])) + } + + // IPv4 part is not allowed before `::` + if head_ipv4 { + return None + } + + // read `::` if previous code parsed less than 8 groups + if !self.read_given_char(':').is_some() || !self.read_given_char(':').is_some() { + return None; + } + + let mut tail = [0u16; 8]; + let (tail_size, _) = read_groups(self, &mut tail, 8 - head_size); + Some(ipv6_addr_from_head_tail(&head[..head_size], &tail[..tail_size])) + } + + fn read_ipv6_addr(&mut self) -> Option<IpAddr> { + self.read_atomically(|p| p.read_ipv6_addr_impl()) + } + + fn read_ip_addr(&mut self) -> Option<IpAddr> { + let ipv4_addr = |&mut: p: &mut Parser| p.read_ipv4_addr(); + let ipv6_addr = |&mut: p: &mut Parser| p.read_ipv6_addr(); + self.read_or(&mut [box ipv4_addr, box ipv6_addr]) + } + + fn read_socket_addr(&mut self) -> Option<SocketAddr> { + let ip_addr = |&: p: &mut Parser| { + let ipv4_p = |&mut: p: &mut Parser| p.read_ip_addr(); + let ipv6_p = |&mut: p: &mut Parser| { + let open_br = |&: p: &mut Parser| p.read_given_char('['); + let ip_addr = |&: p: &mut Parser| p.read_ipv6_addr(); + let clos_br = |&: p: &mut Parser| p.read_given_char(']'); + p.read_seq_3::<char, IpAddr, char, _, _, _>(open_br, ip_addr, clos_br) + .map(|t| match t { (_, ip, _) => ip }) + }; + p.read_or(&mut [box ipv4_p, box ipv6_p]) + }; + let colon = |&: p: &mut Parser| p.read_given_char(':'); + let port = |&: p: &mut Parser| p.read_number(10, 5, 0x10000).map(|n| n as u16); + + // host, colon, port + self.read_seq_3::<IpAddr, char, u16, _, _, _>(ip_addr, colon, port) + .map(|t| match t { (ip, _, port) => SocketAddr { ip: ip, port: port } }) + } +} + +impl FromStr for IpAddr { + fn from_str(s: &str) -> Option<IpAddr> { + Parser::new(s).read_till_eof(|p| p.read_ip_addr()) + } +} + +impl FromStr for SocketAddr { + fn from_str(s: &str) -> Option<SocketAddr> { + Parser::new(s).read_till_eof(|p| p.read_socket_addr()) + } +} + +/// A trait for objects which can be converted or resolved to one or more `SocketAddr` values. +/// +/// Implementing types minimally have to implement either `to_socket_addr` or `to_socket_addr_all` +/// method, and its trivial counterpart will be available automatically. +/// +/// This trait is used for generic address resolution when constructing network objects. +/// By default it is implemented for the following types: +/// +/// * `SocketAddr` - `to_socket_addr` is identity function. +/// +/// * `(IpAddr, u16)` - `to_socket_addr` constructs `SocketAddr` trivially. +/// +/// * `(&str, u16)` - the string should be either a string representation of an IP address +/// expected by `FromStr` implementation for `IpAddr` or a host name. +/// +/// For the former, `to_socket_addr_all` returns a vector with a single element corresponding +/// to that IP address joined with the given port. +/// +/// For the latter, it tries to resolve the host name and returns a vector of all IP addresses +/// for the host name, each joined with the given port. +/// +/// * `&str` - the string should be either a string representation of a `SocketAddr` as +/// expected by its `FromStr` implementation or a string like `<host_name>:<port>` pair +/// where `<port>` is a `u16` value. +/// +/// For the former, `to_socket_addr_all` returns a vector with a single element corresponding +/// to that socket address. +/// +/// For the latter, it tries to resolve the host name and returns a vector of all IP addresses +/// for the host name, each joined with the port. +/// +/// +/// This trait allows constructing network objects like `TcpStream` or `UdpSocket` easily with +/// values of various types for the bind/connection address. It is needed because sometimes +/// one type is more appropriate than the other: for simple uses a string like `"localhost:12345"` +/// is much nicer than manual construction of the corresponding `SocketAddr`, but sometimes +/// `SocketAddr` value is *the* main source of the address, and converting it to some other type +/// (e.g. a string) just for it to be converted back to `SocketAddr` in constructor methods +/// is pointless. +/// +/// Some examples: +/// +/// ```rust,no_run +/// # #![allow(unused_must_use)] +/// +/// use std::old_io::{TcpStream, TcpListener}; +/// use std::old_io::net::udp::UdpSocket; +/// use std::old_io::net::ip::{Ipv4Addr, SocketAddr}; +/// +/// fn main() { +/// // The following lines are equivalent modulo possible "localhost" name resolution +/// // differences +/// let tcp_s = TcpStream::connect(SocketAddr { ip: Ipv4Addr(127, 0, 0, 1), port: 12345 }); +/// let tcp_s = TcpStream::connect((Ipv4Addr(127, 0, 0, 1), 12345u16)); +/// let tcp_s = TcpStream::connect(("127.0.0.1", 12345u16)); +/// let tcp_s = TcpStream::connect(("localhost", 12345u16)); +/// let tcp_s = TcpStream::connect("127.0.0.1:12345"); +/// let tcp_s = TcpStream::connect("localhost:12345"); +/// +/// // TcpListener::bind(), UdpSocket::bind() and UdpSocket::send_to() behave similarly +/// let tcp_l = TcpListener::bind("localhost:12345"); +/// +/// let mut udp_s = UdpSocket::bind(("127.0.0.1", 23451u16)).unwrap(); +/// udp_s.send_to([7u8, 7u8, 7u8].as_slice(), (Ipv4Addr(127, 0, 0, 1), 23451u16)); +/// } +/// ``` +pub trait ToSocketAddr { + /// Converts this object to single socket address value. + /// + /// If more than one value is available, this method returns the first one. If no + /// values are available, this method returns an `IoError`. + /// + /// By default this method delegates to `to_socket_addr_all` method, taking the first + /// item from its result. + fn to_socket_addr(&self) -> IoResult<SocketAddr> { + self.to_socket_addr_all() + .and_then(|v| v.into_iter().next().ok_or_else(|| IoError { + kind: old_io::InvalidInput, + desc: "no address available", + detail: None + })) + } + + /// Converts this object to all available socket address values. + /// + /// Some values like host name string naturally correspond to multiple IP addresses. + /// This method tries to return all available addresses corresponding to this object. + /// + /// By default this method delegates to `to_socket_addr` method, creating a singleton + /// vector from its result. + #[inline] + fn to_socket_addr_all(&self) -> IoResult<Vec<SocketAddr>> { + self.to_socket_addr().map(|a| vec![a]) + } +} + +impl ToSocketAddr for SocketAddr { + #[inline] + fn to_socket_addr(&self) -> IoResult<SocketAddr> { Ok(*self) } +} + +impl ToSocketAddr for (IpAddr, u16) { + #[inline] + fn to_socket_addr(&self) -> IoResult<SocketAddr> { + let (ip, port) = *self; + Ok(SocketAddr { ip: ip, port: port }) + } +} + +fn resolve_socket_addr(s: &str, p: u16) -> IoResult<Vec<SocketAddr>> { + net::get_host_addresses(s) + .map(|v| v.into_iter().map(|a| SocketAddr { ip: a, port: p }).collect()) +} + +fn parse_and_resolve_socket_addr(s: &str) -> IoResult<Vec<SocketAddr>> { + macro_rules! try_opt { + ($e:expr, $msg:expr) => ( + match $e { + Some(r) => r, + None => return Err(IoError { + kind: old_io::InvalidInput, + desc: $msg, + detail: None + }) + } + ) + } + + // split the string by ':' and convert the second part to u16 + let mut parts_iter = s.rsplitn(2, ':'); + let port_str = try_opt!(parts_iter.next(), "invalid socket address"); + let host = try_opt!(parts_iter.next(), "invalid socket address"); + let port: u16 = try_opt!(FromStr::from_str(port_str), "invalid port value"); + resolve_socket_addr(host, port) +} + +impl<'a> ToSocketAddr for (&'a str, u16) { + fn to_socket_addr_all(&self) -> IoResult<Vec<SocketAddr>> { + let (host, port) = *self; + + // try to parse the host as a regular IpAddr first + match FromStr::from_str(host) { + Some(addr) => return Ok(vec![SocketAddr { + ip: addr, + port: port + }]), + None => {} + } + + resolve_socket_addr(host, port) + } +} + +// accepts strings like 'localhost:12345' +impl<'a> ToSocketAddr for &'a str { + fn to_socket_addr(&self) -> IoResult<SocketAddr> { + // try to parse as a regular SocketAddr first + match FromStr::from_str(*self) { + Some(addr) => return Ok(addr), + None => {} + } + + parse_and_resolve_socket_addr(*self) + .and_then(|v| v.into_iter().next() + .ok_or_else(|| IoError { + kind: old_io::InvalidInput, + desc: "no address available", + detail: None + }) + ) + } + + fn to_socket_addr_all(&self) -> IoResult<Vec<SocketAddr>> { + // try to parse as a regular SocketAddr first + match FromStr::from_str(*self) { + Some(addr) => return Ok(vec![addr]), + None => {} + } + + parse_and_resolve_socket_addr(*self) + } +} + + +#[cfg(test)] +mod test { + use prelude::v1::*; + use super::*; + use str::FromStr; + + #[test] + fn test_from_str_ipv4() { + assert_eq!(Some(Ipv4Addr(127, 0, 0, 1)), FromStr::from_str("127.0.0.1")); + assert_eq!(Some(Ipv4Addr(255, 255, 255, 255)), FromStr::from_str("255.255.255.255")); + assert_eq!(Some(Ipv4Addr(0, 0, 0, 0)), FromStr::from_str("0.0.0.0")); + + // out of range + let none: Option<IpAddr> = FromStr::from_str("256.0.0.1"); + assert_eq!(None, none); + // too short + let none: Option<IpAddr> = FromStr::from_str("255.0.0"); + assert_eq!(None, none); + // too long + let none: Option<IpAddr> = FromStr::from_str("255.0.0.1.2"); + assert_eq!(None, none); + // no number between dots + let none: Option<IpAddr> = FromStr::from_str("255.0..1"); + assert_eq!(None, none); + } + + #[test] + fn test_from_str_ipv6() { + assert_eq!(Some(Ipv6Addr(0, 0, 0, 0, 0, 0, 0, 0)), FromStr::from_str("0:0:0:0:0:0:0:0")); + assert_eq!(Some(Ipv6Addr(0, 0, 0, 0, 0, 0, 0, 1)), FromStr::from_str("0:0:0:0:0:0:0:1")); + + assert_eq!(Some(Ipv6Addr(0, 0, 0, 0, 0, 0, 0, 1)), FromStr::from_str("::1")); + assert_eq!(Some(Ipv6Addr(0, 0, 0, 0, 0, 0, 0, 0)), FromStr::from_str("::")); + + assert_eq!(Some(Ipv6Addr(0x2a02, 0x6b8, 0, 0, 0, 0, 0x11, 0x11)), + FromStr::from_str("2a02:6b8::11:11")); + + // too long group + let none: Option<IpAddr> = FromStr::from_str("::00000"); + assert_eq!(None, none); + // too short + let none: Option<IpAddr> = FromStr::from_str("1:2:3:4:5:6:7"); + assert_eq!(None, none); + // too long + let none: Option<IpAddr> = FromStr::from_str("1:2:3:4:5:6:7:8:9"); + assert_eq!(None, none); + // triple colon + let none: Option<IpAddr> = FromStr::from_str("1:2:::6:7:8"); + assert_eq!(None, none); + // two double colons + let none: Option<IpAddr> = FromStr::from_str("1:2::6::8"); + assert_eq!(None, none); + } + + #[test] + fn test_from_str_ipv4_in_ipv6() { + assert_eq!(Some(Ipv6Addr(0, 0, 0, 0, 0, 0, 49152, 545)), + FromStr::from_str("::192.0.2.33")); + assert_eq!(Some(Ipv6Addr(0, 0, 0, 0, 0, 0xFFFF, 49152, 545)), + FromStr::from_str("::FFFF:192.0.2.33")); + assert_eq!(Some(Ipv6Addr(0x64, 0xff9b, 0, 0, 0, 0, 49152, 545)), + FromStr::from_str("64:ff9b::192.0.2.33")); + assert_eq!(Some(Ipv6Addr(0x2001, 0xdb8, 0x122, 0xc000, 0x2, 0x2100, 49152, 545)), + FromStr::from_str("2001:db8:122:c000:2:2100:192.0.2.33")); + + // colon after v4 + let none: Option<IpAddr> = FromStr::from_str("::127.0.0.1:"); + assert_eq!(None, none); + // not enough groups + let none: Option<IpAddr> = FromStr::from_str("1.2.3.4.5:127.0.0.1"); + assert_eq!(None, none); + // too many groups + let none: Option<IpAddr> = + FromStr::from_str("1.2.3.4.5:6:7:127.0.0.1"); + assert_eq!(None, none); + } + + #[test] + fn test_from_str_socket_addr() { + assert_eq!(Some(SocketAddr { ip: Ipv4Addr(77, 88, 21, 11), port: 80 }), + FromStr::from_str("77.88.21.11:80")); + assert_eq!(Some(SocketAddr { ip: Ipv6Addr(0x2a02, 0x6b8, 0, 1, 0, 0, 0, 1), port: 53 }), + FromStr::from_str("[2a02:6b8:0:1::1]:53")); + assert_eq!(Some(SocketAddr { ip: Ipv6Addr(0, 0, 0, 0, 0, 0, 0x7F00, 1), port: 22 }), + FromStr::from_str("[::127.0.0.1]:22")); + + // without port + let none: Option<SocketAddr> = FromStr::from_str("127.0.0.1"); + assert_eq!(None, none); + // without port + let none: Option<SocketAddr> = FromStr::from_str("127.0.0.1:"); + assert_eq!(None, none); + // wrong brackets around v4 + let none: Option<SocketAddr> = FromStr::from_str("[127.0.0.1]:22"); + assert_eq!(None, none); + // port out of range + let none: Option<SocketAddr> = FromStr::from_str("127.0.0.1:123456"); + assert_eq!(None, none); + } + + #[test] + fn ipv6_addr_to_string() { + let a1 = Ipv6Addr(0, 0, 0, 0, 0, 0xffff, 0xc000, 0x280); + assert!(a1.to_string() == "::ffff:192.0.2.128" || + a1.to_string() == "::FFFF:192.0.2.128"); + assert_eq!(Ipv6Addr(8, 9, 10, 11, 12, 13, 14, 15).to_string(), + "8:9:a:b:c:d:e:f"); + } + + #[test] + fn to_socket_addr_socketaddr() { + let a = SocketAddr { ip: Ipv4Addr(77, 88, 21, 11), port: 12345 }; + assert_eq!(Ok(a), a.to_socket_addr()); + assert_eq!(Ok(vec![a]), a.to_socket_addr_all()); + } + + #[test] + fn to_socket_addr_ipaddr_u16() { + let a = Ipv4Addr(77, 88, 21, 11); + let p = 12345u16; + let e = SocketAddr { ip: a, port: p }; + assert_eq!(Ok(e), (a, p).to_socket_addr()); + assert_eq!(Ok(vec![e]), (a, p).to_socket_addr_all()); + } + + #[test] + fn to_socket_addr_str_u16() { + let a = SocketAddr { ip: Ipv4Addr(77, 88, 21, 11), port: 24352 }; + assert_eq!(Ok(a), ("77.88.21.11", 24352u16).to_socket_addr()); + assert_eq!(Ok(vec![a]), ("77.88.21.11", 24352u16).to_socket_addr_all()); + + let a = SocketAddr { ip: Ipv6Addr(0x2a02, 0x6b8, 0, 1, 0, 0, 0, 1), port: 53 }; + assert_eq!(Ok(a), ("2a02:6b8:0:1::1", 53).to_socket_addr()); + assert_eq!(Ok(vec![a]), ("2a02:6b8:0:1::1", 53).to_socket_addr_all()); + + let a = SocketAddr { ip: Ipv4Addr(127, 0, 0, 1), port: 23924 }; + assert!(("localhost", 23924u16).to_socket_addr_all().unwrap().contains(&a)); + } + + #[test] + fn to_socket_addr_str() { + let a = SocketAddr { ip: Ipv4Addr(77, 88, 21, 11), port: 24352 }; + assert_eq!(Ok(a), "77.88.21.11:24352".to_socket_addr()); + assert_eq!(Ok(vec![a]), "77.88.21.11:24352".to_socket_addr_all()); + + let a = SocketAddr { ip: Ipv6Addr(0x2a02, 0x6b8, 0, 1, 0, 0, 0, 1), port: 53 }; + assert_eq!(Ok(a), "[2a02:6b8:0:1::1]:53".to_socket_addr()); + assert_eq!(Ok(vec![a]), "[2a02:6b8:0:1::1]:53".to_socket_addr_all()); + + let a = SocketAddr { ip: Ipv4Addr(127, 0, 0, 1), port: 23924 }; + assert!("localhost:23924".to_socket_addr_all().unwrap().contains(&a)); + } +} diff --git a/src/libstd/old_io/net/mod.rs b/src/libstd/old_io/net/mod.rs new file mode 100644 index 00000000000..d8394aa8b6a --- /dev/null +++ b/src/libstd/old_io/net/mod.rs @@ -0,0 +1,46 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! Networking I/O + +use old_io::{IoError, IoResult, InvalidInput}; +use ops::FnMut; +use option::Option::None; +use result::Result::{Ok, Err}; +use self::ip::{SocketAddr, ToSocketAddr}; + +pub use self::addrinfo::get_host_addresses; + +pub mod addrinfo; +pub mod tcp; +pub mod udp; +pub mod ip; +pub mod pipe; + +fn with_addresses<A, T, F>(addr: A, mut action: F) -> IoResult<T> where + A: ToSocketAddr, + F: FnMut(SocketAddr) -> IoResult<T>, +{ + const DEFAULT_ERROR: IoError = IoError { + kind: InvalidInput, + desc: "no addresses found for hostname", + detail: None + }; + + let addresses = try!(addr.to_socket_addr_all()); + let mut err = DEFAULT_ERROR; + for addr in addresses.into_iter() { + match action(addr) { + Ok(r) => return Ok(r), + Err(e) => err = e + } + } + Err(err) +} diff --git a/src/libstd/old_io/net/pipe.rs b/src/libstd/old_io/net/pipe.rs new file mode 100644 index 00000000000..2ed6d8118d5 --- /dev/null +++ b/src/libstd/old_io/net/pipe.rs @@ -0,0 +1,864 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! Named pipes +//! +//! This module contains the ability to communicate over named pipes with +//! synchronous I/O. On windows, this corresponds to talking over a Named Pipe, +//! while on Unix it corresponds to UNIX domain sockets. +//! +//! These pipes are similar to TCP in the sense that you can have both a stream to a +//! server and a server itself. The server provided accepts other `UnixStream` +//! instances as clients. + +#![allow(missing_docs)] + +use prelude::v1::*; + +use ffi::CString; +use path::BytesContainer; +use old_io::{Listener, Acceptor, IoResult, TimedOut, standard_error}; +use sys::pipe::UnixAcceptor as UnixAcceptorImp; +use sys::pipe::UnixListener as UnixListenerImp; +use sys::pipe::UnixStream as UnixStreamImp; +use time::Duration; + +use sys_common; + +/// A stream which communicates over a named pipe. +pub struct UnixStream { + inner: UnixStreamImp, +} + +impl UnixStream { + + /// Connect to a pipe named by `path`. This will attempt to open a + /// connection to the underlying socket. + /// + /// The returned stream will be closed when the object falls out of scope. + /// + /// # Example + /// + /// ```rust + /// # #![allow(unused_must_use)] + /// use std::old_io::net::pipe::UnixStream; + /// + /// let server = Path::new("path/to/my/socket"); + /// let mut stream = UnixStream::connect(&server); + /// stream.write(&[1, 2, 3]); + /// ``` + pub fn connect<P: BytesContainer>(path: P) -> IoResult<UnixStream> { + let path = CString::from_slice(path.container_as_bytes()); + UnixStreamImp::connect(&path, None) + .map(|inner| UnixStream { inner: inner }) + } + + /// Connect to a pipe named by `path`, timing out if the specified number of + /// milliseconds. + /// + /// This function is similar to `connect`, except that if `timeout` + /// elapses the function will return an error of kind `TimedOut`. + /// + /// If a `timeout` with zero or negative duration is specified then + /// the function returns `Err`, with the error kind set to `TimedOut`. + #[unstable = "the timeout argument is likely to change types"] + pub fn connect_timeout<P>(path: P, timeout: Duration) + -> IoResult<UnixStream> + where P: BytesContainer { + if timeout <= Duration::milliseconds(0) { + return Err(standard_error(TimedOut)); + } + + let path = CString::from_slice(path.container_as_bytes()); + UnixStreamImp::connect(&path, Some(timeout.num_milliseconds() as u64)) + .map(|inner| UnixStream { inner: inner }) + } + + + /// Closes the reading half of this connection. + /// + /// This method will close the reading portion of this connection, causing + /// all pending and future reads to immediately return with an error. + /// + /// Note that this method affects all cloned handles associated with this + /// stream, not just this one handle. + pub fn close_read(&mut self) -> IoResult<()> { + self.inner.close_read() + } + + /// Closes the writing half of this connection. + /// + /// This method will close the writing portion of this connection, causing + /// all pending and future writes to immediately return with an error. + /// + /// Note that this method affects all cloned handles associated with this + /// stream, not just this one handle. + pub fn close_write(&mut self) -> IoResult<()> { + self.inner.close_write() + } + + /// Sets the read/write timeout for this socket. + /// + /// For more information, see `TcpStream::set_timeout` + #[unstable = "the timeout argument may change in type and value"] + pub fn set_timeout(&mut self, timeout_ms: Option<u64>) { + self.inner.set_timeout(timeout_ms) + } + + /// Sets the read timeout for this socket. + /// + /// For more information, see `TcpStream::set_timeout` + #[unstable = "the timeout argument may change in type and value"] + pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) { + self.inner.set_read_timeout(timeout_ms) + } + + /// Sets the write timeout for this socket. + /// + /// For more information, see `TcpStream::set_timeout` + #[unstable = "the timeout argument may change in type and value"] + pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) { + self.inner.set_write_timeout(timeout_ms) + } +} + +impl Clone for UnixStream { + fn clone(&self) -> UnixStream { + UnixStream { inner: self.inner.clone() } + } +} + +impl Reader for UnixStream { + fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> { + self.inner.read(buf) + } +} + +impl Writer for UnixStream { + fn write(&mut self, buf: &[u8]) -> IoResult<()> { + self.inner.write(buf) + } +} + +impl sys_common::AsInner<UnixStreamImp> for UnixStream { + fn as_inner(&self) -> &UnixStreamImp { + &self.inner + } +} + +/// A value that can listen for incoming named pipe connection requests. +pub struct UnixListener { + /// The internal, opaque runtime Unix listener. + inner: UnixListenerImp, +} + +impl UnixListener { + /// Creates a new listener, ready to receive incoming connections on the + /// specified socket. The server will be named by `path`. + /// + /// This listener will be closed when it falls out of scope. + /// + /// # Example + /// + /// ``` + /// # fn foo() { + /// use std::old_io::net::pipe::UnixListener; + /// use std::old_io::{Listener, Acceptor}; + /// + /// let server = Path::new("/path/to/my/socket"); + /// let stream = UnixListener::bind(&server); + /// for mut client in stream.listen().incoming() { + /// client.write(&[1, 2, 3, 4]); + /// } + /// # } + /// ``` + pub fn bind<P: BytesContainer>(path: P) -> IoResult<UnixListener> { + let path = CString::from_slice(path.container_as_bytes()); + UnixListenerImp::bind(&path) + .map(|inner| UnixListener { inner: inner }) + } +} + +impl Listener<UnixStream, UnixAcceptor> for UnixListener { + fn listen(self) -> IoResult<UnixAcceptor> { + self.inner.listen() + .map(|inner| UnixAcceptor { inner: inner }) + } +} + +impl sys_common::AsInner<UnixListenerImp> for UnixListener { + fn as_inner(&self) -> &UnixListenerImp { + &self.inner + } +} + +/// A value that can accept named pipe connections, returned from `listen()`. +pub struct UnixAcceptor { + /// The internal, opaque runtime Unix acceptor. + inner: UnixAcceptorImp +} + +impl UnixAcceptor { + /// Sets a timeout for this acceptor, after which accept() will no longer + /// block indefinitely. + /// + /// The argument specified is the amount of time, in milliseconds, into the + /// future after which all invocations of accept() will not block (and any + /// pending invocation will return). A value of `None` will clear any + /// existing timeout. + /// + /// When using this method, it is likely necessary to reset the timeout as + /// appropriate, the timeout specified is specific to this object, not + /// specific to the next request. + #[unstable = "the name and arguments to this function are likely \ + to change"] + pub fn set_timeout(&mut self, timeout_ms: Option<u64>) { + self.inner.set_timeout(timeout_ms) + } + + /// Closes the accepting capabilities of this acceptor. + /// + /// This function has the same semantics as `TcpAcceptor::close_accept`, and + /// more information can be found in that documentation. + #[unstable] + pub fn close_accept(&mut self) -> IoResult<()> { + self.inner.close_accept() + } +} + +impl Acceptor<UnixStream> for UnixAcceptor { + fn accept(&mut self) -> IoResult<UnixStream> { + self.inner.accept().map(|s| { + UnixStream { inner: s } + }) + } +} + +impl Clone for UnixAcceptor { + /// Creates a new handle to this unix acceptor, allowing for simultaneous + /// accepts. + /// + /// The underlying unix acceptor will not be closed until all handles to the + /// acceptor have been deallocated. Incoming connections will be received on + /// at most once acceptor, the same connection will not be accepted twice. + /// + /// The `close_accept` method will shut down *all* acceptors cloned from the + /// same original acceptor, whereas the `set_timeout` method only affects + /// the selector that it is called on. + /// + /// This function is useful for creating a handle to invoke `close_accept` + /// on to wake up any other task blocked in `accept`. + fn clone(&self) -> UnixAcceptor { + UnixAcceptor { inner: self.inner.clone() } + } +} + +impl sys_common::AsInner<UnixAcceptorImp> for UnixAcceptor { + fn as_inner(&self) -> &UnixAcceptorImp { + &self.inner + } +} + +#[cfg(test)] +mod tests { + use prelude::v1::*; + + use old_io::fs::PathExtensions; + use old_io::{EndOfFile, TimedOut, ShortWrite, IoError, ConnectionReset}; + use old_io::{NotConnected, BrokenPipe, FileNotFound, InvalidInput, OtherIoError}; + use old_io::{PermissionDenied, Acceptor, Listener}; + use old_io::test::*; + use super::*; + use sync::mpsc::channel; + use thread::Thread; + use time::Duration; + + pub fn smalltest<F,G>(server: F, client: G) + where F : FnOnce(UnixStream), F : Send, + G : FnOnce(UnixStream), G : Send + { + let path1 = next_test_unix(); + let path2 = path1.clone(); + + let mut acceptor = UnixListener::bind(&path1).listen(); + + let _t = Thread::spawn(move|| { + match UnixStream::connect(&path2) { + Ok(c) => client(c), + Err(e) => panic!("failed connect: {}", e), + } + }); + + match acceptor.accept() { + Ok(c) => server(c), + Err(e) => panic!("failed accept: {}", e), + } + } + + #[test] + fn bind_error() { + let path = "path/to/nowhere"; + match UnixListener::bind(&path) { + Ok(..) => panic!(), + Err(e) => { + assert!(e.kind == PermissionDenied || e.kind == FileNotFound || + e.kind == InvalidInput); + } + } + } + + #[test] + fn connect_error() { + let path = if cfg!(windows) { + r"\\.\pipe\this_should_not_exist_ever" + } else { + "path/to/nowhere" + }; + match UnixStream::connect(&path) { + Ok(..) => panic!(), + Err(e) => { + assert!(e.kind == FileNotFound || e.kind == OtherIoError); + } + } + } + + #[test] + fn smoke() { + smalltest(move |mut server| { + let mut buf = [0]; + server.read(&mut buf).unwrap(); + assert!(buf[0] == 99); + }, move|mut client| { + client.write(&[99]).unwrap(); + }) + } + + #[cfg_attr(windows, ignore)] // FIXME(#12516) + #[test] + fn read_eof() { + smalltest(move|mut server| { + let mut buf = [0]; + assert!(server.read(&mut buf).is_err()); + assert!(server.read(&mut buf).is_err()); + }, move|_client| { + // drop the client + }) + } + + #[test] + fn write_begone() { + smalltest(move|mut server| { + let buf = [0]; + loop { + match server.write(&buf) { + Ok(..) => {} + Err(e) => { + assert!(e.kind == BrokenPipe || + e.kind == NotConnected || + e.kind == ConnectionReset, + "unknown error {}", e); + break; + } + } + } + }, move|_client| { + // drop the client + }) + } + + #[test] + fn accept_lots() { + let times = 10; + let path1 = next_test_unix(); + let path2 = path1.clone(); + + let mut acceptor = match UnixListener::bind(&path1).listen() { + Ok(a) => a, + Err(e) => panic!("failed listen: {}", e), + }; + + let _t = Thread::spawn(move|| { + for _ in range(0u, times) { + let mut stream = UnixStream::connect(&path2); + match stream.write(&[100]) { + Ok(..) => {} + Err(e) => panic!("failed write: {}", e) + } + } + }); + + for _ in range(0, times) { + let mut client = acceptor.accept(); + let mut buf = [0]; + match client.read(&mut buf) { + Ok(..) => {} + Err(e) => panic!("failed read/accept: {}", e), + } + assert_eq!(buf[0], 100); + } + } + + #[cfg(unix)] + #[test] + fn path_exists() { + let path = next_test_unix(); + let _acceptor = UnixListener::bind(&path).listen(); + assert!(path.exists()); + } + + #[test] + fn unix_clone_smoke() { + let addr = next_test_unix(); + let mut acceptor = UnixListener::bind(&addr).listen(); + + let _t = Thread::spawn(move|| { + let mut s = UnixStream::connect(&addr); + let mut buf = [0, 0]; + debug!("client reading"); + assert_eq!(s.read(&mut buf), Ok(1)); + assert_eq!(buf[0], 1); + debug!("client writing"); + s.write(&[2]).unwrap(); + debug!("client dropping"); + }); + + let mut s1 = acceptor.accept().unwrap(); + let s2 = s1.clone(); + + let (tx1, rx1) = channel(); + let (tx2, rx2) = channel(); + let _t = Thread::spawn(move|| { + let mut s2 = s2; + rx1.recv().unwrap(); + debug!("writer writing"); + s2.write(&[1]).unwrap(); + debug!("writer done"); + tx2.send(()).unwrap(); + }); + tx1.send(()).unwrap(); + let mut buf = [0, 0]; + debug!("reader reading"); + assert_eq!(s1.read(&mut buf), Ok(1)); + debug!("reader done"); + rx2.recv().unwrap(); + } + + #[test] + fn unix_clone_two_read() { + let addr = next_test_unix(); + let mut acceptor = UnixListener::bind(&addr).listen(); + let (tx1, rx) = channel(); + let tx2 = tx1.clone(); + + let _t = Thread::spawn(move|| { + let mut s = UnixStream::connect(&addr); + s.write(&[1]).unwrap(); + rx.recv().unwrap(); + s.write(&[2]).unwrap(); + rx.recv().unwrap(); + }); + + let mut s1 = acceptor.accept().unwrap(); + let s2 = s1.clone(); + + let (done, rx) = channel(); + let _t = Thread::spawn(move|| { + let mut s2 = s2; + let mut buf = [0, 0]; + s2.read(&mut buf).unwrap(); + tx2.send(()).unwrap(); + done.send(()).unwrap(); + }); + let mut buf = [0, 0]; + s1.read(&mut buf).unwrap(); + tx1.send(()).unwrap(); + + rx.recv().unwrap(); + } + + #[test] + fn unix_clone_two_write() { + let addr = next_test_unix(); + let mut acceptor = UnixListener::bind(&addr).listen(); + + let _t = Thread::spawn(move|| { + let mut s = UnixStream::connect(&addr); + let buf = &mut [0, 1]; + s.read(buf).unwrap(); + s.read(buf).unwrap(); + }); + + let mut s1 = acceptor.accept().unwrap(); + let s2 = s1.clone(); + + let (tx, rx) = channel(); + let _t = Thread::spawn(move|| { + let mut s2 = s2; + s2.write(&[1]).unwrap(); + tx.send(()).unwrap(); + }); + s1.write(&[2]).unwrap(); + + rx.recv().unwrap(); + } + + #[cfg(not(windows))] + #[test] + fn drop_removes_listener_path() { + let path = next_test_unix(); + let l = UnixListener::bind(&path).unwrap(); + assert!(path.exists()); + drop(l); + assert!(!path.exists()); + } + + #[cfg(not(windows))] + #[test] + fn drop_removes_acceptor_path() { + let path = next_test_unix(); + let l = UnixListener::bind(&path).unwrap(); + assert!(path.exists()); + drop(l.listen().unwrap()); + assert!(!path.exists()); + } + + #[test] + fn accept_timeout() { + let addr = next_test_unix(); + let mut a = UnixListener::bind(&addr).unwrap().listen().unwrap(); + + a.set_timeout(Some(10)); + + // Make sure we time out once and future invocations also time out + let err = a.accept().err().unwrap(); + assert_eq!(err.kind, TimedOut); + let err = a.accept().err().unwrap(); + assert_eq!(err.kind, TimedOut); + + // Also make sure that even though the timeout is expired that we will + // continue to receive any pending connections. + let (tx, rx) = channel(); + let addr2 = addr.clone(); + let _t = Thread::spawn(move|| { + tx.send(UnixStream::connect(&addr2).unwrap()).unwrap(); + }); + let l = rx.recv().unwrap(); + for i in range(0u, 1001) { + match a.accept() { + Ok(..) => break, + Err(ref e) if e.kind == TimedOut => {} + Err(e) => panic!("error: {}", e), + } + ::thread::Thread::yield_now(); + if i == 1000 { panic!("should have a pending connection") } + } + drop(l); + + // Unset the timeout and make sure that this always blocks. + a.set_timeout(None); + let addr2 = addr.clone(); + let _t = Thread::spawn(move|| { + drop(UnixStream::connect(&addr2).unwrap()); + }); + a.accept().unwrap(); + } + + #[test] + fn connect_timeout_error() { + let addr = next_test_unix(); + assert!(UnixStream::connect_timeout(&addr, Duration::milliseconds(100)).is_err()); + } + + #[test] + fn connect_timeout_success() { + let addr = next_test_unix(); + let _a = UnixListener::bind(&addr).unwrap().listen().unwrap(); + assert!(UnixStream::connect_timeout(&addr, Duration::milliseconds(100)).is_ok()); + } + + #[test] + fn connect_timeout_zero() { + let addr = next_test_unix(); + let _a = UnixListener::bind(&addr).unwrap().listen().unwrap(); + assert!(UnixStream::connect_timeout(&addr, Duration::milliseconds(0)).is_err()); + } + + #[test] + fn connect_timeout_negative() { + let addr = next_test_unix(); + let _a = UnixListener::bind(&addr).unwrap().listen().unwrap(); + assert!(UnixStream::connect_timeout(&addr, Duration::milliseconds(-1)).is_err()); + } + + #[test] + fn close_readwrite_smoke() { + let addr = next_test_unix(); + let a = UnixListener::bind(&addr).listen().unwrap(); + let (_tx, rx) = channel::<()>(); + Thread::spawn(move|| { + let mut a = a; + let _s = a.accept().unwrap(); + let _ = rx.recv(); + }); + + let mut b = [0]; + let mut s = UnixStream::connect(&addr).unwrap(); + let mut s2 = s.clone(); + + // closing should prevent reads/writes + s.close_write().unwrap(); + assert!(s.write(&[0]).is_err()); + s.close_read().unwrap(); + assert!(s.read(&mut b).is_err()); + + // closing should affect previous handles + assert!(s2.write(&[0]).is_err()); + assert!(s2.read(&mut b).is_err()); + + // closing should affect new handles + let mut s3 = s.clone(); + assert!(s3.write(&[0]).is_err()); + assert!(s3.read(&mut b).is_err()); + + // make sure these don't die + let _ = s2.close_read(); + let _ = s2.close_write(); + let _ = s3.close_read(); + let _ = s3.close_write(); + } + + #[test] + fn close_read_wakes_up() { + let addr = next_test_unix(); + let a = UnixListener::bind(&addr).listen().unwrap(); + let (_tx, rx) = channel::<()>(); + Thread::spawn(move|| { + let mut a = a; + let _s = a.accept().unwrap(); + let _ = rx.recv(); + }); + + let mut s = UnixStream::connect(&addr).unwrap(); + let s2 = s.clone(); + let (tx, rx) = channel(); + let _t = Thread::spawn(move|| { + let mut s2 = s2; + assert!(s2.read(&mut [0]).is_err()); + tx.send(()).unwrap(); + }); + // this should wake up the child task + s.close_read().unwrap(); + + // this test will never finish if the child doesn't wake up + rx.recv().unwrap(); + } + + #[test] + fn readwrite_timeouts() { + let addr = next_test_unix(); + let mut a = UnixListener::bind(&addr).listen().unwrap(); + let (tx, rx) = channel::<()>(); + Thread::spawn(move|| { + let mut s = UnixStream::connect(&addr).unwrap(); + rx.recv().unwrap(); + assert!(s.write(&[0]).is_ok()); + let _ = rx.recv(); + }); + + let mut s = a.accept().unwrap(); + s.set_timeout(Some(20)); + assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut); + assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut); + + s.set_timeout(Some(20)); + for i in range(0u, 1001) { + match s.write(&[0; 128 * 1024]) { + Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {}, + Err(IoError { kind: TimedOut, .. }) => break, + Err(e) => panic!("{}", e), + } + if i == 1000 { panic!("should have filled up?!"); } + } + + // I'm not sure as to why, but apparently the write on windows always + // succeeds after the previous timeout. Who knows? + if !cfg!(windows) { + assert_eq!(s.write(&[0]).err().unwrap().kind, TimedOut); + } + + tx.send(()).unwrap(); + s.set_timeout(None); + assert_eq!(s.read(&mut [0, 0]), Ok(1)); + } + + #[test] + fn read_timeouts() { + let addr = next_test_unix(); + let mut a = UnixListener::bind(&addr).listen().unwrap(); + let (tx, rx) = channel::<()>(); + Thread::spawn(move|| { + let mut s = UnixStream::connect(&addr).unwrap(); + rx.recv().unwrap(); + let mut amt = 0; + while amt < 100 * 128 * 1024 { + match s.read(&mut [0;128 * 1024]) { + Ok(n) => { amt += n; } + Err(e) => panic!("{}", e), + } + } + let _ = rx.recv(); + }); + + let mut s = a.accept().unwrap(); + s.set_read_timeout(Some(20)); + assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut); + assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut); + + tx.send(()).unwrap(); + for _ in range(0u, 100) { + assert!(s.write(&[0;128 * 1024]).is_ok()); + } + } + + #[test] + fn write_timeouts() { + let addr = next_test_unix(); + let mut a = UnixListener::bind(&addr).listen().unwrap(); + let (tx, rx) = channel::<()>(); + Thread::spawn(move|| { + let mut s = UnixStream::connect(&addr).unwrap(); + rx.recv().unwrap(); + assert!(s.write(&[0]).is_ok()); + let _ = rx.recv(); + }); + + let mut s = a.accept().unwrap(); + s.set_write_timeout(Some(20)); + for i in range(0u, 1001) { + match s.write(&[0; 128 * 1024]) { + Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {}, + Err(IoError { kind: TimedOut, .. }) => break, + Err(e) => panic!("{}", e), + } + if i == 1000 { panic!("should have filled up?!"); } + } + + tx.send(()).unwrap(); + assert!(s.read(&mut [0]).is_ok()); + } + + #[test] + fn timeout_concurrent_read() { + let addr = next_test_unix(); + let mut a = UnixListener::bind(&addr).listen().unwrap(); + let (tx, rx) = channel::<()>(); + Thread::spawn(move|| { + let mut s = UnixStream::connect(&addr).unwrap(); + rx.recv().unwrap(); + assert!(s.write(&[0]).is_ok()); + let _ = rx.recv(); + }); + + let mut s = a.accept().unwrap(); + let s2 = s.clone(); + let (tx2, rx2) = channel(); + let _t = Thread::spawn(move|| { + let mut s2 = s2; + assert!(s2.read(&mut [0]).is_ok()); + tx2.send(()).unwrap(); + }); + + s.set_read_timeout(Some(20)); + assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut); + tx.send(()).unwrap(); + + rx2.recv().unwrap(); + } + + #[cfg(not(windows))] + #[test] + fn clone_accept_smoke() { + let addr = next_test_unix(); + let l = UnixListener::bind(&addr); + let mut a = l.listen().unwrap(); + let mut a2 = a.clone(); + + let addr2 = addr.clone(); + let _t = Thread::spawn(move|| { + let _ = UnixStream::connect(&addr2); + }); + let _t = Thread::spawn(move|| { + let _ = UnixStream::connect(&addr); + }); + + assert!(a.accept().is_ok()); + drop(a); + assert!(a2.accept().is_ok()); + } + + #[cfg(not(windows))] // FIXME #17553 + #[test] + fn clone_accept_concurrent() { + let addr = next_test_unix(); + let l = UnixListener::bind(&addr); + let a = l.listen().unwrap(); + let a2 = a.clone(); + + let (tx, rx) = channel(); + let tx2 = tx.clone(); + + let _t = Thread::spawn(move|| { + let mut a = a; + tx.send(a.accept()).unwrap() + }); + let _t = Thread::spawn(move|| { + let mut a = a2; + tx2.send(a.accept()).unwrap() + }); + + let addr2 = addr.clone(); + let _t = Thread::spawn(move|| { + let _ = UnixStream::connect(&addr2); + }); + let _t = Thread::spawn(move|| { + let _ = UnixStream::connect(&addr); + }); + + assert!(rx.recv().unwrap().is_ok()); + assert!(rx.recv().unwrap().is_ok()); + } + + #[test] + fn close_accept_smoke() { + let addr = next_test_unix(); + let l = UnixListener::bind(&addr); + let mut a = l.listen().unwrap(); + + a.close_accept().unwrap(); + assert_eq!(a.accept().err().unwrap().kind, EndOfFile); + } + + #[test] + fn close_accept_concurrent() { + let addr = next_test_unix(); + let l = UnixListener::bind(&addr); + let a = l.listen().unwrap(); + let mut a2 = a.clone(); + + let (tx, rx) = channel(); + let _t = Thread::spawn(move|| { + let mut a = a; + tx.send(a.accept()).unwrap(); + }); + a2.close_accept().unwrap(); + + assert_eq!(rx.recv().unwrap().err().unwrap().kind, EndOfFile); + } +} diff --git a/src/libstd/old_io/net/tcp.rs b/src/libstd/old_io/net/tcp.rs new file mode 100644 index 00000000000..62f3c02e98f --- /dev/null +++ b/src/libstd/old_io/net/tcp.rs @@ -0,0 +1,1475 @@ +// Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! TCP network connections +//! +//! This module contains the ability to open a TCP stream to a socket address, +//! as well as creating a socket server to accept incoming connections. The +//! destination and binding addresses can either be an IPv4 or IPv6 address. +//! +//! A TCP connection implements the `Reader` and `Writer` traits, while the TCP +//! listener (socket server) implements the `Listener` and `Acceptor` traits. + +use clone::Clone; +use old_io::IoResult; +use result::Result::Err; +use old_io::net::ip::{SocketAddr, ToSocketAddr}; +use old_io::{Reader, Writer, Listener, Acceptor}; +use old_io::{standard_error, TimedOut}; +use option::Option; +use option::Option::{None, Some}; +use time::Duration; + +use sys::tcp::TcpStream as TcpStreamImp; +use sys::tcp::TcpListener as TcpListenerImp; +use sys::tcp::TcpAcceptor as TcpAcceptorImp; + +use sys_common; + +/// A structure which represents a TCP stream between a local socket and a +/// remote socket. +/// +/// The socket will be closed when the value is dropped. +/// +/// # Example +/// +/// ```no_run +/// use std::old_io::TcpStream; +/// +/// { +/// let mut stream = TcpStream::connect("127.0.0.1:34254"); +/// +/// // ignore the Result +/// let _ = stream.write(&[1]); +/// +/// let mut buf = [0]; +/// let _ = stream.read(&mut buf); // ignore here too +/// } // the stream is closed here +/// ``` +pub struct TcpStream { + inner: TcpStreamImp, +} + +impl TcpStream { + fn new(s: TcpStreamImp) -> TcpStream { + TcpStream { inner: s } + } + + /// Open a TCP connection to a remote host. + /// + /// `addr` is an address of the remote host. Anything which implements `ToSocketAddr` + /// trait can be supplied for the address; see this trait documentation for + /// concrete examples. + pub fn connect<A: ToSocketAddr>(addr: A) -> IoResult<TcpStream> { + super::with_addresses(addr, |addr| { + TcpStreamImp::connect(addr, None).map(TcpStream::new) + }) + } + + /// Creates a TCP connection to a remote socket address, timing out after + /// the specified duration. + /// + /// This is the same as the `connect` method, except that if the timeout + /// specified elapses before a connection is made an error will be + /// returned. The error's kind will be `TimedOut`. + /// + /// Same as the `connect` method, `addr` argument type can be anything which + /// implements `ToSocketAddr` trait. + /// + /// If a `timeout` with zero or negative duration is specified then + /// the function returns `Err`, with the error kind set to `TimedOut`. + #[unstable = "the timeout argument may eventually change types"] + pub fn connect_timeout<A: ToSocketAddr>(addr: A, + timeout: Duration) -> IoResult<TcpStream> { + if timeout <= Duration::milliseconds(0) { + return Err(standard_error(TimedOut)); + } + + super::with_addresses(addr, |addr| { + TcpStreamImp::connect(addr, Some(timeout.num_milliseconds() as u64)) + .map(TcpStream::new) + }) + } + + /// Returns the socket address of the remote peer of this TCP connection. + pub fn peer_name(&mut self) -> IoResult<SocketAddr> { + self.inner.peer_name() + } + + /// Returns the socket address of the local half of this TCP connection. + pub fn socket_name(&mut self) -> IoResult<SocketAddr> { + self.inner.socket_name() + } + + /// Sets the nodelay flag on this connection to the boolean specified + #[unstable] + pub fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> { + self.inner.set_nodelay(nodelay) + } + + /// Sets the keepalive timeout to the timeout specified. + /// + /// If the value specified is `None`, then the keepalive flag is cleared on + /// this connection. Otherwise, the keepalive timeout will be set to the + /// specified time, in seconds. + #[unstable] + pub fn set_keepalive(&mut self, delay_in_seconds: Option<uint>) -> IoResult<()> { + self.inner.set_keepalive(delay_in_seconds) + } + + /// Closes the reading half of this connection. + /// + /// This method will close the reading portion of this connection, causing + /// all pending and future reads to immediately return with an error. + /// + /// # Example + /// + /// ```no_run + /// # #![allow(unused_must_use)] + /// use std::old_io::timer; + /// use std::old_io::TcpStream; + /// use std::time::Duration; + /// use std::thread::Thread; + /// + /// let mut stream = TcpStream::connect("127.0.0.1:34254").unwrap(); + /// let stream2 = stream.clone(); + /// + /// let _t = Thread::spawn(move|| { + /// // close this stream after one second + /// timer::sleep(Duration::seconds(1)); + /// let mut stream = stream2; + /// stream.close_read(); + /// }); + /// + /// // wait for some data, will get canceled after one second + /// let mut buf = [0]; + /// stream.read(&mut buf); + /// ``` + /// + /// Note that this method affects all cloned handles associated with this + /// stream, not just this one handle. + pub fn close_read(&mut self) -> IoResult<()> { + self.inner.close_read() + } + + /// Closes the writing half of this connection. + /// + /// This method will close the writing portion of this connection, causing + /// all future writes to immediately return with an error. + /// + /// Note that this method affects all cloned handles associated with this + /// stream, not just this one handle. + pub fn close_write(&mut self) -> IoResult<()> { + self.inner.close_write() + } + + /// Sets a timeout, in milliseconds, for blocking operations on this stream. + /// + /// This function will set a timeout for all blocking operations (including + /// reads and writes) on this stream. The timeout specified is a relative + /// time, in milliseconds, into the future after which point operations will + /// time out. This means that the timeout must be reset periodically to keep + /// it from expiring. Specifying a value of `None` will clear the timeout + /// for this stream. + /// + /// The timeout on this stream is local to this stream only. Setting a + /// timeout does not affect any other cloned instances of this stream, nor + /// does the timeout propagated to cloned handles of this stream. Setting + /// this timeout will override any specific read or write timeouts + /// previously set for this stream. + /// + /// For clarification on the semantics of interrupting a read and a write, + /// take a look at `set_read_timeout` and `set_write_timeout`. + #[unstable = "the timeout argument may change in type and value"] + pub fn set_timeout(&mut self, timeout_ms: Option<u64>) { + self.inner.set_timeout(timeout_ms) + } + + /// Sets the timeout for read operations on this stream. + /// + /// See documentation in `set_timeout` for the semantics of this read time. + /// This will overwrite any previous read timeout set through either this + /// function or `set_timeout`. + /// + /// # Errors + /// + /// When this timeout expires, if there is no pending read operation, no + /// action is taken. Otherwise, the read operation will be scheduled to + /// promptly return. If a timeout error is returned, then no data was read + /// during the timeout period. + #[unstable = "the timeout argument may change in type and value"] + pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) { + self.inner.set_read_timeout(timeout_ms) + } + + /// Sets the timeout for write operations on this stream. + /// + /// See documentation in `set_timeout` for the semantics of this write time. + /// This will overwrite any previous write timeout set through either this + /// function or `set_timeout`. + /// + /// # Errors + /// + /// When this timeout expires, if there is no pending write operation, no + /// action is taken. Otherwise, the pending write operation will be + /// scheduled to promptly return. The actual state of the underlying stream + /// is not specified. + /// + /// The write operation may return an error of type `ShortWrite` which + /// indicates that the object is known to have written an exact number of + /// bytes successfully during the timeout period, and the remaining bytes + /// were never written. + /// + /// If the write operation returns `TimedOut`, then it the timeout primitive + /// does not know how many bytes were written as part of the timeout + /// operation. It may be the case that bytes continue to be written in an + /// asynchronous fashion after the call to write returns. + #[unstable = "the timeout argument may change in type and value"] + pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) { + self.inner.set_write_timeout(timeout_ms) + } +} + +impl Clone for TcpStream { + /// Creates a new handle to this TCP stream, allowing for simultaneous reads + /// and writes of this connection. + /// + /// The underlying TCP stream will not be closed until all handles to the + /// stream have been deallocated. All handles will also follow the same + /// stream, but two concurrent reads will not receive the same data. + /// Instead, the first read will receive the first packet received, and the + /// second read will receive the second packet. + fn clone(&self) -> TcpStream { + TcpStream { inner: self.inner.clone() } + } +} + +impl Reader for TcpStream { + fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> { + self.inner.read(buf) + } +} + +impl Writer for TcpStream { + fn write(&mut self, buf: &[u8]) -> IoResult<()> { + self.inner.write(buf) + } +} + +impl sys_common::AsInner<TcpStreamImp> for TcpStream { + fn as_inner(&self) -> &TcpStreamImp { + &self.inner + } +} + +/// A structure representing a socket server. This listener is used to create a +/// `TcpAcceptor` which can be used to accept sockets on a local port. +/// +/// # Examples +/// +/// ``` +/// # fn foo() { +/// use std::old_io::{TcpListener, TcpStream}; +/// use std::old_io::{Acceptor, Listener}; +/// use std::thread::Thread; +/// +/// let listener = TcpListener::bind("127.0.0.1:80").unwrap(); +/// +/// // bind the listener to the specified address +/// let mut acceptor = listener.listen().unwrap(); +/// +/// fn handle_client(mut stream: TcpStream) { +/// // ... +/// # &mut stream; // silence unused mutability/variable warning +/// } +/// // accept connections and process them, spawning a new tasks for each one +/// for stream in acceptor.incoming() { +/// match stream { +/// Err(e) => { /* connection failed */ } +/// Ok(stream) => { +/// Thread::spawn(move|| { +/// // connection succeeded +/// handle_client(stream) +/// }); +/// } +/// } +/// } +/// +/// // close the socket server +/// drop(acceptor); +/// # } +/// ``` +pub struct TcpListener { + inner: TcpListenerImp, +} + +impl TcpListener { + /// Creates a new `TcpListener` which will be bound to the specified address. + /// This listener is not ready for accepting connections, `listen` must be called + /// on it before that's possible. + /// + /// Binding with a port number of 0 will request that the OS assigns a port + /// to this listener. The port allocated can be queried via the + /// `socket_name` function. + /// + /// The address type can be any implementer of `ToSocketAddr` trait. See its + /// documentation for concrete examples. + pub fn bind<A: ToSocketAddr>(addr: A) -> IoResult<TcpListener> { + super::with_addresses(addr, |addr| { + TcpListenerImp::bind(addr).map(|inner| TcpListener { inner: inner }) + }) + } + + /// Returns the local socket address of this listener. + pub fn socket_name(&mut self) -> IoResult<SocketAddr> { + self.inner.socket_name() + } +} + +impl Listener<TcpStream, TcpAcceptor> for TcpListener { + fn listen(self) -> IoResult<TcpAcceptor> { + self.inner.listen(128).map(|a| TcpAcceptor { inner: a }) + } +} + +impl sys_common::AsInner<TcpListenerImp> for TcpListener { + fn as_inner(&self) -> &TcpListenerImp { + &self.inner + } +} + +/// The accepting half of a TCP socket server. This structure is created through +/// a `TcpListener`'s `listen` method, and this object can be used to accept new +/// `TcpStream` instances. +pub struct TcpAcceptor { + inner: TcpAcceptorImp, +} + +impl TcpAcceptor { + /// Prevents blocking on all future accepts after `ms` milliseconds have + /// elapsed. + /// + /// This function is used to set a deadline after which this acceptor will + /// time out accepting any connections. The argument is the relative + /// distance, in milliseconds, to a point in the future after which all + /// accepts will fail. + /// + /// If the argument specified is `None`, then any previously registered + /// timeout is cleared. + /// + /// A timeout of `0` can be used to "poll" this acceptor to see if it has + /// any pending connections. All pending connections will be accepted, + /// regardless of whether the timeout has expired or not (the accept will + /// not block in this case). + /// + /// # Example + /// + /// ```no_run + /// # #![allow(unstable)] + /// use std::old_io::TcpListener; + /// use std::old_io::{Listener, Acceptor, TimedOut}; + /// + /// let mut a = TcpListener::bind("127.0.0.1:8482").listen().unwrap(); + /// + /// // After 100ms have passed, all accepts will fail + /// a.set_timeout(Some(100)); + /// + /// match a.accept() { + /// Ok(..) => println!("accepted a socket"), + /// Err(ref e) if e.kind == TimedOut => { println!("timed out!"); } + /// Err(e) => println!("err: {}", e), + /// } + /// + /// // Reset the timeout and try again + /// a.set_timeout(Some(100)); + /// let socket = a.accept(); + /// + /// // Clear the timeout and block indefinitely waiting for a connection + /// a.set_timeout(None); + /// let socket = a.accept(); + /// ``` + #[unstable = "the type of the argument and name of this function are \ + subject to change"] + pub fn set_timeout(&mut self, ms: Option<u64>) { self.inner.set_timeout(ms); } + + /// Closes the accepting capabilities of this acceptor. + /// + /// This function is similar to `TcpStream`'s `close_{read,write}` methods + /// in that it will affect *all* cloned handles of this acceptor's original + /// handle. + /// + /// Once this function succeeds, all future calls to `accept` will return + /// immediately with an error, preventing all future calls to accept. The + /// underlying socket will not be relinquished back to the OS until all + /// acceptors have been deallocated. + /// + /// This is useful for waking up a thread in an accept loop to indicate that + /// it should exit. + /// + /// # Example + /// + /// ``` + /// # #![allow(unstable)] + /// use std::old_io::{TcpListener, Listener, Acceptor, EndOfFile}; + /// use std::thread::Thread; + /// + /// let mut a = TcpListener::bind("127.0.0.1:8482").listen().unwrap(); + /// let a2 = a.clone(); + /// + /// let _t = Thread::spawn(move|| { + /// let mut a2 = a2; + /// for socket in a2.incoming() { + /// match socket { + /// Ok(s) => { /* handle s */ } + /// Err(ref e) if e.kind == EndOfFile => break, // closed + /// Err(e) => panic!("unexpected error: {}", e), + /// } + /// } + /// }); + /// + /// # fn wait_for_sigint() {} + /// // Now that our accept loop is running, wait for the program to be + /// // requested to exit. + /// wait_for_sigint(); + /// + /// // Signal our accept loop to exit + /// assert!(a.close_accept().is_ok()); + /// ``` + #[unstable] + pub fn close_accept(&mut self) -> IoResult<()> { + self.inner.close_accept() + } +} + +impl Acceptor<TcpStream> for TcpAcceptor { + fn accept(&mut self) -> IoResult<TcpStream> { + self.inner.accept().map(TcpStream::new) + } +} + +impl Clone for TcpAcceptor { + /// Creates a new handle to this TCP acceptor, allowing for simultaneous + /// accepts. + /// + /// The underlying TCP acceptor will not be closed until all handles to the + /// acceptor have been deallocated. Incoming connections will be received on + /// at most once acceptor, the same connection will not be accepted twice. + /// + /// The `close_accept` method will shut down *all* acceptors cloned from the + /// same original acceptor, whereas the `set_timeout` method only affects + /// the selector that it is called on. + /// + /// This function is useful for creating a handle to invoke `close_accept` + /// on to wake up any other task blocked in `accept`. + fn clone(&self) -> TcpAcceptor { + TcpAcceptor { inner: self.inner.clone() } + } +} + +impl sys_common::AsInner<TcpAcceptorImp> for TcpAcceptor { + fn as_inner(&self) -> &TcpAcceptorImp { + &self.inner + } +} + +#[cfg(test)] +#[allow(unstable)] +mod test { + use prelude::v1::*; + + use sync::mpsc::channel; + use thread::Thread; + use old_io::net::tcp::*; + use old_io::net::ip::*; + use old_io::test::*; + use old_io::{EndOfFile, TimedOut, ShortWrite, IoError}; + use old_io::{ConnectionRefused, BrokenPipe, ConnectionAborted}; + use old_io::{ConnectionReset, NotConnected, PermissionDenied, OtherIoError}; + use old_io::{Acceptor, Listener}; + + // FIXME #11530 this fails on android because tests are run as root + #[cfg_attr(any(windows, target_os = "android"), ignore)] + #[test] + fn bind_error() { + match TcpListener::bind("0.0.0.0:1") { + Ok(..) => panic!(), + Err(e) => assert_eq!(e.kind, PermissionDenied), + } + } + + #[test] + fn connect_error() { + match TcpStream::connect("0.0.0.0:1") { + Ok(..) => panic!(), + Err(e) => assert_eq!(e.kind, ConnectionRefused), + } + } + + #[test] + fn listen_ip4_localhost() { + let socket_addr = next_test_ip4(); + let listener = TcpListener::bind(socket_addr); + let mut acceptor = listener.listen(); + + let _t = Thread::spawn(move|| { + let mut stream = TcpStream::connect(("localhost", socket_addr.port)); + stream.write(&[144]).unwrap(); + }); + + let mut stream = acceptor.accept(); + let mut buf = [0]; + stream.read(&mut buf).unwrap(); + assert!(buf[0] == 144); + } + + #[test] + fn connect_localhost() { + let addr = next_test_ip4(); + let mut acceptor = TcpListener::bind(addr).listen(); + + let _t = Thread::spawn(move|| { + let mut stream = TcpStream::connect(("localhost", addr.port)); + stream.write(&[64]).unwrap(); + }); + + let mut stream = acceptor.accept(); + let mut buf = [0]; + stream.read(&mut buf).unwrap(); + assert!(buf[0] == 64); + } + + #[test] + fn connect_ip4_loopback() { + let addr = next_test_ip4(); + let mut acceptor = TcpListener::bind(addr).listen(); + + let _t = Thread::spawn(move|| { + let mut stream = TcpStream::connect(("127.0.0.1", addr.port)); + stream.write(&[44]).unwrap(); + }); + + let mut stream = acceptor.accept(); + let mut buf = [0]; + stream.read(&mut buf).unwrap(); + assert!(buf[0] == 44); + } + + #[test] + fn connect_ip6_loopback() { + let addr = next_test_ip6(); + let mut acceptor = TcpListener::bind(addr).listen(); + + let _t = Thread::spawn(move|| { + let mut stream = TcpStream::connect(("::1", addr.port)); + stream.write(&[66]).unwrap(); + }); + + let mut stream = acceptor.accept(); + let mut buf = [0]; + stream.read(&mut buf).unwrap(); + assert!(buf[0] == 66); + } + + #[test] + fn smoke_test_ip4() { + let addr = next_test_ip4(); + let mut acceptor = TcpListener::bind(addr).listen(); + + let _t = Thread::spawn(move|| { + let mut stream = TcpStream::connect(addr); + stream.write(&[99]).unwrap(); + }); + + let mut stream = acceptor.accept(); + let mut buf = [0]; + stream.read(&mut buf).unwrap(); + assert!(buf[0] == 99); + } + + #[test] + fn smoke_test_ip6() { + let addr = next_test_ip6(); + let mut acceptor = TcpListener::bind(addr).listen(); + + let _t = Thread::spawn(move|| { + let mut stream = TcpStream::connect(addr); + stream.write(&[99]).unwrap(); + }); + + let mut stream = acceptor.accept(); + let mut buf = [0]; + stream.read(&mut buf).unwrap(); + assert!(buf[0] == 99); + } + + #[test] + fn read_eof_ip4() { + let addr = next_test_ip4(); + let mut acceptor = TcpListener::bind(addr).listen(); + + let _t = Thread::spawn(move|| { + let _stream = TcpStream::connect(addr); + // Close + }); + + let mut stream = acceptor.accept(); + let mut buf = [0]; + let nread = stream.read(&mut buf); + assert!(nread.is_err()); + } + + #[test] + fn read_eof_ip6() { + let addr = next_test_ip6(); + let mut acceptor = TcpListener::bind(addr).listen(); + + let _t = Thread::spawn(move|| { + let _stream = TcpStream::connect(addr); + // Close + }); + + let mut stream = acceptor.accept(); + let mut buf = [0]; + let nread = stream.read(&mut buf); + assert!(nread.is_err()); + } + + #[test] + fn read_eof_twice_ip4() { + let addr = next_test_ip4(); + let mut acceptor = TcpListener::bind(addr).listen(); + + let _t = Thread::spawn(move|| { + let _stream = TcpStream::connect(addr); + // Close + }); + + let mut stream = acceptor.accept(); + let mut buf = [0]; + let nread = stream.read(&mut buf); + assert!(nread.is_err()); + + match stream.read(&mut buf) { + Ok(..) => panic!(), + Err(ref e) => { + assert!(e.kind == NotConnected || e.kind == EndOfFile, + "unknown kind: {:?}", e.kind); + } + } + } + + #[test] + fn read_eof_twice_ip6() { + let addr = next_test_ip6(); + let mut acceptor = TcpListener::bind(addr).listen(); + + let _t = Thread::spawn(move|| { + let _stream = TcpStream::connect(addr); + // Close + }); + + let mut stream = acceptor.accept(); + let mut buf = [0]; + let nread = stream.read(&mut buf); + assert!(nread.is_err()); + + match stream.read(&mut buf) { + Ok(..) => panic!(), + Err(ref e) => { + assert!(e.kind == NotConnected || e.kind == EndOfFile, + "unknown kind: {:?}", e.kind); + } + } + } + + #[test] + fn write_close_ip4() { + let addr = next_test_ip4(); + let mut acceptor = TcpListener::bind(addr).listen(); + + let (tx, rx) = channel(); + let _t = Thread::spawn(move|| { + drop(TcpStream::connect(addr)); + tx.send(()).unwrap(); + }); + + let mut stream = acceptor.accept(); + rx.recv().unwrap(); + let buf = [0]; + match stream.write(&buf) { + Ok(..) => {} + Err(e) => { + assert!(e.kind == ConnectionReset || + e.kind == BrokenPipe || + e.kind == ConnectionAborted, + "unknown error: {}", e); + } + } + } + + #[test] + fn write_close_ip6() { + let addr = next_test_ip6(); + let mut acceptor = TcpListener::bind(addr).listen(); + + let (tx, rx) = channel(); + let _t = Thread::spawn(move|| { + drop(TcpStream::connect(addr)); + tx.send(()).unwrap(); + }); + + let mut stream = acceptor.accept(); + rx.recv().unwrap(); + let buf = [0]; + match stream.write(&buf) { + Ok(..) => {} + Err(e) => { + assert!(e.kind == ConnectionReset || + e.kind == BrokenPipe || + e.kind == ConnectionAborted, + "unknown error: {}", e); + } + } + } + + #[test] + fn multiple_connect_serial_ip4() { + let addr = next_test_ip4(); + let max = 10u; + let mut acceptor = TcpListener::bind(addr).listen(); + + let _t = Thread::spawn(move|| { + for _ in range(0, max) { + let mut stream = TcpStream::connect(addr); + stream.write(&[99]).unwrap(); + } + }); + + for ref mut stream in acceptor.incoming().take(max) { + let mut buf = [0]; + stream.read(&mut buf).unwrap(); + assert_eq!(buf[0], 99); + } + } + + #[test] + fn multiple_connect_serial_ip6() { + let addr = next_test_ip6(); + let max = 10u; + let mut acceptor = TcpListener::bind(addr).listen(); + + let _t = Thread::spawn(move|| { + for _ in range(0, max) { + let mut stream = TcpStream::connect(addr); + stream.write(&[99]).unwrap(); + } + }); + + for ref mut stream in acceptor.incoming().take(max) { + let mut buf = [0]; + stream.read(&mut buf).unwrap(); + assert_eq!(buf[0], 99); + } + } + + #[test] + fn multiple_connect_interleaved_greedy_schedule_ip4() { + let addr = next_test_ip4(); + static MAX: int = 10; + let acceptor = TcpListener::bind(addr).listen(); + + let _t = Thread::spawn(move|| { + let mut acceptor = acceptor; + for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) { + // Start another task to handle the connection + let _t = Thread::spawn(move|| { + let mut stream = stream; + let mut buf = [0]; + stream.read(&mut buf).unwrap(); + assert!(buf[0] == i as u8); + debug!("read"); + }); + } + }); + + connect(0, addr); + + fn connect(i: int, addr: SocketAddr) { + if i == MAX { return } + + let _t = Thread::spawn(move|| { + debug!("connecting"); + let mut stream = TcpStream::connect(addr); + // Connect again before writing + connect(i + 1, addr); + debug!("writing"); + stream.write(&[i as u8]).unwrap(); + }); + } + } + + #[test] + fn multiple_connect_interleaved_greedy_schedule_ip6() { + let addr = next_test_ip6(); + static MAX: int = 10; + let acceptor = TcpListener::bind(addr).listen(); + + let _t = Thread::spawn(move|| { + let mut acceptor = acceptor; + for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) { + // Start another task to handle the connection + let _t = Thread::spawn(move|| { + let mut stream = stream; + let mut buf = [0]; + stream.read(&mut buf).unwrap(); + assert!(buf[0] == i as u8); + debug!("read"); + }); + } + }); + + connect(0, addr); + + fn connect(i: int, addr: SocketAddr) { + if i == MAX { return } + + let _t = Thread::spawn(move|| { + debug!("connecting"); + let mut stream = TcpStream::connect(addr); + // Connect again before writing + connect(i + 1, addr); + debug!("writing"); + stream.write(&[i as u8]).unwrap(); + }); + } + } + + #[test] + fn multiple_connect_interleaved_lazy_schedule_ip4() { + static MAX: int = 10; + let addr = next_test_ip4(); + let acceptor = TcpListener::bind(addr).listen(); + + let _t = Thread::spawn(move|| { + let mut acceptor = acceptor; + for stream in acceptor.incoming().take(MAX as uint) { + // Start another task to handle the connection + let _t = Thread::spawn(move|| { + let mut stream = stream; + let mut buf = [0]; + stream.read(&mut buf).unwrap(); + assert!(buf[0] == 99); + debug!("read"); + }); + } + }); + + connect(0, addr); + + fn connect(i: int, addr: SocketAddr) { + if i == MAX { return } + + let _t = Thread::spawn(move|| { + debug!("connecting"); + let mut stream = TcpStream::connect(addr); + // Connect again before writing + connect(i + 1, addr); + debug!("writing"); + stream.write(&[99]).unwrap(); + }); + } + } + + #[test] + fn multiple_connect_interleaved_lazy_schedule_ip6() { + static MAX: int = 10; + let addr = next_test_ip6(); + let acceptor = TcpListener::bind(addr).listen(); + + let _t = Thread::spawn(move|| { + let mut acceptor = acceptor; + for stream in acceptor.incoming().take(MAX as uint) { + // Start another task to handle the connection + let _t = Thread::spawn(move|| { + let mut stream = stream; + let mut buf = [0]; + stream.read(&mut buf).unwrap(); + assert!(buf[0] == 99); + debug!("read"); + }); + } + }); + + connect(0, addr); + + fn connect(i: int, addr: SocketAddr) { + if i == MAX { return } + + let _t = Thread::spawn(move|| { + debug!("connecting"); + let mut stream = TcpStream::connect(addr); + // Connect again before writing + connect(i + 1, addr); + debug!("writing"); + stream.write(&[99]).unwrap(); + }); + } + } + + pub fn socket_name(addr: SocketAddr) { + let mut listener = TcpListener::bind(addr).unwrap(); + + // Make sure socket_name gives + // us the socket we binded to. + let so_name = listener.socket_name(); + assert!(so_name.is_ok()); + assert_eq!(addr, so_name.unwrap()); + } + + pub fn peer_name(addr: SocketAddr) { + let acceptor = TcpListener::bind(addr).listen(); + let _t = Thread::spawn(move|| { + let mut acceptor = acceptor; + acceptor.accept().unwrap(); + }); + + let stream = TcpStream::connect(addr); + + assert!(stream.is_ok()); + let mut stream = stream.unwrap(); + + // Make sure peer_name gives us the + // address/port of the peer we've + // connected to. + let peer_name = stream.peer_name(); + assert!(peer_name.is_ok()); + assert_eq!(addr, peer_name.unwrap()); + } + + #[test] + fn socket_and_peer_name_ip4() { + peer_name(next_test_ip4()); + socket_name(next_test_ip4()); + } + + #[test] + fn socket_and_peer_name_ip6() { + // FIXME: peer name is not consistent + //peer_name(next_test_ip6()); + socket_name(next_test_ip6()); + } + + #[test] + fn partial_read() { + let addr = next_test_ip4(); + let (tx, rx) = channel(); + let _t = Thread::spawn(move|| { + let mut srv = TcpListener::bind(addr).listen().unwrap(); + tx.send(()).unwrap(); + let mut cl = srv.accept().unwrap(); + cl.write(&[10]).unwrap(); + let mut b = [0]; + cl.read(&mut b).unwrap(); + tx.send(()).unwrap(); + }); + + rx.recv().unwrap(); + let mut c = TcpStream::connect(addr).unwrap(); + let mut b = [0; 10]; + assert_eq!(c.read(&mut b), Ok(1)); + c.write(&[1]).unwrap(); + rx.recv().unwrap(); + } + + #[test] + fn double_bind() { + let addr = next_test_ip4(); + let listener = TcpListener::bind(addr).unwrap().listen(); + assert!(listener.is_ok()); + match TcpListener::bind(addr).listen() { + Ok(..) => panic!(), + Err(e) => { + assert!(e.kind == ConnectionRefused || e.kind == OtherIoError, + "unknown error: {} {:?}", e, e.kind); + } + } + } + + #[test] + fn fast_rebind() { + let addr = next_test_ip4(); + let (tx, rx) = channel(); + + let _t = Thread::spawn(move|| { + rx.recv().unwrap(); + let _stream = TcpStream::connect(addr).unwrap(); + // Close + rx.recv().unwrap(); + }); + + { + let mut acceptor = TcpListener::bind(addr).listen(); + tx.send(()).unwrap(); + { + let _stream = acceptor.accept().unwrap(); + // Close client + tx.send(()).unwrap(); + } + // Close listener + } + let _listener = TcpListener::bind(addr); + } + + #[test] + fn tcp_clone_smoke() { + let addr = next_test_ip4(); + let mut acceptor = TcpListener::bind(addr).listen(); + + let _t = Thread::spawn(move|| { + let mut s = TcpStream::connect(addr); + let mut buf = [0, 0]; + assert_eq!(s.read(&mut buf), Ok(1)); + assert_eq!(buf[0], 1); + s.write(&[2]).unwrap(); + }); + + let mut s1 = acceptor.accept().unwrap(); + let s2 = s1.clone(); + + let (tx1, rx1) = channel(); + let (tx2, rx2) = channel(); + let _t = Thread::spawn(move|| { + let mut s2 = s2; + rx1.recv().unwrap(); + s2.write(&[1]).unwrap(); + tx2.send(()).unwrap(); + }); + tx1.send(()).unwrap(); + let mut buf = [0, 0]; + assert_eq!(s1.read(&mut buf), Ok(1)); + rx2.recv().unwrap(); + } + + #[test] + fn tcp_clone_two_read() { + let addr = next_test_ip6(); + let mut acceptor = TcpListener::bind(addr).listen(); + let (tx1, rx) = channel(); + let tx2 = tx1.clone(); + + let _t = Thread::spawn(move|| { + let mut s = TcpStream::connect(addr); + s.write(&[1]).unwrap(); + rx.recv().unwrap(); + s.write(&[2]).unwrap(); + rx.recv().unwrap(); + }); + + let mut s1 = acceptor.accept().unwrap(); + let s2 = s1.clone(); + + let (done, rx) = channel(); + let _t = Thread::spawn(move|| { + let mut s2 = s2; + let mut buf = [0, 0]; + s2.read(&mut buf).unwrap(); + tx2.send(()).unwrap(); + done.send(()).unwrap(); + }); + let mut buf = [0, 0]; + s1.read(&mut buf).unwrap(); + tx1.send(()).unwrap(); + + rx.recv().unwrap(); + } + + #[test] + fn tcp_clone_two_write() { + let addr = next_test_ip4(); + let mut acceptor = TcpListener::bind(addr).listen(); + + let _t = Thread::spawn(move|| { + let mut s = TcpStream::connect(addr); + let mut buf = [0, 1]; + s.read(&mut buf).unwrap(); + s.read(&mut buf).unwrap(); + }); + + let mut s1 = acceptor.accept().unwrap(); + let s2 = s1.clone(); + + let (done, rx) = channel(); + let _t = Thread::spawn(move|| { + let mut s2 = s2; + s2.write(&[1]).unwrap(); + done.send(()).unwrap(); + }); + s1.write(&[2]).unwrap(); + + rx.recv().unwrap(); + } + + #[test] + fn shutdown_smoke() { + let addr = next_test_ip4(); + let a = TcpListener::bind(addr).unwrap().listen(); + let _t = Thread::spawn(move|| { + let mut a = a; + let mut c = a.accept().unwrap(); + assert_eq!(c.read_to_end(), Ok(vec!())); + c.write(&[1]).unwrap(); + }); + + let mut s = TcpStream::connect(addr).unwrap(); + assert!(s.inner.close_write().is_ok()); + assert!(s.write(&[1]).is_err()); + assert_eq!(s.read_to_end(), Ok(vec!(1))); + } + + #[test] + fn accept_timeout() { + let addr = next_test_ip4(); + let mut a = TcpListener::bind(addr).unwrap().listen().unwrap(); + + a.set_timeout(Some(10)); + + // Make sure we time out once and future invocations also time out + let err = a.accept().err().unwrap(); + assert_eq!(err.kind, TimedOut); + let err = a.accept().err().unwrap(); + assert_eq!(err.kind, TimedOut); + + // Also make sure that even though the timeout is expired that we will + // continue to receive any pending connections. + // + // FIXME: freebsd apparently never sees the pending connection, but + // testing manually always works. Need to investigate this + // flakiness. + if !cfg!(target_os = "freebsd") { + let (tx, rx) = channel(); + let _t = Thread::spawn(move|| { + tx.send(TcpStream::connect(addr).unwrap()).unwrap(); + }); + let _l = rx.recv().unwrap(); + for i in range(0i, 1001) { + match a.accept() { + Ok(..) => break, + Err(ref e) if e.kind == TimedOut => {} + Err(e) => panic!("error: {}", e), + } + ::thread::Thread::yield_now(); + if i == 1000 { panic!("should have a pending connection") } + } + } + + // Unset the timeout and make sure that this always blocks. + a.set_timeout(None); + let _t = Thread::spawn(move|| { + drop(TcpStream::connect(addr).unwrap()); + }); + a.accept().unwrap(); + } + + #[test] + fn close_readwrite_smoke() { + let addr = next_test_ip4(); + let a = TcpListener::bind(addr).listen().unwrap(); + let (_tx, rx) = channel::<()>(); + Thread::spawn(move|| { + let mut a = a; + let _s = a.accept().unwrap(); + let _ = rx.recv().unwrap(); + }); + + let mut b = [0]; + let mut s = TcpStream::connect(addr).unwrap(); + let mut s2 = s.clone(); + + // closing should prevent reads/writes + s.close_write().unwrap(); + assert!(s.write(&[0]).is_err()); + s.close_read().unwrap(); + assert!(s.read(&mut b).is_err()); + + // closing should affect previous handles + assert!(s2.write(&[0]).is_err()); + assert!(s2.read(&mut b).is_err()); + + // closing should affect new handles + let mut s3 = s.clone(); + assert!(s3.write(&[0]).is_err()); + assert!(s3.read(&mut b).is_err()); + + // make sure these don't die + let _ = s2.close_read(); + let _ = s2.close_write(); + let _ = s3.close_read(); + let _ = s3.close_write(); + } + + #[test] + fn close_read_wakes_up() { + let addr = next_test_ip4(); + let a = TcpListener::bind(addr).listen().unwrap(); + let (_tx, rx) = channel::<()>(); + Thread::spawn(move|| { + let mut a = a; + let _s = a.accept().unwrap(); + let _ = rx.recv().unwrap(); + }); + + let mut s = TcpStream::connect(addr).unwrap(); + let s2 = s.clone(); + let (tx, rx) = channel(); + let _t = Thread::spawn(move|| { + let mut s2 = s2; + assert!(s2.read(&mut [0]).is_err()); + tx.send(()).unwrap(); + }); + // this should wake up the child task + s.close_read().unwrap(); + + // this test will never finish if the child doesn't wake up + rx.recv().unwrap(); + } + + #[test] + fn readwrite_timeouts() { + let addr = next_test_ip6(); + let mut a = TcpListener::bind(addr).listen().unwrap(); + let (tx, rx) = channel::<()>(); + Thread::spawn(move|| { + let mut s = TcpStream::connect(addr).unwrap(); + rx.recv().unwrap(); + assert!(s.write(&[0]).is_ok()); + let _ = rx.recv(); + }); + + let mut s = a.accept().unwrap(); + s.set_timeout(Some(20)); + assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut); + assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut); + + s.set_timeout(Some(20)); + for i in range(0i, 1001) { + match s.write(&[0; 128 * 1024]) { + Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {}, + Err(IoError { kind: TimedOut, .. }) => break, + Err(e) => panic!("{}", e), + } + if i == 1000 { panic!("should have filled up?!"); } + } + assert_eq!(s.write(&[0]).err().unwrap().kind, TimedOut); + + tx.send(()).unwrap(); + s.set_timeout(None); + assert_eq!(s.read(&mut [0, 0]), Ok(1)); + } + + #[test] + fn read_timeouts() { + let addr = next_test_ip6(); + let mut a = TcpListener::bind(addr).listen().unwrap(); + let (tx, rx) = channel::<()>(); + Thread::spawn(move|| { + let mut s = TcpStream::connect(addr).unwrap(); + rx.recv().unwrap(); + let mut amt = 0; + while amt < 100 * 128 * 1024 { + match s.read(&mut [0;128 * 1024]) { + Ok(n) => { amt += n; } + Err(e) => panic!("{}", e), + } + } + let _ = rx.recv(); + }); + + let mut s = a.accept().unwrap(); + s.set_read_timeout(Some(20)); + assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut); + assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut); + + tx.send(()).unwrap(); + for _ in range(0i, 100) { + assert!(s.write(&[0;128 * 1024]).is_ok()); + } + } + + #[test] + fn write_timeouts() { + let addr = next_test_ip6(); + let mut a = TcpListener::bind(addr).listen().unwrap(); + let (tx, rx) = channel::<()>(); + Thread::spawn(move|| { + let mut s = TcpStream::connect(addr).unwrap(); + rx.recv().unwrap(); + assert!(s.write(&[0]).is_ok()); + let _ = rx.recv(); + }); + + let mut s = a.accept().unwrap(); + s.set_write_timeout(Some(20)); + for i in range(0i, 1001) { + match s.write(&[0; 128 * 1024]) { + Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {}, + Err(IoError { kind: TimedOut, .. }) => break, + Err(e) => panic!("{}", e), + } + if i == 1000 { panic!("should have filled up?!"); } + } + assert_eq!(s.write(&[0]).err().unwrap().kind, TimedOut); + + tx.send(()).unwrap(); + assert!(s.read(&mut [0]).is_ok()); + } + + #[test] + fn timeout_concurrent_read() { + let addr = next_test_ip6(); + let mut a = TcpListener::bind(addr).listen().unwrap(); + let (tx, rx) = channel::<()>(); + Thread::spawn(move|| { + let mut s = TcpStream::connect(addr).unwrap(); + rx.recv().unwrap(); + assert_eq!(s.write(&[0]), Ok(())); + let _ = rx.recv(); + }); + + let mut s = a.accept().unwrap(); + let s2 = s.clone(); + let (tx2, rx2) = channel(); + let _t = Thread::spawn(move|| { + let mut s2 = s2; + assert_eq!(s2.read(&mut [0]), Ok(1)); + tx2.send(()).unwrap(); + }); + + s.set_read_timeout(Some(20)); + assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut); + tx.send(()).unwrap(); + + rx2.recv().unwrap(); + } + + #[test] + fn clone_while_reading() { + let addr = next_test_ip6(); + let listen = TcpListener::bind(addr); + let mut accept = listen.listen().unwrap(); + + // Enqueue a task to write to a socket + let (tx, rx) = channel(); + let (txdone, rxdone) = channel(); + let txdone2 = txdone.clone(); + let _t = Thread::spawn(move|| { + let mut tcp = TcpStream::connect(addr).unwrap(); + rx.recv().unwrap(); + tcp.write_u8(0).unwrap(); + txdone2.send(()).unwrap(); + }); + + // Spawn off a reading clone + let tcp = accept.accept().unwrap(); + let tcp2 = tcp.clone(); + let txdone3 = txdone.clone(); + let _t = Thread::spawn(move|| { + let mut tcp2 = tcp2; + tcp2.read_u8().unwrap(); + txdone3.send(()).unwrap(); + }); + + // Try to ensure that the reading clone is indeed reading + for _ in range(0i, 50) { + ::thread::Thread::yield_now(); + } + + // clone the handle again while it's reading, then let it finish the + // read. + let _ = tcp.clone(); + tx.send(()).unwrap(); + rxdone.recv().unwrap(); + rxdone.recv().unwrap(); + } + + #[test] + fn clone_accept_smoke() { + let addr = next_test_ip4(); + let l = TcpListener::bind(addr); + let mut a = l.listen().unwrap(); + let mut a2 = a.clone(); + + let _t = Thread::spawn(move|| { + let _ = TcpStream::connect(addr); + }); + let _t = Thread::spawn(move|| { + let _ = TcpStream::connect(addr); + }); + + assert!(a.accept().is_ok()); + assert!(a2.accept().is_ok()); + } + + #[test] + fn clone_accept_concurrent() { + let addr = next_test_ip4(); + let l = TcpListener::bind(addr); + let a = l.listen().unwrap(); + let a2 = a.clone(); + + let (tx, rx) = channel(); + let tx2 = tx.clone(); + + let _t = Thread::spawn(move|| { + let mut a = a; + tx.send(a.accept()).unwrap(); + }); + let _t = Thread::spawn(move|| { + let mut a = a2; + tx2.send(a.accept()).unwrap(); + }); + + let _t = Thread::spawn(move|| { + let _ = TcpStream::connect(addr); + }); + let _t = Thread::spawn(move|| { + let _ = TcpStream::connect(addr); + }); + + assert!(rx.recv().unwrap().is_ok()); + assert!(rx.recv().unwrap().is_ok()); + } + + #[test] + fn close_accept_smoke() { + let addr = next_test_ip4(); + let l = TcpListener::bind(addr); + let mut a = l.listen().unwrap(); + + a.close_accept().unwrap(); + assert_eq!(a.accept().err().unwrap().kind, EndOfFile); + } + + #[test] + fn close_accept_concurrent() { + let addr = next_test_ip4(); + let l = TcpListener::bind(addr); + let a = l.listen().unwrap(); + let mut a2 = a.clone(); + + let (tx, rx) = channel(); + let _t = Thread::spawn(move|| { + let mut a = a; + tx.send(a.accept()).unwrap(); + }); + a2.close_accept().unwrap(); + + assert_eq!(rx.recv().unwrap().err().unwrap().kind, EndOfFile); + } +} diff --git a/src/libstd/old_io/net/udp.rs b/src/libstd/old_io/net/udp.rs new file mode 100644 index 00000000000..d7fc760951e --- /dev/null +++ b/src/libstd/old_io/net/udp.rs @@ -0,0 +1,457 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! UDP (User Datagram Protocol) network connections. +//! +//! This module contains the ability to open a UDP stream to a socket address. +//! The destination and binding addresses can either be an IPv4 or IPv6 +//! address. There is no corresponding notion of a server because UDP is a +//! datagram protocol. + +use clone::Clone; +use old_io::net::ip::{SocketAddr, IpAddr, ToSocketAddr}; +use old_io::IoResult; +use option::Option; +use sys::udp::UdpSocket as UdpSocketImp; +use sys_common; + +/// A User Datagram Protocol socket. +/// +/// This is an implementation of a bound UDP socket. This supports both IPv4 and +/// IPv6 addresses, and there is no corresponding notion of a server because UDP +/// is a datagram protocol. +/// +/// # Example +/// +/// ```rust,no_run +/// # #![allow(unused_must_use)] +/// #![feature(slicing_syntax)] +/// +/// use std::old_io::net::udp::UdpSocket; +/// use std::old_io::net::ip::{Ipv4Addr, SocketAddr}; +/// fn main() { +/// let addr = SocketAddr { ip: Ipv4Addr(127, 0, 0, 1), port: 34254 }; +/// let mut socket = match UdpSocket::bind(addr) { +/// Ok(s) => s, +/// Err(e) => panic!("couldn't bind socket: {}", e), +/// }; +/// +/// let mut buf = [0; 10]; +/// match socket.recv_from(&mut buf) { +/// Ok((amt, src)) => { +/// // Send a reply to the socket we received data from +/// let buf = buf.slice_to_mut(amt); +/// buf.reverse(); +/// socket.send_to(buf, src); +/// } +/// Err(e) => println!("couldn't receive a datagram: {}", e) +/// } +/// drop(socket); // close the socket +/// } +/// ``` +pub struct UdpSocket { + inner: UdpSocketImp, +} + +impl UdpSocket { + /// Creates a UDP socket from the given address. + /// + /// Address type can be any implementor of `ToSocketAddr` trait. See its + /// documentation for concrete examples. + pub fn bind<A: ToSocketAddr>(addr: A) -> IoResult<UdpSocket> { + super::with_addresses(addr, |addr| { + UdpSocketImp::bind(addr).map(|s| UdpSocket { inner: s }) + }) + } + + /// Receives data from the socket. On success, returns the number of bytes + /// read and the address from whence the data came. + pub fn recv_from(&mut self, buf: &mut [u8]) -> IoResult<(uint, SocketAddr)> { + self.inner.recv_from(buf) + } + + /// Sends data on the socket to the given address. Returns nothing on + /// success. + /// + /// Address type can be any implementer of `ToSocketAddr` trait. See its + /// documentation for concrete examples. + pub fn send_to<A: ToSocketAddr>(&mut self, buf: &[u8], addr: A) -> IoResult<()> { + super::with_addresses(addr, |addr| self.inner.send_to(buf, addr)) + } + + /// Returns the socket address that this socket was created from. + pub fn socket_name(&mut self) -> IoResult<SocketAddr> { + self.inner.socket_name() + } + + /// Joins a multicast IP address (becomes a member of it) + #[unstable] + pub fn join_multicast(&mut self, multi: IpAddr) -> IoResult<()> { + self.inner.join_multicast(multi) + } + + /// Leaves a multicast IP address (drops membership from it) + #[unstable] + pub fn leave_multicast(&mut self, multi: IpAddr) -> IoResult<()> { + self.inner.leave_multicast(multi) + } + + /// Set the multicast loop flag to the specified value + /// + /// This lets multicast packets loop back to local sockets (if enabled) + #[unstable] + pub fn set_multicast_loop(&mut self, on: bool) -> IoResult<()> { + self.inner.set_multicast_loop(on) + } + + /// Sets the multicast TTL + #[unstable] + pub fn set_multicast_ttl(&mut self, ttl: int) -> IoResult<()> { + self.inner.multicast_time_to_live(ttl) + } + + /// Sets this socket's TTL + #[unstable] + pub fn set_ttl(&mut self, ttl: int) -> IoResult<()> { + self.inner.time_to_live(ttl) + } + + /// Sets the broadcast flag on or off + #[unstable] + pub fn set_broadcast(&mut self, broadcast: bool) -> IoResult<()> { + self.inner.set_broadcast(broadcast) + } + + /// Sets the read/write timeout for this socket. + /// + /// For more information, see `TcpStream::set_timeout` + #[unstable = "the timeout argument may change in type and value"] + pub fn set_timeout(&mut self, timeout_ms: Option<u64>) { + self.inner.set_timeout(timeout_ms) + } + + /// Sets the read timeout for this socket. + /// + /// For more information, see `TcpStream::set_timeout` + #[unstable = "the timeout argument may change in type and value"] + pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) { + self.inner.set_read_timeout(timeout_ms) + } + + /// Sets the write timeout for this socket. + /// + /// For more information, see `TcpStream::set_timeout` + #[unstable = "the timeout argument may change in type and value"] + pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) { + self.inner.set_write_timeout(timeout_ms) + } +} + +impl Clone for UdpSocket { + /// Creates a new handle to this UDP socket, allowing for simultaneous + /// reads and writes of the socket. + /// + /// The underlying UDP socket will not be closed until all handles to the + /// socket have been deallocated. Two concurrent reads will not receive + /// the same data. Instead, the first read will receive the first packet + /// received, and the second read will receive the second packet. + fn clone(&self) -> UdpSocket { + UdpSocket { + inner: self.inner.clone(), + } + } +} + +impl sys_common::AsInner<UdpSocketImp> for UdpSocket { + fn as_inner(&self) -> &UdpSocketImp { + &self.inner + } +} + +#[cfg(test)] +#[allow(unstable)] +mod test { + use prelude::v1::*; + + use sync::mpsc::channel; + use old_io::net::ip::*; + use old_io::test::*; + use old_io::{IoError, TimedOut, PermissionDenied, ShortWrite}; + use super::*; + use thread::Thread; + + // FIXME #11530 this fails on android because tests are run as root + #[cfg_attr(any(windows, target_os = "android"), ignore)] + #[test] + fn bind_error() { + let addr = SocketAddr { ip: Ipv4Addr(0, 0, 0, 0), port: 1 }; + match UdpSocket::bind(addr) { + Ok(..) => panic!(), + Err(e) => assert_eq!(e.kind, PermissionDenied), + } + } + + #[test] + fn socket_smoke_test_ip4() { + let server_ip = next_test_ip4(); + let client_ip = next_test_ip4(); + let (tx1, rx1) = channel(); + let (tx2, rx2) = channel(); + + let _t = Thread::spawn(move|| { + match UdpSocket::bind(client_ip) { + Ok(ref mut client) => { + rx1.recv().unwrap(); + client.send_to(&[99], server_ip).unwrap() + } + Err(..) => panic!() + } + tx2.send(()).unwrap(); + }); + + match UdpSocket::bind(server_ip) { + Ok(ref mut server) => { + tx1.send(()).unwrap(); + let mut buf = [0]; + match server.recv_from(&mut buf) { + Ok((nread, src)) => { + assert_eq!(nread, 1); + assert_eq!(buf[0], 99); + assert_eq!(src, client_ip); + } + Err(..) => panic!() + } + } + Err(..) => panic!() + } + rx2.recv().unwrap(); + } + + #[test] + fn socket_smoke_test_ip6() { + let server_ip = next_test_ip6(); + let client_ip = next_test_ip6(); + let (tx, rx) = channel::<()>(); + + let _t = Thread::spawn(move|| { + match UdpSocket::bind(client_ip) { + Ok(ref mut client) => { + rx.recv().unwrap(); + client.send_to(&[99], server_ip).unwrap() + } + Err(..) => panic!() + } + }); + + match UdpSocket::bind(server_ip) { + Ok(ref mut server) => { + tx.send(()).unwrap(); + let mut buf = [0]; + match server.recv_from(&mut buf) { + Ok((nread, src)) => { + assert_eq!(nread, 1); + assert_eq!(buf[0], 99); + assert_eq!(src, client_ip); + } + Err(..) => panic!() + } + } + Err(..) => panic!() + } + } + + pub fn socket_name(addr: SocketAddr) { + let server = UdpSocket::bind(addr); + + assert!(server.is_ok()); + let mut server = server.unwrap(); + + // Make sure socket_name gives + // us the socket we binded to. + let so_name = server.socket_name(); + assert!(so_name.is_ok()); + assert_eq!(addr, so_name.unwrap()); + } + + #[test] + fn socket_name_ip4() { + socket_name(next_test_ip4()); + } + + #[test] + fn socket_name_ip6() { + socket_name(next_test_ip6()); + } + + #[test] + fn udp_clone_smoke() { + let addr1 = next_test_ip4(); + let addr2 = next_test_ip4(); + let mut sock1 = UdpSocket::bind(addr1).unwrap(); + let sock2 = UdpSocket::bind(addr2).unwrap(); + + let _t = Thread::spawn(move|| { + let mut sock2 = sock2; + let mut buf = [0, 0]; + assert_eq!(sock2.recv_from(&mut buf), Ok((1, addr1))); + assert_eq!(buf[0], 1); + sock2.send_to(&[2], addr1).unwrap(); + }); + + let sock3 = sock1.clone(); + + let (tx1, rx1) = channel(); + let (tx2, rx2) = channel(); + let _t = Thread::spawn(move|| { + let mut sock3 = sock3; + rx1.recv().unwrap(); + sock3.send_to(&[1], addr2).unwrap(); + tx2.send(()).unwrap(); + }); + tx1.send(()).unwrap(); + let mut buf = [0, 0]; + assert_eq!(sock1.recv_from(&mut buf), Ok((1, addr2))); + rx2.recv().unwrap(); + } + + #[test] + fn udp_clone_two_read() { + let addr1 = next_test_ip4(); + let addr2 = next_test_ip4(); + let mut sock1 = UdpSocket::bind(addr1).unwrap(); + let sock2 = UdpSocket::bind(addr2).unwrap(); + let (tx1, rx) = channel(); + let tx2 = tx1.clone(); + + let _t = Thread::spawn(move|| { + let mut sock2 = sock2; + sock2.send_to(&[1], addr1).unwrap(); + rx.recv().unwrap(); + sock2.send_to(&[2], addr1).unwrap(); + rx.recv().unwrap(); + }); + + let sock3 = sock1.clone(); + + let (done, rx) = channel(); + let _t = Thread::spawn(move|| { + let mut sock3 = sock3; + let mut buf = [0, 0]; + sock3.recv_from(&mut buf).unwrap(); + tx2.send(()).unwrap(); + done.send(()).unwrap(); + }); + let mut buf = [0, 0]; + sock1.recv_from(&mut buf).unwrap(); + tx1.send(()).unwrap(); + + rx.recv().unwrap(); + } + + #[test] + fn udp_clone_two_write() { + let addr1 = next_test_ip4(); + let addr2 = next_test_ip4(); + let mut sock1 = UdpSocket::bind(addr1).unwrap(); + let sock2 = UdpSocket::bind(addr2).unwrap(); + + let (tx, rx) = channel(); + let (serv_tx, serv_rx) = channel(); + + let _t = Thread::spawn(move|| { + let mut sock2 = sock2; + let mut buf = [0, 1]; + + rx.recv().unwrap(); + match sock2.recv_from(&mut buf) { + Ok(..) => {} + Err(e) => panic!("failed receive: {}", e), + } + serv_tx.send(()).unwrap(); + }); + + let sock3 = sock1.clone(); + + let (done, rx) = channel(); + let tx2 = tx.clone(); + let _t = Thread::spawn(move|| { + let mut sock3 = sock3; + match sock3.send_to(&[1], addr2) { + Ok(..) => { let _ = tx2.send(()); } + Err(..) => {} + } + done.send(()).unwrap(); + }); + match sock1.send_to(&[2], addr2) { + Ok(..) => { let _ = tx.send(()); } + Err(..) => {} + } + drop(tx); + + rx.recv().unwrap(); + serv_rx.recv().unwrap(); + } + + #[cfg(not(windows))] // FIXME #17553 + #[test] + fn recv_from_timeout() { + let addr1 = next_test_ip4(); + let addr2 = next_test_ip4(); + let mut a = UdpSocket::bind(addr1).unwrap(); + let a2 = UdpSocket::bind(addr2).unwrap(); + + let (tx, rx) = channel(); + let (tx2, rx2) = channel(); + let _t = Thread::spawn(move|| { + let mut a = a2; + assert_eq!(a.recv_from(&mut [0]), Ok((1, addr1))); + assert_eq!(a.send_to(&[0], addr1), Ok(())); + rx.recv().unwrap(); + assert_eq!(a.send_to(&[0], addr1), Ok(())); + + tx2.send(()).unwrap(); + }); + + // Make sure that reads time out, but writes can continue + a.set_read_timeout(Some(20)); + assert_eq!(a.recv_from(&mut [0]).err().unwrap().kind, TimedOut); + assert_eq!(a.recv_from(&mut [0]).err().unwrap().kind, TimedOut); + assert_eq!(a.send_to(&[0], addr2), Ok(())); + + // Cloned handles should be able to block + let mut a2 = a.clone(); + assert_eq!(a2.recv_from(&mut [0]), Ok((1, addr2))); + + // Clearing the timeout should allow for receiving + a.set_timeout(None); + tx.send(()).unwrap(); + assert_eq!(a2.recv_from(&mut [0]), Ok((1, addr2))); + + // Make sure the child didn't die + rx2.recv().unwrap(); + } + + #[test] + fn send_to_timeout() { + let addr1 = next_test_ip4(); + let addr2 = next_test_ip4(); + let mut a = UdpSocket::bind(addr1).unwrap(); + let _b = UdpSocket::bind(addr2).unwrap(); + + a.set_write_timeout(Some(1000)); + for _ in range(0u, 100) { + match a.send_to(&[0;4*1024], addr2) { + Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {}, + Err(IoError { kind: TimedOut, .. }) => break, + Err(e) => panic!("other error: {}", e), + } + } + } +} |
