about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--src/libnative/io/mod.rs54
-rw-r--r--src/librustrt/rtio.rs127
-rw-r--r--src/libstd/io/net/addrinfo.rs32
-rw-r--r--src/libstd/io/net/mod.rs45
-rw-r--r--src/libstd/io/net/pipe.rs65
-rw-r--r--src/libstd/io/net/tcp.rs105
-rw-r--r--src/libstd/io/net/udp.rs60
-rw-r--r--src/libstd/io/pipe.rs58
-rw-r--r--src/libstd/os.rs35
-rw-r--r--src/libstd/sys/common/net.rs (renamed from src/libnative/io/net.rs)1110
-rw-r--r--src/libstd/sys/unix/mod.rs50
-rw-r--r--src/libstd/sys/unix/os.rs11
-rw-r--r--src/libstd/sys/unix/pipe.rs (renamed from src/libnative/io/pipe_unix.rs)146
-rw-r--r--src/libstd/sys/unix/tcp.rs157
-rw-r--r--src/libstd/sys/unix/udp.rs11
-rw-r--r--src/libstd/sys/windows/mod.rs12
-rw-r--r--src/libstd/sys/windows/pipe.rs (renamed from src/libnative/io/pipe_windows.rs)164
-rw-r--r--src/libstd/sys/windows/tcp.rs219
-rw-r--r--src/libstd/sys/windows/udp.rs11
19 files changed, 1183 insertions, 1289 deletions
diff --git a/src/libnative/io/mod.rs b/src/libnative/io/mod.rs
index baf58b83dcd..2a76bc29f7c 100644
--- a/src/libnative/io/mod.rs
+++ b/src/libnative/io/mod.rs
@@ -35,8 +35,6 @@ pub use self::process::Process;
 mod helper_thread;
 
 // Native I/O implementations
-pub mod addrinfo;
-pub mod net;
 pub mod process;
 mod util;
 
@@ -53,14 +51,6 @@ pub mod timer;
 #[path = "timer_windows.rs"]
 pub mod timer;
 
-#[cfg(unix)]
-#[path = "pipe_unix.rs"]
-pub mod pipe;
-
-#[cfg(windows)]
-#[path = "pipe_windows.rs"]
-pub mod pipe;
-
 #[cfg(windows)]
 #[path = "tty_windows.rs"]
 mod tty;
