about summary refs log tree commit diff
diff options
context:
space:
mode:
authorAlex Crichton <alex@alexcrichton.com>2014-02-07 10:37:58 -0800
committerAlex Crichton <alex@alexcrichton.com>2014-02-16 18:46:01 -0800
commita526aa139ec1c95cbad4c1d3187c437eb45d4bae (patch)
tree6a0092a8ac88e5842f00b74df7314faa9074de04
parent94b2d9dc4dd864b481bcf279921bc7ea796355e5 (diff)
downloadrust-a526aa139ec1c95cbad4c1d3187c437eb45d4bae.tar.gz
rust-a526aa139ec1c95cbad4c1d3187c437eb45d4bae.zip
Implement named pipes for windows, touch up unix
* Implementation of pipe_win32 filled out for libnative
* Reorganize pipes to be clone-able
* Fix a few file descriptor leaks on error
* Factor out some common code into shared functions
* Make use of the if_ok!() macro for less indentation

Closes #11201
-rw-r--r--src/libnative/io/mod.rs9
-rw-r--r--src/libnative/io/net.rs53
-rw-r--r--src/libnative/io/pipe_unix.rs349
-rw-r--r--src/libnative/io/pipe_win32.rs492
-rw-r--r--src/libstd/io/net/mod.rs2
-rw-r--r--src/libstd/io/net/unix.rs101
-rw-r--r--src/libstd/libc.rs57
-rw-r--r--src/libstd/rt/rtio.rs5
8 files changed, 780 insertions, 288 deletions
diff --git a/src/libnative/io/mod.rs b/src/libnative/io/mod.rs
index ad0d7270c1a..0f9439b3eb5 100644
--- a/src/libnative/io/mod.rs
+++ b/src/libnative/io/mod.rs
@@ -61,11 +61,11 @@ pub mod timer;
 pub mod timer;
 
 #[cfg(unix)]
-#[path = "path_unix.rs"]
+#[path = "pipe_unix.rs"]
 pub mod pipe;
 
 #[cfg(windows)]
-#[path = "path_win32.rs"]
+#[path = "pipe_win32.rs"]
 pub mod pipe;
 
 mod timer_helper;
