about summary refs log tree commit diff
path: root/src/libstd/io
diff options
context:
space:
mode:
authorAaron Turon <aturon@mozilla.com>2014-10-10 10:11:49 -0700
committerAaron Turon <aturon@mozilla.com>2014-11-08 20:40:38 -0800
commitd34b1b0ca9bf5e0d7cd30952f5de0ab09ed57b41 (patch)
treebdb9af03a1b73d4edc9ae5e6193a010c9b2b4edc /src/libstd/io
parent0c1e1ff1e300868a29405a334e65eae690df971d (diff)
downloadrust-d34b1b0ca9bf5e0d7cd30952f5de0ab09ed57b41.tar.gz
rust-d34b1b0ca9bf5e0d7cd30952f5de0ab09ed57b41.zip
Runtime removal: refactor pipes and networking
This patch continues the runtime removal by moving pipe and
networking-related code into `sys`.

Because this eliminates APIs in `libnative` and `librustrt`, it is a:

[breaking-change]

This functionality is likely to be available publicly, in some form,
from `std` in the future.
Diffstat (limited to 'src/libstd/io')
-rw-r--r--src/libstd/io/net/addrinfo.rs32
-rw-r--r--src/libstd/io/net/mod.rs45
-rw-r--r--src/libstd/io/net/pipe.rs65
-rw-r--r--src/libstd/io/net/tcp.rs105
-rw-r--r--src/libstd/io/net/udp.rs60
-rw-r--r--src/libstd/io/pipe.rs58
6 files changed, 120 insertions, 245 deletions
diff --git a/src/libstd/io/net/addrinfo.rs b/src/libstd/io/net/addrinfo.rs
index 3c72f58b10d..22775d54eff 100644
--- a/src/libstd/io/net/addrinfo.rs
+++ b/src/libstd/io/net/addrinfo.rs
@@ -20,12 +20,10 @@ getaddrinfo()
 #![allow(missing_docs)]
 
 use iter::Iterator;
-use io::{IoResult, IoError};
+use io::{IoResult};
 use io::net::ip::{SocketAddr, IpAddr};
 use option::{Option, Some, None};
-use result::{Ok, Err};
-use rt::rtio::{IoFactory, LocalIo};
-use rt::rtio;
+use sys;
 use vec::Vec;
 
 /// Hints to the types of sockets that are desired when looking up hosts
