diff options
| author | Eric Reed <ereed@mozilla.com> | 2013-06-14 12:04:11 -0700 |
|---|---|---|
| committer | Eric Reed <ereed@mozilla.com> | 2013-06-14 12:04:11 -0700 |
| commit | a7f92c92ed07588c3bde3cc38e64b9289ea682f5 (patch) | |
| tree | 2dbfa471a12238f9c06b0ab4cb124821fa1903ac /src/libstd | |
| parent | 03fe59aefa6ec84531335fd93e8dbf44dee65570 (diff) | |
| download | rust-a7f92c92ed07588c3bde3cc38e64b9289ea682f5.tar.gz rust-a7f92c92ed07588c3bde3cc38e64b9289ea682f5.zip | |
Added a UdpWatcher and UdpSendRequest with associated callbacks
Diffstat (limited to 'src/libstd')
| -rw-r--r-- | src/libstd/rt/uv/mod.rs | 15 | ||||
| -rw-r--r-- | src/libstd/rt/uv/net.rs | 174 |
2 files changed, 184 insertions, 5 deletions
diff --git a/src/libstd/rt/uv/mod.rs b/src/libstd/rt/uv/mod.rs index 5f9e5660814..f7cc5c6cc8b 100644 --- a/src/libstd/rt/uv/mod.rs +++ b/src/libstd/rt/uv/mod.rs @@ -54,7 +54,7 @@ use rt::io::IoError; #[cfg(test)] use unstable::run_in_bare_thread; pub use self::file::FsRequest; -pub use self::net::{StreamWatcher, TcpWatcher}; +pub use self::net::{StreamWatcher, TcpWatcher, UdpWatcher}; pub use self::idle::IdleWatcher; pub use self::timer::TimerWatcher; pub use self::async::AsyncWatcher; @@ -128,6 +128,8 @@ pub type ConnectionCallback = ~fn(StreamWatcher, Option<UvError>); pub type FsCallback = ~fn(FsRequest, Option<UvError>); pub type TimerCallback = ~fn(TimerWatcher, Option<UvError>); pub type AsyncCallback = ~fn(AsyncWatcher, Option<UvError>); +pub type UdpReceiveCallback = ~fn(UdpWatcher, int, Buf, Ipv4, uint, Option<UvError>); +pub type UdpSendCallback = ~fn(UdpWatcher, Option<UvError>); /// Callbacks used by StreamWatchers, set as custom data on the foreign handle @@ -139,7 +141,9 @@ struct WatcherData { alloc_cb: Option<AllocCallback>, idle_cb: Option<IdleCallback>, timer_cb: Option<TimerCallback>, - async_cb: Option<AsyncCallback> + async_cb: Option<AsyncCallback>, + udp_recv_cb: Option<UdpReceiveCallback>, + udp_send_cb: Option<UdpSendCallback> } pub trait WatcherInterop { @@ -169,7 +173,9 @@ impl<H, W: Watcher + NativeHandle<*H>> WatcherInterop for W { alloc_cb: None, idle_cb: None, timer_cb: None, - async_cb: None + async_cb: None, + udp_recv_cb: None, + udp_send_cb: None }; let data = transmute::<~WatcherData, *c_void>(data); uvll::set_data_for_uv_handle(self.native_handle(), data); @@ -309,6 +315,9 @@ pub fn status_to_maybe_uv_error<T>(handle: *T, status: c_int) -> Option<UvError> /// The uv buffer type pub type Buf = uvll::uv_buf_t; +/// The uv IPv4 type +pub type Ipv4 = uvll::sockaddr_in; + /// Borrow a slice to a Buf pub fn slice_to_uv_buf(v: &[u8]) -> Buf { let data = vec::raw::to_ptr(v); diff --git a/src/libstd/rt/uv/net.rs b/src/libstd/rt/uv/net.rs index 68b871e6b31..4079a2f7b77 100644 --- a/src/libstd/rt/uv/net.rs +++ b/src/libstd/rt/uv/net.rs @@ -9,10 +9,10 @@ // except according to those terms. use prelude::*; -use libc::{size_t, ssize_t, c_int, c_void}; +use libc::{size_t, ssize_t, c_int, c_void, c_uint}; use rt::uv::uvll; use rt::uv::uvll::*; -use rt::uv::{AllocCallback, ConnectionCallback, ReadCallback}; +use rt::uv::{AllocCallback, ConnectionCallback, ReadCallback, UdpReceiveCallback, UdpSendCallback}; use rt::uv::{Loop, Watcher, Request, UvError, Buf, NativeHandle, NullCallback, status_to_maybe_uv_error}; use rt::io::net::ip::{IpAddr, Ipv4, Ipv6}; @@ -254,6 +254,142 @@ impl NativeHandle<*uvll::uv_tcp_t> for TcpWatcher { } } +pub struct UdpWatcher(*uvll::uv_udp_t); +impl Watcher for UdpWatcher { } + +pub impl UdpWatcher { + fn new(loop_: &mut Loop) -> UdpWatcher { + unsafe { + let handle = malloc_handle(UV_UDP); + assert!(handle.is_not_null()); + assert_eq!(0, uvll::udp_init(loop_.native_handle(), handle)); + let mut watcher: UdpWatcher = NativeHandle::from_native_handle(handle); + watcher.install_watcher_data(); + return watcher; + } + } + + fn bind(&mut self, address: IpAddr) -> Result<(), UvError> { + match address { + Ipv4(*) => { + do ip4_as_uv_ip4(address) |addr| { + let result = unsafe { + uvll::udp_bind(self.native_handle(), addr, 0u32) + }; + if result == 0 { + Ok(()) + } else { + Err(last_uv_error(self)) + } + } + } + _ => fail!() // TODO ipv6 + } + } + + fn recv_start(&mut self, alloc: AllocCallback, cb: UdpReceiveCallback) { + { + let data = self.get_watcher_data(); + data.alloc_cb = Some(alloc); + data.udp_recv_cb = Some(cb); + } + + let handle = self.native_handle(); + unsafe { uvll::read_start(handle, alloc_cb, recv_cb); } + + extern fn alloc_cb(handle: *uvll::uv_udp_t, suggested_size: size_t) -> Buf { + let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle); + let data = udp_watcher.get_watcher_data(); + let alloc_cb = data.alloc_cb.get_ref(); + return (*alloc_cb)(suggested_size as uint); + } + + /* TODO the socket address should actually be a pointer to either a sockaddr_in or sockaddr_in6. + In libuv, the udp_recv callback takes a struct *sockaddr */ + extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, buf: Buf, + address: *uvll::sockaddr_in, flags: c_uint) { + rtdebug!("buf addr: %x", buf.base as uint); + rtdebug!("buf len: %d", buf.len as int); + let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle); + let data = udp_watcher.get_watcher_data(); + let cb = data.udp_recv_cb.get_ref(); + let status = status_to_maybe_uv_error(handle, nread as c_int); + unsafe { (*cb)(udp_watcher, nread as int, buf, *address, flags as uint, status) }; + } + } + + fn recv_stop(&mut self) { + let handle = self.native_handle(); + unsafe { uvll::udp_recv_stop(handle); } + } + + fn send(&mut self, buf: Buf, address: IpAddr, cb: UdpSendCallback) { + { + let data = self.get_watcher_data(); + assert!(data.udp_send_cb.is_none()); + data.udp_send_cb = Some(cb); + } + + let req = UdpSendRequest::new(); + let bufs = [buf]; + match address { + Ipv4(*) => { + do ip4_as_uv_ip4(address) |addr| { + unsafe { + assert!(0 == uvll::udp_send(req.native_handle(), + self.native_handle(), + bufs, addr, send_cb)); + } + } + } + _ => fail!() // TODO ipv6 + } + + extern fn send_cb(req: *uvll::uv_udp_send_t, status: c_int) { + let send_request: UdpSendRequest = NativeHandle::from_native_handle(req); + let mut udp_watcher = send_request.handle(); + send_request.delete(); + let cb = { + let data = udp_watcher.get_watcher_data(); + let cb = data.udp_send_cb.swap_unwrap(); + cb + }; + let status = status_to_maybe_uv_error(udp_watcher.native_handle(), status); + cb(udp_watcher, status); + } + } + + fn close(self, cb: NullCallback) { + { + let mut this = self; + let data = this.get_watcher_data(); + assert!(data.close_cb.is_none()); + data.close_cb = Some(cb); + } + + unsafe { uvll::close(self.native_handle(), close_cb); } + + extern fn close_cb(handle: *uvll::uv_udp_t) { + let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle); + { + let data = udp_watcher.get_watcher_data(); + data.close_cb.swap_unwrap()(); + } + udp_watcher.drop_watcher_data(); + unsafe { free_handle(handle as *c_void) } + } + } +} + +impl NativeHandle<*uvll::uv_udp_t> for UdpWatcher { + fn from_native_handle(handle: *uvll::uv_udp_t) -> UdpWatcher { + UdpWatcher(handle) + } + fn native_handle(&self) -> *uvll::uv_udp_t { + match self { &UdpWatcher(ptr) => ptr } + } +} + // uv_connect_t is a subclass of uv_req_t struct ConnectRequest(*uvll::uv_connect_t); impl Request for ConnectRequest { } @@ -327,6 +463,40 @@ impl NativeHandle<*uvll::uv_write_t> for WriteRequest { } } +pub struct UdpSendRequest(*uvll::uv_udp_send_t); + +impl Request for UdpSendRequest { } + +pub impl UdpSendRequest { + fn new() -> UdpSendRequest { + let send_handle = unsafe { + malloc_req(UV_UDP_SEND) + }; + assert!(send_handle.is_not_null()); + let send_handle = send_handle as *uvll::uv_udp_send_t; + UdpSendRequest(send_handle) + } + + fn handle(&self) -> UdpWatcher { + unsafe { + let udp_handle = uvll::get_udp_handle_from_send_req(self.native_handle()); + NativeHandle::from_native_handle(udp_handle) + } + } + + fn delete(self) { + unsafe { free_req(self.native_handle() as *c_void) } + } +} + +impl NativeHandle<*uvll::uv_udp_send_t> for UdpSendRequest { + fn from_native_handle(handle: *uvll::uv_udp_send_t) -> UdpSendRequest { + UdpSendRequest(handle) + } + fn native_handle(&self) -> *uvll::uv_udp_send_t { + match self { &UdpSendRequest(ptr) => ptr } + } +} #[cfg(test)] mod test { |
