about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--src/libnative/io/file.rs47
-rw-r--r--src/libnative/io/net.rs130
-rw-r--r--src/librustuv/access.rs109
-rw-r--r--src/librustuv/homing.rs4
-rw-r--r--src/librustuv/lib.rs4
-rw-r--r--src/librustuv/net.rs62
-rw-r--r--src/librustuv/pipe.rs31
-rw-r--r--src/librustuv/rc.rs49
-rw-r--r--src/libstd/io/net/tcp.rs181
-rw-r--r--src/libstd/io/net/udp.rs117
-rw-r--r--src/libstd/io/net/unix.rs96
-rw-r--r--src/libstd/io/pipe.rs6
-rw-r--r--src/libstd/libc.rs49
-rw-r--r--src/libstd/option.rs1
-rw-r--r--src/libstd/rt/rtio.rs4
-rw-r--r--src/libstd/unstable/mutex.rs1
-rw-r--r--src/libstd/util.rs1
-rw-r--r--src/libstd/vec.rs2
18 files changed, 812 insertions, 82 deletions
diff --git a/src/libnative/io/file.rs b/src/libnative/io/file.rs
index cc5b0770d4d..25fb2809e76 100644
--- a/src/libnative/io/file.rs
+++ b/src/libnative/io/file.rs
@@ -10,6 +10,7 @@
 
 //! Blocking posix-based file I/O
 
+use std::sync::arc::UnsafeArc;
 use std::c_str::CString;
 use std::io::IoError;
 use std::io;
