about summary refs log tree commit diff
path: root/src/libstd
diff options
context:
space:
mode:
authorEric Reed <ecreed@cs.washington.edu>2013-08-07 04:05:06 -0700
committerEric Reed <ecreed@cs.washington.edu>2013-08-19 16:26:50 -0700
commitd09412ab893f54ef5309cf63d17bcb6110d582b9 (patch)
treea671032792b1fa050e395dc05c0d9aea5c8c8f3f /src/libstd
parentd7b6fcba2978cbbcfccce83e6f4f54c5eec998f3 (diff)
downloadrust-d09412ab893f54ef5309cf63d17bcb6110d582b9.tar.gz
rust-d09412ab893f54ef5309cf63d17bcb6110d582b9.zip
Homed UDP sockets
Diffstat (limited to 'src/libstd')
-rw-r--r--src/libstd/rt/rtio.rs2
-rw-r--r--src/libstd/rt/uv/uvio.rs235
2 files changed, 213 insertions, 24 deletions
diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs
index a7c794fb5f1..2bec782847b 100644
--- a/src/libstd/rt/rtio.rs
+++ b/src/libstd/rt/rtio.rs
@@ -22,7 +22,7 @@ pub type RemoteCallbackObject = uvio::UvRemoteCallback;
 pub type IoFactoryObject = uvio::UvIoFactory;
 pub type RtioTcpStreamObject = uvio::UvTcpStream;
 pub type RtioTcpListenerObject = uvio::UvTcpListener;
-pub type RtioUdpSocketObject = uvio::UvUdpSocket;
+pub type RtioUdpSocketObject = uvio::HomedUvUdpSocket; //uvio::UvUdpSocket;
 pub type RtioTimerObject = uvio::UvTimer;
 
 pub trait EventLoop {
diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs
index 07ba44101c8..43be09434a4 100644
--- a/src/libstd/rt/uv/uvio.rs
+++ b/src/libstd/rt/uv/uvio.rs
@@ -239,27 +239,6 @@ impl UvIoFactory {
     pub fn uv_loop<'a>(&'a mut self) -> &'a mut Loop {
         match self { &UvIoFactory(ref mut ptr) => ptr }
     }
-
-    pub fn homed_udp_bind(&mut self, addr: SocketAddr) -> Result<~HomedUvUdpSocket, IoError> {
-        let mut watcher = UdpWatcher::new(self.uv_loop());
-        match watcher.bind(addr) {
-            Ok(_) => {
-                let home = do Local::borrow::<Scheduler, SchedHandle> |sched| {sched.make_handle()};
-                Ok(~HomedUvUdpSocket { watcher: watcher, home: home })
-            }
-            Err(uverr) => {
-                let scheduler = Local::take::<Scheduler>();
-                do scheduler.deschedule_running_task_and_then |_, task| {
-                    let task_cell = Cell::new(task);
-                    do watcher.close {
-                        let scheduler = Local::take::<Scheduler>();
-                        scheduler.resume_blocked_task_immediately(task_cell.take());
-                    }
-                }
-                Err(uv_error_to_io_error(uverr))
-            }
-        }
-    }
 }
 
 impl IoFactory for UvIoFactory {
@@ -331,6 +310,7 @@ impl IoFactory for UvIoFactory {
         }
     }
 
