about summary refs log tree commit diff
path: root/src/libstd/sys/windows
diff options
context:
space:
mode:
authorAaron Turon <aturon@mozilla.com>2014-10-10 10:11:49 -0700
committerAaron Turon <aturon@mozilla.com>2014-11-08 20:40:38 -0800
commitd34b1b0ca9bf5e0d7cd30952f5de0ab09ed57b41 (patch)
treebdb9af03a1b73d4edc9ae5e6193a010c9b2b4edc /src/libstd/sys/windows
parent0c1e1ff1e300868a29405a334e65eae690df971d (diff)
downloadrust-d34b1b0ca9bf5e0d7cd30952f5de0ab09ed57b41.tar.gz
rust-d34b1b0ca9bf5e0d7cd30952f5de0ab09ed57b41.zip
Runtime removal: refactor pipes and networking
This patch continues the runtime removal by moving pipe and
networking-related code into `sys`.

Because this eliminates APIs in `libnative` and `librustrt`, it is a:

[breaking-change]

This functionality is likely to be available publicly, in some form,
from `std` in the future.
Diffstat (limited to 'src/libstd/sys/windows')
-rw-r--r--src/libstd/sys/windows/mod.rs12
-rw-r--r--src/libstd/sys/windows/pipe.rs751
-rw-r--r--src/libstd/sys/windows/tcp.rs219
-rw-r--r--src/libstd/sys/windows/udp.rs11
4 files changed, 991 insertions, 2 deletions
diff --git a/src/libstd/sys/windows/mod.rs b/src/libstd/sys/windows/mod.rs
index 5f4129c1484..85fbc6b936c 100644
--- a/src/libstd/sys/windows/mod.rs
+++ b/src/libstd/sys/windows/mod.rs
@@ -33,12 +33,21 @@ macro_rules! helper_init( (static $name:ident: Helper<$m:ty>) => (
     };
 ) )
 
+pub mod c;
 pub mod fs;
 pub mod os;
-pub mod c;
+pub mod tcp;
+pub mod udp;
+pub mod pipe;
+
+pub mod addrinfo {
+    pub use sys_common::net::get_host_addresses;
+}
 
+// FIXME: move these to c module
 pub type sock_t = libc::SOCKET;
 pub type wrlen = libc::c_int;
+pub type msglen_t = libc::c_int;
 pub unsafe fn close_sock(sock: sock_t) { let _ = libc::closesocket(sock); }
 
 // windows has zero values as errors
@@ -140,7 +149,6 @@ pub fn set_nonblocking(fd: sock_t, nb: bool) -> IoResult<()> {
     }
 }
 
