about summary refs log tree commit diff
path: root/src/libstd
diff options
context:
space:
mode:
authorEric Reed <ereed@mozilla.com>2013-06-14 12:04:11 -0700
committerEric Reed <ereed@mozilla.com>2013-06-14 12:04:11 -0700
commita7f92c92ed07588c3bde3cc38e64b9289ea682f5 (patch)
tree2dbfa471a12238f9c06b0ab4cb124821fa1903ac /src/libstd
parent03fe59aefa6ec84531335fd93e8dbf44dee65570 (diff)
downloadrust-a7f92c92ed07588c3bde3cc38e64b9289ea682f5.tar.gz
rust-a7f92c92ed07588c3bde3cc38e64b9289ea682f5.zip
Added a UdpWatcher and UdpSendRequest with associated callbacks
Diffstat (limited to 'src/libstd')
-rw-r--r--src/libstd/rt/uv/mod.rs15
-rw-r--r--src/libstd/rt/uv/net.rs174
2 files changed, 184 insertions, 5 deletions
diff --git a/src/libstd/rt/uv/mod.rs b/src/libstd/rt/uv/mod.rs
index 5f9e5660814..f7cc5c6cc8b 100644
--- a/src/libstd/rt/uv/mod.rs
+++ b/src/libstd/rt/uv/mod.rs
@@ -54,7 +54,7 @@ use rt::io::IoError;
 #[cfg(test)] use unstable::run_in_bare_thread;
 
 pub use self::file::FsRequest;
-pub use self::net::{StreamWatcher, TcpWatcher};
+pub use self::net::{StreamWatcher, TcpWatcher, UdpWatcher};
 pub use self::idle::IdleWatcher;
 pub use self::timer::TimerWatcher;
 pub use self::async::AsyncWatcher;
@@ -128,6 +128,8 @@ pub type ConnectionCallback = ~fn(StreamWatcher, Option<UvError>);
 pub type FsCallback = ~fn(FsRequest, Option<UvError>);
 pub type TimerCallback = ~fn(TimerWatcher, Option<UvError>);
 pub type AsyncCallback = ~fn(AsyncWatcher, Option<UvError>);
+pub type UdpReceiveCallback = ~fn(UdpWatcher, int, Buf, Ipv4, uint, Option<UvError>);
+pub type UdpSendCallback = ~fn(UdpWatcher, Option<UvError>);
 
 
 /// Callbacks used by StreamWatchers, set as custom data on the foreign handle
