about summary refs log tree commit diff
path: root/src/libstd
diff options
context:
space:
mode:
authorEric Reed <ecreed@cs.washington.edu>2013-08-07 02:57:33 -0700
committerEric Reed <ecreed@cs.washington.edu>2013-08-19 16:26:50 -0700
commitd7b6fcba2978cbbcfccce83e6f4f54c5eec998f3 (patch)
tree5c32b1b39f4e67259b05c9491da58540160d2b8d /src/libstd
parent88f718341ec279738c07f83289058aadf7c5d235 (diff)
downloadrust-d7b6fcba2978cbbcfccce83e6f4f54c5eec998f3.tar.gz
rust-d7b6fcba2978cbbcfccce83e6f4f54c5eec998f3.zip
Working homing UDP socket prototype.
Diffstat (limited to 'src/libstd')
-rw-r--r--src/libstd/rt/uv/uvio.rs152
1 files changed, 151 insertions, 1 deletions
diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs
index d4794da9b0f..07ba44101c8 100644
--- a/src/libstd/rt/uv/uvio.rs
+++ b/src/libstd/rt/uv/uvio.rs
@@ -23,7 +23,7 @@ use rt::io::net::ip::{SocketAddr, IpAddr};
 use rt::io::{standard_error, OtherIoError};
 use rt::local::Local;
 use rt::rtio::*;
-use rt::sched::Scheduler;
+use rt::sched::{Scheduler, SchedHandle};
 use rt::tube::Tube;
 use rt::uv::*;
 use rt::uv::idle::IdleWatcher;
@@ -239,6 +239,27 @@ impl UvIoFactory {
     pub fn uv_loop<'a>(&'a mut self) -> &'a mut Loop {
         match self { &UvIoFactory(ref mut ptr) => ptr }
     }
+
+    pub fn homed_udp_bind(&mut self, addr: SocketAddr) -> Result<~HomedUvUdpSocket, IoError> {
+        let mut watcher = UdpWatcher::new(self.uv_loop());
+        match watcher.bind(addr) {
+            Ok(_) => {
+                let home = do Local::borrow::<Scheduler, SchedHandle> |sched| {sched.make_handle()};
+                Ok(~HomedUvUdpSocket { watcher: watcher, home: home })
+            }
+            Err(uverr) => {
+                let scheduler = Local::take::<Scheduler>();
+                do scheduler.deschedule_running_task_and_then |_, task| {
+                    let task_cell = Cell::new(task);
+                    do watcher.close {
+                        let scheduler = Local::take::<Scheduler>();
+                        scheduler.resume_blocked_task_immediately(task_cell.take());
+                    }
+                }
+                Err(uv_error_to_io_error(uverr))
+            }
+        }
+    }
 }
 
 impl IoFactory for UvIoFactory {
@@ -582,6 +603,135 @@ impl RtioTcpStream for UvTcpStream {
     }
 }
 
