diff options
| author | Alex Crichton <alex@alexcrichton.com> | 2014-01-22 19:32:16 -0800 |
|---|---|---|
| committer | Alex Crichton <alex@alexcrichton.com> | 2014-02-05 11:43:49 -0800 |
| commit | 56080c476712e478ffe4ef8d6d727c0e3d21cfd0 (patch) | |
| tree | 24bdce82bbf3122bf4bd8c0a66e307b667c6184f /src/libnative | |
| parent | ef53b7a97c58f65ac6967dfc6d30a4354afa34a3 (diff) | |
| download | rust-56080c476712e478ffe4ef8d6d727c0e3d21cfd0.tar.gz rust-56080c476712e478ffe4ef8d6d727c0e3d21cfd0.zip | |
Implement clone() for TCP/UDP/Unix sockets
This is part of the overall strategy I would like to take when approaching
issue #11165. The only two I/O objects that reasonably want to be "split" are
the network stream objects. Everything else can be "split" by just creating
another version.
The initial idea I had was the literally split the object into a reader and a
writer half, but that would just introduce lots of clutter with extra interfaces
that were a little unnnecssary, or it would return a ~Reader and a ~Writer which
means you couldn't access things like the remote peer name or local socket name.
The solution I found to be nicer was to just clone the stream itself. The clone
is just a clone of the handle, nothing fancy going on at the kernel level.
Conceptually I found this very easy to wrap my head around (everything else
supports clone()), and it solved the "split" problem at the same time.
The cloning support is pretty specific per platform/lib combination:
* native/win32 - uses some specific WSA apis to clone the SOCKET handle
* native/unix - uses dup() to get another file descriptor
* green/all - This is where things get interesting. When we support full clones
of a handle, this implies that we're allowing simultaneous writes
and reads to happen. It turns out that libuv doesn't support two
simultaneous reads or writes of the same object. It does support
*one* read and *one* write at the same time, however. Some extra
infrastructure was added to just block concurrent writers/readers
until the previous read/write operation was completed.
I've added tests to the tcp/unix modules to make sure that this functionality is
supported everywhere.
Diffstat (limited to 'src/libnative')
| -rw-r--r-- | src/libnative/io/file.rs | 47 | ||||
| -rw-r--r-- | src/libnative/io/net.rs | 130 |
2 files changed, 113 insertions, 64 deletions
diff --git a/src/libnative/io/file.rs b/src/libnative/io/file.rs index cc5b0770d4d..25fb2809e76 100644 --- a/src/libnative/io/file.rs +++ b/src/libnative/io/file.rs @@ -10,6 +10,7 @@ //! Blocking posix-based file I/O +use std::sync::arc::UnsafeArc; use std::c_str::CString; use std::io::IoError; use std::io; @@ -55,9 +56,13 @@ pub fn keep_going(data: &[u8], f: |*u8, uint| -> i64) -> i64 { pub type fd_t = libc::c_int; +struct Inner { + fd: fd_t, + close_on_drop: bool, +} + pub struct FileDesc { - priv fd: fd_t, - priv close_on_drop: bool, + priv inner: UnsafeArc<Inner> } impl FileDesc { @@ -70,7 +75,10 @@ impl FileDesc { /// Note that all I/O operations done on this object will be *blocking*, but /// they do not require the runtime to be active. pub fn new(fd: fd_t, close_on_drop: bool) -> FileDesc { - FileDesc { fd: fd, close_on_drop: close_on_drop } + FileDesc { inner: UnsafeArc::new(Inner { + fd: fd, + close_on_drop: close_on_drop + }) } } // FIXME(#10465) these functions should not be public, but anything in @@ -80,7 +88,7 @@ impl FileDesc { #[cfg(windows)] type rlen = libc::c_uint; #[cfg(not(windows))] type rlen = libc::size_t; let ret = retry(|| unsafe { - libc::read(self.fd, + libc::read(self.fd(), buf.as_ptr() as *mut libc::c_void, buf.len() as rlen) as libc::c_int }); @@ -97,7 +105,7 @@ impl FileDesc { #[cfg(not(windows))] type wlen = libc::size_t; let ret = keep_going(buf, |buf, len| { unsafe { - libc::write(self.fd, buf as *libc::c_void, len as wlen) as i64 + libc::write(self.fd(), buf as *libc::c_void, len as wlen) as i64 } }); if ret < 0 { @@ -107,7 +115,11 @@ impl FileDesc { } } - pub fn fd(&self) -> fd_t { self.fd } + pub fn fd(&self) -> fd_t { + // This unsafety is fine because we're just reading off the file + // descriptor, no one is modifying this. + unsafe { (*self.inner.get()).fd } + } } impl io::Reader for FileDesc { @@ -130,7 +142,7 @@ impl rtio::RtioFileStream for FileDesc { self.inner_write(buf) } fn pread(&mut self, buf: &mut [u8], offset: u64) -> Result<int, IoError> { - return os_pread(self.fd, buf.as_ptr(), buf.len(), offset); + return os_pread(self.fd(), buf.as_ptr(), buf.len(), offset); #[cfg(windows)] fn os_pread(fd: c_int, buf: *u8, amt: uint, offset: u64) -> IoResult<int> { @@ -162,7 +174,7 @@ impl rtio::RtioFileStream for FileDesc { } } fn pwrite(&mut self, buf: &[u8], offset: u64) -> Result<(), IoError> { - return os_pwrite(self.fd, buf.as_ptr(), buf.len(), offset); + return os_pwrite(self.fd(), buf.as_ptr(), buf.len(), offset); #[cfg(windows)] fn os_pwrite(fd: c_int, buf: *u8, amt: uint, offset: u64) -> IoResult<()> { @@ -197,7 +209,7 @@ impl rtio::RtioFileStream for FileDesc { io::SeekCur => libc::FILE_CURRENT, }; unsafe { - let handle = libc::get_osfhandle(self.fd) as libc::HANDLE; + let handle = libc::get_osfhandle(self.fd()) as libc::HANDLE; let mut newpos = 0; match libc::SetFilePointerEx(handle, pos, &mut newpos, whence) { 0 => Err(super::last_error()), @@ -212,7 +224,7 @@ impl rtio::RtioFileStream for FileDesc { io::SeekEnd => libc::SEEK_END, io::SeekCur => libc::SEEK_CUR, }; - let n = unsafe { libc::lseek(self.fd, pos as libc::off_t, whence) }; + let n = unsafe { libc::lseek(self.fd(), pos as libc::off_t, whence) }; if n < 0 { Err(super::last_error()) } else { @@ -220,7 +232,7 @@ impl rtio::RtioFileStream for FileDesc { } } fn tell(&self) -> Result<u64, IoError> { - let n = unsafe { libc::lseek(self.fd, 0, libc::SEEK_CUR) }; + let n = unsafe { libc::lseek(self.fd(), 0, libc::SEEK_CUR) }; if n < 0 { Err(super::last_error()) } else { @@ -228,7 +240,7 @@ impl rtio::RtioFileStream for FileDesc { } } fn fsync(&mut self) -> Result<(), IoError> { - return os_fsync(self.fd); + return os_fsync(self.fd()); #[cfg(windows)] fn os_fsync(fd: c_int) -> IoResult<()> { @@ -247,7 +259,7 @@ impl rtio::RtioFileStream for FileDesc { #[cfg(not(windows))] fn datasync(&mut self) -> Result<(), IoError> { - return super::mkerr_libc(os_datasync(self.fd)); + return super::mkerr_libc(os_datasync(self.fd())); #[cfg(target_os = "macos")] fn os_datasync(fd: c_int) -> c_int { @@ -270,7 +282,7 @@ impl rtio::RtioFileStream for FileDesc { Ok(_) => {}, Err(e) => return Err(e), }; let ret = unsafe { - let handle = libc::get_osfhandle(self.fd) as libc::HANDLE; + let handle = libc::get_osfhandle(self.fd()) as libc::HANDLE; match libc::SetEndOfFile(handle) { 0 => Err(super::last_error()), _ => Ok(()) @@ -282,7 +294,7 @@ impl rtio::RtioFileStream for FileDesc { #[cfg(unix)] fn truncate(&mut self, offset: i64) -> Result<(), IoError> { super::mkerr_libc(retry(|| unsafe { - libc::ftruncate(self.fd, offset as libc::off_t) + libc::ftruncate(self.fd(), offset as libc::off_t) })) } } @@ -294,6 +306,9 @@ impl rtio::RtioPipe for FileDesc { fn write(&mut self, buf: &[u8]) -> Result<(), IoError> { self.inner_write(buf) } + fn clone(&self) -> ~rtio::RtioPipe { + ~FileDesc { inner: self.inner.clone() } as ~rtio::RtioPipe + } } impl rtio::RtioTTY for FileDesc { @@ -312,7 +327,7 @@ impl rtio::RtioTTY for FileDesc { fn isatty(&self) -> bool { false } } -impl Drop for FileDesc { +impl Drop for Inner { fn drop(&mut self) { // closing stdio file handles makes no sense, so never do it. Also, note // that errors are ignored when closing a file descriptor. The reason diff --git a/src/libnative/io/net.rs b/src/libnative/io/net.rs index dd916c8f3c4..32cd6337f99 100644 --- a/src/libnative/io/net.rs +++ b/src/libnative/io/net.rs @@ -14,6 +14,7 @@ use std::io; use std::libc; use std::mem; use std::rt::rtio; +use std::sync::arc::UnsafeArc; use std::unstable::intrinsics; use super::{IoResult, retry}; @@ -108,10 +109,27 @@ fn setsockopt<T>(fd: sock_t, opt: libc::c_int, val: libc::c_int, let ret = libc::setsockopt(fd, opt, val, payload, mem::size_of::<T>() as libc::socklen_t); - super::mkerr_libc(ret) + if ret != 0 { + Err(last_error()) + } else { + Ok(()) + } } } +#[cfg(windows)] +fn last_error() -> io::IoError { + extern "system" { + fn WSAGetLastError() -> libc::c_int; + } + super::translate_error(unsafe { WSAGetLastError() }, true) +} + +#[cfg(not(windows))] +fn last_error() -> io::IoError { + super::last_error() +} + #[cfg(windows)] unsafe fn close(sock: sock_t) { let _ = libc::closesocket(sock); } #[cfg(unix)] unsafe fn close(sock: sock_t) { let _ = libc::close(sock); } @@ -128,7 +146,7 @@ fn sockname(fd: sock_t, storage as *mut libc::sockaddr, &mut len as *mut libc::socklen_t); if ret != 0 { - return Err(super::last_error()) + return Err(last_error()) } } return sockaddr_to_addr(&storage, len as uint); @@ -222,7 +240,11 @@ pub fn init() { //////////////////////////////////////////////////////////////////////////////// pub struct TcpStream { - priv fd: sock_t, + priv inner: UnsafeArc<Inner>, +} + +struct Inner { + fd: sock_t, } impl TcpStream { @@ -231,27 +253,31 @@ impl TcpStream { socket(addr, libc::SOCK_STREAM).and_then(|fd| { let (addr, len) = addr_to_sockaddr(addr); let addrp = &addr as *libc::sockaddr_storage; - let ret = TcpStream { fd: fd }; + let inner = Inner { fd: fd }; + let ret = TcpStream { inner: UnsafeArc::new(inner) }; match retry(|| { libc::connect(fd, addrp as *libc::sockaddr, len as libc::socklen_t) }) { - -1 => Err(super::last_error()), + -1 => Err(last_error()), _ => Ok(ret), } }) } } - pub fn fd(&self) -> sock_t { self.fd } + pub fn fd(&self) -> sock_t { + // This unsafety is fine because it's just a read-only arc + unsafe { (*self.inner.get()).fd } + } fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> { - setsockopt(self.fd, libc::IPPROTO_TCP, libc::TCP_NODELAY, + setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_NODELAY, nodelay as libc::c_int) } fn set_keepalive(&mut self, seconds: Option<uint>) -> IoResult<()> { - let ret = setsockopt(self.fd, libc::SOL_SOCKET, libc::SO_KEEPALIVE, + let ret = setsockopt(self.fd(), libc::SOL_SOCKET, libc::SO_KEEPALIVE, seconds.is_some() as libc::c_int); match seconds { Some(n) => ret.and_then(|()| self.set_tcp_keepalive(n)), @@ -261,12 +287,12 @@ impl TcpStream { #[cfg(target_os = "macos")] fn set_tcp_keepalive(&mut self, seconds: uint) -> IoResult<()> { - setsockopt(self.fd, libc::IPPROTO_TCP, libc::TCP_KEEPALIVE, + setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_KEEPALIVE, seconds as libc::c_int) } #[cfg(target_os = "freebsd")] fn set_tcp_keepalive(&mut self, seconds: uint) -> IoResult<()> { - setsockopt(self.fd, libc::IPPROTO_TCP, libc::TCP_KEEPIDLE, + setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_KEEPIDLE, seconds as libc::c_int) } #[cfg(not(target_os = "macos"), not(target_os = "freebsd"))] @@ -282,7 +308,7 @@ impl rtio::RtioTcpStream for TcpStream { fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> { let ret = retry(|| { unsafe { - libc::recv(self.fd, + libc::recv(self.fd(), buf.as_ptr() as *mut libc::c_void, buf.len() as wrlen, 0) as libc::c_int @@ -291,7 +317,7 @@ impl rtio::RtioTcpStream for TcpStream { if ret == 0 { Err(io::standard_error(io::EndOfFile)) } else if ret < 0 { - Err(super::last_error()) + Err(last_error()) } else { Ok(ret as uint) } @@ -299,20 +325,20 @@ impl rtio::RtioTcpStream for TcpStream { fn write(&mut self, buf: &[u8]) -> IoResult<()> { let ret = keep_going(buf, |buf, len| { unsafe { - libc::send(self.fd, + libc::send(self.fd(), buf as *mut libc::c_void, len as wrlen, 0) as i64 } }); if ret < 0 { - Err(super::last_error()) + Err(last_error()) } else { Ok(()) } } fn peer_name(&mut self) -> IoResult<ip::SocketAddr> { - sockname(self.fd, libc::getpeername) + sockname(self.fd(), libc::getpeername) } fn control_congestion(&mut self) -> IoResult<()> { self.set_nodelay(false) @@ -326,15 +352,19 @@ impl rtio::RtioTcpStream for TcpStream { fn letdie(&mut self) -> IoResult<()> { self.set_keepalive(None) } + + fn clone(&self) -> ~rtio::RtioTcpStream { + ~TcpStream { inner: self.inner.clone() } as ~rtio::RtioTcpStream + } } impl rtio::RtioSocket for TcpStream { fn socket_name(&mut self) -> IoResult<ip::SocketAddr> { - sockname(self.fd, libc::getsockname) + sockname(self.fd(), libc::getsockname) } } -impl Drop for TcpStream { +impl Drop for Inner { fn drop(&mut self) { unsafe { close(self.fd); } } } @@ -343,7 +373,7 @@ impl Drop for TcpStream { //////////////////////////////////////////////////////////////////////////////// pub struct TcpListener { - priv fd: sock_t, + priv inner: UnsafeArc<Inner>, } impl TcpListener { @@ -352,7 +382,8 @@ impl TcpListener { socket(addr, libc::SOCK_STREAM).and_then(|fd| { let (addr, len) = addr_to_sockaddr(addr); let addrp = &addr as *libc::sockaddr_storage; - let ret = TcpListener { fd: fd }; + let inner = Inner { fd: fd }; + let ret = TcpListener { inner: UnsafeArc::new(inner) }; // On platforms with Berkeley-derived sockets, this allows // to quickly rebind a socket, without needing to wait for // the OS to clean up the previous one. @@ -366,18 +397,21 @@ impl TcpListener { } match libc::bind(fd, addrp as *libc::sockaddr, len as libc::socklen_t) { - -1 => Err(super::last_error()), + -1 => Err(last_error()), _ => Ok(ret), } }) } } - pub fn fd(&self) -> sock_t { self.fd } + pub fn fd(&self) -> sock_t { + // This is just a read-only arc so the unsafety is fine + unsafe { (*self.inner.get()).fd } + } pub fn native_listen(self, backlog: int) -> IoResult<TcpAcceptor> { - match unsafe { libc::listen(self.fd, backlog as libc::c_int) } { - -1 => Err(super::last_error()), + match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } { + -1 => Err(last_error()), _ => Ok(TcpAcceptor { listener: self }) } } @@ -391,20 +425,16 @@ impl rtio::RtioTcpListener for TcpListener { impl rtio::RtioSocket for TcpListener { fn socket_name(&mut self) -> IoResult<ip::SocketAddr> { - sockname(self.fd, libc::getsockname) + sockname(self.fd(), libc::getsockname) } } -impl Drop for TcpListener { - fn drop(&mut self) { unsafe { close(self.fd); } } -} - pub struct TcpAcceptor { priv listener: TcpListener, } impl TcpAcceptor { - pub fn fd(&self) -> sock_t { self.listener.fd } + pub fn fd(&self) -> sock_t { self.listener.fd() } pub fn native_accept(&mut self) -> IoResult<TcpStream> { unsafe { @@ -417,8 +447,8 @@ impl TcpAcceptor { storagep as *mut libc::sockaddr, &mut size as *mut libc::socklen_t) as libc::c_int }) as sock_t { - -1 => Err(super::last_error()), - fd => Ok(TcpStream { fd: fd }) + -1 => Err(last_error()), + fd => Ok(TcpStream { inner: UnsafeArc::new(Inner { fd: fd })}) } } } @@ -444,7 +474,7 @@ impl rtio::RtioTcpAcceptor for TcpAcceptor { //////////////////////////////////////////////////////////////////////////////// pub struct UdpSocket { - priv fd: sock_t, + priv inner: UnsafeArc<Inner>, } impl UdpSocket { @@ -453,25 +483,29 @@ impl UdpSocket { socket(addr, libc::SOCK_DGRAM).and_then(|fd| { let (addr, len) = addr_to_sockaddr(addr); let addrp = &addr as *libc::sockaddr_storage; - let ret = UdpSocket { fd: fd }; + let inner = Inner { fd: fd }; + let ret = UdpSocket { inner: UnsafeArc::new(inner) }; match libc::bind(fd, addrp as *libc::sockaddr, len as libc::socklen_t) { - -1 => Err(super::last_error()), + -1 => Err(last_error()), _ => Ok(ret), } }) } } - pub fn fd(&self) -> sock_t { self.fd } + pub fn fd(&self) -> sock_t { + // unsafety is fine because it's just a read-only arc + unsafe { (*self.inner.get()).fd } + } pub fn set_broadcast(&mut self, on: bool) -> IoResult<()> { - setsockopt(self.fd, libc::SOL_SOCKET, libc::SO_BROADCAST, + setsockopt(self.fd(), libc::SOL_SOCKET, libc::SO_BROADCAST, on as libc::c_int) } pub fn set_multicast_loop(&mut self, on: bool) -> IoResult<()> { - setsockopt(self.fd, libc::IPPROTO_IP, libc::IP_MULTICAST_LOOP, + setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_MULTICAST_LOOP, on as libc::c_int) } @@ -484,14 +518,14 @@ impl UdpSocket { // interface == INADDR_ANY imr_interface: libc::in_addr { s_addr: 0x0 }, }; - setsockopt(self.fd, libc::IPPROTO_IP, opt, mreq) + setsockopt(self.fd(), libc::IPPROTO_IP, opt, mreq) } In6Addr(addr) => { let mreq = libc::ip6_mreq { ipv6mr_multiaddr: addr, ipv6mr_interface: 0, }; - setsockopt(self.fd, libc::IPPROTO_IPV6, opt, mreq) + setsockopt(self.fd(), libc::IPPROTO_IPV6, opt, mreq) } } } @@ -514,14 +548,14 @@ impl rtio::RtioUdpSocket for UdpSocket { let mut addrlen: libc::socklen_t = mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t; let ret = retry(|| { - libc::recvfrom(self.fd, + libc::recvfrom(self.fd(), buf.as_ptr() as *mut libc::c_void, buf.len() as msglen_t, 0, storagep as *mut libc::sockaddr, &mut addrlen) as libc::c_int }); - if ret < 0 { return Err(super::last_error()) } + if ret < 0 { return Err(last_error()) } sockaddr_to_addr(&storage, addrlen as uint).and_then(|addr| { Ok((ret as uint, addr)) }) @@ -532,7 +566,7 @@ impl rtio::RtioUdpSocket for UdpSocket { let dstp = &dst as *libc::sockaddr_storage; unsafe { let ret = retry(|| { - libc::sendto(self.fd, + libc::sendto(self.fd(), buf.as_ptr() as *libc::c_void, buf.len() as msglen_t, 0, @@ -540,7 +574,7 @@ impl rtio::RtioUdpSocket for UdpSocket { len as libc::socklen_t) as libc::c_int }); match ret { - -1 => Err(super::last_error()), + -1 => Err(last_error()), n if n as uint != buf.len() => { Err(io::IoError { kind: io::OtherIoError, @@ -582,11 +616,11 @@ impl rtio::RtioUdpSocket for UdpSocket { } fn multicast_time_to_live(&mut self, ttl: int) -> IoResult<()> { - setsockopt(self.fd, libc::IPPROTO_IP, libc::IP_MULTICAST_TTL, + setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_MULTICAST_TTL, ttl as libc::c_int) } fn time_to_live(&mut self, ttl: int) -> IoResult<()> { - setsockopt(self.fd, libc::IPPROTO_IP, libc::IP_TTL, ttl as libc::c_int) + setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_TTL, ttl as libc::c_int) } fn hear_broadcasts(&mut self) -> IoResult<()> { @@ -595,8 +629,8 @@ impl rtio::RtioUdpSocket for UdpSocket { fn ignore_broadcasts(&mut self) -> IoResult<()> { self.set_broadcast(false) } -} -impl Drop for UdpSocket { - fn drop(&mut self) { unsafe { close(self.fd) } } + fn clone(&self) -> ~rtio::RtioUdpSocket { + ~UdpSocket { inner: self.inner.clone() } as ~rtio::RtioUdpSocket + } } |