@@ -94,31 +92,7 @@ pub fn get_host_addresses(host: &str) -> IoResult<Vec<IpAddr>> {
 #[allow(unused_variables)]
 fn lookup(hostname: Option<&str>, servname: Option<&str>, hint: Option<Hint>)
           -> IoResult<Vec<Info>> {
-    let hint = hint.map(|Hint { family, socktype, protocol, flags }| {
-        rtio::AddrinfoHint {
-            family: family,
-            socktype: 0, // FIXME: this should use the above variable
-            protocol: 0, // FIXME: this should use the above variable
-            flags: flags,
-        }
-    });
-    match LocalIo::maybe_raise(|io| {
-        io.get_host_addresses(hostname, servname, hint)
-    }) {
-        Ok(v) => Ok(v.into_iter().map(|info| {
-            Info {
-                address: SocketAddr {
-                    ip: super::from_rtio(info.address.ip),
-                    port: info.address.port,
-                },
-                family: info.family,
-                socktype: None, // FIXME: this should use the above variable
-                protocol: None, // FIXME: this should use the above variable
-                flags: info.flags,
-            }
-        }).collect()),
-        Err(e) => Err(IoError::from_rtio_error(e)),
-    }
+    sys::addrinfo::get_host_addresses(hostname, servname, hint)
 }
 
 // Ignored on android since we cannot give tcp/ip
diff --git a/src/libstd/io/net/mod.rs b/src/libstd/io/net/mod.rs
index b9b50a55a10..5b1747876d7 100644
--- a/src/libstd/io/net/mod.rs
+++ b/src/libstd/io/net/mod.rs
@@ -12,9 +12,8 @@
 
 use io::{IoError, IoResult, InvalidInput};
 use option::None;
-use result::{Result, Ok, Err};
-use rt::rtio;
-use self::ip::{Ipv4Addr, Ipv6Addr, IpAddr, SocketAddr, ToSocketAddr};
+use result::{Ok, Err};
+use self::ip::{SocketAddr, ToSocketAddr};
 
 pub use self::addrinfo::get_host_addresses;
 
@@ -24,46 +23,6 @@ pub mod udp;
 pub mod ip;
 pub mod pipe;
 
-fn to_rtio(ip: IpAddr) -> rtio::IpAddr {
-    match ip {
-        Ipv4Addr(a, b, c, d) => rtio::Ipv4Addr(a, b, c, d),
-        Ipv6Addr(a, b, c, d, e, f, g, h) => {
-            rtio::Ipv6Addr(a, b, c, d, e, f, g, h)
-        }
-    }
-}
-
-fn from_rtio(ip: rtio::IpAddr) -> IpAddr {
-    match ip {
-        rtio::Ipv4Addr(a, b, c, d) => Ipv4Addr(a, b, c, d),
-        rtio::Ipv6Addr(a, b, c, d, e, f, g, h) => {
-            Ipv6Addr(a, b, c, d, e, f, g, h)
-        }
-    }
-}
-
-fn with_addresses_io<A: ToSocketAddr, T>(
-    addr: A,
-    action: |&mut rtio::IoFactory, rtio::SocketAddr| -> Result<T, rtio::IoError>
-) -> Result<T, IoError> {
-    const DEFAULT_ERROR: IoError = IoError {
-        kind: InvalidInput,
-        desc: "no addresses found for hostname",
-        detail: None
-    };
-
-    let addresses = try!(addr.to_socket_addr_all());
-    let mut err = DEFAULT_ERROR;
-    for addr in addresses.into_iter() {
-        let addr = rtio::SocketAddr { ip: to_rtio(addr.ip), port: addr.port };
-        match rtio::LocalIo::maybe_raise(|io| action(io, addr)) {
-            Ok(r) => return Ok(r),
-            Err(e) => err = IoError::from_rtio_error(e)
-        }
-    }
-    Err(err)
-}
-
 fn with_addresses<A: ToSocketAddr, T>(addr: A, action: |SocketAddr| -> IoResult<T>)
     -> IoResult<T> {
     const DEFAULT_ERROR: IoError = IoError {
diff --git a/src/libstd/io/net/pipe.rs b/src/libstd/io/net/pipe.rs
index 8c7deadebea..111b0f2b081 100644
--- a/src/libstd/io/net/pipe.rs
+++ b/src/libstd/io/net/pipe.rs
@@ -26,17 +26,20 @@ instances as clients.
 
 use prelude::*;
 
-use io::{Listener, Acceptor, IoResult, IoError, TimedOut, standard_error};
-use rt::rtio::{IoFactory, LocalIo, RtioUnixListener};
-use rt::rtio::{RtioUnixAcceptor, RtioPipe};
+use io::{Listener, Acceptor, IoResult, TimedOut, standard_error};
 use time::Duration;
 
+use sys::pipe::UnixStream as UnixStreamImp;
+use sys::pipe::UnixListener as UnixListenerImp;
+use sys::pipe::UnixAcceptor as UnixAcceptorImp;
+
 /// A stream which communicates over a named pipe.
 pub struct UnixStream {
-    obj: Box<RtioPipe + Send>,
+    inner: UnixStreamImp,
 }
 
 impl UnixStream {
+
     /// Connect to a pipe named by `path`. This will attempt to open a
     /// connection to the underlying socket.
     ///
@@ -53,9 +56,8 @@ impl UnixStream {
     /// stream.write([1, 2, 3]);
     /// ```
     pub fn connect<P: ToCStr>(path: &P) -> IoResult<UnixStream> {
-        LocalIo::maybe_raise(|io| {
-            io.unix_connect(&path.to_c_str(), None).map(|p| UnixStream { obj: p })
-        }).map_err(IoError::from_rtio_error)
+        UnixStreamImp::connect(&path.to_c_str(), None)
+            .map(|inner| UnixStream { inner: inner })
     }
 
     /// Connect to a pipe named by `path`, timing out if the specified number of
@@ -73,10 +75,8 @@ impl UnixStream {
             return Err(standard_error(TimedOut));
         }
 
-        LocalIo::maybe_raise(|io| {
-            let s = io.unix_connect(&path.to_c_str(), Some(timeout.num_milliseconds() as u64));
-            s.map(|p| UnixStream { obj: p })
-        }).map_err(IoError::from_rtio_error)
+        UnixStreamImp::connect(&path.to_c_str(), Some(timeout.num_milliseconds() as u64))
+            .map(|inner| UnixStream { inner: inner })
     }
 
 
@@ -88,7 +88,7 @@ impl UnixStream {
     /// Note that this method affects all cloned handles associated with this
     /// stream, not just this one handle.
     pub fn close_read(&mut self) -> IoResult<()> {
-        self.obj.close_read().map_err(IoError::from_rtio_error)
+        self.inner.close_read()
     }
 
     /// Closes the writing half of this connection.
@@ -99,7 +99,7 @@ impl UnixStream {
     /// Note that this method affects all cloned handles associated with this
     /// stream, not just this one handle.
     pub fn close_write(&mut self) -> IoResult<()> {
-        self.obj.close_write().map_err(IoError::from_rtio_error)
+        self.inner.close_write()
     }
 
     /// Sets the read/write timeout for this socket.
@@ -107,7 +107,7 @@ impl UnixStream {
     /// For more information, see `TcpStream::set_timeout`
     #[experimental = "the timeout argument may change in type and value"]
     pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
-        self.obj.set_timeout(timeout_ms)
+        self.inner.set_timeout(timeout_ms)
     }
 
     /// Sets the read timeout for this socket.
@@ -115,7 +115,7 @@ impl UnixStream {
     /// For more information, see `TcpStream::set_timeout`
     #[experimental = "the timeout argument may change in type and value"]
     pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
-        self.obj.set_read_timeout(timeout_ms)
+        self.inner.set_read_timeout(timeout_ms)
     }
 
     /// Sets the write timeout for this socket.
@@ -123,36 +123,35 @@ impl UnixStream {
     /// For more information, see `TcpStream::set_timeout`
     #[experimental = "the timeout argument may change in type and value"]
     pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) {
-        self.obj.set_write_timeout(timeout_ms)
+        self.inner.set_write_timeout(timeout_ms)
     }
 }
 
 impl Clone for UnixStream {
     fn clone(&self) -> UnixStream {
-        UnixStream { obj: self.obj.clone() }
+        UnixStream { inner: self.inner.clone() }
     }
 }
 
 impl Reader for UnixStream {
     fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
-        self.obj.read(buf).map_err(IoError::from_rtio_error)
+        self.inner.read(buf)
     }
 }
 
 impl Writer for UnixStream {
     fn write(&mut self, buf: &[u8]) -> IoResult<()> {
-        self.obj.write(buf).map_err(IoError::from_rtio_error)
+        self.inner.write(buf)
     }
 }
 
 /// A value that can listen for incoming named pipe connection requests.
 pub struct UnixListener {
     /// The internal, opaque runtime Unix listener.
-    obj: Box<RtioUnixListener + Send>,
+    inner: UnixListenerImp,
 }
 
 impl UnixListener {
-
     /// Creates a new listener, ready to receive incoming connections on the
     /// specified socket. The server will be named by `path`.
     ///
@@ -175,24 +174,22 @@ impl UnixListener {
     /// # }
     /// ```
     pub fn bind<P: ToCStr>(path: &P) -> IoResult<UnixListener> {
-        LocalIo::maybe_raise(|io| {
-            io.unix_bind(&path.to_c_str()).map(|s| UnixListener { obj: s })
-        }).map_err(IoError::from_rtio_error)
+        UnixListenerImp::bind(&path.to_c_str())
+            .map(|inner| UnixListener { inner: inner })
     }
 }
 
 impl Listener<UnixStream, UnixAcceptor> for UnixListener {
     fn listen(self) -> IoResult<UnixAcceptor> {
-        self.obj.listen().map(|obj| {
-            UnixAcceptor { obj: obj }
-        }).map_err(IoError::from_rtio_error)
+        self.inner.listen()
+            .map(|inner| UnixAcceptor { inner: inner })
     }
 }
 
 /// A value that can accept named pipe connections, returned from `listen()`.
 pub struct UnixAcceptor {
     /// The internal, opaque runtime Unix acceptor.
-    obj: Box<RtioUnixAcceptor + Send>,
+    inner: UnixAcceptorImp
 }
 
 impl UnixAcceptor {
@@ -210,7 +207,7 @@ impl UnixAcceptor {
     #[experimental = "the name and arguments to this function are likely \
                       to change"]
     pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
-        self.obj.set_timeout(timeout_ms)
+        self.inner.set_timeout(timeout_ms)
     }
 
     /// Closes the accepting capabilities of this acceptor.
@@ -219,15 +216,15 @@ impl UnixAcceptor {
     /// more information can be found in that documentation.
     #[experimental]
     pub fn close_accept(&mut self) -> IoResult<()> {
-        self.obj.close_accept().map_err(IoError::from_rtio_error)
+        self.inner.close_accept()
     }
 }
 
 impl Acceptor<UnixStream> for UnixAcceptor {
     fn accept(&mut self) -> IoResult<UnixStream> {
-        self.obj.accept().map(|s| {
-            UnixStream { obj: s }
-        }).map_err(IoError::from_rtio_error)
+        self.inner.accept().map(|s| {
+            UnixStream { inner: s }
+        })
     }
 }
 
@@ -246,7 +243,7 @@ impl Clone for UnixAcceptor {
     /// This function is useful for creating a handle to invoke `close_accept`
     /// on to wake up any other task blocked in `accept`.
     fn clone(&self) -> UnixAcceptor {
-        UnixAcceptor { obj: self.obj.clone() }
+        UnixAcceptor { inner: self.inner.clone() }
     }
 }
 
diff --git a/src/libstd/io/net/tcp.rs b/src/libstd/io/net/tcp.rs
index 928c8586739..2545e07cbb5 100644
--- a/src/libstd/io/net/tcp.rs
+++ b/src/libstd/io/net/tcp.rs
@@ -20,19 +20,17 @@
 use clone::Clone;
 use io::IoResult;
 use iter::Iterator;
-use result::{Ok,Err};
+use result::Err;
 use io::net::ip::{SocketAddr, ToSocketAddr};
-use io::IoError;
 use io::{Reader, Writer, Listener, Acceptor};
 use io::{standard_error, TimedOut};
-use kinds::Send;
 use option::{None, Some, Option};
-use boxed::Box;
-use rt::rtio::{IoFactory, RtioSocket, RtioTcpListener};
-use rt::rtio::{RtioTcpAcceptor, RtioTcpStream};
-use rt::rtio;
 use time::Duration;
 
+use sys::tcp::TcpStream as TcpStreamImp;
+use sys::tcp::TcpListener as TcpListenerImp;
+use sys::tcp::TcpAcceptor as TcpAcceptorImp;
+
 /// A structure which represents a TCP stream between a local socket and a
 /// remote socket.
 ///
@@ -50,12 +48,12 @@ use time::Duration;
 /// drop(stream); // close the connection
 /// ```
 pub struct TcpStream {
-    obj: Box<RtioTcpStream + Send>,
+    inner: TcpStreamImp,
 }
 
 impl TcpStream {
-    fn new(s: Box<RtioTcpStream + Send>) -> TcpStream {
-        TcpStream { obj: s }
+    fn new(s: TcpStreamImp) -> TcpStream {
+        TcpStream { inner: s }
     }
 
     /// Open a TCP connection to a remote host.
@@ -64,7 +62,9 @@ impl TcpStream {
     /// trait can be supplied for the address; see this trait documentation for
     /// concrete examples.
     pub fn connect<A: ToSocketAddr>(addr: A) -> IoResult<TcpStream> {
-        super::with_addresses_io(addr, |io, addr| io.tcp_connect(addr, None).map(TcpStream::new))
+        super::with_addresses(addr, |addr| {
+            TcpStreamImp::connect(addr, None).map(TcpStream::new)
+        })
     }
 
     /// Creates a TCP connection to a remote socket address, timing out after
@@ -86,39 +86,26 @@ impl TcpStream {
             return Err(standard_error(TimedOut));
         }
 
-        super::with_addresses_io(addr, |io, addr|
-            io.tcp_connect(addr, Some(timeout.num_milliseconds() as u64)).map(TcpStream::new)
-        )
+        super::with_addresses(addr, |addr| {
+            TcpStreamImp::connect(addr, Some(timeout.num_milliseconds() as u64))
+                .map(TcpStream::new)
+        })
     }
 
     /// Returns the socket address of the remote peer of this TCP connection.
     pub fn peer_name(&mut self) -> IoResult<SocketAddr> {
-        match self.obj.peer_name() {
-            Ok(rtio::SocketAddr { ip, port }) => {
-                Ok(SocketAddr { ip: super::from_rtio(ip), port: port })
-            }
-            Err(e) => Err(IoError::from_rtio_error(e)),
-        }
+        self.inner.peer_name()
     }
 
     /// Returns the socket address of the local half of this TCP connection.
     pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
-        match self.obj.socket_name() {
-            Ok(rtio::SocketAddr { ip, port }) => {
-                Ok(SocketAddr { ip: super::from_rtio(ip), port: port })
-            }
-            Err(e) => Err(IoError::from_rtio_error(e)),
-        }
+        self.inner.socket_name()
     }
 
     /// Sets the nodelay flag on this connection to the boolean specified
     #[experimental]
     pub fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> {
-        if nodelay {
-            self.obj.nodelay()
-        } else {
-            self.obj.control_congestion()
-        }.map_err(IoError::from_rtio_error)
+        self.inner.set_nodelay(nodelay)
     }
 
     /// Sets the keepalive timeout to the timeout specified.
@@ -128,10 +115,7 @@ impl TcpStream {
     /// specified time, in seconds.
     #[experimental]
     pub fn set_keepalive(&mut self, delay_in_seconds: Option<uint>) -> IoResult<()> {
-        match delay_in_seconds {
-            Some(i) => self.obj.keepalive(i),
-            None => self.obj.letdie(),
-        }.map_err(IoError::from_rtio_error)
+        self.inner.set_keepalive(delay_in_seconds)
     }
 
     /// Closes the reading half of this connection.
@@ -165,7 +149,7 @@ impl TcpStream {
     /// Note that this method affects all cloned handles associated with this
     /// stream, not just this one handle.
     pub fn close_read(&mut self) -> IoResult<()> {
-        self.obj.close_read().map_err(IoError::from_rtio_error)
+        self.inner.close_read()
     }
 
     /// Closes the writing half of this connection.
@@ -176,7 +160,7 @@ impl TcpStream {
     /// Note that this method affects all cloned handles associated with this
     /// stream, not just this one handle.
     pub fn close_write(&mut self) -> IoResult<()> {
-        self.obj.close_write().map_err(IoError::from_rtio_error)
+        self.inner.close_write()
     }
 
     /// Sets a timeout, in milliseconds, for blocking operations on this stream.
@@ -198,7 +182,7 @@ impl TcpStream {
     /// take a look at `set_read_timeout` and `set_write_timeout`.
     #[experimental = "the timeout argument may change in type and value"]
     pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
-        self.obj.set_timeout(timeout_ms)
+        self.inner.set_timeout(timeout_ms)
     }
 
     /// Sets the timeout for read operations on this stream.
@@ -215,7 +199,7 @@ impl TcpStream {
     /// during the timeout period.
     #[experimental = "the timeout argument may change in type and value"]
     pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
-        self.obj.set_read_timeout(timeout_ms)
+        self.inner.set_read_timeout(timeout_ms)
     }
 
     /// Sets the timeout for write operations on this stream.
@@ -242,7 +226,7 @@ impl TcpStream {
     /// asynchronous fashion after the call to write returns.
     #[experimental = "the timeout argument may change in type and value"]
     pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) {
-        self.obj.set_write_timeout(timeout_ms)
+        self.inner.set_write_timeout(timeout_ms)
     }
 }
 
@@ -256,19 +240,19 @@ impl Clone for TcpStream {
     /// Instead, the first read will receive the first packet received, and the
     /// second read will receive the second packet.
     fn clone(&self) -> TcpStream {
-        TcpStream { obj: self.obj.clone() }
+        TcpStream { inner: self.inner.clone() }
     }
 }
 
 impl Reader for TcpStream {
     fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
-        self.obj.read(buf).map_err(IoError::from_rtio_error)
+        self.inner.read(buf)
     }
 }
 
 impl Writer for TcpStream {
     fn write(&mut self, buf: &[u8]) -> IoResult<()> {
-        self.obj.write(buf).map_err(IoError::from_rtio_error)
+        self.inner.write(buf)
     }
 }
 
@@ -309,7 +293,7 @@ impl Writer for TcpStream {
 /// # }
 /// ```
 pub struct TcpListener {
-    obj: Box<RtioTcpListener + Send>,
+    inner: TcpListenerImp,
 }
 
 impl TcpListener {
@@ -324,26 +308,20 @@ impl TcpListener {
     /// The address type can be any implementor of `ToSocketAddr` trait. See its
     /// documentation for concrete examples.
     pub fn bind<A: ToSocketAddr>(addr: A) -> IoResult<TcpListener> {
-        super::with_addresses_io(addr, |io, addr| io.tcp_bind(addr).map(|l| TcpListener { obj: l }))
+        super::with_addresses(addr, |addr| {
+            TcpListenerImp::bind(addr).map(|inner| TcpListener { inner: inner })
+        })
     }
 
     /// Returns the local socket address of this listener.
     pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
-        match self.obj.socket_name() {
-            Ok(rtio::SocketAddr { ip, port }) => {
-                Ok(SocketAddr { ip: super::from_rtio(ip), port: port })
-            }
-            Err(e) => Err(IoError::from_rtio_error(e)),
-        }
+        self.inner.socket_name()
     }
 }
 
 impl Listener<TcpStream, TcpAcceptor> for TcpListener {
     fn listen(self) -> IoResult<TcpAcceptor> {
-        match self.obj.listen() {
-            Ok(acceptor) => Ok(TcpAcceptor { obj: acceptor }),
-            Err(e) => Err(IoError::from_rtio_error(e)),
-        }
+        self.inner.listen(128).map(|a| TcpAcceptor { inner: a })
     }
 }
 
@@ -351,7 +329,7 @@ impl Listener<TcpStream, TcpAcceptor> for TcpListener {
 /// a `TcpListener`'s `listen` method, and this object can be used to accept new
 /// `TcpStream` instances.
 pub struct TcpAcceptor {
-    obj: Box<RtioTcpAcceptor + Send>,
+    inner: TcpAcceptorImp,
 }
 
 impl TcpAcceptor {
@@ -399,7 +377,7 @@ impl TcpAcceptor {
     /// ```
     #[experimental = "the type of the argument and name of this function are \
                       subject to change"]
-    pub fn set_timeout(&mut self, ms: Option<u64>) { self.obj.set_timeout(ms); }
+    pub fn set_timeout(&mut self, ms: Option<u64>) { self.inner.set_timeout(ms); }
 
     /// Closes the accepting capabilities of this acceptor.
     ///
@@ -445,16 +423,13 @@ impl TcpAcceptor {
     /// ```
     #[experimental]
     pub fn close_accept(&mut self) -> IoResult<()> {
-        self.obj.close_accept().map_err(IoError::from_rtio_error)
+        self.inner.close_accept()
     }
 }
 
 impl Acceptor<TcpStream> for TcpAcceptor {
     fn accept(&mut self) -> IoResult<TcpStream> {
-        match self.obj.accept(){
-            Ok(s) => Ok(TcpStream::new(s)),
-            Err(e) => Err(IoError::from_rtio_error(e)),
-        }
+        self.inner.accept().map(TcpStream::new)
     }
 }
 
@@ -473,7 +448,7 @@ impl Clone for TcpAcceptor {
     /// This function is useful for creating a handle to invoke `close_accept`
     /// on to wake up any other task blocked in `accept`.
     fn clone(&self) -> TcpAcceptor {
-        TcpAcceptor { obj: self.obj.clone() }
+        TcpAcceptor { inner: self.inner.clone() }
     }
 }
 
@@ -1112,8 +1087,6 @@ mod test {
 
     #[test]
     fn shutdown_smoke() {
-        use rt::rtio::RtioTcpStream;
-
         let addr = next_test_ip4();
         let a = TcpListener::bind(addr).unwrap().listen();
         spawn(proc() {
@@ -1124,7 +1097,7 @@ mod test {
         });
 
         let mut s = TcpStream::connect(addr).unwrap();
-        assert!(s.obj.close_write().is_ok());
+        assert!(s.inner.close_write().is_ok());
         assert!(s.write([1]).is_err());
         assert_eq!(s.read_to_end(), Ok(vec!(1)));
     }
diff --git a/src/libstd/io/net/udp.rs b/src/libstd/io/net/udp.rs
index 4ae054beadb..31b61989647 100644
--- a/src/libstd/io/net/udp.rs
+++ b/src/libstd/io/net/udp.rs
@@ -17,13 +17,10 @@
 
 use clone::Clone;
 use io::net::ip::{SocketAddr, IpAddr, ToSocketAddr};
-use io::{Reader, Writer, IoResult, IoError};
-use kinds::Send;
-use boxed::Box;
+use io::{Reader, Writer, IoResult};
 use option::Option;
 use result::{Ok, Err};
-use rt::rtio::{RtioSocket, RtioUdpSocket, IoFactory};
-use rt::rtio;
+use sys::udp::UdpSocket as UdpSocketImp;
 
 /// A User Datagram Protocol socket.
 ///
@@ -60,7 +57,7 @@ use rt::rtio;
 /// }
 /// ```
 pub struct UdpSocket {
-    obj: Box<RtioUdpSocket + Send>,
+    inner: UdpSocketImp,
 }
 
 impl UdpSocket {
@@ -69,18 +66,15 @@ impl UdpSocket {
     /// Address type can be any implementor of `ToSocketAddr` trait. See its
     /// documentation for concrete examples.
     pub fn bind<A: ToSocketAddr>(addr: A) -> IoResult<UdpSocket> {
-        super::with_addresses_io(addr, |io, addr| io.udp_bind(addr).map(|s| UdpSocket { obj: s }))
+        super::with_addresses(addr, |addr| {
+            UdpSocketImp::bind(addr).map(|s| UdpSocket { inner: s })
+        })
     }
 
     /// Receives data from the socket. On success, returns the number of bytes
     /// read and the address from whence the data came.
     pub fn recv_from(&mut self, buf: &mut [u8]) -> IoResult<(uint, SocketAddr)> {
-        match self.obj.recv_from(buf) {
-            Ok((amt, rtio::SocketAddr { ip, port })) => {
-                Ok((amt, SocketAddr { ip: super::from_rtio(ip), port: port }))
-            }
-            Err(e) => Err(IoError::from_rtio_error(e)),
-        }
+        self.inner.recv_from(buf)
     }
 
     /// Sends data on the socket to the given address. Returns nothing on
@@ -89,10 +83,7 @@ impl UdpSocket {
     /// Address type can be any implementor of `ToSocketAddr` trait. See its
     /// documentation for concrete examples.
     pub fn send_to<A: ToSocketAddr>(&mut self, buf: &[u8], addr: A) -> IoResult<()> {
-        super::with_addresses(addr, |addr| self.obj.send_to(buf, rtio::SocketAddr {
-            ip: super::to_rtio(addr.ip),
-            port: addr.port,
-        }).map_err(IoError::from_rtio_error))
+        super::with_addresses(addr, |addr| self.inner.send_to(buf, addr))
     }
 
     /// Creates a `UdpStream`, which allows use of the `Reader` and `Writer`
@@ -112,24 +103,19 @@ impl UdpSocket {
 
     /// Returns the socket address that this socket was created from.
     pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
-        match self.obj.socket_name() {
-            Ok(a) => Ok(SocketAddr { ip: super::from_rtio(a.ip), port: a.port }),
-            Err(e) => Err(IoError::from_rtio_error(e))
-        }
+        self.inner.socket_name()
     }
 
     /// Joins a multicast IP address (becomes a member of it)
     #[experimental]
     pub fn join_multicast(&mut self, multi: IpAddr) -> IoResult<()> {
-        let e = self.obj.join_multicast(super::to_rtio(multi));
-        e.map_err(IoError::from_rtio_error)
+        self.inner.join_multicast(multi)
     }
 
     /// Leaves a multicast IP address (drops membership from it)
     #[experimental]
     pub fn leave_multicast(&mut self, multi: IpAddr) -> IoResult<()> {
-        let e = self.obj.leave_multicast(super::to_rtio(multi));
-        e.map_err(IoError::from_rtio_error)
+        self.inner.leave_multicast(multi)
     }
 
     /// Set the multicast loop flag to the specified value
@@ -137,33 +123,25 @@ impl UdpSocket {
     /// This lets multicast packets loop back to local sockets (if enabled)
     #[experimental]
     pub fn set_multicast_loop(&mut self, on: bool) -> IoResult<()> {
-        if on {
-            self.obj.loop_multicast_locally()
-        } else {
-            self.obj.dont_loop_multicast_locally()
-        }.map_err(IoError::from_rtio_error)
+        self.inner.set_multicast_loop(on)
     }
 
     /// Sets the multicast TTL
     #[experimental]
     pub fn set_multicast_ttl(&mut self, ttl: int) -> IoResult<()> {
-        self.obj.multicast_time_to_live(ttl).map_err(IoError::from_rtio_error)
+        self.inner.multicast_time_to_live(ttl)
     }
 
     /// Sets this socket's TTL
     #[experimental]
     pub fn set_ttl(&mut self, ttl: int) -> IoResult<()> {
-        self.obj.time_to_live(ttl).map_err(IoError::from_rtio_error)
+        self.inner.time_to_live(ttl)
     }
 
     /// Sets the broadcast flag on or off
     #[experimental]
     pub fn set_broadcast(&mut self, broadcast: bool) -> IoResult<()> {
-        if broadcast {
-            self.obj.hear_broadcasts()
-        } else {
-            self.obj.ignore_broadcasts()
-        }.map_err(IoError::from_rtio_error)
+        self.inner.set_broadcast(broadcast)
     }
 
     /// Sets the read/write timeout for this socket.
@@ -171,7 +149,7 @@ impl UdpSocket {
     /// For more information, see `TcpStream::set_timeout`
     #[experimental = "the timeout argument may change in type and value"]
     pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
-        self.obj.set_timeout(timeout_ms)
+        self.inner.set_timeout(timeout_ms)
     }
 
     /// Sets the read timeout for this socket.
@@ -179,7 +157,7 @@ impl UdpSocket {
     /// For more information, see `TcpStream::set_timeout`
     #[experimental = "the timeout argument may change in type and value"]
     pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
-        self.obj.set_read_timeout(timeout_ms)
+        self.inner.set_read_timeout(timeout_ms)
     }
 
     /// Sets the write timeout for this socket.
@@ -187,7 +165,7 @@ impl UdpSocket {
     /// For more information, see `TcpStream::set_timeout`
     #[experimental = "the timeout argument may change in type and value"]
     pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) {
-        self.obj.set_write_timeout(timeout_ms)
+        self.inner.set_write_timeout(timeout_ms)
     }
 }
 
@@ -201,7 +179,7 @@ impl Clone for UdpSocket {
     /// received, and the second read will receive the second packet.
     fn clone(&self) -> UdpSocket {
         UdpSocket {
-            obj: self.obj.clone(),
+            inner: self.inner.clone(),
         }
     }
 }
diff --git a/src/libstd/io/pipe.rs b/src/libstd/io/pipe.rs
index c77cffd561e..64b2518fab1 100644
--- a/src/libstd/io/pipe.rs
+++ b/src/libstd/io/pipe.rs
@@ -17,15 +17,17 @@
 
 use prelude::*;
 
-use io::{IoResult, IoError};
+use io::IoResult;
 use libc;
-use os;
-use rt::rtio::{RtioPipe, LocalIo};
+use sync::Arc;
+
+use sys_common;
+use sys;
+use sys::fs::FileDesc as FileDesc;
 
 /// A synchronous, in-memory pipe.
 pub struct PipeStream {
-    /// The internal, opaque runtime pipe object.
-    obj: Box<RtioPipe + Send>,
+    inner: Arc<FileDesc>
 }
 
 pub struct PipePair {
@@ -55,14 +57,14 @@ impl PipeStream {
     /// }
     /// ```
     pub fn open(fd: libc::c_int) -> IoResult<PipeStream> {
-        LocalIo::maybe_raise(|io| {
-            io.pipe_open(fd).map(|obj| PipeStream { obj: obj })
-        }).map_err(IoError::from_rtio_error)
+        Ok(PipeStream::from_filedesc(FileDesc::new(fd, true)))
     }
 
+    // FIXME: expose this some other way
+    /// Wrap a FileDesc directly, taking ownership.
     #[doc(hidden)]
-    pub fn new(inner: Box<RtioPipe + Send>) -> PipeStream {
-        PipeStream { obj: inner }
+    pub fn from_filedesc(fd: FileDesc) -> PipeStream {
+        PipeStream { inner: Arc::new(fd) }
     }
 
     /// Creates a pair of in-memory OS pipes for a unidirectional communication
@@ -76,43 +78,35 @@ impl PipeStream {
     /// This function can fail to succeed if the underlying OS has run out of
     /// available resources to allocate a new pipe.
     pub fn pair() -> IoResult<PipePair> {
-        struct Closer { fd: libc::c_int }
-
-        let os::Pipe { reader, writer } = try!(unsafe { os::pipe() });
-        let mut reader = Closer { fd: reader };
-        let mut writer = Closer { fd: writer };
-
-        let io_reader = try!(PipeStream::open(reader.fd));
-        reader.fd = -1;
-        let io_writer = try!(PipeStream::open(writer.fd));
-        writer.fd = -1;
-        return Ok(PipePair { reader: io_reader, writer: io_writer });
-
-        impl Drop for Closer {
-            fn drop(&mut self) {
-                if self.fd != -1 {
-                    let _ = unsafe { libc::close(self.fd) };
-                }
-            }
-        }
+        let (reader, writer) = try!(unsafe { sys::os::pipe() });
+        Ok(PipePair {
+            reader: PipeStream::from_filedesc(reader),
+            writer: PipeStream::from_filedesc(writer),
+        })
+    }
+}
+
+impl sys_common::AsFileDesc for PipeStream {
+    fn as_fd(&self) -> &sys::fs::FileDesc {
+        &*self.inner
     }
 }
 
 impl Clone for PipeStream {
     fn clone(&self) -> PipeStream {
-        PipeStream { obj: self.obj.clone() }
+        PipeStream { inner: self.inner.clone() }
     }
 }
 
 impl Reader for PipeStream {
     fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
-        self.obj.read(buf).map_err(IoError::from_rtio_error)
+        self.inner.read(buf)
     }
 }
 
 impl Writer for PipeStream {
     fn write(&mut self, buf: &[u8]) -> IoResult<()> {
-        self.obj.write(buf).map_err(IoError::from_rtio_error)
+        self.inner.write(buf)
     }
 }