about summary refs log tree commit diff
path: root/src/libnative
diff options
context:
space:
mode:
authorAlex Crichton <alex@alexcrichton.com>2013-12-28 16:40:15 -0800
committerAlex Crichton <alex@alexcrichton.com>2013-12-31 11:34:22 -0800
commitbba78a2a89d5fae34e22d7a3173d90dffff816f4 (patch)
treed14dd194276ed9b6909f36a485fcfa2011ae88aa /src/libnative
parent292269708701f1dfc663668aa72584617b3d9ccc (diff)
downloadrust-bba78a2a89d5fae34e22d7a3173d90dffff816f4.tar.gz
rust-bba78a2a89d5fae34e22d7a3173d90dffff816f4.zip
Implement native UDP I/O
Diffstat (limited to 'src/libnative')
-rw-r--r--src/libnative/io/mod.rs4
-rw-r--r--src/libnative/io/net.rs349
2 files changed, 264 insertions, 89 deletions
diff --git a/src/libnative/io/mod.rs b/src/libnative/io/mod.rs
index 9e76dea5ebf..b936a36cf3a 100644
--- a/src/libnative/io/mod.rs
+++ b/src/libnative/io/mod.rs
@@ -166,8 +166,8 @@ impl rtio::IoFactory for IoFactory {
     fn tcp_bind(&mut self, addr: SocketAddr) -> IoResult<~RtioTcpListener> {
         net::TcpListener::bind(addr).map(|s| ~s as ~RtioTcpListener)
     }
-    fn udp_bind(&mut self, _addr: SocketAddr) -> IoResult<~RtioUdpSocket> {
-        Err(unimpl())
+    fn udp_bind(&mut self, addr: SocketAddr) -> IoResult<~RtioUdpSocket> {
+        net::UdpSocket::bind(addr).map(|u| ~u as ~RtioUdpSocket)
     }
     fn unix_bind(&mut self, _path: &CString) -> IoResult<~RtioUnixListener> {
         Err(unimpl())
diff --git a/src/libnative/io/net.rs b/src/libnative/io/net.rs
index aaa95ce0cfb..674f02d4a22 100644
--- a/src/libnative/io/net.rs
+++ b/src/libnative/io/net.rs
@@ -19,13 +19,13 @@ use std::unstable::intrinsics;
 use super::IoResult;
 use super::file::keep_going;
 
+////////////////////////////////////////////////////////////////////////////////
+// sockaddr and misc bindings
+////////////////////////////////////////////////////////////////////////////////
+
 #[cfg(windows)] pub type sock_t = libc::SOCKET;
 #[cfg(unix)]    pub type sock_t = super::file::fd_t;
 
-pub struct TcpStream {
-    priv fd: sock_t,
-}
-
 #[cfg(target_endian = "big")] pub fn htons(x: u16) -> u16 { x }
 #[cfg(target_endian = "big")] pub fn ntohs(x: u16) -> u16 { x }
 #[cfg(target_endian = "little")]
@@ -37,32 +37,54 @@ pub fn ntohs(u: u16) -> u16 {
     unsafe { intrinsics::bswap16(u as i16) as u16 }
 }
 
+enum InAddr {
+    InAddr(libc::in_addr),
+    In6Addr(libc::in6_addr),
+}
+
+fn ip_to_inaddr(ip: ip::IpAddr) -> InAddr {
+    match ip {
+        ip::Ipv4Addr(a, b, c, d) => {
+            InAddr(libc::in_addr {
+                s_addr: (d as u32 << 24) |
+                        (c as u32 << 16) |
+                        (b as u32 <<  8) |
+                        (a as u32 <<  0)
+            })
+        }
+        ip::Ipv6Addr(a, b, c, d, e, f, g, h) => {
+            In6Addr(libc::in6_addr {
+                s6_addr: [
+                    htons(a),
+                    htons(b),
+                    htons(c),
+                    htons(d),
+                    htons(e),
+                    htons(f),
+                    htons(g),
+                    htons(h),
+                ]
+            })
+        }
+    }
+}
+
 fn addr_to_sockaddr(addr: ip::SocketAddr) -> (libc::sockaddr_storage, uint) {
     unsafe {
         let storage: libc::sockaddr_storage = intrinsics::init();
-        let len = match addr.ip {
-            ip::Ipv4Addr(a, b, c, d) => {
+        let len = match ip_to_inaddr(addr.ip) {
+            InAddr(inaddr) => {
                 let storage: *mut libc::sockaddr_in = cast::transmute(&storage);
                 (*storage).sin_family = libc::AF_INET as libc::sa_family_t;
                 (*storage).sin_port = htons(addr.port);
-                (*storage).sin_addr.s_addr = (d as u32 << 24) |
-                                             (c as u32 << 16) |
-                                             (b as u32 <<  8) |
-                                             (a as u32 <<  0);
+                (*storage).sin_addr = inaddr;
                 mem::size_of::<libc::sockaddr_in>()
             }
-            ip::Ipv6Addr(a, b, c, d, e, f, g, h) => {
+            In6Addr(inaddr) => {
                 let storage: *mut libc::sockaddr_in6 = cast::transmute(&storage);
                 (*storage).sin6_family = libc::AF_INET6 as libc::sa_family_t;
                 (*storage).sin6_port = htons(addr.port);
-                (*storage).sin6_addr.s6_addr[0] = htons(a);
-                (*storage).sin6_addr.s6_addr[1] = htons(b);
-                (*storage).sin6_addr.s6_addr[2] = htons(c);
-                (*storage).sin6_addr.s6_addr[3] = htons(d);
-                (*storage).sin6_addr.s6_addr[4] = htons(e);
-                (*storage).sin6_addr.s6_addr[5] = htons(f);
-                (*storage).sin6_addr.s6_addr[6] = htons(g);
-                (*storage).sin6_addr.s6_addr[7] = htons(h);
+                (*storage).sin6_addr = inaddr;
                 mem::size_of::<libc::sockaddr_in6>()
             }
         };
@@ -70,19 +92,33 @@ fn addr_to_sockaddr(addr: ip::SocketAddr) -> (libc::sockaddr_storage, uint) {
     }
 }
 
-fn socket(addr: ip::SocketAddr) -> IoResult<sock_t> {
+fn socket(addr: ip::SocketAddr, ty: libc::c_int) -> IoResult<sock_t> {
     unsafe {
         let fam = match addr.ip {
             ip::Ipv4Addr(..) => libc::AF_INET,
             ip::Ipv6Addr(..) => libc::AF_INET6,
         };
-        match libc::socket(fam, libc::SOCK_STREAM, 0) {
+        match libc::socket(fam, ty, 0) {
             -1 => Err(super::last_error()),
             fd => Ok(fd),
         }
     }
 }
 
+fn setsockopt<T>(fd: sock_t, opt: libc::c_int, val: libc::c_int,
+                 payload: T) -> IoResult<()> {
+    unsafe {
+        let payload = &payload as *T as *libc::c_void;
+        let ret = libc::setsockopt(fd, opt, val,
+                                   payload,
+                                   mem::size_of::<T>() as libc::socklen_t);
+        super::mkerr_libc(ret)
+    }
+}
+
+#[cfg(windows)] unsafe fn close(sock: sock_t) { libc::closesocket(sock); }
+#[cfg(unix)]    unsafe fn close(sock: sock_t) { libc::close(sock); }
+
 fn sockname(fd: sock_t,
             f: extern "system" unsafe fn(sock_t, *mut libc::sockaddr,
                                          *mut libc::socklen_t) -> libc::c_int)
@@ -99,11 +135,16 @@ fn sockname(fd: sock_t,
             return Err(super::last_error())
         }
     }
+    return sockaddr_to_addr(&storage, len as uint);
+}
+
+fn sockaddr_to_addr(storage: &libc::sockaddr_storage,
+                    len: uint) -> IoResult<ip::SocketAddr> {
     match storage.ss_family as libc::c_int {
         libc::AF_INET => {
             assert!(len as uint >= mem::size_of::<libc::sockaddr_in>());
-            let storage: &mut libc::sockaddr_in = unsafe {
-                cast::transmute(&mut storage)
+            let storage: &libc::sockaddr_in = unsafe {
+                cast::transmute(storage)
             };
             let addr = storage.sin_addr.s_addr as u32;
             let a = (addr >>  0) as u8;
@@ -117,8 +158,8 @@ fn sockname(fd: sock_t,
         }
         libc::AF_INET6 => {
             assert!(len as uint >= mem::size_of::<libc::sockaddr_in6>());
-            let storage: &mut libc::sockaddr_in6 = unsafe {
-                cast::transmute(&mut storage)
+            let storage: &libc::sockaddr_in6 = unsafe {
+                cast::transmute(storage)
             };
             let a = ntohs(storage.sin6_addr.s6_addr[0]);
             let b = ntohs(storage.sin6_addr.s6_addr[1]);
@@ -180,10 +221,18 @@ pub fn init() {
     }
 }
 
+////////////////////////////////////////////////////////////////////////////////
+// TCP streams
+////////////////////////////////////////////////////////////////////////////////
+
+pub struct TcpStream {
+    priv fd: sock_t,
+}
+
 impl TcpStream {
     pub fn connect(addr: ip::SocketAddr) -> IoResult<TcpStream> {
         unsafe {
-            socket(addr).and_then(|fd| {
+            socket(addr, libc::SOCK_STREAM).and_then(|fd| {
                 let (addr, len) = addr_to_sockaddr(addr);
                 let addrp = &addr as *libc::sockaddr_storage;
                 let ret = TcpStream { fd: fd };
@@ -199,63 +248,31 @@ impl TcpStream {
     pub fn fd(&self) -> sock_t { self.fd }
 
     fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> {
-        unsafe {
-            let on = nodelay as libc::c_int;
-            let on = &on as *libc::c_int;
-            super::mkerr_libc(libc::setsockopt(self.fd,
-                                               libc::IPPROTO_TCP,
-                                               libc::TCP_NODELAY,
-                                               on as *libc::c_void,
-                                               mem::size_of::<libc::c_void>()
-                                                    as libc::socklen_t))
-        }
+        setsockopt(self.fd, libc::IPPROTO_TCP, libc::TCP_NODELAY,
+                   nodelay as libc::c_int)
     }
 
     fn set_keepalive(&mut self, seconds: Option<uint>) -> IoResult<()> {
-        unsafe {
-            let on = seconds.is_some() as libc::c_int;
-            let on = &on as *libc::c_int;
-            let ret = libc::setsockopt(self.fd,
-                                       libc::SOL_SOCKET,
-                                       libc::SO_KEEPALIVE,
-                                       on as *libc::c_void,
-                                       mem::size_of::<libc::c_void>()
-                                            as libc::socklen_t);
-            if ret != 0 { return Err(super::last_error()) }
-
-            match seconds {
-                Some(n) => self.set_tcp_keepalive(n),
-                None => Ok(())
-            }
+        let ret = setsockopt(self.fd, libc::SOL_SOCKET, libc::SO_KEEPALIVE,
+                             seconds.is_some() as libc::c_int);
+        match seconds {
+            Some(n) => ret.and_then(|()| self.set_tcp_keepalive(n)),
+            None => ret,
         }
     }
 
     #[cfg(target_os = "macos")]
-    unsafe fn set_tcp_keepalive(&mut self, seconds: uint) -> IoResult<()> {
-        let delay = seconds as libc::c_uint;
-        let delay = &delay as *libc::c_uint;
-        let ret = libc::setsockopt(self.fd,
-                                   libc::IPPROTO_TCP,
-                                   libc::TCP_KEEPALIVE,
-                                   delay as *libc::c_void,
-                                   mem::size_of::<libc::c_uint>()
-                                        as libc::socklen_t);
-        super::mkerr_libc(ret)
+    fn set_tcp_keepalive(&mut self, seconds: uint) -> IoResult<()> {
+        setsockopt(self.fd, libc::IPPROTO_TCP, libc::TCP_KEEPALIVE,
+                   seconds as libc::c_int)
     }
     #[cfg(target_os = "freebsd")]
-    unsafe fn set_tcp_keepalive(&mut self, seconds: uint) -> IoResult<()> {
-        let delay = seconds as libc::c_uint;
-        let delay = &delay as *libc::c_uint;
-        let ret = libc::setsockopt(self.fd,
-                                   libc::IPPROTO_TCP,
-                                   libc::TCP_KEEPIDLE,
-                                   delay as *libc::c_void,
-                                   mem::size_of::<libc::c_uint>()
-                                        as libc::socklen_t);
-        super::mkerr_libc(ret)
+    fn set_tcp_keepalive(&mut self, seconds: uint) -> IoResult<()> {
+        setsockopt(self.fd, libc::IPPROTO_TCP, libc::TCP_KEEPIDLE,
+                   seconds as libc::c_int)
     }
     #[cfg(not(target_os = "macos"), not(target_os = "freebsd"))]
-    unsafe fn set_tcp_keepalive(&mut self, _seconds: uint) -> IoResult<()> {
+    fn set_tcp_keepalive(&mut self, _seconds: uint) -> IoResult<()> {
         Ok(())
     }
 }
@@ -320,17 +337,13 @@ impl rtio::RtioSocket for TcpStream {
 }
 
 impl Drop for TcpStream {
-    #[cfg(unix)]
-    fn drop(&mut self) {
-        unsafe { libc::close(self.fd); }
-    }
-
-    #[cfg(windows)]
-    fn drop(&mut self) {
-        unsafe { libc::closesocket(self.fd); }
-    }
+    fn drop(&mut self) { unsafe { close(self.fd); } }
 }
 
+////////////////////////////////////////////////////////////////////////////////
+// TCP listeners
+////////////////////////////////////////////////////////////////////////////////
+
 pub struct TcpListener {
     priv fd: sock_t,
 }
@@ -338,7 +351,7 @@ pub struct TcpListener {
 impl TcpListener {
     pub fn bind(addr: ip::SocketAddr) -> IoResult<TcpListener> {
         unsafe {
-            socket(addr).and_then(|fd| {
+            socket(addr, libc::SOCK_STREAM).and_then(|fd| {
                 let (addr, len) = addr_to_sockaddr(addr);
                 let addrp = &addr as *libc::sockaddr_storage;
                 let ret = TcpListener { fd: fd };
@@ -356,7 +369,7 @@ impl TcpListener {
     pub fn native_listen(self, backlog: int) -> IoResult<TcpAcceptor> {
         match unsafe { libc::listen(self.fd, backlog as libc::c_int) } {
             -1 => Err(super::last_error()),
-            _ => Ok(TcpAcceptor { fd: self.fd })
+            _ => Ok(TcpAcceptor { listener: self })
         }
     }
 }
@@ -373,12 +386,16 @@ impl rtio::RtioSocket for TcpListener {
     }
 }
 
+impl Drop for TcpListener {
+    fn drop(&mut self) { unsafe { close(self.fd); } }
+}
+
 pub struct TcpAcceptor {
-    priv fd: sock_t,
+    priv listener: TcpListener,
 }
 
 impl TcpAcceptor {
-    pub fn fd(&self) -> sock_t { self.fd }
+    pub fn fd(&self) -> sock_t { self.listener.fd }
 
     pub fn native_accept(&mut self) -> IoResult<TcpStream> {
         unsafe {
@@ -386,7 +403,7 @@ impl TcpAcceptor {
             let storagep = &mut storage as *mut libc::sockaddr_storage;
             let size = mem::size_of::<libc::sockaddr_storage>();
             let mut size = size as libc::socklen_t;
-            match libc::accept(self.fd,
+            match libc::accept(self.fd(),
                                storagep as *mut libc::sockaddr,
                                &mut size as *mut libc::socklen_t) {
                 -1 => Err(super::last_error()),
@@ -398,7 +415,7 @@ impl TcpAcceptor {
 
 impl rtio::RtioSocket for TcpAcceptor {
     fn socket_name(&mut self) -> IoResult<ip::SocketAddr> {
-        sockname(self.fd, libc::getsockname)
+        sockname(self.fd(), libc::getsockname)
     }
 }
 
@@ -410,3 +427,161 @@ impl rtio::RtioTcpAcceptor for TcpAcceptor {
     fn accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) }
     fn dont_accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) }
 }
+
+////////////////////////////////////////////////////////////////////////////////
+// UDP
+////////////////////////////////////////////////////////////////////////////////
+
+pub struct UdpSocket {
+    priv fd: sock_t,
+}
+
+impl UdpSocket {
+    pub fn bind(addr: ip::SocketAddr) -> IoResult<UdpSocket> {
+        unsafe {
+            socket(addr, libc::SOCK_DGRAM).and_then(|fd| {
+                let (addr, len) = addr_to_sockaddr(addr);
+                let addrp = &addr as *libc::sockaddr_storage;
+                let ret = UdpSocket { fd: fd };
+                match libc::bind(fd, addrp as *libc::sockaddr,
+                                 len as libc::socklen_t) {
+                    -1 => Err(super::last_error()),
+                    _ => Ok(ret),
+                }
+            })
+        }
+    }
+
+    pub fn fd(&self) -> sock_t { self.fd }
+
+    pub fn set_broadcast(&mut self, on: bool) -> IoResult<()> {
+        setsockopt(self.fd, libc::SOL_SOCKET, libc::SO_BROADCAST,
+                   on as libc::c_int)
+    }
+
+    pub fn set_multicast_loop(&mut self, on: bool) -> IoResult<()> {
+        setsockopt(self.fd, libc::IPPROTO_IP, libc::IP_MULTICAST_LOOP,
+                   on as libc::c_int)
+    }
+
+    pub fn set_membership(&mut self, addr: ip::IpAddr,
+                          opt: libc::c_int) -> IoResult<()> {
+        match ip_to_inaddr(addr) {
+            InAddr(addr) => {
+                let mreq = libc::ip_mreq {
+                    imr_multiaddr: addr,
+                    // interface == INADDR_ANY
+                    imr_interface: libc::in_addr { s_addr: 0x0 },
+                };
+                setsockopt(self.fd, libc::IPPROTO_IP, opt, mreq)
+            }
+            In6Addr(addr) => {
+                let mreq = libc::ip6_mreq {
+                    ipv6mr_multiaddr: addr,
+                    ipv6mr_interface: 0,
+                };
+                setsockopt(self.fd, libc::IPPROTO_IPV6, opt, mreq)
+            }
+        }
+    }
+}
+
+impl rtio::RtioSocket for UdpSocket {
+    fn socket_name(&mut self) -> IoResult<ip::SocketAddr> {
+        sockname(self.fd(), libc::getsockname)
+    }
+}
+
+#[cfg(windows)] type msglen_t = libc::c_int;
+#[cfg(unix)]    type msglen_t = libc::size_t;
+
+impl rtio::RtioUdpSocket for UdpSocket {
+    fn recvfrom(&mut self, buf: &mut [u8]) -> IoResult<(uint, ip::SocketAddr)> {
+        unsafe {
+            let mut storage: libc::sockaddr_storage = intrinsics::init();
+            let storagep = &mut storage as *mut libc::sockaddr_storage;
+            let mut addrlen: libc::socklen_t =
+                    mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t;
+            let ret = libc::recvfrom(self.fd,
+                                     buf.as_ptr() as *mut libc::c_void,
+                                     buf.len() as msglen_t,
+                                     0,
+                                     storagep as *mut libc::sockaddr,
+                                     &mut addrlen);
+            if ret < 0 { return Err(super::last_error()) }
+            sockaddr_to_addr(&storage, addrlen as uint).and_then(|addr| {
+                Ok((ret as uint, addr))
+            })
+        }
+    }
+    fn sendto(&mut self, buf: &[u8], dst: ip::SocketAddr) -> IoResult<()> {
+        let (dst, len) = addr_to_sockaddr(dst);
+        let dstp = &dst as *libc::sockaddr_storage;
+        unsafe {
+            let ret = libc::sendto(self.fd,
+                                   buf.as_ptr() as *libc::c_void,
+                                   buf.len() as msglen_t,
+                                   0,
+                                   dstp as *libc::sockaddr,
+                                   len as libc::socklen_t);
+            match ret {
+                -1 => Err(super::last_error()),
+                n if n as uint != buf.len() => {
+                    Err(io::IoError {
+                        kind: io::OtherIoError,
+                        desc: "couldn't send entire packet at once",
+                        detail: None,
+                    })
+                }
+                _ => Ok(())
+            }
+        }
+    }
+
+    fn join_multicast(&mut self, multi: ip::IpAddr) -> IoResult<()> {
+        match multi {
+            ip::Ipv4Addr(..) => {
+                self.set_membership(multi, libc::IP_ADD_MEMBERSHIP)
+            }
+            ip::Ipv6Addr(..) => {
+                self.set_membership(multi, libc::IPV6_ADD_MEMBERSHIP)
+            }
+        }
+    }
+    fn leave_multicast(&mut self, multi: ip::IpAddr) -> IoResult<()> {
+        match multi {
+            ip::Ipv4Addr(..) => {
+                self.set_membership(multi, libc::IP_DROP_MEMBERSHIP)
+            }
+            ip::Ipv6Addr(..) => {
+                self.set_membership(multi, libc::IPV6_DROP_MEMBERSHIP)
+            }
+        }
+    }
+
+    fn loop_multicast_locally(&mut self) -> IoResult<()> {
+        self.set_multicast_loop(true)
+    }
+    fn dont_loop_multicast_locally(&mut self) -> IoResult<()> {
+        self.set_multicast_loop(false)
+    }
+
+    fn multicast_time_to_live(&mut self, ttl: int) -> IoResult<()> {
+        setsockopt(self.fd, libc::IPPROTO_IP, libc::IP_MULTICAST_TTL,
+                   ttl as libc::c_int)
+    }
+    fn time_to_live(&mut self, ttl: int) -> IoResult<()> {
+        setsockopt(self.fd, libc::IPPROTO_IP, libc::IP_TTL, ttl as libc::c_int)
+    }
+
+    fn hear_broadcasts(&mut self) -> IoResult<()> {
+        self.set_broadcast(true)
+    }
+    fn ignore_broadcasts(&mut self) -> IoResult<()> {
+        self.set_broadcast(false)
+    }
+}
+
+impl Drop for UdpSocket {
+    fn drop(&mut self) { unsafe { close(self.fd) } }
+}