@@ -126,52 +116,11 @@ pub struct IoFactory {
 
 impl IoFactory {
     pub fn new() -> IoFactory {
-        net::init();
         IoFactory { _cannot_construct_outside_of_this_module: () }
     }
 }
 
 impl rtio::IoFactory for IoFactory {
-    // networking
-    fn tcp_connect(&mut self, addr: rtio::SocketAddr,
-                   timeout: Option<u64>)
-        -> IoResult<Box<rtio::RtioTcpStream + Send>>
-    {
-        net::TcpStream::connect(addr, timeout).map(|s| {
-            box s as Box<rtio::RtioTcpStream + Send>
-        })
-    }
-    fn tcp_bind(&mut self, addr: rtio::SocketAddr)
-                -> IoResult<Box<rtio::RtioTcpListener + Send>> {
-        net::TcpListener::bind(addr).map(|s| {
-            box s as Box<rtio::RtioTcpListener + Send>
-        })
-    }
-    fn udp_bind(&mut self, addr: rtio::SocketAddr)
-                -> IoResult<Box<rtio::RtioUdpSocket + Send>> {
-        net::UdpSocket::bind(addr).map(|u| {
-            box u as Box<rtio::RtioUdpSocket + Send>
-        })
-    }
-    fn unix_bind(&mut self, path: &CString)
-                 -> IoResult<Box<rtio::RtioUnixListener + Send>> {
-        pipe::UnixListener::bind(path).map(|s| {
-            box s as Box<rtio::RtioUnixListener + Send>
-        })
-    }
-    fn unix_connect(&mut self, path: &CString,
-                    timeout: Option<u64>) -> IoResult<Box<rtio::RtioPipe + Send>> {
-        pipe::UnixStream::connect(path, timeout).map(|s| {
-            box s as Box<rtio::RtioPipe + Send>
-        })
-    }
-    fn get_host_addresses(&mut self, host: Option<&str>, servname: Option<&str>,
-                          hint: Option<rtio::AddrinfoHint>)
-        -> IoResult<Vec<rtio::AddrinfoInfo>>
-    {
-        addrinfo::GetAddrInfoRequest::run(host, servname, hint)
-    }
-
     // misc
     fn timer_init(&mut self) -> IoResult<Box<rtio::RtioTimer + Send>> {
         timer::Timer::new().map(|t| box t as Box<rtio::RtioTimer + Send>)
@@ -189,9 +138,6 @@ impl rtio::IoFactory for IoFactory {
     fn kill(&mut self, pid: libc::pid_t, signum: int) -> IoResult<()> {
         process::Process::kill(pid, signum)
     }
-    fn pipe_open(&mut self, fd: c_int) -> IoResult<Box<rtio::RtioPipe + Send>> {
-        Ok(box file::FileDesc::new(fd, true) as Box<rtio::RtioPipe + Send>)
-    }
     #[cfg(unix)]
     fn tty_open(&mut self, fd: c_int, _readable: bool)
                 -> IoResult<Box<rtio::RtioTTY + Send>> {
diff --git a/src/librustrt/rtio.rs b/src/librustrt/rtio.rs
index 1f3ef60e6fb..3ebfcaea687 100644
--- a/src/librustrt/rtio.rs
+++ b/src/librustrt/rtio.rs
@@ -13,13 +13,9 @@
 use core::prelude::*;
 use alloc::boxed::Box;
 use collections::string::String;
-use collections::vec::Vec;
-use core::fmt;
 use core::mem;
 use libc::c_int;
-use libc;
 
-use c_str::CString;
 use local::Local;
 use task::Task;
 
@@ -173,87 +169,15 @@ impl<'a> LocalIo<'a> {
 }
 
 pub trait IoFactory {
-    // networking
-    fn tcp_connect(&mut self, addr: SocketAddr,
-                   timeout: Option<u64>) -> IoResult<Box<RtioTcpStream + Send>>;
-    fn tcp_bind(&mut self, addr: SocketAddr)
-                -> IoResult<Box<RtioTcpListener + Send>>;
-    fn udp_bind(&mut self, addr: SocketAddr)
-                -> IoResult<Box<RtioUdpSocket + Send>>;
-    fn unix_bind(&mut self, path: &CString)
-                 -> IoResult<Box<RtioUnixListener + Send>>;
-    fn unix_connect(&mut self, path: &CString,
-                    timeout: Option<u64>) -> IoResult<Box<RtioPipe + Send>>;
-    fn get_host_addresses(&mut self, host: Option<&str>, servname: Option<&str>,
-                          hint: Option<AddrinfoHint>)
-                          -> IoResult<Vec<AddrinfoInfo>>;
-
-    // misc
     fn timer_init(&mut self) -> IoResult<Box<RtioTimer + Send>>;
     fn spawn(&mut self, cfg: ProcessConfig)
             -> IoResult<(Box<RtioProcess + Send>,
                          Vec<Option<Box<RtioPipe + Send>>>)>;
     fn kill(&mut self, pid: libc::pid_t, signal: int) -> IoResult<()>;
-    fn pipe_open(&mut self, fd: c_int) -> IoResult<Box<RtioPipe + Send>>;
     fn tty_open(&mut self, fd: c_int, readable: bool)
             -> IoResult<Box<RtioTTY + Send>>;
 }
 
-pub trait RtioTcpListener : RtioSocket {
-    fn listen(self: Box<Self>) -> IoResult<Box<RtioTcpAcceptor + Send>>;
-}
-
-pub trait RtioTcpAcceptor : RtioSocket {
-    fn accept(&mut self) -> IoResult<Box<RtioTcpStream + Send>>;
-    fn accept_simultaneously(&mut self) -> IoResult<()>;
-    fn dont_accept_simultaneously(&mut self) -> IoResult<()>;
-    fn set_timeout(&mut self, timeout: Option<u64>);
-    fn clone(&self) -> Box<RtioTcpAcceptor + Send>;
-    fn close_accept(&mut self) -> IoResult<()>;
-}
-
-pub trait RtioTcpStream : RtioSocket {
-    fn read(&mut self, buf: &mut [u8]) -> IoResult<uint>;
-    fn write(&mut self, buf: &[u8]) -> IoResult<()>;
-    fn peer_name(&mut self) -> IoResult<SocketAddr>;
-    fn control_congestion(&mut self) -> IoResult<()>;
-    fn nodelay(&mut self) -> IoResult<()>;
-    fn keepalive(&mut self, delay_in_seconds: uint) -> IoResult<()>;
-    fn letdie(&mut self) -> IoResult<()>;
-    fn clone(&self) -> Box<RtioTcpStream + Send>;
-    fn close_write(&mut self) -> IoResult<()>;
-    fn close_read(&mut self) -> IoResult<()>;
-    fn set_timeout(&mut self, timeout_ms: Option<u64>);
-    fn set_read_timeout(&mut self, timeout_ms: Option<u64>);
-    fn set_write_timeout(&mut self, timeout_ms: Option<u64>);
-}
-
-pub trait RtioSocket {
-    fn socket_name(&mut self) -> IoResult<SocketAddr>;
-}
-
-pub trait RtioUdpSocket : RtioSocket {
-    fn recv_from(&mut self, buf: &mut [u8]) -> IoResult<(uint, SocketAddr)>;
-    fn send_to(&mut self, buf: &[u8], dst: SocketAddr) -> IoResult<()>;
-
-    fn join_multicast(&mut self, multi: IpAddr) -> IoResult<()>;
-    fn leave_multicast(&mut self, multi: IpAddr) -> IoResult<()>;
-
-    fn loop_multicast_locally(&mut self) -> IoResult<()>;
-    fn dont_loop_multicast_locally(&mut self) -> IoResult<()>;
-
-    fn multicast_time_to_live(&mut self, ttl: int) -> IoResult<()>;
-    fn time_to_live(&mut self, ttl: int) -> IoResult<()>;
-
-    fn hear_broadcasts(&mut self) -> IoResult<()>;
-    fn ignore_broadcasts(&mut self) -> IoResult<()>;
-
-    fn clone(&self) -> Box<RtioUdpSocket + Send>;
-    fn set_timeout(&mut self, timeout_ms: Option<u64>);
-    fn set_read_timeout(&mut self, timeout_ms: Option<u64>);
-    fn set_write_timeout(&mut self, timeout_ms: Option<u64>);
-}
-
 pub trait RtioTimer {
     fn sleep(&mut self, msecs: u64);
     fn oneshot(&mut self, msecs: u64, cb: Box<Callback + Send>);
@@ -313,54 +237,3 @@ pub struct IoError {
 }
 
 pub type IoResult<T> = Result<T, IoError>;
-
-#[deriving(PartialEq, Eq)]
-pub enum IpAddr {
-    Ipv4Addr(u8, u8, u8, u8),
-    Ipv6Addr(u16, u16, u16, u16, u16, u16, u16, u16),
-}
-
-impl fmt::Show for IpAddr {
-    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
-        match *self {
-            Ipv4Addr(a, b, c, d) => write!(fmt, "{}.{}.{}.{}", a, b, c, d),
-            Ipv6Addr(a, b, c, d, e, f, g, h) => {
-                write!(fmt,
-                       "{:04x}:{:04x}:{:04x}:{:04x}:{:04x}:{:04x}:{:04x}:{:04x}",
-                       a, b, c, d, e, f, g, h)
-            }
-        }
-    }
-}
-
-#[deriving(PartialEq, Eq)]
-pub struct SocketAddr {
-    pub ip: IpAddr,
-    pub port: u16,
-}
-
-pub enum StdioContainer {
-    Ignored,
-    InheritFd(i32),
-    CreatePipe(bool, bool),
-}
-
-pub enum ProcessExit {
-    ExitStatus(int),
-    ExitSignal(int),
-}
-
-pub struct AddrinfoHint {
-    pub family: uint,
-    pub socktype: uint,
-    pub protocol: uint,
-    pub flags: uint,
-}
-
-pub struct AddrinfoInfo {
-    pub address: SocketAddr,
-    pub family: uint,
-    pub socktype: uint,
-    pub protocol: uint,
-    pub flags: uint,
-}
diff --git a/src/libstd/io/net/addrinfo.rs b/src/libstd/io/net/addrinfo.rs
index 3c72f58b10d..22775d54eff 100644
--- a/src/libstd/io/net/addrinfo.rs
+++ b/src/libstd/io/net/addrinfo.rs
@@ -20,12 +20,10 @@ getaddrinfo()
 #![allow(missing_docs)]
 
 use iter::Iterator;
-use io::{IoResult, IoError};
+use io::{IoResult};
 use io::net::ip::{SocketAddr, IpAddr};
 use option::{Option, Some, None};
-use result::{Ok, Err};
-use rt::rtio::{IoFactory, LocalIo};
-use rt::rtio;
+use sys;
 use vec::Vec;
 
 /// Hints to the types of sockets that are desired when looking up hosts
@@ -94,31 +92,7 @@ pub fn get_host_addresses(host: &str) -> IoResult<Vec<IpAddr>> {
 #[allow(unused_variables)]
 fn lookup(hostname: Option<&str>, servname: Option<&str>, hint: Option<Hint>)
           -> IoResult<Vec<Info>> {
-    let hint = hint.map(|Hint { family, socktype, protocol, flags }| {
-        rtio::AddrinfoHint {
-            family: family,
-            socktype: 0, // FIXME: this should use the above variable
-            protocol: 0, // FIXME: this should use the above variable
-            flags: flags,
-        }
-    });
-    match LocalIo::maybe_raise(|io| {
-        io.get_host_addresses(hostname, servname, hint)
-    }) {
-        Ok(v) => Ok(v.into_iter().map(|info| {
-            Info {
-                address: SocketAddr {
-                    ip: super::from_rtio(info.address.ip),
-                    port: info.address.port,
-                },
-                family: info.family,
-                socktype: None, // FIXME: this should use the above variable
-                protocol: None, // FIXME: this should use the above variable
-                flags: info.flags,
-            }
-        }).collect()),
-        Err(e) => Err(IoError::from_rtio_error(e)),
-    }
+    sys::addrinfo::get_host_addresses(hostname, servname, hint)
 }
 
 // Ignored on android since we cannot give tcp/ip
diff --git a/src/libstd/io/net/mod.rs b/src/libstd/io/net/mod.rs
index b9b50a55a10..5b1747876d7 100644
--- a/src/libstd/io/net/mod.rs
+++ b/src/libstd/io/net/mod.rs
@@ -12,9 +12,8 @@
 
 use io::{IoError, IoResult, InvalidInput};
 use option::None;
-use result::{Result, Ok, Err};
-use rt::rtio;
-use self::ip::{Ipv4Addr, Ipv6Addr, IpAddr, SocketAddr, ToSocketAddr};
+use result::{Ok, Err};
+use self::ip::{SocketAddr, ToSocketAddr};
 
 pub use self::addrinfo::get_host_addresses;
 
@@ -24,46 +23,6 @@ pub mod udp;
 pub mod ip;
 pub mod pipe;
 
-fn to_rtio(ip: IpAddr) -> rtio::IpAddr {
-    match ip {
-        Ipv4Addr(a, b, c, d) => rtio::Ipv4Addr(a, b, c, d),
-        Ipv6Addr(a, b, c, d, e, f, g, h) => {
-            rtio::Ipv6Addr(a, b, c, d, e, f, g, h)
-        }
-    }
-}
-
-fn from_rtio(ip: rtio::IpAddr) -> IpAddr {
-    match ip {
-        rtio::Ipv4Addr(a, b, c, d) => Ipv4Addr(a, b, c, d),
-        rtio::Ipv6Addr(a, b, c, d, e, f, g, h) => {
-            Ipv6Addr(a, b, c, d, e, f, g, h)
-        }
-    }
-}
-
-fn with_addresses_io<A: ToSocketAddr, T>(
-    addr: A,
-    action: |&mut rtio::IoFactory, rtio::SocketAddr| -> Result<T, rtio::IoError>
-) -> Result<T, IoError> {
-    const DEFAULT_ERROR: IoError = IoError {
-        kind: InvalidInput,
-        desc: "no addresses found for hostname",
-        detail: None
-    };
-
-    let addresses = try!(addr.to_socket_addr_all());
-    let mut err = DEFAULT_ERROR;
-    for addr in addresses.into_iter() {
-        let addr = rtio::SocketAddr { ip: to_rtio(addr.ip), port: addr.port };
-        match rtio::LocalIo::maybe_raise(|io| action(io, addr)) {
-            Ok(r) => return Ok(r),
-            Err(e) => err = IoError::from_rtio_error(e)
-        }
-    }
-    Err(err)
-}
-
 fn with_addresses<A: ToSocketAddr, T>(addr: A, action: |SocketAddr| -> IoResult<T>)
     -> IoResult<T> {
     const DEFAULT_ERROR: IoError = IoError {
diff --git a/src/libstd/io/net/pipe.rs b/src/libstd/io/net/pipe.rs
index 8c7deadebea..111b0f2b081 100644
--- a/src/libstd/io/net/pipe.rs
+++ b/src/libstd/io/net/pipe.rs
@@ -26,17 +26,20 @@ instances as clients.
 
 use prelude::*;
 
-use io::{Listener, Acceptor, IoResult, IoError, TimedOut, standard_error};
-use rt::rtio::{IoFactory, LocalIo, RtioUnixListener};
-use rt::rtio::{RtioUnixAcceptor, RtioPipe};
+use io::{Listener, Acceptor, IoResult, TimedOut, standard_error};
 use time::Duration;
 
+use sys::pipe::UnixStream as UnixStreamImp;
+use sys::pipe::UnixListener as UnixListenerImp;
+use sys::pipe::UnixAcceptor as UnixAcceptorImp;
+
 /// A stream which communicates over a named pipe.
 pub struct UnixStream {
-    obj: Box<RtioPipe + Send>,
+    inner: UnixStreamImp,
 }
 
 impl UnixStream {
+
     /// Connect to a pipe named by `path`. This will attempt to open a
     /// connection to the underlying socket.
     ///
@@ -53,9 +56,8 @@ impl UnixStream {
     /// stream.write([1, 2, 3]);
     /// ```
     pub fn connect<P: ToCStr>(path: &P) -> IoResult<UnixStream> {
-        LocalIo::maybe_raise(|io| {
-            io.unix_connect(&path.to_c_str(), None).map(|p| UnixStream { obj: p })
-        }).map_err(IoError::from_rtio_error)
+        UnixStreamImp::connect(&path.to_c_str(), None)
+            .map(|inner| UnixStream { inner: inner })
     }
 
     /// Connect to a pipe named by `path`, timing out if the specified number of
@@ -73,10 +75,8 @@ impl UnixStream {
             return Err(standard_error(TimedOut));
         }
 
-        LocalIo::maybe_raise(|io| {
-            let s = io.unix_connect(&path.to_c_str(), Some(timeout.num_milliseconds() as u64));
-            s.map(|p| UnixStream { obj: p })
-        }).map_err(IoError::from_rtio_error)
+        UnixStreamImp::connect(&path.to_c_str(), Some(timeout.num_milliseconds() as u64))
+            .map(|inner| UnixStream { inner: inner })
     }
 
 
@@ -88,7 +88,7 @@ impl UnixStream {
     /// Note that this method affects all cloned handles associated with this
     /// stream, not just this one handle.
     pub fn close_read(&mut self) -> IoResult<()> {
-        self.obj.close_read().map_err(IoError::from_rtio_error)
+        self.inner.close_read()
     }
 
     /// Closes the writing half of this connection.
@@ -99,7 +99,7 @@ impl UnixStream {
     /// Note that this method affects all cloned handles associated with this
     /// stream, not just this one handle.
     pub fn close_write(&mut self) -> IoResult<()> {
-        self.obj.close_write().map_err(IoError::from_rtio_error)
+        self.inner.close_write()
     }
 
     /// Sets the read/write timeout for this socket.
@@ -107,7 +107,7 @@ impl UnixStream {
     /// For more information, see `TcpStream::set_timeout`
     #[experimental = "the timeout argument may change in type and value"]
     pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
-        self.obj.set_timeout(timeout_ms)
+        self.inner.set_timeout(timeout_ms)
     }
 
     /// Sets the read timeout for this socket.
@@ -115,7 +115,7 @@ impl UnixStream {
     /// For more information, see `TcpStream::set_timeout`
     #[experimental = "the timeout argument may change in type and value"]
     pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
-        self.obj.set_read_timeout(timeout_ms)
+        self.inner.set_read_timeout(timeout_ms)
     }
 
     /// Sets the write timeout for this socket.
@@ -123,36 +123,35 @@ impl UnixStream {
     /// For more information, see `TcpStream::set_timeout`
     #[experimental = "the timeout argument may change in type and value"]
     pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) {
-        self.obj.set_write_timeout(timeout_ms)
+        self.inner.set_write_timeout(timeout_ms)
     }
 }
 
 impl Clone for UnixStream {
     fn clone(&self) -> UnixStream {
-        UnixStream { obj: self.obj.clone() }
+        UnixStream { inner: self.inner.clone() }
     }
 }
 
 impl Reader for UnixStream {
     fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
-        self.obj.read(buf).map_err(IoError::from_rtio_error)
+        self.inner.read(buf)
     }
 }
 
 impl Writer for UnixStream {
     fn write(&mut self, buf: &[u8]) -> IoResult<()> {
-        self.obj.write(buf).map_err(IoError::from_rtio_error)
+        self.inner.write(buf)
     }
 }
 
 /// A value that can listen for incoming named pipe connection requests.
 pub struct UnixListener {
     /// The internal, opaque runtime Unix listener.
-    obj: Box<RtioUnixListener + Send>,
+    inner: UnixListenerImp,
 }
 
 impl UnixListener {
-
     /// Creates a new listener, ready to receive incoming connections on the
     /// specified socket. The server will be named by `path`.
     ///
@@ -175,24 +174,22 @@ impl UnixListener {
     /// # }
     /// ```
     pub fn bind<P: ToCStr>(path: &P) -> IoResult<UnixListener> {
-        LocalIo::maybe_raise(|io| {
-            io.unix_bind(&path.to_c_str()).map(|s| UnixListener { obj: s })
-        }).map_err(IoError::from_rtio_error)
+        UnixListenerImp::bind(&path.to_c_str())
+            .map(|inner| UnixListener { inner: inner })
     }
 }
 
 impl Listener<UnixStream, UnixAcceptor> for UnixListener {
     fn listen(self) -> IoResult<UnixAcceptor> {
-        self.obj.listen().map(|obj| {
-            UnixAcceptor { obj: obj }
-        }).map_err(IoError::from_rtio_error)
+        self.inner.listen()
+            .map(|inner| UnixAcceptor { inner: inner })
     }
 }
 
 /// A value that can accept named pipe connections, returned from `listen()`.
 pub struct UnixAcceptor {
     /// The internal, opaque runtime Unix acceptor.
-    obj: Box<RtioUnixAcceptor + Send>,
+    inner: UnixAcceptorImp
 }
 
 impl UnixAcceptor {
@@ -210,7 +207,7 @@ impl UnixAcceptor {
     #[experimental = "the name and arguments to this function are likely \
                       to change"]
     pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
-        self.obj.set_timeout(timeout_ms)
+        self.inner.set_timeout(timeout_ms)
     }
 
     /// Closes the accepting capabilities of this acceptor.
@@ -219,15 +216,15 @@ impl UnixAcceptor {
     /// more information can be found in that documentation.
     #[experimental]
     pub fn close_accept(&mut self) -> IoResult<()> {
-        self.obj.close_accept().map_err(IoError::from_rtio_error)
+        self.inner.close_accept()
     }
 }
 
 impl Acceptor<UnixStream> for UnixAcceptor {
     fn accept(&mut self) -> IoResult<UnixStream> {
-        self.obj.accept().map(|s| {
-            UnixStream { obj: s }
-        }).map_err(IoError::from_rtio_error)
+        self.inner.accept().map(|s| {
+            UnixStream { inner: s }
+        })
     }
 }
 
@@ -246,7 +243,7 @@ impl Clone for UnixAcceptor {
     /// This function is useful for creating a handle to invoke `close_accept`
     /// on to wake up any other task blocked in `accept`.
     fn clone(&self) -> UnixAcceptor {
-        UnixAcceptor { obj: self.obj.clone() }
+        UnixAcceptor { inner: self.inner.clone() }
     }
 }
 
diff --git a/src/libstd/io/net/tcp.rs b/src/libstd/io/net/tcp.rs
index 928c8586739..2545e07cbb5 100644
--- a/src/libstd/io/net/tcp.rs
+++ b/src/libstd/io/net/tcp.rs
@@ -20,19 +20,17 @@
 use clone::Clone;
 use io::IoResult;
 use iter::Iterator;
-use result::{Ok,Err};
+use result::Err;
 use io::net::ip::{SocketAddr, ToSocketAddr};
-use io::IoError;
 use io::{Reader, Writer, Listener, Acceptor};
 use io::{standard_error, TimedOut};
-use kinds::Send;
 use option::{None, Some, Option};
-use boxed::Box;
-use rt::rtio::{IoFactory, RtioSocket, RtioTcpListener};
-use rt::rtio::{RtioTcpAcceptor, RtioTcpStream};
-use rt::rtio;
 use time::Duration;
 
+use sys::tcp::TcpStream as TcpStreamImp;
+use sys::tcp::TcpListener as TcpListenerImp;
+use sys::tcp::TcpAcceptor as TcpAcceptorImp;
+
 /// A structure which represents a TCP stream between a local socket and a
 /// remote socket.
 ///
@@ -50,12 +48,12 @@ use time::Duration;
 /// drop(stream); // close the connection
 /// ```
 pub struct TcpStream {
-    obj: Box<RtioTcpStream + Send>,
+    inner: TcpStreamImp,
 }
 
 impl TcpStream {
-    fn new(s: Box<RtioTcpStream + Send>) -> TcpStream {
-        TcpStream { obj: s }
+    fn new(s: TcpStreamImp) -> TcpStream {
+        TcpStream { inner: s }
     }
 
     /// Open a TCP connection to a remote host.
@@ -64,7 +62,9 @@ impl TcpStream {
     /// trait can be supplied for the address; see this trait documentation for
     /// concrete examples.
     pub fn connect<A: ToSocketAddr>(addr: A) -> IoResult<TcpStream> {
-        super::with_addresses_io(addr, |io, addr| io.tcp_connect(addr, None).map(TcpStream::new))
+        super::with_addresses(addr, |addr| {
+            TcpStreamImp::connect(addr, None).map(TcpStream::new)
+        })
     }
 
     /// Creates a TCP connection to a remote socket address, timing out after
@@ -86,39 +86,26 @@ impl TcpStream {
             return Err(standard_error(TimedOut));
         }
 
-        super::with_addresses_io(addr, |io, addr|
-            io.tcp_connect(addr, Some(timeout.num_milliseconds() as u64)).map(TcpStream::new)
-        )
+        super::with_addresses(addr, |addr| {
+            TcpStreamImp::connect(addr, Some(timeout.num_milliseconds() as u64))
+                .map(TcpStream::new)
+        })
     }
 
     /// Returns the socket address of the remote peer of this TCP connection.
     pub fn peer_name(&mut self) -> IoResult<SocketAddr> {
-        match self.obj.peer_name() {
-            Ok(rtio::SocketAddr { ip, port }) => {
-                Ok(SocketAddr { ip: super::from_rtio(ip), port: port })
-            }
-            Err(e) => Err(IoError::from_rtio_error(e)),
-        }
+        self.inner.peer_name()
     }
 
     /// Returns the socket address of the local half of this TCP connection.
     pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
-        match self.obj.socket_name() {
-            Ok(rtio::SocketAddr { ip, port }) => {
-                Ok(SocketAddr { ip: super::from_rtio(ip), port: port })
-            }
-            Err(e) => Err(IoError::from_rtio_error(e)),
-        }
+        self.inner.socket_name()
     }
 
     /// Sets the nodelay flag on this connection to the boolean specified
     #[experimental]
     pub fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> {
-        if nodelay {
-            self.obj.nodelay()
-        } else {
-            self.obj.control_congestion()
-        }.map_err(IoError::from_rtio_error)
+        self.inner.set_nodelay(nodelay)
     }
 
     /// Sets the keepalive timeout to the timeout specified.
@@ -128,10 +115,7 @@ impl TcpStream {
     /// specified time, in seconds.
     #[experimental]
     pub fn set_keepalive(&mut self, delay_in_seconds: Option<uint>) -> IoResult<()> {
-        match delay_in_seconds {
-            Some(i) => self.obj.keepalive(i),
-            None => self.obj.letdie(),
-        }.map_err(IoError::from_rtio_error)
+        self.inner.set_keepalive(delay_in_seconds)
     }
 
     /// Closes the reading half of this connection.
@@ -165,7 +149,7 @@ impl TcpStream {
     /// Note that this method affects all cloned handles associated with this
     /// stream, not just this one handle.
     pub fn close_read(&mut self) -> IoResult<()> {
-        self.obj.close_read().map_err(IoError::from_rtio_error)
+        self.inner.close_read()
     }
 
     /// Closes the writing half of this connection.
@@ -176,7 +160,7 @@ impl TcpStream {
     /// Note that this method affects all cloned handles associated with this
     /// stream, not just this one handle.
     pub fn close_write(&mut self) -> IoResult<()> {
-        self.obj.close_write().map_err(IoError::from_rtio_error)
+        self.inner.close_write()
     }
 
     /// Sets a timeout, in milliseconds, for blocking operations on this stream.
@@ -198,7 +182,7 @@ impl TcpStream {
     /// take a look at `set_read_timeout` and `set_write_timeout`.
     #[experimental = "the timeout argument may change in type and value"]
     pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
-        self.obj.set_timeout(timeout_ms)
+        self.inner.set_timeout(timeout_ms)
     }
 
     /// Sets the timeout for read operations on this stream.
@@ -215,7 +199,7 @@ impl TcpStream {
     /// during the timeout period.
     #[experimental = "the timeout argument may change in type and value"]
     pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
-        self.obj.set_read_timeout(timeout_ms)
+        self.inner.set_read_timeout(timeout_ms)
     }
 
     /// Sets the timeout for write operations on this stream.
@@ -242,7 +226,7 @@ impl TcpStream {
     /// asynchronous fashion after the call to write returns.
     #[experimental = "the timeout argument may change in type and value"]
     pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) {
-        self.obj.set_write_timeout(timeout_ms)
+        self.inner.set_write_timeout(timeout_ms)
     }
 }
 
@@ -256,19 +240,19 @@ impl Clone for TcpStream {
     /// Instead, the first read will receive the first packet received, and the
     /// second read will receive the second packet.
     fn clone(&self) -> TcpStream {
-        TcpStream { obj: self.obj.clone() }
+        TcpStream { inner: self.inner.clone() }
     }
 }
 
 impl Reader for TcpStream {
     fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
-        self.obj.read(buf).map_err(IoError::from_rtio_error)
+        self.inner.read(buf)
     }
 }
 
 impl Writer for TcpStream {
     fn write(&mut self, buf: &[u8]) -> IoResult<()> {
-        self.obj.write(buf).map_err(IoError::from_rtio_error)
+        self.inner.write(buf)
     }
 }
 
@@ -309,7 +293,7 @@ impl Writer for TcpStream {
 /// # }
 /// ```
 pub struct TcpListener {
-    obj: Box<RtioTcpListener + Send>,
+    inner: TcpListenerImp,
 }
 
 impl TcpListener {
@@ -324,26 +308,20 @@ impl TcpListener {
     /// The address type can be any implementor of `ToSocketAddr` trait. See its
     /// documentation for concrete examples.
     pub fn bind<A: ToSocketAddr>(addr: A) -> IoResult<TcpListener> {
-        super::with_addresses_io(addr, |io, addr| io.tcp_bind(addr).map(|l| TcpListener { obj: l }))
+        super::with_addresses(addr, |addr| {
+            TcpListenerImp::bind(addr).map(|inner| TcpListener { inner: inner })
+        })
     }
 
     /// Returns the local socket address of this listener.
     pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
-        match self.obj.socket_name() {
-            Ok(rtio::SocketAddr { ip, port }) => {
-                Ok(SocketAddr { ip: super::from_rtio(ip), port: port })
-            }
-            Err(e) => Err(IoError::from_rtio_error(e)),
-        }
+        self.inner.socket_name()
     }
 }
 
 impl Listener<TcpStream, TcpAcceptor> for TcpListener {
     fn listen(self) -> IoResult<TcpAcceptor> {
-        match self.obj.listen() {
-            Ok(acceptor) => Ok(TcpAcceptor { obj: acceptor }),
-            Err(e) => Err(IoError::from_rtio_error(e)),
-        }
+        self.inner.listen(128).map(|a| TcpAcceptor { inner: a })
     }
 }
 
@@ -351,7 +329,7 @@ impl Listener<TcpStream, TcpAcceptor> for TcpListener {
 /// a `TcpListener`'s `listen` method, and this object can be used to accept new
 /// `TcpStream` instances.
 pub struct TcpAcceptor {
-    obj: Box<RtioTcpAcceptor + Send>,
+    inner: TcpAcceptorImp,
 }
 
 impl TcpAcceptor {
@@ -399,7 +377,7 @@ impl TcpAcceptor {
     /// ```
     #[experimental = "the type of the argument and name of this function are \
                       subject to change"]
-    pub fn set_timeout(&mut self, ms: Option<u64>) { self.obj.set_timeout(ms); }
+    pub fn set_timeout(&mut self, ms: Option<u64>) { self.inner.set_timeout(ms); }
 
     /// Closes the accepting capabilities of this acceptor.
     ///
@@ -445,16 +423,13 @@ impl TcpAcceptor {
     /// ```
     #[experimental]
     pub fn close_accept(&mut self) -> IoResult<()> {
-        self.obj.close_accept().map_err(IoError::from_rtio_error)
+        self.inner.close_accept()
     }
 }
 
 impl Acceptor<TcpStream> for TcpAcceptor {
     fn accept(&mut self) -> IoResult<TcpStream> {
-        match self.obj.accept(){
-            Ok(s) => Ok(TcpStream::new(s)),
-            Err(e) => Err(IoError::from_rtio_error(e)),
-        }
+        self.inner.accept().map(TcpStream::new)
     }
 }
 
@@ -473,7 +448,7 @@ impl Clone for TcpAcceptor {
     /// This function is useful for creating a handle to invoke `close_accept`
     /// on to wake up any other task blocked in `accept`.
     fn clone(&self) -> TcpAcceptor {
-        TcpAcceptor { obj: self.obj.clone() }
+        TcpAcceptor { inner: self.inner.clone() }
     }
 }
 
@@ -1112,8 +1087,6 @@ mod test {
 
     #[test]
     fn shutdown_smoke() {
-        use rt::rtio::RtioTcpStream;
-
         let addr = next_test_ip4();
         let a = TcpListener::bind(addr).unwrap().listen();
         spawn(proc() {
@@ -1124,7 +1097,7 @@ mod test {
         });
 
         let mut s = TcpStream::connect(addr).unwrap();
-        assert!(s.obj.close_write().is_ok());
+        assert!(s.inner.close_write().is_ok());
         assert!(s.write([1]).is_err());
         assert_eq!(s.read_to_end(), Ok(vec!(1)));
     }
diff --git a/src/libstd/io/net/udp.rs b/src/libstd/io/net/udp.rs
index 4ae054beadb..31b61989647 100644
--- a/src/libstd/io/net/udp.rs
+++ b/src/libstd/io/net/udp.rs
@@ -17,13 +17,10 @@
 
 use clone::Clone;
 use io::net::ip::{SocketAddr, IpAddr, ToSocketAddr};
-use io::{Reader, Writer, IoResult, IoError};
-use kinds::Send;
-use boxed::Box;
+use io::{Reader, Writer, IoResult};
 use option::Option;
 use result::{Ok, Err};
-use rt::rtio::{RtioSocket, RtioUdpSocket, IoFactory};
-use rt::rtio;
+use sys::udp::UdpSocket as UdpSocketImp;
 
 /// A User Datagram Protocol socket.
 ///
@@ -60,7 +57,7 @@ use rt::rtio;
 /// }
 /// ```
 pub struct UdpSocket {
-    obj: Box<RtioUdpSocket + Send>,
+    inner: UdpSocketImp,
 }
 
 impl UdpSocket {
@@ -69,18 +66,15 @@ impl UdpSocket {
     /// Address type can be any implementor of `ToSocketAddr` trait. See its
     /// documentation for concrete examples.
     pub fn bind<A: ToSocketAddr>(addr: A) -> IoResult<UdpSocket> {
-        super::with_addresses_io(addr, |io, addr| io.udp_bind(addr).map(|s| UdpSocket { obj: s }))
+        super::with_addresses(addr, |addr| {
+            UdpSocketImp::bind(addr).map(|s| UdpSocket { inner: s })
+        })
     }
 
     /// Receives data from the socket. On success, returns the number of bytes
     /// read and the address from whence the data came.
     pub fn recv_from(&mut self, buf: &mut [u8]) -> IoResult<(uint, SocketAddr)> {
-        match self.obj.recv_from(buf) {
-            Ok((amt, rtio::SocketAddr { ip, port })) => {
-                Ok((amt, SocketAddr { ip: super::from_rtio(ip), port: port }))
-            }
-            Err(e) => Err(IoError::from_rtio_error(e)),
-        }
+        self.inner.recv_from(buf)
     }
 
     /// Sends data on the socket to the given address. Returns nothing on
@@ -89,10 +83,7 @@ impl UdpSocket {
     /// Address type can be any implementor of `ToSocketAddr` trait. See its
     /// documentation for concrete examples.
     pub fn send_to<A: ToSocketAddr>(&mut self, buf: &[u8], addr: A) -> IoResult<()> {
-        super::with_addresses(addr, |addr| self.obj.send_to(buf, rtio::SocketAddr {
-            ip: super::to_rtio(addr.ip),
-            port: addr.port,
-        }).map_err(IoError::from_rtio_error))
+        super::with_addresses(addr, |addr| self.inner.send_to(buf, addr))
     }
 
     /// Creates a `UdpStream`, which allows use of the `Reader` and `Writer`
@@ -112,24 +103,19 @@ impl UdpSocket {
 
     /// Returns the socket address that this socket was created from.
     pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
-        match self.obj.socket_name() {
-            Ok(a) => Ok(SocketAddr { ip: super::from_rtio(a.ip), port: a.port }),
-            Err(e) => Err(IoError::from_rtio_error(e))
-        }
+        self.inner.socket_name()
     }
 
     /// Joins a multicast IP address (becomes a member of it)
     #[experimental]
     pub fn join_multicast(&mut self, multi: IpAddr) -> IoResult<()> {
-        let e = self.obj.join_multicast(super::to_rtio(multi));
-        e.map_err(IoError::from_rtio_error)
+        self.inner.join_multicast(multi)
     }
 
     /// Leaves a multicast IP address (drops membership from it)
     #[experimental]
     pub fn leave_multicast(&mut self, multi: IpAddr) -> IoResult<()> {
-        let e = self.obj.leave_multicast(super::to_rtio(multi));
-        e.map_err(IoError::from_rtio_error)
+        self.inner.leave_multicast(multi)
     }
 
     /// Set the multicast loop flag to the specified value
@@ -137,33 +123,25 @@ impl UdpSocket {
     /// This lets multicast packets loop back to local sockets (if enabled)
     #[experimental]
     pub fn set_multicast_loop(&mut self, on: bool) -> IoResult<()> {
-        if on {
-            self.obj.loop_multicast_locally()
-        } else {
-            self.obj.dont_loop_multicast_locally()
-        }.map_err(IoError::from_rtio_error)
+        self.inner.set_multicast_loop(on)
     }
 
     /// Sets the multicast TTL
     #[experimental]
     pub fn set_multicast_ttl(&mut self, ttl: int) -> IoResult<()> {
-        self.obj.multicast_time_to_live(ttl).map_err(IoError::from_rtio_error)
+        self.inner.multicast_time_to_live(ttl)
     }
 
     /// Sets this socket's TTL
     #[experimental]
     pub fn set_ttl(&mut self, ttl: int) -> IoResult<()> {
-        self.obj.time_to_live(ttl).map_err(IoError::from_rtio_error)
+        self.inner.time_to_live(ttl)
     }
 
     /// Sets the broadcast flag on or off
     #[experimental]
     pub fn set_broadcast(&mut self, broadcast: bool) -> IoResult<()> {
-        if broadcast {
-            self.obj.hear_broadcasts()
-        } else {
-            self.obj.ignore_broadcasts()
-        }.map_err(IoError::from_rtio_error)
+        self.inner.set_broadcast(broadcast)
     }
 
     /// Sets the read/write timeout for this socket.
@@ -171,7 +149,7 @@ impl UdpSocket {
     /// For more information, see `TcpStream::set_timeout`
     #[experimental = "the timeout argument may change in type and value"]
     pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
-        self.obj.set_timeout(timeout_ms)
+        self.inner.set_timeout(timeout_ms)
     }
 
     /// Sets the read timeout for this socket.
@@ -179,7 +157,7 @@ impl UdpSocket {
     /// For more information, see `TcpStream::set_timeout`
     #[experimental = "the timeout argument may change in type and value"]
     pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
-        self.obj.set_read_timeout(timeout_ms)
+        self.inner.set_read_timeout(timeout_ms)
     }
 
     /// Sets the write timeout for this socket.
@@ -187,7 +165,7 @@ impl UdpSocket {
     /// For more information, see `TcpStream::set_timeout`
     #[experimental = "the timeout argument may change in type and value"]
     pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) {
-        self.obj.set_write_timeout(timeout_ms)
+        self.inner.set_write_timeout(timeout_ms)
     }
 }
 
@@ -201,7 +179,7 @@ impl Clone for UdpSocket {
     /// received, and the second read will receive the second packet.
     fn clone(&self) -> UdpSocket {
         UdpSocket {
-            obj: self.obj.clone(),
+            inner: self.inner.clone(),
         }
     }
 }
diff --git a/src/libstd/io/pipe.rs b/src/libstd/io/pipe.rs
index c77cffd561e..64b2518fab1 100644
--- a/src/libstd/io/pipe.rs
+++ b/src/libstd/io/pipe.rs
@@ -17,15 +17,17 @@
 
 use prelude::*;
 
-use io::{IoResult, IoError};
+use io::IoResult;
 use libc;
-use os;
-use rt::rtio::{RtioPipe, LocalIo};
+use sync::Arc;
+
+use sys_common;
+use sys;
+use sys::fs::FileDesc as FileDesc;
 
 /// A synchronous, in-memory pipe.
 pub struct PipeStream {
-    /// The internal, opaque runtime pipe object.
-    obj: Box<RtioPipe + Send>,
+    inner: Arc<FileDesc>
 }
 
 pub struct PipePair {
@@ -55,14 +57,14 @@ impl PipeStream {
     /// }
     /// ```
     pub fn open(fd: libc::c_int) -> IoResult<PipeStream> {
-        LocalIo::maybe_raise(|io| {
-            io.pipe_open(fd).map(|obj| PipeStream { obj: obj })
-        }).map_err(IoError::from_rtio_error)
+        Ok(PipeStream::from_filedesc(FileDesc::new(fd, true)))
     }
 
+    // FIXME: expose this some other way
+    /// Wrap a FileDesc directly, taking ownership.
     #[doc(hidden)]
-    pub fn new(inner: Box<RtioPipe + Send>) -> PipeStream {
-        PipeStream { obj: inner }
+    pub fn from_filedesc(fd: FileDesc) -> PipeStream {
+        PipeStream { inner: Arc::new(fd) }
     }
 
     /// Creates a pair of in-memory OS pipes for a unidirectional communication
@@ -76,43 +78,35 @@ impl PipeStream {
     /// This function can fail to succeed if the underlying OS has run out of
     /// available resources to allocate a new pipe.
     pub fn pair() -> IoResult<PipePair> {
-        struct Closer { fd: libc::c_int }
-
-        let os::Pipe { reader, writer } = try!(unsafe { os::pipe() });
-        let mut reader = Closer { fd: reader };
-        let mut writer = Closer { fd: writer };
-
-        let io_reader = try!(PipeStream::open(reader.fd));
-        reader.fd = -1;
-        let io_writer = try!(PipeStream::open(writer.fd));
-        writer.fd = -1;
-        return Ok(PipePair { reader: io_reader, writer: io_writer });
-
-        impl Drop for Closer {
-            fn drop(&mut self) {
-                if self.fd != -1 {
-                    let _ = unsafe { libc::close(self.fd) };
-                }
-            }
-        }
+        let (reader, writer) = try!(unsafe { sys::os::pipe() });
+        Ok(PipePair {
+            reader: PipeStream::from_filedesc(reader),
+            writer: PipeStream::from_filedesc(writer),
+        })
+    }
+}
+
+impl sys_common::AsFileDesc for PipeStream {
+    fn as_fd(&self) -> &sys::fs::FileDesc {
+        &*self.inner
     }
 }
 
 impl Clone for PipeStream {
     fn clone(&self) -> PipeStream {
-        PipeStream { obj: self.obj.clone() }
+        PipeStream { inner: self.inner.clone() }
     }
 }
 
 impl Reader for PipeStream {
     fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
-        self.obj.read(buf).map_err(IoError::from_rtio_error)
+        self.inner.read(buf)
     }
 }
 
 impl Writer for PipeStream {
     fn write(&mut self, buf: &[u8]) -> IoResult<()> {
-        self.obj.write(buf).map_err(IoError::from_rtio_error)
+        self.inner.write(buf)
     }
 }
 
diff --git a/src/libstd/os.rs b/src/libstd/os.rs
index 175e23bf819..ea42117bab6 100644
--- a/src/libstd/os.rs
+++ b/src/libstd/os.rs
@@ -43,6 +43,7 @@ use ops::Drop;
 use option::{Some, None, Option};
 use os;
 use path::{Path, GenericPath, BytesContainer};
+use sys;
 use sys::os as os_imp;
 use ptr::RawPtr;
 use ptr;
@@ -603,35 +604,11 @@ pub struct Pipe {
 /// descriptors to be closed, the file descriptors will leak. For safe handling
 /// of this scenario, use `std::io::PipeStream` instead.
 pub unsafe fn pipe() -> IoResult<Pipe> {
-    return _pipe();
-
-    #[cfg(unix)]
-    unsafe fn _pipe() -> IoResult<Pipe> {
-        let mut fds = [0, ..2];
-        match libc::pipe(fds.as_mut_ptr()) {
-            0 => Ok(Pipe { reader: fds[0], writer: fds[1] }),
-            _ => Err(IoError::last_error()),
-        }
-    }
-
-    #[cfg(windows)]
-    unsafe fn _pipe() -> IoResult<Pipe> {
-        // Windows pipes work subtly differently than unix pipes, and their
-        // inheritance has to be handled in a different way that I do not
-        // fully understand. Here we explicitly make the pipe non-inheritable,
-        // which means to pass it to a subprocess they need to be duplicated
-        // first, as in std::run.
-        let mut fds = [0, ..2];
-        match libc::pipe(fds.as_mut_ptr(), 1024 as ::libc::c_uint,
-                         (libc::O_BINARY | libc::O_NOINHERIT) as c_int) {
-            0 => {
-                assert!(fds[0] != -1 && fds[0] != 0);
-                assert!(fds[1] != -1 && fds[1] != 0);
-                Ok(Pipe { reader: fds[0], writer: fds[1] })
-            }
-            _ => Err(IoError::last_error()),
-        }
-    }
+    let (reader, writer) = try!(sys::os::pipe());
+    Ok(Pipe {
+        reader: reader.unwrap(),
+        writer: writer.unwrap(),
+    })
 }
 
 /// Returns the proper dll filename for the given basename of a file
diff --git a/src/libnative/io/net.rs b/src/libstd/sys/common/net.rs
index a4b97a3eb84..0559005100f 100644
--- a/src/libnative/io/net.rs
+++ b/src/libstd/sys/common/net.rs
@@ -9,21 +9,26 @@
 // except according to those terms.
 
 use alloc::arc::Arc;
-use libc;
-use std::mem;
-use std::ptr;
-use std::rt::mutex;
-use std::rt::rtio::{mod, IoResult, IoError};
-use std::sync::atomic;
-
-use super::{retry, keep_going};
-use super::c;
-use super::util;
-
-#[cfg(unix)] use super::process;
-#[cfg(unix)] use super::file::FileDesc;
-
-pub use self::os::{init, sock_t, last_error};
+use libc::{mod, c_char, c_int};
+use mem;
+use ptr::{mod, null, null_mut};
+use rt::mutex;
+use io::net::ip::{SocketAddr, IpAddr, Ipv4Addr, Ipv6Addr};
+use io::net::addrinfo;
+use io::{IoResult, IoError};
+use sys::{mod, retry, c, sock_t, last_error, last_net_error, last_gai_error, close_sock,
+          wrlen, msglen_t, os, wouldblock, set_nonblocking, timer, ms_to_timeval,
+          decode_error_detailed};
+use sys_common::{mod, keep_going, short_write, timeout};
+use prelude::*;
+use cmp;
+use io;
+
+#[deriving(Show)]
+pub enum SocketStatus {
+    Readable,
+    Writable,
+}
 
 ////////////////////////////////////////////////////////////////////////////////
 // sockaddr and misc bindings
@@ -36,14 +41,14 @@ pub fn ntohs(u: u16) -> u16 {
     Int::from_be(u)
 }
 
-enum InAddr {
+pub enum InAddr {
     In4Addr(libc::in_addr),
     In6Addr(libc::in6_addr),
 }
 
-fn ip_to_inaddr(ip: rtio::IpAddr) -> InAddr {
+pub fn ip_to_inaddr(ip: IpAddr) -> InAddr {
     match ip {
-        rtio::Ipv4Addr(a, b, c, d) => {
+        Ipv4Addr(a, b, c, d) => {
             let ip = (a as u32 << 24) |
                      (b as u32 << 16) |
                      (c as u32 <<  8) |
@@ -52,7 +57,7 @@ fn ip_to_inaddr(ip: rtio::IpAddr) -> InAddr {
                 s_addr: Int::from_be(ip)
             })
         }
-        rtio::Ipv6Addr(a, b, c, d, e, f, g, h) => {
+        Ipv6Addr(a, b, c, d, e, f, g, h) => {
             In6Addr(libc::in6_addr {
                 s6_addr: [
                     htons(a),
@@ -69,7 +74,7 @@ fn ip_to_inaddr(ip: rtio::IpAddr) -> InAddr {
     }
 }
 
-fn addr_to_sockaddr(addr: rtio::SocketAddr,
+pub fn addr_to_sockaddr(addr: SocketAddr,
                     storage: &mut libc::sockaddr_storage)
                     -> libc::socklen_t {
     unsafe {
@@ -93,20 +98,20 @@ fn addr_to_sockaddr(addr: rtio::SocketAddr,
     }
 }
 
-fn socket(addr: rtio::SocketAddr, ty: libc::c_int) -> IoResult<sock_t> {
+pub fn socket(addr: SocketAddr, ty: libc::c_int) -> IoResult<sock_t> {
     unsafe {
         let fam = match addr.ip {
-            rtio::Ipv4Addr(..) => libc::AF_INET,
-            rtio::Ipv6Addr(..) => libc::AF_INET6,
+            Ipv4Addr(..) => libc::AF_INET,
+            Ipv6Addr(..) => libc::AF_INET6,
         };
         match libc::socket(fam, ty, 0) {
-            -1 => Err(os::last_error()),
+            -1 => Err(last_net_error()),
             fd => Ok(fd),
         }
     }
 }
 
-fn setsockopt<T>(fd: sock_t, opt: libc::c_int, val: libc::c_int,
+pub fn setsockopt<T>(fd: sock_t, opt: libc::c_int, val: libc::c_int,
                  payload: T) -> IoResult<()> {
     unsafe {
         let payload = &payload as *const T as *const libc::c_void;
@@ -114,7 +119,7 @@ fn setsockopt<T>(fd: sock_t, opt: libc::c_int, val: libc::c_int,
                                    payload,
                                    mem::size_of::<T>() as libc::socklen_t);
         if ret != 0 {
-            Err(os::last_error())
+            Err(last_net_error())
         } else {
             Ok(())
         }
@@ -130,7 +135,7 @@ pub fn getsockopt<T: Copy>(fd: sock_t, opt: libc::c_int,
                                 &mut slot as *mut _ as *mut _,
                                 &mut len);
         if ret != 0 {
-            Err(os::last_error())
+            Err(last_net_error())
         } else {
             assert!(len as uint == mem::size_of::<T>());
             Ok(slot)
@@ -138,10 +143,10 @@ pub fn getsockopt<T: Copy>(fd: sock_t, opt: libc::c_int,
     }
 }
 
-fn sockname(fd: sock_t,
+pub fn sockname(fd: sock_t,
             f: unsafe extern "system" fn(sock_t, *mut libc::sockaddr,
                                          *mut libc::socklen_t) -> libc::c_int)
-    -> IoResult<rtio::SocketAddr>
+    -> IoResult<SocketAddr>
 {
     let mut storage: libc::sockaddr_storage = unsafe { mem::zeroed() };
     let mut len = mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t;
@@ -151,14 +156,14 @@ fn sockname(fd: sock_t,
                     storage as *mut libc::sockaddr,
                     &mut len as *mut libc::socklen_t);
         if ret != 0 {
-            return Err(os::last_error())
+            return Err(last_net_error())
         }
     }
     return sockaddr_to_addr(&storage, len as uint);
 }
 
 pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage,
-                        len: uint) -> IoResult<rtio::SocketAddr> {
+                        len: uint) -> IoResult<SocketAddr> {
     match storage.ss_family as libc::c_int {
         libc::AF_INET => {
             assert!(len as uint >= mem::size_of::<libc::sockaddr_in>());
@@ -170,8 +175,8 @@ pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage,
             let b = (ip >> 16) as u8;
             let c = (ip >>  8) as u8;
             let d = (ip >>  0) as u8;
-            Ok(rtio::SocketAddr {
-                ip: rtio::Ipv4Addr(a, b, c, d),
+            Ok(SocketAddr {
+                ip: Ipv4Addr(a, b, c, d),
                 port: ntohs(storage.sin_port),
             })
         }
@@ -188,17 +193,15 @@ pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage,
             let f = ntohs(storage.sin6_addr.s6_addr[5]);
             let g = ntohs(storage.sin6_addr.s6_addr[6]);
             let h = ntohs(storage.sin6_addr.s6_addr[7]);
-            Ok(rtio::SocketAddr {
-                ip: rtio::Ipv6Addr(a, b, c, d, e, f, g, h),
+            Ok(SocketAddr {
+                ip: Ipv6Addr(a, b, c, d, e, f, g, h),
                 port: ntohs(storage.sin6_port),
             })
         }
         _ => {
-            #[cfg(unix)] use libc::EINVAL as ERROR;
-            #[cfg(windows)] use libc::WSAEINVAL as ERROR;
             Err(IoError {
-                code: ERROR as uint,
-                extra: 0,
+                kind: io::InvalidInput,
+                desc: "invalid argument",
                 detail: None,
             })
         }
@@ -206,15 +209,343 @@ pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage,
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-// TCP streams
+// get_host_addresses
 ////////////////////////////////////////////////////////////////////////////////
 
-pub struct TcpStream {
-    inner: Arc<Inner>,
-    read_deadline: u64,
-    write_deadline: u64,
+extern "system" {
+    fn getaddrinfo(node: *const c_char, service: *const c_char,
+                   hints: *const libc::addrinfo,
+                   res: *mut *mut libc::addrinfo) -> c_int;
+    fn freeaddrinfo(res: *mut libc::addrinfo);
 }
 
+pub fn get_host_addresses(host: Option<&str>, servname: Option<&str>,
+                          hint: Option<addrinfo::Hint>)
+                          -> Result<Vec<addrinfo::Info>, IoError>
+{
+    sys::init_net();
+
+    assert!(host.is_some() || servname.is_some());
+
+    let c_host = host.map(|x| x.to_c_str());
+    let c_host = c_host.as_ref().map(|x| x.as_ptr()).unwrap_or(null());
+    let c_serv = servname.map(|x| x.to_c_str());
+    let c_serv = c_serv.as_ref().map(|x| x.as_ptr()).unwrap_or(null());
+
+    let hint = hint.map(|hint| {
+        libc::addrinfo {
+            ai_flags: hint.flags as c_int,
+            ai_family: hint.family as c_int,
+            ai_socktype: 0,
+            ai_protocol: 0,
+            ai_addrlen: 0,
+            ai_canonname: null_mut(),
+            ai_addr: null_mut(),
+            ai_next: null_mut()
+        }
+    });
+
+    let hint_ptr = hint.as_ref().map_or(null(), |x| {
+        x as *const libc::addrinfo
+    });
+    let mut res = null_mut();
+
+    // Make the call
+    let s = unsafe {
+        getaddrinfo(c_host, c_serv, hint_ptr, &mut res)
+    };
+
+    // Error?
+    if s != 0 {
+        return Err(last_gai_error(s));
+    }
+
+    // Collect all the results we found
+    let mut addrs = Vec::new();
+    let mut rp = res;
+    while rp.is_not_null() {
+        unsafe {
+            let addr = try!(sockaddr_to_addr(mem::transmute((*rp).ai_addr),
+                                             (*rp).ai_addrlen as uint));
+            addrs.push(addrinfo::Info {
+                address: addr,
+                family: (*rp).ai_family as uint,
+                socktype: None,
+                protocol: None,
+                flags: (*rp).ai_flags as uint
+            });
+
+            rp = (*rp).ai_next as *mut libc::addrinfo;
+        }
+    }
+
+    unsafe { freeaddrinfo(res); }
+
+    Ok(addrs)
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Timeout helpers
+//
+// The read/write functions below are the helpers for reading/writing a socket
+// with a possible deadline specified. This is generally viewed as a timed out
+// I/O operation.
+//
+// From the application's perspective, timeouts apply to the I/O object, not to
+// the underlying file descriptor (it's one timeout per object). This means that
+// we can't use the SO_RCVTIMEO and corresponding send timeout option.
+//
+// The next idea to implement timeouts would be to use nonblocking I/O. An
+// invocation of select() would wait (with a timeout) for a socket to be ready.
+// Once its ready, we can perform the operation. Note that the operation *must*
+// be nonblocking, even though select() says the socket is ready. This is
+// because some other thread could have come and stolen our data (handles can be
+// cloned).
+//
+// To implement nonblocking I/O, the first option we have is to use the
+// O_NONBLOCK flag. Remember though that this is a global setting, affecting all
+// I/O objects, so this was initially viewed as unwise.
+//
+// It turns out that there's this nifty MSG_DONTWAIT flag which can be passed to
+// send/recv, but the niftiness wears off once you realize it only works well on
+// Linux [1] [2]. This means that it's pretty easy to get a nonblocking
+// operation on Linux (no flag fiddling, no affecting other objects), but not on
+// other platforms.
+//
+// To work around this constraint on other platforms, we end up using the
+// original strategy of flipping the O_NONBLOCK flag. As mentioned before, this
+// could cause other objects' blocking operations to suddenly become
+// nonblocking. To get around this, a "blocking operation" which returns EAGAIN
+// falls back to using the same code path as nonblocking operations, but with an
+// infinite timeout (select + send/recv). This helps emulate blocking
+// reads/writes despite the underlying descriptor being nonblocking, as well as
+// optimizing the fast path of just hitting one syscall in the good case.
+//
+// As a final caveat, this implementation uses a mutex so only one thread is
+// doing a nonblocking operation at at time. This is the operation that comes
+// after the select() (at which point we think the socket is ready). This is
+// done for sanity to ensure that the state of the O_NONBLOCK flag is what we
+// expect (wouldn't want someone turning it on when it should be off!). All
+// operations performed in the lock are *nonblocking* to avoid holding the mutex
+// forever.
+//
+// So, in summary, Linux uses MSG_DONTWAIT and doesn't need mutexes, everyone
+// else uses O_NONBLOCK and mutexes with some trickery to make sure blocking
+// reads/writes are still blocking.
+//
+// Fun, fun!
+//
+// [1] http://twistedmatrix.com/pipermail/twisted-commits/2012-April/034692.html
+// [2] http://stackoverflow.com/questions/19819198/does-send-msg-dontwait
+
+pub fn read<T>(fd: sock_t,
+               deadline: u64,
+               lock: || -> T,
+               read: |bool| -> libc::c_int) -> IoResult<uint> {
+    let mut ret = -1;
+    if deadline == 0 {
+        ret = retry(|| read(false));
+    }
+
+    if deadline != 0 || (ret == -1 && wouldblock()) {
+        let deadline = match deadline {
+            0 => None,
+            n => Some(n),
+        };
+        loop {
+            // With a timeout, first we wait for the socket to become
+            // readable using select(), specifying the relevant timeout for
+            // our previously set deadline.
+            try!(await([fd], deadline, Readable));
+
+            // At this point, we're still within the timeout, and we've
+            // determined that the socket is readable (as returned by
+            // select). We must still read the socket in *nonblocking* mode
+            // because some other thread could come steal our data. If we
+            // fail to read some data, we retry (hence the outer loop) and
+            // wait for the socket to become readable again.
+            let _guard = lock();
+            match retry(|| read(deadline.is_some())) {
+                -1 if wouldblock() => {}
+                -1 => return Err(last_net_error()),
+               n => { ret = n; break }
+            }
+        }
+    }
+
+    match ret {
+        0 => Err(sys_common::eof()),
+        n if n < 0 => Err(last_net_error()),
+        n => Ok(n as uint)
+    }
+}
+
+pub fn write<T>(fd: sock_t,
+                deadline: u64,
+                buf: &[u8],
+                write_everything: bool,
+                lock: || -> T,
+                write: |bool, *const u8, uint| -> i64) -> IoResult<uint> {
+    let mut ret = -1;
+    let mut written = 0;
+    if deadline == 0 {
+        if write_everything {
+            ret = keep_going(buf, |inner, len| {
+                written = buf.len() - len;
+                write(false, inner, len)
+            });
+        } else {
+            ret = retry(|| { write(false, buf.as_ptr(), buf.len()) });
+            if ret > 0 { written = ret as uint; }
+        }
+    }
+
+    if deadline != 0 || (ret == -1 && wouldblock()) {
+        let deadline = match deadline {
+            0 => None,
+            n => Some(n),
+        };
+        while written < buf.len() && (write_everything || written == 0) {
+            // As with read(), first wait for the socket to be ready for
+            // the I/O operation.
+            match await([fd], deadline, Writable) {
+                Err(ref e) if e.kind == io::EndOfFile && written > 0 => {
+                    assert!(deadline.is_some());
+                    return Err(short_write(written, "short write"))
+                }
+                Err(e) => return Err(e),
+                Ok(()) => {}
+            }
+
+            // Also as with read(), we use MSG_DONTWAIT to guard ourselves
+            // against unforeseen circumstances.
+            let _guard = lock();
+            let ptr = buf[written..].as_ptr();
+            let len = buf.len() - written;
+            match retry(|| write(deadline.is_some(), ptr, len)) {
+                -1 if wouldblock() => {}
+                -1 => return Err(last_net_error()),
+                n => { written += n as uint; }
+            }
+        }
+        ret = 0;
+    }
+    if ret < 0 {
+        Err(last_net_error())
+    } else {
+        Ok(written)
+    }
+}
+
+// See http://developerweb.net/viewtopic.php?id=3196 for where this is
+// derived from.
+pub fn connect_timeout(fd: sock_t,
+                       addrp: *const libc::sockaddr,
+                       len: libc::socklen_t,
+                       timeout_ms: u64) -> IoResult<()> {
+    #[cfg(unix)]    use libc::EINPROGRESS as INPROGRESS;
+    #[cfg(windows)] use libc::WSAEINPROGRESS as INPROGRESS;
+    #[cfg(unix)]    use libc::EWOULDBLOCK as WOULDBLOCK;
+    #[cfg(windows)] use libc::WSAEWOULDBLOCK as WOULDBLOCK;
+
+    // Make sure the call to connect() doesn't block
+    try!(set_nonblocking(fd, true));
+
+    let ret = match unsafe { libc::connect(fd, addrp, len) } {
+        // If the connection is in progress, then we need to wait for it to
+        // finish (with a timeout). The current strategy for doing this is
+        // to use select() with a timeout.
+        -1 if os::errno() as int == INPROGRESS as int ||
+              os::errno() as int == WOULDBLOCK as int => {
+            let mut set: c::fd_set = unsafe { mem::zeroed() };
+            c::fd_set(&mut set, fd);
+            match await(fd, &mut set, timeout_ms) {
+                0 => Err(timeout("connection timed out")),
+                -1 => Err(last_net_error()),
+                _ => {
+                    let err: libc::c_int = try!(
+                        getsockopt(fd, libc::SOL_SOCKET, libc::SO_ERROR));
+                    if err == 0 {
+                        Ok(())
+                    } else {
+                        Err(decode_error_detailed(err))
+                    }
+                }
+            }
+        }
+
+        -1 => Err(last_net_error()),
+        _ => Ok(()),
+    };
+
+    // be sure to turn blocking I/O back on
+    try!(set_nonblocking(fd, false));
+    return ret;
+
+    #[cfg(unix)]
+    fn await(fd: sock_t, set: &mut c::fd_set, timeout: u64) -> libc::c_int {
+        let start = timer::now();
+        retry(|| unsafe {
+            // Recalculate the timeout each iteration (it is generally
+            // undefined what the value of the 'tv' is after select
+            // returns EINTR).
+            let mut tv = ms_to_timeval(timeout - (timer::now() - start));
+            c::select(fd + 1, ptr::null_mut(), set as *mut _,
+                      ptr::null_mut(), &mut tv)
+        })
+    }
+    #[cfg(windows)]
+    fn await(_fd: sock_t, set: &mut c::fd_set, timeout: u64) -> libc::c_int {
+        let mut tv = ms_to_timeval(timeout);
+        unsafe { c::select(1, ptr::null_mut(), set, ptr::null_mut(), &mut tv) }
+    }
+}
+
+pub fn await(fds: &[sock_t], deadline: Option<u64>,
+             status: SocketStatus) -> IoResult<()> {
+    let mut set: c::fd_set = unsafe { mem::zeroed() };
+    let mut max = 0;
+    for &fd in fds.iter() {
+        c::fd_set(&mut set, fd);
+        max = cmp::max(max, fd + 1);
+    }
+    if cfg!(windows) {
+        max = fds.len() as sock_t;
+    }
+
+    let (read, write) = match status {
+        Readable => (&mut set as *mut _, ptr::null_mut()),
+        Writable => (ptr::null_mut(), &mut set as *mut _),
+    };
+    let mut tv: libc::timeval = unsafe { mem::zeroed() };
+
+    match retry(|| {
+        let now = timer::now();
+        let tvp = match deadline {
+            None => ptr::null_mut(),
+            Some(deadline) => {
+                // If we're past the deadline, then pass a 0 timeout to
+                // select() so we can poll the status
+                let ms = if deadline < now {0} else {deadline - now};
+                tv = ms_to_timeval(ms);
+                &mut tv as *mut _
+            }
+        };
+        let r = unsafe {
+            c::select(max as libc::c_int, read, write, ptr::null_mut(), tvp)
+        };
+        r
+    }) {
+        -1 => Err(last_net_error()),
+        0 => Err(timeout("timed out")),
+        _ => Ok(()),
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Basic socket representation
+////////////////////////////////////////////////////////////////////////////////
+
 struct Inner {
     fd: sock_t,
 
@@ -223,22 +554,44 @@ struct Inner {
     lock: mutex::NativeMutex
 }
 
+impl Inner {
+    fn new(fd: sock_t) -> Inner {
+        Inner { fd: fd, lock: unsafe { mutex::NativeMutex::new() } }
+    }
+}
+
+impl Drop for Inner {
+    fn drop(&mut self) { unsafe { close_sock(self.fd); } }
+}
+
 pub struct Guard<'a> {
     pub fd: sock_t,
     pub guard: mutex::LockGuard<'a>,
 }
 
-impl Inner {
-    fn new(fd: sock_t) -> Inner {
-        Inner { fd: fd, lock: unsafe { mutex::NativeMutex::new() } }
+#[unsafe_destructor]
+impl<'a> Drop for Guard<'a> {
+    fn drop(&mut self) {
+        assert!(set_nonblocking(self.fd, false).is_ok());
     }
 }
 
+////////////////////////////////////////////////////////////////////////////////
+// TCP streams
+////////////////////////////////////////////////////////////////////////////////
+
+pub struct TcpStream {
+    inner: Arc<Inner>,
+    read_deadline: u64,
+    write_deadline: u64,
+}
+
 impl TcpStream {
-    pub fn connect(addr: rtio::SocketAddr,
-                   timeout: Option<u64>) -> IoResult<TcpStream> {
+    pub fn connect(addr: SocketAddr, timeout: Option<u64>) -> IoResult<TcpStream> {
+        sys::init_net();
+
         let fd = try!(socket(addr, libc::SOCK_STREAM));
-        let ret = TcpStream::new(Inner::new(fd));
+        let ret = TcpStream::new(fd);
 
         let mut storage = unsafe { mem::zeroed() };
         let len = addr_to_sockaddr(addr, &mut storage);
@@ -246,21 +599,21 @@ impl TcpStream {
 
         match timeout {
             Some(timeout) => {
-                try!(util::connect_timeout(fd, addrp, len, timeout));
+                try!(connect_timeout(fd, addrp, len, timeout));
                 Ok(ret)
             },
             None => {
                 match retry(|| unsafe { libc::connect(fd, addrp, len) }) {
-                    -1 => Err(os::last_error()),
+                    -1 => Err(last_error()),
                     _ => Ok(ret),
                 }
             }
         }
     }
 
-    fn new(inner: Inner) -> TcpStream {
+    pub fn new(fd: sock_t) -> TcpStream {
         TcpStream {
-            inner: Arc::new(inner),
+            inner: Arc::new(Inner::new(fd)),
             read_deadline: 0,
             write_deadline: 0,
         }
@@ -268,12 +621,12 @@ impl TcpStream {
 
     pub fn fd(&self) -> sock_t { self.inner.fd }
 
-    fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> {
+    pub fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> {
         setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_NODELAY,
                    nodelay as libc::c_int)
     }
 
-    fn set_keepalive(&mut self, seconds: Option<uint>) -> IoResult<()> {
+    pub fn set_keepalive(&mut self, seconds: Option<uint>) -> IoResult<()> {
         let ret = setsockopt(self.fd(), libc::SOL_SOCKET, libc::SO_KEEPALIVE,
                              seconds.is_some() as libc::c_int);
         match seconds {
@@ -309,16 +662,11 @@ impl TcpStream {
             fd: self.fd(),
             guard: unsafe { self.inner.lock.lock() },
         };
-        assert!(util::set_nonblocking(self.fd(), true).is_ok());
+        assert!(set_nonblocking(self.fd(), true).is_ok());
         ret
     }
-}
 
-#[cfg(windows)] type wrlen = libc::c_int;
-#[cfg(not(windows))] type wrlen = libc::size_t;
-
-impl rtio::RtioTcpStream for TcpStream {
-    fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
+    pub fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
         let fd = self.fd();
         let dolock = || self.lock_nonblocking();
         let doread = |nb| unsafe {
@@ -331,7 +679,7 @@ impl rtio::RtioTcpStream for TcpStream {
         read(fd, self.read_deadline, dolock, doread)
     }
 
-    fn write(&mut self, buf: &[u8]) -> IoResult<()> {
+    pub fn write(&mut self, buf: &[u8]) -> IoResult<()> {
         let fd = self.fd();
         let dolock = || self.lock_nonblocking();
         let dowrite = |nb: bool, buf: *const u8, len: uint| unsafe {
@@ -341,340 +689,42 @@ impl rtio::RtioTcpStream for TcpStream {
                        len as wrlen,
                        flags) as i64
         };
-        match write(fd, self.write_deadline, buf, true, dolock, dowrite) {
-            Ok(_) => Ok(()),
-            Err(e) => Err(e)
-        }
+        write(fd, self.write_deadline, buf, true, dolock, dowrite).map(|_| ())
     }
-    fn peer_name(&mut self) -> IoResult<rtio::SocketAddr> {
+    pub fn peer_name(&mut self) -> IoResult<SocketAddr> {
         sockname(self.fd(), libc::getpeername)
     }
-    fn control_congestion(&mut self) -> IoResult<()> {
-        self.set_nodelay(false)
-    }
-    fn nodelay(&mut self) -> IoResult<()> {
-        self.set_nodelay(true)
-    }
-    fn keepalive(&mut self, delay_in_seconds: uint) -> IoResult<()> {
-        self.set_keepalive(Some(delay_in_seconds))
-    }
-    fn letdie(&mut self) -> IoResult<()> {
-        self.set_keepalive(None)
-    }
 
-    fn clone(&self) -> Box<rtio::RtioTcpStream + Send> {
-        box TcpStream {
-            inner: self.inner.clone(),
-            read_deadline: 0,
-            write_deadline: 0,
-        } as Box<rtio::RtioTcpStream + Send>
-    }
-
-    fn close_write(&mut self) -> IoResult<()> {
+    pub fn close_write(&mut self) -> IoResult<()> {
         super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_WR) })
     }
-    fn close_read(&mut self) -> IoResult<()> {
+    pub fn close_read(&mut self) -> IoResult<()> {
         super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) })
     }
 
-    fn set_timeout(&mut self, timeout: Option<u64>) {
-        let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
+    pub fn set_timeout(&mut self, timeout: Option<u64>) {
+        let deadline = timeout.map(|a| timer::now() + a).unwrap_or(0);
         self.read_deadline = deadline;
         self.write_deadline = deadline;
     }
-    fn set_read_timeout(&mut self, timeout: Option<u64>) {
-        self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
-    }
-    fn set_write_timeout(&mut self, timeout: Option<u64>) {
-        self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
-    }
-}
-
-impl rtio::RtioSocket for TcpStream {
-    fn socket_name(&mut self) -> IoResult<rtio::SocketAddr> {
-        sockname(self.fd(), libc::getsockname)
-    }
-}
-
-impl Drop for Inner {
-    fn drop(&mut self) { unsafe { os::close(self.fd); } }
-}
-
-#[unsafe_destructor]
-impl<'a> Drop for Guard<'a> {
-    fn drop(&mut self) {
-        assert!(util::set_nonblocking(self.fd, false).is_ok());
-    }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-// TCP listeners
-////////////////////////////////////////////////////////////////////////////////
-
-pub struct TcpListener {
-    inner: Inner,
-}
-
-impl TcpListener {
-    pub fn bind(addr: rtio::SocketAddr) -> IoResult<TcpListener> {
-        let fd = try!(socket(addr, libc::SOCK_STREAM));
-        let ret = TcpListener { inner: Inner::new(fd) };
-
-        let mut storage = unsafe { mem::zeroed() };
-        let len = addr_to_sockaddr(addr, &mut storage);
-        let addrp = &storage as *const _ as *const libc::sockaddr;
-
-        // On platforms with Berkeley-derived sockets, this allows
-        // to quickly rebind a socket, without needing to wait for
-        // the OS to clean up the previous one.
-        if cfg!(unix) {
-            try!(setsockopt(fd, libc::SOL_SOCKET, libc::SO_REUSEADDR,
-                            1 as libc::c_int));
-        }
-
-        match unsafe { libc::bind(fd, addrp, len) } {
-            -1 => Err(os::last_error()),
-            _ => Ok(ret),
-        }
-    }
-
-    pub fn fd(&self) -> sock_t { self.inner.fd }
-
-    pub fn native_listen(self, backlog: int) -> IoResult<TcpAcceptor> {
-        match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } {
-            -1 => Err(os::last_error()),
-
-            #[cfg(unix)]
-            _ => {
-                let (reader, writer) = try!(process::pipe());
-                try!(util::set_nonblocking(reader.fd(), true));
-                try!(util::set_nonblocking(writer.fd(), true));
-                try!(util::set_nonblocking(self.fd(), true));
-                Ok(TcpAcceptor {
-                    inner: Arc::new(AcceptorInner {
-                        listener: self,
-                        reader: reader,
-                        writer: writer,
-                        closed: atomic::AtomicBool::new(false),
-                    }),
-                    deadline: 0,
-                })
-            }
-
-            #[cfg(windows)]
-            _ => {
-                let accept = try!(os::Event::new());
-                let ret = unsafe {
-                    c::WSAEventSelect(self.fd(), accept.handle(), c::FD_ACCEPT)
-                };
-                if ret != 0 {
-                    return Err(os::last_error())
-                }
-                Ok(TcpAcceptor {
-                    inner: Arc::new(AcceptorInner {
-                        listener: self,
-                        abort: try!(os::Event::new()),
-                        accept: accept,
-                        closed: atomic::AtomicBool::new(false),
-                    }),
-                    deadline: 0,
-                })
-            }
-        }
-    }
-}
-
-impl rtio::RtioTcpListener for TcpListener {
-    fn listen(self: Box<TcpListener>)
-              -> IoResult<Box<rtio::RtioTcpAcceptor + Send>> {
-        self.native_listen(128).map(|a| {
-            box a as Box<rtio::RtioTcpAcceptor + Send>
-        })
-    }
-}
-
-impl rtio::RtioSocket for TcpListener {
-    fn socket_name(&mut self) -> IoResult<rtio::SocketAddr> {
-        sockname(self.fd(), libc::getsockname)
-    }
-}
-
-pub struct TcpAcceptor {
-    inner: Arc<AcceptorInner>,
-    deadline: u64,
-}
-
-#[cfg(unix)]
-struct AcceptorInner {
-    listener: TcpListener,
-    reader: FileDesc,
-    writer: FileDesc,
-    closed: atomic::AtomicBool,
-}
-
-#[cfg(windows)]
-struct AcceptorInner {
-    listener: TcpListener,
-    abort: os::Event,
-    accept: os::Event,
-    closed: atomic::AtomicBool,
-}
-
-impl TcpAcceptor {
-    pub fn fd(&self) -> sock_t { self.inner.listener.fd() }
-
-    #[cfg(unix)]
-    pub fn native_accept(&mut self) -> IoResult<TcpStream> {
-        // In implementing accept, the two main concerns are dealing with
-        // close_accept() and timeouts. The unix implementation is based on a
-        // nonblocking accept plus a call to select(). Windows ends up having
-        // an entirely separate implementation than unix, which is explained
-        // below.
-        //
-        // To implement timeouts, all blocking is done via select() instead of
-        // accept() by putting the socket in non-blocking mode. Because
-        // select() takes a timeout argument, we just pass through the timeout
-        // to select().
-        //
-        // To implement close_accept(), we have a self-pipe to ourselves which
-        // is passed to select() along with the socket being accepted on. The
-        // self-pipe is never written to unless close_accept() is called.
-        let deadline = if self.deadline == 0 {None} else {Some(self.deadline)};
-
-        while !self.inner.closed.load(atomic::SeqCst) {
-            match retry(|| unsafe {
-                libc::accept(self.fd(), ptr::null_mut(), ptr::null_mut())
-            }) {
-                -1 if util::wouldblock() => {}
-                -1 => return Err(os::last_error()),
-                fd => return Ok(TcpStream::new(Inner::new(fd as sock_t))),
-            }
-            try!(util::await([self.fd(), self.inner.reader.fd()],
-                             deadline, util::Readable));
-        }
-
-        Err(util::eof())
+    pub fn set_read_timeout(&mut self, timeout: Option<u64>) {
+        self.read_deadline = timeout.map(|a| timer::now() + a).unwrap_or(0);
     }
-
-    #[cfg(windows)]
-    pub fn native_accept(&mut self) -> IoResult<TcpStream> {
-        // Unlink unix, windows cannot invoke `select` on arbitrary file
-        // descriptors like pipes, only sockets. Consequently, windows cannot
-        // use the same implementation as unix for accept() when close_accept()
-        // is considered.
-        //
-        // In order to implement close_accept() and timeouts, windows uses
-        // event handles. An acceptor-specific abort event is created which
-        // will only get set in close_accept(), and it will never be un-set.
-        // Additionally, another acceptor-specific event is associated with the
-        // FD_ACCEPT network event.
-        //
-        // These two events are then passed to WaitForMultipleEvents to see
-        // which one triggers first, and the timeout passed to this function is
-        // the local timeout for the acceptor.
-        //
-        // If the wait times out, then the accept timed out. If the wait
-        // succeeds with the abort event, then we were closed, and if the wait
-        // succeeds otherwise, then we do a nonblocking poll via `accept` to
-        // see if we can accept a connection. The connection is candidate to be
-        // stolen, so we do all of this in a loop as well.
-        let events = [self.inner.abort.handle(), self.inner.accept.handle()];
-
-        while !self.inner.closed.load(atomic::SeqCst) {
-            let ms = if self.deadline == 0 {
-                c::WSA_INFINITE as u64
-            } else {
-                let now = ::io::timer::now();
-                if self.deadline < now {0} else {self.deadline - now}
-            };
-            let ret = unsafe {
-                c::WSAWaitForMultipleEvents(2, events.as_ptr(), libc::FALSE,
-                                            ms as libc::DWORD, libc::FALSE)
-            };
-            match ret {
-                c::WSA_WAIT_TIMEOUT => {
-                    return Err(util::timeout("accept timed out"))
-                }
-                c::WSA_WAIT_FAILED => return Err(os::last_error()),
-                c::WSA_WAIT_EVENT_0 => break,
-                n => assert_eq!(n, c::WSA_WAIT_EVENT_0 + 1),
-            }
-
-            let mut wsaevents: c::WSANETWORKEVENTS = unsafe { mem::zeroed() };
-            let ret = unsafe {
-                c::WSAEnumNetworkEvents(self.fd(), events[1], &mut wsaevents)
-            };
-            if ret != 0 { return Err(os::last_error()) }
-
-            if wsaevents.lNetworkEvents & c::FD_ACCEPT == 0 { continue }
-            match unsafe {
-                libc::accept(self.fd(), ptr::null_mut(), ptr::null_mut())
-            } {
-                -1 if util::wouldblock() => {}
-                -1 => return Err(os::last_error()),
-
-                // Accepted sockets inherit the same properties as the caller,
-                // so we need to deregister our event and switch the socket back
-                // to blocking mode
-                fd => {
-                    let stream = TcpStream::new(Inner::new(fd));
-                    let ret = unsafe {
-                        c::WSAEventSelect(fd, events[1], 0)
-                    };
-                    if ret != 0 { return Err(os::last_error()) }
-                    try!(util::set_nonblocking(fd, false));
-                    return Ok(stream)
-                }
-            }
-        }
-
-        Err(util::eof())
+    pub fn set_write_timeout(&mut self, timeout: Option<u64>) {
+        self.write_deadline = timeout.map(|a| timer::now() + a).unwrap_or(0);
     }
-}
 
-impl rtio::RtioSocket for TcpAcceptor {
-    fn socket_name(&mut self) -> IoResult<rtio::SocketAddr> {
+    pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
         sockname(self.fd(), libc::getsockname)
     }
 }
 
-impl rtio::RtioTcpAcceptor for TcpAcceptor {
-    fn accept(&mut self) -> IoResult<Box<rtio::RtioTcpStream + Send>> {
-        self.native_accept().map(|s| box s as Box<rtio::RtioTcpStream + Send>)
-    }
-
-    fn accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) }
-    fn dont_accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) }
-    fn set_timeout(&mut self, timeout: Option<u64>) {
-        self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
-    }
-
-    fn clone(&self) -> Box<rtio::RtioTcpAcceptor + Send> {
-        box TcpAcceptor {
+impl Clone for TcpStream {
+    fn clone(&self) -> TcpStream {
+        TcpStream {
             inner: self.inner.clone(),
-            deadline: 0,
-        } as Box<rtio::RtioTcpAcceptor + Send>
-    }
-
-    #[cfg(unix)]
-    fn close_accept(&mut self) -> IoResult<()> {
-        self.inner.closed.store(true, atomic::SeqCst);
-        let mut fd = FileDesc::new(self.inner.writer.fd(), false);
-        match fd.inner_write([0]) {
-            Ok(..) => Ok(()),
-            Err(..) if util::wouldblock() => Ok(()),
-            Err(e) => Err(e),
-        }
-    }
-
-    #[cfg(windows)]
-    fn close_accept(&mut self) -> IoResult<()> {
-        self.inner.closed.store(true, atomic::SeqCst);
-        let ret = unsafe { c::WSASetEvent(self.inner.abort.handle()) };
-        if ret == libc::TRUE {
-            Ok(())
-        } else {
-            Err(os::last_error())
+            read_deadline: 0,
+            write_deadline: 0,
         }
     }
 }
@@ -690,7 +740,9 @@ pub struct UdpSocket {
 }
 
 impl UdpSocket {
-    pub fn bind(addr: rtio::SocketAddr) -> IoResult<UdpSocket> {
+    pub fn bind(addr: SocketAddr) -> IoResult<UdpSocket> {
+        sys::init_net();
+
         let fd = try!(socket(addr, libc::SOCK_DGRAM));
         let ret = UdpSocket {
             inner: Arc::new(Inner::new(fd)),
@@ -703,7 +755,7 @@ impl UdpSocket {
         let addrp = &storage as *const _ as *const libc::sockaddr;
 
         match unsafe { libc::bind(fd, addrp, len) } {
-            -1 => Err(os::last_error()),
+            -1 => Err(last_error()),
             _ => Ok(ret),
         }
     }
@@ -720,8 +772,7 @@ impl UdpSocket {
                    on as libc::c_int)
     }
 
-    pub fn set_membership(&mut self, addr: rtio::IpAddr,
-                          opt: libc::c_int) -> IoResult<()> {
+    pub fn set_membership(&mut self, addr: IpAddr, opt: libc::c_int) -> IoResult<()> {
         match ip_to_inaddr(addr) {
             In4Addr(addr) => {
                 let mreq = libc::ip_mreq {
@@ -750,22 +801,15 @@ impl UdpSocket {
             fd: self.fd(),
             guard: unsafe { self.inner.lock.lock() },
         };
-        assert!(util::set_nonblocking(self.fd(), true).is_ok());
+        assert!(set_nonblocking(self.fd(), true).is_ok());
         ret
     }
-}
 
-impl rtio::RtioSocket for UdpSocket {
-    fn socket_name(&mut self) -> IoResult<rtio::SocketAddr> {
+    pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
         sockname(self.fd(), libc::getsockname)
     }
-}
-
-#[cfg(windows)] type msglen_t = libc::c_int;
-#[cfg(unix)]    type msglen_t = libc::size_t;
 
-impl rtio::RtioUdpSocket for UdpSocket {
-    fn recv_from(&mut self, buf: &mut [u8]) -> IoResult<(uint, rtio::SocketAddr)> {
+    pub fn recv_from(&mut self, buf: &mut [u8]) -> IoResult<(uint, SocketAddr)> {
         let fd = self.fd();
         let mut storage: libc::sockaddr_storage = unsafe { mem::zeroed() };
         let storagep = &mut storage as *mut _ as *mut libc::sockaddr;
@@ -787,7 +831,7 @@ impl rtio::RtioUdpSocket for UdpSocket {
         })
     }
 
-    fn send_to(&mut self, buf: &[u8], dst: rtio::SocketAddr) -> IoResult<()> {
+    pub fn send_to(&mut self, buf: &[u8], dst: SocketAddr) -> IoResult<()> {
         let mut storage = unsafe { mem::zeroed() };
         let dstlen = addr_to_sockaddr(dst, &mut storage);
         let dstp = &storage as *const _ as *const libc::sockaddr;
@@ -806,298 +850,60 @@ impl rtio::RtioUdpSocket for UdpSocket {
 
         let n = try!(write(fd, self.write_deadline, buf, false, dolock, dowrite));
         if n != buf.len() {
-            Err(util::short_write(n, "couldn't send entire packet at once"))
+            Err(short_write(n, "couldn't send entire packet at once"))
         } else {
             Ok(())
         }
     }
 
-    fn join_multicast(&mut self, multi: rtio::IpAddr) -> IoResult<()> {
+    pub fn join_multicast(&mut self, multi: IpAddr) -> IoResult<()> {
         match multi {
-            rtio::Ipv4Addr(..) => {
+            Ipv4Addr(..) => {
                 self.set_membership(multi, libc::IP_ADD_MEMBERSHIP)
             }
-            rtio::Ipv6Addr(..) => {
+            Ipv6Addr(..) => {
                 self.set_membership(multi, libc::IPV6_ADD_MEMBERSHIP)
             }
         }
     }
-    fn leave_multicast(&mut self, multi: rtio::IpAddr) -> IoResult<()> {
+    pub fn leave_multicast(&mut self, multi: IpAddr) -> IoResult<()> {
         match multi {
-            rtio::Ipv4Addr(..) => {
+            Ipv4Addr(..) => {
                 self.set_membership(multi, libc::IP_DROP_MEMBERSHIP)
             }
-            rtio::Ipv6Addr(..) => {
+            Ipv6Addr(..) => {
                 self.set_membership(multi, libc::IPV6_DROP_MEMBERSHIP)
             }
         }
     }
 
-    fn loop_multicast_locally(&mut self) -> IoResult<()> {
-        self.set_multicast_loop(true)
-    }
-    fn dont_loop_multicast_locally(&mut self) -> IoResult<()> {
-        self.set_multicast_loop(false)
-    }
-
-    fn multicast_time_to_live(&mut self, ttl: int) -> IoResult<()> {
+    pub fn multicast_time_to_live(&mut self, ttl: int) -> IoResult<()> {
         setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_MULTICAST_TTL,
                    ttl as libc::c_int)
     }
-    fn time_to_live(&mut self, ttl: int) -> IoResult<()> {
+    pub fn time_to_live(&mut self, ttl: int) -> IoResult<()> {
         setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_TTL, ttl as libc::c_int)
     }
 
-    fn hear_broadcasts(&mut self) -> IoResult<()> {
-        self.set_broadcast(true)
-    }
-    fn ignore_broadcasts(&mut self) -> IoResult<()> {
-        self.set_broadcast(false)
-    }
-
-    fn clone(&self) -> Box<rtio::RtioUdpSocket + Send> {
-        box UdpSocket {
-            inner: self.inner.clone(),
-            read_deadline: 0,
-            write_deadline: 0,
-        } as Box<rtio::RtioUdpSocket + Send>
-    }
-
-    fn set_timeout(&mut self, timeout: Option<u64>) {
-        let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
+    pub fn set_timeout(&mut self, timeout: Option<u64>) {
+        let deadline = timeout.map(|a| timer::now() + a).unwrap_or(0);
         self.read_deadline = deadline;
         self.write_deadline = deadline;
     }
-    fn set_read_timeout(&mut self, timeout: Option<u64>) {
-        self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
+    pub fn set_read_timeout(&mut self, timeout: Option<u64>) {
+        self.read_deadline = timeout.map(|a| timer::now() + a).unwrap_or(0);
     }
-    fn set_write_timeout(&mut self, timeout: Option<u64>) {
-        self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
+    pub fn set_write_timeout(&mut self, timeout: Option<u64>) {
+        self.write_deadline = timeout.map(|a| timer::now() + a).unwrap_or(0);
     }
 }
 
-////////////////////////////////////////////////////////////////////////////////
-// Timeout helpers
-//
-// The read/write functions below are the helpers for reading/writing a socket
-// with a possible deadline specified. This is generally viewed as a timed out
-// I/O operation.
-//
-// From the application's perspective, timeouts apply to the I/O object, not to
-// the underlying file descriptor (it's one timeout per object). This means that
-// we can't use the SO_RCVTIMEO and corresponding send timeout option.
-//
-// The next idea to implement timeouts would be to use nonblocking I/O. An
-// invocation of select() would wait (with a timeout) for a socket to be ready.
-// Once its ready, we can perform the operation. Note that the operation *must*
-// be nonblocking, even though select() says the socket is ready. This is
-// because some other thread could have come and stolen our data (handles can be
-// cloned).
-//
-// To implement nonblocking I/O, the first option we have is to use the
-// O_NONBLOCK flag. Remember though that this is a global setting, affecting all
-// I/O objects, so this was initially viewed as unwise.
-//
-// It turns out that there's this nifty MSG_DONTWAIT flag which can be passed to
-// send/recv, but the niftiness wears off once you realize it only works well on
-// Linux [1] [2]. This means that it's pretty easy to get a nonblocking
-// operation on Linux (no flag fiddling, no affecting other objects), but not on
-// other platforms.
-//
-// To work around this constraint on other platforms, we end up using the
-// original strategy of flipping the O_NONBLOCK flag. As mentioned before, this
-// could cause other objects' blocking operations to suddenly become
-// nonblocking. To get around this, a "blocking operation" which returns EAGAIN
-// falls back to using the same code path as nonblocking operations, but with an
-// infinite timeout (select + send/recv). This helps emulate blocking
-// reads/writes despite the underlying descriptor being nonblocking, as well as
-// optimizing the fast path of just hitting one syscall in the good case.
-//
-// As a final caveat, this implementation uses a mutex so only one thread is
-// doing a nonblocking operation at at time. This is the operation that comes
-// after the select() (at which point we think the socket is ready). This is
-// done for sanity to ensure that the state of the O_NONBLOCK flag is what we
-// expect (wouldn't want someone turning it on when it should be off!). All
-// operations performed in the lock are *nonblocking* to avoid holding the mutex
-// forever.
-//
-// So, in summary, Linux uses MSG_DONTWAIT and doesn't need mutexes, everyone
-// else uses O_NONBLOCK and mutexes with some trickery to make sure blocking
-// reads/writes are still blocking.
-//
-// Fun, fun!
-//
-// [1] http://twistedmatrix.com/pipermail/twisted-commits/2012-April/034692.html
-// [2] http://stackoverflow.com/questions/19819198/does-send-msg-dontwait
-
-pub fn read<T>(fd: sock_t,
-               deadline: u64,
-               lock: || -> T,
-               read: |bool| -> libc::c_int) -> IoResult<uint> {
-    let mut ret = -1;
-    if deadline == 0 {
-        ret = retry(|| read(false));
-    }
-
-    if deadline != 0 || (ret == -1 && util::wouldblock()) {
-        let deadline = match deadline {
-            0 => None,
-            n => Some(n),
-        };
-        loop {
-            // With a timeout, first we wait for the socket to become
-            // readable using select(), specifying the relevant timeout for
-            // our previously set deadline.
-            try!(util::await([fd], deadline, util::Readable));
-
-            // At this point, we're still within the timeout, and we've
-            // determined that the socket is readable (as returned by
-            // select). We must still read the socket in *nonblocking* mode
-            // because some other thread could come steal our data. If we
-            // fail to read some data, we retry (hence the outer loop) and
-            // wait for the socket to become readable again.
-            let _guard = lock();
-            match retry(|| read(deadline.is_some())) {
-                -1 if util::wouldblock() => {}
-                -1 => return Err(os::last_error()),
-               n => { ret = n; break }
-            }
-        }
-    }
-
-    match ret {
-        0 => Err(util::eof()),
-        n if n < 0 => Err(os::last_error()),
-        n => Ok(n as uint)
-    }
-}
-
-pub fn write<T>(fd: sock_t,
-                deadline: u64,
-                buf: &[u8],
-                write_everything: bool,
-                lock: || -> T,
-                write: |bool, *const u8, uint| -> i64) -> IoResult<uint> {
-    let mut ret = -1;
-    let mut written = 0;
-    if deadline == 0 {
-        if write_everything {
-            ret = keep_going(buf, |inner, len| {
-                written = buf.len() - len;
-                write(false, inner, len)
-            });
-        } else {
-            ret = retry(|| { write(false, buf.as_ptr(), buf.len()) });
-            if ret > 0 { written = ret as uint; }
-        }
-    }
-
-    if deadline != 0 || (ret == -1 && util::wouldblock()) {
-        let deadline = match deadline {
-            0 => None,
-            n => Some(n),
-        };
-        while written < buf.len() && (write_everything || written == 0) {
-            // As with read(), first wait for the socket to be ready for
-            // the I/O operation.
-            match util::await([fd], deadline, util::Writable) {
-                Err(ref e) if e.code == libc::EOF as uint && written > 0 => {
-                    assert!(deadline.is_some());
-                    return Err(util::short_write(written, "short write"))
-                }
-                Err(e) => return Err(e),
-                Ok(()) => {}
-            }
-
-            // Also as with read(), we use MSG_DONTWAIT to guard ourselves
-            // against unforeseen circumstances.
-            let _guard = lock();
-            let ptr = buf[written..].as_ptr();
-            let len = buf.len() - written;
-            match retry(|| write(deadline.is_some(), ptr, len)) {
-                -1 if util::wouldblock() => {}
-                -1 => return Err(os::last_error()),
-                n => { written += n as uint; }
-            }
-        }
-        ret = 0;
-    }
-    if ret < 0 {
-        Err(os::last_error())
-    } else {
-        Ok(written)
-    }
-}
-
-#[cfg(windows)]
-mod os {
-    use libc;
-    use std::mem;
-    use std::rt::rtio::{IoError, IoResult};
-
-    use io::c;
-
-    pub type sock_t = libc::SOCKET;
-    pub struct Event(c::WSAEVENT);
-
-    impl Event {
-        pub fn new() -> IoResult<Event> {
-            let event = unsafe { c::WSACreateEvent() };
-            if event == c::WSA_INVALID_EVENT {
-                Err(last_error())
-            } else {
-                Ok(Event(event))
-            }
-        }
-
-        pub fn handle(&self) -> c::WSAEVENT { let Event(handle) = *self; handle }
-    }
-
-    impl Drop for Event {
-        fn drop(&mut self) {
-            unsafe { let _ = c::WSACloseEvent(self.handle()); }
-        }
-    }
-
-    pub fn init() {
-        unsafe {
-            use std::rt::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
-            static mut INITIALIZED: bool = false;
-            static LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;
-
-            let _guard = LOCK.lock();
-            if !INITIALIZED {
-                let mut data: c::WSADATA = mem::zeroed();
-                let ret = c::WSAStartup(0x202,      // version 2.2
-                                        &mut data);
-                assert_eq!(ret, 0);
-                INITIALIZED = true;
-            }
-        }
-    }
-
-    pub fn last_error() -> IoError {
-        use std::os;
-        let code = unsafe { c::WSAGetLastError() as uint };
-        IoError {
-            code: code,
-            extra: 0,
-            detail: Some(os::error_string(code)),
+impl Clone for UdpSocket {
+    fn clone(&self) -> UdpSocket {
+        UdpSocket {
+            inner: self.inner.clone(),
+            read_deadline: 0,
+            write_deadline: 0,
         }
     }
-
-    pub unsafe fn close(sock: sock_t) { let _ = libc::closesocket(sock); }
-}
-
-#[cfg(unix)]
-mod os {
-    use libc;
-    use std::rt::rtio::IoError;
-    use io;
-
-    pub type sock_t = io::file::fd_t;
-
-    pub fn init() {}
-    pub fn last_error() -> IoError { io::last_error() }
-    pub unsafe fn close(sock: sock_t) { let _ = libc::close(sock); }
 }
diff --git a/src/libstd/sys/unix/mod.rs b/src/libstd/sys/unix/mod.rs
index ad5de2dad48..5a43fd08f90 100644
--- a/src/libstd/sys/unix/mod.rs
+++ b/src/libstd/sys/unix/mod.rs
@@ -8,24 +8,51 @@
 // option. This file may not be copied, modified, or distributed
 // except according to those terms.
 
+#![allow(missing_doc)]
+
 extern crate libc;
 
 use num;
 use prelude::*;
 use io::{mod, IoResult, IoError};
+use sys_common::mkerr_libc;
 
+pub mod c;
 pub mod fs;
 pub mod os;
-pub mod c;
+pub mod tcp;
+pub mod udp;
+pub mod pipe;
+
+pub mod addrinfo {
+    pub use sys_common::net::get_host_addresses;
+}
 
-pub type sock_t = io::file::fd_t;
+// FIXME: move these to c module
+pub type sock_t = self::fs::fd_t;
 pub type wrlen = libc::size_t;
+pub type msglen_t = libc::size_t;
 pub unsafe fn close_sock(sock: sock_t) { let _ = libc::close(sock); }
 
 pub fn last_error() -> IoError {
-    let errno = os::errno() as i32;
-    let mut err = decode_error(errno);
-    err.detail = Some(os::error_string(errno));
+    decode_error_detailed(os::errno() as i32)
+}
+
+pub fn last_net_error() -> IoError {
+    last_error()
+}
+
+extern "system" {
+    fn gai_strerror(errcode: libc::c_int) -> *const libc::c_char;
+}
+
+pub fn last_gai_error(s: libc::c_int) -> IoError {
+    use c_str::CString;
+
+    let mut err = decode_error(s);
+    err.detail = Some(unsafe {
+        CString::new(gai_strerror(s), false).as_str().unwrap().to_string()
+    });
     err
 }
 
@@ -64,6 +91,12 @@ pub fn decode_error(errno: i32) -> IoError {
     IoError { kind: kind, desc: desc, detail: None }
 }
 
+pub fn decode_error_detailed(errno: i32) -> IoError {
+    let mut err = decode_error(errno);
+    err.detail = Some(os::error_string(errno));
+    err
+}
+
 #[inline]
 pub fn retry<I: PartialEq + num::One + Neg<I>> (f: || -> I) -> I {
     let minus_one = -num::one::<I>();
@@ -86,7 +119,10 @@ pub fn wouldblock() -> bool {
     err == libc::EWOULDBLOCK as int || err == libc::EAGAIN as int
 }
 
-pub fn set_nonblocking(fd: net::sock_t, nb: bool) -> IoResult<()> {
+pub fn set_nonblocking(fd: sock_t, nb: bool) -> IoResult<()> {
     let set = nb as libc::c_int;
-    super::mkerr_libc(retry(|| unsafe { c::ioctl(fd, c::FIONBIO, &set) }))
+    mkerr_libc(retry(|| unsafe { c::ioctl(fd, c::FIONBIO, &set) }))
 }
+
+// nothing needed on unix platforms
+pub fn init_net() {}
diff --git a/src/libstd/sys/unix/os.rs b/src/libstd/sys/unix/os.rs
index 34699eb27c1..4e495f043bc 100644
--- a/src/libstd/sys/unix/os.rs
+++ b/src/libstd/sys/unix/os.rs
@@ -11,6 +11,8 @@
 use libc;
 use libc::{c_int, c_char};
 use prelude::*;
+use io::IoResult;
+use sys::fs::FileDesc;
 
 use os::TMPBUF_SZ;
 
@@ -99,3 +101,12 @@ pub fn error_string(errno: i32) -> String {
         ::string::raw::from_buf(p as *const u8)
     }
 }
+
+pub unsafe fn pipe() -> IoResult<(FileDesc, FileDesc)> {
+    let mut fds = [0, ..2];
+    if libc::pipe(fds.as_mut_ptr()) == 0 {
+        Ok((FileDesc::new(fds[0], true), FileDesc::new(fds[1], true)))
+    } else {
+        Err(super::last_error())
+    }
+}
diff --git a/src/libnative/io/pipe_unix.rs b/src/libstd/sys/unix/pipe.rs
index 48f31615339..67384848a94 100644
--- a/src/libnative/io/pipe_unix.rs
+++ b/src/libstd/sys/unix/pipe.rs
@@ -10,19 +10,17 @@
 
 use alloc::arc::Arc;
 use libc;
-use std::c_str::CString;
-use std::mem;
-use std::rt::mutex;
-use std::rt::rtio;
-use std::rt::rtio::{IoResult, IoError};
-use std::sync::atomic;
-
-use super::retry;
-use super::net;
-use super::util;
-use super::c;
-use super::process;
-use super::file::{fd_t, FileDesc};
+use c_str::CString;
+use mem;
+use rt::mutex;
+use sync::atomic;
+use io::{mod, IoResult, IoError};
+use prelude::*;
+
+use sys::{mod, timer, retry, c, set_nonblocking, wouldblock};
+use sys::fs::{fd_t, FileDesc};
+use sys_common::net::*;
+use sys_common::{eof, mkerr_libc};
 
 fn unix_socket(ty: libc::c_int) -> IoResult<fd_t> {
     match unsafe { libc::socket(libc::AF_UNIX, ty, 0) } {
@@ -41,12 +39,10 @@ fn addr_to_sockaddr_un(addr: &CString,
 
     let len = addr.len();
     if len > s.sun_path.len() - 1 {
-        #[cfg(unix)] use libc::EINVAL as ERROR;
-        #[cfg(windows)] use libc::WSAEINVAL as ERROR;
         return Err(IoError {
-            code: ERROR as uint,
-            extra: 0,
-            detail: Some("path must be smaller than SUN_LEN".to_string()),
+            kind: io::InvalidInput,
+            desc: "invalid argument: path must be smaller than SUN_LEN",
+            detail: None,
         })
     }
     s.sun_family = libc::AF_UNIX as libc::sa_family_t;
@@ -92,7 +88,7 @@ fn connect(addr: &CString, ty: libc::c_int,
             }
         }
         Some(timeout_ms) => {
-            try!(util::connect_timeout(inner.fd, addrp, len, timeout_ms));
+            try!(connect_timeout(inner.fd, addrp, len, timeout_ms));
             Ok(inner)
         }
     }
@@ -143,18 +139,16 @@ impl UnixStream {
     fn lock_nonblocking(&self) {}
 
     #[cfg(not(target_os = "linux"))]
-    fn lock_nonblocking<'a>(&'a self) -> net::Guard<'a> {
-        let ret = net::Guard {
+    fn lock_nonblocking<'a>(&'a self) -> Guard<'a> {
+        let ret = Guard {
             fd: self.fd(),
             guard: unsafe { self.inner.lock.lock() },
         };
-        assert!(util::set_nonblocking(self.fd(), true).is_ok());
+        assert!(set_nonblocking(self.fd(), true).is_ok());
         ret
     }
-}
 
-impl rtio::RtioPipe for UnixStream {
-    fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
+    pub fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
         let fd = self.fd();
         let dolock = || self.lock_nonblocking();
         let doread = |nb| unsafe {
@@ -164,10 +158,10 @@ impl rtio::RtioPipe for UnixStream {
                        buf.len() as libc::size_t,
                        flags) as libc::c_int
         };
-        net::read(fd, self.read_deadline, dolock, doread)
+        read(fd, self.read_deadline, dolock, doread)
     }
 
-    fn write(&mut self, buf: &[u8]) -> IoResult<()> {
+    pub fn write(&mut self, buf: &[u8]) -> IoResult<()> {
         let fd = self.fd();
         let dolock = || self.lock_nonblocking();
         let dowrite = |nb: bool, buf: *const u8, len: uint| unsafe {
@@ -177,32 +171,38 @@ impl rtio::RtioPipe for UnixStream {
                        len as libc::size_t,
                        flags) as i64
         };
-        match net::write(fd, self.write_deadline, buf, true, dolock, dowrite) {
+        match write(fd, self.write_deadline, buf, true, dolock, dowrite) {
             Ok(_) => Ok(()),
             Err(e) => Err(e)
         }
     }
 
-    fn clone(&self) -> Box<rtio::RtioPipe + Send> {
-        box UnixStream::new(self.inner.clone()) as Box<rtio::RtioPipe + Send>
+    pub fn close_write(&mut self) -> IoResult<()> {
+        mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_WR) })
     }
 
-    fn close_write(&mut self) -> IoResult<()> {
-        super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_WR) })
-    }
-    fn close_read(&mut self) -> IoResult<()> {
-        super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) })
+    pub fn close_read(&mut self) -> IoResult<()> {
+        mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) })
     }
-    fn set_timeout(&mut self, timeout: Option<u64>) {
-        let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
+
+    pub fn set_timeout(&mut self, timeout: Option<u64>) {
+        let deadline = timeout.map(|a| timer::now() + a).unwrap_or(0);
         self.read_deadline = deadline;
         self.write_deadline = deadline;
     }
-    fn set_read_timeout(&mut self, timeout: Option<u64>) {
-        self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
+
+    pub fn set_read_timeout(&mut self, timeout: Option<u64>) {
+        self.read_deadline = timeout.map(|a| timer::now() + a).unwrap_or(0);
+    }
+
+    pub fn set_write_timeout(&mut self, timeout: Option<u64>) {
+        self.write_deadline = timeout.map(|a| timer::now() + a).unwrap_or(0);
     }
-    fn set_write_timeout(&mut self, timeout: Option<u64>) {
-        self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
+}
+
+impl Clone for UnixStream {
+    fn clone(&self) -> UnixStream {
+        UnixStream::new(self.inner.clone())
     }
 }
 
@@ -224,16 +224,15 @@ impl UnixListener {
 
     fn fd(&self) -> fd_t { self.inner.fd }
 
-    pub fn native_listen(self, backlog: int) -> IoResult<UnixAcceptor> {
-        match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } {
+    pub fn listen(self) -> IoResult<UnixAcceptor> {
+        match unsafe { libc::listen(self.fd(), 128) } {
             -1 => Err(super::last_error()),
 
-            #[cfg(unix)]
             _ => {
-                let (reader, writer) = try!(process::pipe());
-                try!(util::set_nonblocking(reader.fd(), true));
-                try!(util::set_nonblocking(writer.fd(), true));
-                try!(util::set_nonblocking(self.fd(), true));
+                let (reader, writer) = try!(unsafe { sys::os::pipe() });
+                try!(set_nonblocking(reader.fd(), true));
+                try!(set_nonblocking(writer.fd(), true));
+                try!(set_nonblocking(self.fd(), true));
                 Ok(UnixAcceptor {
                     inner: Arc::new(AcceptorInner {
                         listener: self,
@@ -248,21 +247,11 @@ impl UnixListener {
     }
 }
 
-impl rtio::RtioUnixListener for UnixListener {
-    fn listen(self: Box<UnixListener>)
-              -> IoResult<Box<rtio::RtioUnixAcceptor + Send>> {
-        self.native_listen(128).map(|a| {
-            box a as Box<rtio::RtioUnixAcceptor + Send>
-        })
-    }
-}
-
 pub struct UnixAcceptor {
     inner: Arc<AcceptorInner>,
     deadline: u64,
 }
 
-#[cfg(unix)]
 struct AcceptorInner {
     listener: UnixListener,
     reader: FileDesc,
@@ -273,7 +262,7 @@ struct AcceptorInner {
 impl UnixAcceptor {
     fn fd(&self) -> fd_t { self.inner.listener.fd() }
 
-    pub fn native_accept(&mut self) -> IoResult<UnixStream> {
+    pub fn accept(&mut self) -> IoResult<UnixStream> {
         let deadline = if self.deadline == 0 {None} else {Some(self.deadline)};
 
         while !self.inner.closed.load(atomic::SeqCst) {
@@ -287,46 +276,39 @@ impl UnixAcceptor {
                                  storagep as *mut libc::sockaddr,
                                  &mut size as *mut libc::socklen_t) as libc::c_int
                 }) {
-                    -1 if util::wouldblock() => {}
+                    -1 if wouldblock() => {}
                     -1 => return Err(super::last_error()),
                     fd => return Ok(UnixStream::new(Arc::new(Inner::new(fd)))),
                 }
             }
-            try!(util::await([self.fd(), self.inner.reader.fd()],
-                             deadline, util::Readable));
+            try!(await([self.fd(), self.inner.reader.fd()],
+                             deadline, Readable));
         }
 
-        Err(util::eof())
+        Err(eof())
     }
-}
 
-impl rtio::RtioUnixAcceptor for UnixAcceptor {
-    fn accept(&mut self) -> IoResult<Box<rtio::RtioPipe + Send>> {
-        self.native_accept().map(|s| box s as Box<rtio::RtioPipe + Send>)
-    }
-    fn set_timeout(&mut self, timeout: Option<u64>) {
-        self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
+    pub fn set_timeout(&mut self, timeout: Option<u64>) {
+        self.deadline = timeout.map(|a| timer::now() + a).unwrap_or(0);
     }
 
-    fn clone(&self) -> Box<rtio::RtioUnixAcceptor + Send> {
-        box UnixAcceptor {
-            inner: self.inner.clone(),
-            deadline: 0,
-        } as Box<rtio::RtioUnixAcceptor + Send>
-    }
-
-    #[cfg(unix)]
-    fn close_accept(&mut self) -> IoResult<()> {
+    pub fn close_accept(&mut self) -> IoResult<()> {
         self.inner.closed.store(true, atomic::SeqCst);
-        let mut fd = FileDesc::new(self.inner.writer.fd(), false);
-        match fd.inner_write([0]) {
+        let fd = FileDesc::new(self.inner.writer.fd(), false);
+        match fd.write([0]) {
             Ok(..) => Ok(()),
-            Err(..) if util::wouldblock() => Ok(()),
+            Err(..) if wouldblock() => Ok(()),
             Err(e) => Err(e),
         }
     }
 }
 
+impl Clone for UnixAcceptor {
+    fn clone(&self) -> UnixAcceptor {
+        UnixAcceptor { inner: self.inner.clone(), deadline: 0 }
+    }
+}
+
 impl Drop for UnixListener {
     fn drop(&mut self) {
         // Unlink the path to the socket to ensure that it doesn't linger. We're
diff --git a/src/libstd/sys/unix/tcp.rs b/src/libstd/sys/unix/tcp.rs
new file mode 100644
index 00000000000..962475e4177
--- /dev/null
+++ b/src/libstd/sys/unix/tcp.rs
@@ -0,0 +1,157 @@
+// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use io::net::ip;
+use io::IoResult;
+use libc;
+use mem;
+use ptr;
+use prelude::*;
+use super::{last_error, last_net_error, retry, sock_t};
+use sync::{Arc, atomic};
+use sys::fs::FileDesc;
+use sys::{set_nonblocking, wouldblock};
+use sys;
+use sys_common;
+use sys_common::net::*;
+
+pub use sys_common::net::TcpStream;
+
+////////////////////////////////////////////////////////////////////////////////
+// TCP listeners
+////////////////////////////////////////////////////////////////////////////////
+
+pub struct TcpListener {
+    pub inner: FileDesc,
+}
+
+impl TcpListener {
+    pub fn bind(addr: ip::SocketAddr) -> IoResult<TcpListener> {
+        let fd = try!(socket(addr, libc::SOCK_STREAM));
+        let ret = TcpListener { inner: FileDesc::new(fd, true) };
+
+        let mut storage = unsafe { mem::zeroed() };
+        let len = addr_to_sockaddr(addr, &mut storage);
+        let addrp = &storage as *const _ as *const libc::sockaddr;
+
+        // On platforms with Berkeley-derived sockets, this allows
+        // to quickly rebind a socket, without needing to wait for
+        // the OS to clean up the previous one.
+        try!(setsockopt(fd, libc::SOL_SOCKET, libc::SO_REUSEADDR, 1 as libc::c_int));
+
+
+        match unsafe { libc::bind(fd, addrp, len) } {
+            -1 => Err(last_error()),
+            _ => Ok(ret),
+        }
+    }
+
+    pub fn fd(&self) -> sock_t { self.inner.fd() }
+
+    pub fn listen(self, backlog: int) -> IoResult<TcpAcceptor> {
+        match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } {
+            -1 => Err(last_net_error()),
+            _ => {
+                let (reader, writer) = try!(unsafe { sys::os::pipe() });
+                try!(set_nonblocking(reader.fd(), true));
+                try!(set_nonblocking(writer.fd(), true));
+                try!(set_nonblocking(self.fd(), true));
+                Ok(TcpAcceptor {
+                    inner: Arc::new(AcceptorInner {
+                        listener: self,
+                        reader: reader,
+                        writer: writer,
+                        closed: atomic::AtomicBool::new(false),
+                    }),
+                    deadline: 0,
+                })
+            }
+        }
+    }
+
+    pub fn socket_name(&mut self) -> IoResult<ip::SocketAddr> {
+        sockname(self.fd(), libc::getsockname)
+    }
+}
+
+pub struct TcpAcceptor {
+    inner: Arc<AcceptorInner>,
+    deadline: u64,
+}
+
+struct AcceptorInner {
+    listener: TcpListener,
+    reader: FileDesc,
+    writer: FileDesc,
+    closed: atomic::AtomicBool,
+}
+
+impl TcpAcceptor {
+    pub fn fd(&self) -> sock_t { self.inner.listener.fd() }
+
+    pub fn accept(&mut self) -> IoResult<TcpStream> {
+        // In implementing accept, the two main concerns are dealing with
+        // close_accept() and timeouts. The unix implementation is based on a
+        // nonblocking accept plus a call to select(). Windows ends up having
+        // an entirely separate implementation than unix, which is explained
+        // below.
+        //
+        // To implement timeouts, all blocking is done via select() instead of
+        // accept() by putting the socket in non-blocking mode. Because
+        // select() takes a timeout argument, we just pass through the timeout
+        // to select().
+        //
+        // To implement close_accept(), we have a self-pipe to ourselves which
+        // is passed to select() along with the socket being accepted on. The
+        // self-pipe is never written to unless close_accept() is called.
+        let deadline = if self.deadline == 0 {None} else {Some(self.deadline)};
+
+        while !self.inner.closed.load(atomic::SeqCst) {
+            match retry(|| unsafe {
+                libc::accept(self.fd(), ptr::null_mut(), ptr::null_mut())
+            }) {
+                -1 if wouldblock() => {}
+                -1 => return Err(last_net_error()),
+                fd => return Ok(TcpStream::new(fd as sock_t)),
+            }
+            try!(await([self.fd(), self.inner.reader.fd()],
+                             deadline, Readable));
+        }
+
+        Err(sys_common::eof())
+    }
+
+    pub fn socket_name(&mut self) -> IoResult<ip::SocketAddr> {
+        sockname(self.fd(), libc::getsockname)
+    }
+
+    pub fn set_timeout(&mut self, timeout: Option<u64>) {
+        self.deadline = timeout.map(|a| sys::timer::now() + a).unwrap_or(0);
+    }
+
+    pub fn close_accept(&mut self) -> IoResult<()> {
+        self.inner.closed.store(true, atomic::SeqCst);
+        let fd = FileDesc::new(self.inner.writer.fd(), false);
+        match fd.write([0]) {
+            Ok(..) => Ok(()),
+            Err(..) if wouldblock() => Ok(()),
+            Err(e) => Err(e),
+        }
+    }
+}
+
+impl Clone for TcpAcceptor {
+    fn clone(&self) -> TcpAcceptor {
+        TcpAcceptor {
+            inner: self.inner.clone(),
+            deadline: 0,
+        }
+    }
+}
diff --git a/src/libstd/sys/unix/udp.rs b/src/libstd/sys/unix/udp.rs
new file mode 100644
index 00000000000..50f8fb828ad
--- /dev/null
+++ b/src/libstd/sys/unix/udp.rs
@@ -0,0 +1,11 @@
+// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+pub use sys_common::net::UdpSocket;
diff --git a/src/libstd/sys/windows/mod.rs b/src/libstd/sys/windows/mod.rs
index 5f4129c1484..85fbc6b936c 100644
--- a/src/libstd/sys/windows/mod.rs
+++ b/src/libstd/sys/windows/mod.rs
@@ -33,12 +33,21 @@ macro_rules! helper_init( (static $name:ident: Helper<$m:ty>) => (
     };
 ) )
 
+pub mod c;
 pub mod fs;
 pub mod os;
-pub mod c;
+pub mod tcp;
+pub mod udp;
+pub mod pipe;
+
+pub mod addrinfo {
+    pub use sys_common::net::get_host_addresses;
+}
 
+// FIXME: move these to c module
 pub type sock_t = libc::SOCKET;
 pub type wrlen = libc::c_int;
+pub type msglen_t = libc::c_int;
 pub unsafe fn close_sock(sock: sock_t) { let _ = libc::closesocket(sock); }
 
 // windows has zero values as errors
@@ -140,7 +149,6 @@ pub fn set_nonblocking(fd: sock_t, nb: bool) -> IoResult<()> {
     }
 }
 
-// FIXME: call this
 pub fn init_net() {
     unsafe {
         static START: Once = ONCE_INIT;
diff --git a/src/libnative/io/pipe_windows.rs b/src/libstd/sys/windows/pipe.rs
index f764470f37d..f2f7994a005 100644
--- a/src/libnative/io/pipe_windows.rs
+++ b/src/libstd/sys/windows/pipe.rs
@@ -86,18 +86,17 @@
 
 use alloc::arc::Arc;
 use libc;
-use std::c_str::CString;
-use std::mem;
-use std::os;
-use std::ptr;
-use std::rt::rtio;
-use std::rt::rtio::{IoResult, IoError};
-use std::sync::atomic;
-use std::rt::mutex;
-
-use super::c;
-use super::util;
-use super::file::to_utf16;
+use c_str::CString;
+use mem;
+use ptr;
+use sync::atomic;
+use rt::mutex;
+use io::{mod, IoError, IoResult};
+use prelude::*;
+
+use sys_common::{mod, eof};
+
+use super::{c, os, timer, to_utf16, decode_error_detailed};
 
 struct Event(libc::HANDLE);
 
@@ -177,7 +176,7 @@ pub fn await(handle: libc::HANDLE, deadline: u64,
     let ms = if deadline == 0 {
         libc::INFINITE as u64
     } else {
-        let now = ::io::timer::now();
+        let now = timer::now();
         if deadline < now {0} else {deadline - now}
     };
     let ret = unsafe {
@@ -190,7 +189,7 @@ pub fn await(handle: libc::HANDLE, deadline: u64,
         WAIT_FAILED => Err(super::last_error()),
         WAIT_TIMEOUT => unsafe {
             let _ = c::CancelIo(handle);
-            Err(util::timeout("operation timed out"))
+            Err(sys_common::timeout("operation timed out"))
         },
         n => Ok((n - WAIT_OBJECT_0) as uint)
     }
@@ -198,8 +197,8 @@ pub fn await(handle: libc::HANDLE, deadline: u64,
 
 fn epipe() -> IoError {
     IoError {
-        code: libc::ERROR_BROKEN_PIPE as uint,
-        extra: 0,
+        kind: io::EndOfFile,
+        desc: "the pipe has ended",
         detail: None,
     }
 }
@@ -268,8 +267,8 @@ impl UnixStream {
     }
 
     pub fn connect(addr: &CString, timeout: Option<u64>) -> IoResult<UnixStream> {
-        let addr = try!(to_utf16(addr));
-        let start = ::io::timer::now();
+        let addr = try!(to_utf16(addr.as_str()));
+        let start = timer::now();
         loop {
             match UnixStream::try_connect(addr.as_ptr()) {
                 Some(handle) => {
@@ -308,13 +307,13 @@ impl UnixStream {
 
             match timeout {
                 Some(timeout) => {
-                    let now = ::io::timer::now();
+                    let now = timer::now();
                     let timed_out = (now - start) >= timeout || unsafe {
                         let ms = (timeout - (now - start)) as libc::DWORD;
                         libc::WaitNamedPipeW(addr.as_ptr(), ms) == 0
                     };
                     if timed_out {
-                        return Err(util::timeout("connect timed out"))
+                        return Err(sys_common::timeout("connect timed out"))
                     }
                 }
 
@@ -349,10 +348,8 @@ impl UnixStream {
             _ => Ok(())
         }
     }
-}
 
-impl rtio::RtioPipe for UnixStream {
-    fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
+    pub fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
         if self.read.is_none() {
             self.read = Some(try!(Event::new(true, false)));
         }
@@ -368,7 +365,7 @@ impl rtio::RtioPipe for UnixStream {
         // See comments in close_read() about why this lock is necessary.
         let guard = unsafe { self.inner.lock.lock() };
         if self.read_closed() {
-            return Err(util::eof())
+            return Err(eof())
         }
 
         // Issue a nonblocking requests, succeeding quickly if it happened to
@@ -416,15 +413,15 @@ impl rtio::RtioPipe for UnixStream {
             // If the reading half is now closed, then we're done. If we woke up
             // because the writing half was closed, keep trying.
             if wait_succeeded.is_err() {
-                return Err(util::timeout("read timed out"))
+                return Err(sys_common::timeout("read timed out"))
             }
             if self.read_closed() {
-                return Err(util::eof())
+                return Err(eof())
             }
         }
     }
 
-    fn write(&mut self, buf: &[u8]) -> IoResult<()> {
+    pub fn write(&mut self, buf: &[u8]) -> IoResult<()> {
         if self.write.is_none() {
             self.write = Some(try!(Event::new(true, false)));
         }
@@ -458,11 +455,7 @@ impl rtio::RtioPipe for UnixStream {
 
             if ret == 0 {
                 if err != libc::ERROR_IO_PENDING as uint {
-                    return Err(IoError {
-                        code: err as uint,
-                        extra: 0,
-                        detail: Some(os::error_string(err as uint)),
-                    })
+                    return Err(decode_error_detailed(err as i32))
                 }
                 // Process a timeout if one is pending
                 let wait_succeeded = await(self.handle(), self.write_deadline,
@@ -484,12 +477,12 @@ impl rtio::RtioPipe for UnixStream {
                         let amt = offset + bytes_written as uint;
                         return if amt > 0 {
                             Err(IoError {
-                                code: libc::ERROR_OPERATION_ABORTED as uint,
-                                extra: amt,
-                                detail: Some("short write during write".to_string()),
+                                kind: io::ShortWrite(amt),
+                                desc: "short write during write",
+                                detail: None,
                             })
                         } else {
-                            Err(util::timeout("write timed out"))
+                            Err(sys_common::timeout("write timed out"))
                         }
                     }
                     if self.write_closed() {
@@ -503,17 +496,7 @@ impl rtio::RtioPipe for UnixStream {
         Ok(())
     }
 
-    fn clone(&self) -> Box<rtio::RtioPipe + Send> {
-        box UnixStream {
-            inner: self.inner.clone(),
-            read: None,
-            write: None,
-            read_deadline: 0,
-            write_deadline: 0,
-        } as Box<rtio::RtioPipe + Send>
-    }
-
-    fn close_read(&mut self) -> IoResult<()> {
+    pub fn close_read(&mut self) -> IoResult<()> {
         // On windows, there's no actual shutdown() method for pipes, so we're
         // forced to emulate the behavior manually at the application level. To
         // do this, we need to both cancel any pending requests, as well as
@@ -536,23 +519,35 @@ impl rtio::RtioPipe for UnixStream {
         self.cancel_io()
     }
 
-    fn close_write(&mut self) -> IoResult<()> {
+    pub fn close_write(&mut self) -> IoResult<()> {
         // see comments in close_read() for why this lock is necessary
         let _guard = unsafe { self.inner.lock.lock() };
         self.inner.write_closed.store(true, atomic::SeqCst);
         self.cancel_io()
     }
 
-    fn set_timeout(&mut self, timeout: Option<u64>) {
-        let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
+    pub fn set_timeout(&mut self, timeout: Option<u64>) {
+        let deadline = timeout.map(|a| timer::now() + a).unwrap_or(0);
         self.read_deadline = deadline;
         self.write_deadline = deadline;
     }
-    fn set_read_timeout(&mut self, timeout: Option<u64>) {
-        self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
+    pub fn set_read_timeout(&mut self, timeout: Option<u64>) {
+        self.read_deadline = timeout.map(|a| timer::now() + a).unwrap_or(0);
     }
-    fn set_write_timeout(&mut self, timeout: Option<u64>) {
-        self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
+    pub fn set_write_timeout(&mut self, timeout: Option<u64>) {
+        self.write_deadline = timeout.map(|a| timer::now() + a).unwrap_or(0);
+    }
+}
+
+impl Clone for UnixStream {
+    fn clone(&self) -> UnixStream {
+        UnixStream {
+            inner: self.inner.clone(),
+            read: None,
+            write: None,
+            read_deadline: 0,
+            write_deadline: 0,
+        }
     }
 }
 
@@ -570,7 +565,7 @@ impl UnixListener {
         // Although we technically don't need the pipe until much later, we
         // create the initial handle up front to test the validity of the name
         // and such.
-        let addr_v = try!(to_utf16(addr));
+        let addr_v = try!(to_utf16(addr.as_str()));
         let ret = unsafe { pipe(addr_v.as_ptr(), true) };
         if ret == libc::INVALID_HANDLE_VALUE {
             Err(super::last_error())
@@ -579,7 +574,7 @@ impl UnixListener {
         }
     }
 
-    pub fn native_listen(self) -> IoResult<UnixAcceptor> {
+    pub fn listen(self) -> IoResult<UnixAcceptor> {
         Ok(UnixAcceptor {
             listener: self,
             event: try!(Event::new(true, false)),
@@ -598,15 +593,6 @@ impl Drop for UnixListener {
     }
 }
 
-impl rtio::RtioUnixListener for UnixListener {
-    fn listen(self: Box<UnixListener>)
-              -> IoResult<Box<rtio::RtioUnixAcceptor + Send>> {
-        self.native_listen().map(|a| {
-            box a as Box<rtio::RtioUnixAcceptor + Send>
-        })
-    }
-}
-
 pub struct UnixAcceptor {
     inner: Arc<AcceptorState>,
     listener: UnixListener,
@@ -620,7 +606,7 @@ struct AcceptorState {
 }
 
 impl UnixAcceptor {
-    pub fn native_accept(&mut self) -> IoResult<UnixStream> {
+    pub fn accept(&mut self) -> IoResult<UnixStream> {
         // This function has some funky implementation details when working with
         // unix pipes. On windows, each server named pipe handle can be
         // connected to a one or zero clients. To the best of my knowledge, a
@@ -657,9 +643,9 @@ impl UnixAcceptor {
 
         // If we've had an artificial call to close_accept, be sure to never
         // proceed in accepting new clients in the future
-        if self.inner.closed.load(atomic::SeqCst) { return Err(util::eof()) }
+        if self.inner.closed.load(atomic::SeqCst) { return Err(eof()) }
 
-        let name = try!(to_utf16(&self.listener.name));
+        let name = try!(to_utf16(self.listener.name.as_str()));
 
         // Once we've got a "server handle", we need to wait for a client to
         // connect. The ConnectNamedPipe function will block this thread until
@@ -691,7 +677,7 @@ impl UnixAcceptor {
                     if wait_succeeded.is_ok() {
                         err = unsafe { libc::GetLastError() };
                     } else {
-                        return Err(util::timeout("accept timed out"))
+                        return Err(sys_common::timeout("accept timed out"))
                     }
                 } else {
                     // we succeeded, bypass the check below
@@ -727,19 +713,28 @@ impl UnixAcceptor {
             write_deadline: 0,
         })
     }
-}
 
-impl rtio::RtioUnixAcceptor for UnixAcceptor {
-    fn accept(&mut self) -> IoResult<Box<rtio::RtioPipe + Send>> {
-        self.native_accept().map(|s| box s as Box<rtio::RtioPipe + Send>)
+    pub fn set_timeout(&mut self, timeout: Option<u64>) {
+        self.deadline = timeout.map(|i| i + timer::now()).unwrap_or(0);
     }
-    fn set_timeout(&mut self, timeout: Option<u64>) {
-        self.deadline = timeout.map(|i| i + ::io::timer::now()).unwrap_or(0);
+
+    pub fn close_accept(&mut self) -> IoResult<()> {
+        self.inner.closed.store(true, atomic::SeqCst);
+        let ret = unsafe {
+            c::SetEvent(self.inner.abort.handle())
+        };
+        if ret == 0 {
+            Err(super::last_error())
+        } else {
+            Ok(())
+        }
     }
+}
 
-    fn clone(&self) -> Box<rtio::RtioUnixAcceptor + Send> {
-        let name = to_utf16(&self.listener.name).ok().unwrap();
-        box UnixAcceptor {
+impl Clone for UnixAcceptor {
+    fn clone(&self) -> UnixAcceptor {
+        let name = to_utf16(self.listener.name.as_str()).ok().unwrap();
+        UnixAcceptor {
             inner: self.inner.clone(),
             event: Event::new(true, false).ok().unwrap(),
             deadline: 0,
@@ -751,19 +746,6 @@ impl rtio::RtioUnixAcceptor for UnixAcceptor {
                     p
                 },
             },
-        } as Box<rtio::RtioUnixAcceptor + Send>
-    }
-
-    fn close_accept(&mut self) -> IoResult<()> {
-        self.inner.closed.store(true, atomic::SeqCst);
-        let ret = unsafe {
-            c::SetEvent(self.inner.abort.handle())
-        };
-        if ret == 0 {
-            Err(super::last_error())
-        } else {
-            Ok(())
         }
     }
 }
-
diff --git a/src/libstd/sys/windows/tcp.rs b/src/libstd/sys/windows/tcp.rs
new file mode 100644
index 00000000000..3baf2be08d2
--- /dev/null
+++ b/src/libstd/sys/windows/tcp.rs
@@ -0,0 +1,219 @@
+// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use io::net::ip;
+use io::IoResult;
+use libc;
+use mem;
+use ptr;
+use prelude::*;
+use super::{last_error, last_net_error, retry, sock_t};
+use sync::{Arc, atomic};
+use sys::fs::FileDesc;
+use sys::{mod, c, set_nonblocking, wouldblock, timer};
+use sys_common::{mod, timeout, eof};
+use sys_common::net::*;
+
+pub use sys_common::net::TcpStream;
+
+pub struct Event(c::WSAEVENT);
+
+impl Event {
+    pub fn new() -> IoResult<Event> {
+        let event = unsafe { c::WSACreateEvent() };
+        if event == c::WSA_INVALID_EVENT {
+            Err(super::last_error())
+        } else {
+            Ok(Event(event))
+        }
+    }
+
+    pub fn handle(&self) -> c::WSAEVENT { let Event(handle) = *self; handle }
+}
+
+impl Drop for Event {
+    fn drop(&mut self) {
+        unsafe { let _ = c::WSACloseEvent(self.handle()); }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// TCP listeners
+////////////////////////////////////////////////////////////////////////////////
+
+pub struct TcpListener {
+    inner: FileDesc,
+}
+
+impl TcpListener {
+    pub fn bind(addr: ip::SocketAddr) -> IoResult<TcpListener> {
+        sys::init_net();
+
+        let fd = try!(socket(addr, libc::SOCK_STREAM));
+        let ret = TcpListener { inner: FileDesc::new(fd as libc::c_int, true) };
+
+        let mut storage = unsafe { mem::zeroed() };
+        let len = addr_to_sockaddr(addr, &mut storage);
+        let addrp = &storage as *const _ as *const libc::sockaddr;
+
+        match unsafe { libc::bind(fd, addrp, len) } {
+            -1 => Err(last_net_error()),
+            _ => Ok(ret),
+        }
+    }
+
+    pub fn fd(&self) -> sock_t { self.inner.fd as sock_t }
+
+    pub fn listen(self, backlog: int) -> IoResult<TcpAcceptor> {
+        match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } {
+            -1 => Err(last_net_error()),
+
+            _ => {
+                let accept = try!(Event::new());
+                let ret = unsafe {
+                    c::WSAEventSelect(self.fd(), accept.handle(), c::FD_ACCEPT)
+                };
+                if ret != 0 {
+                    return Err(last_net_error())
+                }
+                Ok(TcpAcceptor {
+                    inner: Arc::new(AcceptorInner {
+                        listener: self,
+                        abort: try!(Event::new()),
+                        accept: accept,
+                        closed: atomic::AtomicBool::new(false),
+                    }),
+                    deadline: 0,
+                })
+            }
+        }
+    }
+
+    pub fn socket_name(&mut self) -> IoResult<ip::SocketAddr> {
+        sockname(self.fd(), libc::getsockname)
+    }
+}
+
+pub struct TcpAcceptor {
+    inner: Arc<AcceptorInner>,
+    deadline: u64,
+}
+
+struct AcceptorInner {
+    listener: TcpListener,
+    abort: Event,
+    accept: Event,
+    closed: atomic::AtomicBool,
+}
+
+impl TcpAcceptor {
+    pub fn fd(&self) -> sock_t { self.inner.listener.fd() }
+
+    pub fn accept(&mut self) -> IoResult<TcpStream> {
+        // Unlink unix, windows cannot invoke `select` on arbitrary file
+        // descriptors like pipes, only sockets. Consequently, windows cannot
+        // use the same implementation as unix for accept() when close_accept()
+        // is considered.
+        //
+        // In order to implement close_accept() and timeouts, windows uses
+        // event handles. An acceptor-specific abort event is created which
+        // will only get set in close_accept(), and it will never be un-set.
+        // Additionally, another acceptor-specific event is associated with the
+        // FD_ACCEPT network event.
+        //
+        // These two events are then passed to WaitForMultipleEvents to see
+        // which one triggers first, and the timeout passed to this function is
+        // the local timeout for the acceptor.
+        //
+        // If the wait times out, then the accept timed out. If the wait
+        // succeeds with the abort event, then we were closed, and if the wait
+        // succeeds otherwise, then we do a nonblocking poll via `accept` to
+        // see if we can accept a connection. The connection is candidate to be
+        // stolen, so we do all of this in a loop as well.
+        let events = [self.inner.abort.handle(), self.inner.accept.handle()];
+
+        while !self.inner.closed.load(atomic::SeqCst) {
+            let ms = if self.deadline == 0 {
+                c::WSA_INFINITE as u64
+            } else {
+                let now = timer::now();
+                if self.deadline < now {0} else {self.deadline - now}
+            };
+            let ret = unsafe {
+                c::WSAWaitForMultipleEvents(2, events.as_ptr(), libc::FALSE,
+                                            ms as libc::DWORD, libc::FALSE)
+            };
+            match ret {
+                c::WSA_WAIT_TIMEOUT => {
+                    return Err(timeout("accept timed out"))
+                }
+                c::WSA_WAIT_FAILED => return Err(last_net_error()),
+                c::WSA_WAIT_EVENT_0 => break,
+                n => assert_eq!(n, c::WSA_WAIT_EVENT_0 + 1),
+            }
+
+            let mut wsaevents: c::WSANETWORKEVENTS = unsafe { mem::zeroed() };
+            let ret = unsafe {
+                c::WSAEnumNetworkEvents(self.fd(), events[1], &mut wsaevents)
+            };
+            if ret != 0 { return Err(last_net_error()) }
+
+            if wsaevents.lNetworkEvents & c::FD_ACCEPT == 0 { continue }
+            match unsafe {
+                libc::accept(self.fd(), ptr::null_mut(), ptr::null_mut())
+            } {
+                -1 if wouldblock() => {}
+                -1 => return Err(last_net_error()),
+
+                // Accepted sockets inherit the same properties as the caller,
+                // so we need to deregister our event and switch the socket back
+                // to blocking mode
+                fd => {
+                    let stream = TcpStream::new(fd);
+                    let ret = unsafe {
+                        c::WSAEventSelect(fd, events[1], 0)
+                    };
+                    if ret != 0 { return Err(last_net_error()) }
+                    try!(set_nonblocking(fd, false));
+                    return Ok(stream)
+                }
+            }
+        }
+
+        Err(eof())
+    }
+
+    pub fn socket_name(&mut self) -> IoResult<ip::SocketAddr> {
+        sockname(self.fd(), libc::getsockname)
+    }
+
+    pub fn set_timeout(&mut self, timeout: Option<u64>) {
+        self.deadline = timeout.map(|a| timer::now() + a).unwrap_or(0);
+    }
+
+    pub fn close_accept(&mut self) -> IoResult<()> {
+        self.inner.closed.store(true, atomic::SeqCst);
+        let ret = unsafe { c::WSASetEvent(self.inner.abort.handle()) };
+        if ret == libc::TRUE {
+            Ok(())
+        } else {
+            Err(last_net_error())
+        }
+    }
+}
+
+impl Clone for TcpAcceptor {
+    fn clone(&self) -> TcpAcceptor {
+        TcpAcceptor {
+            inner: self.inner.clone(),
+            deadline: 0,
+        }
+    }
+}
diff --git a/src/libstd/sys/windows/udp.rs b/src/libstd/sys/windows/udp.rs
new file mode 100644
index 00000000000..50f8fb828ad
--- /dev/null
+++ b/src/libstd/sys/windows/udp.rs
@@ -0,0 +1,11 @@
+// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+pub use sys_common::net::UdpSocket;