diff options
| author | Alex Crichton <alex@alexcrichton.com> | 2013-12-13 16:56:50 -0800 |
|---|---|---|
| committer | Alex Crichton <alex@alexcrichton.com> | 2013-12-24 19:59:53 -0800 |
| commit | 14caf00c92b40e3f62094db54f325196c8a05d5a (patch) | |
| tree | 0b44af85b516274a99d19098eb7506a9d92ca4ae | |
| parent | 3893716390f2c4857b7e8b1705a6344f96b85bb6 (diff) | |
| download | rust-14caf00c92b40e3f62094db54f325196c8a05d5a.tar.gz rust-14caf00c92b40e3f62094db54f325196c8a05d5a.zip | |
rustuv: Write homing tests with SchedPool
Use the previous commit's new scheduler pool abstraction in libgreen to write some homing tests which force an I/O handle to be homed from one event loop to another.
| -rw-r--r-- | src/libgreen/lib.rs | 53 | ||||
| -rw-r--r-- | src/libgreen/sched.rs | 5 | ||||
| -rw-r--r-- | src/libgreen/task.rs | 19 | ||||
| -rw-r--r-- | src/librustuv/homing.rs | 153 |
4 files changed, 84 insertions, 146 deletions
diff --git a/src/libgreen/lib.rs b/src/libgreen/lib.rs index f4903ea38d2..9a3f27f7dbc 100644 --- a/src/libgreen/lib.rs +++ b/src/libgreen/lib.rs @@ -35,7 +35,6 @@ use std::os; use std::rt::thread::Thread; use std::rt; use std::rt::crate_map; -use std::rt::task::Task; use std::rt::rtio; use std::sync::deque; use std::sync::atomics::{SeqCst, AtomicUint, INIT_ATOMIC_UINT}; @@ -105,7 +104,7 @@ pub fn start(argc: int, argv: **u8, main: proc()) -> int { /// This function will not return until all schedulers in the associated pool /// have returned. pub fn run(main: proc()) -> int { - let mut pool = Pool::new(Config::new()); + let mut pool = SchedPool::new(PoolConfig::new()); pool.spawn(TaskOpts::new(), main); unsafe { stdtask::wait_for_completion(); } pool.shutdown(); @@ -113,16 +112,16 @@ pub fn run(main: proc()) -> int { } /// Configuration of how an M:N pool of schedulers is spawned. -pub struct Config { +pub struct PoolConfig { /// The number of schedulers (OS threads) to spawn into this M:N pool. threads: uint, } -impl Config { +impl PoolConfig { /// Returns the default configuration, as determined the the environment /// variables of this process. - pub fn new() -> Config { - Config { + pub fn new() -> PoolConfig { + PoolConfig { threads: rt::default_sched_threads(), } } @@ -130,7 +129,7 @@ impl Config { /// A structure representing a handle to a pool of schedulers. This handle is /// used to keep the pool alive and also reap the status from the pool. -pub struct Pool { +pub struct SchedPool { priv id: uint, priv threads: ~[Thread<()>], priv handles: ~[SchedHandle], @@ -141,19 +140,19 @@ pub struct Pool { priv sleepers: SleeperList, } -impl Pool { +impl SchedPool { /// Execute the main function in a pool of M:N schedulers. /// /// This will configure the pool according to the `config` parameter, and /// initially run `main` inside the pool of schedulers. - pub fn new(config: Config) -> Pool { + pub fn new(config: PoolConfig) -> SchedPool { static mut POOL_ID: AtomicUint = INIT_ATOMIC_UINT; - let Config { threads: nscheds } = config; + let PoolConfig { threads: nscheds } = config; assert!(nscheds > 0); // The pool of schedulers that will be returned from this function - let mut pool = Pool { + let mut pool = SchedPool { threads: ~[], handles: ~[], stealers: ~[], @@ -185,10 +184,9 @@ impl Pool { let sched = sched; pool.threads.push(do Thread::start { let mut sched = sched; - let mut task = do GreenTask::new(&mut sched.stack_pool, None) { + let task = do GreenTask::new(&mut sched.stack_pool, None) { rtdebug!("boostraping a non-primary scheduler"); }; - task.put_task(~Task::new()); sched.bootstrap(task); }); } @@ -196,19 +194,12 @@ impl Pool { return pool; } - pub fn shutdown(mut self) { - self.stealers = ~[]; - - for mut handle in util::replace(&mut self.handles, ~[]).move_iter() { - handle.send(Shutdown); - } - for thread in util::replace(&mut self.threads, ~[]).move_iter() { - thread.join(); - } + pub fn task(&mut self, opts: TaskOpts, f: proc()) -> ~GreenTask { + GreenTask::configure(&mut self.stack_pool, opts, f) } pub fn spawn(&mut self, opts: TaskOpts, f: proc()) { - let task = GreenTask::configure(&mut self.stack_pool, opts, f); + let task = self.task(opts, f); // Figure out someone to send this task to let idx = self.next_friend; @@ -250,18 +241,28 @@ impl Pool { let sched = sched; self.threads.push(do Thread::start { let mut sched = sched; - let mut task = do GreenTask::new(&mut sched.stack_pool, None) { + let task = do GreenTask::new(&mut sched.stack_pool, None) { rtdebug!("boostraping a non-primary scheduler"); }; - task.put_task(~Task::new()); sched.bootstrap(task); }); return ret; } + + pub fn shutdown(mut self) { + self.stealers = ~[]; + + for mut handle in util::replace(&mut self.handles, ~[]).move_iter() { + handle.send(Shutdown); + } + for thread in util::replace(&mut self.threads, ~[]).move_iter() { + thread.join(); + } + } } -impl Drop for Pool { +impl Drop for SchedPool { fn drop(&mut self) { if self.threads.len() > 0 { fail!("dropping a M:N scheduler pool that wasn't shut down"); diff --git a/src/libgreen/sched.rs b/src/libgreen/sched.rs index e349ae1e601..70fd6768c8b 100644 --- a/src/libgreen/sched.rs +++ b/src/libgreen/sched.rs @@ -178,9 +178,8 @@ impl Scheduler { self.idle_callback = Some(self.event_loop.pausable_idle_callback(cb)); // Create a task for the scheduler with an empty context. - let mut sched_task = GreenTask::new_typed(Some(Coroutine::empty()), - TypeSched); - sched_task.put_task(~Task::new()); + let sched_task = GreenTask::new_typed(Some(Coroutine::empty()), + TypeSched); // Before starting our first task, make sure the idle callback // is active. As we do not start in the sleep state this is diff --git a/src/libgreen/task.rs b/src/libgreen/task.rs index e07d7f2413f..d37ab8bba57 100644 --- a/src/libgreen/task.rs +++ b/src/libgreen/task.rs @@ -85,7 +85,7 @@ impl GreenTask { sched: None, handle: None, nasty_deschedule_lock: unsafe { Mutex::new() }, - task: None, + task: Some(~Task::new()), } } @@ -101,16 +101,17 @@ impl GreenTask { } = opts; let mut green = GreenTask::new(pool, stack_size, f); - let mut task = ~Task::new(); - task.name = name; - match notify_chan { - Some(chan) => { - let on_exit = proc(task_result) { chan.send(task_result) }; - task.death.on_exit = Some(on_exit); + { + let task = green.task.get_mut_ref(); + task.name = name; + match notify_chan { + Some(chan) => { + let on_exit = proc(task_result) { chan.send(task_result) }; + task.death.on_exit = Some(on_exit); + } + None => {} } - None => {} } - green.put_task(task); return green; } diff --git a/src/librustuv/homing.rs b/src/librustuv/homing.rs index d09dfae0b29..1f9e3831e20 100644 --- a/src/librustuv/homing.rs +++ b/src/librustuv/homing.rs @@ -145,121 +145,58 @@ impl Drop for HomingMissile { #[cfg(test)] mod test { + use green::sched; + use green::{SchedPool, PoolConfig}; + use std::rt::rtio::RtioUdpSocket; + use std::io::test::next_test_ip4; + use std::task::TaskOpts; + + use net::UdpWatcher; + use super::super::local_loop; + // On one thread, create a udp socket. Then send that socket to another // thread and destroy the socket on the remote thread. This should make sure // that homing kicks in for the socket to go back home to the original // thread, close itself, and then come back to the last thread. - //#[test] - //fn test_homing_closes_correctly() { - // let (port, chan) = Chan::new(); - - // do task::spawn_sched(task::SingleThreaded) { - // let listener = UdpWatcher::bind(local_loop(), next_test_ip4()).unwrap(); - // chan.send(listener); - // } - - // do task::spawn_sched(task::SingleThreaded) { - // port.recv(); - // } - //} - - // This is a bit of a crufty old test, but it has its uses. - //#[test] - //fn test_simple_homed_udp_io_bind_then_move_task_then_home_and_close() { - // use std::cast; - // use std::rt::local::Local; - // use std::rt::rtio::{EventLoop, IoFactory}; - // use std::rt::sched::Scheduler; - // use std::rt::sched::{Shutdown, TaskFromFriend}; - // use std::rt::sleeper_list::SleeperList; - // use std::rt::task::Task; - // use std::rt::task::UnwindResult; - // use std::rt::thread::Thread; - // use std::rt::deque::BufferPool; - // use std::unstable::run_in_bare_thread; - // use uvio::UvEventLoop; - - // do run_in_bare_thread { - // let sleepers = SleeperList::new(); - // let mut pool = BufferPool::new(); - // let (worker1, stealer1) = pool.deque(); - // let (worker2, stealer2) = pool.deque(); - // let queues = ~[stealer1, stealer2]; - - // let loop1 = ~UvEventLoop::new() as ~EventLoop; - // let mut sched1 = ~Scheduler::new(loop1, worker1, queues.clone(), - // sleepers.clone()); - // let loop2 = ~UvEventLoop::new() as ~EventLoop; - // let mut sched2 = ~Scheduler::new(loop2, worker2, queues.clone(), - // sleepers.clone()); - - // let handle1 = sched1.make_handle(); - // let handle2 = sched2.make_handle(); - // let tasksFriendHandle = sched2.make_handle(); - - // let on_exit: proc(UnwindResult) = proc(exit_status) { - // let mut handle1 = handle1; - // let mut handle2 = handle2; - // handle1.send(Shutdown); - // handle2.send(Shutdown); - // assert!(exit_status.is_success()); - // }; - - // unsafe fn local_io() -> &'static mut IoFactory { - // let mut sched = Local::borrow(None::<Scheduler>); - // let io = sched.get().event_loop.io(); - // cast::transmute(io.unwrap()) - // } - - // let test_function: proc() = proc() { - // let io = unsafe { local_io() }; - // let addr = next_test_ip4(); - // let maybe_socket = io.udp_bind(addr); - // // this socket is bound to this event loop - // assert!(maybe_socket.is_ok()); - - // // block self on sched1 - // let scheduler: ~Scheduler = Local::take(); - // let mut tasksFriendHandle = Some(tasksFriendHandle); - // scheduler.deschedule_running_task_and_then(|_, task| { - // // unblock task - // task.wake().map(|task| { - // // send self to sched2 - // tasksFriendHandle.take_unwrap() - // .send(TaskFromFriend(task)); - // }); - // // sched1 should now sleep since it has nothing else to do - // }) - // // sched2 will wake up and get the task as we do nothing else, - // // the function ends and the socket goes out of scope sched2 - // // will start to run the destructor the destructor will first - // // block the task, set it's home as sched1, then enqueue it - // // sched2 will dequeue the task, see that it has a home, and - // // send it to sched1 sched1 will wake up, exec the close - // // function on the correct loop, and then we're done - // }; + #[test] + fn test_homing_closes_correctly() { + let (port, chan) = Chan::new(); + let mut pool = SchedPool::new(PoolConfig { threads: 1 }); + + do pool.spawn(TaskOpts::new()) { + let listener = UdpWatcher::bind(local_loop(), next_test_ip4()); + chan.send(listener.unwrap()); + } - // let mut main_task = ~Task::new_root(&mut sched1.stack_pool, None, - // test_function); - // main_task.death.on_exit = Some(on_exit); + let task = do pool.task(TaskOpts::new()) { + port.recv(); + }; + pool.spawn_sched().send(sched::TaskFromFriend(task)); - // let null_task = ~do Task::new_root(&mut sched2.stack_pool, None) { - // // nothing - // }; + pool.shutdown(); + } - // let main_task = main_task; - // let sched1 = sched1; - // let thread1 = do Thread::start { - // sched1.bootstrap(main_task); - // }; + #[test] + fn test_homing_read() { + let (port, chan) = Chan::new(); + let mut pool = SchedPool::new(PoolConfig { threads: 1 }); + + do pool.spawn(TaskOpts::new()) { + let addr1 = next_test_ip4(); + let addr2 = next_test_ip4(); + let listener = UdpWatcher::bind(local_loop(), addr2); + chan.send((listener.unwrap(), addr1)); + let mut listener = UdpWatcher::bind(local_loop(), addr1).unwrap(); + listener.sendto([1, 2, 3, 4], addr2); + } - // let sched2 = sched2; - // let thread2 = do Thread::start { - // sched2.bootstrap(null_task); - // }; + let task = do pool.task(TaskOpts::new()) { + let (mut watcher, addr) = port.recv(); + let mut buf = [0, ..10]; + assert_eq!(watcher.recvfrom(buf).unwrap(), (4, addr)); + }; + pool.spawn_sched().send(sched::TaskFromFriend(task)); - // thread1.join(); - // thread2.join(); - // } - //} + pool.shutdown(); + } } |
