diff options
| author | Alex Crichton <alex@alexcrichton.com> | 2013-12-28 16:40:15 -0800 |
|---|---|---|
| committer | Alex Crichton <alex@alexcrichton.com> | 2013-12-31 11:34:22 -0800 |
| commit | bba78a2a89d5fae34e22d7a3173d90dffff816f4 (patch) | |
| tree | d14dd194276ed9b6909f36a485fcfa2011ae88aa /src/libnative | |
| parent | 292269708701f1dfc663668aa72584617b3d9ccc (diff) | |
| download | rust-bba78a2a89d5fae34e22d7a3173d90dffff816f4.tar.gz rust-bba78a2a89d5fae34e22d7a3173d90dffff816f4.zip | |
Implement native UDP I/O
Diffstat (limited to 'src/libnative')
| -rw-r--r-- | src/libnative/io/mod.rs | 4 | ||||
| -rw-r--r-- | src/libnative/io/net.rs | 349 |
2 files changed, 264 insertions, 89 deletions
diff --git a/src/libnative/io/mod.rs b/src/libnative/io/mod.rs index 9e76dea5ebf..b936a36cf3a 100644 --- a/src/libnative/io/mod.rs +++ b/src/libnative/io/mod.rs @@ -166,8 +166,8 @@ impl rtio::IoFactory for IoFactory { fn tcp_bind(&mut self, addr: SocketAddr) -> IoResult<~RtioTcpListener> { net::TcpListener::bind(addr).map(|s| ~s as ~RtioTcpListener) } - fn udp_bind(&mut self, _addr: SocketAddr) -> IoResult<~RtioUdpSocket> { - Err(unimpl()) + fn udp_bind(&mut self, addr: SocketAddr) -> IoResult<~RtioUdpSocket> { + net::UdpSocket::bind(addr).map(|u| ~u as ~RtioUdpSocket) } fn unix_bind(&mut self, _path: &CString) -> IoResult<~RtioUnixListener> { Err(unimpl()) diff --git a/src/libnative/io/net.rs b/src/libnative/io/net.rs index aaa95ce0cfb..674f02d4a22 100644 --- a/src/libnative/io/net.rs +++ b/src/libnative/io/net.rs @@ -19,13 +19,13 @@ use std::unstable::intrinsics; use super::IoResult; use super::file::keep_going; +//////////////////////////////////////////////////////////////////////////////// +// sockaddr and misc bindings +//////////////////////////////////////////////////////////////////////////////// + #[cfg(windows)] pub type sock_t = libc::SOCKET; #[cfg(unix)] pub type sock_t = super::file::fd_t; -pub struct TcpStream { - priv fd: sock_t, -} - #[cfg(target_endian = "big")] pub fn htons(x: u16) -> u16 { x } #[cfg(target_endian = "big")] pub fn ntohs(x: u16) -> u16 { x } #[cfg(target_endian = "little")] @@ -37,32 +37,54 @@ pub fn ntohs(u: u16) -> u16 { unsafe { intrinsics::bswap16(u as i16) as u16 } } +enum InAddr { + InAddr(libc::in_addr), + In6Addr(libc::in6_addr), +} + +fn ip_to_inaddr(ip: ip::IpAddr) -> InAddr { + match ip { + ip::Ipv4Addr(a, b, c, d) => { + InAddr(libc::in_addr { + s_addr: (d as u32 << 24) | + (c as u32 << 16) | + (b as u32 << 8) | + (a as u32 << 0) + }) + } + ip::Ipv6Addr(a, b, c, d, e, f, g, h) => { + In6Addr(libc::in6_addr { + s6_addr: [ + htons(a), + htons(b), + htons(c), + htons(d), + htons(e), + htons(f), + htons(g), + htons(h), + ] + }) + } + } +} + fn addr_to_sockaddr(addr: ip::SocketAddr) -> (libc::sockaddr_storage, uint) { unsafe { let storage: libc::sockaddr_storage = intrinsics::init(); - let len = match addr.ip { - ip::Ipv4Addr(a, b, c, d) => { + let len = match ip_to_inaddr(addr.ip) { + InAddr(inaddr) => { let storage: *mut libc::sockaddr_in = cast::transmute(&storage); (*storage).sin_family = libc::AF_INET as libc::sa_family_t; (*storage).sin_port = htons(addr.port); - (*storage).sin_addr.s_addr = (d as u32 << 24) | - (c as u32 << 16) | - (b as u32 << 8) | - (a as u32 << 0); + (*storage).sin_addr = inaddr; mem::size_of::<libc::sockaddr_in>() } - ip::Ipv6Addr(a, b, c, d, e, f, g, h) => { + In6Addr(inaddr) => { let storage: *mut libc::sockaddr_in6 = cast::transmute(&storage); (*storage).sin6_family = libc::AF_INET6 as libc::sa_family_t; (*storage).sin6_port = htons(addr.port); - (*storage).sin6_addr.s6_addr[0] = htons(a); - (*storage).sin6_addr.s6_addr[1] = htons(b); - (*storage).sin6_addr.s6_addr[2] = htons(c); - (*storage).sin6_addr.s6_addr[3] = htons(d); - (*storage).sin6_addr.s6_addr[4] = htons(e); - (*storage).sin6_addr.s6_addr[5] = htons(f); - (*storage).sin6_addr.s6_addr[6] = htons(g); - (*storage).sin6_addr.s6_addr[7] = htons(h); + (*storage).sin6_addr = inaddr; mem::size_of::<libc::sockaddr_in6>() } }; @@ -70,19 +92,33 @@ fn addr_to_sockaddr(addr: ip::SocketAddr) -> (libc::sockaddr_storage, uint) { } } -fn socket(addr: ip::SocketAddr) -> IoResult<sock_t> { +fn socket(addr: ip::SocketAddr, ty: libc::c_int) -> IoResult<sock_t> { unsafe { let fam = match addr.ip { ip::Ipv4Addr(..) => libc::AF_INET, ip::Ipv6Addr(..) => libc::AF_INET6, }; - match libc::socket(fam, libc::SOCK_STREAM, 0) { + match libc::socket(fam, ty, 0) { -1 => Err(super::last_error()), fd => Ok(fd), } } } +fn setsockopt<T>(fd: sock_t, opt: libc::c_int, val: libc::c_int, + payload: T) -> IoResult<()> { + unsafe { + let payload = &payload as *T as *libc::c_void; + let ret = libc::setsockopt(fd, opt, val, + payload, + mem::size_of::<T>() as libc::socklen_t); + super::mkerr_libc(ret) + } +} + +#[cfg(windows)] unsafe fn close(sock: sock_t) { libc::closesocket(sock); } +#[cfg(unix)] unsafe fn close(sock: sock_t) { libc::close(sock); } + fn sockname(fd: sock_t, f: extern "system" unsafe fn(sock_t, *mut libc::sockaddr, *mut libc::socklen_t) -> libc::c_int) @@ -99,11 +135,16 @@ fn sockname(fd: sock_t, return Err(super::last_error()) } } + return sockaddr_to_addr(&storage, len as uint); +} + +fn sockaddr_to_addr(storage: &libc::sockaddr_storage, + len: uint) -> IoResult<ip::SocketAddr> { match storage.ss_family as libc::c_int { libc::AF_INET => { assert!(len as uint >= mem::size_of::<libc::sockaddr_in>()); - let storage: &mut libc::sockaddr_in = unsafe { - cast::transmute(&mut storage) + let storage: &libc::sockaddr_in = unsafe { + cast::transmute(storage) }; let addr = storage.sin_addr.s_addr as u32; let a = (addr >> 0) as u8; @@ -117,8 +158,8 @@ fn sockname(fd: sock_t, } libc::AF_INET6 => { assert!(len as uint >= mem::size_of::<libc::sockaddr_in6>()); - let storage: &mut libc::sockaddr_in6 = unsafe { - cast::transmute(&mut storage) + let storage: &libc::sockaddr_in6 = unsafe { + cast::transmute(storage) }; let a = ntohs(storage.sin6_addr.s6_addr[0]); let b = ntohs(storage.sin6_addr.s6_addr[1]); @@ -180,10 +221,18 @@ pub fn init() { } } +//////////////////////////////////////////////////////////////////////////////// +// TCP streams +//////////////////////////////////////////////////////////////////////////////// + +pub struct TcpStream { + priv fd: sock_t, +} + impl TcpStream { pub fn connect(addr: ip::SocketAddr) -> IoResult<TcpStream> { unsafe { - socket(addr).and_then(|fd| { + socket(addr, libc::SOCK_STREAM).and_then(|fd| { let (addr, len) = addr_to_sockaddr(addr); let addrp = &addr as *libc::sockaddr_storage; let ret = TcpStream { fd: fd }; @@ -199,63 +248,31 @@ impl TcpStream { pub fn fd(&self) -> sock_t { self.fd } fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> { - unsafe { - let on = nodelay as libc::c_int; - let on = &on as *libc::c_int; - super::mkerr_libc(libc::setsockopt(self.fd, - libc::IPPROTO_TCP, - libc::TCP_NODELAY, - on as *libc::c_void, - mem::size_of::<libc::c_void>() - as libc::socklen_t)) - } + setsockopt(self.fd, libc::IPPROTO_TCP, libc::TCP_NODELAY, + nodelay as libc::c_int) } fn set_keepalive(&mut self, seconds: Option<uint>) -> IoResult<()> { - unsafe { - let on = seconds.is_some() as libc::c_int; - let on = &on as *libc::c_int; - let ret = libc::setsockopt(self.fd, - libc::SOL_SOCKET, - libc::SO_KEEPALIVE, - on as *libc::c_void, - mem::size_of::<libc::c_void>() - as libc::socklen_t); - if ret != 0 { return Err(super::last_error()) } - - match seconds { - Some(n) => self.set_tcp_keepalive(n), - None => Ok(()) - } + let ret = setsockopt(self.fd, libc::SOL_SOCKET, libc::SO_KEEPALIVE, + seconds.is_some() as libc::c_int); + match seconds { + Some(n) => ret.and_then(|()| self.set_tcp_keepalive(n)), + None => ret, } } #[cfg(target_os = "macos")] - unsafe fn set_tcp_keepalive(&mut self, seconds: uint) -> IoResult<()> { - let delay = seconds as libc::c_uint; - let delay = &delay as *libc::c_uint; - let ret = libc::setsockopt(self.fd, - libc::IPPROTO_TCP, - libc::TCP_KEEPALIVE, - delay as *libc::c_void, - mem::size_of::<libc::c_uint>() - as libc::socklen_t); - super::mkerr_libc(ret) + fn set_tcp_keepalive(&mut self, seconds: uint) -> IoResult<()> { + setsockopt(self.fd, libc::IPPROTO_TCP, libc::TCP_KEEPALIVE, + seconds as libc::c_int) } #[cfg(target_os = "freebsd")] - unsafe fn set_tcp_keepalive(&mut self, seconds: uint) -> IoResult<()> { - let delay = seconds as libc::c_uint; - let delay = &delay as *libc::c_uint; - let ret = libc::setsockopt(self.fd, - libc::IPPROTO_TCP, - libc::TCP_KEEPIDLE, - delay as *libc::c_void, - mem::size_of::<libc::c_uint>() - as libc::socklen_t); - super::mkerr_libc(ret) + fn set_tcp_keepalive(&mut self, seconds: uint) -> IoResult<()> { + setsockopt(self.fd, libc::IPPROTO_TCP, libc::TCP_KEEPIDLE, + seconds as libc::c_int) } #[cfg(not(target_os = "macos"), not(target_os = "freebsd"))] - unsafe fn set_tcp_keepalive(&mut self, _seconds: uint) -> IoResult<()> { + fn set_tcp_keepalive(&mut self, _seconds: uint) -> IoResult<()> { Ok(()) } } @@ -320,17 +337,13 @@ impl rtio::RtioSocket for TcpStream { } impl Drop for TcpStream { - #[cfg(unix)] - fn drop(&mut self) { - unsafe { libc::close(self.fd); } - } - - #[cfg(windows)] - fn drop(&mut self) { - unsafe { libc::closesocket(self.fd); } - } + fn drop(&mut self) { unsafe { close(self.fd); } } } +//////////////////////////////////////////////////////////////////////////////// +// TCP listeners +//////////////////////////////////////////////////////////////////////////////// + pub struct TcpListener { priv fd: sock_t, } @@ -338,7 +351,7 @@ pub struct TcpListener { impl TcpListener { pub fn bind(addr: ip::SocketAddr) -> IoResult<TcpListener> { unsafe { - socket(addr).and_then(|fd| { + socket(addr, libc::SOCK_STREAM).and_then(|fd| { let (addr, len) = addr_to_sockaddr(addr); let addrp = &addr as *libc::sockaddr_storage; let ret = TcpListener { fd: fd }; @@ -356,7 +369,7 @@ impl TcpListener { pub fn native_listen(self, backlog: int) -> IoResult<TcpAcceptor> { match unsafe { libc::listen(self.fd, backlog as libc::c_int) } { -1 => Err(super::last_error()), - _ => Ok(TcpAcceptor { fd: self.fd }) + _ => Ok(TcpAcceptor { listener: self }) } } } @@ -373,12 +386,16 @@ impl rtio::RtioSocket for TcpListener { } } +impl Drop for TcpListener { + fn drop(&mut self) { unsafe { close(self.fd); } } +} + pub struct TcpAcceptor { - priv fd: sock_t, + priv listener: TcpListener, } impl TcpAcceptor { - pub fn fd(&self) -> sock_t { self.fd } + pub fn fd(&self) -> sock_t { self.listener.fd } pub fn native_accept(&mut self) -> IoResult<TcpStream> { unsafe { @@ -386,7 +403,7 @@ impl TcpAcceptor { let storagep = &mut storage as *mut libc::sockaddr_storage; let size = mem::size_of::<libc::sockaddr_storage>(); let mut size = size as libc::socklen_t; - match libc::accept(self.fd, + match libc::accept(self.fd(), storagep as *mut libc::sockaddr, &mut size as *mut libc::socklen_t) { -1 => Err(super::last_error()), @@ -398,7 +415,7 @@ impl TcpAcceptor { impl rtio::RtioSocket for TcpAcceptor { fn socket_name(&mut self) -> IoResult<ip::SocketAddr> { - sockname(self.fd, libc::getsockname) + sockname(self.fd(), libc::getsockname) } } @@ -410,3 +427,161 @@ impl rtio::RtioTcpAcceptor for TcpAcceptor { fn accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) } fn dont_accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) } } + +//////////////////////////////////////////////////////////////////////////////// +// UDP +//////////////////////////////////////////////////////////////////////////////// + +pub struct UdpSocket { + priv fd: sock_t, +} + +impl UdpSocket { + pub fn bind(addr: ip::SocketAddr) -> IoResult<UdpSocket> { + unsafe { + socket(addr, libc::SOCK_DGRAM).and_then(|fd| { + let (addr, len) = addr_to_sockaddr(addr); + let addrp = &addr as *libc::sockaddr_storage; + let ret = UdpSocket { fd: fd }; + match libc::bind(fd, addrp as *libc::sockaddr, + len as libc::socklen_t) { + -1 => Err(super::last_error()), + _ => Ok(ret), + } + }) + } + } + + pub fn fd(&self) -> sock_t { self.fd } + + pub fn set_broadcast(&mut self, on: bool) -> IoResult<()> { + setsockopt(self.fd, libc::SOL_SOCKET, libc::SO_BROADCAST, + on as libc::c_int) + } + + pub fn set_multicast_loop(&mut self, on: bool) -> IoResult<()> { + setsockopt(self.fd, libc::IPPROTO_IP, libc::IP_MULTICAST_LOOP, + on as libc::c_int) + } + + pub fn set_membership(&mut self, addr: ip::IpAddr, + opt: libc::c_int) -> IoResult<()> { + match ip_to_inaddr(addr) { + InAddr(addr) => { + let mreq = libc::ip_mreq { + imr_multiaddr: addr, + // interface == INADDR_ANY + imr_interface: libc::in_addr { s_addr: 0x0 }, + }; + setsockopt(self.fd, libc::IPPROTO_IP, opt, mreq) + } + In6Addr(addr) => { + let mreq = libc::ip6_mreq { + ipv6mr_multiaddr: addr, + ipv6mr_interface: 0, + }; + setsockopt(self.fd, libc::IPPROTO_IPV6, opt, mreq) + } + } + } +} + +impl rtio::RtioSocket for UdpSocket { + fn socket_name(&mut self) -> IoResult<ip::SocketAddr> { + sockname(self.fd(), libc::getsockname) + } +} + +#[cfg(windows)] type msglen_t = libc::c_int; +#[cfg(unix)] type msglen_t = libc::size_t; + +impl rtio::RtioUdpSocket for UdpSocket { + fn recvfrom(&mut self, buf: &mut [u8]) -> IoResult<(uint, ip::SocketAddr)> { + unsafe { + let mut storage: libc::sockaddr_storage = intrinsics::init(); + let storagep = &mut storage as *mut libc::sockaddr_storage; + let mut addrlen: libc::socklen_t = + mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t; + let ret = libc::recvfrom(self.fd, + buf.as_ptr() as *mut libc::c_void, + buf.len() as msglen_t, + 0, + storagep as *mut libc::sockaddr, + &mut addrlen); + if ret < 0 { return Err(super::last_error()) } + sockaddr_to_addr(&storage, addrlen as uint).and_then(|addr| { + Ok((ret as uint, addr)) + }) + } + } + fn sendto(&mut self, buf: &[u8], dst: ip::SocketAddr) -> IoResult<()> { + let (dst, len) = addr_to_sockaddr(dst); + let dstp = &dst as *libc::sockaddr_storage; + unsafe { + let ret = libc::sendto(self.fd, + buf.as_ptr() as *libc::c_void, + buf.len() as msglen_t, + 0, + dstp as *libc::sockaddr, + len as libc::socklen_t); + match ret { + -1 => Err(super::last_error()), + n if n as uint != buf.len() => { + Err(io::IoError { + kind: io::OtherIoError, + desc: "couldn't send entire packet at once", + detail: None, + }) + } + _ => Ok(()) + } + } + } + + fn join_multicast(&mut self, multi: ip::IpAddr) -> IoResult<()> { + match multi { + ip::Ipv4Addr(..) => { + self.set_membership(multi, libc::IP_ADD_MEMBERSHIP) + } + ip::Ipv6Addr(..) => { + self.set_membership(multi, libc::IPV6_ADD_MEMBERSHIP) + } + } + } + fn leave_multicast(&mut self, multi: ip::IpAddr) -> IoResult<()> { + match multi { + ip::Ipv4Addr(..) => { + self.set_membership(multi, libc::IP_DROP_MEMBERSHIP) + } + ip::Ipv6Addr(..) => { + self.set_membership(multi, libc::IPV6_DROP_MEMBERSHIP) + } + } + } + + fn loop_multicast_locally(&mut self) -> IoResult<()> { + self.set_multicast_loop(true) + } + fn dont_loop_multicast_locally(&mut self) -> IoResult<()> { + self.set_multicast_loop(false) + } + + fn multicast_time_to_live(&mut self, ttl: int) -> IoResult<()> { + setsockopt(self.fd, libc::IPPROTO_IP, libc::IP_MULTICAST_TTL, + ttl as libc::c_int) + } + fn time_to_live(&mut self, ttl: int) -> IoResult<()> { + setsockopt(self.fd, libc::IPPROTO_IP, libc::IP_TTL, ttl as libc::c_int) + } + + fn hear_broadcasts(&mut self) -> IoResult<()> { + self.set_broadcast(true) + } + fn ignore_broadcasts(&mut self) -> IoResult<()> { + self.set_broadcast(false) + } +} + +impl Drop for UdpSocket { + fn drop(&mut self) { unsafe { close(self.fd) } } +} |