@@ -85,6 +85,9 @@ fn translate_error(errno: i32, detail: bool) -> IoError {
     fn get_err(errno: i32) -> (io::IoErrorKind, &'static str) {
         match errno {
             libc::EOF => (io::EndOfFile, "end of file"),
+            libc::ERROR_NO_DATA => (io::BrokenPipe, "the pipe is being closed"),
+            libc::ERROR_FILE_NOT_FOUND => (io::FileNotFound, "file not found"),
+            libc::ERROR_INVALID_NAME => (io::InvalidInput, "invalid file name"),
             libc::WSAECONNREFUSED => (io::ConnectionRefused, "connection refused"),
             libc::WSAECONNRESET => (io::ConnectionReset, "connection reset"),
             libc::WSAEACCES => (io::PermissionDenied, "permission denied"),
@@ -94,6 +97,7 @@ fn translate_error(errno: i32, detail: bool) -> IoError {
             libc::WSAECONNABORTED => (io::ConnectionAborted, "connection aborted"),
             libc::WSAEADDRNOTAVAIL => (io::ConnectionRefused, "address not available"),
             libc::WSAEADDRINUSE => (io::ConnectionRefused, "address in use"),
+            libc::ERROR_BROKEN_PIPE => (io::BrokenPipe, "the pipe has ended"),
 
             x => {
                 debug!("ignoring {}: {}", x, os::last_os_error());
@@ -116,6 +120,7 @@ fn translate_error(errno: i32, detail: bool) -> IoError {
             libc::ECONNABORTED => (io::ConnectionAborted, "connection aborted"),
             libc::EADDRNOTAVAIL => (io::ConnectionRefused, "address not available"),
             libc::EADDRINUSE => (io::ConnectionRefused, "address in use"),
+            libc::ENOENT => (io::FileNotFound, "no such file or directory"),
 
             // These two constants can have the same value on some systems, but
             // different values on others, so we can't use a match clause
diff --git a/src/libnative/io/net.rs b/src/libnative/io/net.rs
index dce890dc129..b33b54862dc 100644
--- a/src/libnative/io/net.rs
+++ b/src/libnative/io/net.rs
@@ -8,7 +8,6 @@
 // option. This file may not be copied, modified, or distributed
 // except according to those terms.
 
-use std::c_str::CString;
 use std::cast;
 use std::io::net::ip;
 use std::io;
@@ -16,7 +15,6 @@ use std::libc;
 use std::mem;
 use std::rt::rtio;
 use std::sync::arc::UnsafeArc;
-use std::unstable::intrinsics;
 
 use super::{IoResult, retry};
 use super::file::keep_going;
@@ -90,30 +88,6 @@ fn addr_to_sockaddr(addr: ip::SocketAddr) -> (libc::sockaddr_storage, uint) {
     }
 }
 
-fn addr_to_sockaddr_un(addr: &CString) -> IoResult<(libc::sockaddr_storage, uint)> {
-    // the sun_path length is limited to SUN_LEN (with null)
-    if addr.len() > libc::sun_len -1 {
-        return Err(io::IoError {
-            kind: io::OtherIoError,
-            desc: "path must be smaller than SUN_LEN",
-            detail: None,
-        })
-    }
-    unsafe {
-        let storage: libc::sockaddr_storage = intrinsics::init();
-        let s: *mut libc::sockaddr_un = cast::transmute(&storage);
-        (*s).sun_family = libc::AF_UNIX as libc::sa_family_t;
-        let mut i = 0;
-        for c in addr.iter() {
-          (*s).sun_path[i] = c;
-          i += 1;
-        }
-
-        let len = mem::size_of::<libc::sa_family_t>() + i + 1; //count the null terminator
-        return Ok((storage, len));
-    }
-}
-
 fn socket(addr: ip::SocketAddr, ty: libc::c_int) -> IoResult<sock_t> {
     unsafe {
         let fam = match addr.ip {
@@ -127,15 +101,6 @@ fn socket(addr: ip::SocketAddr, ty: libc::c_int) -> IoResult<sock_t> {
     }
 }
 
-fn unix_socket(ty: libc::c_int) -> IoResult<sock_t> {
-    unsafe {
-        match libc::socket(libc::AF_UNIX, ty, 0) {
-            -1 => Err(super::last_error()),
-            fd => Ok(fd)
-        }
-    }
-}
-
 fn setsockopt<T>(fd: sock_t, opt: libc::c_int, val: libc::c_int,
                  payload: T) -> IoResult<()> {
     unsafe {
@@ -228,24 +193,6 @@ pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage,
     }
 }
 
-fn sockaddr_to_unix(storage: &libc::sockaddr_storage,
-                    len: uint) -> IoResult<CString> {
-    match storage.ss_family as libc::c_int {
-        libc::AF_UNIX => {
-            assert!(len as uint <= mem::size_of::<libc::sockaddr_un>());
-            let storage: &libc::sockaddr_un = unsafe {
-                cast::transmute(storage)
-            };
-            unsafe {
-                Ok(CString::new(storage.sun_path.to_owned().as_ptr(), false))
-            }
-        }
-        _ => {
-            Err(io::standard_error(io::OtherIoError))
-        }
-    }
-}
-
 #[cfg(unix)]
 pub fn init() {}
 
diff --git a/src/libnative/io/pipe_unix.rs b/src/libnative/io/pipe_unix.rs
index 1160bc196d8..a6d75d93d67 100644
--- a/src/libnative/io/pipe_unix.rs
+++ b/src/libnative/io/pipe_unix.rs
@@ -8,46 +8,124 @@
 // option. This file may not be copied, modified, or distributed
 // except according to those terms.
 
+use std::c_str::CString;
+use std::cast;
+use std::io;
+use std::libc;
+use std::mem;
+use std::rt::rtio;
+use std::sync::arc::UnsafeArc;
+use std::unstable::intrinsics;
+
+use super::{IoResult, retry};
+use super::file::{keep_going, fd_t};
+
+fn unix_socket(ty: libc::c_int) -> IoResult<fd_t> {
+    match unsafe { libc::socket(libc::AF_UNIX, ty, 0) } {
+        -1 => Err(super::last_error()),
+        fd => Ok(fd)
+    }
+}
+
+fn addr_to_sockaddr_un(addr: &CString) -> IoResult<(libc::sockaddr_storage, uint)> {
+    // the sun_path length is limited to SUN_LEN (with null)
+    assert!(mem::size_of::<libc::sockaddr_storage>() >=
+            mem::size_of::<libc::sockaddr_un>());
+    let mut storage: libc::sockaddr_storage = unsafe { intrinsics::init() };
+    let s: &mut libc::sockaddr_un = unsafe { cast::transmute(&mut storage) };
+
+    let len = addr.len();
+    if len > s.sun_path.len() - 1 {
+        return Err(io::IoError {
+            kind: io::InvalidInput,
+            desc: "path must be smaller than SUN_LEN",
+            detail: None,
+        })
+    }
+    s.sun_family = libc::AF_UNIX as libc::sa_family_t;
+    for (slot, value) in s.sun_path.mut_iter().zip(addr.iter()) {
+        *slot = value;
+    }
+
+    // count the null terminator
+    let len = mem::size_of::<libc::sa_family_t>() + len + 1;
+    return Ok((storage, len));
+}
+
+fn sockaddr_to_unix(storage: &libc::sockaddr_storage,
+                    len: uint) -> IoResult<CString> {
+    match storage.ss_family as libc::c_int {
+        libc::AF_UNIX => {
+            assert!(len as uint <= mem::size_of::<libc::sockaddr_un>());
+            let storage: &libc::sockaddr_un = unsafe {
+                cast::transmute(storage)
+            };
+            unsafe {
+                Ok(CString::new(storage.sun_path.as_ptr(), false).clone())
+            }
+        }
+        _ => Err(io::standard_error(io::InvalidInput))
+    }
+}
+
+struct Inner {
+    fd: fd_t,
+}
+
+impl Drop for Inner {
+    fn drop(&mut self) { unsafe { let _ = libc::close(self.fd); } }
+}
+
+fn connect(addr: &CString, ty: libc::c_int) -> IoResult<Inner> {
+    let (addr, len) = if_ok!(addr_to_sockaddr_un(addr));
+    let inner = Inner { fd: if_ok!(unix_socket(ty)) };
+    let addrp = &addr as *libc::sockaddr_storage;
+    match retry(|| unsafe {
+        libc::connect(inner.fd, addrp as *libc::sockaddr,
+                      len as libc::socklen_t)
+    }) {
+        -1 => Err(super::last_error()),
+        _  => Ok(inner)
+    }
+}
+
+fn bind(addr: &CString, ty: libc::c_int) -> IoResult<Inner> {
+    let (addr, len) = if_ok!(addr_to_sockaddr_un(addr));
+    let inner = Inner { fd: if_ok!(unix_socket(ty)) };
+    let addrp = &addr as *libc::sockaddr_storage;
+    match unsafe {
+        libc::bind(inner.fd, addrp as *libc::sockaddr, len as libc::socklen_t)
+    } {
+        -1 => Err(super::last_error()),
+        _  => Ok(inner)
+    }
+}
+
 ////////////////////////////////////////////////////////////////////////////////
-// Unix
+// Unix Streams
 ////////////////////////////////////////////////////////////////////////////////
 
 pub struct UnixStream {
-    priv fd: sock_t,
+    priv inner: UnsafeArc<Inner>,
 }
 
 impl UnixStream {
-    pub fn connect(addr: &CString, ty: libc::c_int) -> IoResult<UnixStream> {
-        unix_socket(ty).and_then(|fd| {
-            match addr_to_sockaddr_un(addr) {
-                Err(e)          => return Err(e),
-                Ok((addr, len)) => {
-                    let ret = UnixStream{ fd: fd };
-                    let addrp = &addr as *libc::sockaddr_storage;
-                    match retry(|| {
-                      libc::connect(fd, addrp as *libc::sockaddr,
-                                     len as libc::socklen_t)
-                    }) {
-                        -1 => return Err(super::last_error()),
-                        _  => return Ok(ret)
-                    }
-                }
-            }
+    pub fn connect(addr: &CString) -> IoResult<UnixStream> {
+        connect(addr, libc::SOCK_STREAM).map(|inner| {
+            UnixStream { inner: UnsafeArc::new(inner) }
         })
     }
 
-    pub fn fd(&self) -> sock_t { self.fd }
+    fn fd(&self) -> fd_t { unsafe { (*self.inner.get()).fd } }
 }
 
 impl rtio::RtioPipe for UnixStream {
     fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
-        let ret = retry(|| {
-            unsafe {
-                libc::recv(self.fd,
-                           buf.as_ptr() as *mut libc::c_void,
-                           buf.len() as wrlen,
-                           0) as libc::c_int
-            }
+        let ret = retry(|| unsafe {
+            libc::recv(self.fd(),
+                       buf.as_ptr() as *mut libc::c_void,
+                       buf.len() as libc::size_t,
+                       0) as libc::c_int
         });
         if ret == 0 {
             Err(io::standard_error(io::EndOfFile))
@@ -57,14 +135,13 @@ impl rtio::RtioPipe for UnixStream {
             Ok(ret as uint)
         }
     }
+
     fn write(&mut self, buf: &[u8]) -> IoResult<()> {
-        let ret = keep_going(buf, |buf, len| {
-            unsafe {
-                libc::send(self.fd,
-                           buf as *mut libc::c_void,
-                           len as wrlen,
-                           0) as i64
-            }
+        let ret = keep_going(buf, |buf, len| unsafe {
+            libc::send(self.fd(),
+                       buf as *mut libc::c_void,
+                       len as libc::size_t,
+                       0) as i64
         });
         if ret < 0 {
             Err(super::last_error())
@@ -72,10 +149,10 @@ impl rtio::RtioPipe for UnixStream {
             Ok(())
         }
     }
-}
 
-impl Drop for UnixStream {
-    fn drop(&mut self) { unsafe { close(self.fd); } }
+    fn clone(&self) -> ~rtio::RtioPipe {
+        ~UnixStream { inner: self.inner.clone() } as ~rtio::RtioPipe
+    }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -83,176 +160,89 @@ impl Drop for UnixStream {
 ////////////////////////////////////////////////////////////////////////////////
 
 pub struct UnixDatagram {
-    priv fd: sock_t,
+    priv inner: UnsafeArc<Inner>,
 }
 
 impl UnixDatagram {
-    pub fn connect(addr: &CString, ty: libc::c_int) -> IoResult<UnixDatagram> {
-        unsafe {
-            unix_socket(ty).and_then(|fd| {
-                match addr_to_sockaddr_un(addr) {
-                    Err(e)          => return Err(e),
-                    Ok((addr, len)) => {
-                        let ret = UnixDatagram{ fd: fd };
-                        let addrp = &addr as *libc::sockaddr_storage;
-                        match retry(|| {
-                          libc::connect(fd, addrp as *libc::sockaddr,
-                                         len as libc::socklen_t)
-                        }) {
-                            -1 => return Err(super::last_error()),
-                            _  => return Ok(ret)
-                        }
-                    }
-                }
-            })
-        }
+    pub fn connect(addr: &CString) -> IoResult<UnixDatagram> {
+        connect(addr, libc::SOCK_DGRAM).map(|inner| {
+            UnixDatagram { inner: UnsafeArc::new(inner) }
+        })
     }
 
     pub fn bind(addr: &CString) -> IoResult<UnixDatagram> {
-        unsafe {
-            unix_socket(libc::SOCK_DGRAM).and_then(|fd| {
-                match addr_to_sockaddr_un(addr) {
-                    Err(e)          => return Err(e),
-                    Ok((addr, len)) => {
-                        let ret = UnixDatagram{ fd: fd };
-                        let addrp = &addr as *libc::sockaddr_storage;
-                        match libc::bind(fd, addrp as *libc::sockaddr,
-                                         len as libc::socklen_t) {
-                            -1 => return Err(super::last_error()),
-                            _  => return Ok(ret)
-                        }
-                    }
-                }
-            })
-        }
+        bind(addr, libc::SOCK_DGRAM).map(|inner| {
+            UnixDatagram { inner: UnsafeArc::new(inner) }
+        })
     }
 
-    pub fn fd(&self) -> sock_t { self.fd }
-}
+    fn fd(&self) -> fd_t { unsafe { (*self.inner.get()).fd } }
 
-impl rtio::RtioPipe for UnixDatagram {
-    fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
-        let ret = retry(|| {
-            unsafe {
-                libc::recv(self.fd,
+    pub fn recvfrom(&mut self, buf: &mut [u8]) -> IoResult<(uint, CString)> {
+        let mut storage: libc::sockaddr_storage = unsafe { intrinsics::init() };
+        let storagep = &mut storage as *mut libc::sockaddr_storage;
+        let mut addrlen: libc::socklen_t =
+                mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t;
+        let ret = retry(|| unsafe {
+            libc::recvfrom(self.fd(),
                            buf.as_ptr() as *mut libc::c_void,
-                           buf.len() as wrlen,
-                           0) as libc::c_int
-            }
-        });
-        if ret == 0 {
-            Err(io::standard_error(io::EndOfFile))
-        } else if ret < 0 {
-            Err(super::last_error())
-        } else {
-            Ok(ret as uint)
-        }
-    }
-    fn write(&mut self, buf: &[u8]) -> IoResult<()> {
-        let ret = keep_going(buf, |buf, len| {
-            unsafe {
-                libc::send(self.fd,
-                           buf as *mut libc::c_void,
-                           len as wrlen,
-                           0) as i64
-            }
+                           buf.len() as libc::size_t,
+                           0,
+                           storagep as *mut libc::sockaddr,
+                           &mut addrlen) as libc::c_int
         });
-        if ret < 0 {
-            Err(super::last_error())
-        } else {
-            Ok(())
-        }
+        if ret < 0 { return Err(super::last_error()) }
+        sockaddr_to_unix(&storage, addrlen as uint).and_then(|addr| {
+            Ok((ret as uint, addr))
+        })
     }
-}
 
-impl rtio::RtioDatagramPipe for UnixDatagram {
-    fn recvfrom(&mut self, buf: &mut [u8]) -> IoResult<(uint, CString)> {
-        unsafe {
-            let mut storage: libc::sockaddr_storage = intrinsics::init();
-            let storagep = &mut storage as *mut libc::sockaddr_storage;
-            let mut addrlen: libc::socklen_t =
-                    mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t;
-            let ret = retry(|| {
-                libc::recvfrom(self.fd,
-                               buf.as_ptr() as *mut libc::c_void,
-                               buf.len() as msglen_t,
-                               0,
-                               storagep as *mut libc::sockaddr,
-                               &mut addrlen) as libc::c_int
-            });
-            if ret < 0 { return Err(super::last_error()) }
-            sockaddr_to_unix(&storage, addrlen as uint).and_then(|addr| {
-                Ok((ret as uint, addr))
-            })
+    pub fn sendto(&mut self, buf: &[u8], dst: &CString) -> IoResult<()> {
+        let (dst, len) = if_ok!(addr_to_sockaddr_un(dst));
+        let dstp = &dst as *libc::sockaddr_storage;
+        let ret = retry(|| unsafe {
+            libc::sendto(self.fd(),
+                         buf.as_ptr() as *libc::c_void,
+                         buf.len() as libc::size_t,
+                         0,
+                         dstp as *libc::sockaddr,
+                         len as libc::socklen_t) as libc::c_int
+        });
+        match ret {
+            -1 => Err(super::last_error()),
+            n if n as uint != buf.len() => {
+                Err(io::IoError {
+                    kind: io::OtherIoError,
+                    desc: "couldn't send entire packet at once",
+                    detail: None,
+                })
+            }
+            _ => Ok(())
         }
     }
 
-    fn sendto(&mut self, buf: &[u8], dst: &CString) -> IoResult<()> {
-        match addr_to_sockaddr_un(dst) {
-            Err(e)         => Err(e),
-            Ok((dst, len)) => {
-                let dstp = &dst as *libc::sockaddr_storage;
-                unsafe {
-                    let ret = retry(|| {
-                        libc::sendto(self.fd,
-                                     buf.as_ptr() as *libc::c_void,
-                                     buf.len() as msglen_t,
-                                     0,
-                                     dstp as *libc::sockaddr,
-                                     len as libc::socklen_t) as libc::c_int
-                    });
-                    match ret {
-                        -1 => Err(super::last_error()),
-                        n if n as uint != buf.len() => {
-                            Err(io::IoError {
-                                kind: io::OtherIoError,
-                                desc: "couldn't send entire packet at once",
-                                detail: None,
-                            })
-                        }
-                        _ => Ok(())
-                    }
-                }
-            }
-        }
+    pub fn clone(&mut self) -> UnixDatagram {
+        UnixDatagram { inner: self.inner.clone() }
     }
 }
 
-impl Drop for UnixDatagram {
-    fn drop(&mut self) { unsafe { close(self.fd); } }
-}
 ////////////////////////////////////////////////////////////////////////////////
 // Unix Listener
 ////////////////////////////////////////////////////////////////////////////////
 
 pub struct UnixListener {
-    priv fd: sock_t,
+    priv inner: Inner,
 }
 
 impl UnixListener {
     pub fn bind(addr: &CString) -> IoResult<UnixListener> {
-        unsafe {
-            unix_socket(libc::SOCK_STREAM).and_then(|fd| {
-                match addr_to_sockaddr_un(addr) {
-                    Err(e)          => return Err(e),
-                    Ok((addr, len)) => {
-                        let ret = UnixListener{ fd: fd };
-                        let addrp = &addr as *libc::sockaddr_storage;
-                        match libc::bind(fd, addrp as *libc::sockaddr,
-                                         len as libc::socklen_t) {
-                            -1 => return Err(super::last_error()),
-                            _  => return Ok(ret)
-                        }
-                    }
-                }
-            })
-        }
+        bind(addr, libc::SOCK_STREAM).map(|fd| UnixListener { inner: fd })
     }
 
-    pub fn fd(&self) -> sock_t { self.fd }
+    fn fd(&self) -> fd_t { self.inner.fd }
 
     pub fn native_listen(self, backlog: int) -> IoResult<UnixAcceptor> {
-        match unsafe { libc::listen(self.fd, backlog as libc::c_int) } {
+        match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } {
             -1 => Err(super::last_error()),
             _ => Ok(UnixAcceptor { listener: self })
         }
@@ -265,16 +255,12 @@ impl rtio::RtioUnixListener for UnixListener {
     }
 }
 
-impl Drop for UnixListener {
-    fn drop(&mut self) { unsafe { close(self.fd); } }
-}
-
 pub struct UnixAcceptor {
     priv listener: UnixListener,
 }
 
 impl UnixAcceptor {
-    pub fn fd(&self) -> sock_t { self.listener.fd }
+    fn fd(&self) -> fd_t { self.listener.fd() }
 
     pub fn native_accept(&mut self) -> IoResult<UnixStream> {
         let mut storage: libc::sockaddr_storage = unsafe { intrinsics::init() };
@@ -285,9 +271,9 @@ impl UnixAcceptor {
             libc::accept(self.fd(),
                          storagep as *mut libc::sockaddr,
                          &mut size as *mut libc::socklen_t) as libc::c_int
-        }) as sock_t {
+        }) {
             -1 => Err(super::last_error()),
-            fd => Ok(UnixStream { fd: fd })
+            fd => Ok(UnixStream { inner: UnsafeArc::new(Inner { fd: fd }) })
         }
     }
 }
@@ -297,4 +283,3 @@ impl rtio::RtioUnixAcceptor for UnixAcceptor {
         self.native_accept().map(|s| ~s as ~rtio::RtioPipe)
     }
 }
-
diff --git a/src/libnative/io/pipe_win32.rs b/src/libnative/io/pipe_win32.rs
new file mode 100644
index 00000000000..83731cc02a6
--- /dev/null
+++ b/src/libnative/io/pipe_win32.rs
@@ -0,0 +1,492 @@
+// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+//! Named pipes implementation for windows
+//!
+//! If are unfortunate enough to be reading this code, I would like to first
+//! apologize. This was my first encounter with windows named pipes, and it
+//! didn't exactly turn out very cleanly. If you, too, are new to named pipes,
+//! read on as I'll try to explain some fun things that I ran into.
+//!
+//! # Unix pipes vs Named pipes
+//!
+//! As with everything else, named pipes on windows are pretty different from
+//! unix pipes on unix. On unix, you use one "server pipe" to accept new client
+//! pipes. So long as this server pipe is active, new children pipes can
+//! connect. On windows, you instead have a number of "server pipes", and each
+//! of these server pipes can throughout their lifetime be attached to a client
+//! or not. Once attached to a client, a server pipe may then disconnect at a
+//! later date.
+//!
+//! # Accepting clients
+//!
+//! As with most other I/O interfaces, our Listener/Acceptor/Stream interfaces
+//! are built around the unix flavors. This means that we have one "server
+//! pipe" to which many clients can connect. In order to make this compatible
+//! with the windows model, each connected client consumes ownership of a server
+//! pipe, and then a new server pipe is created for the next client.
+//!
+//! Note that the server pipes attached to clients are never given back to the
+//! listener for recycling. This could possibly be implemented with a channel so
+//! the listener half can re-use server pipes, but for now I err'd on the simple
+//! side of things. Each stream accepted by a listener will destroy the server
+//! pipe after the stream is dropped.
+//!
+//! This model ends up having a small race or two, and you can find more details
+//! on the `native_accept` method.
+//!
+//! # Simultaneous reads and writes
+//!
+//! In testing, I found that two simultaneous writes and two simultaneous reads
+//! on a pipe ended up working out just fine, but problems were encountered when
+//! a read was executed simultaneously with a write. After some googling around,
+//! it sounded like named pipes just weren't built for this kind of interaction,
+//! and the suggested solution was to use overlapped I/O.
+//!
+//! I don't realy know what overlapped I/O is, but my basic understanding after
+//! reading about it is that you have an external Event which is used to signal
+//! I/O completion, passed around in some OVERLAPPED structures. As to what this
+//! is, I'm not exactly sure.
+//!
+//! This problem implies that all named pipes are created with the
+//! FILE_FLAG_OVERLAPPED option. This means that all of their I/O is
+//! asynchronous. Each I/O operation has an associated OVERLAPPED structure, and
+//! inside of this structure is a HANDLE from CreateEvent. After the I/O is
+//! determined to be pending (may complete in the future), the
+//! GetOverlappedResult function is used to block on the event, waiting for the
+//! I/O to finish.
+//!
+//! This scheme ended up working well enough. There were two snags that I ran
+//! into, however:
+//!
+//! * Each UnixStream instance needs its own read/write events to wait on. These
+//!   can't be shared among clones of the same stream because the documentation
+//!   states that it unsets the event when the I/O is started (would possibly
+//!   corrupt other events simultaneously waiting). For convenience's sake,
+//!   these events are lazily initialized.
+//!
+//! * Each server pipe needs to be created with FILE_FLAG_OVERLAPPED in addition
+//!   to all pipes created through `connect`. Notably this means that the
+//!   ConnectNamedPipe function is nonblocking, implying that the Listener needs
+//!   to have yet another event to do the actual blocking.
+//!
+//! # Conclusion
+//!
+//! The conclusion here is that I probably don't know the best way to work with
+//! windows named pipes, but the solution here seems to work well enough to get
+//! the test suite passing (the suite is in libstd), and that's good enough for
+//! me!
+
+use std::c_str::CString;
+use std::libc;
+use std::os::win32::as_utf16_p;
+use std::ptr;
+use std::rt::rtio;
+use std::sync::arc::UnsafeArc;
+use std::unstable::intrinsics;
+
+use super::IoResult;
+
+struct Event(libc::HANDLE);
+
+impl Event {
+    fn new(manual_reset: bool, initial_state: bool) -> IoResult<Event> {
+        let event = unsafe {
+            libc::CreateEventW(ptr::mut_null(),
+                               manual_reset as libc::BOOL,
+                               initial_state as libc::BOOL,
+                               ptr::null())
+        };
+        if event as uint == 0 {
+            Err(super::last_error())
+        } else {
+            Ok(Event(event))
+        }
+    }
+
+    fn handle(&self) -> libc::HANDLE { let Event(handle) = *self; handle }
+}
+
+impl Drop for Event {
+    fn drop(&mut self) {
+        unsafe { let _ = libc::CloseHandle(self.handle()); }
+    }
+}
+
+struct Inner {
+    handle: libc::HANDLE,
+}
+
+impl Drop for Inner {
+    fn drop(&mut self) {
+        unsafe {
+            let _ = libc::FlushFileBuffers(self.handle);
+            let _ = libc::CloseHandle(self.handle);
+        }
+    }
+}
+
+unsafe fn pipe(name: *u16, init: bool) -> libc::HANDLE {
+    libc::CreateNamedPipeW(
+        name,
+        libc::PIPE_ACCESS_DUPLEX |
+            if init {libc::FILE_FLAG_FIRST_PIPE_INSTANCE} else {0} |
+            libc::FILE_FLAG_OVERLAPPED,
+        libc::PIPE_TYPE_BYTE | libc::PIPE_READMODE_BYTE |
+            libc::PIPE_WAIT,
+        libc::PIPE_UNLIMITED_INSTANCES,
+        65536,
+        65536,
+        0,
+        ptr::mut_null()
+    )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Unix Streams
+////////////////////////////////////////////////////////////////////////////////
+
+pub struct UnixStream {
+    priv inner: UnsafeArc<Inner>,
+    priv write: Option<Event>,
+    priv read: Option<Event>,
+}
+
+impl UnixStream {
+    fn try_connect(p: *u16) -> Option<libc::HANDLE> {
+        // Note that most of this is lifted from the libuv implementation.
+        // The idea is that if we fail to open a pipe in read/write mode
+        // that we try afterwards in just read or just write
+        let mut result = unsafe {
+            libc::CreateFileW(p,
+                libc::GENERIC_READ | libc::GENERIC_WRITE,
+                0,
+                ptr::mut_null(),
+                libc::OPEN_EXISTING,
+                libc::FILE_FLAG_OVERLAPPED,
+                ptr::mut_null())
+        };
+        if result != libc::INVALID_HANDLE_VALUE as libc::HANDLE {
+            return Some(result)
+        }
+
+        let err = unsafe { libc::GetLastError() };
+        if err == libc::ERROR_ACCESS_DENIED as libc::DWORD {
+            result = unsafe {
+                libc::CreateFileW(p,
+                    libc::GENERIC_READ | libc::FILE_WRITE_ATTRIBUTES,
+                    0,
+                    ptr::mut_null(),
+                    libc::OPEN_EXISTING,
+                    libc::FILE_FLAG_OVERLAPPED,
+                    ptr::mut_null())
+            };
+            if result != libc::INVALID_HANDLE_VALUE as libc::HANDLE {
+                return Some(result)
+            }
+        }
+        let err = unsafe { libc::GetLastError() };
+        if err == libc::ERROR_ACCESS_DENIED as libc::DWORD {
+            result = unsafe {
+                libc::CreateFileW(p,
+                    libc::GENERIC_WRITE | libc::FILE_READ_ATTRIBUTES,
+                    0,
+                    ptr::mut_null(),
+                    libc::OPEN_EXISTING,
+                    libc::FILE_FLAG_OVERLAPPED,
+                    ptr::mut_null())
+            };
+            if result != libc::INVALID_HANDLE_VALUE as libc::HANDLE {
+                return Some(result)
+            }
+        }
+        None
+    }
+
+    pub fn connect(addr: &CString) -> IoResult<UnixStream> {
+        as_utf16_p(addr.as_str().unwrap(), |p| {
+            loop {
+                match UnixStream::try_connect(p) {
+                    Some(handle) => {
+                        let inner = Inner { handle: handle };
+                        let mut mode = libc::PIPE_TYPE_BYTE |
+                                       libc::PIPE_READMODE_BYTE |
+                                       libc::PIPE_WAIT;
+                        let ret = unsafe {
+                            libc::SetNamedPipeHandleState(inner.handle,
+                                                          &mut mode,
+                                                          ptr::mut_null(),
+                                                          ptr::mut_null())
+                        };
+                        return if ret == 0 {
+                            Err(super::last_error())
+                        } else {
+                            Ok(UnixStream {
+                                inner: UnsafeArc::new(inner),
+                                read: None,
+                                write: None,
+                            })
+                        }
+                    }
+                    None => {}
+                }
+
+                // On windows, if you fail to connect, you may need to call the
+                // `WaitNamedPipe` function, and this is indicated with an error
+                // code of ERROR_PIPE_BUSY.
+                let code = unsafe { libc::GetLastError() };
+                if code as int != libc::ERROR_PIPE_BUSY as int {
+                    return Err(super::last_error())
+                }
+
+                // An example I found on microsoft's website used 20 seconds,
+                // libuv uses 30 seconds, hence we make the obvious choice of
+                // waiting for 25 seconds.
+                if unsafe { libc::WaitNamedPipeW(p, 25000) } == 0 {
+                    return Err(super::last_error())
+                }
+            }
+        })
+    }
+
+    fn handle(&self) -> libc::HANDLE { unsafe { (*self.inner.get()).handle } }
+}
+
+impl rtio::RtioPipe for UnixStream {
+    fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
+        if self.read.is_none() {
+            self.read = Some(if_ok!(Event::new(true, false)));
+        }
+
+        let mut bytes_read = 0;
+        let mut overlapped: libc::OVERLAPPED = unsafe { intrinsics::init() };
+        overlapped.hEvent = self.read.get_ref().handle();
+
+        let ret = unsafe {
+            libc::ReadFile(self.handle(),
+                           buf.as_ptr() as libc::LPVOID,
+                           buf.len() as libc::DWORD,
+                           &mut bytes_read,
+                           &mut overlapped)
+        };
+        if ret == 0 {
+            let err = unsafe { libc::GetLastError() };
+            if err == libc::ERROR_IO_PENDING as libc::DWORD {
+                let ret = unsafe {
+                    libc::GetOverlappedResult(self.handle(),
+                                              &mut overlapped,
+                                              &mut bytes_read,
+                                              libc::TRUE)
+                };
+                if ret == 0 {
+                    return Err(super::last_error())
+                }
+            } else {
+                return Err(super::last_error())
+            }
+        }
+
+        Ok(bytes_read as uint)
+    }
+
+    fn write(&mut self, buf: &[u8]) -> IoResult<()> {
+        if self.write.is_none() {
+            self.write = Some(if_ok!(Event::new(true, false)));
+        }
+
+        let mut offset = 0;
+        let mut overlapped: libc::OVERLAPPED = unsafe { intrinsics::init() };
+        overlapped.hEvent = self.write.get_ref().handle();
+
+        while offset < buf.len() {
+            let mut bytes_written = 0;
+            let ret = unsafe {
+                libc::WriteFile(self.handle(),
+                                buf.slice_from(offset).as_ptr() as libc::LPVOID,
+                                (buf.len() - offset) as libc::DWORD,
+                                &mut bytes_written,
+                                &mut overlapped)
+            };
+            if ret == 0 {
+                let err = unsafe { libc::GetLastError() };
+                if err == libc::ERROR_IO_PENDING as libc::DWORD {
+                    let ret = unsafe {
+                        libc::GetOverlappedResult(self.handle(),
+                                                  &mut overlapped,
+                                                  &mut bytes_written,
+                                                  libc::TRUE)
+                    };
+                    if ret == 0 {
+                        return Err(super::last_error())
+                    }
+                } else {
+                    return Err(super::last_error())
+                }
+            }
+            offset += bytes_written as uint;
+        }
+        Ok(())
+    }
+
+    fn clone(&self) -> ~rtio::RtioPipe {
+        ~UnixStream {
+            inner: self.inner.clone(),
+            read: None,
+            write: None,
+        } as ~rtio::RtioPipe
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Unix Listener
+////////////////////////////////////////////////////////////////////////////////
+
+pub struct UnixListener {
+    priv handle: libc::HANDLE,
+    priv name: CString,
+}
+
+impl UnixListener {
+    pub fn bind(addr: &CString) -> IoResult<UnixListener> {
+        // Although we technically don't need the pipe until much later, we
+        // create the initial handle up front to test the validity of the name
+        // and such.
+        as_utf16_p(addr.as_str().unwrap(), |p| {
+            let ret = unsafe { pipe(p, true) };
+            if ret == libc::INVALID_HANDLE_VALUE as libc::HANDLE {
+                Err(super::last_error())
+            } else {
+                Ok(UnixListener { handle: ret, name: addr.clone() })
+            }
+        })
+    }
+
+    pub fn native_listen(self) -> IoResult<UnixAcceptor> {
+        Ok(UnixAcceptor {
+            listener: self,
+            event: if_ok!(Event::new(true, false)),
+        })
+    }
+}
+
+impl Drop for UnixListener {
+    fn drop(&mut self) {
+        unsafe { let _ = libc::CloseHandle(self.handle); }
+    }
+}
+
+impl rtio::RtioUnixListener for UnixListener {
+    fn listen(~self) -> IoResult<~rtio::RtioUnixAcceptor> {
+        self.native_listen().map(|a| ~a as ~rtio::RtioUnixAcceptor)
+    }
+}
+
+pub struct UnixAcceptor {
+    priv listener: UnixListener,
+    priv event: Event,
+}
+
+impl UnixAcceptor {
+    pub fn native_accept(&mut self) -> IoResult<UnixStream> {
+        // This function has some funky implementation details when working with
+        // unix pipes. On windows, each server named pipe handle can be
+        // connected to a one or zero clients. To the best of my knowledge, a
+        // named server is considered active and present if there exists at
+        // least one server named pipe for it.
+        //
+        // The model of this function is to take the current known server
+        // handle, connect a client to it, and then transfer ownership to the
+        // UnixStream instance. The next time accept() is invoked, it'll need a
+        // different server handle to connect a client to.
+        //
+        // Note that there is a possible race here. Once our server pipe is
+        // handed off to a `UnixStream` object, the stream could be closed,
+        // meaning that there would be no active server pipes, hence even though
+        // we have a valid `UnixAcceptor`, no one can connect to it. For this
+        // reason, we generate the next accept call's server pipe at the end of
+        // this function call.
+        //
+        // This provides us an invariant that we always have at least one server
+        // connection open at a time, meaning that all connects to this acceptor
+        // should succeed while this is active.
+        //
+        // The actual implementation of doing this is a little tricky. Once a
+        // server pipe is created, a client can connect to it at any time. I
+        // assume that which server a client connects to is nondeterministic, so
+        // we also need to guarantee that the only server able to be connected
+        // to is the one that we're calling ConnectNamedPipe on. This means that
+        // we have to create the second server pipe *after* we've already
+        // accepted a connection. In order to at least somewhat gracefully
+        // handle errors, this means that if the second server pipe creation
+        // fails that we disconnect the connected client and then just keep
+        // using the original server pipe.
+        let handle = self.listener.handle;
+
+        // Once we've got a "server handle", we need to wait for a client to
+        // connect. The ConnectNamedPipe function will block this thread until
+        // someone on the other end connects. This function can "fail" if a
+        // client connects after we created the pipe but before we got down
+        // here. Thanks windows.
+        let mut overlapped: libc::OVERLAPPED = unsafe { intrinsics::init() };
+        overlapped.hEvent = self.event.handle();
+        if unsafe { libc::ConnectNamedPipe(handle, &mut overlapped) == 0 } {
+            let mut err = unsafe { libc::GetLastError() };
+            if err == libc::ERROR_IO_PENDING as libc::DWORD {
+                let ret = unsafe {
+                    let mut transfer = 0;
+                    libc::GetOverlappedResult(handle,
+                                              &mut overlapped,
+                                              &mut transfer,
+                                              libc::TRUE)
+                };
+                if ret == 0 {
+                    err = unsafe { libc::GetLastError() };
+                } else {
+                    // we succeeded, bypass the check below
+                    err = libc::ERROR_PIPE_CONNECTED as libc::DWORD;
+                }
+            }
+            if err != libc::ERROR_PIPE_CONNECTED as libc::DWORD {
+                return Err(super::last_error())
+            }
+        }
+
+        // Now that we've got a connected client to our handle, we need to
+        // create a second server pipe. If this fails, we disconnect the
+        // connected client and return an error (see comments above).
+        let new_handle = as_utf16_p(self.listener.name.as_str().unwrap(), |p| {
+            unsafe { pipe(p, false) }
+        });
+        if new_handle == libc::INVALID_HANDLE_VALUE as libc::HANDLE {
+            let ret = Err(super::last_error());
+            // If our disconnection fails, then there's not really a whole lot
+            // that we can do, so fail the task.
+            let err = unsafe { libc::DisconnectNamedPipe(handle) };
+            assert!(err != 0);
+            return ret;
+        } else {
+            self.listener.handle = new_handle;
+        }
+
+        // Transfer ownership of our handle into this stream
+        Ok(UnixStream {
+            inner: UnsafeArc::new(Inner { handle: handle }),
+            read: None,
+            write: None,
+        })
+    }
+}
+
+impl rtio::RtioUnixAcceptor for UnixAcceptor {
+    fn accept(&mut self) -> IoResult<~rtio::RtioPipe> {
+        self.native_accept().map(|s| ~s as ~rtio::RtioPipe)
+    }
+}
+
diff --git a/src/libstd/io/net/mod.rs b/src/libstd/io/net/mod.rs
index cf109167089..436156a1219 100644
--- a/src/libstd/io/net/mod.rs
+++ b/src/libstd/io/net/mod.rs
@@ -14,5 +14,5 @@ pub mod addrinfo;
 pub mod tcp;
 pub mod udp;
 pub mod ip;
-#[cfg(unix)]
+// FIXME(#12093) - this should not be called unix
 pub mod unix;
diff --git a/src/libstd/io/net/unix.rs b/src/libstd/io/net/unix.rs
index 23c01aa6354..a1f3cbbe326 100644
--- a/src/libstd/io/net/unix.rs
+++ b/src/libstd/io/net/unix.rs
@@ -134,7 +134,7 @@ mod tests {
     use io::*;
     use io::test::*;
 
-    fn smalltest(server: proc(UnixStream), client: proc(UnixStream)) {
+    pub fn smalltest(server: proc(UnixStream), client: proc(UnixStream)) {
         let path1 = next_test_unix();
         let path2 = path1.clone();
         let (port, chan) = Chan::new();
@@ -149,25 +149,32 @@ mod tests {
         server(acceptor.accept().unwrap());
     }
 
-    #[test]
-    fn bind_error() {
-        match UnixListener::bind(&("path/to/nowhere")) {
+    iotest!(fn bind_error() {
+        let path = "path/to/nowhere";
+        match UnixListener::bind(&path) {
             Ok(..) => fail!(),
-            Err(e) => assert_eq!(e.kind, PermissionDenied),
+            Err(e) => {
+                assert!(e.kind == PermissionDenied || e.kind == FileNotFound ||
+                        e.kind == InvalidInput);
+            }
         }
-    }
-
-    #[test]
-    fn connect_error() {
-        match UnixStream::connect(&("path/to/nowhere")) {
+    })
+
+    iotest!(fn connect_error() {
+        let path = if cfg!(windows) {
+            r"\\.\pipe\this_should_not_exist_ever"
+        } else {
+            "path/to/nowhere"
+        };
+        match UnixStream::connect(&path) {
             Ok(..) => fail!(),
-            Err(e) => assert_eq!(e.kind,
-                        if cfg!(windows) {OtherIoError} else {FileNotFound})
+            Err(e) => {
+                assert!(e.kind == FileNotFound || e.kind == OtherIoError);
+            }
         }
-    }
+    })
 
-    #[test]
-    fn smoke() {
+    iotest!(fn smoke() {
         smalltest(proc(mut server) {
             let mut buf = [0];
             server.read(buf).unwrap();
@@ -175,10 +182,9 @@ mod tests {
         }, proc(mut client) {
             client.write([99]).unwrap();
         })
-    }
+    })
 
-    #[test]
-    fn read_eof() {
+    iotest!(fn read_eof() {
         smalltest(proc(mut server) {
             let mut buf = [0];
             assert!(server.read(buf).is_err());
@@ -186,17 +192,18 @@ mod tests {
         }, proc(_client) {
             // drop the client
         })
-    }
+    })
 
-    #[test]
-    fn write_begone() {
+    iotest!(fn write_begone() {
         smalltest(proc(mut server) {
             let buf = [0];
             loop {
                 match server.write(buf) {
                     Ok(..) => {}
                     Err(e) => {
-                        assert!(e.kind == BrokenPipe || e.kind == NotConnected,
+                        assert!(e.kind == BrokenPipe ||
+                                e.kind == NotConnected ||
+                                e.kind == ConnectionReset,
                                 "unknown error {:?}", e);
                         break;
                     }
@@ -205,10 +212,9 @@ mod tests {
         }, proc(_client) {
             // drop the client
         })
-    }
+    })
 
-    #[test]
-    fn accept_lots() {
+    iotest!(fn accept_lots() {
         let times = 10;
         let path1 = next_test_unix();
         let path2 = path1.clone();
@@ -218,38 +224,49 @@ mod tests {
             port.recv();
             for _ in range(0, times) {
                 let mut stream = UnixStream::connect(&path2);
-                stream.write([100]).unwrap();
+                match stream.write([100]) {
+                    Ok(..) => {}
+                    Err(e) => fail!("failed write: {}", e)
+                }
             }
         });
 
-        let mut acceptor = UnixListener::bind(&path1).listen();
+        let mut acceptor = match UnixListener::bind(&path1).listen() {
+            Ok(a) => a,
+            Err(e) => fail!("failed listen: {}", e),
+        };
         chan.send(());
         for _ in range(0, times) {
             let mut client = acceptor.accept();
             let mut buf = [0];
-            client.read(buf).unwrap();
+            match client.read(buf) {
+                Ok(..) => {}
+                Err(e) => fail!("failed read/accept: {}", e),
+            }
             assert_eq!(buf[0], 100);
         }
-    }
+    })
 
-    #[test]
-    fn path_exists() {
+    #[cfg(unix)]
+    iotest!(fn path_exists() {
         let path = next_test_unix();
         let _acceptor = UnixListener::bind(&path).listen();
         assert!(path.exists());
-    }
+    })
 
-    #[test]
-    fn unix_clone_smoke() {
+    iotest!(fn unix_clone_smoke() {
         let addr = next_test_unix();
         let mut acceptor = UnixListener::bind(&addr).listen();
 
         spawn(proc() {
             let mut s = UnixStream::connect(&addr);
             let mut buf = [0, 0];
+            debug!("client reading");
             assert_eq!(s.read(buf), Ok(1));
             assert_eq!(buf[0], 1);
+            debug!("client writing");
             s.write([2]).unwrap();
+            debug!("client dropping");
         });
 
         let mut s1 = acceptor.accept().unwrap();
@@ -260,17 +277,20 @@ mod tests {
         spawn(proc() {
             let mut s2 = s2;
             p1.recv();
+            debug!("writer writing");
             s2.write([1]).unwrap();
+            debug!("writer done");
             c2.send(());
         });
         c1.send(());
         let mut buf = [0, 0];
+        debug!("reader reading");
         assert_eq!(s1.read(buf), Ok(1));
+        debug!("reader done");
         p2.recv();
-    }
+    })
 
-    #[test]
-    fn unix_clone_two_read() {
+    iotest!(fn unix_clone_two_read() {
         let addr = next_test_unix();
         let mut acceptor = UnixListener::bind(&addr).listen();
         let (p, c) = Chan::new();
@@ -300,10 +320,9 @@ mod tests {
         c.send(());
 
         p.recv();
-    }
+    })
 
-    #[test]
-    fn unix_clone_two_write() {
+    iotest!(fn unix_clone_two_write() {
         let addr = next_test_unix();
         let mut acceptor = UnixListener::bind(&addr).listen();
 
@@ -326,5 +345,5 @@ mod tests {
         s1.write([2]).unwrap();
 
         p.recv();
-    }
+    })
 }
diff --git a/src/libstd/libc.rs b/src/libstd/libc.rs
index c42a8896053..73bf4a1e69a 100644
--- a/src/libstd/libc.rs
+++ b/src/libstd/libc.rs
@@ -269,7 +269,6 @@ pub mod types {
             pub mod bsd44 {
                 use libc::types::os::arch::c95::{c_char, c_int, c_uint};
 
-                pub static sun_len:uint = 108;
                 pub type socklen_t = u32;
                 pub type sa_family_t = u16;
                 pub type in_port_t = u16;
@@ -641,7 +640,6 @@ pub mod types {
             pub mod bsd44 {
                 use libc::types::os::arch::c95::{c_char, c_int, c_uint};
 
-                pub static sun_len:uint = 104;
                 pub type socklen_t = u32;
                 pub type sa_family_t = u8;
                 pub type in_port_t = u16;
@@ -844,7 +842,6 @@ pub mod types {
             pub mod bsd44 {
                 use libc::types::os::arch::c95::{c_char, c_int, c_uint, size_t};
 
-                pub static sun_len:uint = 108;
                 pub type SOCKET = c_uint;
                 pub type socklen_t = c_int;
                 pub type sa_family_t = u16;
@@ -1213,7 +1210,6 @@ pub mod types {
             pub mod bsd44 {
                 use libc::types::os::arch::c95::{c_char, c_int, c_uint};
 
-                pub static sun_len:uint = 104;
                 pub type socklen_t = c_int;
                 pub type sa_family_t = u8;
                 pub type in_port_t = u16;
@@ -1627,11 +1623,19 @@ pub mod consts {
             pub static O_NOINHERIT: c_int = 128;
 
             pub static ERROR_SUCCESS : c_int = 0;
+            pub static ERROR_FILE_NOT_FOUND: c_int = 2;
+            pub static ERROR_ACCESS_DENIED: c_int = 5;
             pub static ERROR_INVALID_HANDLE : c_int = 6;
+            pub static ERROR_BROKEN_PIPE: c_int = 109;
             pub static ERROR_DISK_FULL : c_int = 112;
             pub static ERROR_INSUFFICIENT_BUFFER : c_int = 122;
+            pub static ERROR_INVALID_NAME : c_int = 123;
             pub static ERROR_ALREADY_EXISTS : c_int = 183;
+            pub static ERROR_PIPE_BUSY: c_int = 231;
+            pub static ERROR_NO_DATA: c_int = 232;
             pub static ERROR_INVALID_ADDRESS : c_int = 487;
+            pub static ERROR_PIPE_CONNECTED: c_int = 535;
+            pub static ERROR_IO_PENDING: c_int = 997;
             pub static ERROR_FILE_INVALID : c_int = 1006;
             pub static INVALID_HANDLE_VALUE : c_int = -1;
 
@@ -1770,6 +1774,7 @@ pub mod consts {
             pub static FILE_FLAG_SESSION_AWARE: DWORD = 0x00800000;
             pub static FILE_FLAG_SEQUENTIAL_SCAN: DWORD = 0x08000000;
             pub static FILE_FLAG_WRITE_THROUGH: DWORD = 0x80000000;
+            pub static FILE_FLAG_FIRST_PIPE_INSTANCE: DWORD = 0x00080000;
 
             pub static FILE_NAME_NORMALIZED: DWORD = 0x0;
             pub static FILE_NAME_OPENED: DWORD = 0x8;
@@ -1783,6 +1788,8 @@ pub mod consts {
             pub static GENERIC_WRITE: DWORD = 0x40000000;
             pub static GENERIC_EXECUTE: DWORD = 0x20000000;
             pub static GENERIC_ALL: DWORD = 0x10000000;
+            pub static FILE_WRITE_ATTRIBUTES: DWORD = 0x00000100;
+            pub static FILE_READ_ATTRIBUTES: DWORD = 0x00000080;
 
             pub static FILE_BEGIN: DWORD = 0;
             pub static FILE_CURRENT: DWORD = 1;
@@ -1794,6 +1801,19 @@ pub mod consts {
 
             pub static DETACHED_PROCESS: DWORD = 0x00000008;
             pub static CREATE_NEW_PROCESS_GROUP: DWORD = 0x00000200;
+
+            pub static PIPE_ACCESS_DUPLEX: DWORD = 0x00000003;
+            pub static PIPE_ACCESS_INBOUND: DWORD = 0x00000001;
+            pub static PIPE_ACCESS_OUTBOUND: DWORD = 0x00000002;
+            pub static PIPE_TYPE_BYTE: DWORD = 0x00000000;
+            pub static PIPE_TYPE_MESSAGE: DWORD = 0x00000004;
+            pub static PIPE_READMODE_BYTE: DWORD = 0x00000000;
+            pub static PIPE_READMODE_MESSAGE: DWORD = 0x00000002;
+            pub static PIPE_WAIT: DWORD = 0x00000000;
+            pub static PIPE_NOWAIT: DWORD = 0x00000001;
+            pub static PIPE_ACCEPT_REMOTE_CLIENTS: DWORD = 0x00000000;
+            pub static PIPE_REJECT_REMOTE_CLIENTS: DWORD = 0x00000008;
+            pub static PIPE_UNLIMITED_INSTANCES: DWORD = 255;
         }
         pub mod sysconf {
         }
@@ -2784,6 +2804,7 @@ pub mod consts {
 
             pub static AF_INET: c_int = 2;
             pub static AF_INET6: c_int = 28;
+            pub static AF_UNIX: c_int = 1;
             pub static SOCK_STREAM: c_int = 1;
             pub static SOCK_DGRAM: c_int = 2;
             pub static IPPROTO_TCP: c_int = 6;
@@ -4177,6 +4198,34 @@ pub mod funcs {
                             lpPerformanceCount: *mut LARGE_INTEGER) -> BOOL;
 
                 pub fn GetCurrentProcessId() -> DWORD;
+                pub fn CreateNamedPipeW(
+                            lpName: LPCWSTR,
+                            dwOpenMode: DWORD,
+                            dwPipeMode: DWORD,
+                            nMaxInstances: DWORD,
+                            nOutBufferSize: DWORD,
+                            nInBufferSize: DWORD,
+                            nDefaultTimeOut: DWORD,
+                            lpSecurityAttributes: LPSECURITY_ATTRIBUTES
+                            ) -> HANDLE;
+                pub fn ConnectNamedPipe(hNamedPipe: HANDLE,
+                                        lpOverlapped: LPOVERLAPPED) -> BOOL;
+                pub fn WaitNamedPipeW(lpNamedPipeName: LPCWSTR,
+                                      nTimeOut: DWORD) -> BOOL;
+                pub fn SetNamedPipeHandleState(hNamedPipe: HANDLE,
+                                               lpMode: LPDWORD,
+                                               lpMaxCollectionCount: LPDWORD,
+                                               lpCollectDataTimeout: LPDWORD)
+                                                            -> BOOL;
+                pub fn CreateEventW(lpEventAttributes: LPSECURITY_ATTRIBUTES,
+                                    bManualReset: BOOL,
+                                    bInitialState: BOOL,
+                                    lpName: LPCWSTR) -> HANDLE;
+                pub fn GetOverlappedResult(hFile: HANDLE,
+                                           lpOverlapped: LPOVERLAPPED,
+                                           lpNumberOfBytesTransferred: LPDWORD,
+                                           bWait: BOOL) -> BOOL;
+                pub fn DisconnectNamedPipe(hNamedPipe: HANDLE) -> BOOL;
             }
         }
 
diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs
index 578ace2ba86..5573f8ec02e 100644
--- a/src/libstd/rt/rtio.rs
+++ b/src/libstd/rt/rtio.rs
@@ -260,11 +260,6 @@ pub trait RtioPipe {
     fn clone(&self) -> ~RtioPipe;
 }
 
-pub trait RtioDatagramPipe : RtioPipe {
-    fn recvfrom(&mut self, buf: &mut [u8]) -> Result<(uint, CString), IoError>;
-    fn sendto(&mut self, buf: &[u8], dst: &CString) -> Result<(), IoError>;
-}
-
 pub trait RtioUnixListener {
     fn listen(~self) -> Result<~RtioUnixAcceptor, IoError>;
 }