about summary refs log tree commit diff
diff options
context:
space:
mode:
authorAlex Crichton <alex@alexcrichton.com>2013-12-13 16:56:50 -0800
committerAlex Crichton <alex@alexcrichton.com>2013-12-24 19:59:53 -0800
commit14caf00c92b40e3f62094db54f325196c8a05d5a (patch)
tree0b44af85b516274a99d19098eb7506a9d92ca4ae
parent3893716390f2c4857b7e8b1705a6344f96b85bb6 (diff)
downloadrust-14caf00c92b40e3f62094db54f325196c8a05d5a.tar.gz
rust-14caf00c92b40e3f62094db54f325196c8a05d5a.zip
rustuv: Write homing tests with SchedPool
Use the previous commit's new scheduler pool abstraction in libgreen to write
some homing tests which force an I/O handle to be homed from one event loop to
another.
-rw-r--r--src/libgreen/lib.rs53
-rw-r--r--src/libgreen/sched.rs5
-rw-r--r--src/libgreen/task.rs19
-rw-r--r--src/librustuv/homing.rs153
4 files changed, 84 insertions, 146 deletions
diff --git a/src/libgreen/lib.rs b/src/libgreen/lib.rs
index f4903ea38d2..9a3f27f7dbc 100644
--- a/src/libgreen/lib.rs
+++ b/src/libgreen/lib.rs
@@ -35,7 +35,6 @@ use std::os;
 use std::rt::thread::Thread;
 use std::rt;
 use std::rt::crate_map;
-use std::rt::task::Task;
 use std::rt::rtio;
 use std::sync::deque;
 use std::sync::atomics::{SeqCst, AtomicUint, INIT_ATOMIC_UINT};
