diff options
| author | Alex Crichton <alex@alexcrichton.com> | 2014-02-07 10:37:58 -0800 |
|---|---|---|
| committer | Alex Crichton <alex@alexcrichton.com> | 2014-02-16 18:46:01 -0800 |
| commit | a526aa139ec1c95cbad4c1d3187c437eb45d4bae (patch) | |
| tree | 6a0092a8ac88e5842f00b74df7314faa9074de04 | |
| parent | 94b2d9dc4dd864b481bcf279921bc7ea796355e5 (diff) | |
| download | rust-a526aa139ec1c95cbad4c1d3187c437eb45d4bae.tar.gz rust-a526aa139ec1c95cbad4c1d3187c437eb45d4bae.zip | |
Implement named pipes for windows, touch up unix
* Implementation of pipe_win32 filled out for libnative * Reorganize pipes to be clone-able * Fix a few file descriptor leaks on error * Factor out some common code into shared functions * Make use of the if_ok!() macro for less indentation Closes #11201
| -rw-r--r-- | src/libnative/io/mod.rs | 9 | ||||
| -rw-r--r-- | src/libnative/io/net.rs | 53 | ||||
| -rw-r--r-- | src/libnative/io/pipe_unix.rs | 349 | ||||
| -rw-r--r-- | src/libnative/io/pipe_win32.rs | 492 | ||||
| -rw-r--r-- | src/libstd/io/net/mod.rs | 2 | ||||
| -rw-r--r-- | src/libstd/io/net/unix.rs | 101 | ||||
| -rw-r--r-- | src/libstd/libc.rs | 57 | ||||
| -rw-r--r-- | src/libstd/rt/rtio.rs | 5 |
8 files changed, 780 insertions, 288 deletions
diff --git a/src/libnative/io/mod.rs b/src/libnative/io/mod.rs index ad0d7270c1a..0f9439b3eb5 100644 --- a/src/libnative/io/mod.rs +++ b/src/libnative/io/mod.rs @@ -61,11 +61,11 @@ pub mod timer; pub mod timer; #[cfg(unix)] -#[path = "path_unix.rs"] +#[path = "pipe_unix.rs"] pub mod pipe; #[cfg(windows)] -#[path = "path_win32.rs"] +#[path = "pipe_win32.rs"] pub mod pipe; mod timer_helper; @@ -85,6 +85,9 @@ fn translate_error(errno: i32, detail: bool) -> IoError { fn get_err(errno: i32) -> (io::IoErrorKind, &'static str) { match errno { libc::EOF => (io::EndOfFile, "end of file"), + libc::ERROR_NO_DATA => (io::BrokenPipe, "the pipe is being closed"), + libc::ERROR_FILE_NOT_FOUND => (io::FileNotFound, "file not found"), + libc::ERROR_INVALID_NAME => (io::InvalidInput, "invalid file name"), libc::WSAECONNREFUSED => (io::ConnectionRefused, "connection refused"), libc::WSAECONNRESET => (io::ConnectionReset, "connection reset"), libc::WSAEACCES => (io::PermissionDenied, "permission denied"), @@ -94,6 +97,7 @@ fn translate_error(errno: i32, detail: bool) -> IoError { libc::WSAECONNABORTED => (io::ConnectionAborted, "connection aborted"), libc::WSAEADDRNOTAVAIL => (io::ConnectionRefused, "address not available"), libc::WSAEADDRINUSE => (io::ConnectionRefused, "address in use"), + libc::ERROR_BROKEN_PIPE => (io::BrokenPipe, "the pipe has ended"), x => { debug!("ignoring {}: {}", x, os::last_os_error()); @@ -116,6 +120,7 @@ fn translate_error(errno: i32, detail: bool) -> IoError { libc::ECONNABORTED => (io::ConnectionAborted, "connection aborted"), libc::EADDRNOTAVAIL => (io::ConnectionRefused, "address not available"), libc::EADDRINUSE => (io::ConnectionRefused, "address in use"), + libc::ENOENT => (io::FileNotFound, "no such file or directory"), // These two constants can have the same value on some systems, but // different values on others, so we can't use a match clause diff --git a/src/libnative/io/net.rs b/src/libnative/io/net.rs index dce890dc129..b33b54862dc 100644 --- a/src/libnative/io/net.rs +++ b/src/libnative/io/net.rs @@ -8,7 +8,6 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use std::c_str::CString; use std::cast; use std::io::net::ip; use std::io; @@ -16,7 +15,6 @@ use std::libc; use std::mem; use std::rt::rtio; use std::sync::arc::UnsafeArc; -use std::unstable::intrinsics; use super::{IoResult, retry}; use super::file::keep_going; @@ -90,30 +88,6 @@ fn addr_to_sockaddr(addr: ip::SocketAddr) -> (libc::sockaddr_storage, uint) { } } -fn addr_to_sockaddr_un(addr: &CString) -> IoResult<(libc::sockaddr_storage, uint)> { - // the sun_path length is limited to SUN_LEN (with null) - if addr.len() > libc::sun_len -1 { - return Err(io::IoError { - kind: io::OtherIoError, - desc: "path must be smaller than SUN_LEN", - detail: None, - }) - } - unsafe { - let storage: libc::sockaddr_storage = intrinsics::init(); - let s: *mut libc::sockaddr_un = cast::transmute(&storage); - (*s).sun_family = libc::AF_UNIX as libc::sa_family_t; - let mut i = 0; - for c in addr.iter() { - (*s).sun_path[i] = c; - i += 1; - } - - let len = mem::size_of::<libc::sa_family_t>() + i + 1; //count the null terminator - return Ok((storage, len)); - } -} - fn socket(addr: ip::SocketAddr, ty: libc::c_int) -> IoResult<sock_t> { unsafe { let fam = match addr.ip { @@ -127,15 +101,6 @@ fn socket(addr: ip::SocketAddr, ty: libc::c_int) -> IoResult<sock_t> { } } -fn unix_socket(ty: libc::c_int) -> IoResult<sock_t> { - unsafe { - match libc::socket(libc::AF_UNIX, ty, 0) { - -1 => Err(super::last_error()), - fd => Ok(fd) - } - } -} - fn setsockopt<T>(fd: sock_t, opt: libc::c_int, val: libc::c_int, payload: T) -> IoResult<()> { unsafe { @@ -228,24 +193,6 @@ pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage, } } -fn sockaddr_to_unix(storage: &libc::sockaddr_storage, - len: uint) -> IoResult<CString> { - match storage.ss_family as libc::c_int { - libc::AF_UNIX => { - assert!(len as uint <= mem::size_of::<libc::sockaddr_un>()); - let storage: &libc::sockaddr_un = unsafe { - cast::transmute(storage) - }; - unsafe { - Ok(CString::new(storage.sun_path.to_owned().as_ptr(), false)) - } - } - _ => { - Err(io::standard_error(io::OtherIoError)) - } - } -} - #[cfg(unix)] pub fn init() {} diff --git a/src/libnative/io/pipe_unix.rs b/src/libnative/io/pipe_unix.rs index 1160bc196d8..a6d75d93d67 100644 --- a/src/libnative/io/pipe_unix.rs +++ b/src/libnative/io/pipe_unix.rs @@ -8,46 +8,124 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +use std::c_str::CString; +use std::cast; +use std::io; +use std::libc; +use std::mem; +use std::rt::rtio; +use std::sync::arc::UnsafeArc; +use std::unstable::intrinsics; + +use super::{IoResult, retry}; +use super::file::{keep_going, fd_t}; + +fn unix_socket(ty: libc::c_int) -> IoResult<fd_t> { + match unsafe { libc::socket(libc::AF_UNIX, ty, 0) } { + -1 => Err(super::last_error()), + fd => Ok(fd) + } +} + +fn addr_to_sockaddr_un(addr: &CString) -> IoResult<(libc::sockaddr_storage, uint)> { + // the sun_path length is limited to SUN_LEN (with null) + assert!(mem::size_of::<libc::sockaddr_storage>() >= + mem::size_of::<libc::sockaddr_un>()); + let mut storage: libc::sockaddr_storage = unsafe { intrinsics::init() }; + let s: &mut libc::sockaddr_un = unsafe { cast::transmute(&mut storage) }; + + let len = addr.len(); + if len > s.sun_path.len() - 1 { + return Err(io::IoError { + kind: io::InvalidInput, + desc: "path must be smaller than SUN_LEN", + detail: None, + }) + } + s.sun_family = libc::AF_UNIX as libc::sa_family_t; + for (slot, value) in s.sun_path.mut_iter().zip(addr.iter()) { + *slot = value; + } + + // count the null terminator + let len = mem::size_of::<libc::sa_family_t>() + len + 1; + return Ok((storage, len)); +} + +fn sockaddr_to_unix(storage: &libc::sockaddr_storage, + len: uint) -> IoResult<CString> { + match storage.ss_family as libc::c_int { + libc::AF_UNIX => { + assert!(len as uint <= mem::size_of::<libc::sockaddr_un>()); + let storage: &libc::sockaddr_un = unsafe { + cast::transmute(storage) + }; + unsafe { + Ok(CString::new(storage.sun_path.as_ptr(), false).clone()) + } + } + _ => Err(io::standard_error(io::InvalidInput)) + } +} + +struct Inner { + fd: fd_t, +} + +impl Drop for Inner { + fn drop(&mut self) { unsafe { let _ = libc::close(self.fd); } } +} + +fn connect(addr: &CString, ty: libc::c_int) -> IoResult<Inner> { + let (addr, len) = if_ok!(addr_to_sockaddr_un(addr)); + let inner = Inner { fd: if_ok!(unix_socket(ty)) }; + let addrp = &addr as *libc::sockaddr_storage; + match retry(|| unsafe { + libc::connect(inner.fd, addrp as *libc::sockaddr, + len as libc::socklen_t) + }) { + -1 => Err(super::last_error()), + _ => Ok(inner) + } +} + +fn bind(addr: &CString, ty: libc::c_int) -> IoResult<Inner> { + let (addr, len) = if_ok!(addr_to_sockaddr_un(addr)); + let inner = Inner { fd: if_ok!(unix_socket(ty)) }; + let addrp = &addr as *libc::sockaddr_storage; + match unsafe { + libc::bind(inner.fd, addrp as *libc::sockaddr, len as libc::socklen_t) + } { + -1 => Err(super::last_error()), + _ => Ok(inner) + } +} + //////////////////////////////////////////////////////////////////////////////// -// Unix +// Unix Streams //////////////////////////////////////////////////////////////////////////////// pub struct UnixStream { - priv fd: sock_t, + priv inner: UnsafeArc<Inner>, } impl UnixStream { - pub fn connect(addr: &CString, ty: libc::c_int) -> IoResult<UnixStream> { - unix_socket(ty).and_then(|fd| { - match addr_to_sockaddr_un(addr) { - Err(e) => return Err(e), - Ok((addr, len)) => { - let ret = UnixStream{ fd: fd }; - let addrp = &addr as *libc::sockaddr_storage; - match retry(|| { - libc::connect(fd, addrp as *libc::sockaddr, - len as libc::socklen_t) - }) { - -1 => return Err(super::last_error()), - _ => return Ok(ret) - } - } - } + pub fn connect(addr: &CString) -> IoResult<UnixStream> { + connect(addr, libc::SOCK_STREAM).map(|inner| { + UnixStream { inner: UnsafeArc::new(inner) } }) } - pub fn fd(&self) -> sock_t { self.fd } + fn fd(&self) -> fd_t { unsafe { (*self.inner.get()).fd } } } impl rtio::RtioPipe for UnixStream { fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> { - let ret = retry(|| { - unsafe { - libc::recv(self.fd, - buf.as_ptr() as *mut libc::c_void, - buf.len() as wrlen, - 0) as libc::c_int - } + let ret = retry(|| unsafe { + libc::recv(self.fd(), + buf.as_ptr() as *mut libc::c_void, + buf.len() as libc::size_t, + 0) as libc::c_int }); if ret == 0 { Err(io::standard_error(io::EndOfFile)) @@ -57,14 +135,13 @@ impl rtio::RtioPipe for UnixStream { Ok(ret as uint) } } + fn write(&mut self, buf: &[u8]) -> IoResult<()> { - let ret = keep_going(buf, |buf, len| { - unsafe { - libc::send(self.fd, - buf as *mut libc::c_void, - len as wrlen, - 0) as i64 - } + let ret = keep_going(buf, |buf, len| unsafe { + libc::send(self.fd(), + buf as *mut libc::c_void, + len as libc::size_t, + 0) as i64 }); if ret < 0 { Err(super::last_error()) @@ -72,10 +149,10 @@ impl rtio::RtioPipe for UnixStream { Ok(()) } } -} -impl Drop for UnixStream { - fn drop(&mut self) { unsafe { close(self.fd); } } + fn clone(&self) -> ~rtio::RtioPipe { + ~UnixStream { inner: self.inner.clone() } as ~rtio::RtioPipe + } } //////////////////////////////////////////////////////////////////////////////// @@ -83,176 +160,89 @@ impl Drop for UnixStream { //////////////////////////////////////////////////////////////////////////////// pub struct UnixDatagram { - priv fd: sock_t, + priv inner: UnsafeArc<Inner>, } impl UnixDatagram { - pub fn connect(addr: &CString, ty: libc::c_int) -> IoResult<UnixDatagram> { - unsafe { - unix_socket(ty).and_then(|fd| { - match addr_to_sockaddr_un(addr) { - Err(e) => return Err(e), - Ok((addr, len)) => { - let ret = UnixDatagram{ fd: fd }; - let addrp = &addr as *libc::sockaddr_storage; - match retry(|| { - libc::connect(fd, addrp as *libc::sockaddr, - len as libc::socklen_t) - }) { - -1 => return Err(super::last_error()), - _ => return Ok(ret) - } - } - } - }) - } + pub fn connect(addr: &CString) -> IoResult<UnixDatagram> { + connect(addr, libc::SOCK_DGRAM).map(|inner| { + UnixDatagram { inner: UnsafeArc::new(inner) } + }) } pub fn bind(addr: &CString) -> IoResult<UnixDatagram> { - unsafe { - unix_socket(libc::SOCK_DGRAM).and_then(|fd| { - match addr_to_sockaddr_un(addr) { - Err(e) => return Err(e), - Ok((addr, len)) => { - let ret = UnixDatagram{ fd: fd }; - let addrp = &addr as *libc::sockaddr_storage; - match libc::bind(fd, addrp as *libc::sockaddr, - len as libc::socklen_t) { - -1 => return Err(super::last_error()), - _ => return Ok(ret) - } - } - } - }) - } + bind(addr, libc::SOCK_DGRAM).map(|inner| { + UnixDatagram { inner: UnsafeArc::new(inner) } + }) } - pub fn fd(&self) -> sock_t { self.fd } -} + fn fd(&self) -> fd_t { unsafe { (*self.inner.get()).fd } } -impl rtio::RtioPipe for UnixDatagram { - fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> { - let ret = retry(|| { - unsafe { - libc::recv(self.fd, + pub fn recvfrom(&mut self, buf: &mut [u8]) -> IoResult<(uint, CString)> { + let mut storage: libc::sockaddr_storage = unsafe { intrinsics::init() }; + let storagep = &mut storage as *mut libc::sockaddr_storage; + let mut addrlen: libc::socklen_t = + mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t; + let ret = retry(|| unsafe { + libc::recvfrom(self.fd(), buf.as_ptr() as *mut libc::c_void, - buf.len() as wrlen, - 0) as libc::c_int - } - }); - if ret == 0 { - Err(io::standard_error(io::EndOfFile)) - } else if ret < 0 { - Err(super::last_error()) - } else { - Ok(ret as uint) - } - } - fn write(&mut self, buf: &[u8]) -> IoResult<()> { - let ret = keep_going(buf, |buf, len| { - unsafe { - libc::send(self.fd, - buf as *mut libc::c_void, - len as wrlen, - 0) as i64 - } + buf.len() as libc::size_t, + 0, + storagep as *mut libc::sockaddr, + &mut addrlen) as libc::c_int }); - if ret < 0 { - Err(super::last_error()) - } else { - Ok(()) - } + if ret < 0 { return Err(super::last_error()) } + sockaddr_to_unix(&storage, addrlen as uint).and_then(|addr| { + Ok((ret as uint, addr)) + }) } -} -impl rtio::RtioDatagramPipe for UnixDatagram { - fn recvfrom(&mut self, buf: &mut [u8]) -> IoResult<(uint, CString)> { - unsafe { - let mut storage: libc::sockaddr_storage = intrinsics::init(); - let storagep = &mut storage as *mut libc::sockaddr_storage; - let mut addrlen: libc::socklen_t = - mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t; - let ret = retry(|| { - libc::recvfrom(self.fd, - buf.as_ptr() as *mut libc::c_void, - buf.len() as msglen_t, - 0, - storagep as *mut libc::sockaddr, - &mut addrlen) as libc::c_int - }); - if ret < 0 { return Err(super::last_error()) } - sockaddr_to_unix(&storage, addrlen as uint).and_then(|addr| { - Ok((ret as uint, addr)) - }) + pub fn sendto(&mut self, buf: &[u8], dst: &CString) -> IoResult<()> { + let (dst, len) = if_ok!(addr_to_sockaddr_un(dst)); + let dstp = &dst as *libc::sockaddr_storage; + let ret = retry(|| unsafe { + libc::sendto(self.fd(), + buf.as_ptr() as *libc::c_void, + buf.len() as libc::size_t, + 0, + dstp as *libc::sockaddr, + len as libc::socklen_t) as libc::c_int + }); + match ret { + -1 => Err(super::last_error()), + n if n as uint != buf.len() => { + Err(io::IoError { + kind: io::OtherIoError, + desc: "couldn't send entire packet at once", + detail: None, + }) + } + _ => Ok(()) } } - fn sendto(&mut self, buf: &[u8], dst: &CString) -> IoResult<()> { - match addr_to_sockaddr_un(dst) { - Err(e) => Err(e), - Ok((dst, len)) => { - let dstp = &dst as *libc::sockaddr_storage; - unsafe { - let ret = retry(|| { - libc::sendto(self.fd, - buf.as_ptr() as *libc::c_void, - buf.len() as msglen_t, - 0, - dstp as *libc::sockaddr, - len as libc::socklen_t) as libc::c_int - }); - match ret { - -1 => Err(super::last_error()), - n if n as uint != buf.len() => { - Err(io::IoError { - kind: io::OtherIoError, - desc: "couldn't send entire packet at once", - detail: None, - }) - } - _ => Ok(()) - } - } - } - } + pub fn clone(&mut self) -> UnixDatagram { + UnixDatagram { inner: self.inner.clone() } } } -impl Drop for UnixDatagram { - fn drop(&mut self) { unsafe { close(self.fd); } } -} //////////////////////////////////////////////////////////////////////////////// // Unix Listener //////////////////////////////////////////////////////////////////////////////// pub struct UnixListener { - priv fd: sock_t, + priv inner: Inner, } impl UnixListener { pub fn bind(addr: &CString) -> IoResult<UnixListener> { - unsafe { - unix_socket(libc::SOCK_STREAM).and_then(|fd| { - match addr_to_sockaddr_un(addr) { - Err(e) => return Err(e), - Ok((addr, len)) => { - let ret = UnixListener{ fd: fd }; - let addrp = &addr as *libc::sockaddr_storage; - match libc::bind(fd, addrp as *libc::sockaddr, - len as libc::socklen_t) { - -1 => return Err(super::last_error()), - _ => return Ok(ret) - } - } - } - }) - } + bind(addr, libc::SOCK_STREAM).map(|fd| UnixListener { inner: fd }) } - pub fn fd(&self) -> sock_t { self.fd } + fn fd(&self) -> fd_t { self.inner.fd } pub fn native_listen(self, backlog: int) -> IoResult<UnixAcceptor> { - match unsafe { libc::listen(self.fd, backlog as libc::c_int) } { + match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } { -1 => Err(super::last_error()), _ => Ok(UnixAcceptor { listener: self }) } @@ -265,16 +255,12 @@ impl rtio::RtioUnixListener for UnixListener { } } -impl Drop for UnixListener { - fn drop(&mut self) { unsafe { close(self.fd); } } -} - pub struct UnixAcceptor { priv listener: UnixListener, } impl UnixAcceptor { - pub fn fd(&self) -> sock_t { self.listener.fd } + fn fd(&self) -> fd_t { self.listener.fd() } pub fn native_accept(&mut self) -> IoResult<UnixStream> { let mut storage: libc::sockaddr_storage = unsafe { intrinsics::init() }; @@ -285,9 +271,9 @@ impl UnixAcceptor { libc::accept(self.fd(), storagep as *mut libc::sockaddr, &mut size as *mut libc::socklen_t) as libc::c_int - }) as sock_t { + }) { -1 => Err(super::last_error()), - fd => Ok(UnixStream { fd: fd }) + fd => Ok(UnixStream { inner: UnsafeArc::new(Inner { fd: fd }) }) } } } @@ -297,4 +283,3 @@ impl rtio::RtioUnixAcceptor for UnixAcceptor { self.native_accept().map(|s| ~s as ~rtio::RtioPipe) } } - diff --git a/src/libnative/io/pipe_win32.rs b/src/libnative/io/pipe_win32.rs new file mode 100644 index 00000000000..83731cc02a6 --- /dev/null +++ b/src/libnative/io/pipe_win32.rs @@ -0,0 +1,492 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! Named pipes implementation for windows +//! +//! If are unfortunate enough to be reading this code, I would like to first +//! apologize. This was my first encounter with windows named pipes, and it +//! didn't exactly turn out very cleanly. If you, too, are new to named pipes, +//! read on as I'll try to explain some fun things that I ran into. +//! +//! # Unix pipes vs Named pipes +//! +//! As with everything else, named pipes on windows are pretty different from +//! unix pipes on unix. On unix, you use one "server pipe" to accept new client +//! pipes. So long as this server pipe is active, new children pipes can +//! connect. On windows, you instead have a number of "server pipes", and each +//! of these server pipes can throughout their lifetime be attached to a client +//! or not. Once attached to a client, a server pipe may then disconnect at a +//! later date. +//! +//! # Accepting clients +//! +//! As with most other I/O interfaces, our Listener/Acceptor/Stream interfaces +//! are built around the unix flavors. This means that we have one "server +//! pipe" to which many clients can connect. In order to make this compatible +//! with the windows model, each connected client consumes ownership of a server +//! pipe, and then a new server pipe is created for the next client. +//! +//! Note that the server pipes attached to clients are never given back to the +//! listener for recycling. This could possibly be implemented with a channel so +//! the listener half can re-use server pipes, but for now I err'd on the simple +//! side of things. Each stream accepted by a listener will destroy the server +//! pipe after the stream is dropped. +//! +//! This model ends up having a small race or two, and you can find more details +//! on the `native_accept` method. +//! +//! # Simultaneous reads and writes +//! +//! In testing, I found that two simultaneous writes and two simultaneous reads +//! on a pipe ended up working out just fine, but problems were encountered when +//! a read was executed simultaneously with a write. After some googling around, +//! it sounded like named pipes just weren't built for this kind of interaction, +//! and the suggested solution was to use overlapped I/O. +//! +//! I don't realy know what overlapped I/O is, but my basic understanding after +//! reading about it is that you have an external Event which is used to signal +//! I/O completion, passed around in some OVERLAPPED structures. As to what this +//! is, I'm not exactly sure. +//! +//! This problem implies that all named pipes are created with the +//! FILE_FLAG_OVERLAPPED option. This means that all of their I/O is +//! asynchronous. Each I/O operation has an associated OVERLAPPED structure, and +//! inside of this structure is a HANDLE from CreateEvent. After the I/O is +//! determined to be pending (may complete in the future), the +//! GetOverlappedResult function is used to block on the event, waiting for the +//! I/O to finish. +//! +//! This scheme ended up working well enough. There were two snags that I ran +//! into, however: +//! +//! * Each UnixStream instance needs its own read/write events to wait on. These +//! can't be shared among clones of the same stream because the documentation +//! states that it unsets the event when the I/O is started (would possibly +//! corrupt other events simultaneously waiting). For convenience's sake, +//! these events are lazily initialized. +//! +//! * Each server pipe needs to be created with FILE_FLAG_OVERLAPPED in addition +//! to all pipes created through `connect`. Notably this means that the +//! ConnectNamedPipe function is nonblocking, implying that the Listener needs +//! to have yet another event to do the actual blocking. +//! +//! # Conclusion +//! +//! The conclusion here is that I probably don't know the best way to work with +//! windows named pipes, but the solution here seems to work well enough to get +//! the test suite passing (the suite is in libstd), and that's good enough for +//! me! + +use std::c_str::CString; +use std::libc; +use std::os::win32::as_utf16_p; +use std::ptr; +use std::rt::rtio; +use std::sync::arc::UnsafeArc; +use std::unstable::intrinsics; + +use super::IoResult; + +struct Event(libc::HANDLE); + +impl Event { + fn new(manual_reset: bool, initial_state: bool) -> IoResult<Event> { + let event = unsafe { + libc::CreateEventW(ptr::mut_null(), + manual_reset as libc::BOOL, + initial_state as libc::BOOL, + ptr::null()) + }; + if event as uint == 0 { + Err(super::last_error()) + } else { + Ok(Event(event)) + } + } + + fn handle(&self) -> libc::HANDLE { let Event(handle) = *self; handle } +} + +impl Drop for Event { + fn drop(&mut self) { + unsafe { let _ = libc::CloseHandle(self.handle()); } + } +} + +struct Inner { + handle: libc::HANDLE, +} + +impl Drop for Inner { + fn drop(&mut self) { + unsafe { + let _ = libc::FlushFileBuffers(self.handle); + let _ = libc::CloseHandle(self.handle); + } + } +} + +unsafe fn pipe(name: *u16, init: bool) -> libc::HANDLE { + libc::CreateNamedPipeW( + name, + libc::PIPE_ACCESS_DUPLEX | + if init {libc::FILE_FLAG_FIRST_PIPE_INSTANCE} else {0} | + libc::FILE_FLAG_OVERLAPPED, + libc::PIPE_TYPE_BYTE | libc::PIPE_READMODE_BYTE | + libc::PIPE_WAIT, + libc::PIPE_UNLIMITED_INSTANCES, + 65536, + 65536, + 0, + ptr::mut_null() + ) +} + +//////////////////////////////////////////////////////////////////////////////// +// Unix Streams +//////////////////////////////////////////////////////////////////////////////// + +pub struct UnixStream { + priv inner: UnsafeArc<Inner>, + priv write: Option<Event>, + priv read: Option<Event>, +} + +impl UnixStream { + fn try_connect(p: *u16) -> Option<libc::HANDLE> { + // Note that most of this is lifted from the libuv implementation. + // The idea is that if we fail to open a pipe in read/write mode + // that we try afterwards in just read or just write + let mut result = unsafe { + libc::CreateFileW(p, + libc::GENERIC_READ | libc::GENERIC_WRITE, + 0, + ptr::mut_null(), + libc::OPEN_EXISTING, + libc::FILE_FLAG_OVERLAPPED, + ptr::mut_null()) + }; + if result != libc::INVALID_HANDLE_VALUE as libc::HANDLE { + return Some(result) + } + + let err = unsafe { libc::GetLastError() }; + if err == libc::ERROR_ACCESS_DENIED as libc::DWORD { + result = unsafe { + libc::CreateFileW(p, + libc::GENERIC_READ | libc::FILE_WRITE_ATTRIBUTES, + 0, + ptr::mut_null(), + libc::OPEN_EXISTING, + libc::FILE_FLAG_OVERLAPPED, + ptr::mut_null()) + }; + if result != libc::INVALID_HANDLE_VALUE as libc::HANDLE { + return Some(result) + } + } + let err = unsafe { libc::GetLastError() }; + if err == libc::ERROR_ACCESS_DENIED as libc::DWORD { + result = unsafe { + libc::CreateFileW(p, + libc::GENERIC_WRITE | libc::FILE_READ_ATTRIBUTES, + 0, + ptr::mut_null(), + libc::OPEN_EXISTING, + libc::FILE_FLAG_OVERLAPPED, + ptr::mut_null()) + }; + if result != libc::INVALID_HANDLE_VALUE as libc::HANDLE { + return Some(result) + } + } + None + } + + pub fn connect(addr: &CString) -> IoResult<UnixStream> { + as_utf16_p(addr.as_str().unwrap(), |p| { + loop { + match UnixStream::try_connect(p) { + Some(handle) => { + let inner = Inner { handle: handle }; + let mut mode = libc::PIPE_TYPE_BYTE | + libc::PIPE_READMODE_BYTE | + libc::PIPE_WAIT; + let ret = unsafe { + libc::SetNamedPipeHandleState(inner.handle, + &mut mode, + ptr::mut_null(), + ptr::mut_null()) + }; + return if ret == 0 { + Err(super::last_error()) + } else { + Ok(UnixStream { + inner: UnsafeArc::new(inner), + read: None, + write: None, + }) + } + } + None => {} + } + + // On windows, if you fail to connect, you may need to call the + // `WaitNamedPipe` function, and this is indicated with an error + // code of ERROR_PIPE_BUSY. + let code = unsafe { libc::GetLastError() }; + if code as int != libc::ERROR_PIPE_BUSY as int { + return Err(super::last_error()) + } + + // An example I found on microsoft's website used 20 seconds, + // libuv uses 30 seconds, hence we make the obvious choice of + // waiting for 25 seconds. + if unsafe { libc::WaitNamedPipeW(p, 25000) } == 0 { + return Err(super::last_error()) + } + } + }) + } + + fn handle(&self) -> libc::HANDLE { unsafe { (*self.inner.get()).handle } } +} + +impl rtio::RtioPipe for UnixStream { + fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> { + if self.read.is_none() { + self.read = Some(if_ok!(Event::new(true, false))); + } + + let mut bytes_read = 0; + let mut overlapped: libc::OVERLAPPED = unsafe { intrinsics::init() }; + overlapped.hEvent = self.read.get_ref().handle(); + + let ret = unsafe { + libc::ReadFile(self.handle(), + buf.as_ptr() as libc::LPVOID, + buf.len() as libc::DWORD, + &mut bytes_read, + &mut overlapped) + }; + if ret == 0 { + let err = unsafe { libc::GetLastError() }; + if err == libc::ERROR_IO_PENDING as libc::DWORD { + let ret = unsafe { + libc::GetOverlappedResult(self.handle(), + &mut overlapped, + &mut bytes_read, + libc::TRUE) + }; + if ret == 0 { + return Err(super::last_error()) + } + } else { + return Err(super::last_error()) + } + } + + Ok(bytes_read as uint) + } + + fn write(&mut self, buf: &[u8]) -> IoResult<()> { + if self.write.is_none() { + self.write = Some(if_ok!(Event::new(true, false))); + } + + let mut offset = 0; + let mut overlapped: libc::OVERLAPPED = unsafe { intrinsics::init() }; + overlapped.hEvent = self.write.get_ref().handle(); + + while offset < buf.len() { + let mut bytes_written = 0; + let ret = unsafe { + libc::WriteFile(self.handle(), + buf.slice_from(offset).as_ptr() as libc::LPVOID, + (buf.len() - offset) as libc::DWORD, + &mut bytes_written, + &mut overlapped) + }; + if ret == 0 { + let err = unsafe { libc::GetLastError() }; + if err == libc::ERROR_IO_PENDING as libc::DWORD { + let ret = unsafe { + libc::GetOverlappedResult(self.handle(), + &mut overlapped, + &mut bytes_written, + libc::TRUE) + }; + if ret == 0 { + return Err(super::last_error()) + } + } else { + return Err(super::last_error()) + } + } + offset += bytes_written as uint; + } + Ok(()) + } + + fn clone(&self) -> ~rtio::RtioPipe { + ~UnixStream { + inner: self.inner.clone(), + read: None, + write: None, + } as ~rtio::RtioPipe + } +} + +//////////////////////////////////////////////////////////////////////////////// +// Unix Listener +//////////////////////////////////////////////////////////////////////////////// + +pub struct UnixListener { + priv handle: libc::HANDLE, + priv name: CString, +} + +impl UnixListener { + pub fn bind(addr: &CString) -> IoResult<UnixListener> { + // Although we technically don't need the pipe until much later, we + // create the initial handle up front to test the validity of the name + // and such. + as_utf16_p(addr.as_str().unwrap(), |p| { + let ret = unsafe { pipe(p, true) }; + if ret == libc::INVALID_HANDLE_VALUE as libc::HANDLE { + Err(super::last_error()) + } else { + Ok(UnixListener { handle: ret, name: addr.clone() }) + } + }) + } + + pub fn native_listen(self) -> IoResult<UnixAcceptor> { + Ok(UnixAcceptor { + listener: self, + event: if_ok!(Event::new(true, false)), + }) + } +} + +impl Drop for UnixListener { + fn drop(&mut self) { + unsafe { let _ = libc::CloseHandle(self.handle); } + } +} + +impl rtio::RtioUnixListener for UnixListener { + fn listen(~self) -> IoResult<~rtio::RtioUnixAcceptor> { + self.native_listen().map(|a| ~a as ~rtio::RtioUnixAcceptor) + } +} + +pub struct UnixAcceptor { + priv listener: UnixListener, + priv event: Event, +} + +impl UnixAcceptor { + pub fn native_accept(&mut self) -> IoResult<UnixStream> { + // This function has some funky implementation details when working with + // unix pipes. On windows, each server named pipe handle can be + // connected to a one or zero clients. To the best of my knowledge, a + // named server is considered active and present if there exists at + // least one server named pipe for it. + // + // The model of this function is to take the current known server + // handle, connect a client to it, and then transfer ownership to the + // UnixStream instance. The next time accept() is invoked, it'll need a + // different server handle to connect a client to. + // + // Note that there is a possible race here. Once our server pipe is + // handed off to a `UnixStream` object, the stream could be closed, + // meaning that there would be no active server pipes, hence even though + // we have a valid `UnixAcceptor`, no one can connect to it. For this + // reason, we generate the next accept call's server pipe at the end of + // this function call. + // + // This provides us an invariant that we always have at least one server + // connection open at a time, meaning that all connects to this acceptor + // should succeed while this is active. + // + // The actual implementation of doing this is a little tricky. Once a + // server pipe is created, a client can connect to it at any time. I + // assume that which server a client connects to is nondeterministic, so + // we also need to guarantee that the only server able to be connected + // to is the one that we're calling ConnectNamedPipe on. This means that + // we have to create the second server pipe *after* we've already + // accepted a connection. In order to at least somewhat gracefully + // handle errors, this means that if the second server pipe creation + // fails that we disconnect the connected client and then just keep + // using the original server pipe. + let handle = self.listener.handle; + + // Once we've got a "server handle", we need to wait for a client to + // connect. The ConnectNamedPipe function will block this thread until + // someone on the other end connects. This function can "fail" if a + // client connects after we created the pipe but before we got down + // here. Thanks windows. + let mut overlapped: libc::OVERLAPPED = unsafe { intrinsics::init() }; + overlapped.hEvent = self.event.handle(); + if unsafe { libc::ConnectNamedPipe(handle, &mut overlapped) == 0 } { + let mut err = unsafe { libc::GetLastError() }; + if err == libc::ERROR_IO_PENDING as libc::DWORD { + let ret = unsafe { + let mut transfer = 0; + libc::GetOverlappedResult(handle, + &mut overlapped, + &mut transfer, + libc::TRUE) + }; + if ret == 0 { + err = unsafe { libc::GetLastError() }; + } else { + // we succeeded, bypass the check below + err = libc::ERROR_PIPE_CONNECTED as libc::DWORD; + } + } + if err != libc::ERROR_PIPE_CONNECTED as libc::DWORD { + return Err(super::last_error()) + } + } + + // Now that we've got a connected client to our handle, we need to + // create a second server pipe. If this fails, we disconnect the + // connected client and return an error (see comments above). + let new_handle = as_utf16_p(self.listener.name.as_str().unwrap(), |p| { + unsafe { pipe(p, false) } + }); + if new_handle == libc::INVALID_HANDLE_VALUE as libc::HANDLE { + let ret = Err(super::last_error()); + // If our disconnection fails, then there's not really a whole lot + // that we can do, so fail the task. + let err = unsafe { libc::DisconnectNamedPipe(handle) }; + assert!(err != 0); + return ret; + } else { + self.listener.handle = new_handle; + } + + // Transfer ownership of our handle into this stream + Ok(UnixStream { + inner: UnsafeArc::new(Inner { handle: handle }), + read: None, + write: None, + }) + } +} + +impl rtio::RtioUnixAcceptor for UnixAcceptor { + fn accept(&mut self) -> IoResult<~rtio::RtioPipe> { + self.native_accept().map(|s| ~s as ~rtio::RtioPipe) + } +} + diff --git a/src/libstd/io/net/mod.rs b/src/libstd/io/net/mod.rs index cf109167089..436156a1219 100644 --- a/src/libstd/io/net/mod.rs +++ b/src/libstd/io/net/mod.rs @@ -14,5 +14,5 @@ pub mod addrinfo; pub mod tcp; pub mod udp; pub mod ip; -#[cfg(unix)] +// FIXME(#12093) - this should not be called unix pub mod unix; diff --git a/src/libstd/io/net/unix.rs b/src/libstd/io/net/unix.rs index 23c01aa6354..a1f3cbbe326 100644 --- a/src/libstd/io/net/unix.rs +++ b/src/libstd/io/net/unix.rs @@ -134,7 +134,7 @@ mod tests { use io::*; use io::test::*; - fn smalltest(server: proc(UnixStream), client: proc(UnixStream)) { + pub fn smalltest(server: proc(UnixStream), client: proc(UnixStream)) { let path1 = next_test_unix(); let path2 = path1.clone(); let (port, chan) = Chan::new(); @@ -149,25 +149,32 @@ mod tests { server(acceptor.accept().unwrap()); } - #[test] - fn bind_error() { - match UnixListener::bind(&("path/to/nowhere")) { + iotest!(fn bind_error() { + let path = "path/to/nowhere"; + match UnixListener::bind(&path) { Ok(..) => fail!(), - Err(e) => assert_eq!(e.kind, PermissionDenied), + Err(e) => { + assert!(e.kind == PermissionDenied || e.kind == FileNotFound || + e.kind == InvalidInput); + } } - } - - #[test] - fn connect_error() { - match UnixStream::connect(&("path/to/nowhere")) { + }) + + iotest!(fn connect_error() { + let path = if cfg!(windows) { + r"\\.\pipe\this_should_not_exist_ever" + } else { + "path/to/nowhere" + }; + match UnixStream::connect(&path) { Ok(..) => fail!(), - Err(e) => assert_eq!(e.kind, - if cfg!(windows) {OtherIoError} else {FileNotFound}) + Err(e) => { + assert!(e.kind == FileNotFound || e.kind == OtherIoError); + } } - } + }) - #[test] - fn smoke() { + iotest!(fn smoke() { smalltest(proc(mut server) { let mut buf = [0]; server.read(buf).unwrap(); @@ -175,10 +182,9 @@ mod tests { }, proc(mut client) { client.write([99]).unwrap(); }) - } + }) - #[test] - fn read_eof() { + iotest!(fn read_eof() { smalltest(proc(mut server) { let mut buf = [0]; assert!(server.read(buf).is_err()); @@ -186,17 +192,18 @@ mod tests { }, proc(_client) { // drop the client }) - } + }) - #[test] - fn write_begone() { + iotest!(fn write_begone() { smalltest(proc(mut server) { let buf = [0]; loop { match server.write(buf) { Ok(..) => {} Err(e) => { - assert!(e.kind == BrokenPipe || e.kind == NotConnected, + assert!(e.kind == BrokenPipe || + e.kind == NotConnected || + e.kind == ConnectionReset, "unknown error {:?}", e); break; } @@ -205,10 +212,9 @@ mod tests { }, proc(_client) { // drop the client }) - } + }) - #[test] - fn accept_lots() { + iotest!(fn accept_lots() { let times = 10; let path1 = next_test_unix(); let path2 = path1.clone(); @@ -218,38 +224,49 @@ mod tests { port.recv(); for _ in range(0, times) { let mut stream = UnixStream::connect(&path2); - stream.write([100]).unwrap(); + match stream.write([100]) { + Ok(..) => {} + Err(e) => fail!("failed write: {}", e) + } } }); - let mut acceptor = UnixListener::bind(&path1).listen(); + let mut acceptor = match UnixListener::bind(&path1).listen() { + Ok(a) => a, + Err(e) => fail!("failed listen: {}", e), + }; chan.send(()); for _ in range(0, times) { let mut client = acceptor.accept(); let mut buf = [0]; - client.read(buf).unwrap(); + match client.read(buf) { + Ok(..) => {} + Err(e) => fail!("failed read/accept: {}", e), + } assert_eq!(buf[0], 100); } - } + }) - #[test] - fn path_exists() { + #[cfg(unix)] + iotest!(fn path_exists() { let path = next_test_unix(); let _acceptor = UnixListener::bind(&path).listen(); assert!(path.exists()); - } + }) - #[test] - fn unix_clone_smoke() { + iotest!(fn unix_clone_smoke() { let addr = next_test_unix(); let mut acceptor = UnixListener::bind(&addr).listen(); spawn(proc() { let mut s = UnixStream::connect(&addr); let mut buf = [0, 0]; + debug!("client reading"); assert_eq!(s.read(buf), Ok(1)); assert_eq!(buf[0], 1); + debug!("client writing"); s.write([2]).unwrap(); + debug!("client dropping"); }); let mut s1 = acceptor.accept().unwrap(); @@ -260,17 +277,20 @@ mod tests { spawn(proc() { let mut s2 = s2; p1.recv(); + debug!("writer writing"); s2.write([1]).unwrap(); + debug!("writer done"); c2.send(()); }); c1.send(()); let mut buf = [0, 0]; + debug!("reader reading"); assert_eq!(s1.read(buf), Ok(1)); + debug!("reader done"); p2.recv(); - } + }) - #[test] - fn unix_clone_two_read() { + iotest!(fn unix_clone_two_read() { let addr = next_test_unix(); let mut acceptor = UnixListener::bind(&addr).listen(); let (p, c) = Chan::new(); @@ -300,10 +320,9 @@ mod tests { c.send(()); p.recv(); - } + }) - #[test] - fn unix_clone_two_write() { + iotest!(fn unix_clone_two_write() { let addr = next_test_unix(); let mut acceptor = UnixListener::bind(&addr).listen(); @@ -326,5 +345,5 @@ mod tests { s1.write([2]).unwrap(); p.recv(); - } + }) } diff --git a/src/libstd/libc.rs b/src/libstd/libc.rs index c42a8896053..73bf4a1e69a 100644 --- a/src/libstd/libc.rs +++ b/src/libstd/libc.rs @@ -269,7 +269,6 @@ pub mod types { pub mod bsd44 { use libc::types::os::arch::c95::{c_char, c_int, c_uint}; - pub static sun_len:uint = 108; pub type socklen_t = u32; pub type sa_family_t = u16; pub type in_port_t = u16; @@ -641,7 +640,6 @@ pub mod types { pub mod bsd44 { use libc::types::os::arch::c95::{c_char, c_int, c_uint}; - pub static sun_len:uint = 104; pub type socklen_t = u32; pub type sa_family_t = u8; pub type in_port_t = u16; @@ -844,7 +842,6 @@ pub mod types { pub mod bsd44 { use libc::types::os::arch::c95::{c_char, c_int, c_uint, size_t}; - pub static sun_len:uint = 108; pub type SOCKET = c_uint; pub type socklen_t = c_int; pub type sa_family_t = u16; @@ -1213,7 +1210,6 @@ pub mod types { pub mod bsd44 { use libc::types::os::arch::c95::{c_char, c_int, c_uint}; - pub static sun_len:uint = 104; pub type socklen_t = c_int; pub type sa_family_t = u8; pub type in_port_t = u16; @@ -1627,11 +1623,19 @@ pub mod consts { pub static O_NOINHERIT: c_int = 128; pub static ERROR_SUCCESS : c_int = 0; + pub static ERROR_FILE_NOT_FOUND: c_int = 2; + pub static ERROR_ACCESS_DENIED: c_int = 5; pub static ERROR_INVALID_HANDLE : c_int = 6; + pub static ERROR_BROKEN_PIPE: c_int = 109; pub static ERROR_DISK_FULL : c_int = 112; pub static ERROR_INSUFFICIENT_BUFFER : c_int = 122; + pub static ERROR_INVALID_NAME : c_int = 123; pub static ERROR_ALREADY_EXISTS : c_int = 183; + pub static ERROR_PIPE_BUSY: c_int = 231; + pub static ERROR_NO_DATA: c_int = 232; pub static ERROR_INVALID_ADDRESS : c_int = 487; + pub static ERROR_PIPE_CONNECTED: c_int = 535; + pub static ERROR_IO_PENDING: c_int = 997; pub static ERROR_FILE_INVALID : c_int = 1006; pub static INVALID_HANDLE_VALUE : c_int = -1; @@ -1770,6 +1774,7 @@ pub mod consts { pub static FILE_FLAG_SESSION_AWARE: DWORD = 0x00800000; pub static FILE_FLAG_SEQUENTIAL_SCAN: DWORD = 0x08000000; pub static FILE_FLAG_WRITE_THROUGH: DWORD = 0x80000000; + pub static FILE_FLAG_FIRST_PIPE_INSTANCE: DWORD = 0x00080000; pub static FILE_NAME_NORMALIZED: DWORD = 0x0; pub static FILE_NAME_OPENED: DWORD = 0x8; @@ -1783,6 +1788,8 @@ pub mod consts { pub static GENERIC_WRITE: DWORD = 0x40000000; pub static GENERIC_EXECUTE: DWORD = 0x20000000; pub static GENERIC_ALL: DWORD = 0x10000000; + pub static FILE_WRITE_ATTRIBUTES: DWORD = 0x00000100; + pub static FILE_READ_ATTRIBUTES: DWORD = 0x00000080; pub static FILE_BEGIN: DWORD = 0; pub static FILE_CURRENT: DWORD = 1; @@ -1794,6 +1801,19 @@ pub mod consts { pub static DETACHED_PROCESS: DWORD = 0x00000008; pub static CREATE_NEW_PROCESS_GROUP: DWORD = 0x00000200; + + pub static PIPE_ACCESS_DUPLEX: DWORD = 0x00000003; + pub static PIPE_ACCESS_INBOUND: DWORD = 0x00000001; + pub static PIPE_ACCESS_OUTBOUND: DWORD = 0x00000002; + pub static PIPE_TYPE_BYTE: DWORD = 0x00000000; + pub static PIPE_TYPE_MESSAGE: DWORD = 0x00000004; + pub static PIPE_READMODE_BYTE: DWORD = 0x00000000; + pub static PIPE_READMODE_MESSAGE: DWORD = 0x00000002; + pub static PIPE_WAIT: DWORD = 0x00000000; + pub static PIPE_NOWAIT: DWORD = 0x00000001; + pub static PIPE_ACCEPT_REMOTE_CLIENTS: DWORD = 0x00000000; + pub static PIPE_REJECT_REMOTE_CLIENTS: DWORD = 0x00000008; + pub static PIPE_UNLIMITED_INSTANCES: DWORD = 255; } pub mod sysconf { } @@ -2784,6 +2804,7 @@ pub mod consts { pub static AF_INET: c_int = 2; pub static AF_INET6: c_int = 28; + pub static AF_UNIX: c_int = 1; pub static SOCK_STREAM: c_int = 1; pub static SOCK_DGRAM: c_int = 2; pub static IPPROTO_TCP: c_int = 6; @@ -4177,6 +4198,34 @@ pub mod funcs { lpPerformanceCount: *mut LARGE_INTEGER) -> BOOL; pub fn GetCurrentProcessId() -> DWORD; + pub fn CreateNamedPipeW( + lpName: LPCWSTR, + dwOpenMode: DWORD, + dwPipeMode: DWORD, + nMaxInstances: DWORD, + nOutBufferSize: DWORD, + nInBufferSize: DWORD, + nDefaultTimeOut: DWORD, + lpSecurityAttributes: LPSECURITY_ATTRIBUTES + ) -> HANDLE; + pub fn ConnectNamedPipe(hNamedPipe: HANDLE, + lpOverlapped: LPOVERLAPPED) -> BOOL; + pub fn WaitNamedPipeW(lpNamedPipeName: LPCWSTR, + nTimeOut: DWORD) -> BOOL; + pub fn SetNamedPipeHandleState(hNamedPipe: HANDLE, + lpMode: LPDWORD, + lpMaxCollectionCount: LPDWORD, + lpCollectDataTimeout: LPDWORD) + -> BOOL; + pub fn CreateEventW(lpEventAttributes: LPSECURITY_ATTRIBUTES, + bManualReset: BOOL, + bInitialState: BOOL, + lpName: LPCWSTR) -> HANDLE; + pub fn GetOverlappedResult(hFile: HANDLE, + lpOverlapped: LPOVERLAPPED, + lpNumberOfBytesTransferred: LPDWORD, + bWait: BOOL) -> BOOL; + pub fn DisconnectNamedPipe(hNamedPipe: HANDLE) -> BOOL; } } diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs index 578ace2ba86..5573f8ec02e 100644 --- a/src/libstd/rt/rtio.rs +++ b/src/libstd/rt/rtio.rs @@ -260,11 +260,6 @@ pub trait RtioPipe { fn clone(&self) -> ~RtioPipe; } -pub trait RtioDatagramPipe : RtioPipe { - fn recvfrom(&mut self, buf: &mut [u8]) -> Result<(uint, CString), IoError>; - fn sendto(&mut self, buf: &[u8], dst: &CString) -> Result<(), IoError>; -} - pub trait RtioUnixListener { fn listen(~self) -> Result<~RtioUnixAcceptor, IoError>; } |
