diff options
| -rw-r--r-- | src/libnative/io/mod.rs | 54 | ||||
| -rw-r--r-- | src/librustrt/rtio.rs | 127 | ||||
| -rw-r--r-- | src/libstd/io/net/addrinfo.rs | 32 | ||||
| -rw-r--r-- | src/libstd/io/net/mod.rs | 45 | ||||
| -rw-r--r-- | src/libstd/io/net/pipe.rs | 65 | ||||
| -rw-r--r-- | src/libstd/io/net/tcp.rs | 105 | ||||
| -rw-r--r-- | src/libstd/io/net/udp.rs | 60 | ||||
| -rw-r--r-- | src/libstd/io/pipe.rs | 58 | ||||
| -rw-r--r-- | src/libstd/os.rs | 35 | ||||
| -rw-r--r-- | src/libstd/sys/common/net.rs (renamed from src/libnative/io/net.rs) | 1110 | ||||
| -rw-r--r-- | src/libstd/sys/unix/mod.rs | 50 | ||||
| -rw-r--r-- | src/libstd/sys/unix/os.rs | 11 | ||||
| -rw-r--r-- | src/libstd/sys/unix/pipe.rs (renamed from src/libnative/io/pipe_unix.rs) | 146 | ||||
| -rw-r--r-- | src/libstd/sys/unix/tcp.rs | 157 | ||||
| -rw-r--r-- | src/libstd/sys/unix/udp.rs | 11 | ||||
| -rw-r--r-- | src/libstd/sys/windows/mod.rs | 12 | ||||
| -rw-r--r-- | src/libstd/sys/windows/pipe.rs (renamed from src/libnative/io/pipe_windows.rs) | 164 | ||||
| -rw-r--r-- | src/libstd/sys/windows/tcp.rs | 219 | ||||
| -rw-r--r-- | src/libstd/sys/windows/udp.rs | 11 |
19 files changed, 1183 insertions, 1289 deletions
diff --git a/src/libnative/io/mod.rs b/src/libnative/io/mod.rs index baf58b83dcd..2a76bc29f7c 100644 --- a/src/libnative/io/mod.rs +++ b/src/libnative/io/mod.rs @@ -35,8 +35,6 @@ pub use self::process::Process; mod helper_thread; // Native I/O implementations -pub mod addrinfo; -pub mod net; pub mod process; mod util; @@ -53,14 +51,6 @@ pub mod timer; #[path = "timer_windows.rs"] pub mod timer; -#[cfg(unix)] -#[path = "pipe_unix.rs"] -pub mod pipe; - -#[cfg(windows)] -#[path = "pipe_windows.rs"] -pub mod pipe; - #[cfg(windows)] #[path = "tty_windows.rs"] mod tty; @@ -126,52 +116,11 @@ pub struct IoFactory { impl IoFactory { pub fn new() -> IoFactory { - net::init(); IoFactory { _cannot_construct_outside_of_this_module: () } } } impl rtio::IoFactory for IoFactory { - // networking - fn tcp_connect(&mut self, addr: rtio::SocketAddr, - timeout: Option<u64>) - -> IoResult<Box<rtio::RtioTcpStream + Send>> - { - net::TcpStream::connect(addr, timeout).map(|s| { - box s as Box<rtio::RtioTcpStream + Send> - }) - } - fn tcp_bind(&mut self, addr: rtio::SocketAddr) - -> IoResult<Box<rtio::RtioTcpListener + Send>> { - net::TcpListener::bind(addr).map(|s| { - box s as Box<rtio::RtioTcpListener + Send> - }) - } - fn udp_bind(&mut self, addr: rtio::SocketAddr) - -> IoResult<Box<rtio::RtioUdpSocket + Send>> { - net::UdpSocket::bind(addr).map(|u| { - box u as Box<rtio::RtioUdpSocket + Send> - }) - } - fn unix_bind(&mut self, path: &CString) - -> IoResult<Box<rtio::RtioUnixListener + Send>> { - pipe::UnixListener::bind(path).map(|s| { - box s as Box<rtio::RtioUnixListener + Send> - }) - } - fn unix_connect(&mut self, path: &CString, - timeout: Option<u64>) -> IoResult<Box<rtio::RtioPipe + Send>> { - pipe::UnixStream::connect(path, timeout).map(|s| { - box s as Box<rtio::RtioPipe + Send> - }) - } - fn get_host_addresses(&mut self, host: Option<&str>, servname: Option<&str>, - hint: Option<rtio::AddrinfoHint>) - -> IoResult<Vec<rtio::AddrinfoInfo>> - { - addrinfo::GetAddrInfoRequest::run(host, servname, hint) - } - // misc fn timer_init(&mut self) -> IoResult<Box<rtio::RtioTimer + Send>> { timer::Timer::new().map(|t| box t as Box<rtio::RtioTimer + Send>) @@ -189,9 +138,6 @@ impl rtio::IoFactory for IoFactory { fn kill(&mut self, pid: libc::pid_t, signum: int) -> IoResult<()> { process::Process::kill(pid, signum) } - fn pipe_open(&mut self, fd: c_int) -> IoResult<Box<rtio::RtioPipe + Send>> { - Ok(box file::FileDesc::new(fd, true) as Box<rtio::RtioPipe + Send>) - } #[cfg(unix)] fn tty_open(&mut self, fd: c_int, _readable: bool) -> IoResult<Box<rtio::RtioTTY + Send>> { diff --git a/src/librustrt/rtio.rs b/src/librustrt/rtio.rs index 1f3ef60e6fb..3ebfcaea687 100644 --- a/src/librustrt/rtio.rs +++ b/src/librustrt/rtio.rs @@ -13,13 +13,9 @@ use core::prelude::*; use alloc::boxed::Box; use collections::string::String; -use collections::vec::Vec; -use core::fmt; use core::mem; use libc::c_int; -use libc; -use c_str::CString; use local::Local; use task::Task; @@ -173,87 +169,15 @@ impl<'a> LocalIo<'a> { } pub trait IoFactory { - // networking - fn tcp_connect(&mut self, addr: SocketAddr, - timeout: Option<u64>) -> IoResult<Box<RtioTcpStream + Send>>; - fn tcp_bind(&mut self, addr: SocketAddr) - -> IoResult<Box<RtioTcpListener + Send>>; - fn udp_bind(&mut self, addr: SocketAddr) - -> IoResult<Box<RtioUdpSocket + Send>>; - fn unix_bind(&mut self, path: &CString) - -> IoResult<Box<RtioUnixListener + Send>>; - fn unix_connect(&mut self, path: &CString, - timeout: Option<u64>) -> IoResult<Box<RtioPipe + Send>>; - fn get_host_addresses(&mut self, host: Option<&str>, servname: Option<&str>, - hint: Option<AddrinfoHint>) - -> IoResult<Vec<AddrinfoInfo>>; - - // misc fn timer_init(&mut self) -> IoResult<Box<RtioTimer + Send>>; fn spawn(&mut self, cfg: ProcessConfig) -> IoResult<(Box<RtioProcess + Send>, Vec<Option<Box<RtioPipe + Send>>>)>; fn kill(&mut self, pid: libc::pid_t, signal: int) -> IoResult<()>; - fn pipe_open(&mut self, fd: c_int) -> IoResult<Box<RtioPipe + Send>>; fn tty_open(&mut self, fd: c_int, readable: bool) -> IoResult<Box<RtioTTY + Send>>; } -pub trait RtioTcpListener : RtioSocket { - fn listen(self: Box<Self>) -> IoResult<Box<RtioTcpAcceptor + Send>>; -} - -pub trait RtioTcpAcceptor : RtioSocket { - fn accept(&mut self) -> IoResult<Box<RtioTcpStream + Send>>; - fn accept_simultaneously(&mut self) -> IoResult<()>; - fn dont_accept_simultaneously(&mut self) -> IoResult<()>; - fn set_timeout(&mut self, timeout: Option<u64>); - fn clone(&self) -> Box<RtioTcpAcceptor + Send>; - fn close_accept(&mut self) -> IoResult<()>; -} - -pub trait RtioTcpStream : RtioSocket { - fn read(&mut self, buf: &mut [u8]) -> IoResult<uint>; - fn write(&mut self, buf: &[u8]) -> IoResult<()>; - fn peer_name(&mut self) -> IoResult<SocketAddr>; - fn control_congestion(&mut self) -> IoResult<()>; - fn nodelay(&mut self) -> IoResult<()>; - fn keepalive(&mut self, delay_in_seconds: uint) -> IoResult<()>; - fn letdie(&mut self) -> IoResult<()>; - fn clone(&self) -> Box<RtioTcpStream + Send>; - fn close_write(&mut self) -> IoResult<()>; - fn close_read(&mut self) -> IoResult<()>; - fn set_timeout(&mut self, timeout_ms: Option<u64>); - fn set_read_timeout(&mut self, timeout_ms: Option<u64>); - fn set_write_timeout(&mut self, timeout_ms: Option<u64>); -} - -pub trait RtioSocket { - fn socket_name(&mut self) -> IoResult<SocketAddr>; -} - -pub trait RtioUdpSocket : RtioSocket { - fn recv_from(&mut self, buf: &mut [u8]) -> IoResult<(uint, SocketAddr)>; - fn send_to(&mut self, buf: &[u8], dst: SocketAddr) -> IoResult<()>; - - fn join_multicast(&mut self, multi: IpAddr) -> IoResult<()>; - fn leave_multicast(&mut self, multi: IpAddr) -> IoResult<()>; - - fn loop_multicast_locally(&mut self) -> IoResult<()>; - fn dont_loop_multicast_locally(&mut self) -> IoResult<()>; - - fn multicast_time_to_live(&mut self, ttl: int) -> IoResult<()>; - fn time_to_live(&mut self, ttl: int) -> IoResult<()>; - - fn hear_broadcasts(&mut self) -> IoResult<()>; - fn ignore_broadcasts(&mut self) -> IoResult<()>; - - fn clone(&self) -> Box<RtioUdpSocket + Send>; - fn set_timeout(&mut self, timeout_ms: Option<u64>); - fn set_read_timeout(&mut self, timeout_ms: Option<u64>); - fn set_write_timeout(&mut self, timeout_ms: Option<u64>); -} - pub trait RtioTimer { fn sleep(&mut self, msecs: u64); fn oneshot(&mut self, msecs: u64, cb: Box<Callback + Send>); @@ -313,54 +237,3 @@ pub struct IoError { } pub type IoResult<T> = Result<T, IoError>; - -#[deriving(PartialEq, Eq)] -pub enum IpAddr { - Ipv4Addr(u8, u8, u8, u8), - Ipv6Addr(u16, u16, u16, u16, u16, u16, u16, u16), -} - -impl fmt::Show for IpAddr { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - match *self { - Ipv4Addr(a, b, c, d) => write!(fmt, "{}.{}.{}.{}", a, b, c, d), - Ipv6Addr(a, b, c, d, e, f, g, h) => { - write!(fmt, - "{:04x}:{:04x}:{:04x}:{:04x}:{:04x}:{:04x}:{:04x}:{:04x}", - a, b, c, d, e, f, g, h) - } - } - } -} - -#[deriving(PartialEq, Eq)] -pub struct SocketAddr { - pub ip: IpAddr, - pub port: u16, -} - -pub enum StdioContainer { - Ignored, - InheritFd(i32), - CreatePipe(bool, bool), -} - -pub enum ProcessExit { - ExitStatus(int), - ExitSignal(int), -} - -pub struct AddrinfoHint { - pub family: uint, - pub socktype: uint, - pub protocol: uint, - pub flags: uint, -} - -pub struct AddrinfoInfo { - pub address: SocketAddr, - pub family: uint, - pub socktype: uint, - pub protocol: uint, - pub flags: uint, -} diff --git a/src/libstd/io/net/addrinfo.rs b/src/libstd/io/net/addrinfo.rs index 3c72f58b10d..22775d54eff 100644 --- a/src/libstd/io/net/addrinfo.rs +++ b/src/libstd/io/net/addrinfo.rs @@ -20,12 +20,10 @@ getaddrinfo() #![allow(missing_docs)] use iter::Iterator; -use io::{IoResult, IoError}; +use io::{IoResult}; use io::net::ip::{SocketAddr, IpAddr}; use option::{Option, Some, None}; -use result::{Ok, Err}; -use rt::rtio::{IoFactory, LocalIo}; -use rt::rtio; +use sys; use vec::Vec; /// Hints to the types of sockets that are desired when looking up hosts @@ -94,31 +92,7 @@ pub fn get_host_addresses(host: &str) -> IoResult<Vec<IpAddr>> { #[allow(unused_variables)] fn lookup(hostname: Option<&str>, servname: Option<&str>, hint: Option<Hint>) -> IoResult<Vec<Info>> { - let hint = hint.map(|Hint { family, socktype, protocol, flags }| { - rtio::AddrinfoHint { - family: family, - socktype: 0, // FIXME: this should use the above variable - protocol: 0, // FIXME: this should use the above variable - flags: flags, - } - }); - match LocalIo::maybe_raise(|io| { - io.get_host_addresses(hostname, servname, hint) - }) { - Ok(v) => Ok(v.into_iter().map(|info| { - Info { - address: SocketAddr { - ip: super::from_rtio(info.address.ip), - port: info.address.port, - }, - family: info.family, - socktype: None, // FIXME: this should use the above variable - protocol: None, // FIXME: this should use the above variable - flags: info.flags, - } - }).collect()), - Err(e) => Err(IoError::from_rtio_error(e)), - } + sys::addrinfo::get_host_addresses(hostname, servname, hint) } // Ignored on android since we cannot give tcp/ip diff --git a/src/libstd/io/net/mod.rs b/src/libstd/io/net/mod.rs index b9b50a55a10..5b1747876d7 100644 --- a/src/libstd/io/net/mod.rs +++ b/src/libstd/io/net/mod.rs @@ -12,9 +12,8 @@ use io::{IoError, IoResult, InvalidInput}; use option::None; -use result::{Result, Ok, Err}; -use rt::rtio; -use self::ip::{Ipv4Addr, Ipv6Addr, IpAddr, SocketAddr, ToSocketAddr}; +use result::{Ok, Err}; +use self::ip::{SocketAddr, ToSocketAddr}; pub use self::addrinfo::get_host_addresses; @@ -24,46 +23,6 @@ pub mod udp; pub mod ip; pub mod pipe; -fn to_rtio(ip: IpAddr) -> rtio::IpAddr { - match ip { - Ipv4Addr(a, b, c, d) => rtio::Ipv4Addr(a, b, c, d), - Ipv6Addr(a, b, c, d, e, f, g, h) => { - rtio::Ipv6Addr(a, b, c, d, e, f, g, h) - } - } -} - -fn from_rtio(ip: rtio::IpAddr) -> IpAddr { - match ip { - rtio::Ipv4Addr(a, b, c, d) => Ipv4Addr(a, b, c, d), - rtio::Ipv6Addr(a, b, c, d, e, f, g, h) => { - Ipv6Addr(a, b, c, d, e, f, g, h) - } - } -} - -fn with_addresses_io<A: ToSocketAddr, T>( - addr: A, - action: |&mut rtio::IoFactory, rtio::SocketAddr| -> Result<T, rtio::IoError> -) -> Result<T, IoError> { - const DEFAULT_ERROR: IoError = IoError { - kind: InvalidInput, - desc: "no addresses found for hostname", - detail: None - }; - - let addresses = try!(addr.to_socket_addr_all()); - let mut err = DEFAULT_ERROR; - for addr in addresses.into_iter() { - let addr = rtio::SocketAddr { ip: to_rtio(addr.ip), port: addr.port }; - match rtio::LocalIo::maybe_raise(|io| action(io, addr)) { - Ok(r) => return Ok(r), - Err(e) => err = IoError::from_rtio_error(e) - } - } - Err(err) -} - fn with_addresses<A: ToSocketAddr, T>(addr: A, action: |SocketAddr| -> IoResult<T>) -> IoResult<T> { const DEFAULT_ERROR: IoError = IoError { diff --git a/src/libstd/io/net/pipe.rs b/src/libstd/io/net/pipe.rs index 8c7deadebea..111b0f2b081 100644 --- a/src/libstd/io/net/pipe.rs +++ b/src/libstd/io/net/pipe.rs @@ -26,17 +26,20 @@ instances as clients. use prelude::*; -use io::{Listener, Acceptor, IoResult, IoError, TimedOut, standard_error}; -use rt::rtio::{IoFactory, LocalIo, RtioUnixListener}; -use rt::rtio::{RtioUnixAcceptor, RtioPipe}; +use io::{Listener, Acceptor, IoResult, TimedOut, standard_error}; use time::Duration; +use sys::pipe::UnixStream as UnixStreamImp; +use sys::pipe::UnixListener as UnixListenerImp; +use sys::pipe::UnixAcceptor as UnixAcceptorImp; + /// A stream which communicates over a named pipe. pub struct UnixStream { - obj: Box<RtioPipe + Send>, + inner: UnixStreamImp, } impl UnixStream { + /// Connect to a pipe named by `path`. This will attempt to open a /// connection to the underlying socket. /// @@ -53,9 +56,8 @@ impl UnixStream { /// stream.write([1, 2, 3]); /// ``` pub fn connect<P: ToCStr>(path: &P) -> IoResult<UnixStream> { - LocalIo::maybe_raise(|io| { - io.unix_connect(&path.to_c_str(), None).map(|p| UnixStream { obj: p }) - }).map_err(IoError::from_rtio_error) + UnixStreamImp::connect(&path.to_c_str(), None) + .map(|inner| UnixStream { inner: inner }) } /// Connect to a pipe named by `path`, timing out if the specified number of @@ -73,10 +75,8 @@ impl UnixStream { return Err(standard_error(TimedOut)); } - LocalIo::maybe_raise(|io| { - let s = io.unix_connect(&path.to_c_str(), Some(timeout.num_milliseconds() as u64)); - s.map(|p| UnixStream { obj: p }) - }).map_err(IoError::from_rtio_error) + UnixStreamImp::connect(&path.to_c_str(), Some(timeout.num_milliseconds() as u64)) + .map(|inner| UnixStream { inner: inner }) } @@ -88,7 +88,7 @@ impl UnixStream { /// Note that this method affects all cloned handles associated with this /// stream, not just this one handle. pub fn close_read(&mut self) -> IoResult<()> { - self.obj.close_read().map_err(IoError::from_rtio_error) + self.inner.close_read() } /// Closes the writing half of this connection. @@ -99,7 +99,7 @@ impl UnixStream { /// Note that this method affects all cloned handles associated with this /// stream, not just this one handle. pub fn close_write(&mut self) -> IoResult<()> { - self.obj.close_write().map_err(IoError::from_rtio_error) + self.inner.close_write() } /// Sets the read/write timeout for this socket. @@ -107,7 +107,7 @@ impl UnixStream { /// For more information, see `TcpStream::set_timeout` #[experimental = "the timeout argument may change in type and value"] pub fn set_timeout(&mut self, timeout_ms: Option<u64>) { - self.obj.set_timeout(timeout_ms) + self.inner.set_timeout(timeout_ms) } /// Sets the read timeout for this socket. @@ -115,7 +115,7 @@ impl UnixStream { /// For more information, see `TcpStream::set_timeout` #[experimental = "the timeout argument may change in type and value"] pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) { - self.obj.set_read_timeout(timeout_ms) + self.inner.set_read_timeout(timeout_ms) } /// Sets the write timeout for this socket. @@ -123,36 +123,35 @@ impl UnixStream { /// For more information, see `TcpStream::set_timeout` #[experimental = "the timeout argument may change in type and value"] pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) { - self.obj.set_write_timeout(timeout_ms) + self.inner.set_write_timeout(timeout_ms) } } impl Clone for UnixStream { fn clone(&self) -> UnixStream { - UnixStream { obj: self.obj.clone() } + UnixStream { inner: self.inner.clone() } } } impl Reader for UnixStream { fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> { - self.obj.read(buf).map_err(IoError::from_rtio_error) + self.inner.read(buf) } } impl Writer for UnixStream { fn write(&mut self, buf: &[u8]) -> IoResult<()> { - self.obj.write(buf).map_err(IoError::from_rtio_error) + self.inner.write(buf) } } /// A value that can listen for incoming named pipe connection requests. pub struct UnixListener { /// The internal, opaque runtime Unix listener. - obj: Box<RtioUnixListener + Send>, + inner: UnixListenerImp, } impl UnixListener { - /// Creates a new listener, ready to receive incoming connections on the /// specified socket. The server will be named by `path`. /// @@ -175,24 +174,22 @@ impl UnixListener { /// # } /// ``` pub fn bind<P: ToCStr>(path: &P) -> IoResult<UnixListener> { - LocalIo::maybe_raise(|io| { - io.unix_bind(&path.to_c_str()).map(|s| UnixListener { obj: s }) - }).map_err(IoError::from_rtio_error) + UnixListenerImp::bind(&path.to_c_str()) + .map(|inner| UnixListener { inner: inner }) } } impl Listener<UnixStream, UnixAcceptor> for UnixListener { fn listen(self) -> IoResult<UnixAcceptor> { - self.obj.listen().map(|obj| { - UnixAcceptor { obj: obj } - }).map_err(IoError::from_rtio_error) + self.inner.listen() + .map(|inner| UnixAcceptor { inner: inner }) } } /// A value that can accept named pipe connections, returned from `listen()`. pub struct UnixAcceptor { /// The internal, opaque runtime Unix acceptor. - obj: Box<RtioUnixAcceptor + Send>, + inner: UnixAcceptorImp } impl UnixAcceptor { @@ -210,7 +207,7 @@ impl UnixAcceptor { #[experimental = "the name and arguments to this function are likely \ to change"] pub fn set_timeout(&mut self, timeout_ms: Option<u64>) { - self.obj.set_timeout(timeout_ms) + self.inner.set_timeout(timeout_ms) } /// Closes the accepting capabilities of this acceptor. @@ -219,15 +216,15 @@ impl UnixAcceptor { /// more information can be found in that documentation. #[experimental] pub fn close_accept(&mut self) -> IoResult<()> { - self.obj.close_accept().map_err(IoError::from_rtio_error) + self.inner.close_accept() } } impl Acceptor<UnixStream> for UnixAcceptor { fn accept(&mut self) -> IoResult<UnixStream> { - self.obj.accept().map(|s| { - UnixStream { obj: s } - }).map_err(IoError::from_rtio_error) + self.inner.accept().map(|s| { + UnixStream { inner: s } + }) } } @@ -246,7 +243,7 @@ impl Clone for UnixAcceptor { /// This function is useful for creating a handle to invoke `close_accept` /// on to wake up any other task blocked in `accept`. fn clone(&self) -> UnixAcceptor { - UnixAcceptor { obj: self.obj.clone() } + UnixAcceptor { inner: self.inner.clone() } } } diff --git a/src/libstd/io/net/tcp.rs b/src/libstd/io/net/tcp.rs index 928c8586739..2545e07cbb5 100644 --- a/src/libstd/io/net/tcp.rs +++ b/src/libstd/io/net/tcp.rs @@ -20,19 +20,17 @@ use clone::Clone; use io::IoResult; use iter::Iterator; -use result::{Ok,Err}; +use result::Err; use io::net::ip::{SocketAddr, ToSocketAddr}; -use io::IoError; use io::{Reader, Writer, Listener, Acceptor}; use io::{standard_error, TimedOut}; -use kinds::Send; use option::{None, Some, Option}; -use boxed::Box; -use rt::rtio::{IoFactory, RtioSocket, RtioTcpListener}; -use rt::rtio::{RtioTcpAcceptor, RtioTcpStream}; -use rt::rtio; use time::Duration; +use sys::tcp::TcpStream as TcpStreamImp; +use sys::tcp::TcpListener as TcpListenerImp; +use sys::tcp::TcpAcceptor as TcpAcceptorImp; + /// A structure which represents a TCP stream between a local socket and a /// remote socket. /// @@ -50,12 +48,12 @@ use time::Duration; /// drop(stream); // close the connection /// ``` pub struct TcpStream { - obj: Box<RtioTcpStream + Send>, + inner: TcpStreamImp, } impl TcpStream { - fn new(s: Box<RtioTcpStream + Send>) -> TcpStream { - TcpStream { obj: s } + fn new(s: TcpStreamImp) -> TcpStream { + TcpStream { inner: s } } /// Open a TCP connection to a remote host. @@ -64,7 +62,9 @@ impl TcpStream { /// trait can be supplied for the address; see this trait documentation for /// concrete examples. pub fn connect<A: ToSocketAddr>(addr: A) -> IoResult<TcpStream> { - super::with_addresses_io(addr, |io, addr| io.tcp_connect(addr, None).map(TcpStream::new)) + super::with_addresses(addr, |addr| { + TcpStreamImp::connect(addr, None).map(TcpStream::new) + }) } /// Creates a TCP connection to a remote socket address, timing out after @@ -86,39 +86,26 @@ impl TcpStream { return Err(standard_error(TimedOut)); } - super::with_addresses_io(addr, |io, addr| - io.tcp_connect(addr, Some(timeout.num_milliseconds() as u64)).map(TcpStream::new) - ) + super::with_addresses(addr, |addr| { + TcpStreamImp::connect(addr, Some(timeout.num_milliseconds() as u64)) + .map(TcpStream::new) + }) } /// Returns the socket address of the remote peer of this TCP connection. pub fn peer_name(&mut self) -> IoResult<SocketAddr> { - match self.obj.peer_name() { - Ok(rtio::SocketAddr { ip, port }) => { - Ok(SocketAddr { ip: super::from_rtio(ip), port: port }) - } - Err(e) => Err(IoError::from_rtio_error(e)), - } + self.inner.peer_name() } /// Returns the socket address of the local half of this TCP connection. pub fn socket_name(&mut self) -> IoResult<SocketAddr> { - match self.obj.socket_name() { - Ok(rtio::SocketAddr { ip, port }) => { - Ok(SocketAddr { ip: super::from_rtio(ip), port: port }) - } - Err(e) => Err(IoError::from_rtio_error(e)), - } + self.inner.socket_name() } /// Sets the nodelay flag on this connection to the boolean specified #[experimental] pub fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> { - if nodelay { - self.obj.nodelay() - } else { - self.obj.control_congestion() - }.map_err(IoError::from_rtio_error) + self.inner.set_nodelay(nodelay) } /// Sets the keepalive timeout to the timeout specified. @@ -128,10 +115,7 @@ impl TcpStream { /// specified time, in seconds. #[experimental] pub fn set_keepalive(&mut self, delay_in_seconds: Option<uint>) -> IoResult<()> { - match delay_in_seconds { - Some(i) => self.obj.keepalive(i), - None => self.obj.letdie(), - }.map_err(IoError::from_rtio_error) + self.inner.set_keepalive(delay_in_seconds) } /// Closes the reading half of this connection. @@ -165,7 +149,7 @@ impl TcpStream { /// Note that this method affects all cloned handles associated with this /// stream, not just this one handle. pub fn close_read(&mut self) -> IoResult<()> { - self.obj.close_read().map_err(IoError::from_rtio_error) + self.inner.close_read() } /// Closes the writing half of this connection. @@ -176,7 +160,7 @@ impl TcpStream { /// Note that this method affects all cloned handles associated with this /// stream, not just this one handle. pub fn close_write(&mut self) -> IoResult<()> { - self.obj.close_write().map_err(IoError::from_rtio_error) + self.inner.close_write() } /// Sets a timeout, in milliseconds, for blocking operations on this stream. @@ -198,7 +182,7 @@ impl TcpStream { /// take a look at `set_read_timeout` and `set_write_timeout`. #[experimental = "the timeout argument may change in type and value"] pub fn set_timeout(&mut self, timeout_ms: Option<u64>) { - self.obj.set_timeout(timeout_ms) + self.inner.set_timeout(timeout_ms) } /// Sets the timeout for read operations on this stream. @@ -215,7 +199,7 @@ impl TcpStream { /// during the timeout period. #[experimental = "the timeout argument may change in type and value"] pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) { - self.obj.set_read_timeout(timeout_ms) + self.inner.set_read_timeout(timeout_ms) } /// Sets the timeout for write operations on this stream. @@ -242,7 +226,7 @@ impl TcpStream { /// asynchronous fashion after the call to write returns. #[experimental = "the timeout argument may change in type and value"] pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) { - self.obj.set_write_timeout(timeout_ms) + self.inner.set_write_timeout(timeout_ms) } } @@ -256,19 +240,19 @@ impl Clone for TcpStream { /// Instead, the first read will receive the first packet received, and the /// second read will receive the second packet. fn clone(&self) -> TcpStream { - TcpStream { obj: self.obj.clone() } + TcpStream { inner: self.inner.clone() } } } impl Reader for TcpStream { fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> { - self.obj.read(buf).map_err(IoError::from_rtio_error) + self.inner.read(buf) } } impl Writer for TcpStream { fn write(&mut self, buf: &[u8]) -> IoResult<()> { - self.obj.write(buf).map_err(IoError::from_rtio_error) + self.inner.write(buf) } } @@ -309,7 +293,7 @@ impl Writer for TcpStream { /// # } /// ``` pub struct TcpListener { - obj: Box<RtioTcpListener + Send>, + inner: TcpListenerImp, } impl TcpListener { @@ -324,26 +308,20 @@ impl TcpListener { /// The address type can be any implementor of `ToSocketAddr` trait. See its /// documentation for concrete examples. pub fn bind<A: ToSocketAddr>(addr: A) -> IoResult<TcpListener> { - super::with_addresses_io(addr, |io, addr| io.tcp_bind(addr).map(|l| TcpListener { obj: l })) + super::with_addresses(addr, |addr| { + TcpListenerImp::bind(addr).map(|inner| TcpListener { inner: inner }) + }) } /// Returns the local socket address of this listener. pub fn socket_name(&mut self) -> IoResult<SocketAddr> { - match self.obj.socket_name() { - Ok(rtio::SocketAddr { ip, port }) => { - Ok(SocketAddr { ip: super::from_rtio(ip), port: port }) - } - Err(e) => Err(IoError::from_rtio_error(e)), - } + self.inner.socket_name() } } impl Listener<TcpStream, TcpAcceptor> for TcpListener { fn listen(self) -> IoResult<TcpAcceptor> { - match self.obj.listen() { - Ok(acceptor) => Ok(TcpAcceptor { obj: acceptor }), - Err(e) => Err(IoError::from_rtio_error(e)), - } + self.inner.listen(128).map(|a| TcpAcceptor { inner: a }) } } @@ -351,7 +329,7 @@ impl Listener<TcpStream, TcpAcceptor> for TcpListener { /// a `TcpListener`'s `listen` method, and this object can be used to accept new /// `TcpStream` instances. pub struct TcpAcceptor { - obj: Box<RtioTcpAcceptor + Send>, + inner: TcpAcceptorImp, } impl TcpAcceptor { @@ -399,7 +377,7 @@ impl TcpAcceptor { /// ``` #[experimental = "the type of the argument and name of this function are \ subject to change"] - pub fn set_timeout(&mut self, ms: Option<u64>) { self.obj.set_timeout(ms); } + pub fn set_timeout(&mut self, ms: Option<u64>) { self.inner.set_timeout(ms); } /// Closes the accepting capabilities of this acceptor. /// @@ -445,16 +423,13 @@ impl TcpAcceptor { /// ``` #[experimental] pub fn close_accept(&mut self) -> IoResult<()> { - self.obj.close_accept().map_err(IoError::from_rtio_error) + self.inner.close_accept() } } impl Acceptor<TcpStream> for TcpAcceptor { fn accept(&mut self) -> IoResult<TcpStream> { - match self.obj.accept(){ - Ok(s) => Ok(TcpStream::new(s)), - Err(e) => Err(IoError::from_rtio_error(e)), - } + self.inner.accept().map(TcpStream::new) } } @@ -473,7 +448,7 @@ impl Clone for TcpAcceptor { /// This function is useful for creating a handle to invoke `close_accept` /// on to wake up any other task blocked in `accept`. fn clone(&self) -> TcpAcceptor { - TcpAcceptor { obj: self.obj.clone() } + TcpAcceptor { inner: self.inner.clone() } } } @@ -1112,8 +1087,6 @@ mod test { #[test] fn shutdown_smoke() { - use rt::rtio::RtioTcpStream; - let addr = next_test_ip4(); let a = TcpListener::bind(addr).unwrap().listen(); spawn(proc() { @@ -1124,7 +1097,7 @@ mod test { }); let mut s = TcpStream::connect(addr).unwrap(); - assert!(s.obj.close_write().is_ok()); + assert!(s.inner.close_write().is_ok()); assert!(s.write([1]).is_err()); assert_eq!(s.read_to_end(), Ok(vec!(1))); } diff --git a/src/libstd/io/net/udp.rs b/src/libstd/io/net/udp.rs index 4ae054beadb..31b61989647 100644 --- a/src/libstd/io/net/udp.rs +++ b/src/libstd/io/net/udp.rs @@ -17,13 +17,10 @@ use clone::Clone; use io::net::ip::{SocketAddr, IpAddr, ToSocketAddr}; -use io::{Reader, Writer, IoResult, IoError}; -use kinds::Send; -use boxed::Box; +use io::{Reader, Writer, IoResult}; use option::Option; use result::{Ok, Err}; -use rt::rtio::{RtioSocket, RtioUdpSocket, IoFactory}; -use rt::rtio; +use sys::udp::UdpSocket as UdpSocketImp; /// A User Datagram Protocol socket. /// @@ -60,7 +57,7 @@ use rt::rtio; /// } /// ``` pub struct UdpSocket { - obj: Box<RtioUdpSocket + Send>, + inner: UdpSocketImp, } impl UdpSocket { @@ -69,18 +66,15 @@ impl UdpSocket { /// Address type can be any implementor of `ToSocketAddr` trait. See its /// documentation for concrete examples. pub fn bind<A: ToSocketAddr>(addr: A) -> IoResult<UdpSocket> { - super::with_addresses_io(addr, |io, addr| io.udp_bind(addr).map(|s| UdpSocket { obj: s })) + super::with_addresses(addr, |addr| { + UdpSocketImp::bind(addr).map(|s| UdpSocket { inner: s }) + }) } /// Receives data from the socket. On success, returns the number of bytes /// read and the address from whence the data came. pub fn recv_from(&mut self, buf: &mut [u8]) -> IoResult<(uint, SocketAddr)> { - match self.obj.recv_from(buf) { - Ok((amt, rtio::SocketAddr { ip, port })) => { - Ok((amt, SocketAddr { ip: super::from_rtio(ip), port: port })) - } - Err(e) => Err(IoError::from_rtio_error(e)), - } + self.inner.recv_from(buf) } /// Sends data on the socket to the given address. Returns nothing on @@ -89,10 +83,7 @@ impl UdpSocket { /// Address type can be any implementor of `ToSocketAddr` trait. See its /// documentation for concrete examples. pub fn send_to<A: ToSocketAddr>(&mut self, buf: &[u8], addr: A) -> IoResult<()> { - super::with_addresses(addr, |addr| self.obj.send_to(buf, rtio::SocketAddr { - ip: super::to_rtio(addr.ip), - port: addr.port, - }).map_err(IoError::from_rtio_error)) + super::with_addresses(addr, |addr| self.inner.send_to(buf, addr)) } /// Creates a `UdpStream`, which allows use of the `Reader` and `Writer` @@ -112,24 +103,19 @@ impl UdpSocket { /// Returns the socket address that this socket was created from. pub fn socket_name(&mut self) -> IoResult<SocketAddr> { - match self.obj.socket_name() { - Ok(a) => Ok(SocketAddr { ip: super::from_rtio(a.ip), port: a.port }), - Err(e) => Err(IoError::from_rtio_error(e)) - } + self.inner.socket_name() } /// Joins a multicast IP address (becomes a member of it) #[experimental] pub fn join_multicast(&mut self, multi: IpAddr) -> IoResult<()> { - let e = self.obj.join_multicast(super::to_rtio(multi)); - e.map_err(IoError::from_rtio_error) + self.inner.join_multicast(multi) } /// Leaves a multicast IP address (drops membership from it) #[experimental] pub fn leave_multicast(&mut self, multi: IpAddr) -> IoResult<()> { - let e = self.obj.leave_multicast(super::to_rtio(multi)); - e.map_err(IoError::from_rtio_error) + self.inner.leave_multicast(multi) } /// Set the multicast loop flag to the specified value @@ -137,33 +123,25 @@ impl UdpSocket { /// This lets multicast packets loop back to local sockets (if enabled) #[experimental] pub fn set_multicast_loop(&mut self, on: bool) -> IoResult<()> { - if on { - self.obj.loop_multicast_locally() - } else { - self.obj.dont_loop_multicast_locally() - }.map_err(IoError::from_rtio_error) + self.inner.set_multicast_loop(on) } /// Sets the multicast TTL #[experimental] pub fn set_multicast_ttl(&mut self, ttl: int) -> IoResult<()> { - self.obj.multicast_time_to_live(ttl).map_err(IoError::from_rtio_error) + self.inner.multicast_time_to_live(ttl) } /// Sets this socket's TTL #[experimental] pub fn set_ttl(&mut self, ttl: int) -> IoResult<()> { - self.obj.time_to_live(ttl).map_err(IoError::from_rtio_error) + self.inner.time_to_live(ttl) } /// Sets the broadcast flag on or off #[experimental] pub fn set_broadcast(&mut self, broadcast: bool) -> IoResult<()> { - if broadcast { - self.obj.hear_broadcasts() - } else { - self.obj.ignore_broadcasts() - }.map_err(IoError::from_rtio_error) + self.inner.set_broadcast(broadcast) } /// Sets the read/write timeout for this socket. @@ -171,7 +149,7 @@ impl UdpSocket { /// For more information, see `TcpStream::set_timeout` #[experimental = "the timeout argument may change in type and value"] pub fn set_timeout(&mut self, timeout_ms: Option<u64>) { - self.obj.set_timeout(timeout_ms) + self.inner.set_timeout(timeout_ms) } /// Sets the read timeout for this socket. @@ -179,7 +157,7 @@ impl UdpSocket { /// For more information, see `TcpStream::set_timeout` #[experimental = "the timeout argument may change in type and value"] pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) { - self.obj.set_read_timeout(timeout_ms) + self.inner.set_read_timeout(timeout_ms) } /// Sets the write timeout for this socket. @@ -187,7 +165,7 @@ impl UdpSocket { /// For more information, see `TcpStream::set_timeout` #[experimental = "the timeout argument may change in type and value"] pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) { - self.obj.set_write_timeout(timeout_ms) + self.inner.set_write_timeout(timeout_ms) } } @@ -201,7 +179,7 @@ impl Clone for UdpSocket { /// received, and the second read will receive the second packet. fn clone(&self) -> UdpSocket { UdpSocket { - obj: self.obj.clone(), + inner: self.inner.clone(), } } } diff --git a/src/libstd/io/pipe.rs b/src/libstd/io/pipe.rs index c77cffd561e..64b2518fab1 100644 --- a/src/libstd/io/pipe.rs +++ b/src/libstd/io/pipe.rs @@ -17,15 +17,17 @@ use prelude::*; -use io::{IoResult, IoError}; +use io::IoResult; use libc; -use os; -use rt::rtio::{RtioPipe, LocalIo}; +use sync::Arc; + +use sys_common; +use sys; +use sys::fs::FileDesc as FileDesc; /// A synchronous, in-memory pipe. pub struct PipeStream { - /// The internal, opaque runtime pipe object. - obj: Box<RtioPipe + Send>, + inner: Arc<FileDesc> } pub struct PipePair { @@ -55,14 +57,14 @@ impl PipeStream { /// } /// ``` pub fn open(fd: libc::c_int) -> IoResult<PipeStream> { - LocalIo::maybe_raise(|io| { - io.pipe_open(fd).map(|obj| PipeStream { obj: obj }) - }).map_err(IoError::from_rtio_error) + Ok(PipeStream::from_filedesc(FileDesc::new(fd, true))) } + // FIXME: expose this some other way + /// Wrap a FileDesc directly, taking ownership. #[doc(hidden)] - pub fn new(inner: Box<RtioPipe + Send>) -> PipeStream { - PipeStream { obj: inner } + pub fn from_filedesc(fd: FileDesc) -> PipeStream { + PipeStream { inner: Arc::new(fd) } } /// Creates a pair of in-memory OS pipes for a unidirectional communication @@ -76,43 +78,35 @@ impl PipeStream { /// This function can fail to succeed if the underlying OS has run out of /// available resources to allocate a new pipe. pub fn pair() -> IoResult<PipePair> { - struct Closer { fd: libc::c_int } - - let os::Pipe { reader, writer } = try!(unsafe { os::pipe() }); - let mut reader = Closer { fd: reader }; - let mut writer = Closer { fd: writer }; - - let io_reader = try!(PipeStream::open(reader.fd)); - reader.fd = -1; - let io_writer = try!(PipeStream::open(writer.fd)); - writer.fd = -1; - return Ok(PipePair { reader: io_reader, writer: io_writer }); - - impl Drop for Closer { - fn drop(&mut self) { - if self.fd != -1 { - let _ = unsafe { libc::close(self.fd) }; - } - } - } + let (reader, writer) = try!(unsafe { sys::os::pipe() }); + Ok(PipePair { + reader: PipeStream::from_filedesc(reader), + writer: PipeStream::from_filedesc(writer), + }) + } +} + +impl sys_common::AsFileDesc for PipeStream { + fn as_fd(&self) -> &sys::fs::FileDesc { + &*self.inner } } impl Clone for PipeStream { fn clone(&self) -> PipeStream { - PipeStream { obj: self.obj.clone() } + PipeStream { inner: self.inner.clone() } } } impl Reader for PipeStream { fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> { - self.obj.read(buf).map_err(IoError::from_rtio_error) + self.inner.read(buf) } } impl Writer for PipeStream { fn write(&mut self, buf: &[u8]) -> IoResult<()> { - self.obj.write(buf).map_err(IoError::from_rtio_error) + self.inner.write(buf) } } diff --git a/src/libstd/os.rs b/src/libstd/os.rs index 175e23bf819..ea42117bab6 100644 --- a/src/libstd/os.rs +++ b/src/libstd/os.rs @@ -43,6 +43,7 @@ use ops::Drop; use option::{Some, None, Option}; use os; use path::{Path, GenericPath, BytesContainer}; +use sys; use sys::os as os_imp; use ptr::RawPtr; use ptr; @@ -603,35 +604,11 @@ pub struct Pipe { /// descriptors to be closed, the file descriptors will leak. For safe handling /// of this scenario, use `std::io::PipeStream` instead. pub unsafe fn pipe() -> IoResult<Pipe> { - return _pipe(); - - #[cfg(unix)] - unsafe fn _pipe() -> IoResult<Pipe> { - let mut fds = [0, ..2]; - match libc::pipe(fds.as_mut_ptr()) { - 0 => Ok(Pipe { reader: fds[0], writer: fds[1] }), - _ => Err(IoError::last_error()), - } - } - - #[cfg(windows)] - unsafe fn _pipe() -> IoResult<Pipe> { - // Windows pipes work subtly differently than unix pipes, and their - // inheritance has to be handled in a different way that I do not - // fully understand. Here we explicitly make the pipe non-inheritable, - // which means to pass it to a subprocess they need to be duplicated - // first, as in std::run. - let mut fds = [0, ..2]; - match libc::pipe(fds.as_mut_ptr(), 1024 as ::libc::c_uint, - (libc::O_BINARY | libc::O_NOINHERIT) as c_int) { - 0 => { - assert!(fds[0] != -1 && fds[0] != 0); - assert!(fds[1] != -1 && fds[1] != 0); - Ok(Pipe { reader: fds[0], writer: fds[1] }) - } - _ => Err(IoError::last_error()), - } - } + let (reader, writer) = try!(sys::os::pipe()); + Ok(Pipe { + reader: reader.unwrap(), + writer: writer.unwrap(), + }) } /// Returns the proper dll filename for the given basename of a file diff --git a/src/libnative/io/net.rs b/src/libstd/sys/common/net.rs index a4b97a3eb84..0559005100f 100644 --- a/src/libnative/io/net.rs +++ b/src/libstd/sys/common/net.rs @@ -9,21 +9,26 @@ // except according to those terms. use alloc::arc::Arc; -use libc; -use std::mem; -use std::ptr; -use std::rt::mutex; -use std::rt::rtio::{mod, IoResult, IoError}; -use std::sync::atomic; - -use super::{retry, keep_going}; -use super::c; -use super::util; - -#[cfg(unix)] use super::process; -#[cfg(unix)] use super::file::FileDesc; - -pub use self::os::{init, sock_t, last_error}; +use libc::{mod, c_char, c_int}; +use mem; +use ptr::{mod, null, null_mut}; +use rt::mutex; +use io::net::ip::{SocketAddr, IpAddr, Ipv4Addr, Ipv6Addr}; +use io::net::addrinfo; +use io::{IoResult, IoError}; +use sys::{mod, retry, c, sock_t, last_error, last_net_error, last_gai_error, close_sock, + wrlen, msglen_t, os, wouldblock, set_nonblocking, timer, ms_to_timeval, + decode_error_detailed}; +use sys_common::{mod, keep_going, short_write, timeout}; +use prelude::*; +use cmp; +use io; + +#[deriving(Show)] +pub enum SocketStatus { + Readable, + Writable, +} //////////////////////////////////////////////////////////////////////////////// // sockaddr and misc bindings @@ -36,14 +41,14 @@ pub fn ntohs(u: u16) -> u16 { Int::from_be(u) } -enum InAddr { +pub enum InAddr { In4Addr(libc::in_addr), In6Addr(libc::in6_addr), } -fn ip_to_inaddr(ip: rtio::IpAddr) -> InAddr { +pub fn ip_to_inaddr(ip: IpAddr) -> InAddr { match ip { - rtio::Ipv4Addr(a, b, c, d) => { + Ipv4Addr(a, b, c, d) => { let ip = (a as u32 << 24) | (b as u32 << 16) | (c as u32 << 8) | @@ -52,7 +57,7 @@ fn ip_to_inaddr(ip: rtio::IpAddr) -> InAddr { s_addr: Int::from_be(ip) }) } - rtio::Ipv6Addr(a, b, c, d, e, f, g, h) => { + Ipv6Addr(a, b, c, d, e, f, g, h) => { In6Addr(libc::in6_addr { s6_addr: [ htons(a), @@ -69,7 +74,7 @@ fn ip_to_inaddr(ip: rtio::IpAddr) -> InAddr { } } -fn addr_to_sockaddr(addr: rtio::SocketAddr, +pub fn addr_to_sockaddr(addr: SocketAddr, storage: &mut libc::sockaddr_storage) -> libc::socklen_t { unsafe { @@ -93,20 +98,20 @@ fn addr_to_sockaddr(addr: rtio::SocketAddr, } } -fn socket(addr: rtio::SocketAddr, ty: libc::c_int) -> IoResult<sock_t> { +pub fn socket(addr: SocketAddr, ty: libc::c_int) -> IoResult<sock_t> { unsafe { let fam = match addr.ip { - rtio::Ipv4Addr(..) => libc::AF_INET, - rtio::Ipv6Addr(..) => libc::AF_INET6, + Ipv4Addr(..) => libc::AF_INET, + Ipv6Addr(..) => libc::AF_INET6, }; match libc::socket(fam, ty, 0) { - -1 => Err(os::last_error()), + -1 => Err(last_net_error()), fd => Ok(fd), } } } -fn setsockopt<T>(fd: sock_t, opt: libc::c_int, val: libc::c_int, +pub fn setsockopt<T>(fd: sock_t, opt: libc::c_int, val: libc::c_int, payload: T) -> IoResult<()> { unsafe { let payload = &payload as *const T as *const libc::c_void; @@ -114,7 +119,7 @@ fn setsockopt<T>(fd: sock_t, opt: libc::c_int, val: libc::c_int, payload, mem::size_of::<T>() as libc::socklen_t); if ret != 0 { - Err(os::last_error()) + Err(last_net_error()) } else { Ok(()) } @@ -130,7 +135,7 @@ pub fn getsockopt<T: Copy>(fd: sock_t, opt: libc::c_int, &mut slot as *mut _ as *mut _, &mut len); if ret != 0 { - Err(os::last_error()) + Err(last_net_error()) } else { assert!(len as uint == mem::size_of::<T>()); Ok(slot) @@ -138,10 +143,10 @@ pub fn getsockopt<T: Copy>(fd: sock_t, opt: libc::c_int, } } -fn sockname(fd: sock_t, +pub fn sockname(fd: sock_t, f: unsafe extern "system" fn(sock_t, *mut libc::sockaddr, *mut libc::socklen_t) -> libc::c_int) - -> IoResult<rtio::SocketAddr> + -> IoResult<SocketAddr> { let mut storage: libc::sockaddr_storage = unsafe { mem::zeroed() }; let mut len = mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t; @@ -151,14 +156,14 @@ fn sockname(fd: sock_t, storage as *mut libc::sockaddr, &mut len as *mut libc::socklen_t); if ret != 0 { - return Err(os::last_error()) + return Err(last_net_error()) } } return sockaddr_to_addr(&storage, len as uint); } pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage, - len: uint) -> IoResult<rtio::SocketAddr> { + len: uint) -> IoResult<SocketAddr> { match storage.ss_family as libc::c_int { libc::AF_INET => { assert!(len as uint >= mem::size_of::<libc::sockaddr_in>()); @@ -170,8 +175,8 @@ pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage, let b = (ip >> 16) as u8; let c = (ip >> 8) as u8; let d = (ip >> 0) as u8; - Ok(rtio::SocketAddr { - ip: rtio::Ipv4Addr(a, b, c, d), + Ok(SocketAddr { + ip: Ipv4Addr(a, b, c, d), port: ntohs(storage.sin_port), }) } @@ -188,17 +193,15 @@ pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage, let f = ntohs(storage.sin6_addr.s6_addr[5]); let g = ntohs(storage.sin6_addr.s6_addr[6]); let h = ntohs(storage.sin6_addr.s6_addr[7]); - Ok(rtio::SocketAddr { - ip: rtio::Ipv6Addr(a, b, c, d, e, f, g, h), + Ok(SocketAddr { + ip: Ipv6Addr(a, b, c, d, e, f, g, h), port: ntohs(storage.sin6_port), }) } _ => { - #[cfg(unix)] use libc::EINVAL as ERROR; - #[cfg(windows)] use libc::WSAEINVAL as ERROR; Err(IoError { - code: ERROR as uint, - extra: 0, + kind: io::InvalidInput, + desc: "invalid argument", detail: None, }) } @@ -206,15 +209,343 @@ pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage, } //////////////////////////////////////////////////////////////////////////////// -// TCP streams +// get_host_addresses //////////////////////////////////////////////////////////////////////////////// -pub struct TcpStream { - inner: Arc<Inner>, - read_deadline: u64, - write_deadline: u64, +extern "system" { + fn getaddrinfo(node: *const c_char, service: *const c_char, + hints: *const libc::addrinfo, + res: *mut *mut libc::addrinfo) -> c_int; + fn freeaddrinfo(res: *mut libc::addrinfo); } +pub fn get_host_addresses(host: Option<&str>, servname: Option<&str>, + hint: Option<addrinfo::Hint>) + -> Result<Vec<addrinfo::Info>, IoError> +{ + sys::init_net(); + + assert!(host.is_some() || servname.is_some()); + + let c_host = host.map(|x| x.to_c_str()); + let c_host = c_host.as_ref().map(|x| x.as_ptr()).unwrap_or(null()); + let c_serv = servname.map(|x| x.to_c_str()); + let c_serv = c_serv.as_ref().map(|x| x.as_ptr()).unwrap_or(null()); + + let hint = hint.map(|hint| { + libc::addrinfo { + ai_flags: hint.flags as c_int, + ai_family: hint.family as c_int, + ai_socktype: 0, + ai_protocol: 0, + ai_addrlen: 0, + ai_canonname: null_mut(), + ai_addr: null_mut(), + ai_next: null_mut() + } + }); + + let hint_ptr = hint.as_ref().map_or(null(), |x| { + x as *const libc::addrinfo + }); + let mut res = null_mut(); + + // Make the call + let s = unsafe { + getaddrinfo(c_host, c_serv, hint_ptr, &mut res) + }; + + // Error? + if s != 0 { + return Err(last_gai_error(s)); + } + + // Collect all the results we found + let mut addrs = Vec::new(); + let mut rp = res; + while rp.is_not_null() { + unsafe { + let addr = try!(sockaddr_to_addr(mem::transmute((*rp).ai_addr), + (*rp).ai_addrlen as uint)); + addrs.push(addrinfo::Info { + address: addr, + family: (*rp).ai_family as uint, + socktype: None, + protocol: None, + flags: (*rp).ai_flags as uint + }); + + rp = (*rp).ai_next as *mut libc::addrinfo; + } + } + + unsafe { freeaddrinfo(res); } + + Ok(addrs) +} + +//////////////////////////////////////////////////////////////////////////////// +// Timeout helpers +// +// The read/write functions below are the helpers for reading/writing a socket +// with a possible deadline specified. This is generally viewed as a timed out +// I/O operation. +// +// From the application's perspective, timeouts apply to the I/O object, not to +// the underlying file descriptor (it's one timeout per object). This means that +// we can't use the SO_RCVTIMEO and corresponding send timeout option. +// +// The next idea to implement timeouts would be to use nonblocking I/O. An +// invocation of select() would wait (with a timeout) for a socket to be ready. +// Once its ready, we can perform the operation. Note that the operation *must* +// be nonblocking, even though select() says the socket is ready. This is +// because some other thread could have come and stolen our data (handles can be +// cloned). +// +// To implement nonblocking I/O, the first option we have is to use the +// O_NONBLOCK flag. Remember though that this is a global setting, affecting all +// I/O objects, so this was initially viewed as unwise. +// +// It turns out that there's this nifty MSG_DONTWAIT flag which can be passed to +// send/recv, but the niftiness wears off once you realize it only works well on +// Linux [1] [2]. This means that it's pretty easy to get a nonblocking +// operation on Linux (no flag fiddling, no affecting other objects), but not on +// other platforms. +// +// To work around this constraint on other platforms, we end up using the +// original strategy of flipping the O_NONBLOCK flag. As mentioned before, this +// could cause other objects' blocking operations to suddenly become +// nonblocking. To get around this, a "blocking operation" which returns EAGAIN +// falls back to using the same code path as nonblocking operations, but with an +// infinite timeout (select + send/recv). This helps emulate blocking +// reads/writes despite the underlying descriptor being nonblocking, as well as +// optimizing the fast path of just hitting one syscall in the good case. +// +// As a final caveat, this implementation uses a mutex so only one thread is +// doing a nonblocking operation at at time. This is the operation that comes +// after the select() (at which point we think the socket is ready). This is +// done for sanity to ensure that the state of the O_NONBLOCK flag is what we +// expect (wouldn't want someone turning it on when it should be off!). All +// operations performed in the lock are *nonblocking* to avoid holding the mutex +// forever. +// +// So, in summary, Linux uses MSG_DONTWAIT and doesn't need mutexes, everyone +// else uses O_NONBLOCK and mutexes with some trickery to make sure blocking +// reads/writes are still blocking. +// +// Fun, fun! +// +// [1] http://twistedmatrix.com/pipermail/twisted-commits/2012-April/034692.html +// [2] http://stackoverflow.com/questions/19819198/does-send-msg-dontwait + +pub fn read<T>(fd: sock_t, + deadline: u64, + lock: || -> T, + read: |bool| -> libc::c_int) -> IoResult<uint> { + let mut ret = -1; + if deadline == 0 { + ret = retry(|| read(false)); + } + + if deadline != 0 || (ret == -1 && wouldblock()) { + let deadline = match deadline { + 0 => None, + n => Some(n), + }; + loop { + // With a timeout, first we wait for the socket to become + // readable using select(), specifying the relevant timeout for + // our previously set deadline. + try!(await([fd], deadline, Readable)); + + // At this point, we're still within the timeout, and we've + // determined that the socket is readable (as returned by + // select). We must still read the socket in *nonblocking* mode + // because some other thread could come steal our data. If we + // fail to read some data, we retry (hence the outer loop) and + // wait for the socket to become readable again. + let _guard = lock(); + match retry(|| read(deadline.is_some())) { + -1 if wouldblock() => {} + -1 => return Err(last_net_error()), + n => { ret = n; break } + } + } + } + + match ret { + 0 => Err(sys_common::eof()), + n if n < 0 => Err(last_net_error()), + n => Ok(n as uint) + } +} + +pub fn write<T>(fd: sock_t, + deadline: u64, + buf: &[u8], + write_everything: bool, + lock: || -> T, + write: |bool, *const u8, uint| -> i64) -> IoResult<uint> { + let mut ret = -1; + let mut written = 0; + if deadline == 0 { + if write_everything { + ret = keep_going(buf, |inner, len| { + written = buf.len() - len; + write(false, inner, len) + }); + } else { + ret = retry(|| { write(false, buf.as_ptr(), buf.len()) }); + if ret > 0 { written = ret as uint; } + } + } + + if deadline != 0 || (ret == -1 && wouldblock()) { + let deadline = match deadline { + 0 => None, + n => Some(n), + }; + while written < buf.len() && (write_everything || written == 0) { + // As with read(), first wait for the socket to be ready for + // the I/O operation. + match await([fd], deadline, Writable) { + Err(ref e) if e.kind == io::EndOfFile && written > 0 => { + assert!(deadline.is_some()); + return Err(short_write(written, "short write")) + } + Err(e) => return Err(e), + Ok(()) => {} + } + + // Also as with read(), we use MSG_DONTWAIT to guard ourselves + // against unforeseen circumstances. + let _guard = lock(); + let ptr = buf[written..].as_ptr(); + let len = buf.len() - written; + match retry(|| write(deadline.is_some(), ptr, len)) { + -1 if wouldblock() => {} + -1 => return Err(last_net_error()), + n => { written += n as uint; } + } + } + ret = 0; + } + if ret < 0 { + Err(last_net_error()) + } else { + Ok(written) + } +} + +// See http://developerweb.net/viewtopic.php?id=3196 for where this is +// derived from. +pub fn connect_timeout(fd: sock_t, + addrp: *const libc::sockaddr, + len: libc::socklen_t, + timeout_ms: u64) -> IoResult<()> { + #[cfg(unix)] use libc::EINPROGRESS as INPROGRESS; + #[cfg(windows)] use libc::WSAEINPROGRESS as INPROGRESS; + #[cfg(unix)] use libc::EWOULDBLOCK as WOULDBLOCK; + #[cfg(windows)] use libc::WSAEWOULDBLOCK as WOULDBLOCK; + + // Make sure the call to connect() doesn't block + try!(set_nonblocking(fd, true)); + + let ret = match unsafe { libc::connect(fd, addrp, len) } { + // If the connection is in progress, then we need to wait for it to + // finish (with a timeout). The current strategy for doing this is + // to use select() with a timeout. + -1 if os::errno() as int == INPROGRESS as int || + os::errno() as int == WOULDBLOCK as int => { + let mut set: c::fd_set = unsafe { mem::zeroed() }; + c::fd_set(&mut set, fd); + match await(fd, &mut set, timeout_ms) { + 0 => Err(timeout("connection timed out")), + -1 => Err(last_net_error()), + _ => { + let err: libc::c_int = try!( + getsockopt(fd, libc::SOL_SOCKET, libc::SO_ERROR)); + if err == 0 { + Ok(()) + } else { + Err(decode_error_detailed(err)) + } + } + } + } + + -1 => Err(last_net_error()), + _ => Ok(()), + }; + + // be sure to turn blocking I/O back on + try!(set_nonblocking(fd, false)); + return ret; + + #[cfg(unix)] + fn await(fd: sock_t, set: &mut c::fd_set, timeout: u64) -> libc::c_int { + let start = timer::now(); + retry(|| unsafe { + // Recalculate the timeout each iteration (it is generally + // undefined what the value of the 'tv' is after select + // returns EINTR). + let mut tv = ms_to_timeval(timeout - (timer::now() - start)); + c::select(fd + 1, ptr::null_mut(), set as *mut _, + ptr::null_mut(), &mut tv) + }) + } + #[cfg(windows)] + fn await(_fd: sock_t, set: &mut c::fd_set, timeout: u64) -> libc::c_int { + let mut tv = ms_to_timeval(timeout); + unsafe { c::select(1, ptr::null_mut(), set, ptr::null_mut(), &mut tv) } + } +} + +pub fn await(fds: &[sock_t], deadline: Option<u64>, + status: SocketStatus) -> IoResult<()> { + let mut set: c::fd_set = unsafe { mem::zeroed() }; + let mut max = 0; + for &fd in fds.iter() { + c::fd_set(&mut set, fd); + max = cmp::max(max, fd + 1); + } + if cfg!(windows) { + max = fds.len() as sock_t; + } + + let (read, write) = match status { + Readable => (&mut set as *mut _, ptr::null_mut()), + Writable => (ptr::null_mut(), &mut set as *mut _), + }; + let mut tv: libc::timeval = unsafe { mem::zeroed() }; + + match retry(|| { + let now = timer::now(); + let tvp = match deadline { + None => ptr::null_mut(), + Some(deadline) => { + // If we're past the deadline, then pass a 0 timeout to + // select() so we can poll the status + let ms = if deadline < now {0} else {deadline - now}; + tv = ms_to_timeval(ms); + &mut tv as *mut _ + } + }; + let r = unsafe { + c::select(max as libc::c_int, read, write, ptr::null_mut(), tvp) + }; + r + }) { + -1 => Err(last_net_error()), + 0 => Err(timeout("timed out")), + _ => Ok(()), + } +} + +//////////////////////////////////////////////////////////////////////////////// +// Basic socket representation +//////////////////////////////////////////////////////////////////////////////// + struct Inner { fd: sock_t, @@ -223,22 +554,44 @@ struct Inner { lock: mutex::NativeMutex } +impl Inner { + fn new(fd: sock_t) -> Inner { + Inner { fd: fd, lock: unsafe { mutex::NativeMutex::new() } } + } +} + +impl Drop for Inner { + fn drop(&mut self) { unsafe { close_sock(self.fd); } } +} + pub struct Guard<'a> { pub fd: sock_t, pub guard: mutex::LockGuard<'a>, } -impl Inner { - fn new(fd: sock_t) -> Inner { - Inner { fd: fd, lock: unsafe { mutex::NativeMutex::new() } } +#[unsafe_destructor] +impl<'a> Drop for Guard<'a> { + fn drop(&mut self) { + assert!(set_nonblocking(self.fd, false).is_ok()); } } +//////////////////////////////////////////////////////////////////////////////// +// TCP streams +//////////////////////////////////////////////////////////////////////////////// + +pub struct TcpStream { + inner: Arc<Inner>, + read_deadline: u64, + write_deadline: u64, +} + impl TcpStream { - pub fn connect(addr: rtio::SocketAddr, - timeout: Option<u64>) -> IoResult<TcpStream> { + pub fn connect(addr: SocketAddr, timeout: Option<u64>) -> IoResult<TcpStream> { + sys::init_net(); + let fd = try!(socket(addr, libc::SOCK_STREAM)); - let ret = TcpStream::new(Inner::new(fd)); + let ret = TcpStream::new(fd); let mut storage = unsafe { mem::zeroed() }; let len = addr_to_sockaddr(addr, &mut storage); @@ -246,21 +599,21 @@ impl TcpStream { match timeout { Some(timeout) => { - try!(util::connect_timeout(fd, addrp, len, timeout)); + try!(connect_timeout(fd, addrp, len, timeout)); Ok(ret) }, None => { match retry(|| unsafe { libc::connect(fd, addrp, len) }) { - -1 => Err(os::last_error()), + -1 => Err(last_error()), _ => Ok(ret), } } } } - fn new(inner: Inner) -> TcpStream { + pub fn new(fd: sock_t) -> TcpStream { TcpStream { - inner: Arc::new(inner), + inner: Arc::new(Inner::new(fd)), read_deadline: 0, write_deadline: 0, } @@ -268,12 +621,12 @@ impl TcpStream { pub fn fd(&self) -> sock_t { self.inner.fd } - fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> { + pub fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> { setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_NODELAY, nodelay as libc::c_int) } - fn set_keepalive(&mut self, seconds: Option<uint>) -> IoResult<()> { + pub fn set_keepalive(&mut self, seconds: Option<uint>) -> IoResult<()> { let ret = setsockopt(self.fd(), libc::SOL_SOCKET, libc::SO_KEEPALIVE, seconds.is_some() as libc::c_int); match seconds { @@ -309,16 +662,11 @@ impl TcpStream { fd: self.fd(), guard: unsafe { self.inner.lock.lock() }, }; - assert!(util::set_nonblocking(self.fd(), true).is_ok()); + assert!(set_nonblocking(self.fd(), true).is_ok()); ret } -} -#[cfg(windows)] type wrlen = libc::c_int; -#[cfg(not(windows))] type wrlen = libc::size_t; - -impl rtio::RtioTcpStream for TcpStream { - fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> { + pub fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> { let fd = self.fd(); let dolock = || self.lock_nonblocking(); let doread = |nb| unsafe { @@ -331,7 +679,7 @@ impl rtio::RtioTcpStream for TcpStream { read(fd, self.read_deadline, dolock, doread) } - fn write(&mut self, buf: &[u8]) -> IoResult<()> { + pub fn write(&mut self, buf: &[u8]) -> IoResult<()> { let fd = self.fd(); let dolock = || self.lock_nonblocking(); let dowrite = |nb: bool, buf: *const u8, len: uint| unsafe { @@ -341,340 +689,42 @@ impl rtio::RtioTcpStream for TcpStream { len as wrlen, flags) as i64 }; - match write(fd, self.write_deadline, buf, true, dolock, dowrite) { - Ok(_) => Ok(()), - Err(e) => Err(e) - } + write(fd, self.write_deadline, buf, true, dolock, dowrite).map(|_| ()) } - fn peer_name(&mut self) -> IoResult<rtio::SocketAddr> { + pub fn peer_name(&mut self) -> IoResult<SocketAddr> { sockname(self.fd(), libc::getpeername) } - fn control_congestion(&mut self) -> IoResult<()> { - self.set_nodelay(false) - } - fn nodelay(&mut self) -> IoResult<()> { - self.set_nodelay(true) - } - fn keepalive(&mut self, delay_in_seconds: uint) -> IoResult<()> { - self.set_keepalive(Some(delay_in_seconds)) - } - fn letdie(&mut self) -> IoResult<()> { - self.set_keepalive(None) - } - fn clone(&self) -> Box<rtio::RtioTcpStream + Send> { - box TcpStream { - inner: self.inner.clone(), - read_deadline: 0, - write_deadline: 0, - } as Box<rtio::RtioTcpStream + Send> - } - - fn close_write(&mut self) -> IoResult<()> { + pub fn close_write(&mut self) -> IoResult<()> { super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_WR) }) } - fn close_read(&mut self) -> IoResult<()> { + pub fn close_read(&mut self) -> IoResult<()> { super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) }) } - fn set_timeout(&mut self, timeout: Option<u64>) { - let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0); + pub fn set_timeout(&mut self, timeout: Option<u64>) { + let deadline = timeout.map(|a| timer::now() + a).unwrap_or(0); self.read_deadline = deadline; self.write_deadline = deadline; } - fn set_read_timeout(&mut self, timeout: Option<u64>) { - self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0); - } - fn set_write_timeout(&mut self, timeout: Option<u64>) { - self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0); - } -} - -impl rtio::RtioSocket for TcpStream { - fn socket_name(&mut self) -> IoResult<rtio::SocketAddr> { - sockname(self.fd(), libc::getsockname) - } -} - -impl Drop for Inner { - fn drop(&mut self) { unsafe { os::close(self.fd); } } -} - -#[unsafe_destructor] -impl<'a> Drop for Guard<'a> { - fn drop(&mut self) { - assert!(util::set_nonblocking(self.fd, false).is_ok()); - } -} - -//////////////////////////////////////////////////////////////////////////////// -// TCP listeners -//////////////////////////////////////////////////////////////////////////////// - -pub struct TcpListener { - inner: Inner, -} - -impl TcpListener { - pub fn bind(addr: rtio::SocketAddr) -> IoResult<TcpListener> { - let fd = try!(socket(addr, libc::SOCK_STREAM)); - let ret = TcpListener { inner: Inner::new(fd) }; - - let mut storage = unsafe { mem::zeroed() }; - let len = addr_to_sockaddr(addr, &mut storage); - let addrp = &storage as *const _ as *const libc::sockaddr; - - // On platforms with Berkeley-derived sockets, this allows - // to quickly rebind a socket, without needing to wait for - // the OS to clean up the previous one. - if cfg!(unix) { - try!(setsockopt(fd, libc::SOL_SOCKET, libc::SO_REUSEADDR, - 1 as libc::c_int)); - } - - match unsafe { libc::bind(fd, addrp, len) } { - -1 => Err(os::last_error()), - _ => Ok(ret), - } - } - - pub fn fd(&self) -> sock_t { self.inner.fd } - - pub fn native_listen(self, backlog: int) -> IoResult<TcpAcceptor> { - match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } { - -1 => Err(os::last_error()), - - #[cfg(unix)] - _ => { - let (reader, writer) = try!(process::pipe()); - try!(util::set_nonblocking(reader.fd(), true)); - try!(util::set_nonblocking(writer.fd(), true)); - try!(util::set_nonblocking(self.fd(), true)); - Ok(TcpAcceptor { - inner: Arc::new(AcceptorInner { - listener: self, - reader: reader, - writer: writer, - closed: atomic::AtomicBool::new(false), - }), - deadline: 0, - }) - } - - #[cfg(windows)] - _ => { - let accept = try!(os::Event::new()); - let ret = unsafe { - c::WSAEventSelect(self.fd(), accept.handle(), c::FD_ACCEPT) - }; - if ret != 0 { - return Err(os::last_error()) - } - Ok(TcpAcceptor { - inner: Arc::new(AcceptorInner { - listener: self, - abort: try!(os::Event::new()), - accept: accept, - closed: atomic::AtomicBool::new(false), - }), - deadline: 0, - }) - } - } - } -} - -impl rtio::RtioTcpListener for TcpListener { - fn listen(self: Box<TcpListener>) - -> IoResult<Box<rtio::RtioTcpAcceptor + Send>> { - self.native_listen(128).map(|a| { - box a as Box<rtio::RtioTcpAcceptor + Send> - }) - } -} - -impl rtio::RtioSocket for TcpListener { - fn socket_name(&mut self) -> IoResult<rtio::SocketAddr> { - sockname(self.fd(), libc::getsockname) - } -} - -pub struct TcpAcceptor { - inner: Arc<AcceptorInner>, - deadline: u64, -} - -#[cfg(unix)] -struct AcceptorInner { - listener: TcpListener, - reader: FileDesc, - writer: FileDesc, - closed: atomic::AtomicBool, -} - -#[cfg(windows)] -struct AcceptorInner { - listener: TcpListener, - abort: os::Event, - accept: os::Event, - closed: atomic::AtomicBool, -} - -impl TcpAcceptor { - pub fn fd(&self) -> sock_t { self.inner.listener.fd() } - - #[cfg(unix)] - pub fn native_accept(&mut self) -> IoResult<TcpStream> { - // In implementing accept, the two main concerns are dealing with - // close_accept() and timeouts. The unix implementation is based on a - // nonblocking accept plus a call to select(). Windows ends up having - // an entirely separate implementation than unix, which is explained - // below. - // - // To implement timeouts, all blocking is done via select() instead of - // accept() by putting the socket in non-blocking mode. Because - // select() takes a timeout argument, we just pass through the timeout - // to select(). - // - // To implement close_accept(), we have a self-pipe to ourselves which - // is passed to select() along with the socket being accepted on. The - // self-pipe is never written to unless close_accept() is called. - let deadline = if self.deadline == 0 {None} else {Some(self.deadline)}; - - while !self.inner.closed.load(atomic::SeqCst) { - match retry(|| unsafe { - libc::accept(self.fd(), ptr::null_mut(), ptr::null_mut()) - }) { - -1 if util::wouldblock() => {} - -1 => return Err(os::last_error()), - fd => return Ok(TcpStream::new(Inner::new(fd as sock_t))), - } - try!(util::await([self.fd(), self.inner.reader.fd()], - deadline, util::Readable)); - } - - Err(util::eof()) + pub fn set_read_timeout(&mut self, timeout: Option<u64>) { + self.read_deadline = timeout.map(|a| timer::now() + a).unwrap_or(0); } - - #[cfg(windows)] - pub fn native_accept(&mut self) -> IoResult<TcpStream> { - // Unlink unix, windows cannot invoke `select` on arbitrary file - // descriptors like pipes, only sockets. Consequently, windows cannot - // use the same implementation as unix for accept() when close_accept() - // is considered. - // - // In order to implement close_accept() and timeouts, windows uses - // event handles. An acceptor-specific abort event is created which - // will only get set in close_accept(), and it will never be un-set. - // Additionally, another acceptor-specific event is associated with the - // FD_ACCEPT network event. - // - // These two events are then passed to WaitForMultipleEvents to see - // which one triggers first, and the timeout passed to this function is - // the local timeout for the acceptor. - // - // If the wait times out, then the accept timed out. If the wait - // succeeds with the abort event, then we were closed, and if the wait - // succeeds otherwise, then we do a nonblocking poll via `accept` to - // see if we can accept a connection. The connection is candidate to be - // stolen, so we do all of this in a loop as well. - let events = [self.inner.abort.handle(), self.inner.accept.handle()]; - - while !self.inner.closed.load(atomic::SeqCst) { - let ms = if self.deadline == 0 { - c::WSA_INFINITE as u64 - } else { - let now = ::io::timer::now(); - if self.deadline < now {0} else {self.deadline - now} - }; - let ret = unsafe { - c::WSAWaitForMultipleEvents(2, events.as_ptr(), libc::FALSE, - ms as libc::DWORD, libc::FALSE) - }; - match ret { - c::WSA_WAIT_TIMEOUT => { - return Err(util::timeout("accept timed out")) - } - c::WSA_WAIT_FAILED => return Err(os::last_error()), - c::WSA_WAIT_EVENT_0 => break, - n => assert_eq!(n, c::WSA_WAIT_EVENT_0 + 1), - } - - let mut wsaevents: c::WSANETWORKEVENTS = unsafe { mem::zeroed() }; - let ret = unsafe { - c::WSAEnumNetworkEvents(self.fd(), events[1], &mut wsaevents) - }; - if ret != 0 { return Err(os::last_error()) } - - if wsaevents.lNetworkEvents & c::FD_ACCEPT == 0 { continue } - match unsafe { - libc::accept(self.fd(), ptr::null_mut(), ptr::null_mut()) - } { - -1 if util::wouldblock() => {} - -1 => return Err(os::last_error()), - - // Accepted sockets inherit the same properties as the caller, - // so we need to deregister our event and switch the socket back - // to blocking mode - fd => { - let stream = TcpStream::new(Inner::new(fd)); - let ret = unsafe { - c::WSAEventSelect(fd, events[1], 0) - }; - if ret != 0 { return Err(os::last_error()) } - try!(util::set_nonblocking(fd, false)); - return Ok(stream) - } - } - } - - Err(util::eof()) + pub fn set_write_timeout(&mut self, timeout: Option<u64>) { + self.write_deadline = timeout.map(|a| timer::now() + a).unwrap_or(0); } -} -impl rtio::RtioSocket for TcpAcceptor { - fn socket_name(&mut self) -> IoResult<rtio::SocketAddr> { + pub fn socket_name(&mut self) -> IoResult<SocketAddr> { sockname(self.fd(), libc::getsockname) } } -impl rtio::RtioTcpAcceptor for TcpAcceptor { - fn accept(&mut self) -> IoResult<Box<rtio::RtioTcpStream + Send>> { - self.native_accept().map(|s| box s as Box<rtio::RtioTcpStream + Send>) - } - - fn accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) } - fn dont_accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) } - fn set_timeout(&mut self, timeout: Option<u64>) { - self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0); - } - - fn clone(&self) -> Box<rtio::RtioTcpAcceptor + Send> { - box TcpAcceptor { +impl Clone for TcpStream { + fn clone(&self) -> TcpStream { + TcpStream { inner: self.inner.clone(), - deadline: 0, - } as Box<rtio::RtioTcpAcceptor + Send> - } - - #[cfg(unix)] - fn close_accept(&mut self) -> IoResult<()> { - self.inner.closed.store(true, atomic::SeqCst); - let mut fd = FileDesc::new(self.inner.writer.fd(), false); - match fd.inner_write([0]) { - Ok(..) => Ok(()), - Err(..) if util::wouldblock() => Ok(()), - Err(e) => Err(e), - } - } - - #[cfg(windows)] - fn close_accept(&mut self) -> IoResult<()> { - self.inner.closed.store(true, atomic::SeqCst); - let ret = unsafe { c::WSASetEvent(self.inner.abort.handle()) }; - if ret == libc::TRUE { - Ok(()) - } else { - Err(os::last_error()) + read_deadline: 0, + write_deadline: 0, } } } @@ -690,7 +740,9 @@ pub struct UdpSocket { } impl UdpSocket { - pub fn bind(addr: rtio::SocketAddr) -> IoResult<UdpSocket> { + pub fn bind(addr: SocketAddr) -> IoResult<UdpSocket> { + sys::init_net(); + let fd = try!(socket(addr, libc::SOCK_DGRAM)); let ret = UdpSocket { inner: Arc::new(Inner::new(fd)), @@ -703,7 +755,7 @@ impl UdpSocket { let addrp = &storage as *const _ as *const libc::sockaddr; match unsafe { libc::bind(fd, addrp, len) } { - -1 => Err(os::last_error()), + -1 => Err(last_error()), _ => Ok(ret), } } @@ -720,8 +772,7 @@ impl UdpSocket { on as libc::c_int) } - pub fn set_membership(&mut self, addr: rtio::IpAddr, - opt: libc::c_int) -> IoResult<()> { + pub fn set_membership(&mut self, addr: IpAddr, opt: libc::c_int) -> IoResult<()> { match ip_to_inaddr(addr) { In4Addr(addr) => { let mreq = libc::ip_mreq { @@ -750,22 +801,15 @@ impl UdpSocket { fd: self.fd(), guard: unsafe { self.inner.lock.lock() }, }; - assert!(util::set_nonblocking(self.fd(), true).is_ok()); + assert!(set_nonblocking(self.fd(), true).is_ok()); ret } -} -impl rtio::RtioSocket for UdpSocket { - fn socket_name(&mut self) -> IoResult<rtio::SocketAddr> { + pub fn socket_name(&mut self) -> IoResult<SocketAddr> { sockname(self.fd(), libc::getsockname) } -} - -#[cfg(windows)] type msglen_t = libc::c_int; -#[cfg(unix)] type msglen_t = libc::size_t; -impl rtio::RtioUdpSocket for UdpSocket { - fn recv_from(&mut self, buf: &mut [u8]) -> IoResult<(uint, rtio::SocketAddr)> { + pub fn recv_from(&mut self, buf: &mut [u8]) -> IoResult<(uint, SocketAddr)> { let fd = self.fd(); let mut storage: libc::sockaddr_storage = unsafe { mem::zeroed() }; let storagep = &mut storage as *mut _ as *mut libc::sockaddr; @@ -787,7 +831,7 @@ impl rtio::RtioUdpSocket for UdpSocket { }) } - fn send_to(&mut self, buf: &[u8], dst: rtio::SocketAddr) -> IoResult<()> { + pub fn send_to(&mut self, buf: &[u8], dst: SocketAddr) -> IoResult<()> { let mut storage = unsafe { mem::zeroed() }; let dstlen = addr_to_sockaddr(dst, &mut storage); let dstp = &storage as *const _ as *const libc::sockaddr; @@ -806,298 +850,60 @@ impl rtio::RtioUdpSocket for UdpSocket { let n = try!(write(fd, self.write_deadline, buf, false, dolock, dowrite)); if n != buf.len() { - Err(util::short_write(n, "couldn't send entire packet at once")) + Err(short_write(n, "couldn't send entire packet at once")) } else { Ok(()) } } - fn join_multicast(&mut self, multi: rtio::IpAddr) -> IoResult<()> { + pub fn join_multicast(&mut self, multi: IpAddr) -> IoResult<()> { match multi { - rtio::Ipv4Addr(..) => { + Ipv4Addr(..) => { self.set_membership(multi, libc::IP_ADD_MEMBERSHIP) } - rtio::Ipv6Addr(..) => { + Ipv6Addr(..) => { self.set_membership(multi, libc::IPV6_ADD_MEMBERSHIP) } } } - fn leave_multicast(&mut self, multi: rtio::IpAddr) -> IoResult<()> { + pub fn leave_multicast(&mut self, multi: IpAddr) -> IoResult<()> { match multi { - rtio::Ipv4Addr(..) => { + Ipv4Addr(..) => { self.set_membership(multi, libc::IP_DROP_MEMBERSHIP) } - rtio::Ipv6Addr(..) => { + Ipv6Addr(..) => { self.set_membership(multi, libc::IPV6_DROP_MEMBERSHIP) } } } - fn loop_multicast_locally(&mut self) -> IoResult<()> { - self.set_multicast_loop(true) - } - fn dont_loop_multicast_locally(&mut self) -> IoResult<()> { - self.set_multicast_loop(false) - } - - fn multicast_time_to_live(&mut self, ttl: int) -> IoResult<()> { + pub fn multicast_time_to_live(&mut self, ttl: int) -> IoResult<()> { setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_MULTICAST_TTL, ttl as libc::c_int) } - fn time_to_live(&mut self, ttl: int) -> IoResult<()> { + pub fn time_to_live(&mut self, ttl: int) -> IoResult<()> { setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_TTL, ttl as libc::c_int) } - fn hear_broadcasts(&mut self) -> IoResult<()> { - self.set_broadcast(true) - } - fn ignore_broadcasts(&mut self) -> IoResult<()> { - self.set_broadcast(false) - } - - fn clone(&self) -> Box<rtio::RtioUdpSocket + Send> { - box UdpSocket { - inner: self.inner.clone(), - read_deadline: 0, - write_deadline: 0, - } as Box<rtio::RtioUdpSocket + Send> - } - - fn set_timeout(&mut self, timeout: Option<u64>) { - let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0); + pub fn set_timeout(&mut self, timeout: Option<u64>) { + let deadline = timeout.map(|a| timer::now() + a).unwrap_or(0); self.read_deadline = deadline; self.write_deadline = deadline; } - fn set_read_timeout(&mut self, timeout: Option<u64>) { - self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0); + pub fn set_read_timeout(&mut self, timeout: Option<u64>) { + self.read_deadline = timeout.map(|a| timer::now() + a).unwrap_or(0); } - fn set_write_timeout(&mut self, timeout: Option<u64>) { - self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0); + pub fn set_write_timeout(&mut self, timeout: Option<u64>) { + self.write_deadline = timeout.map(|a| timer::now() + a).unwrap_or(0); } } -//////////////////////////////////////////////////////////////////////////////// -// Timeout helpers -// -// The read/write functions below are the helpers for reading/writing a socket -// with a possible deadline specified. This is generally viewed as a timed out -// I/O operation. -// -// From the application's perspective, timeouts apply to the I/O object, not to -// the underlying file descriptor (it's one timeout per object). This means that -// we can't use the SO_RCVTIMEO and corresponding send timeout option. -// -// The next idea to implement timeouts would be to use nonblocking I/O. An -// invocation of select() would wait (with a timeout) for a socket to be ready. -// Once its ready, we can perform the operation. Note that the operation *must* -// be nonblocking, even though select() says the socket is ready. This is -// because some other thread could have come and stolen our data (handles can be -// cloned). -// -// To implement nonblocking I/O, the first option we have is to use the -// O_NONBLOCK flag. Remember though that this is a global setting, affecting all -// I/O objects, so this was initially viewed as unwise. -// -// It turns out that there's this nifty MSG_DONTWAIT flag which can be passed to -// send/recv, but the niftiness wears off once you realize it only works well on -// Linux [1] [2]. This means that it's pretty easy to get a nonblocking -// operation on Linux (no flag fiddling, no affecting other objects), but not on -// other platforms. -// -// To work around this constraint on other platforms, we end up using the -// original strategy of flipping the O_NONBLOCK flag. As mentioned before, this -// could cause other objects' blocking operations to suddenly become -// nonblocking. To get around this, a "blocking operation" which returns EAGAIN -// falls back to using the same code path as nonblocking operations, but with an -// infinite timeout (select + send/recv). This helps emulate blocking -// reads/writes despite the underlying descriptor being nonblocking, as well as -// optimizing the fast path of just hitting one syscall in the good case. -// -// As a final caveat, this implementation uses a mutex so only one thread is -// doing a nonblocking operation at at time. This is the operation that comes -// after the select() (at which point we think the socket is ready). This is -// done for sanity to ensure that the state of the O_NONBLOCK flag is what we -// expect (wouldn't want someone turning it on when it should be off!). All -// operations performed in the lock are *nonblocking* to avoid holding the mutex -// forever. -// -// So, in summary, Linux uses MSG_DONTWAIT and doesn't need mutexes, everyone -// else uses O_NONBLOCK and mutexes with some trickery to make sure blocking -// reads/writes are still blocking. -// -// Fun, fun! -// -// [1] http://twistedmatrix.com/pipermail/twisted-commits/2012-April/034692.html -// [2] http://stackoverflow.com/questions/19819198/does-send-msg-dontwait - -pub fn read<T>(fd: sock_t, - deadline: u64, - lock: || -> T, - read: |bool| -> libc::c_int) -> IoResult<uint> { - let mut ret = -1; - if deadline == 0 { - ret = retry(|| read(false)); - } - - if deadline != 0 || (ret == -1 && util::wouldblock()) { - let deadline = match deadline { - 0 => None, - n => Some(n), - }; - loop { - // With a timeout, first we wait for the socket to become - // readable using select(), specifying the relevant timeout for - // our previously set deadline. - try!(util::await([fd], deadline, util::Readable)); - - // At this point, we're still within the timeout, and we've - // determined that the socket is readable (as returned by - // select). We must still read the socket in *nonblocking* mode - // because some other thread could come steal our data. If we - // fail to read some data, we retry (hence the outer loop) and - // wait for the socket to become readable again. - let _guard = lock(); - match retry(|| read(deadline.is_some())) { - -1 if util::wouldblock() => {} - -1 => return Err(os::last_error()), - n => { ret = n; break } - } - } - } - - match ret { - 0 => Err(util::eof()), - n if n < 0 => Err(os::last_error()), - n => Ok(n as uint) - } -} - -pub fn write<T>(fd: sock_t, - deadline: u64, - buf: &[u8], - write_everything: bool, - lock: || -> T, - write: |bool, *const u8, uint| -> i64) -> IoResult<uint> { - let mut ret = -1; - let mut written = 0; - if deadline == 0 { - if write_everything { - ret = keep_going(buf, |inner, len| { - written = buf.len() - len; - write(false, inner, len) - }); - } else { - ret = retry(|| { write(false, buf.as_ptr(), buf.len()) }); - if ret > 0 { written = ret as uint; } - } - } - - if deadline != 0 || (ret == -1 && util::wouldblock()) { - let deadline = match deadline { - 0 => None, - n => Some(n), - }; - while written < buf.len() && (write_everything || written == 0) { - // As with read(), first wait for the socket to be ready for - // the I/O operation. - match util::await([fd], deadline, util::Writable) { - Err(ref e) if e.code == libc::EOF as uint && written > 0 => { - assert!(deadline.is_some()); - return Err(util::short_write(written, "short write")) - } - Err(e) => return Err(e), - Ok(()) => {} - } - - // Also as with read(), we use MSG_DONTWAIT to guard ourselves - // against unforeseen circumstances. - let _guard = lock(); - let ptr = buf[written..].as_ptr(); - let len = buf.len() - written; - match retry(|| write(deadline.is_some(), ptr, len)) { - -1 if util::wouldblock() => {} - -1 => return Err(os::last_error()), - n => { written += n as uint; } - } - } - ret = 0; - } - if ret < 0 { - Err(os::last_error()) - } else { - Ok(written) - } -} - -#[cfg(windows)] -mod os { - use libc; - use std::mem; - use std::rt::rtio::{IoError, IoResult}; - - use io::c; - - pub type sock_t = libc::SOCKET; - pub struct Event(c::WSAEVENT); - - impl Event { - pub fn new() -> IoResult<Event> { - let event = unsafe { c::WSACreateEvent() }; - if event == c::WSA_INVALID_EVENT { - Err(last_error()) - } else { - Ok(Event(event)) - } - } - - pub fn handle(&self) -> c::WSAEVENT { let Event(handle) = *self; handle } - } - - impl Drop for Event { - fn drop(&mut self) { - unsafe { let _ = c::WSACloseEvent(self.handle()); } - } - } - - pub fn init() { - unsafe { - use std::rt::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT}; - static mut INITIALIZED: bool = false; - static LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT; - - let _guard = LOCK.lock(); - if !INITIALIZED { - let mut data: c::WSADATA = mem::zeroed(); - let ret = c::WSAStartup(0x202, // version 2.2 - &mut data); - assert_eq!(ret, 0); - INITIALIZED = true; - } - } - } - - pub fn last_error() -> IoError { - use std::os; - let code = unsafe { c::WSAGetLastError() as uint }; - IoError { - code: code, - extra: 0, - detail: Some(os::error_string(code)), +impl Clone for UdpSocket { + fn clone(&self) -> UdpSocket { + UdpSocket { + inner: self.inner.clone(), + read_deadline: 0, + write_deadline: 0, } } - - pub unsafe fn close(sock: sock_t) { let _ = libc::closesocket(sock); } -} - -#[cfg(unix)] -mod os { - use libc; - use std::rt::rtio::IoError; - use io; - - pub type sock_t = io::file::fd_t; - - pub fn init() {} - pub fn last_error() -> IoError { io::last_error() } - pub unsafe fn close(sock: sock_t) { let _ = libc::close(sock); } } diff --git a/src/libstd/sys/unix/mod.rs b/src/libstd/sys/unix/mod.rs index ad5de2dad48..5a43fd08f90 100644 --- a/src/libstd/sys/unix/mod.rs +++ b/src/libstd/sys/unix/mod.rs @@ -8,24 +8,51 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +#![allow(missing_doc)] + extern crate libc; use num; use prelude::*; use io::{mod, IoResult, IoError}; +use sys_common::mkerr_libc; +pub mod c; pub mod fs; pub mod os; -pub mod c; +pub mod tcp; +pub mod udp; +pub mod pipe; + +pub mod addrinfo { + pub use sys_common::net::get_host_addresses; +} -pub type sock_t = io::file::fd_t; +// FIXME: move these to c module +pub type sock_t = self::fs::fd_t; pub type wrlen = libc::size_t; +pub type msglen_t = libc::size_t; pub unsafe fn close_sock(sock: sock_t) { let _ = libc::close(sock); } pub fn last_error() -> IoError { - let errno = os::errno() as i32; - let mut err = decode_error(errno); - err.detail = Some(os::error_string(errno)); + decode_error_detailed(os::errno() as i32) +} + +pub fn last_net_error() -> IoError { + last_error() +} + +extern "system" { + fn gai_strerror(errcode: libc::c_int) -> *const libc::c_char; +} + +pub fn last_gai_error(s: libc::c_int) -> IoError { + use c_str::CString; + + let mut err = decode_error(s); + err.detail = Some(unsafe { + CString::new(gai_strerror(s), false).as_str().unwrap().to_string() + }); err } @@ -64,6 +91,12 @@ pub fn decode_error(errno: i32) -> IoError { IoError { kind: kind, desc: desc, detail: None } } +pub fn decode_error_detailed(errno: i32) -> IoError { + let mut err = decode_error(errno); + err.detail = Some(os::error_string(errno)); + err +} + #[inline] pub fn retry<I: PartialEq + num::One + Neg<I>> (f: || -> I) -> I { let minus_one = -num::one::<I>(); @@ -86,7 +119,10 @@ pub fn wouldblock() -> bool { err == libc::EWOULDBLOCK as int || err == libc::EAGAIN as int } -pub fn set_nonblocking(fd: net::sock_t, nb: bool) -> IoResult<()> { +pub fn set_nonblocking(fd: sock_t, nb: bool) -> IoResult<()> { let set = nb as libc::c_int; - super::mkerr_libc(retry(|| unsafe { c::ioctl(fd, c::FIONBIO, &set) })) + mkerr_libc(retry(|| unsafe { c::ioctl(fd, c::FIONBIO, &set) })) } + +// nothing needed on unix platforms +pub fn init_net() {} diff --git a/src/libstd/sys/unix/os.rs b/src/libstd/sys/unix/os.rs index 34699eb27c1..4e495f043bc 100644 --- a/src/libstd/sys/unix/os.rs +++ b/src/libstd/sys/unix/os.rs @@ -11,6 +11,8 @@ use libc; use libc::{c_int, c_char}; use prelude::*; +use io::IoResult; +use sys::fs::FileDesc; use os::TMPBUF_SZ; @@ -99,3 +101,12 @@ pub fn error_string(errno: i32) -> String { ::string::raw::from_buf(p as *const u8) } } + +pub unsafe fn pipe() -> IoResult<(FileDesc, FileDesc)> { + let mut fds = [0, ..2]; + if libc::pipe(fds.as_mut_ptr()) == 0 { + Ok((FileDesc::new(fds[0], true), FileDesc::new(fds[1], true))) + } else { + Err(super::last_error()) + } +} diff --git a/src/libnative/io/pipe_unix.rs b/src/libstd/sys/unix/pipe.rs index 48f31615339..67384848a94 100644 --- a/src/libnative/io/pipe_unix.rs +++ b/src/libstd/sys/unix/pipe.rs @@ -10,19 +10,17 @@ use alloc::arc::Arc; use libc; -use std::c_str::CString; -use std::mem; -use std::rt::mutex; -use std::rt::rtio; -use std::rt::rtio::{IoResult, IoError}; -use std::sync::atomic; - -use super::retry; -use super::net; -use super::util; -use super::c; -use super::process; -use super::file::{fd_t, FileDesc}; +use c_str::CString; +use mem; +use rt::mutex; +use sync::atomic; +use io::{mod, IoResult, IoError}; +use prelude::*; + +use sys::{mod, timer, retry, c, set_nonblocking, wouldblock}; +use sys::fs::{fd_t, FileDesc}; +use sys_common::net::*; +use sys_common::{eof, mkerr_libc}; fn unix_socket(ty: libc::c_int) -> IoResult<fd_t> { match unsafe { libc::socket(libc::AF_UNIX, ty, 0) } { @@ -41,12 +39,10 @@ fn addr_to_sockaddr_un(addr: &CString, let len = addr.len(); if len > s.sun_path.len() - 1 { - #[cfg(unix)] use libc::EINVAL as ERROR; - #[cfg(windows)] use libc::WSAEINVAL as ERROR; return Err(IoError { - code: ERROR as uint, - extra: 0, - detail: Some("path must be smaller than SUN_LEN".to_string()), + kind: io::InvalidInput, + desc: "invalid argument: path must be smaller than SUN_LEN", + detail: None, }) } s.sun_family = libc::AF_UNIX as libc::sa_family_t; @@ -92,7 +88,7 @@ fn connect(addr: &CString, ty: libc::c_int, } } Some(timeout_ms) => { - try!(util::connect_timeout(inner.fd, addrp, len, timeout_ms)); + try!(connect_timeout(inner.fd, addrp, len, timeout_ms)); Ok(inner) } } @@ -143,18 +139,16 @@ impl UnixStream { fn lock_nonblocking(&self) {} #[cfg(not(target_os = "linux"))] - fn lock_nonblocking<'a>(&'a self) -> net::Guard<'a> { - let ret = net::Guard { + fn lock_nonblocking<'a>(&'a self) -> Guard<'a> { + let ret = Guard { fd: self.fd(), guard: unsafe { self.inner.lock.lock() }, }; - assert!(util::set_nonblocking(self.fd(), true).is_ok()); + assert!(set_nonblocking(self.fd(), true).is_ok()); ret } -} -impl rtio::RtioPipe for UnixStream { - fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> { + pub fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> { let fd = self.fd(); let dolock = || self.lock_nonblocking(); let doread = |nb| unsafe { @@ -164,10 +158,10 @@ impl rtio::RtioPipe for UnixStream { buf.len() as libc::size_t, flags) as libc::c_int }; - net::read(fd, self.read_deadline, dolock, doread) + read(fd, self.read_deadline, dolock, doread) } - fn write(&mut self, buf: &[u8]) -> IoResult<()> { + pub fn write(&mut self, buf: &[u8]) -> IoResult<()> { let fd = self.fd(); let dolock = || self.lock_nonblocking(); let dowrite = |nb: bool, buf: *const u8, len: uint| unsafe { @@ -177,32 +171,38 @@ impl rtio::RtioPipe for UnixStream { len as libc::size_t, flags) as i64 }; - match net::write(fd, self.write_deadline, buf, true, dolock, dowrite) { + match write(fd, self.write_deadline, buf, true, dolock, dowrite) { Ok(_) => Ok(()), Err(e) => Err(e) } } - fn clone(&self) -> Box<rtio::RtioPipe + Send> { - box UnixStream::new(self.inner.clone()) as Box<rtio::RtioPipe + Send> + pub fn close_write(&mut self) -> IoResult<()> { + mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_WR) }) } - fn close_write(&mut self) -> IoResult<()> { - super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_WR) }) - } - fn close_read(&mut self) -> IoResult<()> { - super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) }) + pub fn close_read(&mut self) -> IoResult<()> { + mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) }) } - fn set_timeout(&mut self, timeout: Option<u64>) { - let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0); + + pub fn set_timeout(&mut self, timeout: Option<u64>) { + let deadline = timeout.map(|a| timer::now() + a).unwrap_or(0); self.read_deadline = deadline; self.write_deadline = deadline; } - fn set_read_timeout(&mut self, timeout: Option<u64>) { - self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0); + + pub fn set_read_timeout(&mut self, timeout: Option<u64>) { + self.read_deadline = timeout.map(|a| timer::now() + a).unwrap_or(0); + } + + pub fn set_write_timeout(&mut self, timeout: Option<u64>) { + self.write_deadline = timeout.map(|a| timer::now() + a).unwrap_or(0); } - fn set_write_timeout(&mut self, timeout: Option<u64>) { - self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0); +} + +impl Clone for UnixStream { + fn clone(&self) -> UnixStream { + UnixStream::new(self.inner.clone()) } } @@ -224,16 +224,15 @@ impl UnixListener { fn fd(&self) -> fd_t { self.inner.fd } - pub fn native_listen(self, backlog: int) -> IoResult<UnixAcceptor> { - match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } { + pub fn listen(self) -> IoResult<UnixAcceptor> { + match unsafe { libc::listen(self.fd(), 128) } { -1 => Err(super::last_error()), - #[cfg(unix)] _ => { - let (reader, writer) = try!(process::pipe()); - try!(util::set_nonblocking(reader.fd(), true)); - try!(util::set_nonblocking(writer.fd(), true)); - try!(util::set_nonblocking(self.fd(), true)); + let (reader, writer) = try!(unsafe { sys::os::pipe() }); + try!(set_nonblocking(reader.fd(), true)); + try!(set_nonblocking(writer.fd(), true)); + try!(set_nonblocking(self.fd(), true)); Ok(UnixAcceptor { inner: Arc::new(AcceptorInner { listener: self, @@ -248,21 +247,11 @@ impl UnixListener { } } -impl rtio::RtioUnixListener for UnixListener { - fn listen(self: Box<UnixListener>) - -> IoResult<Box<rtio::RtioUnixAcceptor + Send>> { - self.native_listen(128).map(|a| { - box a as Box<rtio::RtioUnixAcceptor + Send> - }) - } -} - pub struct UnixAcceptor { inner: Arc<AcceptorInner>, deadline: u64, } -#[cfg(unix)] struct AcceptorInner { listener: UnixListener, reader: FileDesc, @@ -273,7 +262,7 @@ struct AcceptorInner { impl UnixAcceptor { fn fd(&self) -> fd_t { self.inner.listener.fd() } - pub fn native_accept(&mut self) -> IoResult<UnixStream> { + pub fn accept(&mut self) -> IoResult<UnixStream> { let deadline = if self.deadline == 0 {None} else {Some(self.deadline)}; while !self.inner.closed.load(atomic::SeqCst) { @@ -287,46 +276,39 @@ impl UnixAcceptor { storagep as *mut libc::sockaddr, &mut size as *mut libc::socklen_t) as libc::c_int }) { - -1 if util::wouldblock() => {} + -1 if wouldblock() => {} -1 => return Err(super::last_error()), fd => return Ok(UnixStream::new(Arc::new(Inner::new(fd)))), } } - try!(util::await([self.fd(), self.inner.reader.fd()], - deadline, util::Readable)); + try!(await([self.fd(), self.inner.reader.fd()], + deadline, Readable)); } - Err(util::eof()) + Err(eof()) } -} -impl rtio::RtioUnixAcceptor for UnixAcceptor { - fn accept(&mut self) -> IoResult<Box<rtio::RtioPipe + Send>> { - self.native_accept().map(|s| box s as Box<rtio::RtioPipe + Send>) - } - fn set_timeout(&mut self, timeout: Option<u64>) { - self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0); + pub fn set_timeout(&mut self, timeout: Option<u64>) { + self.deadline = timeout.map(|a| timer::now() + a).unwrap_or(0); } - fn clone(&self) -> Box<rtio::RtioUnixAcceptor + Send> { - box UnixAcceptor { - inner: self.inner.clone(), - deadline: 0, - } as Box<rtio::RtioUnixAcceptor + Send> - } - - #[cfg(unix)] - fn close_accept(&mut self) -> IoResult<()> { + pub fn close_accept(&mut self) -> IoResult<()> { self.inner.closed.store(true, atomic::SeqCst); - let mut fd = FileDesc::new(self.inner.writer.fd(), false); - match fd.inner_write([0]) { + let fd = FileDesc::new(self.inner.writer.fd(), false); + match fd.write([0]) { Ok(..) => Ok(()), - Err(..) if util::wouldblock() => Ok(()), + Err(..) if wouldblock() => Ok(()), Err(e) => Err(e), } } } +impl Clone for UnixAcceptor { + fn clone(&self) -> UnixAcceptor { + UnixAcceptor { inner: self.inner.clone(), deadline: 0 } + } +} + impl Drop for UnixListener { fn drop(&mut self) { // Unlink the path to the socket to ensure that it doesn't linger. We're diff --git a/src/libstd/sys/unix/tcp.rs b/src/libstd/sys/unix/tcp.rs new file mode 100644 index 00000000000..962475e4177 --- /dev/null +++ b/src/libstd/sys/unix/tcp.rs @@ -0,0 +1,157 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use io::net::ip; +use io::IoResult; +use libc; +use mem; +use ptr; +use prelude::*; +use super::{last_error, last_net_error, retry, sock_t}; +use sync::{Arc, atomic}; +use sys::fs::FileDesc; +use sys::{set_nonblocking, wouldblock}; +use sys; +use sys_common; +use sys_common::net::*; + +pub use sys_common::net::TcpStream; + +//////////////////////////////////////////////////////////////////////////////// +// TCP listeners +//////////////////////////////////////////////////////////////////////////////// + +pub struct TcpListener { + pub inner: FileDesc, +} + +impl TcpListener { + pub fn bind(addr: ip::SocketAddr) -> IoResult<TcpListener> { + let fd = try!(socket(addr, libc::SOCK_STREAM)); + let ret = TcpListener { inner: FileDesc::new(fd, true) }; + + let mut storage = unsafe { mem::zeroed() }; + let len = addr_to_sockaddr(addr, &mut storage); + let addrp = &storage as *const _ as *const libc::sockaddr; + + // On platforms with Berkeley-derived sockets, this allows + // to quickly rebind a socket, without needing to wait for + // the OS to clean up the previous one. + try!(setsockopt(fd, libc::SOL_SOCKET, libc::SO_REUSEADDR, 1 as libc::c_int)); + + + match unsafe { libc::bind(fd, addrp, len) } { + -1 => Err(last_error()), + _ => Ok(ret), + } + } + + pub fn fd(&self) -> sock_t { self.inner.fd() } + + pub fn listen(self, backlog: int) -> IoResult<TcpAcceptor> { + match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } { + -1 => Err(last_net_error()), + _ => { + let (reader, writer) = try!(unsafe { sys::os::pipe() }); + try!(set_nonblocking(reader.fd(), true)); + try!(set_nonblocking(writer.fd(), true)); + try!(set_nonblocking(self.fd(), true)); + Ok(TcpAcceptor { + inner: Arc::new(AcceptorInner { + listener: self, + reader: reader, + writer: writer, + closed: atomic::AtomicBool::new(false), + }), + deadline: 0, + }) + } + } + } + + pub fn socket_name(&mut self) -> IoResult<ip::SocketAddr> { + sockname(self.fd(), libc::getsockname) + } +} + +pub struct TcpAcceptor { + inner: Arc<AcceptorInner>, + deadline: u64, +} + +struct AcceptorInner { + listener: TcpListener, + reader: FileDesc, + writer: FileDesc, + closed: atomic::AtomicBool, +} + +impl TcpAcceptor { + pub fn fd(&self) -> sock_t { self.inner.listener.fd() } + + pub fn accept(&mut self) -> IoResult<TcpStream> { + // In implementing accept, the two main concerns are dealing with + // close_accept() and timeouts. The unix implementation is based on a + // nonblocking accept plus a call to select(). Windows ends up having + // an entirely separate implementation than unix, which is explained + // below. + // + // To implement timeouts, all blocking is done via select() instead of + // accept() by putting the socket in non-blocking mode. Because + // select() takes a timeout argument, we just pass through the timeout + // to select(). + // + // To implement close_accept(), we have a self-pipe to ourselves which + // is passed to select() along with the socket being accepted on. The + // self-pipe is never written to unless close_accept() is called. + let deadline = if self.deadline == 0 {None} else {Some(self.deadline)}; + + while !self.inner.closed.load(atomic::SeqCst) { + match retry(|| unsafe { + libc::accept(self.fd(), ptr::null_mut(), ptr::null_mut()) + }) { + -1 if wouldblock() => {} + -1 => return Err(last_net_error()), + fd => return Ok(TcpStream::new(fd as sock_t)), + } + try!(await([self.fd(), self.inner.reader.fd()], + deadline, Readable)); + } + + Err(sys_common::eof()) + } + + pub fn socket_name(&mut self) -> IoResult<ip::SocketAddr> { + sockname(self.fd(), libc::getsockname) + } + + pub fn set_timeout(&mut self, timeout: Option<u64>) { + self.deadline = timeout.map(|a| sys::timer::now() + a).unwrap_or(0); + } + + pub fn close_accept(&mut self) -> IoResult<()> { + self.inner.closed.store(true, atomic::SeqCst); + let fd = FileDesc::new(self.inner.writer.fd(), false); + match fd.write([0]) { + Ok(..) => Ok(()), + Err(..) if wouldblock() => Ok(()), + Err(e) => Err(e), + } + } +} + +impl Clone for TcpAcceptor { + fn clone(&self) -> TcpAcceptor { + TcpAcceptor { + inner: self.inner.clone(), + deadline: 0, + } + } +} diff --git a/src/libstd/sys/unix/udp.rs b/src/libstd/sys/unix/udp.rs new file mode 100644 index 00000000000..50f8fb828ad --- /dev/null +++ b/src/libstd/sys/unix/udp.rs @@ -0,0 +1,11 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +pub use sys_common::net::UdpSocket; diff --git a/src/libstd/sys/windows/mod.rs b/src/libstd/sys/windows/mod.rs index 5f4129c1484..85fbc6b936c 100644 --- a/src/libstd/sys/windows/mod.rs +++ b/src/libstd/sys/windows/mod.rs @@ -33,12 +33,21 @@ macro_rules! helper_init( (static $name:ident: Helper<$m:ty>) => ( }; ) ) +pub mod c; pub mod fs; pub mod os; -pub mod c; +pub mod tcp; +pub mod udp; +pub mod pipe; + +pub mod addrinfo { + pub use sys_common::net::get_host_addresses; +} +// FIXME: move these to c module pub type sock_t = libc::SOCKET; pub type wrlen = libc::c_int; +pub type msglen_t = libc::c_int; pub unsafe fn close_sock(sock: sock_t) { let _ = libc::closesocket(sock); } // windows has zero values as errors @@ -140,7 +149,6 @@ pub fn set_nonblocking(fd: sock_t, nb: bool) -> IoResult<()> { } } -// FIXME: call this pub fn init_net() { unsafe { static START: Once = ONCE_INIT; diff --git a/src/libnative/io/pipe_windows.rs b/src/libstd/sys/windows/pipe.rs index f764470f37d..f2f7994a005 100644 --- a/src/libnative/io/pipe_windows.rs +++ b/src/libstd/sys/windows/pipe.rs @@ -86,18 +86,17 @@ use alloc::arc::Arc; use libc; -use std::c_str::CString; -use std::mem; -use std::os; -use std::ptr; -use std::rt::rtio; -use std::rt::rtio::{IoResult, IoError}; -use std::sync::atomic; -use std::rt::mutex; - -use super::c; -use super::util; -use super::file::to_utf16; +use c_str::CString; +use mem; +use ptr; +use sync::atomic; +use rt::mutex; +use io::{mod, IoError, IoResult}; +use prelude::*; + +use sys_common::{mod, eof}; + +use super::{c, os, timer, to_utf16, decode_error_detailed}; struct Event(libc::HANDLE); @@ -177,7 +176,7 @@ pub fn await(handle: libc::HANDLE, deadline: u64, let ms = if deadline == 0 { libc::INFINITE as u64 } else { - let now = ::io::timer::now(); + let now = timer::now(); if deadline < now {0} else {deadline - now} }; let ret = unsafe { @@ -190,7 +189,7 @@ pub fn await(handle: libc::HANDLE, deadline: u64, WAIT_FAILED => Err(super::last_error()), WAIT_TIMEOUT => unsafe { let _ = c::CancelIo(handle); - Err(util::timeout("operation timed out")) + Err(sys_common::timeout("operation timed out")) }, n => Ok((n - WAIT_OBJECT_0) as uint) } @@ -198,8 +197,8 @@ pub fn await(handle: libc::HANDLE, deadline: u64, fn epipe() -> IoError { IoError { - code: libc::ERROR_BROKEN_PIPE as uint, - extra: 0, + kind: io::EndOfFile, + desc: "the pipe has ended", detail: None, } } @@ -268,8 +267,8 @@ impl UnixStream { } pub fn connect(addr: &CString, timeout: Option<u64>) -> IoResult<UnixStream> { - let addr = try!(to_utf16(addr)); - let start = ::io::timer::now(); + let addr = try!(to_utf16(addr.as_str())); + let start = timer::now(); loop { match UnixStream::try_connect(addr.as_ptr()) { Some(handle) => { @@ -308,13 +307,13 @@ impl UnixStream { match timeout { Some(timeout) => { - let now = ::io::timer::now(); + let now = timer::now(); let timed_out = (now - start) >= timeout || unsafe { let ms = (timeout - (now - start)) as libc::DWORD; libc::WaitNamedPipeW(addr.as_ptr(), ms) == 0 }; if timed_out { - return Err(util::timeout("connect timed out")) + return Err(sys_common::timeout("connect timed out")) } } @@ -349,10 +348,8 @@ impl UnixStream { _ => Ok(()) } } -} -impl rtio::RtioPipe for UnixStream { - fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> { + pub fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> { if self.read.is_none() { self.read = Some(try!(Event::new(true, false))); } @@ -368,7 +365,7 @@ impl rtio::RtioPipe for UnixStream { // See comments in close_read() about why this lock is necessary. let guard = unsafe { self.inner.lock.lock() }; if self.read_closed() { - return Err(util::eof()) + return Err(eof()) } // Issue a nonblocking requests, succeeding quickly if it happened to @@ -416,15 +413,15 @@ impl rtio::RtioPipe for UnixStream { // If the reading half is now closed, then we're done. If we woke up // because the writing half was closed, keep trying. if wait_succeeded.is_err() { - return Err(util::timeout("read timed out")) + return Err(sys_common::timeout("read timed out")) } if self.read_closed() { - return Err(util::eof()) + return Err(eof()) } } } - fn write(&mut self, buf: &[u8]) -> IoResult<()> { + pub fn write(&mut self, buf: &[u8]) -> IoResult<()> { if self.write.is_none() { self.write = Some(try!(Event::new(true, false))); } @@ -458,11 +455,7 @@ impl rtio::RtioPipe for UnixStream { if ret == 0 { if err != libc::ERROR_IO_PENDING as uint { - return Err(IoError { - code: err as uint, - extra: 0, - detail: Some(os::error_string(err as uint)), - }) + return Err(decode_error_detailed(err as i32)) } // Process a timeout if one is pending let wait_succeeded = await(self.handle(), self.write_deadline, @@ -484,12 +477,12 @@ impl rtio::RtioPipe for UnixStream { let amt = offset + bytes_written as uint; return if amt > 0 { Err(IoError { - code: libc::ERROR_OPERATION_ABORTED as uint, - extra: amt, - detail: Some("short write during write".to_string()), + kind: io::ShortWrite(amt), + desc: "short write during write", + detail: None, }) } else { - Err(util::timeout("write timed out")) + Err(sys_common::timeout("write timed out")) } } if self.write_closed() { @@ -503,17 +496,7 @@ impl rtio::RtioPipe for UnixStream { Ok(()) } - fn clone(&self) -> Box<rtio::RtioPipe + Send> { - box UnixStream { - inner: self.inner.clone(), - read: None, - write: None, - read_deadline: 0, - write_deadline: 0, - } as Box<rtio::RtioPipe + Send> - } - - fn close_read(&mut self) -> IoResult<()> { + pub fn close_read(&mut self) -> IoResult<()> { // On windows, there's no actual shutdown() method for pipes, so we're // forced to emulate the behavior manually at the application level. To // do this, we need to both cancel any pending requests, as well as @@ -536,23 +519,35 @@ impl rtio::RtioPipe for UnixStream { self.cancel_io() } - fn close_write(&mut self) -> IoResult<()> { + pub fn close_write(&mut self) -> IoResult<()> { // see comments in close_read() for why this lock is necessary let _guard = unsafe { self.inner.lock.lock() }; self.inner.write_closed.store(true, atomic::SeqCst); self.cancel_io() } - fn set_timeout(&mut self, timeout: Option<u64>) { - let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0); + pub fn set_timeout(&mut self, timeout: Option<u64>) { + let deadline = timeout.map(|a| timer::now() + a).unwrap_or(0); self.read_deadline = deadline; self.write_deadline = deadline; } - fn set_read_timeout(&mut self, timeout: Option<u64>) { - self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0); + pub fn set_read_timeout(&mut self, timeout: Option<u64>) { + self.read_deadline = timeout.map(|a| timer::now() + a).unwrap_or(0); } - fn set_write_timeout(&mut self, timeout: Option<u64>) { - self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0); + pub fn set_write_timeout(&mut self, timeout: Option<u64>) { + self.write_deadline = timeout.map(|a| timer::now() + a).unwrap_or(0); + } +} + +impl Clone for UnixStream { + fn clone(&self) -> UnixStream { + UnixStream { + inner: self.inner.clone(), + read: None, + write: None, + read_deadline: 0, + write_deadline: 0, + } } } @@ -570,7 +565,7 @@ impl UnixListener { // Although we technically don't need the pipe until much later, we // create the initial handle up front to test the validity of the name // and such. - let addr_v = try!(to_utf16(addr)); + let addr_v = try!(to_utf16(addr.as_str())); let ret = unsafe { pipe(addr_v.as_ptr(), true) }; if ret == libc::INVALID_HANDLE_VALUE { Err(super::last_error()) @@ -579,7 +574,7 @@ impl UnixListener { } } - pub fn native_listen(self) -> IoResult<UnixAcceptor> { + pub fn listen(self) -> IoResult<UnixAcceptor> { Ok(UnixAcceptor { listener: self, event: try!(Event::new(true, false)), @@ -598,15 +593,6 @@ impl Drop for UnixListener { } } -impl rtio::RtioUnixListener for UnixListener { - fn listen(self: Box<UnixListener>) - -> IoResult<Box<rtio::RtioUnixAcceptor + Send>> { - self.native_listen().map(|a| { - box a as Box<rtio::RtioUnixAcceptor + Send> - }) - } -} - pub struct UnixAcceptor { inner: Arc<AcceptorState>, listener: UnixListener, @@ -620,7 +606,7 @@ struct AcceptorState { } impl UnixAcceptor { - pub fn native_accept(&mut self) -> IoResult<UnixStream> { + pub fn accept(&mut self) -> IoResult<UnixStream> { // This function has some funky implementation details when working with // unix pipes. On windows, each server named pipe handle can be // connected to a one or zero clients. To the best of my knowledge, a @@ -657,9 +643,9 @@ impl UnixAcceptor { // If we've had an artificial call to close_accept, be sure to never // proceed in accepting new clients in the future - if self.inner.closed.load(atomic::SeqCst) { return Err(util::eof()) } + if self.inner.closed.load(atomic::SeqCst) { return Err(eof()) } - let name = try!(to_utf16(&self.listener.name)); + let name = try!(to_utf16(self.listener.name.as_str())); // Once we've got a "server handle", we need to wait for a client to // connect. The ConnectNamedPipe function will block this thread until @@ -691,7 +677,7 @@ impl UnixAcceptor { if wait_succeeded.is_ok() { err = unsafe { libc::GetLastError() }; } else { - return Err(util::timeout("accept timed out")) + return Err(sys_common::timeout("accept timed out")) } } else { // we succeeded, bypass the check below @@ -727,19 +713,28 @@ impl UnixAcceptor { write_deadline: 0, }) } -} -impl rtio::RtioUnixAcceptor for UnixAcceptor { - fn accept(&mut self) -> IoResult<Box<rtio::RtioPipe + Send>> { - self.native_accept().map(|s| box s as Box<rtio::RtioPipe + Send>) + pub fn set_timeout(&mut self, timeout: Option<u64>) { + self.deadline = timeout.map(|i| i + timer::now()).unwrap_or(0); } - fn set_timeout(&mut self, timeout: Option<u64>) { - self.deadline = timeout.map(|i| i + ::io::timer::now()).unwrap_or(0); + + pub fn close_accept(&mut self) -> IoResult<()> { + self.inner.closed.store(true, atomic::SeqCst); + let ret = unsafe { + c::SetEvent(self.inner.abort.handle()) + }; + if ret == 0 { + Err(super::last_error()) + } else { + Ok(()) + } } +} - fn clone(&self) -> Box<rtio::RtioUnixAcceptor + Send> { - let name = to_utf16(&self.listener.name).ok().unwrap(); - box UnixAcceptor { +impl Clone for UnixAcceptor { + fn clone(&self) -> UnixAcceptor { + let name = to_utf16(self.listener.name.as_str()).ok().unwrap(); + UnixAcceptor { inner: self.inner.clone(), event: Event::new(true, false).ok().unwrap(), deadline: 0, @@ -751,19 +746,6 @@ impl rtio::RtioUnixAcceptor for UnixAcceptor { p }, }, - } as Box<rtio::RtioUnixAcceptor + Send> - } - - fn close_accept(&mut self) -> IoResult<()> { - self.inner.closed.store(true, atomic::SeqCst); - let ret = unsafe { - c::SetEvent(self.inner.abort.handle()) - }; - if ret == 0 { - Err(super::last_error()) - } else { - Ok(()) } } } - diff --git a/src/libstd/sys/windows/tcp.rs b/src/libstd/sys/windows/tcp.rs new file mode 100644 index 00000000000..3baf2be08d2 --- /dev/null +++ b/src/libstd/sys/windows/tcp.rs @@ -0,0 +1,219 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use io::net::ip; +use io::IoResult; +use libc; +use mem; +use ptr; +use prelude::*; +use super::{last_error, last_net_error, retry, sock_t}; +use sync::{Arc, atomic}; +use sys::fs::FileDesc; +use sys::{mod, c, set_nonblocking, wouldblock, timer}; +use sys_common::{mod, timeout, eof}; +use sys_common::net::*; + +pub use sys_common::net::TcpStream; + +pub struct Event(c::WSAEVENT); + +impl Event { + pub fn new() -> IoResult<Event> { + let event = unsafe { c::WSACreateEvent() }; + if event == c::WSA_INVALID_EVENT { + Err(super::last_error()) + } else { + Ok(Event(event)) + } + } + + pub fn handle(&self) -> c::WSAEVENT { let Event(handle) = *self; handle } +} + +impl Drop for Event { + fn drop(&mut self) { + unsafe { let _ = c::WSACloseEvent(self.handle()); } + } +} + +//////////////////////////////////////////////////////////////////////////////// +// TCP listeners +//////////////////////////////////////////////////////////////////////////////// + +pub struct TcpListener { + inner: FileDesc, +} + +impl TcpListener { + pub fn bind(addr: ip::SocketAddr) -> IoResult<TcpListener> { + sys::init_net(); + + let fd = try!(socket(addr, libc::SOCK_STREAM)); + let ret = TcpListener { inner: FileDesc::new(fd as libc::c_int, true) }; + + let mut storage = unsafe { mem::zeroed() }; + let len = addr_to_sockaddr(addr, &mut storage); + let addrp = &storage as *const _ as *const libc::sockaddr; + + match unsafe { libc::bind(fd, addrp, len) } { + -1 => Err(last_net_error()), + _ => Ok(ret), + } + } + + pub fn fd(&self) -> sock_t { self.inner.fd as sock_t } + + pub fn listen(self, backlog: int) -> IoResult<TcpAcceptor> { + match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } { + -1 => Err(last_net_error()), + + _ => { + let accept = try!(Event::new()); + let ret = unsafe { + c::WSAEventSelect(self.fd(), accept.handle(), c::FD_ACCEPT) + }; + if ret != 0 { + return Err(last_net_error()) + } + Ok(TcpAcceptor { + inner: Arc::new(AcceptorInner { + listener: self, + abort: try!(Event::new()), + accept: accept, + closed: atomic::AtomicBool::new(false), + }), + deadline: 0, + }) + } + } + } + + pub fn socket_name(&mut self) -> IoResult<ip::SocketAddr> { + sockname(self.fd(), libc::getsockname) + } +} + +pub struct TcpAcceptor { + inner: Arc<AcceptorInner>, + deadline: u64, +} + +struct AcceptorInner { + listener: TcpListener, + abort: Event, + accept: Event, + closed: atomic::AtomicBool, +} + +impl TcpAcceptor { + pub fn fd(&self) -> sock_t { self.inner.listener.fd() } + + pub fn accept(&mut self) -> IoResult<TcpStream> { + // Unlink unix, windows cannot invoke `select` on arbitrary file + // descriptors like pipes, only sockets. Consequently, windows cannot + // use the same implementation as unix for accept() when close_accept() + // is considered. + // + // In order to implement close_accept() and timeouts, windows uses + // event handles. An acceptor-specific abort event is created which + // will only get set in close_accept(), and it will never be un-set. + // Additionally, another acceptor-specific event is associated with the + // FD_ACCEPT network event. + // + // These two events are then passed to WaitForMultipleEvents to see + // which one triggers first, and the timeout passed to this function is + // the local timeout for the acceptor. + // + // If the wait times out, then the accept timed out. If the wait + // succeeds with the abort event, then we were closed, and if the wait + // succeeds otherwise, then we do a nonblocking poll via `accept` to + // see if we can accept a connection. The connection is candidate to be + // stolen, so we do all of this in a loop as well. + let events = [self.inner.abort.handle(), self.inner.accept.handle()]; + + while !self.inner.closed.load(atomic::SeqCst) { + let ms = if self.deadline == 0 { + c::WSA_INFINITE as u64 + } else { + let now = timer::now(); + if self.deadline < now {0} else {self.deadline - now} + }; + let ret = unsafe { + c::WSAWaitForMultipleEvents(2, events.as_ptr(), libc::FALSE, + ms as libc::DWORD, libc::FALSE) + }; + match ret { + c::WSA_WAIT_TIMEOUT => { + return Err(timeout("accept timed out")) + } + c::WSA_WAIT_FAILED => return Err(last_net_error()), + c::WSA_WAIT_EVENT_0 => break, + n => assert_eq!(n, c::WSA_WAIT_EVENT_0 + 1), + } + + let mut wsaevents: c::WSANETWORKEVENTS = unsafe { mem::zeroed() }; + let ret = unsafe { + c::WSAEnumNetworkEvents(self.fd(), events[1], &mut wsaevents) + }; + if ret != 0 { return Err(last_net_error()) } + + if wsaevents.lNetworkEvents & c::FD_ACCEPT == 0 { continue } + match unsafe { + libc::accept(self.fd(), ptr::null_mut(), ptr::null_mut()) + } { + -1 if wouldblock() => {} + -1 => return Err(last_net_error()), + + // Accepted sockets inherit the same properties as the caller, + // so we need to deregister our event and switch the socket back + // to blocking mode + fd => { + let stream = TcpStream::new(fd); + let ret = unsafe { + c::WSAEventSelect(fd, events[1], 0) + }; + if ret != 0 { return Err(last_net_error()) } + try!(set_nonblocking(fd, false)); + return Ok(stream) + } + } + } + + Err(eof()) + } + + pub fn socket_name(&mut self) -> IoResult<ip::SocketAddr> { + sockname(self.fd(), libc::getsockname) + } + + pub fn set_timeout(&mut self, timeout: Option<u64>) { + self.deadline = timeout.map(|a| timer::now() + a).unwrap_or(0); + } + + pub fn close_accept(&mut self) -> IoResult<()> { + self.inner.closed.store(true, atomic::SeqCst); + let ret = unsafe { c::WSASetEvent(self.inner.abort.handle()) }; + if ret == libc::TRUE { + Ok(()) + } else { + Err(last_net_error()) + } + } +} + +impl Clone for TcpAcceptor { + fn clone(&self) -> TcpAcceptor { + TcpAcceptor { + inner: self.inner.clone(), + deadline: 0, + } + } +} diff --git a/src/libstd/sys/windows/udp.rs b/src/libstd/sys/windows/udp.rs new file mode 100644 index 00000000000..50f8fb828ad --- /dev/null +++ b/src/libstd/sys/windows/udp.rs @@ -0,0 +1,11 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +pub use sys_common::net::UdpSocket; |