@@ -55,9 +56,13 @@ pub fn keep_going(data: &[u8], f: |*u8, uint| -> i64) -> i64 {
 
 pub type fd_t = libc::c_int;
 
+struct Inner {
+    fd: fd_t,
+    close_on_drop: bool,
+}
+
 pub struct FileDesc {
-    priv fd: fd_t,
-    priv close_on_drop: bool,
+    priv inner: UnsafeArc<Inner>
 }
 
 impl FileDesc {
@@ -70,7 +75,10 @@ impl FileDesc {
     /// Note that all I/O operations done on this object will be *blocking*, but
     /// they do not require the runtime to be active.
     pub fn new(fd: fd_t, close_on_drop: bool) -> FileDesc {
-        FileDesc { fd: fd, close_on_drop: close_on_drop }
+        FileDesc { inner: UnsafeArc::new(Inner {
+            fd: fd,
+            close_on_drop: close_on_drop
+        }) }
     }
 
     // FIXME(#10465) these functions should not be public, but anything in
@@ -80,7 +88,7 @@ impl FileDesc {
         #[cfg(windows)] type rlen = libc::c_uint;
         #[cfg(not(windows))] type rlen = libc::size_t;
         let ret = retry(|| unsafe {
-            libc::read(self.fd,
+            libc::read(self.fd(),
                        buf.as_ptr() as *mut libc::c_void,
                        buf.len() as rlen) as libc::c_int
         });
@@ -97,7 +105,7 @@ impl FileDesc {
         #[cfg(not(windows))] type wlen = libc::size_t;
         let ret = keep_going(buf, |buf, len| {
             unsafe {
-                libc::write(self.fd, buf as *libc::c_void, len as wlen) as i64
+                libc::write(self.fd(), buf as *libc::c_void, len as wlen) as i64
             }
         });
         if ret < 0 {
@@ -107,7 +115,11 @@ impl FileDesc {
         }
     }
 
-    pub fn fd(&self) -> fd_t { self.fd }
+    pub fn fd(&self) -> fd_t {
+        // This unsafety is fine because we're just reading off the file
+        // descriptor, no one is modifying this.
+        unsafe { (*self.inner.get()).fd }
+    }
 }
 
 impl io::Reader for FileDesc {
@@ -130,7 +142,7 @@ impl rtio::RtioFileStream for FileDesc {
         self.inner_write(buf)
     }
     fn pread(&mut self, buf: &mut [u8], offset: u64) -> Result<int, IoError> {
-        return os_pread(self.fd, buf.as_ptr(), buf.len(), offset);
+        return os_pread(self.fd(), buf.as_ptr(), buf.len(), offset);
 
         #[cfg(windows)]
         fn os_pread(fd: c_int, buf: *u8, amt: uint, offset: u64) -> IoResult<int> {
@@ -162,7 +174,7 @@ impl rtio::RtioFileStream for FileDesc {
         }
     }
     fn pwrite(&mut self, buf: &[u8], offset: u64) -> Result<(), IoError> {
-        return os_pwrite(self.fd, buf.as_ptr(), buf.len(), offset);
+        return os_pwrite(self.fd(), buf.as_ptr(), buf.len(), offset);
 
         #[cfg(windows)]
         fn os_pwrite(fd: c_int, buf: *u8, amt: uint, offset: u64) -> IoResult<()> {
@@ -197,7 +209,7 @@ impl rtio::RtioFileStream for FileDesc {
             io::SeekCur => libc::FILE_CURRENT,
         };
         unsafe {
-            let handle = libc::get_osfhandle(self.fd) as libc::HANDLE;
+            let handle = libc::get_osfhandle(self.fd()) as libc::HANDLE;
             let mut newpos = 0;
             match libc::SetFilePointerEx(handle, pos, &mut newpos, whence) {
                 0 => Err(super::last_error()),
@@ -212,7 +224,7 @@ impl rtio::RtioFileStream for FileDesc {
             io::SeekEnd => libc::SEEK_END,
             io::SeekCur => libc::SEEK_CUR,
         };
-        let n = unsafe { libc::lseek(self.fd, pos as libc::off_t, whence) };
+        let n = unsafe { libc::lseek(self.fd(), pos as libc::off_t, whence) };
         if n < 0 {
             Err(super::last_error())
         } else {
@@ -220,7 +232,7 @@ impl rtio::RtioFileStream for FileDesc {
         }
     }
     fn tell(&self) -> Result<u64, IoError> {
-        let n = unsafe { libc::lseek(self.fd, 0, libc::SEEK_CUR) };
+        let n = unsafe { libc::lseek(self.fd(), 0, libc::SEEK_CUR) };
         if n < 0 {
             Err(super::last_error())
         } else {
@@ -228,7 +240,7 @@ impl rtio::RtioFileStream for FileDesc {
         }
     }
     fn fsync(&mut self) -> Result<(), IoError> {
-        return os_fsync(self.fd);
+        return os_fsync(self.fd());
 
         #[cfg(windows)]
         fn os_fsync(fd: c_int) -> IoResult<()> {
@@ -247,7 +259,7 @@ impl rtio::RtioFileStream for FileDesc {
 
     #[cfg(not(windows))]
     fn datasync(&mut self) -> Result<(), IoError> {
-        return super::mkerr_libc(os_datasync(self.fd));
+        return super::mkerr_libc(os_datasync(self.fd()));
 
         #[cfg(target_os = "macos")]
         fn os_datasync(fd: c_int) -> c_int {
@@ -270,7 +282,7 @@ impl rtio::RtioFileStream for FileDesc {
             Ok(_) => {}, Err(e) => return Err(e),
         };
         let ret = unsafe {
-            let handle = libc::get_osfhandle(self.fd) as libc::HANDLE;
+            let handle = libc::get_osfhandle(self.fd()) as libc::HANDLE;
             match libc::SetEndOfFile(handle) {
                 0 => Err(super::last_error()),
                 _ => Ok(())
@@ -282,7 +294,7 @@ impl rtio::RtioFileStream for FileDesc {
     #[cfg(unix)]
     fn truncate(&mut self, offset: i64) -> Result<(), IoError> {
         super::mkerr_libc(retry(|| unsafe {
-            libc::ftruncate(self.fd, offset as libc::off_t)
+            libc::ftruncate(self.fd(), offset as libc::off_t)
         }))
     }
 }
@@ -294,6 +306,9 @@ impl rtio::RtioPipe for FileDesc {
     fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
         self.inner_write(buf)
     }
+    fn clone(&self) -> ~rtio::RtioPipe {
+        ~FileDesc { inner: self.inner.clone() } as ~rtio::RtioPipe
+    }
 }
 
 impl rtio::RtioTTY for FileDesc {
@@ -312,7 +327,7 @@ impl rtio::RtioTTY for FileDesc {
     fn isatty(&self) -> bool { false }
 }
 
-impl Drop for FileDesc {
+impl Drop for Inner {
     fn drop(&mut self) {
         // closing stdio file handles makes no sense, so never do it. Also, note
         // that errors are ignored when closing a file descriptor. The reason
diff --git a/src/libnative/io/net.rs b/src/libnative/io/net.rs
index dd916c8f3c4..32cd6337f99 100644
--- a/src/libnative/io/net.rs
+++ b/src/libnative/io/net.rs
@@ -14,6 +14,7 @@ use std::io;
 use std::libc;
 use std::mem;
 use std::rt::rtio;
+use std::sync::arc::UnsafeArc;
 use std::unstable::intrinsics;
 
 use super::{IoResult, retry};
@@ -108,10 +109,27 @@ fn setsockopt<T>(fd: sock_t, opt: libc::c_int, val: libc::c_int,
         let ret = libc::setsockopt(fd, opt, val,
                                    payload,
                                    mem::size_of::<T>() as libc::socklen_t);
-        super::mkerr_libc(ret)
+        if ret != 0 {
+            Err(last_error())
+        } else {
+            Ok(())
+        }
     }
 }
 
+#[cfg(windows)]
+fn last_error() -> io::IoError {
+    extern "system" {
+        fn WSAGetLastError() -> libc::c_int;
+    }
+    super::translate_error(unsafe { WSAGetLastError() }, true)
+}
+
+#[cfg(not(windows))]
+fn last_error() -> io::IoError {
+    super::last_error()
+}
+
 #[cfg(windows)] unsafe fn close(sock: sock_t) { let _ = libc::closesocket(sock); }
 #[cfg(unix)]    unsafe fn close(sock: sock_t) { let _ = libc::close(sock); }
 
@@ -128,7 +146,7 @@ fn sockname(fd: sock_t,
                     storage as *mut libc::sockaddr,
                     &mut len as *mut libc::socklen_t);
         if ret != 0 {
-            return Err(super::last_error())
+            return Err(last_error())
         }
     }
     return sockaddr_to_addr(&storage, len as uint);
@@ -222,7 +240,11 @@ pub fn init() {
 ////////////////////////////////////////////////////////////////////////////////
 
 pub struct TcpStream {
-    priv fd: sock_t,
+    priv inner: UnsafeArc<Inner>,
+}
+
+struct Inner {
+    fd: sock_t,
 }
 
 impl TcpStream {
@@ -231,27 +253,31 @@ impl TcpStream {
             socket(addr, libc::SOCK_STREAM).and_then(|fd| {
                 let (addr, len) = addr_to_sockaddr(addr);
                 let addrp = &addr as *libc::sockaddr_storage;
-                let ret = TcpStream { fd: fd };
+                let inner = Inner { fd: fd };
+                let ret = TcpStream { inner: UnsafeArc::new(inner) };
                 match retry(|| {
                     libc::connect(fd, addrp as *libc::sockaddr,
                                   len as libc::socklen_t)
                 }) {
-                    -1 => Err(super::last_error()),
+                    -1 => Err(last_error()),
                     _ => Ok(ret),
                 }
             })
         }
     }
 
-    pub fn fd(&self) -> sock_t { self.fd }
+    pub fn fd(&self) -> sock_t {
+        // This unsafety is fine because it's just a read-only arc
+        unsafe { (*self.inner.get()).fd }
+    }
 
     fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> {
-        setsockopt(self.fd, libc::IPPROTO_TCP, libc::TCP_NODELAY,
+        setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_NODELAY,
                    nodelay as libc::c_int)
     }
 
     fn set_keepalive(&mut self, seconds: Option<uint>) -> IoResult<()> {
-        let ret = setsockopt(self.fd, libc::SOL_SOCKET, libc::SO_KEEPALIVE,
+        let ret = setsockopt(self.fd(), libc::SOL_SOCKET, libc::SO_KEEPALIVE,
                              seconds.is_some() as libc::c_int);
         match seconds {
             Some(n) => ret.and_then(|()| self.set_tcp_keepalive(n)),
@@ -261,12 +287,12 @@ impl TcpStream {
 
     #[cfg(target_os = "macos")]
     fn set_tcp_keepalive(&mut self, seconds: uint) -> IoResult<()> {
-        setsockopt(self.fd, libc::IPPROTO_TCP, libc::TCP_KEEPALIVE,
+        setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_KEEPALIVE,
                    seconds as libc::c_int)
     }
     #[cfg(target_os = "freebsd")]
     fn set_tcp_keepalive(&mut self, seconds: uint) -> IoResult<()> {
-        setsockopt(self.fd, libc::IPPROTO_TCP, libc::TCP_KEEPIDLE,
+        setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_KEEPIDLE,
                    seconds as libc::c_int)
     }
     #[cfg(not(target_os = "macos"), not(target_os = "freebsd"))]
@@ -282,7 +308,7 @@ impl rtio::RtioTcpStream for TcpStream {
     fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
         let ret = retry(|| {
             unsafe {
-                libc::recv(self.fd,
+                libc::recv(self.fd(),
                            buf.as_ptr() as *mut libc::c_void,
                            buf.len() as wrlen,
                            0) as libc::c_int
@@ -291,7 +317,7 @@ impl rtio::RtioTcpStream for TcpStream {
         if ret == 0 {
             Err(io::standard_error(io::EndOfFile))
         } else if ret < 0 {
-            Err(super::last_error())
+            Err(last_error())
         } else {
             Ok(ret as uint)
         }
@@ -299,20 +325,20 @@ impl rtio::RtioTcpStream for TcpStream {
     fn write(&mut self, buf: &[u8]) -> IoResult<()> {
         let ret = keep_going(buf, |buf, len| {
             unsafe {
-                libc::send(self.fd,
+                libc::send(self.fd(),
                            buf as *mut libc::c_void,
                            len as wrlen,
                            0) as i64
             }
         });
         if ret < 0 {
-            Err(super::last_error())
+            Err(last_error())
         } else {
             Ok(())
         }
     }
     fn peer_name(&mut self) -> IoResult<ip::SocketAddr> {
-        sockname(self.fd, libc::getpeername)
+        sockname(self.fd(), libc::getpeername)
     }
     fn control_congestion(&mut self) -> IoResult<()> {
         self.set_nodelay(false)
@@ -326,15 +352,19 @@ impl rtio::RtioTcpStream for TcpStream {
     fn letdie(&mut self) -> IoResult<()> {
         self.set_keepalive(None)
     }
+
+    fn clone(&self) -> ~rtio::RtioTcpStream {
+        ~TcpStream { inner: self.inner.clone() } as ~rtio::RtioTcpStream
+    }
 }
 
 impl rtio::RtioSocket for TcpStream {
     fn socket_name(&mut self) -> IoResult<ip::SocketAddr> {
-        sockname(self.fd, libc::getsockname)
+        sockname(self.fd(), libc::getsockname)
     }
 }
 
-impl Drop for TcpStream {
+impl Drop for Inner {
     fn drop(&mut self) { unsafe { close(self.fd); } }
 }
 
@@ -343,7 +373,7 @@ impl Drop for TcpStream {
 ////////////////////////////////////////////////////////////////////////////////
 
 pub struct TcpListener {
-    priv fd: sock_t,
+    priv inner: UnsafeArc<Inner>,
 }
 
 impl TcpListener {
@@ -352,7 +382,8 @@ impl TcpListener {
             socket(addr, libc::SOCK_STREAM).and_then(|fd| {
                 let (addr, len) = addr_to_sockaddr(addr);
                 let addrp = &addr as *libc::sockaddr_storage;
-                let ret = TcpListener { fd: fd };
+                let inner = Inner { fd: fd };
+                let ret = TcpListener { inner: UnsafeArc::new(inner) };
                 // On platforms with Berkeley-derived sockets, this allows
                 // to quickly rebind a socket, without needing to wait for
                 // the OS to clean up the previous one.
@@ -366,18 +397,21 @@ impl TcpListener {
                 }
                 match libc::bind(fd, addrp as *libc::sockaddr,
                                  len as libc::socklen_t) {
-                    -1 => Err(super::last_error()),
+                    -1 => Err(last_error()),
                     _ => Ok(ret),
                 }
             })
         }
     }
 
-    pub fn fd(&self) -> sock_t { self.fd }
+    pub fn fd(&self) -> sock_t {
+        // This is just a read-only arc so the unsafety is fine
+        unsafe { (*self.inner.get()).fd }
+    }
 
     pub fn native_listen(self, backlog: int) -> IoResult<TcpAcceptor> {
-        match unsafe { libc::listen(self.fd, backlog as libc::c_int) } {
-            -1 => Err(super::last_error()),
+        match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } {
+            -1 => Err(last_error()),
             _ => Ok(TcpAcceptor { listener: self })
         }
     }
@@ -391,20 +425,16 @@ impl rtio::RtioTcpListener for TcpListener {
 
 impl rtio::RtioSocket for TcpListener {
     fn socket_name(&mut self) -> IoResult<ip::SocketAddr> {
-        sockname(self.fd, libc::getsockname)
+        sockname(self.fd(), libc::getsockname)
     }
 }
 
-impl Drop for TcpListener {
-    fn drop(&mut self) { unsafe { close(self.fd); } }
-}
-
 pub struct TcpAcceptor {
     priv listener: TcpListener,
 }
 
 impl TcpAcceptor {
-    pub fn fd(&self) -> sock_t { self.listener.fd }
+    pub fn fd(&self) -> sock_t { self.listener.fd() }
 
     pub fn native_accept(&mut self) -> IoResult<TcpStream> {
         unsafe {
@@ -417,8 +447,8 @@ impl TcpAcceptor {
                              storagep as *mut libc::sockaddr,
                              &mut size as *mut libc::socklen_t) as libc::c_int
             }) as sock_t {
-                -1 => Err(super::last_error()),
-                fd => Ok(TcpStream { fd: fd })
+                -1 => Err(last_error()),
+                fd => Ok(TcpStream { inner: UnsafeArc::new(Inner { fd: fd })})
             }
         }
     }
@@ -444,7 +474,7 @@ impl rtio::RtioTcpAcceptor for TcpAcceptor {
 ////////////////////////////////////////////////////////////////////////////////
 
 pub struct UdpSocket {
-    priv fd: sock_t,
+    priv inner: UnsafeArc<Inner>,
 }
 
 impl UdpSocket {
@@ -453,25 +483,29 @@ impl UdpSocket {
             socket(addr, libc::SOCK_DGRAM).and_then(|fd| {
                 let (addr, len) = addr_to_sockaddr(addr);
                 let addrp = &addr as *libc::sockaddr_storage;
-                let ret = UdpSocket { fd: fd };
+                let inner = Inner { fd: fd };
+                let ret = UdpSocket { inner: UnsafeArc::new(inner) };
                 match libc::bind(fd, addrp as *libc::sockaddr,
                                  len as libc::socklen_t) {
-                    -1 => Err(super::last_error()),
+                    -1 => Err(last_error()),
                     _ => Ok(ret),
                 }
             })
         }
     }
 
-    pub fn fd(&self) -> sock_t { self.fd }
+    pub fn fd(&self) -> sock_t {
+        // unsafety is fine because it's just a read-only arc
+        unsafe { (*self.inner.get()).fd }
+    }
 
     pub fn set_broadcast(&mut self, on: bool) -> IoResult<()> {
-        setsockopt(self.fd, libc::SOL_SOCKET, libc::SO_BROADCAST,
+        setsockopt(self.fd(), libc::SOL_SOCKET, libc::SO_BROADCAST,
                    on as libc::c_int)
     }
 
     pub fn set_multicast_loop(&mut self, on: bool) -> IoResult<()> {
-        setsockopt(self.fd, libc::IPPROTO_IP, libc::IP_MULTICAST_LOOP,
+        setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_MULTICAST_LOOP,
                    on as libc::c_int)
     }
 
@@ -484,14 +518,14 @@ impl UdpSocket {
                     // interface == INADDR_ANY
                     imr_interface: libc::in_addr { s_addr: 0x0 },
                 };
-                setsockopt(self.fd, libc::IPPROTO_IP, opt, mreq)
+                setsockopt(self.fd(), libc::IPPROTO_IP, opt, mreq)
             }
             In6Addr(addr) => {
                 let mreq = libc::ip6_mreq {
                     ipv6mr_multiaddr: addr,
                     ipv6mr_interface: 0,
                 };
-                setsockopt(self.fd, libc::IPPROTO_IPV6, opt, mreq)
+                setsockopt(self.fd(), libc::IPPROTO_IPV6, opt, mreq)
             }
         }
     }
@@ -514,14 +548,14 @@ impl rtio::RtioUdpSocket for UdpSocket {
             let mut addrlen: libc::socklen_t =
                     mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t;
             let ret = retry(|| {
-                libc::recvfrom(self.fd,
+                libc::recvfrom(self.fd(),
                                buf.as_ptr() as *mut libc::c_void,
                                buf.len() as msglen_t,
                                0,
                                storagep as *mut libc::sockaddr,
                                &mut addrlen) as libc::c_int
             });
-            if ret < 0 { return Err(super::last_error()) }
+            if ret < 0 { return Err(last_error()) }
             sockaddr_to_addr(&storage, addrlen as uint).and_then(|addr| {
                 Ok((ret as uint, addr))
             })
@@ -532,7 +566,7 @@ impl rtio::RtioUdpSocket for UdpSocket {
         let dstp = &dst as *libc::sockaddr_storage;
         unsafe {
             let ret = retry(|| {
-                libc::sendto(self.fd,
+                libc::sendto(self.fd(),
                              buf.as_ptr() as *libc::c_void,
                              buf.len() as msglen_t,
                              0,
@@ -540,7 +574,7 @@ impl rtio::RtioUdpSocket for UdpSocket {
                              len as libc::socklen_t) as libc::c_int
             });
             match ret {
-                -1 => Err(super::last_error()),
+                -1 => Err(last_error()),
                 n if n as uint != buf.len() => {
                     Err(io::IoError {
                         kind: io::OtherIoError,
@@ -582,11 +616,11 @@ impl rtio::RtioUdpSocket for UdpSocket {
     }
 
     fn multicast_time_to_live(&mut self, ttl: int) -> IoResult<()> {
-        setsockopt(self.fd, libc::IPPROTO_IP, libc::IP_MULTICAST_TTL,
+        setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_MULTICAST_TTL,
                    ttl as libc::c_int)
     }
     fn time_to_live(&mut self, ttl: int) -> IoResult<()> {
-        setsockopt(self.fd, libc::IPPROTO_IP, libc::IP_TTL, ttl as libc::c_int)
+        setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_TTL, ttl as libc::c_int)
     }
 
     fn hear_broadcasts(&mut self) -> IoResult<()> {
@@ -595,8 +629,8 @@ impl rtio::RtioUdpSocket for UdpSocket {
     fn ignore_broadcasts(&mut self) -> IoResult<()> {
         self.set_broadcast(false)
     }
-}
 
-impl Drop for UdpSocket {
-    fn drop(&mut self) { unsafe { close(self.fd) } }
+    fn clone(&self) -> ~rtio::RtioUdpSocket {
+        ~UdpSocket { inner: self.inner.clone() } as ~rtio::RtioUdpSocket
+    }
 }
diff --git a/src/librustuv/access.rs b/src/librustuv/access.rs
new file mode 100644
index 00000000000..9d06593a6ea
--- /dev/null
+++ b/src/librustuv/access.rs
@@ -0,0 +1,109 @@
+// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+/// An exclusive access primitive
+///
+/// This primitive is used to gain exclusive access to read() and write() in uv.
+/// It is assumed that all invocations of this struct happen on the same thread
+/// (the uv event loop).
+
+use std::cast;
+use std::sync::arc::UnsafeArc;
+use std::rt::task::{BlockedTask, Task};
+use std::rt::local::Local;
+
+use homing::HomingMissile;
+
+pub struct Access {
+    priv inner: UnsafeArc<Inner>,
+}
+
+pub struct Guard<'a> {
+    priv access: &'a mut Access,
+    priv missile: Option<HomingMissile>,
+}
+
+struct Inner {
+    queue: ~[BlockedTask],
+    held: bool,
+}
+
+impl Access {
+    pub fn new() -> Access {
+        Access {
+            inner: UnsafeArc::new(Inner {
+                queue: ~[],
+                held: false,
+            })
+        }
+    }
+
+    pub fn grant<'a>(&'a mut self, missile: HomingMissile) -> Guard<'a> {
+        // This unsafety is actually OK because the homing missile argument
+        // guarantees that we're on the same event loop as all the other objects
+        // attempting to get access granted.
+        let inner: &mut Inner = unsafe { cast::transmute(self.inner.get()) };
+
+        if inner.held {
+            let t: ~Task = Local::take();
+            t.deschedule(1, |task| {
+                inner.queue.push(task);
+                Ok(())
+            });
+            assert!(inner.held);
+        } else {
+            inner.held = true;
+        }
+
+        Guard { access: self, missile: Some(missile) }
+    }
+}
+
+impl Clone for Access {
+    fn clone(&self) -> Access {
+        Access { inner: self.inner.clone() }
+    }
+}
+
+#[unsafe_destructor]
+impl<'a> Drop for Guard<'a> {
+    fn drop(&mut self) {
+        // This guard's homing missile is still armed, so we're guaranteed to be
+        // on the same I/O event loop, so this unsafety should be ok.
+        assert!(self.missile.is_some());
+        let inner: &mut Inner = unsafe {
+            cast::transmute(self.access.inner.get())
+        };
+
+        match inner.queue.shift() {
+            // Here we have found a task that was waiting for access, and we
+            // current have the "access lock" we need to relinquish access to
+            // this sleeping task.
+            //
+            // To do so, we first drop out homing missile and we then reawaken
+            // the task. In reawakening the task, it will be immediately
+            // scheduled on this scheduler. Because we might be woken up on some
+            // other scheduler, we drop our homing missile before we reawaken
+            // the task.
+            Some(task) => {
+                drop(self.missile.take());
+                let _ = task.wake().map(|t| t.reawaken());
+            }
+            None => { inner.held = false; }
+        }
+    }
+}
+
+impl Drop for Inner {
+    fn drop(&mut self) {
+        assert!(!self.held);
+        assert_eq!(self.queue.len(), 0);
+    }
+}
diff --git a/src/librustuv/homing.rs b/src/librustuv/homing.rs
index a2f3457a943..25c929c995d 100644
--- a/src/librustuv/homing.rs
+++ b/src/librustuv/homing.rs
@@ -125,8 +125,8 @@ pub trait HomingIO {
 /// After a homing operation has been completed, this will return the current
 /// task back to its appropriate home (if applicable). The field is used to
 /// assert that we are where we think we are.
-struct HomingMissile {
-    io_home: uint,
+pub struct HomingMissile {
+    priv io_home: uint,
 }
 
 impl HomingMissile {
diff --git a/src/librustuv/lib.rs b/src/librustuv/lib.rs
index 39d6f851e17..b463bb7fd73 100644
--- a/src/librustuv/lib.rs
+++ b/src/librustuv/lib.rs
@@ -68,8 +68,10 @@ pub use self::tty::TtyWatcher;
 
 mod macros;
 
-mod queue;
+mod access;
 mod homing;
+mod queue;
+mod rc;
 
 /// The implementation of `rtio` for libuv
 pub mod uvio;
diff --git a/src/librustuv/net.rs b/src/librustuv/net.rs
index 5461fc6272d..7660d2c4f2b 100644
--- a/src/librustuv/net.rs
+++ b/src/librustuv/net.rs
@@ -19,7 +19,9 @@ use std::rt::rtio;
 use std::rt::task::BlockedTask;
 use std::unstable::intrinsics;
 
+use access::Access;
 use homing::{HomingIO, HomeHandle};
+use rc::Refcount;
 use stream::StreamWatcher;
 use super::{Loop, Request, UvError, Buf, status_to_io_result,
             uv_error_to_io_error, UvHandle, slice_to_uv_buf,
@@ -152,6 +154,14 @@ pub struct TcpWatcher {
     handle: *uvll::uv_tcp_t,
     stream: StreamWatcher,
     home: HomeHandle,
+    priv refcount: Refcount,
+
+    // libuv can't support concurrent reads and concurrent writes of the same
+    // stream object, so we use these access guards in order to arbitrate among
+    // multiple concurrent reads and writes. Note that libuv *can* read and
+    // write simultaneously, it just can't read and read simultaneously.
+    priv read_access: Access,
+    priv write_access: Access,
 }
 
 pub struct TcpListener {
@@ -183,6 +193,9 @@ impl TcpWatcher {
             home: home,
             handle: handle,
             stream: StreamWatcher::new(handle),
+            refcount: Refcount::new(),
+            read_access: Access::new(),
+            write_access: Access::new(),
         }
     }
 
@@ -238,12 +251,14 @@ impl rtio::RtioSocket for TcpWatcher {
 
 impl rtio::RtioTcpStream for TcpWatcher {
     fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
-        let _m = self.fire_homing_missile();
+        let m = self.fire_homing_missile();
+        let _g = self.read_access.grant(m);
         self.stream.read(buf).map_err(uv_error_to_io_error)
     }
 
     fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
-        let _m = self.fire_homing_missile();
+        let m = self.fire_homing_missile();
+        let _g = self.write_access.grant(m);
         self.stream.write(buf).map_err(uv_error_to_io_error)
     }
 
@@ -280,6 +295,17 @@ impl rtio::RtioTcpStream for TcpWatcher {
             uvll::uv_tcp_keepalive(self.handle, 0 as c_int, 0 as c_uint)
         })
     }
+
+    fn clone(&self) -> ~rtio::RtioTcpStream {
+        ~TcpWatcher {
+            handle: self.handle,
+            stream: StreamWatcher::new(self.handle),
+            home: self.home.clone(),
+            refcount: self.refcount.clone(),
+            write_access: self.write_access.clone(),
+            read_access: self.read_access.clone(),
+        } as ~rtio::RtioTcpStream
+    }
 }
 
 impl UvHandle<uvll::uv_tcp_t> for TcpWatcher {
@@ -289,7 +315,9 @@ impl UvHandle<uvll::uv_tcp_t> for TcpWatcher {
 impl Drop for TcpWatcher {
     fn drop(&mut self) {
         let _m = self.fire_homing_missile();
-        self.close();
+        if self.refcount.decrement() {
+            self.close();
+        }
     }
 }
 
@@ -415,6 +443,11 @@ impl rtio::RtioTcpAcceptor for TcpAcceptor {
 pub struct UdpWatcher {
     handle: *uvll::uv_udp_t,
     home: HomeHandle,
+
+    // See above for what these fields are
+    priv refcount: Refcount,
+    priv read_access: Access,
+    priv write_access: Access,
 }
 
 impl UdpWatcher {
@@ -423,6 +456,9 @@ impl UdpWatcher {
         let udp = UdpWatcher {
             handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) },
             home: io.make_handle(),
+            refcount: Refcount::new(),
+            read_access: Access::new(),
+            write_access: Access::new(),
         };
         assert_eq!(unsafe {
             uvll::uv_udp_init(io.uv_loop(), udp.handle)
@@ -463,7 +499,8 @@ impl rtio::RtioUdpSocket for UdpWatcher {
             buf: Option<Buf>,
             result: Option<(ssize_t, Option<ip::SocketAddr>)>,
         }
-        let _m = self.fire_homing_missile();
+        let m = self.fire_homing_missile();
+        let _g = self.read_access.grant(m);
 
         let a = match unsafe {
             uvll::uv_udp_recv_start(self.handle, alloc_cb, recv_cb)
@@ -533,7 +570,8 @@ impl rtio::RtioUdpSocket for UdpWatcher {
     fn sendto(&mut self, buf: &[u8], dst: ip::SocketAddr) -> Result<(), IoError> {
         struct Ctx { task: Option<BlockedTask>, result: c_int }
 
-        let _m = self.fire_homing_missile();
+        let m = self.fire_homing_missile();
+        let _g = self.write_access.grant(m);
 
         let mut req = Request::new(uvll::UV_UDP_SEND);
         let buf = slice_to_uv_buf(buf);
@@ -636,13 +674,25 @@ impl rtio::RtioUdpSocket for UdpWatcher {
                                        0 as c_int)
         })
     }
+
+    fn clone(&self) -> ~rtio::RtioUdpSocket {
+        ~UdpWatcher {
+            handle: self.handle,
+            home: self.home.clone(),
+            refcount: self.refcount.clone(),
+            write_access: self.write_access.clone(),
+            read_access: self.read_access.clone(),
+        } as ~rtio::RtioUdpSocket
+    }
 }
 
 impl Drop for UdpWatcher {
     fn drop(&mut self) {
         // Send ourselves home to close this handle (blocking while doing so).
         let _m = self.fire_homing_missile();
-        self.close();
+        if self.refcount.decrement() {
+            self.close();
+        }
     }
 }
 
diff --git a/src/librustuv/pipe.rs b/src/librustuv/pipe.rs
index a021a13e2d9..c312f112d28 100644
--- a/src/librustuv/pipe.rs
+++ b/src/librustuv/pipe.rs
@@ -14,7 +14,9 @@ use std::libc;
 use std::rt::rtio::{RtioPipe, RtioUnixListener, RtioUnixAcceptor};
 use std::rt::task::BlockedTask;
 
+use access::Access;
 use homing::{HomingIO, HomeHandle};
+use rc::Refcount;
 use stream::StreamWatcher;
 use super::{Loop, UvError, UvHandle, Request, uv_error_to_io_error,
             wait_until_woken_after, wakeup};
@@ -25,6 +27,11 @@ pub struct PipeWatcher {
     stream: StreamWatcher,
     home: HomeHandle,
     priv defused: bool,
+    priv refcount: Refcount,
+
+    // see comments in TcpWatcher for why these exist
+    priv write_access: Access,
+    priv read_access: Access,
 }
 
 pub struct PipeListener {
@@ -61,6 +68,9 @@ impl PipeWatcher {
             stream: StreamWatcher::new(handle),
             home: home,
             defused: false,
+            refcount: Refcount::new(),
+            read_access: Access::new(),
+            write_access: Access::new(),
         }
     }
 
@@ -118,14 +128,27 @@ impl PipeWatcher {
 
 impl RtioPipe for PipeWatcher {
     fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
-        let _m = self.fire_homing_missile();
+        let m = self.fire_homing_missile();
+        let _g = self.read_access.grant(m);
         self.stream.read(buf).map_err(uv_error_to_io_error)
     }
 
     fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
-        let _m = self.fire_homing_missile();
+        let m = self.fire_homing_missile();
+        let _g = self.write_access.grant(m);
         self.stream.write(buf).map_err(uv_error_to_io_error)
     }
+
+    fn clone(&self) -> ~RtioPipe {
+        ~PipeWatcher {
+            stream: StreamWatcher::new(self.stream.handle),
+            defused: false,
+            home: self.home.clone(),
+            refcount: self.refcount.clone(),
+            read_access: self.read_access.clone(),
+            write_access: self.write_access.clone(),
+        } as ~RtioPipe
+    }
 }
 
 impl HomingIO for PipeWatcher {
@@ -138,8 +161,8 @@ impl UvHandle<uvll::uv_pipe_t> for PipeWatcher {
 
 impl Drop for PipeWatcher {
     fn drop(&mut self) {
-        if !self.defused {
-            let _m = self.fire_homing_missile();
+        let _m = self.fire_homing_missile();
+        if !self.defused && self.refcount.decrement() {
             self.close();
         }
     }
diff --git a/src/librustuv/rc.rs b/src/librustuv/rc.rs
new file mode 100644
index 00000000000..f43cf722361
--- /dev/null
+++ b/src/librustuv/rc.rs
@@ -0,0 +1,49 @@
+// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+/// Simple refcount structure for cloning handles
+///
+/// This is meant to be an unintrusive solution to cloning handles in rustuv.
+/// The handles themselves shouldn't be sharing memory because there are bits of
+/// state in the rust objects which shouldn't be shared across multiple users of
+/// the same underlying uv object, hence Rc is not used and this simple counter
+/// should suffice.
+
+use std::sync::arc::UnsafeArc;
+
+pub struct Refcount {
+    priv rc: UnsafeArc<uint>,
+}
+
+impl Refcount {
+    /// Creates a new refcount of 1
+    pub fn new() -> Refcount {
+        Refcount { rc: UnsafeArc::new(1) }
+    }
+
+    fn increment(&self) {
+        unsafe { *self.rc.get() += 1; }
+    }
+
+    /// Returns whether the refcount just hit 0 or not
+    pub fn decrement(&self) -> bool {
+        unsafe {
+            *self.rc.get() -= 1;
+            *self.rc.get() == 0
+        }
+    }
+}
+
+impl Clone for Refcount {
+    fn clone(&self) -> Refcount {
+        self.increment();
+        Refcount { rc: self.rc.clone() }
+    }
+}
diff --git a/src/libstd/io/net/tcp.rs b/src/libstd/io/net/tcp.rs
index a0bdc193d98..66ceb03082f 100644
--- a/src/libstd/io/net/tcp.rs
+++ b/src/libstd/io/net/tcp.rs
@@ -8,11 +8,42 @@
 // option. This file may not be copied, modified, or distributed
 // except according to those terms.
 
+//! TCP network connections
+//!
+//! This module contains the ability to open a TCP stream to a socket address,
+//! as well as creating a socket server to accept incoming connections. The
+//! destination and binding addresses can either be an IPv4 or IPv6 address.
+//!
+//! A TCP connection implements the `Reader` and `Writer` traits, while the TCP
+//! listener (socket server) implements the `Listener` and `Acceptor` traits.
+
+#[deny(missing_doc)];
+
+use clone::Clone;
 use io::net::ip::SocketAddr;
-use io::{Reader, Writer, Listener, Acceptor, IoResult};
+use io::{Reader, Writer, Listener, Acceptor};
+use io::IoResult;
 use rt::rtio::{IoFactory, LocalIo, RtioSocket, RtioTcpListener};
 use rt::rtio::{RtioTcpAcceptor, RtioTcpStream};
 
+/// A structure which represents a TCP stream between a local socket and a
+/// remote socket.
+///
+/// # Example
+///
+/// ```rust
+/// # #[allow(unused_must_use)];
+/// use std::io::net::tcp::TcpStream;
+/// use std::io::net::ip::{Ipv4Addr, SocketAddr};
+///
+/// let addr = SocketAddr { ip: Ipv4Addr(127, 0, 0, 1), port: 34254 };
+/// let mut stream = TcpStream::connect(addr);
+///
+/// stream.write([1]);
+/// let mut buf = [0];
+/// stream.read(buf);
+/// drop(stream); // close the connection
+/// ```
 pub struct TcpStream {
     priv obj: ~RtioTcpStream
 }
@@ -22,21 +53,40 @@ impl TcpStream {
         TcpStream { obj: s }
     }
 
+    /// Creates a TCP connection to a remote socket address.
+    ///
+    /// If no error is encountered, then `Ok(stream)` is returned.
     pub fn connect(addr: SocketAddr) -> IoResult<TcpStream> {
         LocalIo::maybe_raise(|io| {
             io.tcp_connect(addr).map(TcpStream::new)
         })
     }
 
+    /// Returns the socket address of the remote peer of this TCP connection.
     pub fn peer_name(&mut self) -> IoResult<SocketAddr> {
         self.obj.peer_name()
     }
 
+    /// Returns the socket address of the local half of this TCP connection.
     pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
         self.obj.socket_name()
     }
 }
 
+impl Clone for TcpStream {
+    /// Creates a new handle to this TCP stream, allowing for simultaneous reads
+    /// and writes of this connection.
+    ///
+    /// The underlying TCP stream will not be closed until all handles to the
+    /// stream have been deallocated. All handles will also follow the same
+    /// stream, but two concurrent reads will not receive the same data.
+    /// Instead, the first read will receive the first packet received, and the
+    /// second read will receive the second packet.
+    fn clone(&self) -> TcpStream {
+        TcpStream { obj: self.obj.clone() }
+    }
+}
+
 impl Reader for TcpStream {
     fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> { self.obj.read(buf) }
 }
@@ -45,17 +95,56 @@ impl Writer for TcpStream {
     fn write(&mut self, buf: &[u8]) -> IoResult<()> { self.obj.write(buf) }
 }
 
+/// A structure representing a socket server. This listener is used to create a
+/// `TcpAcceptor` which can be used to accept sockets on a local port.
+///
+/// # Example
+///
+/// ```rust
+/// # fn main() {}
+/// # fn foo() {
+/// # #[allow(unused_must_use, dead_code)];
+/// use std::io::net::tcp::TcpListener;
+/// use std::io::net::ip::{Ipv4Addr, SocketAddr};
+/// use std::io::{Acceptor, Listener};
+///
+/// let addr = SocketAddr { ip: Ipv4Addr(127, 0, 0, 1), port: 80 };
+/// let listener = TcpListener::bind(addr);
+///
+/// // bind the listener to the specified address
+/// let mut acceptor = listener.listen();
+///
+/// // accept connections and process them
+/// # fn handle_client<T>(_: T) {}
+/// for stream in acceptor.incoming() {
+///     spawn(proc() {
+///         handle_client(stream);
+///     });
+/// }
+///
+/// // close the socket server
+/// drop(acceptor);
+/// # }
+/// ```
 pub struct TcpListener {
     priv obj: ~RtioTcpListener
 }
 
 impl TcpListener {
+    /// Creates a new `TcpListener` which will be bound to the specified local
+    /// socket address. This listener is not ready for accepting connections,
+    /// `listen` must be called on it before that's possible.
+    ///
+    /// Binding with a port number of 0 will request that the OS assigns a port
+    /// to this listener. The port allocated can be queried via the
+    /// `socket_name` function.
     pub fn bind(addr: SocketAddr) -> IoResult<TcpListener> {
         LocalIo::maybe_raise(|io| {
             io.tcp_bind(addr).map(|l| TcpListener { obj: l })
         })
     }
 
+    /// Returns the local socket address of this listener.
     pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
         self.obj.socket_name()
     }
@@ -67,6 +156,9 @@ impl Listener<TcpStream, TcpAcceptor> for TcpListener {
     }
 }
 
+/// The accepting half of a TCP socket server. This structure is created through
+/// a `TcpListener`'s `listen` method, and this object can be used to accept new
+/// `TcpStream` instances.
 pub struct TcpAcceptor {
     priv obj: ~RtioTcpAcceptor
 }
@@ -573,4 +665,91 @@ mod test {
         }
         let _listener = TcpListener::bind(addr);
     })
+
+    iotest!(fn tcp_clone_smoke() {
+        let addr = next_test_ip4();
+        let mut acceptor = TcpListener::bind(addr).listen();
+
+        spawn(proc() {
+            let mut s = TcpStream::connect(addr);
+            let mut buf = [0, 0];
+            assert_eq!(s.read(buf), Ok(1));
+            assert_eq!(buf[0], 1);
+            s.write([2]).unwrap();
+        });
+
+        let mut s1 = acceptor.accept().unwrap();
+        let s2 = s1.clone();
+
+        let (p1, c1) = Chan::new();
+        let (p2, c2) = Chan::new();
+        spawn(proc() {
+            let mut s2 = s2;
+            p1.recv();
+            s2.write([1]).unwrap();
+            c2.send(());
+        });
+        c1.send(());
+        let mut buf = [0, 0];
+        assert_eq!(s1.read(buf), Ok(1));
+        p2.recv();
+    })
+
+    iotest!(fn tcp_clone_two_read() {
+        let addr = next_test_ip6();
+        let mut acceptor = TcpListener::bind(addr).listen();
+        let (p, c) = SharedChan::new();
+        let c2 = c.clone();
+
+        spawn(proc() {
+            let mut s = TcpStream::connect(addr);
+            s.write([1]).unwrap();
+            p.recv();
+            s.write([2]).unwrap();
+            p.recv();
+        });
+
+        let mut s1 = acceptor.accept().unwrap();
+        let s2 = s1.clone();
+
+        let (p, done) = Chan::new();
+        spawn(proc() {
+            let mut s2 = s2;
+            let mut buf = [0, 0];
+            s2.read(buf).unwrap();
+            c2.send(());
+            done.send(());
+        });
+        let mut buf = [0, 0];
+        s1.read(buf).unwrap();
+        c.send(());
+
+        p.recv();
+    })
+
+    iotest!(fn tcp_clone_two_write() {
+        let addr = next_test_ip4();
+        let mut acceptor = TcpListener::bind(addr).listen();
+
+        spawn(proc() {
+            let mut s = TcpStream::connect(addr);
+            let mut buf = [0, 1];
+            s.read(buf).unwrap();
+            s.read(buf).unwrap();
+        });
+
+        let mut s1 = acceptor.accept().unwrap();
+        let s2 = s1.clone();
+
+        let (p, done) = Chan::new();
+        spawn(proc() {
+            let mut s2 = s2;
+            s2.write([1]).unwrap();
+            done.send(());
+        });
+        s1.write([2]).unwrap();
+
+        p.recv();
+    })
 }
+
diff --git a/src/libstd/io/net/udp.rs b/src/libstd/io/net/udp.rs
index 0ef62648afc..3c02f563847 100644
--- a/src/libstd/io/net/udp.rs
+++ b/src/libstd/io/net/udp.rs
@@ -8,6 +8,7 @@
 // option. This file may not be copied, modified, or distributed
 // except according to those terms.
 
+use clone::Clone;
 use result::{Ok, Err};
 use io::net::ip::SocketAddr;
 use io::{Reader, Writer, IoResult};
@@ -41,6 +42,19 @@ impl UdpSocket {
     }
 }
 
+impl Clone for UdpSocket {
+    /// Creates a new handle to this UDP socket, allowing for simultaneous reads
+    /// and writes of the socket.
+    ///
+    /// The underlying UDP socket will not be closed until all handles to the
+    /// socket have been deallocated. Two concurrent reads will not receive the
+    /// same data.  Instead, the first read will receive the first packet
+    /// received, and the second read will receive the second packet.
+    fn clone(&self) -> UdpSocket {
+        UdpSocket { obj: self.obj.clone() }
+    }
+}
+
 pub struct UdpStream {
     priv socket: UdpSocket,
     priv connectedTo: SocketAddr
@@ -250,4 +264,107 @@ mod test {
     iotest!(fn socket_name_ip6() {
         socket_name(next_test_ip6());
     })
+
+    iotest!(fn udp_clone_smoke() {
+        let addr1 = next_test_ip4();
+        let addr2 = next_test_ip4();
+        let mut sock1 = UdpSocket::bind(addr1).unwrap();
+        let sock2 = UdpSocket::bind(addr2).unwrap();
+
+        spawn(proc() {
+            let mut sock2 = sock2;
+            let mut buf = [0, 0];
+            assert_eq!(sock2.recvfrom(buf), Ok((1, addr1)));
+            assert_eq!(buf[0], 1);
+            sock2.sendto([2], addr1).unwrap();
+        });
+
+        let sock3 = sock1.clone();
+
+        let (p1, c1) = Chan::new();
+        let (p2, c2) = Chan::new();
+        spawn(proc() {
+            let mut sock3 = sock3;
+            p1.recv();
+            sock3.sendto([1], addr2).unwrap();
+            c2.send(());
+        });
+        c1.send(());
+        let mut buf = [0, 0];
+        assert_eq!(sock1.recvfrom(buf), Ok((1, addr2)));
+        p2.recv();
+    })
+
+    iotest!(fn udp_clone_two_read() {
+        let addr1 = next_test_ip4();
+        let addr2 = next_test_ip4();
+        let mut sock1 = UdpSocket::bind(addr1).unwrap();
+        let sock2 = UdpSocket::bind(addr2).unwrap();
+        let (p, c) = SharedChan::new();
+        let c2 = c.clone();
+
+        spawn(proc() {
+            let mut sock2 = sock2;
+            sock2.sendto([1], addr1).unwrap();
+            p.recv();
+            sock2.sendto([2], addr1).unwrap();
+            p.recv();
+        });
+
+        let sock3 = sock1.clone();
+
+        let (p, done) = Chan::new();
+        spawn(proc() {
+            let mut sock3 = sock3;
+            let mut buf = [0, 0];
+            sock3.recvfrom(buf).unwrap();
+            c2.send(());
+            done.send(());
+        });
+        let mut buf = [0, 0];
+        sock1.recvfrom(buf).unwrap();
+        c.send(());
+
+        p.recv();
+    })
+
+    iotest!(fn udp_clone_two_write() {
+        let addr1 = next_test_ip4();
+        let addr2 = next_test_ip4();
+        let mut sock1 = UdpSocket::bind(addr1).unwrap();
+        let sock2 = UdpSocket::bind(addr2).unwrap();
+
+        let (p, c) = SharedChan::new();
+
+        spawn(proc() {
+            let mut sock2 = sock2;
+            let mut buf = [0, 1];
+
+            for _ in p.iter() {
+                match sock2.recvfrom(buf) {
+                    Ok(..) => {}
+                    Err(e) => fail!("failed receive: {}", e),
+                }
+            }
+        });
+
+        let sock3 = sock1.clone();
+
+        let (p, done) = Chan::new();
+        let c2 = c.clone();
+        spawn(proc() {
+            let mut sock3 = sock3;
+            match sock3.sendto([1], addr2) {
+                Ok(..) => c2.send(()),
+                Err(..) => {}
+            }
+            done.send(());
+        });
+        match sock1.sendto([2], addr2) {
+            Ok(..) => c.send(()),
+            Err(..) => {}
+        }
+
+        p.recv();
+    })
 }
diff --git a/src/libstd/io/net/unix.rs b/src/libstd/io/net/unix.rs
index ce95b987663..3c7db9c8686 100644
--- a/src/libstd/io/net/unix.rs
+++ b/src/libstd/io/net/unix.rs
@@ -25,6 +25,7 @@ instances as clients.
 use prelude::*;
 
 use c_str::ToCStr;
+use clone::Clone;
 use rt::rtio::{IoFactory, LocalIo, RtioUnixListener};
 use rt::rtio::{RtioUnixAcceptor, RtioPipe};
 use io::pipe::PipeStream;
@@ -62,6 +63,12 @@ impl UnixStream {
     }
 }
 
+impl Clone for UnixStream {
+    fn clone(&self) -> UnixStream {
+        UnixStream { obj: self.obj.clone() }
+    }
+}
+
 impl Reader for UnixStream {
     fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> { self.obj.read(buf) }
 }
@@ -228,4 +235,93 @@ mod tests {
         let _acceptor = UnixListener::bind(&path).listen();
         assert!(path.exists());
     }
+
+    #[test]
+    fn unix_clone_smoke() {
+        let addr = next_test_unix();
+        let mut acceptor = UnixListener::bind(&addr).listen();
+
+        spawn(proc() {
+            let mut s = UnixStream::connect(&addr);
+            let mut buf = [0, 0];
+            assert_eq!(s.read(buf), Ok(1));
+            assert_eq!(buf[0], 1);
+            s.write([2]).unwrap();
+        });
+
+        let mut s1 = acceptor.accept().unwrap();
+        let s2 = s1.clone();
+
+        let (p1, c1) = Chan::new();
+        let (p2, c2) = Chan::new();
+        spawn(proc() {
+            let mut s2 = s2;
+            p1.recv();
+            s2.write([1]).unwrap();
+            c2.send(());
+        });
+        c1.send(());
+        let mut buf = [0, 0];
+        assert_eq!(s1.read(buf), Ok(1));
+        p2.recv();
+    }
+
+    #[test]
+    fn unix_clone_two_read() {
+        let addr = next_test_unix();
+        let mut acceptor = UnixListener::bind(&addr).listen();
+        let (p, c) = SharedChan::new();
+        let c2 = c.clone();
+
+        spawn(proc() {
+            let mut s = UnixStream::connect(&addr);
+            s.write([1]).unwrap();
+            p.recv();
+            s.write([2]).unwrap();
+            p.recv();
+        });
+
+        let mut s1 = acceptor.accept().unwrap();
+        let s2 = s1.clone();
+
+        let (p, done) = Chan::new();
+        spawn(proc() {
+            let mut s2 = s2;
+            let mut buf = [0, 0];
+            s2.read(buf).unwrap();
+            c2.send(());
+            done.send(());
+        });
+        let mut buf = [0, 0];
+        s1.read(buf).unwrap();
+        c.send(());
+
+        p.recv();
+    }
+
+    #[test]
+    fn unix_clone_two_write() {
+        let addr = next_test_unix();
+        let mut acceptor = UnixListener::bind(&addr).listen();
+
+        spawn(proc() {
+            let mut s = UnixStream::connect(&addr);
+            let mut buf = [0, 1];
+            s.read(buf).unwrap();
+            s.read(buf).unwrap();
+        });
+
+        let mut s1 = acceptor.accept().unwrap();
+        let s2 = s1.clone();
+
+        let (p, done) = Chan::new();
+        spawn(proc() {
+            let mut s2 = s2;
+            s2.write([1]).unwrap();
+            done.send(());
+        });
+        s1.write([2]).unwrap();
+
+        p.recv();
+    }
 }
diff --git a/src/libstd/io/pipe.rs b/src/libstd/io/pipe.rs
index ca85707149b..83250bdae73 100644
--- a/src/libstd/io/pipe.rs
+++ b/src/libstd/io/pipe.rs
@@ -51,6 +51,12 @@ impl PipeStream {
     }
 }
 
+impl Clone for PipeStream {
+    fn clone(&self) -> PipeStream {
+        PipeStream { obj: self.obj.clone() }
+    }
+}
+
 impl Reader for PipeStream {
     fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> { self.obj.read(buf) }
 }
diff --git a/src/libstd/libc.rs b/src/libstd/libc.rs
index 11a7b5dd191..057d618f444 100644
--- a/src/libstd/libc.rs
+++ b/src/libstd/libc.rs
@@ -960,6 +960,8 @@ pub mod types {
             }
             pub mod extra {
                 use ptr;
+                use libc::consts::os::extra::{MAX_PROTOCOL_CHAIN,
+                                              WSAPROTOCOL_LEN};
                 use libc::types::common::c95::c_void;
                 use libc::types::os::arch::c95::{c_char, c_int, c_uint, size_t};
                 use libc::types::os::arch::c95::{c_long, c_ulong};
@@ -1106,6 +1108,47 @@ pub mod types {
                 }
 
                 pub type LPFILETIME = *mut FILETIME;
+
+                pub struct GUID {
+                    Data1: DWORD,
+                    Data2: DWORD,
+                    Data3: DWORD,
+                    Data4: [BYTE, ..8],
+                }
+
+                struct WSAPROTOCOLCHAIN {
+                    ChainLen: c_int,
+                    ChainEntries: [DWORD, ..MAX_PROTOCOL_CHAIN],
+                }
+
+                pub type LPWSAPROTOCOLCHAIN = *mut WSAPROTOCOLCHAIN;
+
+                pub struct WSAPROTOCOL_INFO {
+                    dwServiceFlags1: DWORD,
+                    dwServiceFlags2: DWORD,
+                    dwServiceFlags3: DWORD,
+                    dwServiceFlags4: DWORD,
+                    dwProviderFlags: DWORD,
+                    ProviderId: GUID,
+                    dwCatalogEntryId: DWORD,
+                    ProtocolChain: WSAPROTOCOLCHAIN,
+                    iVersion: c_int,
+                    iAddressFamily: c_int,
+                    iMaxSockAddr: c_int,
+                    iMinSockAddr: c_int,
+                    iSocketType: c_int,
+                    iProtocol: c_int,
+                    iProtocolMaxOffset: c_int,
+                    iNetworkByteOrder: c_int,
+                    iSecurityScheme: c_int,
+                    dwMessageSize: DWORD,
+                    dwProviderReserved: DWORD,
+                    szProtocol: [u8, ..WSAPROTOCOL_LEN+1],
+                }
+
+                pub type LPWSAPROTOCOL_INFO = *mut WSAPROTOCOL_INFO;
+
+                pub type GROUP = c_uint;
             }
         }
     }
@@ -1721,6 +1764,10 @@ pub mod consts {
             pub static FILE_BEGIN: DWORD = 0;
             pub static FILE_CURRENT: DWORD = 1;
             pub static FILE_END: DWORD = 2;
+
+            pub static MAX_PROTOCOL_CHAIN: DWORD = 7;
+            pub static WSAPROTOCOL_LEN: DWORD = 255;
+            pub static INVALID_SOCKET: DWORD = !0;
         }
         pub mod sysconf {
         }
@@ -4098,6 +4145,8 @@ pub mod funcs {
                             lpFrequency: *mut LARGE_INTEGER) -> BOOL;
                 pub fn QueryPerformanceCounter(
                             lpPerformanceCount: *mut LARGE_INTEGER) -> BOOL;
+
+                pub fn GetCurrentProcessId() -> DWORD;
             }
         }
 
diff --git a/src/libstd/option.rs b/src/libstd/option.rs
index 39b516aeb12..7bb29fdfacf 100644
--- a/src/libstd/option.rs
+++ b/src/libstd/option.rs
@@ -480,7 +480,6 @@ mod tests {
 
     use iter::range;
     use str::StrSlice;
-    use util;
     use kinds::marker;
     use vec::ImmutableVector;
 
diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs
index 35b1e21df06..8d02048d55c 100644
--- a/src/libstd/rt/rtio.rs
+++ b/src/libstd/rt/rtio.rs
@@ -203,6 +203,7 @@ pub trait RtioTcpStream : RtioSocket {
     fn nodelay(&mut self) -> Result<(), IoError>;
     fn keepalive(&mut self, delay_in_seconds: uint) -> Result<(), IoError>;
     fn letdie(&mut self) -> Result<(), IoError>;
+    fn clone(&self) -> ~RtioTcpStream;
 }
 
 pub trait RtioSocket {
@@ -224,6 +225,8 @@ pub trait RtioUdpSocket : RtioSocket {
 
     fn hear_broadcasts(&mut self) -> Result<(), IoError>;
     fn ignore_broadcasts(&mut self) -> Result<(), IoError>;
+
+    fn clone(&self) -> ~RtioUdpSocket;
 }
 
 pub trait RtioTimer {
@@ -253,6 +256,7 @@ pub trait RtioProcess {
 pub trait RtioPipe {
     fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError>;
     fn write(&mut self, buf: &[u8]) -> Result<(), IoError>;
+    fn clone(&self) -> ~RtioPipe;
 }
 
 pub trait RtioUnixListener {
diff --git a/src/libstd/unstable/mutex.rs b/src/libstd/unstable/mutex.rs
index 4804de75687..82957cd93ce 100644
--- a/src/libstd/unstable/mutex.rs
+++ b/src/libstd/unstable/mutex.rs
@@ -380,7 +380,6 @@ mod test {
 
     use super::{Mutex, MUTEX_INIT};
     use rt::thread::Thread;
-    use task;
 
     #[test]
     fn somke_lock() {
diff --git a/src/libstd/util.rs b/src/libstd/util.rs
index c075f9b4ba8..715a10b9112 100644
--- a/src/libstd/util.rs
+++ b/src/libstd/util.rs
@@ -69,7 +69,6 @@ impl Void {
 mod tests {
     use super::*;
     use prelude::*;
-    use mem::size_of;
 
     #[test]
     fn identity_crisis() {
diff --git a/src/libstd/vec.rs b/src/libstd/vec.rs
index 4a6a4d54ae3..d53c2dceba2 100644
--- a/src/libstd/vec.rs
+++ b/src/libstd/vec.rs
@@ -4253,7 +4253,7 @@ mod tests {
         let h = x.mut_last();
         assert_eq!(*h.unwrap(), 5);
 
-        let mut y: &mut [int] = [];
+        let y: &mut [int] = [];
         assert!(y.mut_last().is_none());
     }
 }