diff options
Diffstat (limited to 'src/libstd/rt/test.rs')
| -rw-r--r-- | src/libstd/rt/test.rs | 359 |
1 files changed, 296 insertions, 63 deletions
diff --git a/src/libstd/rt/test.rs b/src/libstd/rt/test.rs index fe08d85c947..6e4fb9b1d94 100644 --- a/src/libstd/rt/test.rs +++ b/src/libstd/rt/test.rs @@ -9,13 +9,33 @@ // except according to those terms. use uint; -use option::*; +use option::{Some, None}; use cell::Cell; +use clone::Clone; +use container::Container; +use iterator::IteratorUtil; +use vec::{OwnedVector, MutableVector}; use result::{Result, Ok, Err}; +use unstable::run_in_bare_thread; use super::io::net::ip::{IpAddr, Ipv4}; +use rt::comm::oneshot; use rt::task::Task; use rt::thread::Thread; use rt::local::Local; +use rt::sched::{Scheduler, Coroutine}; +use rt::sleeper_list::SleeperList; +use rt::work_queue::WorkQueue; + +pub fn new_test_uv_sched() -> Scheduler { + use rt::uv::uvio::UvEventLoop; + use rt::work_queue::WorkQueue; + use rt::sleeper_list::SleeperList; + + let mut sched = Scheduler::new(~UvEventLoop::new(), WorkQueue::new(), SleeperList::new()); + // Don't wait for the Shutdown message + sched.no_sleep = true; + return sched; +} /// Creates a new scheduler in a new thread and runs a task in it, /// then waits for the scheduler to exit. Failure of the task @@ -23,48 +43,240 @@ use rt::local::Local; pub fn run_in_newsched_task(f: ~fn()) { use super::sched::*; use unstable::run_in_bare_thread; - use rt::uv::uvio::UvEventLoop; let f = Cell::new(f); do run_in_bare_thread { - let mut sched = ~UvEventLoop::new_scheduler(); + let mut sched = ~new_test_uv_sched(); + let mut new_task = ~Task::new_root(); + let on_exit: ~fn(bool) = |exit_status| rtassert!(exit_status); + new_task.on_exit = Some(on_exit); let task = ~Coroutine::with_task(&mut sched.stack_pool, - ~Task::without_unwinding(), + new_task, f.take()); sched.enqueue_task(task); sched.run(); } } +/// Create more than one scheduler and run a function in a task +/// in one of the schedulers. The schedulers will stay alive +/// until the function `f` returns. +pub fn run_in_mt_newsched_task(f: ~fn()) { + use libc; + use os; + use from_str::FromStr; + use rt::uv::uvio::UvEventLoop; + use rt::sched::Shutdown; + + let f_cell = Cell::new(f); + + do run_in_bare_thread { + let nthreads = match os::getenv("RUST_TEST_THREADS") { + Some(nstr) => FromStr::from_str(nstr).get(), + None => unsafe { + // Using more threads than cores in test code + // to force the OS to preempt them frequently. + // Assuming that this help stress test concurrent types. + rust_get_num_cpus() * 2 + } + }; + + let sleepers = SleeperList::new(); + let work_queue = WorkQueue::new(); + + let mut handles = ~[]; + let mut scheds = ~[]; + + for uint::range(0, nthreads) |_| { + let loop_ = ~UvEventLoop::new(); + let mut sched = ~Scheduler::new(loop_, work_queue.clone(), sleepers.clone()); + let handle = sched.make_handle(); + + handles.push(handle); + scheds.push(sched); + } + + let f_cell = Cell::new(f_cell.take()); + let handles = Cell::new(handles); + let mut new_task = ~Task::new_root(); + let on_exit: ~fn(bool) = |exit_status| { + + let mut handles = handles.take(); + // Tell schedulers to exit + for handles.mut_iter().advance |handle| { + handle.send(Shutdown); + } + + rtassert!(exit_status); + }; + new_task.on_exit = Some(on_exit); + let main_task = ~Coroutine::with_task(&mut scheds[0].stack_pool, + new_task, f_cell.take()); + scheds[0].enqueue_task(main_task); + + let mut threads = ~[]; + + while !scheds.is_empty() { + let sched = scheds.pop(); + let sched_cell = Cell::new(sched); + let thread = do Thread::start { + let sched = sched_cell.take(); + sched.run(); + }; + + threads.push(thread); + } + + // Wait for schedulers + let _threads = threads; + } + + extern { + fn rust_get_num_cpus() -> libc::uintptr_t; + } +} + +// THIS IS AWFUL. Copy-pasted the above initialization function but +// with a number of hacks to make it spawn tasks on a variety of +// schedulers with a variety of homes using the new spawn. + +pub fn run_in_mt_newsched_task_random_homed() { + use libc; + use os; + use from_str::FromStr; + use rt::uv::uvio::UvEventLoop; + use rt::sched::Shutdown; + + do run_in_bare_thread { + let nthreads = match os::getenv("RUST_TEST_THREADS") { + Some(nstr) => FromStr::from_str(nstr).get(), + None => unsafe { + // Using more threads than cores in test code to force + // the OS to preempt them frequently. Assuming that + // this help stress test concurrent types. + rust_get_num_cpus() * 2 + } + }; + + let sleepers = SleeperList::new(); + let work_queue = WorkQueue::new(); + + let mut handles = ~[]; + let mut scheds = ~[]; + + // create a few special schedulers, those with even indicies + // will be pinned-only + for uint::range(0, nthreads) |i| { + let special = (i % 2) == 0; + let loop_ = ~UvEventLoop::new(); + let mut sched = ~Scheduler::new_special( + loop_, work_queue.clone(), sleepers.clone(), special); + let handle = sched.make_handle(); + handles.push(handle); + scheds.push(sched); + } + + // Schedule a pile o tasks + let n = 5*stress_factor(); + for uint::range(0,n) |_i| { + rtdebug!("creating task: %u", _i); + let hf: ~fn() = || { assert!(true) }; + spawntask_homed(&mut scheds, hf); + } + + // Now we want another pile o tasks that do not ever run on a + // special scheduler, because they are normal tasks. Because + // we can we put these in the "main" task. + + let n = 5*stress_factor(); + + let f: ~fn() = || { + for uint::range(0,n) |_| { + let f: ~fn() = || { + // Borrow the scheduler we run on and check if it is + // privileged. + do Local::borrow::<Scheduler,()> |sched| { + assert!(sched.run_anything); + }; + }; + spawntask_random(f); + }; + }; + + let f_cell = Cell::new(f); + let handles = Cell::new(handles); + + rtdebug!("creating main task"); + + let main_task = ~do Coroutine::new_root(&mut scheds[0].stack_pool) { + f_cell.take()(); + let mut handles = handles.take(); + // Tell schedulers to exit + for handles.mut_iter().advance |handle| { + handle.send(Shutdown); + } + }; + + rtdebug!("queuing main task") + + scheds[0].enqueue_task(main_task); + + let mut threads = ~[]; + + while !scheds.is_empty() { + let sched = scheds.pop(); + let sched_cell = Cell::new(sched); + let thread = do Thread::start { + let sched = sched_cell.take(); + rtdebug!("running sched: %u", sched.sched_id()); + sched.run(); + }; + + threads.push(thread); + } + + rtdebug!("waiting on scheduler threads"); + + // Wait for schedulers + let _threads = threads; + } + + extern { + fn rust_get_num_cpus() -> libc::uintptr_t; + } +} + + /// Test tasks will abort on failure instead of unwinding pub fn spawntask(f: ~fn()) { use super::sched::*; + rtdebug!("spawntask taking the scheduler from TLS") + let task = do Local::borrow::<Task, ~Task>() |running_task| { + ~running_task.new_child() + }; + let mut sched = Local::take::<Scheduler>(); let task = ~Coroutine::with_task(&mut sched.stack_pool, - ~Task::without_unwinding(), - f); - do sched.switch_running_tasks_and_then(task) |task| { - let task = Cell::new(task); - let sched = Local::take::<Scheduler>(); - sched.schedule_new_task(task.take()); - } + task, f); + rtdebug!("spawntask scheduling the new task"); + sched.schedule_task(task); } /// Create a new task and run it right now. Aborts on failure pub fn spawntask_immediately(f: ~fn()) { use super::sched::*; + let task = do Local::borrow::<Task, ~Task>() |running_task| { + ~running_task.new_child() + }; + let mut sched = Local::take::<Scheduler>(); let task = ~Coroutine::with_task(&mut sched.stack_pool, - ~Task::without_unwinding(), - f); - do sched.switch_running_tasks_and_then(task) |task| { - let task = Cell::new(task); - do Local::borrow::<Scheduler> |sched| { - sched.enqueue_task(task.take()); - } + task, f); + do sched.switch_running_tasks_and_then(task) |sched, task| { + sched.enqueue_task(task); } } @@ -72,10 +284,13 @@ pub fn spawntask_immediately(f: ~fn()) { pub fn spawntask_later(f: ~fn()) { use super::sched::*; + let task = do Local::borrow::<Task, ~Task>() |running_task| { + ~running_task.new_child() + }; + let mut sched = Local::take::<Scheduler>(); let task = ~Coroutine::with_task(&mut sched.stack_pool, - ~Task::without_unwinding(), - f); + task, f); sched.enqueue_task(task); Local::put(sched); @@ -86,20 +301,20 @@ pub fn spawntask_random(f: ~fn()) { use super::sched::*; use rand::{Rand, rng}; - let mut rng = rng(); - let run_now: bool = Rand::rand(&mut rng); + let task = do Local::borrow::<Task, ~Task>() |running_task| { + ~running_task.new_child() + }; let mut sched = Local::take::<Scheduler>(); let task = ~Coroutine::with_task(&mut sched.stack_pool, - ~Task::without_unwinding(), - f); + task, f); + + let mut rng = rng(); + let run_now: bool = Rand::rand(&mut rng); if run_now { - do sched.switch_running_tasks_and_then(task) |task| { - let task = Cell::new(task); - do Local::borrow::<Scheduler> |sched| { - sched.enqueue_task(task.take()); - } + do sched.switch_running_tasks_and_then(task) |sched, task| { + sched.enqueue_task(task); } } else { sched.enqueue_task(task); @@ -107,57 +322,75 @@ pub fn spawntask_random(f: ~fn()) { } } +/// Spawn a task, with the current scheduler as home, and queue it to +/// run later. +pub fn spawntask_homed(scheds: &mut ~[~Scheduler], f: ~fn()) { + use super::sched::*; + use rand::{rng, RngUtil}; + let mut rng = rng(); + + let task = { + let sched = &mut scheds[rng.gen_int_range(0,scheds.len() as int)]; + let handle = sched.make_handle(); + let home_id = handle.sched_id; + + // now that we know where this is going, build a new function + // that can assert it is in the right place + let af: ~fn() = || { + do Local::borrow::<Scheduler,()>() |sched| { + rtdebug!("home_id: %u, runtime loc: %u", + home_id, + sched.sched_id()); + assert!(home_id == sched.sched_id()); + }; + f() + }; + + ~Coroutine::with_task_homed(&mut sched.stack_pool, + ~Task::new_root(), + af, + Sched(handle)) + }; + let dest_sched = &mut scheds[rng.gen_int_range(0,scheds.len() as int)]; + // enqueue it for future execution + dest_sched.enqueue_task(task); +} /// Spawn a task and wait for it to finish, returning whether it completed successfully or failed pub fn spawntask_try(f: ~fn()) -> Result<(), ()> { use cell::Cell; use super::sched::*; - use task; - use unstable::finally::Finally; - - // Our status variables will be filled in from the scheduler context - let mut failed = false; - let failed_ptr: *mut bool = &mut failed; - - // Switch to the scheduler - let f = Cell::new(Cell::new(f)); - let sched = Local::take::<Scheduler>(); - do sched.deschedule_running_task_and_then() |old_task| { - let old_task = Cell::new(old_task); - let f = f.take(); - let mut sched = Local::take::<Scheduler>(); - let new_task = ~do Coroutine::new(&mut sched.stack_pool) { - do (|| { - (f.take())() - }).finally { - // Check for failure then resume the parent task - unsafe { *failed_ptr = task::failing(); } - let sched = Local::take::<Scheduler>(); - do sched.switch_running_tasks_and_then(old_task.take()) |new_task| { - let new_task = Cell::new(new_task); - do Local::borrow::<Scheduler> |sched| { - sched.enqueue_task(new_task.take()); - } - } - } - }; - sched.resume_task_immediately(new_task); + let (port, chan) = oneshot(); + let chan = Cell::new(chan); + let mut new_task = ~Task::new_root(); + let on_exit: ~fn(bool) = |exit_status| chan.take().send(exit_status); + new_task.on_exit = Some(on_exit); + let mut sched = Local::take::<Scheduler>(); + let new_task = ~Coroutine::with_task(&mut sched.stack_pool, + new_task, f); + do sched.switch_running_tasks_and_then(new_task) |sched, old_task| { + sched.enqueue_task(old_task); } - if !failed { Ok(()) } else { Err(()) } + let exit_status = port.recv(); + if exit_status { Ok(()) } else { Err(()) } } // Spawn a new task in a new scheduler and return a thread handle. pub fn spawntask_thread(f: ~fn()) -> Thread { use rt::sched::*; - use rt::uv::uvio::UvEventLoop; + let task = do Local::borrow::<Task, ~Task>() |running_task| { + ~running_task.new_child() + }; + + let task = Cell::new(task); let f = Cell::new(f); let thread = do Thread::start { - let mut sched = ~UvEventLoop::new_scheduler(); + let mut sched = ~new_test_uv_sched(); let task = ~Coroutine::with_task(&mut sched.stack_pool, - ~Task::without_unwinding(), + task.take(), f.take()); sched.enqueue_task(task); sched.run(); |