@@ -105,7 +104,7 @@ pub fn start(argc: int, argv: **u8, main: proc()) -> int {
 /// This function will not return until all schedulers in the associated pool
 /// have returned.
 pub fn run(main: proc()) -> int {
-    let mut pool = Pool::new(Config::new());
+    let mut pool = SchedPool::new(PoolConfig::new());
     pool.spawn(TaskOpts::new(), main);
     unsafe { stdtask::wait_for_completion(); }
     pool.shutdown();
@@ -113,16 +112,16 @@ pub fn run(main: proc()) -> int {
 }
 
 /// Configuration of how an M:N pool of schedulers is spawned.
-pub struct Config {
+pub struct PoolConfig {
     /// The number of schedulers (OS threads) to spawn into this M:N pool.
     threads: uint,
 }
 
-impl Config {
+impl PoolConfig {
     /// Returns the default configuration, as determined the the environment
     /// variables of this process.
-    pub fn new() -> Config {
-        Config {
+    pub fn new() -> PoolConfig {
+        PoolConfig {
             threads: rt::default_sched_threads(),
         }
     }
@@ -130,7 +129,7 @@ impl Config {
 
 /// A structure representing a handle to a pool of schedulers. This handle is
 /// used to keep the pool alive and also reap the status from the pool.
-pub struct Pool {
+pub struct SchedPool {
     priv id: uint,
     priv threads: ~[Thread<()>],
     priv handles: ~[SchedHandle],
@@ -141,19 +140,19 @@ pub struct Pool {
     priv sleepers: SleeperList,
 }
 
-impl Pool {
+impl SchedPool {
     /// Execute the main function in a pool of M:N schedulers.
     ///
     /// This will configure the pool according to the `config` parameter, and
     /// initially run `main` inside the pool of schedulers.
-    pub fn new(config: Config) -> Pool {
+    pub fn new(config: PoolConfig) -> SchedPool {
         static mut POOL_ID: AtomicUint = INIT_ATOMIC_UINT;
 
-        let Config { threads: nscheds } = config;
+        let PoolConfig { threads: nscheds } = config;
         assert!(nscheds > 0);
 
         // The pool of schedulers that will be returned from this function
-        let mut pool = Pool {
+        let mut pool = SchedPool {
             threads: ~[],
             handles: ~[],
             stealers: ~[],
@@ -185,10 +184,9 @@ impl Pool {
             let sched = sched;
             pool.threads.push(do Thread::start {
                 let mut sched = sched;
-                let mut task = do GreenTask::new(&mut sched.stack_pool, None) {
+                let task = do GreenTask::new(&mut sched.stack_pool, None) {
                     rtdebug!("boostraping a non-primary scheduler");
                 };
-                task.put_task(~Task::new());
                 sched.bootstrap(task);
             });
         }
@@ -196,19 +194,12 @@ impl Pool {
         return pool;
     }
 
-    pub fn shutdown(mut self) {
-        self.stealers = ~[];
-
-        for mut handle in util::replace(&mut self.handles, ~[]).move_iter() {
-            handle.send(Shutdown);
-        }
-        for thread in util::replace(&mut self.threads, ~[]).move_iter() {
-            thread.join();
-        }
+    pub fn task(&mut self, opts: TaskOpts, f: proc()) -> ~GreenTask {
+        GreenTask::configure(&mut self.stack_pool, opts, f)
     }
 
     pub fn spawn(&mut self, opts: TaskOpts, f: proc()) {
-        let task = GreenTask::configure(&mut self.stack_pool, opts, f);
+        let task = self.task(opts, f);
 
         // Figure out someone to send this task to
         let idx = self.next_friend;
@@ -250,18 +241,28 @@ impl Pool {
         let sched = sched;
         self.threads.push(do Thread::start {
             let mut sched = sched;
-            let mut task = do GreenTask::new(&mut sched.stack_pool, None) {
+            let task = do GreenTask::new(&mut sched.stack_pool, None) {
                 rtdebug!("boostraping a non-primary scheduler");
             };
-            task.put_task(~Task::new());
             sched.bootstrap(task);
         });
 
         return ret;
     }
+
+    pub fn shutdown(mut self) {
+        self.stealers = ~[];
+
+        for mut handle in util::replace(&mut self.handles, ~[]).move_iter() {
+            handle.send(Shutdown);
+        }
+        for thread in util::replace(&mut self.threads, ~[]).move_iter() {
+            thread.join();
+        }
+    }
 }
 
-impl Drop for Pool {
+impl Drop for SchedPool {
     fn drop(&mut self) {
         if self.threads.len() > 0 {
             fail!("dropping a M:N scheduler pool that wasn't shut down");
diff --git a/src/libgreen/sched.rs b/src/libgreen/sched.rs
index e349ae1e601..70fd6768c8b 100644
--- a/src/libgreen/sched.rs
+++ b/src/libgreen/sched.rs
@@ -178,9 +178,8 @@ impl Scheduler {
         self.idle_callback = Some(self.event_loop.pausable_idle_callback(cb));
 
         // Create a task for the scheduler with an empty context.
-        let mut sched_task = GreenTask::new_typed(Some(Coroutine::empty()),
-                                                  TypeSched);
-        sched_task.put_task(~Task::new());
+        let sched_task = GreenTask::new_typed(Some(Coroutine::empty()),
+                                              TypeSched);
 
         // Before starting our first task, make sure the idle callback
         // is active. As we do not start in the sleep state this is
diff --git a/src/libgreen/task.rs b/src/libgreen/task.rs
index e07d7f2413f..d37ab8bba57 100644
--- a/src/libgreen/task.rs
+++ b/src/libgreen/task.rs
@@ -85,7 +85,7 @@ impl GreenTask {
             sched: None,
             handle: None,
             nasty_deschedule_lock: unsafe { Mutex::new() },
-            task: None,
+            task: Some(~Task::new()),
         }
     }
 
@@ -101,16 +101,17 @@ impl GreenTask {
         } = opts;
 
         let mut green = GreenTask::new(pool, stack_size, f);
-        let mut task = ~Task::new();
-        task.name = name;
-        match notify_chan {
-            Some(chan) => {
-                let on_exit = proc(task_result) { chan.send(task_result) };
-                task.death.on_exit = Some(on_exit);
+        {
+            let task = green.task.get_mut_ref();
+            task.name = name;
+            match notify_chan {
+                Some(chan) => {
+                    let on_exit = proc(task_result) { chan.send(task_result) };
+                    task.death.on_exit = Some(on_exit);
+                }
+                None => {}
             }
-            None => {}
         }
-        green.put_task(task);
         return green;
     }
 
diff --git a/src/librustuv/homing.rs b/src/librustuv/homing.rs
index d09dfae0b29..1f9e3831e20 100644
--- a/src/librustuv/homing.rs
+++ b/src/librustuv/homing.rs
@@ -145,121 +145,58 @@ impl Drop for HomingMissile {
 
 #[cfg(test)]
 mod test {
+    use green::sched;
+    use green::{SchedPool, PoolConfig};
+    use std::rt::rtio::RtioUdpSocket;
+    use std::io::test::next_test_ip4;
+    use std::task::TaskOpts;
+
+    use net::UdpWatcher;
+    use super::super::local_loop;
+
     // On one thread, create a udp socket. Then send that socket to another
     // thread and destroy the socket on the remote thread. This should make sure
     // that homing kicks in for the socket to go back home to the original
     // thread, close itself, and then come back to the last thread.
-    //#[test]
-    //fn test_homing_closes_correctly() {
-    //    let (port, chan) = Chan::new();
-
-    //    do task::spawn_sched(task::SingleThreaded) {
-    //        let listener = UdpWatcher::bind(local_loop(), next_test_ip4()).unwrap();
-    //        chan.send(listener);
-    //    }
-
-    //    do task::spawn_sched(task::SingleThreaded) {
-    //        port.recv();
-    //    }
-    //}
-
-    // This is a bit of a crufty old test, but it has its uses.
-    //#[test]
-    //fn test_simple_homed_udp_io_bind_then_move_task_then_home_and_close() {
-    //    use std::cast;
-    //    use std::rt::local::Local;
-    //    use std::rt::rtio::{EventLoop, IoFactory};
-    //    use std::rt::sched::Scheduler;
-    //    use std::rt::sched::{Shutdown, TaskFromFriend};
-    //    use std::rt::sleeper_list::SleeperList;
-    //    use std::rt::task::Task;
-    //    use std::rt::task::UnwindResult;
-    //    use std::rt::thread::Thread;
-    //    use std::rt::deque::BufferPool;
-    //    use std::unstable::run_in_bare_thread;
-    //    use uvio::UvEventLoop;
-
-    //    do run_in_bare_thread {
-    //        let sleepers = SleeperList::new();
-    //        let mut pool = BufferPool::new();
-    //        let (worker1, stealer1) = pool.deque();
-    //        let (worker2, stealer2) = pool.deque();
-    //        let queues = ~[stealer1, stealer2];
-
-    //        let loop1 = ~UvEventLoop::new() as ~EventLoop;
-    //        let mut sched1 = ~Scheduler::new(loop1, worker1, queues.clone(),
-    //                                         sleepers.clone());
-    //        let loop2 = ~UvEventLoop::new() as ~EventLoop;
-    //        let mut sched2 = ~Scheduler::new(loop2, worker2, queues.clone(),
-    //                                         sleepers.clone());
-
-    //        let handle1 = sched1.make_handle();
-    //        let handle2 = sched2.make_handle();
-    //        let tasksFriendHandle = sched2.make_handle();
-
-    //        let on_exit: proc(UnwindResult) = proc(exit_status) {
-    //            let mut handle1 = handle1;
-    //            let mut handle2 = handle2;
-    //            handle1.send(Shutdown);
-    //            handle2.send(Shutdown);
-    //            assert!(exit_status.is_success());
-    //        };
-
-    //        unsafe fn local_io() -> &'static mut IoFactory {
-    //            let mut sched = Local::borrow(None::<Scheduler>);
-    //            let io = sched.get().event_loop.io();
-    //            cast::transmute(io.unwrap())
-    //        }
-
-    //        let test_function: proc() = proc() {
-    //            let io = unsafe { local_io() };
-    //            let addr = next_test_ip4();
-    //            let maybe_socket = io.udp_bind(addr);
-    //            // this socket is bound to this event loop
-    //            assert!(maybe_socket.is_ok());
-
-    //            // block self on sched1
-    //            let scheduler: ~Scheduler = Local::take();
-    //            let mut tasksFriendHandle = Some(tasksFriendHandle);
-    //            scheduler.deschedule_running_task_and_then(|_, task| {
-    //                // unblock task
-    //                task.wake().map(|task| {
-    //                    // send self to sched2
-    //                    tasksFriendHandle.take_unwrap()
-    //                                     .send(TaskFromFriend(task));
-    //                });
-    //                // sched1 should now sleep since it has nothing else to do
-    //            })
-    //            // sched2 will wake up and get the task as we do nothing else,
-    //            // the function ends and the socket goes out of scope sched2
-    //            // will start to run the destructor the destructor will first
-    //            // block the task, set it's home as sched1, then enqueue it
-    //            // sched2 will dequeue the task, see that it has a home, and
-    //            // send it to sched1 sched1 will wake up, exec the close
-    //            // function on the correct loop, and then we're done
-    //        };
+    #[test]
+    fn test_homing_closes_correctly() {
+        let (port, chan) = Chan::new();
+        let mut pool = SchedPool::new(PoolConfig { threads: 1 });
+
+        do pool.spawn(TaskOpts::new()) {
+            let listener = UdpWatcher::bind(local_loop(), next_test_ip4());
+            chan.send(listener.unwrap());
+        }
 
-    //        let mut main_task = ~Task::new_root(&mut sched1.stack_pool, None,
-    //                                            test_function);
-    //        main_task.death.on_exit = Some(on_exit);
+        let task = do pool.task(TaskOpts::new()) {
+            port.recv();
+        };
+        pool.spawn_sched().send(sched::TaskFromFriend(task));
 
-    //        let null_task = ~do Task::new_root(&mut sched2.stack_pool, None) {
-    //            // nothing
-    //        };
+        pool.shutdown();
+    }
 
-    //        let main_task = main_task;
-    //        let sched1 = sched1;
-    //        let thread1 = do Thread::start {
-    //            sched1.bootstrap(main_task);
-    //        };
+    #[test]
+    fn test_homing_read() {
+        let (port, chan) = Chan::new();
+        let mut pool = SchedPool::new(PoolConfig { threads: 1 });
+
+        do pool.spawn(TaskOpts::new()) {
+            let addr1 = next_test_ip4();
+            let addr2 = next_test_ip4();
+            let listener = UdpWatcher::bind(local_loop(), addr2);
+            chan.send((listener.unwrap(), addr1));
+            let mut listener = UdpWatcher::bind(local_loop(), addr1).unwrap();
+            listener.sendto([1, 2, 3, 4], addr2);
+        }
 
-    //        let sched2 = sched2;
-    //        let thread2 = do Thread::start {
-    //            sched2.bootstrap(null_task);
-    //        };
+        let task = do pool.task(TaskOpts::new()) {
+            let (mut watcher, addr) = port.recv();
+            let mut buf = [0, ..10];
+            assert_eq!(watcher.recvfrom(buf).unwrap(), (4, addr));
+        };
+        pool.spawn_sched().send(sched::TaskFromFriend(task));
 
-    //        thread1.join();
-    //        thread2.join();
-    //    }
-    //}
+        pool.shutdown();
+    }
 }