-// FIXME: call this
 pub fn init_net() {
     unsafe {
         static START: Once = ONCE_INIT;
diff --git a/src/libstd/sys/windows/pipe.rs b/src/libstd/sys/windows/pipe.rs
new file mode 100644
index 00000000000..f2f7994a005
--- /dev/null
+++ b/src/libstd/sys/windows/pipe.rs
@@ -0,0 +1,751 @@
+// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+//! Named pipes implementation for windows
+//!
+//! If are unfortunate enough to be reading this code, I would like to first
+//! apologize. This was my first encounter with windows named pipes, and it
+//! didn't exactly turn out very cleanly. If you, too, are new to named pipes,
+//! read on as I'll try to explain some fun things that I ran into.
+//!
+//! # Unix pipes vs Named pipes
+//!
+//! As with everything else, named pipes on windows are pretty different from
+//! unix pipes on unix. On unix, you use one "server pipe" to accept new client
+//! pipes. So long as this server pipe is active, new children pipes can
+//! connect. On windows, you instead have a number of "server pipes", and each
+//! of these server pipes can throughout their lifetime be attached to a client
+//! or not. Once attached to a client, a server pipe may then disconnect at a
+//! later date.
+//!
+//! # Accepting clients
+//!
+//! As with most other I/O interfaces, our Listener/Acceptor/Stream interfaces
+//! are built around the unix flavors. This means that we have one "server
+//! pipe" to which many clients can connect. In order to make this compatible
+//! with the windows model, each connected client consumes ownership of a server
+//! pipe, and then a new server pipe is created for the next client.
+//!
+//! Note that the server pipes attached to clients are never given back to the
+//! listener for recycling. This could possibly be implemented with a channel so
+//! the listener half can re-use server pipes, but for now I err'd on the simple
+//! side of things. Each stream accepted by a listener will destroy the server
+//! pipe after the stream is dropped.
+//!
+//! This model ends up having a small race or two, and you can find more details
+//! on the `native_accept` method.
+//!
+//! # Simultaneous reads and writes
+//!
+//! In testing, I found that two simultaneous writes and two simultaneous reads
+//! on a pipe ended up working out just fine, but problems were encountered when
+//! a read was executed simultaneously with a write. After some googling around,
+//! it sounded like named pipes just weren't built for this kind of interaction,
+//! and the suggested solution was to use overlapped I/O.
+//!
+//! I don't really know what overlapped I/O is, but my basic understanding after
+//! reading about it is that you have an external Event which is used to signal
+//! I/O completion, passed around in some OVERLAPPED structures. As to what this
+//! is, I'm not exactly sure.
+//!
+//! This problem implies that all named pipes are created with the
+//! FILE_FLAG_OVERLAPPED option. This means that all of their I/O is
+//! asynchronous. Each I/O operation has an associated OVERLAPPED structure, and
+//! inside of this structure is a HANDLE from CreateEvent. After the I/O is
+//! determined to be pending (may complete in the future), the
+//! GetOverlappedResult function is used to block on the event, waiting for the
+//! I/O to finish.
+//!
+//! This scheme ended up working well enough. There were two snags that I ran
+//! into, however:
+//!
+//! * Each UnixStream instance needs its own read/write events to wait on. These
+//!   can't be shared among clones of the same stream because the documentation
+//!   states that it unsets the event when the I/O is started (would possibly
+//!   corrupt other events simultaneously waiting). For convenience's sake,
+//!   these events are lazily initialized.
+//!
+//! * Each server pipe needs to be created with FILE_FLAG_OVERLAPPED in addition
+//!   to all pipes created through `connect`. Notably this means that the
+//!   ConnectNamedPipe function is nonblocking, implying that the Listener needs
+//!   to have yet another event to do the actual blocking.
+//!
+//! # Conclusion
+//!
+//! The conclusion here is that I probably don't know the best way to work with
+//! windows named pipes, but the solution here seems to work well enough to get
+//! the test suite passing (the suite is in libstd), and that's good enough for
+//! me!
+
+use alloc::arc::Arc;
+use libc;
+use c_str::CString;
+use mem;
+use ptr;
+use sync::atomic;
+use rt::mutex;
+use io::{mod, IoError, IoResult};
+use prelude::*;
+
+use sys_common::{mod, eof};
+
+use super::{c, os, timer, to_utf16, decode_error_detailed};
+
+struct Event(libc::HANDLE);
+
+impl Event {
+    fn new(manual_reset: bool, initial_state: bool) -> IoResult<Event> {
+        let event = unsafe {
+            libc::CreateEventW(ptr::null_mut(),
+                               manual_reset as libc::BOOL,
+                               initial_state as libc::BOOL,
+                               ptr::null())
+        };
+        if event as uint == 0 {
+            Err(super::last_error())
+        } else {
+            Ok(Event(event))
+        }
+    }
+
+    fn handle(&self) -> libc::HANDLE { let Event(handle) = *self; handle }
+}
+
+impl Drop for Event {
+    fn drop(&mut self) {
+        unsafe { let _ = libc::CloseHandle(self.handle()); }
+    }
+}
+
+struct Inner {
+    handle: libc::HANDLE,
+    lock: mutex::NativeMutex,
+    read_closed: atomic::AtomicBool,
+    write_closed: atomic::AtomicBool,
+}
+
+impl Inner {
+    fn new(handle: libc::HANDLE) -> Inner {
+        Inner {
+            handle: handle,
+            lock: unsafe { mutex::NativeMutex::new() },
+            read_closed: atomic::AtomicBool::new(false),
+            write_closed: atomic::AtomicBool::new(false),
+        }
+    }
+}
+
+impl Drop for Inner {
+    fn drop(&mut self) {
+        unsafe {
+            let _ = libc::FlushFileBuffers(self.handle);
+            let _ = libc::CloseHandle(self.handle);
+        }
+    }
+}
+
+unsafe fn pipe(name: *const u16, init: bool) -> libc::HANDLE {
+    libc::CreateNamedPipeW(
+        name,
+        libc::PIPE_ACCESS_DUPLEX |
+            if init {libc::FILE_FLAG_FIRST_PIPE_INSTANCE} else {0} |
+            libc::FILE_FLAG_OVERLAPPED,
+        libc::PIPE_TYPE_BYTE | libc::PIPE_READMODE_BYTE |
+            libc::PIPE_WAIT,
+        libc::PIPE_UNLIMITED_INSTANCES,
+        65536,
+        65536,
+        0,
+        ptr::null_mut()
+    )
+}
+
+pub fn await(handle: libc::HANDLE, deadline: u64,
+             events: &[libc::HANDLE]) -> IoResult<uint> {
+    use libc::consts::os::extra::{WAIT_FAILED, WAIT_TIMEOUT, WAIT_OBJECT_0};
+
+    // If we've got a timeout, use WaitForSingleObject in tandem with CancelIo
+    // to figure out if we should indeed get the result.
+    let ms = if deadline == 0 {
+        libc::INFINITE as u64
+    } else {
+        let now = timer::now();
+        if deadline < now {0} else {deadline - now}
+    };
+    let ret = unsafe {
+        c::WaitForMultipleObjects(events.len() as libc::DWORD,
+                                  events.as_ptr(),
+                                  libc::FALSE,
+                                  ms as libc::DWORD)
+    };
+    match ret {
+        WAIT_FAILED => Err(super::last_error()),
+        WAIT_TIMEOUT => unsafe {
+            let _ = c::CancelIo(handle);
+            Err(sys_common::timeout("operation timed out"))
+        },
+        n => Ok((n - WAIT_OBJECT_0) as uint)
+    }
+}
+
+fn epipe() -> IoError {
+    IoError {
+        kind: io::EndOfFile,
+        desc: "the pipe has ended",
+        detail: None,
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Unix Streams
+////////////////////////////////////////////////////////////////////////////////
+
+pub struct UnixStream {
+    inner: Arc<Inner>,
+    write: Option<Event>,
+    read: Option<Event>,
+    read_deadline: u64,
+    write_deadline: u64,
+}
+
+impl UnixStream {
+    fn try_connect(p: *const u16) -> Option<libc::HANDLE> {
+        // Note that most of this is lifted from the libuv implementation.
+        // The idea is that if we fail to open a pipe in read/write mode
+        // that we try afterwards in just read or just write
+        let mut result = unsafe {
+            libc::CreateFileW(p,
+                libc::GENERIC_READ | libc::GENERIC_WRITE,
+                0,
+                ptr::null_mut(),
+                libc::OPEN_EXISTING,
+                libc::FILE_FLAG_OVERLAPPED,
+                ptr::null_mut())
+        };
+        if result != libc::INVALID_HANDLE_VALUE {
+            return Some(result)
+        }
+
+        let err = unsafe { libc::GetLastError() };
+        if err == libc::ERROR_ACCESS_DENIED as libc::DWORD {
+            result = unsafe {
+                libc::CreateFileW(p,
+                    libc::GENERIC_READ | libc::FILE_WRITE_ATTRIBUTES,
+                    0,
+                    ptr::null_mut(),
+                    libc::OPEN_EXISTING,
+                    libc::FILE_FLAG_OVERLAPPED,
+                    ptr::null_mut())
+            };
+            if result != libc::INVALID_HANDLE_VALUE {
+                return Some(result)
+            }
+        }
+        let err = unsafe { libc::GetLastError() };
+        if err == libc::ERROR_ACCESS_DENIED as libc::DWORD {
+            result = unsafe {
+                libc::CreateFileW(p,
+                    libc::GENERIC_WRITE | libc::FILE_READ_ATTRIBUTES,
+                    0,
+                    ptr::null_mut(),
+                    libc::OPEN_EXISTING,
+                    libc::FILE_FLAG_OVERLAPPED,
+                    ptr::null_mut())
+            };
+            if result != libc::INVALID_HANDLE_VALUE {
+                return Some(result)
+            }
+        }
+        None
+    }
+
+    pub fn connect(addr: &CString, timeout: Option<u64>) -> IoResult<UnixStream> {
+        let addr = try!(to_utf16(addr.as_str()));
+        let start = timer::now();
+        loop {
+            match UnixStream::try_connect(addr.as_ptr()) {
+                Some(handle) => {
+                    let inner = Inner::new(handle);
+                    let mut mode = libc::PIPE_TYPE_BYTE |
+                                   libc::PIPE_READMODE_BYTE |
+                                   libc::PIPE_WAIT;
+                    let ret = unsafe {
+                        libc::SetNamedPipeHandleState(inner.handle,
+                                                      &mut mode,
+                                                      ptr::null_mut(),
+                                                      ptr::null_mut())
+                    };
+                    return if ret == 0 {
+                        Err(super::last_error())
+                    } else {
+                        Ok(UnixStream {
+                            inner: Arc::new(inner),
+                            read: None,
+                            write: None,
+                            read_deadline: 0,
+                            write_deadline: 0,
+                        })
+                    }
+                }
+                None => {}
+            }
+
+            // On windows, if you fail to connect, you may need to call the
+            // `WaitNamedPipe` function, and this is indicated with an error
+            // code of ERROR_PIPE_BUSY.
+            let code = unsafe { libc::GetLastError() };
+            if code as int != libc::ERROR_PIPE_BUSY as int {
+                return Err(super::last_error())
+            }
+
+            match timeout {
+                Some(timeout) => {
+                    let now = timer::now();
+                    let timed_out = (now - start) >= timeout || unsafe {
+                        let ms = (timeout - (now - start)) as libc::DWORD;
+                        libc::WaitNamedPipeW(addr.as_ptr(), ms) == 0
+                    };
+                    if timed_out {
+                        return Err(sys_common::timeout("connect timed out"))
+                    }
+                }
+
+                // An example I found on Microsoft's website used 20
+                // seconds, libuv uses 30 seconds, hence we make the
+                // obvious choice of waiting for 25 seconds.
+                None => {
+                    if unsafe { libc::WaitNamedPipeW(addr.as_ptr(), 25000) } == 0 {
+                        return Err(super::last_error())
+                    }
+                }
+            }
+        }
+    }
+
+    fn handle(&self) -> libc::HANDLE { self.inner.handle }
+
+    fn read_closed(&self) -> bool {
+        self.inner.read_closed.load(atomic::SeqCst)
+    }
+
+    fn write_closed(&self) -> bool {
+        self.inner.write_closed.load(atomic::SeqCst)
+    }
+
+    fn cancel_io(&self) -> IoResult<()> {
+        match unsafe { c::CancelIoEx(self.handle(), ptr::null_mut()) } {
+            0 if os::errno() == libc::ERROR_NOT_FOUND as uint => {
+                Ok(())
+            }
+            0 => Err(super::last_error()),
+            _ => Ok(())
+        }
+    }
+
+    pub fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
+        if self.read.is_none() {
+            self.read = Some(try!(Event::new(true, false)));
+        }
+
+        let mut bytes_read = 0;
+        let mut overlapped: libc::OVERLAPPED = unsafe { mem::zeroed() };
+        overlapped.hEvent = self.read.as_ref().unwrap().handle();
+
+        // Pre-flight check to see if the reading half has been closed. This
+        // must be done before issuing the ReadFile request, but after we
+        // acquire the lock.
+        //
+        // See comments in close_read() about why this lock is necessary.
+        let guard = unsafe { self.inner.lock.lock() };
+        if self.read_closed() {
+            return Err(eof())
+        }
+
+        // Issue a nonblocking requests, succeeding quickly if it happened to
+        // succeed.
+        let ret = unsafe {
+            libc::ReadFile(self.handle(),
+                           buf.as_ptr() as libc::LPVOID,
+                           buf.len() as libc::DWORD,
+                           &mut bytes_read,
+                           &mut overlapped)
+        };
+        if ret != 0 { return Ok(bytes_read as uint) }
+
+        // If our errno doesn't say that the I/O is pending, then we hit some
+        // legitimate error and return immediately.
+        if os::errno() != libc::ERROR_IO_PENDING as uint {
+            return Err(super::last_error())
+        }
+
+        // Now that we've issued a successful nonblocking request, we need to
+        // wait for it to finish. This can all be done outside the lock because
+        // we'll see any invocation of CancelIoEx. We also call this in a loop
+        // because we're woken up if the writing half is closed, we just need to
+        // realize that the reading half wasn't closed and we go right back to
+        // sleep.
+        drop(guard);
+        loop {
+            // Process a timeout if one is pending
+            let wait_succeeded = await(self.handle(), self.read_deadline,
+                                       [overlapped.hEvent]);
+
+            let ret = unsafe {
+                libc::GetOverlappedResult(self.handle(),
+                                          &mut overlapped,
+                                          &mut bytes_read,
+                                          libc::TRUE)
+            };
+            // If we succeeded, or we failed for some reason other than
+            // CancelIoEx, return immediately
+            if ret != 0 { return Ok(bytes_read as uint) }
+            if os::errno() != libc::ERROR_OPERATION_ABORTED as uint {
+                return Err(super::last_error())
+            }
+
+            // If the reading half is now closed, then we're done. If we woke up
+            // because the writing half was closed, keep trying.
+            if wait_succeeded.is_err() {
+                return Err(sys_common::timeout("read timed out"))
+            }
+            if self.read_closed() {
+                return Err(eof())
+            }
+        }
+    }
+
+    pub fn write(&mut self, buf: &[u8]) -> IoResult<()> {
+        if self.write.is_none() {
+            self.write = Some(try!(Event::new(true, false)));
+        }
+
+        let mut offset = 0;
+        let mut overlapped: libc::OVERLAPPED = unsafe { mem::zeroed() };
+        overlapped.hEvent = self.write.as_ref().unwrap().handle();
+
+        while offset < buf.len() {
+            let mut bytes_written = 0;
+
+            // This sequence below is quite similar to the one found in read().
+            // Some careful looping is done to ensure that if close_write() is
+            // invoked we bail out early, and if close_read() is invoked we keep
+            // going after we woke up.
+            //
+            // See comments in close_read() about why this lock is necessary.
+            let guard = unsafe { self.inner.lock.lock() };
+            if self.write_closed() {
+                return Err(epipe())
+            }
+            let ret = unsafe {
+                libc::WriteFile(self.handle(),
+                                buf[offset..].as_ptr() as libc::LPVOID,
+                                (buf.len() - offset) as libc::DWORD,
+                                &mut bytes_written,
+                                &mut overlapped)
+            };
+            let err = os::errno();
+            drop(guard);
+
+            if ret == 0 {
+                if err != libc::ERROR_IO_PENDING as uint {
+                    return Err(decode_error_detailed(err as i32))
+                }
+                // Process a timeout if one is pending
+                let wait_succeeded = await(self.handle(), self.write_deadline,
+                                           [overlapped.hEvent]);
+                let ret = unsafe {
+                    libc::GetOverlappedResult(self.handle(),
+                                              &mut overlapped,
+                                              &mut bytes_written,
+                                              libc::TRUE)
+                };
+                // If we weren't aborted, this was a legit error, if we were
+                // aborted, then check to see if the write half was actually
+                // closed or whether we woke up from the read half closing.
+                if ret == 0 {
+                    if os::errno() != libc::ERROR_OPERATION_ABORTED as uint {
+                        return Err(super::last_error())
+                    }
+                    if !wait_succeeded.is_ok() {
+                        let amt = offset + bytes_written as uint;
+                        return if amt > 0 {
+                            Err(IoError {
+                                kind: io::ShortWrite(amt),
+                                desc: "short write during write",
+                                detail: None,
+                            })
+                        } else {
+                            Err(sys_common::timeout("write timed out"))
+                        }
+                    }
+                    if self.write_closed() {
+                        return Err(epipe())
+                    }
+                    continue // retry
+                }
+            }
+            offset += bytes_written as uint;
+        }
+        Ok(())
+    }
+
+    pub fn close_read(&mut self) -> IoResult<()> {
+        // On windows, there's no actual shutdown() method for pipes, so we're
+        // forced to emulate the behavior manually at the application level. To
+        // do this, we need to both cancel any pending requests, as well as
+        // prevent all future requests from succeeding. These two operations are
+        // not atomic with respect to one another, so we must use a lock to do
+        // so.
+        //
+        // The read() code looks like:
+        //
+        //      1. Make sure the pipe is still open
+        //      2. Submit a read request
+        //      3. Wait for the read request to finish
+        //
+        // The race this lock is preventing is if another thread invokes
+        // close_read() between steps 1 and 2. By atomically executing steps 1
+        // and 2 with a lock with respect to close_read(), we're guaranteed that
+        // no thread will erroneously sit in a read forever.
+        let _guard = unsafe { self.inner.lock.lock() };
+        self.inner.read_closed.store(true, atomic::SeqCst);
+        self.cancel_io()
+    }
+
+    pub fn close_write(&mut self) -> IoResult<()> {
+        // see comments in close_read() for why this lock is necessary
+        let _guard = unsafe { self.inner.lock.lock() };
+        self.inner.write_closed.store(true, atomic::SeqCst);
+        self.cancel_io()
+    }
+
+    pub fn set_timeout(&mut self, timeout: Option<u64>) {
+        let deadline = timeout.map(|a| timer::now() + a).unwrap_or(0);
+        self.read_deadline = deadline;
+        self.write_deadline = deadline;
+    }
+    pub fn set_read_timeout(&mut self, timeout: Option<u64>) {
+        self.read_deadline = timeout.map(|a| timer::now() + a).unwrap_or(0);
+    }
+    pub fn set_write_timeout(&mut self, timeout: Option<u64>) {
+        self.write_deadline = timeout.map(|a| timer::now() + a).unwrap_or(0);
+    }
+}
+
+impl Clone for UnixStream {
+    fn clone(&self) -> UnixStream {
+        UnixStream {
+            inner: self.inner.clone(),
+            read: None,
+            write: None,
+            read_deadline: 0,
+            write_deadline: 0,
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Unix Listener
+////////////////////////////////////////////////////////////////////////////////
+
+pub struct UnixListener {
+    handle: libc::HANDLE,
+    name: CString,
+}
+
+impl UnixListener {
+    pub fn bind(addr: &CString) -> IoResult<UnixListener> {
+        // Although we technically don't need the pipe until much later, we
+        // create the initial handle up front to test the validity of the name
+        // and such.
+        let addr_v = try!(to_utf16(addr.as_str()));
+        let ret = unsafe { pipe(addr_v.as_ptr(), true) };
+        if ret == libc::INVALID_HANDLE_VALUE {
+            Err(super::last_error())
+        } else {
+            Ok(UnixListener { handle: ret, name: addr.clone() })
+        }
+    }
+
+    pub fn listen(self) -> IoResult<UnixAcceptor> {
+        Ok(UnixAcceptor {
+            listener: self,
+            event: try!(Event::new(true, false)),
+            deadline: 0,
+            inner: Arc::new(AcceptorState {
+                abort: try!(Event::new(true, false)),
+                closed: atomic::AtomicBool::new(false),
+            }),
+        })
+    }
+}
+
+impl Drop for UnixListener {
+    fn drop(&mut self) {
+        unsafe { let _ = libc::CloseHandle(self.handle); }
+    }
+}
+
+pub struct UnixAcceptor {
+    inner: Arc<AcceptorState>,
+    listener: UnixListener,
+    event: Event,
+    deadline: u64,
+}
+
+struct AcceptorState {
+    abort: Event,
+    closed: atomic::AtomicBool,
+}
+
+impl UnixAcceptor {
+    pub fn accept(&mut self) -> IoResult<UnixStream> {
+        // This function has some funky implementation details when working with
+        // unix pipes. On windows, each server named pipe handle can be
+        // connected to a one or zero clients. To the best of my knowledge, a
+        // named server is considered active and present if there exists at
+        // least one server named pipe for it.
+        //
+        // The model of this function is to take the current known server
+        // handle, connect a client to it, and then transfer ownership to the
+        // UnixStream instance. The next time accept() is invoked, it'll need a
+        // different server handle to connect a client to.
+        //
+        // Note that there is a possible race here. Once our server pipe is
+        // handed off to a `UnixStream` object, the stream could be closed,
+        // meaning that there would be no active server pipes, hence even though
+        // we have a valid `UnixAcceptor`, no one can connect to it. For this
+        // reason, we generate the next accept call's server pipe at the end of
+        // this function call.
+        //
+        // This provides us an invariant that we always have at least one server
+        // connection open at a time, meaning that all connects to this acceptor
+        // should succeed while this is active.
+        //
+        // The actual implementation of doing this is a little tricky. Once a
+        // server pipe is created, a client can connect to it at any time. I
+        // assume that which server a client connects to is nondeterministic, so
+        // we also need to guarantee that the only server able to be connected
+        // to is the one that we're calling ConnectNamedPipe on. This means that
+        // we have to create the second server pipe *after* we've already
+        // accepted a connection. In order to at least somewhat gracefully
+        // handle errors, this means that if the second server pipe creation
+        // fails that we disconnect the connected client and then just keep
+        // using the original server pipe.
+        let handle = self.listener.handle;
+
+        // If we've had an artificial call to close_accept, be sure to never
+        // proceed in accepting new clients in the future
+        if self.inner.closed.load(atomic::SeqCst) { return Err(eof()) }
+
+        let name = try!(to_utf16(self.listener.name.as_str()));
+
+        // Once we've got a "server handle", we need to wait for a client to
+        // connect. The ConnectNamedPipe function will block this thread until
+        // someone on the other end connects. This function can "fail" if a
+        // client connects after we created the pipe but before we got down
+        // here. Thanks windows.
+        let mut overlapped: libc::OVERLAPPED = unsafe { mem::zeroed() };
+        overlapped.hEvent = self.event.handle();
+        if unsafe { libc::ConnectNamedPipe(handle, &mut overlapped) == 0 } {
+            let mut err = unsafe { libc::GetLastError() };
+
+            if err == libc::ERROR_IO_PENDING as libc::DWORD {
+                // Process a timeout if one is pending
+                let wait_succeeded = await(handle, self.deadline,
+                                           [self.inner.abort.handle(),
+                                            overlapped.hEvent]);
+
+                // This will block until the overlapped I/O is completed. The
+                // timeout was previously handled, so this will either block in
+                // the normal case or succeed very quickly in the timeout case.
+                let ret = unsafe {
+                    let mut transfer = 0;
+                    libc::GetOverlappedResult(handle,
+                                              &mut overlapped,
+                                              &mut transfer,
+                                              libc::TRUE)
+                };
+                if ret == 0 {
+                    if wait_succeeded.is_ok() {
+                        err = unsafe { libc::GetLastError() };
+                    } else {
+                        return Err(sys_common::timeout("accept timed out"))
+                    }
+                } else {
+                    // we succeeded, bypass the check below
+                    err = libc::ERROR_PIPE_CONNECTED as libc::DWORD;
+                }
+            }
+            if err != libc::ERROR_PIPE_CONNECTED as libc::DWORD {
+                return Err(super::last_error())
+            }
+        }
+
+        // Now that we've got a connected client to our handle, we need to
+        // create a second server pipe. If this fails, we disconnect the
+        // connected client and return an error (see comments above).
+        let new_handle = unsafe { pipe(name.as_ptr(), false) };
+        if new_handle == libc::INVALID_HANDLE_VALUE {
+            let ret = Err(super::last_error());
+            // If our disconnection fails, then there's not really a whole lot
+            // that we can do, so panic
+            let err = unsafe { libc::DisconnectNamedPipe(handle) };
+            assert!(err != 0);
+            return ret;
+        } else {
+            self.listener.handle = new_handle;
+        }
+
+        // Transfer ownership of our handle into this stream
+        Ok(UnixStream {
+            inner: Arc::new(Inner::new(handle)),
+            read: None,
+            write: None,
+            read_deadline: 0,
+            write_deadline: 0,
+        })
+    }
+
+    pub fn set_timeout(&mut self, timeout: Option<u64>) {
+        self.deadline = timeout.map(|i| i + timer::now()).unwrap_or(0);
+    }
+
+    pub fn close_accept(&mut self) -> IoResult<()> {
+        self.inner.closed.store(true, atomic::SeqCst);
+        let ret = unsafe {
+            c::SetEvent(self.inner.abort.handle())
+        };
+        if ret == 0 {
+            Err(super::last_error())
+        } else {
+            Ok(())
+        }
+    }
+}
+
+impl Clone for UnixAcceptor {
+    fn clone(&self) -> UnixAcceptor {
+        let name = to_utf16(self.listener.name.as_str()).ok().unwrap();
+        UnixAcceptor {
+            inner: self.inner.clone(),
+            event: Event::new(true, false).ok().unwrap(),
+            deadline: 0,
+            listener: UnixListener {
+                name: self.listener.name.clone(),
+                handle: unsafe {
+                    let p = pipe(name.as_ptr(), false) ;
+                    assert!(p != libc::INVALID_HANDLE_VALUE as libc::HANDLE);
+                    p
+                },
+            },
+        }
+    }
+}
diff --git a/src/libstd/sys/windows/tcp.rs b/src/libstd/sys/windows/tcp.rs
new file mode 100644
index 00000000000..3baf2be08d2
--- /dev/null
+++ b/src/libstd/sys/windows/tcp.rs
@@ -0,0 +1,219 @@
+// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use io::net::ip;
+use io::IoResult;
+use libc;
+use mem;
+use ptr;
+use prelude::*;
+use super::{last_error, last_net_error, retry, sock_t};
+use sync::{Arc, atomic};
+use sys::fs::FileDesc;
+use sys::{mod, c, set_nonblocking, wouldblock, timer};
+use sys_common::{mod, timeout, eof};
+use sys_common::net::*;
+
+pub use sys_common::net::TcpStream;
+
+pub struct Event(c::WSAEVENT);
+
+impl Event {
+    pub fn new() -> IoResult<Event> {
+        let event = unsafe { c::WSACreateEvent() };
+        if event == c::WSA_INVALID_EVENT {
+            Err(super::last_error())
+        } else {
+            Ok(Event(event))
+        }
+    }
+
+    pub fn handle(&self) -> c::WSAEVENT { let Event(handle) = *self; handle }
+}
+
+impl Drop for Event {
+    fn drop(&mut self) {
+        unsafe { let _ = c::WSACloseEvent(self.handle()); }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// TCP listeners
+////////////////////////////////////////////////////////////////////////////////
+
+pub struct TcpListener {
+    inner: FileDesc,
+}
+
+impl TcpListener {
+    pub fn bind(addr: ip::SocketAddr) -> IoResult<TcpListener> {
+        sys::init_net();
+
+        let fd = try!(socket(addr, libc::SOCK_STREAM));
+        let ret = TcpListener { inner: FileDesc::new(fd as libc::c_int, true) };
+
+        let mut storage = unsafe { mem::zeroed() };
+        let len = addr_to_sockaddr(addr, &mut storage);
+        let addrp = &storage as *const _ as *const libc::sockaddr;
+
+        match unsafe { libc::bind(fd, addrp, len) } {
+            -1 => Err(last_net_error()),
+            _ => Ok(ret),
+        }
+    }
+
+    pub fn fd(&self) -> sock_t { self.inner.fd as sock_t }
+
+    pub fn listen(self, backlog: int) -> IoResult<TcpAcceptor> {
+        match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } {
+            -1 => Err(last_net_error()),
+
+            _ => {
+                let accept = try!(Event::new());
+                let ret = unsafe {
+                    c::WSAEventSelect(self.fd(), accept.handle(), c::FD_ACCEPT)
+                };
+                if ret != 0 {
+                    return Err(last_net_error())
+                }
+                Ok(TcpAcceptor {
+                    inner: Arc::new(AcceptorInner {
+                        listener: self,
+                        abort: try!(Event::new()),
+                        accept: accept,
+                        closed: atomic::AtomicBool::new(false),
+                    }),
+                    deadline: 0,
+                })
+            }
+        }
+    }
+
+    pub fn socket_name(&mut self) -> IoResult<ip::SocketAddr> {
+        sockname(self.fd(), libc::getsockname)
+    }
+}
+
+pub struct TcpAcceptor {
+    inner: Arc<AcceptorInner>,
+    deadline: u64,
+}
+
+struct AcceptorInner {
+    listener: TcpListener,
+    abort: Event,
+    accept: Event,
+    closed: atomic::AtomicBool,
+}
+
+impl TcpAcceptor {
+    pub fn fd(&self) -> sock_t { self.inner.listener.fd() }
+
+    pub fn accept(&mut self) -> IoResult<TcpStream> {
+        // Unlink unix, windows cannot invoke `select` on arbitrary file
+        // descriptors like pipes, only sockets. Consequently, windows cannot
+        // use the same implementation as unix for accept() when close_accept()
+        // is considered.
+        //
+        // In order to implement close_accept() and timeouts, windows uses
+        // event handles. An acceptor-specific abort event is created which
+        // will only get set in close_accept(), and it will never be un-set.
+        // Additionally, another acceptor-specific event is associated with the
+        // FD_ACCEPT network event.
+        //
+        // These two events are then passed to WaitForMultipleEvents to see
+        // which one triggers first, and the timeout passed to this function is
+        // the local timeout for the acceptor.
+        //
+        // If the wait times out, then the accept timed out. If the wait
+        // succeeds with the abort event, then we were closed, and if the wait
+        // succeeds otherwise, then we do a nonblocking poll via `accept` to
+        // see if we can accept a connection. The connection is candidate to be
+        // stolen, so we do all of this in a loop as well.
+        let events = [self.inner.abort.handle(), self.inner.accept.handle()];
+
+        while !self.inner.closed.load(atomic::SeqCst) {
+            let ms = if self.deadline == 0 {
+                c::WSA_INFINITE as u64
+            } else {
+                let now = timer::now();
+                if self.deadline < now {0} else {self.deadline - now}
+            };
+            let ret = unsafe {
+                c::WSAWaitForMultipleEvents(2, events.as_ptr(), libc::FALSE,
+                                            ms as libc::DWORD, libc::FALSE)
+            };
+            match ret {
+                c::WSA_WAIT_TIMEOUT => {
+                    return Err(timeout("accept timed out"))
+                }
+                c::WSA_WAIT_FAILED => return Err(last_net_error()),
+                c::WSA_WAIT_EVENT_0 => break,
+                n => assert_eq!(n, c::WSA_WAIT_EVENT_0 + 1),
+            }
+
+            let mut wsaevents: c::WSANETWORKEVENTS = unsafe { mem::zeroed() };
+            let ret = unsafe {
+                c::WSAEnumNetworkEvents(self.fd(), events[1], &mut wsaevents)
+            };
+            if ret != 0 { return Err(last_net_error()) }
+
+            if wsaevents.lNetworkEvents & c::FD_ACCEPT == 0 { continue }
+            match unsafe {
+                libc::accept(self.fd(), ptr::null_mut(), ptr::null_mut())
+            } {
+                -1 if wouldblock() => {}
+                -1 => return Err(last_net_error()),
+
+                // Accepted sockets inherit the same properties as the caller,
+                // so we need to deregister our event and switch the socket back
+                // to blocking mode
+                fd => {
+                    let stream = TcpStream::new(fd);
+                    let ret = unsafe {
+                        c::WSAEventSelect(fd, events[1], 0)
+                    };
+                    if ret != 0 { return Err(last_net_error()) }
+                    try!(set_nonblocking(fd, false));
+                    return Ok(stream)
+                }
+            }
+        }
+
+        Err(eof())
+    }
+
+    pub fn socket_name(&mut self) -> IoResult<ip::SocketAddr> {
+        sockname(self.fd(), libc::getsockname)
+    }
+
+    pub fn set_timeout(&mut self, timeout: Option<u64>) {
+        self.deadline = timeout.map(|a| timer::now() + a).unwrap_or(0);
+    }
+
+    pub fn close_accept(&mut self) -> IoResult<()> {
+        self.inner.closed.store(true, atomic::SeqCst);
+        let ret = unsafe { c::WSASetEvent(self.inner.abort.handle()) };
+        if ret == libc::TRUE {
+            Ok(())
+        } else {
+            Err(last_net_error())
+        }
+    }
+}
+
+impl Clone for TcpAcceptor {
+    fn clone(&self) -> TcpAcceptor {
+        TcpAcceptor {
+            inner: self.inner.clone(),
+            deadline: 0,
+        }
+    }
+}
diff --git a/src/libstd/sys/windows/udp.rs b/src/libstd/sys/windows/udp.rs
new file mode 100644
index 00000000000..50f8fb828ad
--- /dev/null
+++ b/src/libstd/sys/windows/udp.rs
@@ -0,0 +1,11 @@
+// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+pub use sys_common::net::UdpSocket;