+pub struct HomedUvUdpSocket {
+    watcher: UdpWatcher,
+    home: SchedHandle,
+}
+
+impl HomedUvUdpSocket {
+    fn go_home(&mut self) {
+        use rt::sched::PinnedTask;
+        let scheduler = Local::take::<Scheduler>();
+        do scheduler.deschedule_running_task_and_then |_, task| {
+            do task.wake().map_move |task| { self.home.send(PinnedTask(task)); };
+        }
+    }
+}
+
+impl Drop for HomedUvUdpSocket {
+    fn drop(&self) {
+        rtdebug!("closing homed udp socket");
+        // first go home
+        // XXX need mutable finalizer
+        let this = unsafe { transmute::<&HomedUvUdpSocket, &mut HomedUvUdpSocket>(self) };
+        this.go_home();
+        // now we're home so block the task and start IO
+        let scheduler = Local::take::<Scheduler>();
+        do scheduler.deschedule_running_task_and_then |_, task| {
+            let task_cell = Cell::new(task);
+            do this.watcher.close {
+                // now IO is finished so resume the blocked task
+                let scheduler = Local::take::<Scheduler>();
+                scheduler.resume_blocked_task_immediately(task_cell.take());
+            }
+        }
+    }
+}
+
+impl RtioSocket for HomedUvUdpSocket {
+    fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
+        self.go_home();
+        socket_name(Udp, self.watcher)
+    }
+}
+
+#[test]
+fn test_simple_homed_udp_io_bind_only() {
+    do run_in_newsched_task {
+        unsafe {
+            let io = Local::unsafe_borrow::<IoFactoryObject>();
+            let addr = next_test_ip4();
+            let maybe_socket = (*io).homed_udp_bind(addr);
+            assert!(maybe_socket.is_ok());
+        }
+    }
+}
+
+#[test]
+fn test_simple_homed_udp_io_bind_then_move_then_home_and_close() {
+    use rt::sleeper_list::SleeperList;
+    use rt::work_queue::WorkQueue;
+    use rt::thread::Thread;
+    use rt::task::Task;
+    use rt::sched::{Shutdown, TaskFromFriend};
+    do run_in_bare_thread {
+	let sleepers = SleeperList::new();
+	let work_queue1 = WorkQueue::new();
+	let work_queue2 = WorkQueue::new();
+	let queues = ~[work_queue1.clone(), work_queue2.clone()];
+
+	let mut sched1 = ~Scheduler::new(~UvEventLoop::new(), work_queue1, queues.clone(),
+					 sleepers.clone());
+	let mut sched2 = ~Scheduler::new(~UvEventLoop::new(), work_queue2, queues.clone(),
+					 sleepers.clone());
+
+	let handle1 = Cell::new(sched1.make_handle());
+	let handle2 = Cell::new(sched2.make_handle());
+	let tasksFriendHandle = Cell::new(sched2.make_handle());
+
+	let on_exit: ~fn(bool) = |exit_status| {
+	    handle1.take().send(Shutdown);
+	    handle2.take().send(Shutdown);
+	    rtassert!(exit_status);
+	};
+
+	let test_function: ~fn() = || {
+	    let io = unsafe { Local::unsafe_borrow::<IoFactoryObject>() };
+	    let addr = next_test_ip4();
+	    let maybe_socket = unsafe { (*io).homed_udp_bind(addr) };
+	    // this socket is bound to this event loop
+	    assert!(maybe_socket.is_ok());
+
+	    // block self on sched1
+	    let scheduler = Local::take::<Scheduler>();
+	    do scheduler.deschedule_running_task_and_then |_, task| {
+		// unblock task
+		do task.wake().map_move |task| {
+		  // send self to sched2
+		  tasksFriendHandle.take().send(TaskFromFriend(task));
+		};
+		// sched1 should now sleep since it has nothing else to do
+	    }
+	    // sched2 will wake up and get the task
+	    // as we do nothing else, the function ends and the socket goes out of scope
+	    // sched2 will start to run the destructor
+	    // the destructor will first block the task, set it's home as sched1, then enqueue it
+	    // sched2 will dequeue the task, see that it has a home, and send it to sched1
+	    // sched1 will wake up, execute the close function on the correct loop, and then we're done
+	};
+
+	let mut main_task = ~Task::new_root(&mut sched1.stack_pool, None, test_function);
+	main_task.death.on_exit = Some(on_exit);
+	let main_task = Cell::new(main_task);
+
+	let null_task = Cell::new(~do Task::new_root(&mut sched2.stack_pool, None) || {});
+
+	let sched1 = Cell::new(sched1);
+	let sched2 = Cell::new(sched2);
+
+	// XXX could there be a race on the threads that causes a crash?
+	let thread1 = do Thread::start {
+	    sched1.take().bootstrap(main_task.take());
+	};
+	let thread2 = do Thread::start {
+	    sched2.take().bootstrap(null_task.take());
+	};
+
+	thread1.join();
+	thread2.join();
+    }
+}
+
 pub struct UvUdpSocket(UdpWatcher);
 
 impl Drop for UvUdpSocket {