diff options
| author | Aaron Turon <aturon@mozilla.com> | 2014-10-10 10:11:49 -0700 |
|---|---|---|
| committer | Aaron Turon <aturon@mozilla.com> | 2014-11-08 20:40:38 -0800 |
| commit | d34b1b0ca9bf5e0d7cd30952f5de0ab09ed57b41 (patch) | |
| tree | bdb9af03a1b73d4edc9ae5e6193a010c9b2b4edc /src/libstd/sys | |
| parent | 0c1e1ff1e300868a29405a334e65eae690df971d (diff) | |
| download | rust-d34b1b0ca9bf5e0d7cd30952f5de0ab09ed57b41.tar.gz rust-d34b1b0ca9bf5e0d7cd30952f5de0ab09ed57b41.zip | |
Runtime removal: refactor pipes and networking
This patch continues the runtime removal by moving pipe and networking-related code into `sys`. Because this eliminates APIs in `libnative` and `librustrt`, it is a: [breaking-change] This functionality is likely to be available publicly, in some form, from `std` in the future.
Diffstat (limited to 'src/libstd/sys')
| -rw-r--r-- | src/libstd/sys/common/net.rs | 909 | ||||
| -rw-r--r-- | src/libstd/sys/unix/mod.rs | 50 | ||||
| -rw-r--r-- | src/libstd/sys/unix/os.rs | 11 | ||||
| -rw-r--r-- | src/libstd/sys/unix/pipe.rs | 321 | ||||
| -rw-r--r-- | src/libstd/sys/unix/tcp.rs | 157 | ||||
| -rw-r--r-- | src/libstd/sys/unix/udp.rs | 11 | ||||
| -rw-r--r-- | src/libstd/sys/windows/mod.rs | 12 | ||||
| -rw-r--r-- | src/libstd/sys/windows/pipe.rs | 751 | ||||
| -rw-r--r-- | src/libstd/sys/windows/tcp.rs | 219 | ||||
| -rw-r--r-- | src/libstd/sys/windows/udp.rs | 11 |
10 files changed, 2443 insertions, 9 deletions
diff --git a/src/libstd/sys/common/net.rs b/src/libstd/sys/common/net.rs new file mode 100644 index 00000000000..0559005100f --- /dev/null +++ b/src/libstd/sys/common/net.rs @@ -0,0 +1,909 @@ +// Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use alloc::arc::Arc; +use libc::{mod, c_char, c_int}; +use mem; +use ptr::{mod, null, null_mut}; +use rt::mutex; +use io::net::ip::{SocketAddr, IpAddr, Ipv4Addr, Ipv6Addr}; +use io::net::addrinfo; +use io::{IoResult, IoError}; +use sys::{mod, retry, c, sock_t, last_error, last_net_error, last_gai_error, close_sock, + wrlen, msglen_t, os, wouldblock, set_nonblocking, timer, ms_to_timeval, + decode_error_detailed}; +use sys_common::{mod, keep_going, short_write, timeout}; +use prelude::*; +use cmp; +use io; + +#[deriving(Show)] +pub enum SocketStatus { + Readable, + Writable, +} + +//////////////////////////////////////////////////////////////////////////////// +// sockaddr and misc bindings +//////////////////////////////////////////////////////////////////////////////// + +pub fn htons(u: u16) -> u16 { + u.to_be() +} +pub fn ntohs(u: u16) -> u16 { + Int::from_be(u) +} + +pub enum InAddr { + In4Addr(libc::in_addr), + In6Addr(libc::in6_addr), +} + +pub fn ip_to_inaddr(ip: IpAddr) -> InAddr { + match ip { + Ipv4Addr(a, b, c, d) => { + let ip = (a as u32 << 24) | + (b as u32 << 16) | + (c as u32 << 8) | + (d as u32 << 0); + In4Addr(libc::in_addr { + s_addr: Int::from_be(ip) + }) + } + Ipv6Addr(a, b, c, d, e, f, g, h) => { + In6Addr(libc::in6_addr { + s6_addr: [ + htons(a), + htons(b), + htons(c), + htons(d), + htons(e), + htons(f), + htons(g), + htons(h), + ] + }) + } + } +} + +pub fn addr_to_sockaddr(addr: SocketAddr, + storage: &mut libc::sockaddr_storage) + -> libc::socklen_t { + unsafe { + let len = match ip_to_inaddr(addr.ip) { + In4Addr(inaddr) => { + let storage = storage as *mut _ as *mut libc::sockaddr_in; + (*storage).sin_family = libc::AF_INET as libc::sa_family_t; + (*storage).sin_port = htons(addr.port); + (*storage).sin_addr = inaddr; + mem::size_of::<libc::sockaddr_in>() + } + In6Addr(inaddr) => { + let storage = storage as *mut _ as *mut libc::sockaddr_in6; + (*storage).sin6_family = libc::AF_INET6 as libc::sa_family_t; + (*storage).sin6_port = htons(addr.port); + (*storage).sin6_addr = inaddr; + mem::size_of::<libc::sockaddr_in6>() + } + }; + return len as libc::socklen_t; + } +} + +pub fn socket(addr: SocketAddr, ty: libc::c_int) -> IoResult<sock_t> { + unsafe { + let fam = match addr.ip { + Ipv4Addr(..) => libc::AF_INET, + Ipv6Addr(..) => libc::AF_INET6, + }; + match libc::socket(fam, ty, 0) { + -1 => Err(last_net_error()), + fd => Ok(fd), + } + } +} + +pub fn setsockopt<T>(fd: sock_t, opt: libc::c_int, val: libc::c_int, + payload: T) -> IoResult<()> { + unsafe { + let payload = &payload as *const T as *const libc::c_void; + let ret = libc::setsockopt(fd, opt, val, + payload, + mem::size_of::<T>() as libc::socklen_t); + if ret != 0 { + Err(last_net_error()) + } else { + Ok(()) + } + } +} + +pub fn getsockopt<T: Copy>(fd: sock_t, opt: libc::c_int, + val: libc::c_int) -> IoResult<T> { + unsafe { + let mut slot: T = mem::zeroed(); + let mut len = mem::size_of::<T>() as libc::socklen_t; + let ret = c::getsockopt(fd, opt, val, + &mut slot as *mut _ as *mut _, + &mut len); + if ret != 0 { + Err(last_net_error()) + } else { + assert!(len as uint == mem::size_of::<T>()); + Ok(slot) + } + } +} + +pub fn sockname(fd: sock_t, + f: unsafe extern "system" fn(sock_t, *mut libc::sockaddr, + *mut libc::socklen_t) -> libc::c_int) + -> IoResult<SocketAddr> +{ + let mut storage: libc::sockaddr_storage = unsafe { mem::zeroed() }; + let mut len = mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t; + unsafe { + let storage = &mut storage as *mut libc::sockaddr_storage; + let ret = f(fd, + storage as *mut libc::sockaddr, + &mut len as *mut libc::socklen_t); + if ret != 0 { + return Err(last_net_error()) + } + } + return sockaddr_to_addr(&storage, len as uint); +} + +pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage, + len: uint) -> IoResult<SocketAddr> { + match storage.ss_family as libc::c_int { + libc::AF_INET => { + assert!(len as uint >= mem::size_of::<libc::sockaddr_in>()); + let storage: &libc::sockaddr_in = unsafe { + mem::transmute(storage) + }; + let ip = (storage.sin_addr.s_addr as u32).to_be(); + let a = (ip >> 24) as u8; + let b = (ip >> 16) as u8; + let c = (ip >> 8) as u8; + let d = (ip >> 0) as u8; + Ok(SocketAddr { + ip: Ipv4Addr(a, b, c, d), + port: ntohs(storage.sin_port), + }) + } + libc::AF_INET6 => { + assert!(len as uint >= mem::size_of::<libc::sockaddr_in6>()); + let storage: &libc::sockaddr_in6 = unsafe { + mem::transmute(storage) + }; + let a = ntohs(storage.sin6_addr.s6_addr[0]); + let b = ntohs(storage.sin6_addr.s6_addr[1]); + let c = ntohs(storage.sin6_addr.s6_addr[2]); + let d = ntohs(storage.sin6_addr.s6_addr[3]); + let e = ntohs(storage.sin6_addr.s6_addr[4]); + let f = ntohs(storage.sin6_addr.s6_addr[5]); + let g = ntohs(storage.sin6_addr.s6_addr[6]); + let h = ntohs(storage.sin6_addr.s6_addr[7]); + Ok(SocketAddr { + ip: Ipv6Addr(a, b, c, d, e, f, g, h), + port: ntohs(storage.sin6_port), + }) + } + _ => { + Err(IoError { + kind: io::InvalidInput, + desc: "invalid argument", + detail: None, + }) + } + } +} + +//////////////////////////////////////////////////////////////////////////////// +// get_host_addresses +//////////////////////////////////////////////////////////////////////////////// + +extern "system" { + fn getaddrinfo(node: *const c_char, service: *const c_char, + hints: *const libc::addrinfo, + res: *mut *mut libc::addrinfo) -> c_int; + fn freeaddrinfo(res: *mut libc::addrinfo); +} + +pub fn get_host_addresses(host: Option<&str>, servname: Option<&str>, + hint: Option<addrinfo::Hint>) + -> Result<Vec<addrinfo::Info>, IoError> +{ + sys::init_net(); + + assert!(host.is_some() || servname.is_some()); + + let c_host = host.map(|x| x.to_c_str()); + let c_host = c_host.as_ref().map(|x| x.as_ptr()).unwrap_or(null()); + let c_serv = servname.map(|x| x.to_c_str()); + let c_serv = c_serv.as_ref().map(|x| x.as_ptr()).unwrap_or(null()); + + let hint = hint.map(|hint| { + libc::addrinfo { + ai_flags: hint.flags as c_int, + ai_family: hint.family as c_int, + ai_socktype: 0, + ai_protocol: 0, + ai_addrlen: 0, + ai_canonname: null_mut(), + ai_addr: null_mut(), + ai_next: null_mut() + } + }); + + let hint_ptr = hint.as_ref().map_or(null(), |x| { + x as *const libc::addrinfo + }); + let mut res = null_mut(); + + // Make the call + let s = unsafe { + getaddrinfo(c_host, c_serv, hint_ptr, &mut res) + }; + + // Error? + if s != 0 { + return Err(last_gai_error(s)); + } + + // Collect all the results we found + let mut addrs = Vec::new(); + let mut rp = res; + while rp.is_not_null() { + unsafe { + let addr = try!(sockaddr_to_addr(mem::transmute((*rp).ai_addr), + (*rp).ai_addrlen as uint)); + addrs.push(addrinfo::Info { + address: addr, + family: (*rp).ai_family as uint, + socktype: None, + protocol: None, + flags: (*rp).ai_flags as uint + }); + + rp = (*rp).ai_next as *mut libc::addrinfo; + } + } + + unsafe { freeaddrinfo(res); } + + Ok(addrs) +} + +//////////////////////////////////////////////////////////////////////////////// +// Timeout helpers +// +// The read/write functions below are the helpers for reading/writing a socket +// with a possible deadline specified. This is generally viewed as a timed out +// I/O operation. +// +// From the application's perspective, timeouts apply to the I/O object, not to +// the underlying file descriptor (it's one timeout per object). This means that +// we can't use the SO_RCVTIMEO and corresponding send timeout option. +// +// The next idea to implement timeouts would be to use nonblocking I/O. An +// invocation of select() would wait (with a timeout) for a socket to be ready. +// Once its ready, we can perform the operation. Note that the operation *must* +// be nonblocking, even though select() says the socket is ready. This is +// because some other thread could have come and stolen our data (handles can be +// cloned). +// +// To implement nonblocking I/O, the first option we have is to use the +// O_NONBLOCK flag. Remember though that this is a global setting, affecting all +// I/O objects, so this was initially viewed as unwise. +// +// It turns out that there's this nifty MSG_DONTWAIT flag which can be passed to +// send/recv, but the niftiness wears off once you realize it only works well on +// Linux [1] [2]. This means that it's pretty easy to get a nonblocking +// operation on Linux (no flag fiddling, no affecting other objects), but not on +// other platforms. +// +// To work around this constraint on other platforms, we end up using the +// original strategy of flipping the O_NONBLOCK flag. As mentioned before, this +// could cause other objects' blocking operations to suddenly become +// nonblocking. To get around this, a "blocking operation" which returns EAGAIN +// falls back to using the same code path as nonblocking operations, but with an +// infinite timeout (select + send/recv). This helps emulate blocking +// reads/writes despite the underlying descriptor being nonblocking, as well as +// optimizing the fast path of just hitting one syscall in the good case. +// +// As a final caveat, this implementation uses a mutex so only one thread is +// doing a nonblocking operation at at time. This is the operation that comes +// after the select() (at which point we think the socket is ready). This is +// done for sanity to ensure that the state of the O_NONBLOCK flag is what we +// expect (wouldn't want someone turning it on when it should be off!). All +// operations performed in the lock are *nonblocking* to avoid holding the mutex +// forever. +// +// So, in summary, Linux uses MSG_DONTWAIT and doesn't need mutexes, everyone +// else uses O_NONBLOCK and mutexes with some trickery to make sure blocking +// reads/writes are still blocking. +// +// Fun, fun! +// +// [1] http://twistedmatrix.com/pipermail/twisted-commits/2012-April/034692.html +// [2] http://stackoverflow.com/questions/19819198/does-send-msg-dontwait + +pub fn read<T>(fd: sock_t, + deadline: u64, + lock: || -> T, + read: |bool| -> libc::c_int) -> IoResult<uint> { + let mut ret = -1; + if deadline == 0 { + ret = retry(|| read(false)); + } + + if deadline != 0 || (ret == -1 && wouldblock()) { + let deadline = match deadline { + 0 => None, + n => Some(n), + }; + loop { + // With a timeout, first we wait for the socket to become + // readable using select(), specifying the relevant timeout for + // our previously set deadline. + try!(await([fd], deadline, Readable)); + + // At this point, we're still within the timeout, and we've + // determined that the socket is readable (as returned by + // select). We must still read the socket in *nonblocking* mode + // because some other thread could come steal our data. If we + // fail to read some data, we retry (hence the outer loop) and + // wait for the socket to become readable again. + let _guard = lock(); + match retry(|| read(deadline.is_some())) { + -1 if wouldblock() => {} + -1 => return Err(last_net_error()), + n => { ret = n; break } + } + } + } + + match ret { + 0 => Err(sys_common::eof()), + n if n < 0 => Err(last_net_error()), + n => Ok(n as uint) + } +} + +pub fn write<T>(fd: sock_t, + deadline: u64, + buf: &[u8], + write_everything: bool, + lock: || -> T, + write: |bool, *const u8, uint| -> i64) -> IoResult<uint> { + let mut ret = -1; + let mut written = 0; + if deadline == 0 { + if write_everything { + ret = keep_going(buf, |inner, len| { + written = buf.len() - len; + write(false, inner, len) + }); + } else { + ret = retry(|| { write(false, buf.as_ptr(), buf.len()) }); + if ret > 0 { written = ret as uint; } + } + } + + if deadline != 0 || (ret == -1 && wouldblock()) { + let deadline = match deadline { + 0 => None, + n => Some(n), + }; + while written < buf.len() && (write_everything || written == 0) { + // As with read(), first wait for the socket to be ready for + // the I/O operation. + match await([fd], deadline, Writable) { + Err(ref e) if e.kind == io::EndOfFile && written > 0 => { + assert!(deadline.is_some()); + return Err(short_write(written, "short write")) + } + Err(e) => return Err(e), + Ok(()) => {} + } + + // Also as with read(), we use MSG_DONTWAIT to guard ourselves + // against unforeseen circumstances. + let _guard = lock(); + let ptr = buf[written..].as_ptr(); + let len = buf.len() - written; + match retry(|| write(deadline.is_some(), ptr, len)) { + -1 if wouldblock() => {} + -1 => return Err(last_net_error()), + n => { written += n as uint; } + } + } + ret = 0; + } + if ret < 0 { + Err(last_net_error()) + } else { + Ok(written) + } +} + +// See http://developerweb.net/viewtopic.php?id=3196 for where this is +// derived from. +pub fn connect_timeout(fd: sock_t, + addrp: *const libc::sockaddr, + len: libc::socklen_t, + timeout_ms: u64) -> IoResult<()> { + #[cfg(unix)] use libc::EINPROGRESS as INPROGRESS; + #[cfg(windows)] use libc::WSAEINPROGRESS as INPROGRESS; + #[cfg(unix)] use libc::EWOULDBLOCK as WOULDBLOCK; + #[cfg(windows)] use libc::WSAEWOULDBLOCK as WOULDBLOCK; + + // Make sure the call to connect() doesn't block + try!(set_nonblocking(fd, true)); + + let ret = match unsafe { libc::connect(fd, addrp, len) } { + // If the connection is in progress, then we need to wait for it to + // finish (with a timeout). The current strategy for doing this is + // to use select() with a timeout. + -1 if os::errno() as int == INPROGRESS as int || + os::errno() as int == WOULDBLOCK as int => { + let mut set: c::fd_set = unsafe { mem::zeroed() }; + c::fd_set(&mut set, fd); + match await(fd, &mut set, timeout_ms) { + 0 => Err(timeout("connection timed out")), + -1 => Err(last_net_error()), + _ => { + let err: libc::c_int = try!( + getsockopt(fd, libc::SOL_SOCKET, libc::SO_ERROR)); + if err == 0 { + Ok(()) + } else { + Err(decode_error_detailed(err)) + } + } + } + } + + -1 => Err(last_net_error()), + _ => Ok(()), + }; + + // be sure to turn blocking I/O back on + try!(set_nonblocking(fd, false)); + return ret; + + #[cfg(unix)] + fn await(fd: sock_t, set: &mut c::fd_set, timeout: u64) -> libc::c_int { + let start = timer::now(); + retry(|| unsafe { + // Recalculate the timeout each iteration (it is generally + // undefined what the value of the 'tv' is after select + // returns EINTR). + let mut tv = ms_to_timeval(timeout - (timer::now() - start)); + c::select(fd + 1, ptr::null_mut(), set as *mut _, + ptr::null_mut(), &mut tv) + }) + } + #[cfg(windows)] + fn await(_fd: sock_t, set: &mut c::fd_set, timeout: u64) -> libc::c_int { + let mut tv = ms_to_timeval(timeout); + unsafe { c::select(1, ptr::null_mut(), set, ptr::null_mut(), &mut tv) } + } +} + +pub fn await(fds: &[sock_t], deadline: Option<u64>, + status: SocketStatus) -> IoResult<()> { + let mut set: c::fd_set = unsafe { mem::zeroed() }; + let mut max = 0; + for &fd in fds.iter() { + c::fd_set(&mut set, fd); + max = cmp::max(max, fd + 1); + } + if cfg!(windows) { + max = fds.len() as sock_t; + } + + let (read, write) = match status { + Readable => (&mut set as *mut _, ptr::null_mut()), + Writable => (ptr::null_mut(), &mut set as *mut _), + }; + let mut tv: libc::timeval = unsafe { mem::zeroed() }; + + match retry(|| { + let now = timer::now(); + let tvp = match deadline { + None => ptr::null_mut(), + Some(deadline) => { + // If we're past the deadline, then pass a 0 timeout to + // select() so we can poll the status + let ms = if deadline < now {0} else {deadline - now}; + tv = ms_to_timeval(ms); + &mut tv as *mut _ + } + }; + let r = unsafe { + c::select(max as libc::c_int, read, write, ptr::null_mut(), tvp) + }; + r + }) { + -1 => Err(last_net_error()), + 0 => Err(timeout("timed out")), + _ => Ok(()), + } +} + +//////////////////////////////////////////////////////////////////////////////// +// Basic socket representation +//////////////////////////////////////////////////////////////////////////////// + +struct Inner { + fd: sock_t, + + // Unused on Linux, where this lock is not necessary. + #[allow(dead_code)] + lock: mutex::NativeMutex +} + +impl Inner { + fn new(fd: sock_t) -> Inner { + Inner { fd: fd, lock: unsafe { mutex::NativeMutex::new() } } + } +} + +impl Drop for Inner { + fn drop(&mut self) { unsafe { close_sock(self.fd); } } +} + +pub struct Guard<'a> { + pub fd: sock_t, + pub guard: mutex::LockGuard<'a>, +} + +#[unsafe_destructor] +impl<'a> Drop for Guard<'a> { + fn drop(&mut self) { + assert!(set_nonblocking(self.fd, false).is_ok()); + } +} + +//////////////////////////////////////////////////////////////////////////////// +// TCP streams +//////////////////////////////////////////////////////////////////////////////// + +pub struct TcpStream { + inner: Arc<Inner>, + read_deadline: u64, + write_deadline: u64, +} + +impl TcpStream { + pub fn connect(addr: SocketAddr, timeout: Option<u64>) -> IoResult<TcpStream> { + sys::init_net(); + + let fd = try!(socket(addr, libc::SOCK_STREAM)); + let ret = TcpStream::new(fd); + + let mut storage = unsafe { mem::zeroed() }; + let len = addr_to_sockaddr(addr, &mut storage); + let addrp = &storage as *const _ as *const libc::sockaddr; + + match timeout { + Some(timeout) => { + try!(connect_timeout(fd, addrp, len, timeout)); + Ok(ret) + }, + None => { + match retry(|| unsafe { libc::connect(fd, addrp, len) }) { + -1 => Err(last_error()), + _ => Ok(ret), + } + } + } + } + + pub fn new(fd: sock_t) -> TcpStream { + TcpStream { + inner: Arc::new(Inner::new(fd)), + read_deadline: 0, + write_deadline: 0, + } + } + + pub fn fd(&self) -> sock_t { self.inner.fd } + + pub fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> { + setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_NODELAY, + nodelay as libc::c_int) + } + + pub fn set_keepalive(&mut self, seconds: Option<uint>) -> IoResult<()> { + let ret = setsockopt(self.fd(), libc::SOL_SOCKET, libc::SO_KEEPALIVE, + seconds.is_some() as libc::c_int); + match seconds { + Some(n) => ret.and_then(|()| self.set_tcp_keepalive(n)), + None => ret, + } + } + + #[cfg(any(target_os = "macos", target_os = "ios"))] + fn set_tcp_keepalive(&mut self, seconds: uint) -> IoResult<()> { + setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_KEEPALIVE, + seconds as libc::c_int) + } + #[cfg(any(target_os = "freebsd", target_os = "dragonfly"))] + fn set_tcp_keepalive(&mut self, seconds: uint) -> IoResult<()> { + setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_KEEPIDLE, + seconds as libc::c_int) + } + #[cfg(not(any(target_os = "macos", + target_os = "ios", + target_os = "freebsd", + target_os = "dragonfly")))] + fn set_tcp_keepalive(&mut self, _seconds: uint) -> IoResult<()> { + Ok(()) + } + + #[cfg(target_os = "linux")] + fn lock_nonblocking(&self) {} + + #[cfg(not(target_os = "linux"))] + fn lock_nonblocking<'a>(&'a self) -> Guard<'a> { + let ret = Guard { + fd: self.fd(), + guard: unsafe { self.inner.lock.lock() }, + }; + assert!(set_nonblocking(self.fd(), true).is_ok()); + ret + } + + pub fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> { + let fd = self.fd(); + let dolock = || self.lock_nonblocking(); + let doread = |nb| unsafe { + let flags = if nb {c::MSG_DONTWAIT} else {0}; + libc::recv(fd, + buf.as_mut_ptr() as *mut libc::c_void, + buf.len() as wrlen, + flags) as libc::c_int + }; + read(fd, self.read_deadline, dolock, doread) + } + + pub fn write(&mut self, buf: &[u8]) -> IoResult<()> { + let fd = self.fd(); + let dolock = || self.lock_nonblocking(); + let dowrite = |nb: bool, buf: *const u8, len: uint| unsafe { + let flags = if nb {c::MSG_DONTWAIT} else {0}; + libc::send(fd, + buf as *const _, + len as wrlen, + flags) as i64 + }; + write(fd, self.write_deadline, buf, true, dolock, dowrite).map(|_| ()) + } + pub fn peer_name(&mut self) -> IoResult<SocketAddr> { + sockname(self.fd(), libc::getpeername) + } + + pub fn close_write(&mut self) -> IoResult<()> { + super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_WR) }) + } + pub fn close_read(&mut self) -> IoResult<()> { + super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) }) + } + + pub fn set_timeout(&mut self, timeout: Option<u64>) { + let deadline = timeout.map(|a| timer::now() + a).unwrap_or(0); + self.read_deadline = deadline; + self.write_deadline = deadline; + } + pub fn set_read_timeout(&mut self, timeout: Option<u64>) { + self.read_deadline = timeout.map(|a| timer::now() + a).unwrap_or(0); + } + pub fn set_write_timeout(&mut self, timeout: Option<u64>) { + self.write_deadline = timeout.map(|a| timer::now() + a).unwrap_or(0); + } + + pub fn socket_name(&mut self) -> IoResult<SocketAddr> { + sockname(self.fd(), libc::getsockname) + } +} + +impl Clone for TcpStream { + fn clone(&self) -> TcpStream { + TcpStream { + inner: self.inner.clone(), + read_deadline: 0, + write_deadline: 0, + } + } +} + +//////////////////////////////////////////////////////////////////////////////// +// UDP +//////////////////////////////////////////////////////////////////////////////// + +pub struct UdpSocket { + inner: Arc<Inner>, + read_deadline: u64, + write_deadline: u64, +} + +impl UdpSocket { + pub fn bind(addr: SocketAddr) -> IoResult<UdpSocket> { + sys::init_net(); + + let fd = try!(socket(addr, libc::SOCK_DGRAM)); + let ret = UdpSocket { + inner: Arc::new(Inner::new(fd)), + read_deadline: 0, + write_deadline: 0, + }; + + let mut storage = unsafe { mem::zeroed() }; + let len = addr_to_sockaddr(addr, &mut storage); + let addrp = &storage as *const _ as *const libc::sockaddr; + + match unsafe { libc::bind(fd, addrp, len) } { + -1 => Err(last_error()), + _ => Ok(ret), + } + } + + pub fn fd(&self) -> sock_t { self.inner.fd } + + pub fn set_broadcast(&mut self, on: bool) -> IoResult<()> { + setsockopt(self.fd(), libc::SOL_SOCKET, libc::SO_BROADCAST, + on as libc::c_int) + } + + pub fn set_multicast_loop(&mut self, on: bool) -> IoResult<()> { + setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_MULTICAST_LOOP, + on as libc::c_int) + } + + pub fn set_membership(&mut self, addr: IpAddr, opt: libc::c_int) -> IoResult<()> { + match ip_to_inaddr(addr) { + In4Addr(addr) => { + let mreq = libc::ip_mreq { + imr_multiaddr: addr, + // interface == INADDR_ANY + imr_interface: libc::in_addr { s_addr: 0x0 }, + }; + setsockopt(self.fd(), libc::IPPROTO_IP, opt, mreq) + } + In6Addr(addr) => { + let mreq = libc::ip6_mreq { + ipv6mr_multiaddr: addr, + ipv6mr_interface: 0, + }; + setsockopt(self.fd(), libc::IPPROTO_IPV6, opt, mreq) + } + } + } + + #[cfg(target_os = "linux")] + fn lock_nonblocking(&self) {} + + #[cfg(not(target_os = "linux"))] + fn lock_nonblocking<'a>(&'a self) -> Guard<'a> { + let ret = Guard { + fd: self.fd(), + guard: unsafe { self.inner.lock.lock() }, + }; + assert!(set_nonblocking(self.fd(), true).is_ok()); + ret + } + + pub fn socket_name(&mut self) -> IoResult<SocketAddr> { + sockname(self.fd(), libc::getsockname) + } + + pub fn recv_from(&mut self, buf: &mut [u8]) -> IoResult<(uint, SocketAddr)> { + let fd = self.fd(); + let mut storage: libc::sockaddr_storage = unsafe { mem::zeroed() }; + let storagep = &mut storage as *mut _ as *mut libc::sockaddr; + let mut addrlen: libc::socklen_t = + mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t; + + let dolock = || self.lock_nonblocking(); + let n = try!(read(fd, self.read_deadline, dolock, |nb| unsafe { + let flags = if nb {c::MSG_DONTWAIT} else {0}; + libc::recvfrom(fd, + buf.as_mut_ptr() as *mut libc::c_void, + buf.len() as msglen_t, + flags, + storagep, + &mut addrlen) as libc::c_int + })); + sockaddr_to_addr(&storage, addrlen as uint).and_then(|addr| { + Ok((n as uint, addr)) + }) + } + + pub fn send_to(&mut self, buf: &[u8], dst: SocketAddr) -> IoResult<()> { + let mut storage = unsafe { mem::zeroed() }; + let dstlen = addr_to_sockaddr(dst, &mut storage); + let dstp = &storage as *const _ as *const libc::sockaddr; + + let fd = self.fd(); + let dolock = || self.lock_nonblocking(); + let dowrite = |nb, buf: *const u8, len: uint| unsafe { + let flags = if nb {c::MSG_DONTWAIT} else {0}; + libc::sendto(fd, + buf as *const libc::c_void, + len as msglen_t, + flags, + dstp, + dstlen) as i64 + }; + + let n = try!(write(fd, self.write_deadline, buf, false, dolock, dowrite)); + if n != buf.len() { + Err(short_write(n, "couldn't send entire packet at once")) + } else { + Ok(()) + } + } + + pub fn join_multicast(&mut self, multi: IpAddr) -> IoResult<()> { + match multi { + Ipv4Addr(..) => { + self.set_membership(multi, libc::IP_ADD_MEMBERSHIP) + } + Ipv6Addr(..) => { + self.set_membership(multi, libc::IPV6_ADD_MEMBERSHIP) + } + } + } + pub fn leave_multicast(&mut self, multi: IpAddr) -> IoResult<()> { + match multi { + Ipv4Addr(..) => { + self.set_membership(multi, libc::IP_DROP_MEMBERSHIP) + } + Ipv6Addr(..) => { + self.set_membership(multi, libc::IPV6_DROP_MEMBERSHIP) + } + } + } + + pub fn multicast_time_to_live(&mut self, ttl: int) -> IoResult<()> { + setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_MULTICAST_TTL, + ttl as libc::c_int) + } + pub fn time_to_live(&mut self, ttl: int) -> IoResult<()> { + setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_TTL, ttl as libc::c_int) + } + + pub fn set_timeout(&mut self, timeout: Option<u64>) { + let deadline = timeout.map(|a| timer::now() + a).unwrap_or(0); + self.read_deadline = deadline; + self.write_deadline = deadline; + } + pub fn set_read_timeout(&mut self, timeout: Option<u64>) { + self.read_deadline = timeout.map(|a| timer::now() + a).unwrap_or(0); + } + pub fn set_write_timeout(&mut self, timeout: Option<u64>) { + self.write_deadline = timeout.map(|a| timer::now() + a).unwrap_or(0); + } +} + +impl Clone for UdpSocket { + fn clone(&self) -> UdpSocket { + UdpSocket { + inner: self.inner.clone(), + read_deadline: 0, + write_deadline: 0, + } + } +} diff --git a/src/libstd/sys/unix/mod.rs b/src/libstd/sys/unix/mod.rs index ad5de2dad48..5a43fd08f90 100644 --- a/src/libstd/sys/unix/mod.rs +++ b/src/libstd/sys/unix/mod.rs @@ -8,24 +8,51 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +#![allow(missing_doc)] + extern crate libc; use num; use prelude::*; use io::{mod, IoResult, IoError}; +use sys_common::mkerr_libc; +pub mod c; pub mod fs; pub mod os; -pub mod c; +pub mod tcp; +pub mod udp; +pub mod pipe; + +pub mod addrinfo { + pub use sys_common::net::get_host_addresses; +} -pub type sock_t = io::file::fd_t; +// FIXME: move these to c module +pub type sock_t = self::fs::fd_t; pub type wrlen = libc::size_t; +pub type msglen_t = libc::size_t; pub unsafe fn close_sock(sock: sock_t) { let _ = libc::close(sock); } pub fn last_error() -> IoError { - let errno = os::errno() as i32; - let mut err = decode_error(errno); - err.detail = Some(os::error_string(errno)); + decode_error_detailed(os::errno() as i32) +} + +pub fn last_net_error() -> IoError { + last_error() +} + +extern "system" { + fn gai_strerror(errcode: libc::c_int) -> *const libc::c_char; +} + +pub fn last_gai_error(s: libc::c_int) -> IoError { + use c_str::CString; + + let mut err = decode_error(s); + err.detail = Some(unsafe { + CString::new(gai_strerror(s), false).as_str().unwrap().to_string() + }); err } @@ -64,6 +91,12 @@ pub fn decode_error(errno: i32) -> IoError { IoError { kind: kind, desc: desc, detail: None } } +pub fn decode_error_detailed(errno: i32) -> IoError { + let mut err = decode_error(errno); + err.detail = Some(os::error_string(errno)); + err +} + #[inline] pub fn retry<I: PartialEq + num::One + Neg<I>> (f: || -> I) -> I { let minus_one = -num::one::<I>(); @@ -86,7 +119,10 @@ pub fn wouldblock() -> bool { err == libc::EWOULDBLOCK as int || err == libc::EAGAIN as int } -pub fn set_nonblocking(fd: net::sock_t, nb: bool) -> IoResult<()> { +pub fn set_nonblocking(fd: sock_t, nb: bool) -> IoResult<()> { let set = nb as libc::c_int; - super::mkerr_libc(retry(|| unsafe { c::ioctl(fd, c::FIONBIO, &set) })) + mkerr_libc(retry(|| unsafe { c::ioctl(fd, c::FIONBIO, &set) })) } + +// nothing needed on unix platforms +pub fn init_net() {} diff --git a/src/libstd/sys/unix/os.rs b/src/libstd/sys/unix/os.rs index 34699eb27c1..4e495f043bc 100644 --- a/src/libstd/sys/unix/os.rs +++ b/src/libstd/sys/unix/os.rs @@ -11,6 +11,8 @@ use libc; use libc::{c_int, c_char}; use prelude::*; +use io::IoResult; +use sys::fs::FileDesc; use os::TMPBUF_SZ; @@ -99,3 +101,12 @@ pub fn error_string(errno: i32) -> String { ::string::raw::from_buf(p as *const u8) } } + +pub unsafe fn pipe() -> IoResult<(FileDesc, FileDesc)> { + let mut fds = [0, ..2]; + if libc::pipe(fds.as_mut_ptr()) == 0 { + Ok((FileDesc::new(fds[0], true), FileDesc::new(fds[1], true))) + } else { + Err(super::last_error()) + } +} diff --git a/src/libstd/sys/unix/pipe.rs b/src/libstd/sys/unix/pipe.rs new file mode 100644 index 00000000000..67384848a94 --- /dev/null +++ b/src/libstd/sys/unix/pipe.rs @@ -0,0 +1,321 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use alloc::arc::Arc; +use libc; +use c_str::CString; +use mem; +use rt::mutex; +use sync::atomic; +use io::{mod, IoResult, IoError}; +use prelude::*; + +use sys::{mod, timer, retry, c, set_nonblocking, wouldblock}; +use sys::fs::{fd_t, FileDesc}; +use sys_common::net::*; +use sys_common::{eof, mkerr_libc}; + +fn unix_socket(ty: libc::c_int) -> IoResult<fd_t> { + match unsafe { libc::socket(libc::AF_UNIX, ty, 0) } { + -1 => Err(super::last_error()), + fd => Ok(fd) + } +} + +fn addr_to_sockaddr_un(addr: &CString, + storage: &mut libc::sockaddr_storage) + -> IoResult<libc::socklen_t> { + // the sun_path length is limited to SUN_LEN (with null) + assert!(mem::size_of::<libc::sockaddr_storage>() >= + mem::size_of::<libc::sockaddr_un>()); + let s = unsafe { &mut *(storage as *mut _ as *mut libc::sockaddr_un) }; + + let len = addr.len(); + if len > s.sun_path.len() - 1 { + return Err(IoError { + kind: io::InvalidInput, + desc: "invalid argument: path must be smaller than SUN_LEN", + detail: None, + }) + } + s.sun_family = libc::AF_UNIX as libc::sa_family_t; + for (slot, value) in s.sun_path.iter_mut().zip(addr.iter()) { + *slot = value; + } + + // count the null terminator + let len = mem::size_of::<libc::sa_family_t>() + len + 1; + return Ok(len as libc::socklen_t); +} + +struct Inner { + fd: fd_t, + + // Unused on Linux, where this lock is not necessary. + #[allow(dead_code)] + lock: mutex::NativeMutex +} + +impl Inner { + fn new(fd: fd_t) -> Inner { + Inner { fd: fd, lock: unsafe { mutex::NativeMutex::new() } } + } +} + +impl Drop for Inner { + fn drop(&mut self) { unsafe { let _ = libc::close(self.fd); } } +} + +fn connect(addr: &CString, ty: libc::c_int, + timeout: Option<u64>) -> IoResult<Inner> { + let mut storage = unsafe { mem::zeroed() }; + let len = try!(addr_to_sockaddr_un(addr, &mut storage)); + let inner = Inner::new(try!(unix_socket(ty))); + let addrp = &storage as *const _ as *const libc::sockaddr; + + match timeout { + None => { + match retry(|| unsafe { libc::connect(inner.fd, addrp, len) }) { + -1 => Err(super::last_error()), + _ => Ok(inner) + } + } + Some(timeout_ms) => { + try!(connect_timeout(inner.fd, addrp, len, timeout_ms)); + Ok(inner) + } + } +} + +fn bind(addr: &CString, ty: libc::c_int) -> IoResult<Inner> { + let mut storage = unsafe { mem::zeroed() }; + let len = try!(addr_to_sockaddr_un(addr, &mut storage)); + let inner = Inner::new(try!(unix_socket(ty))); + let addrp = &storage as *const _ as *const libc::sockaddr; + match unsafe { + libc::bind(inner.fd, addrp, len) + } { + -1 => Err(super::last_error()), + _ => Ok(inner) + } +} + +//////////////////////////////////////////////////////////////////////////////// +// Unix Streams +//////////////////////////////////////////////////////////////////////////////// + +pub struct UnixStream { + inner: Arc<Inner>, + read_deadline: u64, + write_deadline: u64, +} + +impl UnixStream { + pub fn connect(addr: &CString, + timeout: Option<u64>) -> IoResult<UnixStream> { + connect(addr, libc::SOCK_STREAM, timeout).map(|inner| { + UnixStream::new(Arc::new(inner)) + }) + } + + fn new(inner: Arc<Inner>) -> UnixStream { + UnixStream { + inner: inner, + read_deadline: 0, + write_deadline: 0, + } + } + + fn fd(&self) -> fd_t { self.inner.fd } + + #[cfg(target_os = "linux")] + fn lock_nonblocking(&self) {} + + #[cfg(not(target_os = "linux"))] + fn lock_nonblocking<'a>(&'a self) -> Guard<'a> { + let ret = Guard { + fd: self.fd(), + guard: unsafe { self.inner.lock.lock() }, + }; + assert!(set_nonblocking(self.fd(), true).is_ok()); + ret + } + + pub fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> { + let fd = self.fd(); + let dolock = || self.lock_nonblocking(); + let doread = |nb| unsafe { + let flags = if nb {c::MSG_DONTWAIT} else {0}; + libc::recv(fd, + buf.as_mut_ptr() as *mut libc::c_void, + buf.len() as libc::size_t, + flags) as libc::c_int + }; + read(fd, self.read_deadline, dolock, doread) + } + + pub fn write(&mut self, buf: &[u8]) -> IoResult<()> { + let fd = self.fd(); + let dolock = || self.lock_nonblocking(); + let dowrite = |nb: bool, buf: *const u8, len: uint| unsafe { + let flags = if nb {c::MSG_DONTWAIT} else {0}; + libc::send(fd, + buf as *const _, + len as libc::size_t, + flags) as i64 + }; + match write(fd, self.write_deadline, buf, true, dolock, dowrite) { + Ok(_) => Ok(()), + Err(e) => Err(e) + } + } + + pub fn close_write(&mut self) -> IoResult<()> { + mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_WR) }) + } + + pub fn close_read(&mut self) -> IoResult<()> { + mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) }) + } + + pub fn set_timeout(&mut self, timeout: Option<u64>) { + let deadline = timeout.map(|a| timer::now() + a).unwrap_or(0); + self.read_deadline = deadline; + self.write_deadline = deadline; + } + + pub fn set_read_timeout(&mut self, timeout: Option<u64>) { + self.read_deadline = timeout.map(|a| timer::now() + a).unwrap_or(0); + } + + pub fn set_write_timeout(&mut self, timeout: Option<u64>) { + self.write_deadline = timeout.map(|a| timer::now() + a).unwrap_or(0); + } +} + +impl Clone for UnixStream { + fn clone(&self) -> UnixStream { + UnixStream::new(self.inner.clone()) + } +} + +//////////////////////////////////////////////////////////////////////////////// +// Unix Listener +//////////////////////////////////////////////////////////////////////////////// + +pub struct UnixListener { + inner: Inner, + path: CString, +} + +impl UnixListener { + pub fn bind(addr: &CString) -> IoResult<UnixListener> { + bind(addr, libc::SOCK_STREAM).map(|fd| { + UnixListener { inner: fd, path: addr.clone() } + }) + } + + fn fd(&self) -> fd_t { self.inner.fd } + + pub fn listen(self) -> IoResult<UnixAcceptor> { + match unsafe { libc::listen(self.fd(), 128) } { + -1 => Err(super::last_error()), + + _ => { + let (reader, writer) = try!(unsafe { sys::os::pipe() }); + try!(set_nonblocking(reader.fd(), true)); + try!(set_nonblocking(writer.fd(), true)); + try!(set_nonblocking(self.fd(), true)); + Ok(UnixAcceptor { + inner: Arc::new(AcceptorInner { + listener: self, + reader: reader, + writer: writer, + closed: atomic::AtomicBool::new(false), + }), + deadline: 0, + }) + } + } + } +} + +pub struct UnixAcceptor { + inner: Arc<AcceptorInner>, + deadline: u64, +} + +struct AcceptorInner { + listener: UnixListener, + reader: FileDesc, + writer: FileDesc, + closed: atomic::AtomicBool, +} + +impl UnixAcceptor { + fn fd(&self) -> fd_t { self.inner.listener.fd() } + + pub fn accept(&mut self) -> IoResult<UnixStream> { + let deadline = if self.deadline == 0 {None} else {Some(self.deadline)}; + + while !self.inner.closed.load(atomic::SeqCst) { + unsafe { + let mut storage: libc::sockaddr_storage = mem::zeroed(); + let storagep = &mut storage as *mut libc::sockaddr_storage; + let size = mem::size_of::<libc::sockaddr_storage>(); + let mut size = size as libc::socklen_t; + match retry(|| { + libc::accept(self.fd(), + storagep as *mut libc::sockaddr, + &mut size as *mut libc::socklen_t) as libc::c_int + }) { + -1 if wouldblock() => {} + -1 => return Err(super::last_error()), + fd => return Ok(UnixStream::new(Arc::new(Inner::new(fd)))), + } + } + try!(await([self.fd(), self.inner.reader.fd()], + deadline, Readable)); + } + + Err(eof()) + } + + pub fn set_timeout(&mut self, timeout: Option<u64>) { + self.deadline = timeout.map(|a| timer::now() + a).unwrap_or(0); + } + + pub fn close_accept(&mut self) -> IoResult<()> { + self.inner.closed.store(true, atomic::SeqCst); + let fd = FileDesc::new(self.inner.writer.fd(), false); + match fd.write([0]) { + Ok(..) => Ok(()), + Err(..) if wouldblock() => Ok(()), + Err(e) => Err(e), + } + } +} + +impl Clone for UnixAcceptor { + fn clone(&self) -> UnixAcceptor { + UnixAcceptor { inner: self.inner.clone(), deadline: 0 } + } +} + +impl Drop for UnixListener { + fn drop(&mut self) { + // Unlink the path to the socket to ensure that it doesn't linger. We're + // careful to unlink the path before we close the file descriptor to + // prevent races where we unlink someone else's path. + unsafe { + let _ = libc::unlink(self.path.as_ptr()); + } + } +} diff --git a/src/libstd/sys/unix/tcp.rs b/src/libstd/sys/unix/tcp.rs new file mode 100644 index 00000000000..962475e4177 --- /dev/null +++ b/src/libstd/sys/unix/tcp.rs @@ -0,0 +1,157 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use io::net::ip; +use io::IoResult; +use libc; +use mem; +use ptr; +use prelude::*; +use super::{last_error, last_net_error, retry, sock_t}; +use sync::{Arc, atomic}; +use sys::fs::FileDesc; +use sys::{set_nonblocking, wouldblock}; +use sys; +use sys_common; +use sys_common::net::*; + +pub use sys_common::net::TcpStream; + +//////////////////////////////////////////////////////////////////////////////// +// TCP listeners +//////////////////////////////////////////////////////////////////////////////// + +pub struct TcpListener { + pub inner: FileDesc, +} + +impl TcpListener { + pub fn bind(addr: ip::SocketAddr) -> IoResult<TcpListener> { + let fd = try!(socket(addr, libc::SOCK_STREAM)); + let ret = TcpListener { inner: FileDesc::new(fd, true) }; + + let mut storage = unsafe { mem::zeroed() }; + let len = addr_to_sockaddr(addr, &mut storage); + let addrp = &storage as *const _ as *const libc::sockaddr; + + // On platforms with Berkeley-derived sockets, this allows + // to quickly rebind a socket, without needing to wait for + // the OS to clean up the previous one. + try!(setsockopt(fd, libc::SOL_SOCKET, libc::SO_REUSEADDR, 1 as libc::c_int)); + + + match unsafe { libc::bind(fd, addrp, len) } { + -1 => Err(last_error()), + _ => Ok(ret), + } + } + + pub fn fd(&self) -> sock_t { self.inner.fd() } + + pub fn listen(self, backlog: int) -> IoResult<TcpAcceptor> { + match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } { + -1 => Err(last_net_error()), + _ => { + let (reader, writer) = try!(unsafe { sys::os::pipe() }); + try!(set_nonblocking(reader.fd(), true)); + try!(set_nonblocking(writer.fd(), true)); + try!(set_nonblocking(self.fd(), true)); + Ok(TcpAcceptor { + inner: Arc::new(AcceptorInner { + listener: self, + reader: reader, + writer: writer, + closed: atomic::AtomicBool::new(false), + }), + deadline: 0, + }) + } + } + } + + pub fn socket_name(&mut self) -> IoResult<ip::SocketAddr> { + sockname(self.fd(), libc::getsockname) + } +} + +pub struct TcpAcceptor { + inner: Arc<AcceptorInner>, + deadline: u64, +} + +struct AcceptorInner { + listener: TcpListener, + reader: FileDesc, + writer: FileDesc, + closed: atomic::AtomicBool, +} + +impl TcpAcceptor { + pub fn fd(&self) -> sock_t { self.inner.listener.fd() } + + pub fn accept(&mut self) -> IoResult<TcpStream> { + // In implementing accept, the two main concerns are dealing with + // close_accept() and timeouts. The unix implementation is based on a + // nonblocking accept plus a call to select(). Windows ends up having + // an entirely separate implementation than unix, which is explained + // below. + // + // To implement timeouts, all blocking is done via select() instead of + // accept() by putting the socket in non-blocking mode. Because + // select() takes a timeout argument, we just pass through the timeout + // to select(). + // + // To implement close_accept(), we have a self-pipe to ourselves which + // is passed to select() along with the socket being accepted on. The + // self-pipe is never written to unless close_accept() is called. + let deadline = if self.deadline == 0 {None} else {Some(self.deadline)}; + + while !self.inner.closed.load(atomic::SeqCst) { + match retry(|| unsafe { + libc::accept(self.fd(), ptr::null_mut(), ptr::null_mut()) + }) { + -1 if wouldblock() => {} + -1 => return Err(last_net_error()), + fd => return Ok(TcpStream::new(fd as sock_t)), + } + try!(await([self.fd(), self.inner.reader.fd()], + deadline, Readable)); + } + + Err(sys_common::eof()) + } + + pub fn socket_name(&mut self) -> IoResult<ip::SocketAddr> { + sockname(self.fd(), libc::getsockname) + } + + pub fn set_timeout(&mut self, timeout: Option<u64>) { + self.deadline = timeout.map(|a| sys::timer::now() + a).unwrap_or(0); + } + + pub fn close_accept(&mut self) -> IoResult<()> { + self.inner.closed.store(true, atomic::SeqCst); + let fd = FileDesc::new(self.inner.writer.fd(), false); + match fd.write([0]) { + Ok(..) => Ok(()), + Err(..) if wouldblock() => Ok(()), + Err(e) => Err(e), + } + } +} + +impl Clone for TcpAcceptor { + fn clone(&self) -> TcpAcceptor { + TcpAcceptor { + inner: self.inner.clone(), + deadline: 0, + } + } +} diff --git a/src/libstd/sys/unix/udp.rs b/src/libstd/sys/unix/udp.rs new file mode 100644 index 00000000000..50f8fb828ad --- /dev/null +++ b/src/libstd/sys/unix/udp.rs @@ -0,0 +1,11 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +pub use sys_common::net::UdpSocket; diff --git a/src/libstd/sys/windows/mod.rs b/src/libstd/sys/windows/mod.rs index 5f4129c1484..85fbc6b936c 100644 --- a/src/libstd/sys/windows/mod.rs +++ b/src/libstd/sys/windows/mod.rs @@ -33,12 +33,21 @@ macro_rules! helper_init( (static $name:ident: Helper<$m:ty>) => ( }; ) ) +pub mod c; pub mod fs; pub mod os; -pub mod c; +pub mod tcp; +pub mod udp; +pub mod pipe; + +pub mod addrinfo { + pub use sys_common::net::get_host_addresses; +} +// FIXME: move these to c module pub type sock_t = libc::SOCKET; pub type wrlen = libc::c_int; +pub type msglen_t = libc::c_int; pub unsafe fn close_sock(sock: sock_t) { let _ = libc::closesocket(sock); } // windows has zero values as errors @@ -140,7 +149,6 @@ pub fn set_nonblocking(fd: sock_t, nb: bool) -> IoResult<()> { } } -// FIXME: call this pub fn init_net() { unsafe { static START: Once = ONCE_INIT; diff --git a/src/libstd/sys/windows/pipe.rs b/src/libstd/sys/windows/pipe.rs new file mode 100644 index 00000000000..f2f7994a005 --- /dev/null +++ b/src/libstd/sys/windows/pipe.rs @@ -0,0 +1,751 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! Named pipes implementation for windows +//! +//! If are unfortunate enough to be reading this code, I would like to first +//! apologize. This was my first encounter with windows named pipes, and it +//! didn't exactly turn out very cleanly. If you, too, are new to named pipes, +//! read on as I'll try to explain some fun things that I ran into. +//! +//! # Unix pipes vs Named pipes +//! +//! As with everything else, named pipes on windows are pretty different from +//! unix pipes on unix. On unix, you use one "server pipe" to accept new client +//! pipes. So long as this server pipe is active, new children pipes can +//! connect. On windows, you instead have a number of "server pipes", and each +//! of these server pipes can throughout their lifetime be attached to a client +//! or not. Once attached to a client, a server pipe may then disconnect at a +//! later date. +//! +//! # Accepting clients +//! +//! As with most other I/O interfaces, our Listener/Acceptor/Stream interfaces +//! are built around the unix flavors. This means that we have one "server +//! pipe" to which many clients can connect. In order to make this compatible +//! with the windows model, each connected client consumes ownership of a server +//! pipe, and then a new server pipe is created for the next client. +//! +//! Note that the server pipes attached to clients are never given back to the +//! listener for recycling. This could possibly be implemented with a channel so +//! the listener half can re-use server pipes, but for now I err'd on the simple +//! side of things. Each stream accepted by a listener will destroy the server +//! pipe after the stream is dropped. +//! +//! This model ends up having a small race or two, and you can find more details +//! on the `native_accept` method. +//! +//! # Simultaneous reads and writes +//! +//! In testing, I found that two simultaneous writes and two simultaneous reads +//! on a pipe ended up working out just fine, but problems were encountered when +//! a read was executed simultaneously with a write. After some googling around, +//! it sounded like named pipes just weren't built for this kind of interaction, +//! and the suggested solution was to use overlapped I/O. +//! +//! I don't really know what overlapped I/O is, but my basic understanding after +//! reading about it is that you have an external Event which is used to signal +//! I/O completion, passed around in some OVERLAPPED structures. As to what this +//! is, I'm not exactly sure. +//! +//! This problem implies that all named pipes are created with the +//! FILE_FLAG_OVERLAPPED option. This means that all of their I/O is +//! asynchronous. Each I/O operation has an associated OVERLAPPED structure, and +//! inside of this structure is a HANDLE from CreateEvent. After the I/O is +//! determined to be pending (may complete in the future), the +//! GetOverlappedResult function is used to block on the event, waiting for the +//! I/O to finish. +//! +//! This scheme ended up working well enough. There were two snags that I ran +//! into, however: +//! +//! * Each UnixStream instance needs its own read/write events to wait on. These +//! can't be shared among clones of the same stream because the documentation +//! states that it unsets the event when the I/O is started (would possibly +//! corrupt other events simultaneously waiting). For convenience's sake, +//! these events are lazily initialized. +//! +//! * Each server pipe needs to be created with FILE_FLAG_OVERLAPPED in addition +//! to all pipes created through `connect`. Notably this means that the +//! ConnectNamedPipe function is nonblocking, implying that the Listener needs +//! to have yet another event to do the actual blocking. +//! +//! # Conclusion +//! +//! The conclusion here is that I probably don't know the best way to work with +//! windows named pipes, but the solution here seems to work well enough to get +//! the test suite passing (the suite is in libstd), and that's good enough for +//! me! + +use alloc::arc::Arc; +use libc; +use c_str::CString; +use mem; +use ptr; +use sync::atomic; +use rt::mutex; +use io::{mod, IoError, IoResult}; +use prelude::*; + +use sys_common::{mod, eof}; + +use super::{c, os, timer, to_utf16, decode_error_detailed}; + +struct Event(libc::HANDLE); + +impl Event { + fn new(manual_reset: bool, initial_state: bool) -> IoResult<Event> { + let event = unsafe { + libc::CreateEventW(ptr::null_mut(), + manual_reset as libc::BOOL, + initial_state as libc::BOOL, + ptr::null()) + }; + if event as uint == 0 { + Err(super::last_error()) + } else { + Ok(Event(event)) + } + } + + fn handle(&self) -> libc::HANDLE { let Event(handle) = *self; handle } +} + +impl Drop for Event { + fn drop(&mut self) { + unsafe { let _ = libc::CloseHandle(self.handle()); } + } +} + +struct Inner { + handle: libc::HANDLE, + lock: mutex::NativeMutex, + read_closed: atomic::AtomicBool, + write_closed: atomic::AtomicBool, +} + +impl Inner { + fn new(handle: libc::HANDLE) -> Inner { + Inner { + handle: handle, + lock: unsafe { mutex::NativeMutex::new() }, + read_closed: atomic::AtomicBool::new(false), + write_closed: atomic::AtomicBool::new(false), + } + } +} + +impl Drop for Inner { + fn drop(&mut self) { + unsafe { + let _ = libc::FlushFileBuffers(self.handle); + let _ = libc::CloseHandle(self.handle); + } + } +} + +unsafe fn pipe(name: *const u16, init: bool) -> libc::HANDLE { + libc::CreateNamedPipeW( + name, + libc::PIPE_ACCESS_DUPLEX | + if init {libc::FILE_FLAG_FIRST_PIPE_INSTANCE} else {0} | + libc::FILE_FLAG_OVERLAPPED, + libc::PIPE_TYPE_BYTE | libc::PIPE_READMODE_BYTE | + libc::PIPE_WAIT, + libc::PIPE_UNLIMITED_INSTANCES, + 65536, + 65536, + 0, + ptr::null_mut() + ) +} + +pub fn await(handle: libc::HANDLE, deadline: u64, + events: &[libc::HANDLE]) -> IoResult<uint> { + use libc::consts::os::extra::{WAIT_FAILED, WAIT_TIMEOUT, WAIT_OBJECT_0}; + + // If we've got a timeout, use WaitForSingleObject in tandem with CancelIo + // to figure out if we should indeed get the result. + let ms = if deadline == 0 { + libc::INFINITE as u64 + } else { + let now = timer::now(); + if deadline < now {0} else {deadline - now} + }; + let ret = unsafe { + c::WaitForMultipleObjects(events.len() as libc::DWORD, + events.as_ptr(), + libc::FALSE, + ms as libc::DWORD) + }; + match ret { + WAIT_FAILED => Err(super::last_error()), + WAIT_TIMEOUT => unsafe { + let _ = c::CancelIo(handle); + Err(sys_common::timeout("operation timed out")) + }, + n => Ok((n - WAIT_OBJECT_0) as uint) + } +} + +fn epipe() -> IoError { + IoError { + kind: io::EndOfFile, + desc: "the pipe has ended", + detail: None, + } +} + +//////////////////////////////////////////////////////////////////////////////// +// Unix Streams +//////////////////////////////////////////////////////////////////////////////// + +pub struct UnixStream { + inner: Arc<Inner>, + write: Option<Event>, + read: Option<Event>, + read_deadline: u64, + write_deadline: u64, +} + +impl UnixStream { + fn try_connect(p: *const u16) -> Option<libc::HANDLE> { + // Note that most of this is lifted from the libuv implementation. + // The idea is that if we fail to open a pipe in read/write mode + // that we try afterwards in just read or just write + let mut result = unsafe { + libc::CreateFileW(p, + libc::GENERIC_READ | libc::GENERIC_WRITE, + 0, + ptr::null_mut(), + libc::OPEN_EXISTING, + libc::FILE_FLAG_OVERLAPPED, + ptr::null_mut()) + }; + if result != libc::INVALID_HANDLE_VALUE { + return Some(result) + } + + let err = unsafe { libc::GetLastError() }; + if err == libc::ERROR_ACCESS_DENIED as libc::DWORD { + result = unsafe { + libc::CreateFileW(p, + libc::GENERIC_READ | libc::FILE_WRITE_ATTRIBUTES, + 0, + ptr::null_mut(), + libc::OPEN_EXISTING, + libc::FILE_FLAG_OVERLAPPED, + ptr::null_mut()) + }; + if result != libc::INVALID_HANDLE_VALUE { + return Some(result) + } + } + let err = unsafe { libc::GetLastError() }; + if err == libc::ERROR_ACCESS_DENIED as libc::DWORD { + result = unsafe { + libc::CreateFileW(p, + libc::GENERIC_WRITE | libc::FILE_READ_ATTRIBUTES, + 0, + ptr::null_mut(), + libc::OPEN_EXISTING, + libc::FILE_FLAG_OVERLAPPED, + ptr::null_mut()) + }; + if result != libc::INVALID_HANDLE_VALUE { + return Some(result) + } + } + None + } + + pub fn connect(addr: &CString, timeout: Option<u64>) -> IoResult<UnixStream> { + let addr = try!(to_utf16(addr.as_str())); + let start = timer::now(); + loop { + match UnixStream::try_connect(addr.as_ptr()) { + Some(handle) => { + let inner = Inner::new(handle); + let mut mode = libc::PIPE_TYPE_BYTE | + libc::PIPE_READMODE_BYTE | + libc::PIPE_WAIT; + let ret = unsafe { + libc::SetNamedPipeHandleState(inner.handle, + &mut mode, + ptr::null_mut(), + ptr::null_mut()) + }; + return if ret == 0 { + Err(super::last_error()) + } else { + Ok(UnixStream { + inner: Arc::new(inner), + read: None, + write: None, + read_deadline: 0, + write_deadline: 0, + }) + } + } + None => {} + } + + // On windows, if you fail to connect, you may need to call the + // `WaitNamedPipe` function, and this is indicated with an error + // code of ERROR_PIPE_BUSY. + let code = unsafe { libc::GetLastError() }; + if code as int != libc::ERROR_PIPE_BUSY as int { + return Err(super::last_error()) + } + + match timeout { + Some(timeout) => { + let now = timer::now(); + let timed_out = (now - start) >= timeout || unsafe { + let ms = (timeout - (now - start)) as libc::DWORD; + libc::WaitNamedPipeW(addr.as_ptr(), ms) == 0 + }; + if timed_out { + return Err(sys_common::timeout("connect timed out")) + } + } + + // An example I found on Microsoft's website used 20 + // seconds, libuv uses 30 seconds, hence we make the + // obvious choice of waiting for 25 seconds. + None => { + if unsafe { libc::WaitNamedPipeW(addr.as_ptr(), 25000) } == 0 { + return Err(super::last_error()) + } + } + } + } + } + + fn handle(&self) -> libc::HANDLE { self.inner.handle } + + fn read_closed(&self) -> bool { + self.inner.read_closed.load(atomic::SeqCst) + } + + fn write_closed(&self) -> bool { + self.inner.write_closed.load(atomic::SeqCst) + } + + fn cancel_io(&self) -> IoResult<()> { + match unsafe { c::CancelIoEx(self.handle(), ptr::null_mut()) } { + 0 if os::errno() == libc::ERROR_NOT_FOUND as uint => { + Ok(()) + } + 0 => Err(super::last_error()), + _ => Ok(()) + } + } + + pub fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> { + if self.read.is_none() { + self.read = Some(try!(Event::new(true, false))); + } + + let mut bytes_read = 0; + let mut overlapped: libc::OVERLAPPED = unsafe { mem::zeroed() }; + overlapped.hEvent = self.read.as_ref().unwrap().handle(); + + // Pre-flight check to see if the reading half has been closed. This + // must be done before issuing the ReadFile request, but after we + // acquire the lock. + // + // See comments in close_read() about why this lock is necessary. + let guard = unsafe { self.inner.lock.lock() }; + if self.read_closed() { + return Err(eof()) + } + + // Issue a nonblocking requests, succeeding quickly if it happened to + // succeed. + let ret = unsafe { + libc::ReadFile(self.handle(), + buf.as_ptr() as libc::LPVOID, + buf.len() as libc::DWORD, + &mut bytes_read, + &mut overlapped) + }; + if ret != 0 { return Ok(bytes_read as uint) } + + // If our errno doesn't say that the I/O is pending, then we hit some + // legitimate error and return immediately. + if os::errno() != libc::ERROR_IO_PENDING as uint { + return Err(super::last_error()) + } + + // Now that we've issued a successful nonblocking request, we need to + // wait for it to finish. This can all be done outside the lock because + // we'll see any invocation of CancelIoEx. We also call this in a loop + // because we're woken up if the writing half is closed, we just need to + // realize that the reading half wasn't closed and we go right back to + // sleep. + drop(guard); + loop { + // Process a timeout if one is pending + let wait_succeeded = await(self.handle(), self.read_deadline, + [overlapped.hEvent]); + + let ret = unsafe { + libc::GetOverlappedResult(self.handle(), + &mut overlapped, + &mut bytes_read, + libc::TRUE) + }; + // If we succeeded, or we failed for some reason other than + // CancelIoEx, return immediately + if ret != 0 { return Ok(bytes_read as uint) } + if os::errno() != libc::ERROR_OPERATION_ABORTED as uint { + return Err(super::last_error()) + } + + // If the reading half is now closed, then we're done. If we woke up + // because the writing half was closed, keep trying. + if wait_succeeded.is_err() { + return Err(sys_common::timeout("read timed out")) + } + if self.read_closed() { + return Err(eof()) + } + } + } + + pub fn write(&mut self, buf: &[u8]) -> IoResult<()> { + if self.write.is_none() { + self.write = Some(try!(Event::new(true, false))); + } + + let mut offset = 0; + let mut overlapped: libc::OVERLAPPED = unsafe { mem::zeroed() }; + overlapped.hEvent = self.write.as_ref().unwrap().handle(); + + while offset < buf.len() { + let mut bytes_written = 0; + + // This sequence below is quite similar to the one found in read(). + // Some careful looping is done to ensure that if close_write() is + // invoked we bail out early, and if close_read() is invoked we keep + // going after we woke up. + // + // See comments in close_read() about why this lock is necessary. + let guard = unsafe { self.inner.lock.lock() }; + if self.write_closed() { + return Err(epipe()) + } + let ret = unsafe { + libc::WriteFile(self.handle(), + buf[offset..].as_ptr() as libc::LPVOID, + (buf.len() - offset) as libc::DWORD, + &mut bytes_written, + &mut overlapped) + }; + let err = os::errno(); + drop(guard); + + if ret == 0 { + if err != libc::ERROR_IO_PENDING as uint { + return Err(decode_error_detailed(err as i32)) + } + // Process a timeout if one is pending + let wait_succeeded = await(self.handle(), self.write_deadline, + [overlapped.hEvent]); + let ret = unsafe { + libc::GetOverlappedResult(self.handle(), + &mut overlapped, + &mut bytes_written, + libc::TRUE) + }; + // If we weren't aborted, this was a legit error, if we were + // aborted, then check to see if the write half was actually + // closed or whether we woke up from the read half closing. + if ret == 0 { + if os::errno() != libc::ERROR_OPERATION_ABORTED as uint { + return Err(super::last_error()) + } + if !wait_succeeded.is_ok() { + let amt = offset + bytes_written as uint; + return if amt > 0 { + Err(IoError { + kind: io::ShortWrite(amt), + desc: "short write during write", + detail: None, + }) + } else { + Err(sys_common::timeout("write timed out")) + } + } + if self.write_closed() { + return Err(epipe()) + } + continue // retry + } + } + offset += bytes_written as uint; + } + Ok(()) + } + + pub fn close_read(&mut self) -> IoResult<()> { + // On windows, there's no actual shutdown() method for pipes, so we're + // forced to emulate the behavior manually at the application level. To + // do this, we need to both cancel any pending requests, as well as + // prevent all future requests from succeeding. These two operations are + // not atomic with respect to one another, so we must use a lock to do + // so. + // + // The read() code looks like: + // + // 1. Make sure the pipe is still open + // 2. Submit a read request + // 3. Wait for the read request to finish + // + // The race this lock is preventing is if another thread invokes + // close_read() between steps 1 and 2. By atomically executing steps 1 + // and 2 with a lock with respect to close_read(), we're guaranteed that + // no thread will erroneously sit in a read forever. + let _guard = unsafe { self.inner.lock.lock() }; + self.inner.read_closed.store(true, atomic::SeqCst); + self.cancel_io() + } + + pub fn close_write(&mut self) -> IoResult<()> { + // see comments in close_read() for why this lock is necessary + let _guard = unsafe { self.inner.lock.lock() }; + self.inner.write_closed.store(true, atomic::SeqCst); + self.cancel_io() + } + + pub fn set_timeout(&mut self, timeout: Option<u64>) { + let deadline = timeout.map(|a| timer::now() + a).unwrap_or(0); + self.read_deadline = deadline; + self.write_deadline = deadline; + } + pub fn set_read_timeout(&mut self, timeout: Option<u64>) { + self.read_deadline = timeout.map(|a| timer::now() + a).unwrap_or(0); + } + pub fn set_write_timeout(&mut self, timeout: Option<u64>) { + self.write_deadline = timeout.map(|a| timer::now() + a).unwrap_or(0); + } +} + +impl Clone for UnixStream { + fn clone(&self) -> UnixStream { + UnixStream { + inner: self.inner.clone(), + read: None, + write: None, + read_deadline: 0, + write_deadline: 0, + } + } +} + +//////////////////////////////////////////////////////////////////////////////// +// Unix Listener +//////////////////////////////////////////////////////////////////////////////// + +pub struct UnixListener { + handle: libc::HANDLE, + name: CString, +} + +impl UnixListener { + pub fn bind(addr: &CString) -> IoResult<UnixListener> { + // Although we technically don't need the pipe until much later, we + // create the initial handle up front to test the validity of the name + // and such. + let addr_v = try!(to_utf16(addr.as_str())); + let ret = unsafe { pipe(addr_v.as_ptr(), true) }; + if ret == libc::INVALID_HANDLE_VALUE { + Err(super::last_error()) + } else { + Ok(UnixListener { handle: ret, name: addr.clone() }) + } + } + + pub fn listen(self) -> IoResult<UnixAcceptor> { + Ok(UnixAcceptor { + listener: self, + event: try!(Event::new(true, false)), + deadline: 0, + inner: Arc::new(AcceptorState { + abort: try!(Event::new(true, false)), + closed: atomic::AtomicBool::new(false), + }), + }) + } +} + +impl Drop for UnixListener { + fn drop(&mut self) { + unsafe { let _ = libc::CloseHandle(self.handle); } + } +} + +pub struct UnixAcceptor { + inner: Arc<AcceptorState>, + listener: UnixListener, + event: Event, + deadline: u64, +} + +struct AcceptorState { + abort: Event, + closed: atomic::AtomicBool, +} + +impl UnixAcceptor { + pub fn accept(&mut self) -> IoResult<UnixStream> { + // This function has some funky implementation details when working with + // unix pipes. On windows, each server named pipe handle can be + // connected to a one or zero clients. To the best of my knowledge, a + // named server is considered active and present if there exists at + // least one server named pipe for it. + // + // The model of this function is to take the current known server + // handle, connect a client to it, and then transfer ownership to the + // UnixStream instance. The next time accept() is invoked, it'll need a + // different server handle to connect a client to. + // + // Note that there is a possible race here. Once our server pipe is + // handed off to a `UnixStream` object, the stream could be closed, + // meaning that there would be no active server pipes, hence even though + // we have a valid `UnixAcceptor`, no one can connect to it. For this + // reason, we generate the next accept call's server pipe at the end of + // this function call. + // + // This provides us an invariant that we always have at least one server + // connection open at a time, meaning that all connects to this acceptor + // should succeed while this is active. + // + // The actual implementation of doing this is a little tricky. Once a + // server pipe is created, a client can connect to it at any time. I + // assume that which server a client connects to is nondeterministic, so + // we also need to guarantee that the only server able to be connected + // to is the one that we're calling ConnectNamedPipe on. This means that + // we have to create the second server pipe *after* we've already + // accepted a connection. In order to at least somewhat gracefully + // handle errors, this means that if the second server pipe creation + // fails that we disconnect the connected client and then just keep + // using the original server pipe. + let handle = self.listener.handle; + + // If we've had an artificial call to close_accept, be sure to never + // proceed in accepting new clients in the future + if self.inner.closed.load(atomic::SeqCst) { return Err(eof()) } + + let name = try!(to_utf16(self.listener.name.as_str())); + + // Once we've got a "server handle", we need to wait for a client to + // connect. The ConnectNamedPipe function will block this thread until + // someone on the other end connects. This function can "fail" if a + // client connects after we created the pipe but before we got down + // here. Thanks windows. + let mut overlapped: libc::OVERLAPPED = unsafe { mem::zeroed() }; + overlapped.hEvent = self.event.handle(); + if unsafe { libc::ConnectNamedPipe(handle, &mut overlapped) == 0 } { + let mut err = unsafe { libc::GetLastError() }; + + if err == libc::ERROR_IO_PENDING as libc::DWORD { + // Process a timeout if one is pending + let wait_succeeded = await(handle, self.deadline, + [self.inner.abort.handle(), + overlapped.hEvent]); + + // This will block until the overlapped I/O is completed. The + // timeout was previously handled, so this will either block in + // the normal case or succeed very quickly in the timeout case. + let ret = unsafe { + let mut transfer = 0; + libc::GetOverlappedResult(handle, + &mut overlapped, + &mut transfer, + libc::TRUE) + }; + if ret == 0 { + if wait_succeeded.is_ok() { + err = unsafe { libc::GetLastError() }; + } else { + return Err(sys_common::timeout("accept timed out")) + } + } else { + // we succeeded, bypass the check below + err = libc::ERROR_PIPE_CONNECTED as libc::DWORD; + } + } + if err != libc::ERROR_PIPE_CONNECTED as libc::DWORD { + return Err(super::last_error()) + } + } + + // Now that we've got a connected client to our handle, we need to + // create a second server pipe. If this fails, we disconnect the + // connected client and return an error (see comments above). + let new_handle = unsafe { pipe(name.as_ptr(), false) }; + if new_handle == libc::INVALID_HANDLE_VALUE { + let ret = Err(super::last_error()); + // If our disconnection fails, then there's not really a whole lot + // that we can do, so panic + let err = unsafe { libc::DisconnectNamedPipe(handle) }; + assert!(err != 0); + return ret; + } else { + self.listener.handle = new_handle; + } + + // Transfer ownership of our handle into this stream + Ok(UnixStream { + inner: Arc::new(Inner::new(handle)), + read: None, + write: None, + read_deadline: 0, + write_deadline: 0, + }) + } + + pub fn set_timeout(&mut self, timeout: Option<u64>) { + self.deadline = timeout.map(|i| i + timer::now()).unwrap_or(0); + } + + pub fn close_accept(&mut self) -> IoResult<()> { + self.inner.closed.store(true, atomic::SeqCst); + let ret = unsafe { + c::SetEvent(self.inner.abort.handle()) + }; + if ret == 0 { + Err(super::last_error()) + } else { + Ok(()) + } + } +} + +impl Clone for UnixAcceptor { + fn clone(&self) -> UnixAcceptor { + let name = to_utf16(self.listener.name.as_str()).ok().unwrap(); + UnixAcceptor { + inner: self.inner.clone(), + event: Event::new(true, false).ok().unwrap(), + deadline: 0, + listener: UnixListener { + name: self.listener.name.clone(), + handle: unsafe { + let p = pipe(name.as_ptr(), false) ; + assert!(p != libc::INVALID_HANDLE_VALUE as libc::HANDLE); + p + }, + }, + } + } +} diff --git a/src/libstd/sys/windows/tcp.rs b/src/libstd/sys/windows/tcp.rs new file mode 100644 index 00000000000..3baf2be08d2 --- /dev/null +++ b/src/libstd/sys/windows/tcp.rs @@ -0,0 +1,219 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use io::net::ip; +use io::IoResult; +use libc; +use mem; +use ptr; +use prelude::*; +use super::{last_error, last_net_error, retry, sock_t}; +use sync::{Arc, atomic}; +use sys::fs::FileDesc; +use sys::{mod, c, set_nonblocking, wouldblock, timer}; +use sys_common::{mod, timeout, eof}; +use sys_common::net::*; + +pub use sys_common::net::TcpStream; + +pub struct Event(c::WSAEVENT); + +impl Event { + pub fn new() -> IoResult<Event> { + let event = unsafe { c::WSACreateEvent() }; + if event == c::WSA_INVALID_EVENT { + Err(super::last_error()) + } else { + Ok(Event(event)) + } + } + + pub fn handle(&self) -> c::WSAEVENT { let Event(handle) = *self; handle } +} + +impl Drop for Event { + fn drop(&mut self) { + unsafe { let _ = c::WSACloseEvent(self.handle()); } + } +} + +//////////////////////////////////////////////////////////////////////////////// +// TCP listeners +//////////////////////////////////////////////////////////////////////////////// + +pub struct TcpListener { + inner: FileDesc, +} + +impl TcpListener { + pub fn bind(addr: ip::SocketAddr) -> IoResult<TcpListener> { + sys::init_net(); + + let fd = try!(socket(addr, libc::SOCK_STREAM)); + let ret = TcpListener { inner: FileDesc::new(fd as libc::c_int, true) }; + + let mut storage = unsafe { mem::zeroed() }; + let len = addr_to_sockaddr(addr, &mut storage); + let addrp = &storage as *const _ as *const libc::sockaddr; + + match unsafe { libc::bind(fd, addrp, len) } { + -1 => Err(last_net_error()), + _ => Ok(ret), + } + } + + pub fn fd(&self) -> sock_t { self.inner.fd as sock_t } + + pub fn listen(self, backlog: int) -> IoResult<TcpAcceptor> { + match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } { + -1 => Err(last_net_error()), + + _ => { + let accept = try!(Event::new()); + let ret = unsafe { + c::WSAEventSelect(self.fd(), accept.handle(), c::FD_ACCEPT) + }; + if ret != 0 { + return Err(last_net_error()) + } + Ok(TcpAcceptor { + inner: Arc::new(AcceptorInner { + listener: self, + abort: try!(Event::new()), + accept: accept, + closed: atomic::AtomicBool::new(false), + }), + deadline: 0, + }) + } + } + } + + pub fn socket_name(&mut self) -> IoResult<ip::SocketAddr> { + sockname(self.fd(), libc::getsockname) + } +} + +pub struct TcpAcceptor { + inner: Arc<AcceptorInner>, + deadline: u64, +} + +struct AcceptorInner { + listener: TcpListener, + abort: Event, + accept: Event, + closed: atomic::AtomicBool, +} + +impl TcpAcceptor { + pub fn fd(&self) -> sock_t { self.inner.listener.fd() } + + pub fn accept(&mut self) -> IoResult<TcpStream> { + // Unlink unix, windows cannot invoke `select` on arbitrary file + // descriptors like pipes, only sockets. Consequently, windows cannot + // use the same implementation as unix for accept() when close_accept() + // is considered. + // + // In order to implement close_accept() and timeouts, windows uses + // event handles. An acceptor-specific abort event is created which + // will only get set in close_accept(), and it will never be un-set. + // Additionally, another acceptor-specific event is associated with the + // FD_ACCEPT network event. + // + // These two events are then passed to WaitForMultipleEvents to see + // which one triggers first, and the timeout passed to this function is + // the local timeout for the acceptor. + // + // If the wait times out, then the accept timed out. If the wait + // succeeds with the abort event, then we were closed, and if the wait + // succeeds otherwise, then we do a nonblocking poll via `accept` to + // see if we can accept a connection. The connection is candidate to be + // stolen, so we do all of this in a loop as well. + let events = [self.inner.abort.handle(), self.inner.accept.handle()]; + + while !self.inner.closed.load(atomic::SeqCst) { + let ms = if self.deadline == 0 { + c::WSA_INFINITE as u64 + } else { + let now = timer::now(); + if self.deadline < now {0} else {self.deadline - now} + }; + let ret = unsafe { + c::WSAWaitForMultipleEvents(2, events.as_ptr(), libc::FALSE, + ms as libc::DWORD, libc::FALSE) + }; + match ret { + c::WSA_WAIT_TIMEOUT => { + return Err(timeout("accept timed out")) + } + c::WSA_WAIT_FAILED => return Err(last_net_error()), + c::WSA_WAIT_EVENT_0 => break, + n => assert_eq!(n, c::WSA_WAIT_EVENT_0 + 1), + } + + let mut wsaevents: c::WSANETWORKEVENTS = unsafe { mem::zeroed() }; + let ret = unsafe { + c::WSAEnumNetworkEvents(self.fd(), events[1], &mut wsaevents) + }; + if ret != 0 { return Err(last_net_error()) } + + if wsaevents.lNetworkEvents & c::FD_ACCEPT == 0 { continue } + match unsafe { + libc::accept(self.fd(), ptr::null_mut(), ptr::null_mut()) + } { + -1 if wouldblock() => {} + -1 => return Err(last_net_error()), + + // Accepted sockets inherit the same properties as the caller, + // so we need to deregister our event and switch the socket back + // to blocking mode + fd => { + let stream = TcpStream::new(fd); + let ret = unsafe { + c::WSAEventSelect(fd, events[1], 0) + }; + if ret != 0 { return Err(last_net_error()) } + try!(set_nonblocking(fd, false)); + return Ok(stream) + } + } + } + + Err(eof()) + } + + pub fn socket_name(&mut self) -> IoResult<ip::SocketAddr> { + sockname(self.fd(), libc::getsockname) + } + + pub fn set_timeout(&mut self, timeout: Option<u64>) { + self.deadline = timeout.map(|a| timer::now() + a).unwrap_or(0); + } + + pub fn close_accept(&mut self) -> IoResult<()> { + self.inner.closed.store(true, atomic::SeqCst); + let ret = unsafe { c::WSASetEvent(self.inner.abort.handle()) }; + if ret == libc::TRUE { + Ok(()) + } else { + Err(last_net_error()) + } + } +} + +impl Clone for TcpAcceptor { + fn clone(&self) -> TcpAcceptor { + TcpAcceptor { + inner: self.inner.clone(), + deadline: 0, + } + } +} diff --git a/src/libstd/sys/windows/udp.rs b/src/libstd/sys/windows/udp.rs new file mode 100644 index 00000000000..50f8fb828ad --- /dev/null +++ b/src/libstd/sys/windows/udp.rs @@ -0,0 +1,11 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +pub use sys_common::net::UdpSocket; |