+    /*
     fn udp_bind(&mut self, addr: SocketAddr) -> Result<~RtioUdpSocketObject, IoError> {
         let mut watcher = UdpWatcher::new(self.uv_loop());
         match watcher.bind(addr) {
@@ -348,6 +328,28 @@ impl IoFactory for UvIoFactory {
             }
         }
     }
+    */
+
+    pub fn /*homed_*/udp_bind(&mut self, addr: SocketAddr) -> Result<~/*HomedUvUdpSocket*/RtioUdpSocketObject, IoError> {
+        let mut watcher = UdpWatcher::new(self.uv_loop());
+        match watcher.bind(addr) {
+            Ok(_) => {
+                let home = do Local::borrow::<Scheduler, SchedHandle> |sched| {sched.make_handle()};
+                Ok(~HomedUvUdpSocket { watcher: watcher, home: home })
+            }
+            Err(uverr) => {
+                let scheduler = Local::take::<Scheduler>();
+                do scheduler.deschedule_running_task_and_then |_, task| {
+                    let task_cell = Cell::new(task);
+                    do watcher.close {
+                        let scheduler = Local::take::<Scheduler>();
+                        scheduler.resume_blocked_task_immediately(task_cell.take());
+                    }
+                }
+                Err(uv_error_to_io_error(uverr))
+            }
+        }
+    }
 
     fn timer_init(&mut self) -> Result<~RtioTimerObject, IoError> {
         Ok(~UvTimer(TimerWatcher::new(self.uv_loop())))
@@ -640,18 +642,205 @@ impl Drop for HomedUvUdpSocket {
 
 impl RtioSocket for HomedUvUdpSocket {
     fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
+	// first go home
         self.go_home();
         socket_name(Udp, self.watcher)
     }
 }
 
+impl RtioUdpSocket for HomedUvUdpSocket {
+    fn recvfrom(&mut self, buf: &mut [u8]) -> Result<(uint, SocketAddr), IoError> {
+	// first go home
+	self.go_home();
+
+	let result_cell = Cell::new_empty();
+	let result_cell_ptr: *Cell<Result<(uint, SocketAddr), IoError>> = &result_cell;
+
+	let scheduler = Local::take::<Scheduler>();
+	let buf_ptr: *&mut [u8] = &buf;
+	do scheduler.deschedule_running_task_and_then |_, task| {
+	    rtdebug!("recvfrom: entered scheduler context");
+	    let task_cell = Cell::new(task);
+	    let alloc: AllocCallback = |_| unsafe { slice_to_uv_buf(*buf_ptr) };
+	    do self.watcher.recv_start(alloc) |mut watcher, nread, _buf, addr, flags, status| {
+		let _ = flags; // /XXX add handling for partials?
+
+		watcher.recv_stop();
+
+		let result = match status {
+		    None => {
+			assert!(nread >= 0);
+			Ok((nread as uint, addr))
+		    }
+		    Some(err) => Err(uv_error_to_io_error(err)),
+		};
+
+		unsafe { (*result_cell_ptr).put_back(result); }
+
+		let scheduler = Local::take::<Scheduler>();
+		scheduler.resume_blocked_task_immediately(task_cell.take());
+	    }
+	}
+
+	assert!(!result_cell.is_empty());
+	return result_cell.take();
+    }
+
+    fn sendto(&mut self, buf: &[u8], dst: SocketAddr) -> Result<(), IoError> {
+	// first go home
+	self.go_home();
+
+	let result_cell = Cell::new_empty();
+	let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
+        let scheduler = Local::take::<Scheduler>();
+        let buf_ptr: *&[u8] = &buf;
+        do scheduler.deschedule_running_task_and_then |_, task| {
+            let task_cell = Cell::new(task);
+            let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
+            do self.watcher.send(buf, dst) |_watcher, status| {
+
+                let result = match status {
+                    None => Ok(()),
+                    Some(err) => Err(uv_error_to_io_error(err)),
+                };
+
+                unsafe { (*result_cell_ptr).put_back(result); }
+
+                let scheduler = Local::take::<Scheduler>();
+                scheduler.resume_blocked_task_immediately(task_cell.take());
+            }
+        }
+
+        assert!(!result_cell.is_empty());
+        return result_cell.take();
+    }
+
+    fn join_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
+	// first go home
+	self.go_home();
+
+        let r = unsafe {
+            do multi.to_str().as_c_str |m_addr| {
+                uvll::udp_set_membership(self.watcher.native_handle(), m_addr,
+                                         ptr::null(), uvll::UV_JOIN_GROUP)
+            }
+        };
+
+        match status_to_maybe_uv_error(self.watcher, r) {
+            Some(err) => Err(uv_error_to_io_error(err)),
+            None => Ok(())
+        }
+    }
+
+    fn leave_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
+	// first go home
+	self.go_home();
+
+        let r = unsafe {
+            do multi.to_str().as_c_str |m_addr| {
+                uvll::udp_set_membership(self.watcher.native_handle(), m_addr,
+                                         ptr::null(), uvll::UV_LEAVE_GROUP)
+            }
+        };
+
+        match status_to_maybe_uv_error(self.watcher, r) {
+            Some(err) => Err(uv_error_to_io_error(err)),
+            None => Ok(())
+        }
+    }
+
+    fn loop_multicast_locally(&mut self) -> Result<(), IoError> {
+	// first go home
+	self.go_home();
+
+        let r = unsafe {
+            uvll::udp_set_multicast_loop(self.watcher.native_handle(), 1 as c_int)
+        };
+
+        match status_to_maybe_uv_error(self.watcher, r) {
+            Some(err) => Err(uv_error_to_io_error(err)),
+            None => Ok(())
+        }
+    }
+
+    fn dont_loop_multicast_locally(&mut self) -> Result<(), IoError> {
+	// first go home
+	self.go_home();
+
+        let r = unsafe {
+            uvll::udp_set_multicast_loop(self.watcher.native_handle(), 0 as c_int)
+        };
+
+        match status_to_maybe_uv_error(self.watcher, r) {
+            Some(err) => Err(uv_error_to_io_error(err)),
+            None => Ok(())
+        }
+    }
+
+    fn multicast_time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
+	// first go home
+	self.go_home();
+
+        let r = unsafe {
+            uvll::udp_set_multicast_ttl(self.watcher.native_handle(), ttl as c_int)
+        };
+
+        match status_to_maybe_uv_error(self.watcher, r) {
+            Some(err) => Err(uv_error_to_io_error(err)),
+            None => Ok(())
+        }
+    }
+
+    fn time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
+	// first go home
+	self.go_home();
+
+        let r = unsafe {
+            uvll::udp_set_ttl(self.watcher.native_handle(), ttl as c_int)
+        };
+
+        match status_to_maybe_uv_error(self.watcher, r) {
+            Some(err) => Err(uv_error_to_io_error(err)),
+            None => Ok(())
+        }
+    }
+
+    fn hear_broadcasts(&mut self) -> Result<(), IoError> {
+	// first go home
+	self.go_home();
+
+        let r = unsafe {
+            uvll::udp_set_broadcast(self.watcher.native_handle(), 1 as c_int)
+        };
+
+        match status_to_maybe_uv_error(self.watcher, r) {
+            Some(err) => Err(uv_error_to_io_error(err)),
+            None => Ok(())
+        }
+    }
+
+    fn ignore_broadcasts(&mut self) -> Result<(), IoError> {
+	// first go home
+	self.go_home();
+
+        let r = unsafe {
+            uvll::udp_set_broadcast(self.watcher.native_handle(), 0 as c_int)
+        };
+
+        match status_to_maybe_uv_error(self.watcher, r) {
+            Some(err) => Err(uv_error_to_io_error(err)),
+            None => Ok(())
+        }
+    }
+}
+
 #[test]
 fn test_simple_homed_udp_io_bind_only() {
     do run_in_newsched_task {
         unsafe {
             let io = Local::unsafe_borrow::<IoFactoryObject>();
             let addr = next_test_ip4();
-            let maybe_socket = (*io).homed_udp_bind(addr);
+            let maybe_socket = (*io)./*homed_*/udp_bind(addr);
             assert!(maybe_socket.is_ok());
         }
     }
@@ -688,7 +877,7 @@ fn test_simple_homed_udp_io_bind_then_move_then_home_and_close() {
 	let test_function: ~fn() = || {
 	    let io = unsafe { Local::unsafe_borrow::<IoFactoryObject>() };
 	    let addr = next_test_ip4();
-	    let maybe_socket = unsafe { (*io).homed_udp_bind(addr) };
+	    let maybe_socket = unsafe { (*io)./*homed_*/udp_bind(addr) };
 	    // this socket is bound to this event loop
 	    assert!(maybe_socket.is_ok());