@@ -139,7 +141,9 @@ struct WatcherData {
     alloc_cb: Option<AllocCallback>,
     idle_cb: Option<IdleCallback>,
     timer_cb: Option<TimerCallback>,
-    async_cb: Option<AsyncCallback>
+    async_cb: Option<AsyncCallback>,
+    udp_recv_cb: Option<UdpReceiveCallback>,
+    udp_send_cb: Option<UdpSendCallback>
 }
 
 pub trait WatcherInterop {
@@ -169,7 +173,9 @@ impl<H, W: Watcher + NativeHandle<*H>> WatcherInterop for W {
                 alloc_cb: None,
                 idle_cb: None,
                 timer_cb: None,
-                async_cb: None
+                async_cb: None,
+                udp_recv_cb: None,
+                udp_send_cb: None
             };
             let data = transmute::<~WatcherData, *c_void>(data);
             uvll::set_data_for_uv_handle(self.native_handle(), data);
@@ -309,6 +315,9 @@ pub fn status_to_maybe_uv_error<T>(handle: *T, status: c_int) -> Option<UvError>
 /// The uv buffer type
 pub type Buf = uvll::uv_buf_t;
 
+/// The uv IPv4 type
+pub type Ipv4 = uvll::sockaddr_in;
+
 /// Borrow a slice to a Buf
 pub fn slice_to_uv_buf(v: &[u8]) -> Buf {
     let data = vec::raw::to_ptr(v);
diff --git a/src/libstd/rt/uv/net.rs b/src/libstd/rt/uv/net.rs
index 68b871e6b31..4079a2f7b77 100644
--- a/src/libstd/rt/uv/net.rs
+++ b/src/libstd/rt/uv/net.rs
@@ -9,10 +9,10 @@
 // except according to those terms.
 
 use prelude::*;
-use libc::{size_t, ssize_t, c_int, c_void};
+use libc::{size_t, ssize_t, c_int, c_void, c_uint};
 use rt::uv::uvll;
 use rt::uv::uvll::*;
-use rt::uv::{AllocCallback, ConnectionCallback, ReadCallback};
+use rt::uv::{AllocCallback, ConnectionCallback, ReadCallback, UdpReceiveCallback, UdpSendCallback};
 use rt::uv::{Loop, Watcher, Request, UvError, Buf, NativeHandle, NullCallback,
              status_to_maybe_uv_error};
 use rt::io::net::ip::{IpAddr, Ipv4, Ipv6};
@@ -254,6 +254,142 @@ impl NativeHandle<*uvll::uv_tcp_t> for TcpWatcher {
     }
 }
 
+pub struct UdpWatcher(*uvll::uv_udp_t);
+impl Watcher for UdpWatcher { }
+
+pub impl UdpWatcher {
+    fn new(loop_: &mut Loop) -> UdpWatcher {
+        unsafe {
+            let handle = malloc_handle(UV_UDP);
+            assert!(handle.is_not_null());
+            assert_eq!(0, uvll::udp_init(loop_.native_handle(), handle));
+            let mut watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
+            watcher.install_watcher_data();
+            return watcher;
+        }
+    }
+
+    fn bind(&mut self, address: IpAddr) -> Result<(), UvError> {
+        match address {
+            Ipv4(*) => {
+                do ip4_as_uv_ip4(address) |addr| {
+                    let result = unsafe {
+                        uvll::udp_bind(self.native_handle(), addr, 0u32)
+                    };
+                    if result == 0 {
+                        Ok(())
+                    } else {
+                        Err(last_uv_error(self))
+                    }
+                }
+            }
+            _ => fail!() // TODO ipv6
+        }
+    }
+
+    fn recv_start(&mut self, alloc: AllocCallback, cb: UdpReceiveCallback) {
+        {
+            let data = self.get_watcher_data();
+            data.alloc_cb = Some(alloc);
+            data.udp_recv_cb = Some(cb);
+        }
+
+        let handle = self.native_handle();
+        unsafe { uvll::read_start(handle, alloc_cb, recv_cb); }
+
+        extern fn alloc_cb(handle: *uvll::uv_udp_t, suggested_size: size_t) -> Buf {
+            let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
+            let data = udp_watcher.get_watcher_data();
+            let alloc_cb = data.alloc_cb.get_ref();
+            return (*alloc_cb)(suggested_size as uint);
+        }
+
+        /* TODO the socket address should actually be a pointer to either a sockaddr_in or sockaddr_in6.
+           In libuv, the udp_recv callback takes a struct *sockaddr */
+        extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, buf: Buf, 
+                          address: *uvll::sockaddr_in, flags: c_uint) {
+            rtdebug!("buf addr: %x", buf.base as uint);
+            rtdebug!("buf len: %d", buf.len as int);
+            let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
+            let data = udp_watcher.get_watcher_data();
+            let cb = data.udp_recv_cb.get_ref();
+            let status = status_to_maybe_uv_error(handle, nread as c_int);
+            unsafe { (*cb)(udp_watcher, nread as int, buf, *address, flags as uint, status) };
+        }
+    }
+
+    fn recv_stop(&mut self) {
+        let handle = self.native_handle();
+        unsafe { uvll::udp_recv_stop(handle); }
+    }
+
+    fn send(&mut self, buf: Buf, address: IpAddr, cb: UdpSendCallback) {
+        {
+            let data = self.get_watcher_data();
+            assert!(data.udp_send_cb.is_none());
+            data.udp_send_cb = Some(cb);
+        }
+
+        let req = UdpSendRequest::new();
+        let bufs = [buf];
+        match address {
+            Ipv4(*) => {
+                do ip4_as_uv_ip4(address) |addr| {
+                    unsafe {
+                        assert!(0 == uvll::udp_send(req.native_handle(),
+                                                    self.native_handle(),
+                                                    bufs, addr, send_cb));
+                    }
+                }
+            }
+            _ => fail!() // TODO ipv6
+        }
+
+        extern fn send_cb(req: *uvll::uv_udp_send_t, status: c_int) {
+            let send_request: UdpSendRequest = NativeHandle::from_native_handle(req);
+            let mut udp_watcher = send_request.handle();
+            send_request.delete();
+            let cb = {
+                let data = udp_watcher.get_watcher_data();
+                let cb = data.udp_send_cb.swap_unwrap();
+                cb
+            };
+            let status = status_to_maybe_uv_error(udp_watcher.native_handle(), status);
+            cb(udp_watcher, status);
+        }
+    }
+
+    fn close(self, cb: NullCallback) {
+        {
+            let mut this = self;
+            let data = this.get_watcher_data();
+            assert!(data.close_cb.is_none());
+            data.close_cb = Some(cb);
+        }
+
+        unsafe { uvll::close(self.native_handle(), close_cb); }
+
+        extern fn close_cb(handle: *uvll::uv_udp_t) {
+            let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
+            {
+                let data = udp_watcher.get_watcher_data();
+                data.close_cb.swap_unwrap()();
+            }
+            udp_watcher.drop_watcher_data();
+            unsafe { free_handle(handle as *c_void) }
+        }
+    }
+}
+
+impl NativeHandle<*uvll::uv_udp_t> for UdpWatcher {
+    fn from_native_handle(handle: *uvll::uv_udp_t) -> UdpWatcher {
+        UdpWatcher(handle)
+    }
+    fn native_handle(&self) -> *uvll::uv_udp_t {
+        match self { &UdpWatcher(ptr) => ptr }
+    }
+}
+
 // uv_connect_t is a subclass of uv_req_t
 struct ConnectRequest(*uvll::uv_connect_t);
 impl Request for ConnectRequest { }
@@ -327,6 +463,40 @@ impl NativeHandle<*uvll::uv_write_t> for WriteRequest {
     }
 }
 
+pub struct UdpSendRequest(*uvll::uv_udp_send_t);
+
+impl Request for UdpSendRequest { }
+
+pub impl UdpSendRequest {
+    fn new() -> UdpSendRequest {
+        let send_handle = unsafe {
+            malloc_req(UV_UDP_SEND)
+        };
+        assert!(send_handle.is_not_null());
+        let send_handle = send_handle as *uvll::uv_udp_send_t;
+        UdpSendRequest(send_handle)
+    }
+
+    fn handle(&self) -> UdpWatcher {
+        unsafe {
+            let udp_handle = uvll::get_udp_handle_from_send_req(self.native_handle());
+            NativeHandle::from_native_handle(udp_handle)
+        }
+    }
+
+    fn delete(self) {
+        unsafe { free_req(self.native_handle() as *c_void) }
+    }
+}
+
+impl NativeHandle<*uvll::uv_udp_send_t> for UdpSendRequest {
+    fn from_native_handle(handle: *uvll::uv_udp_send_t) -> UdpSendRequest {
+        UdpSendRequest(handle)
+    }
+    fn native_handle(&self) -> *uvll::uv_udp_send_t {
+        match self { &UdpSendRequest(ptr) => ptr }
+    }
+}
 
 #[cfg(test)]
 mod test {