diff options
| author | Erick Tryzelaar <erick.tryzelaar@gmail.com> | 2013-08-08 19:27:03 -0700 |
|---|---|---|
| committer | Erick Tryzelaar <erick.tryzelaar@gmail.com> | 2013-08-08 19:27:03 -0700 |
| commit | 56730c094cf95be58fb05b0e423673aca2a98b88 (patch) | |
| tree | 096a652b16d38a6f4ff65dd39657ccf308249909 /src/libstd/rt | |
| parent | 03cc757fe90b88895fcf911d9cce5c04a008b127 (diff) | |
| parent | 936f70bd878327d867b6f8f82061d738355a47c9 (diff) | |
| download | rust-56730c094cf95be58fb05b0e423673aca2a98b88.tar.gz rust-56730c094cf95be58fb05b0e423673aca2a98b88.zip | |
Merge remote-tracking branch 'remotes/origin/master' into remove-str-trailing-nulls
Diffstat (limited to 'src/libstd/rt')
| -rw-r--r-- | src/libstd/rt/comm.rs | 40 | ||||
| -rw-r--r-- | src/libstd/rt/env.rs | 28 | ||||
| -rw-r--r-- | src/libstd/rt/io/mem.rs | 2 | ||||
| -rw-r--r-- | src/libstd/rt/kill.rs | 23 | ||||
| -rw-r--r-- | src/libstd/rt/local.rs | 11 | ||||
| -rw-r--r-- | src/libstd/rt/mod.rs | 40 | ||||
| -rw-r--r-- | src/libstd/rt/sched.rs | 177 | ||||
| -rw-r--r-- | src/libstd/rt/select.rs | 6 | ||||
| -rw-r--r-- | src/libstd/rt/task.rs | 45 | ||||
| -rw-r--r-- | src/libstd/rt/test.rs | 33 |
10 files changed, 281 insertions, 124 deletions
diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs index a060059f5fc..936a6526508 100644 --- a/src/libstd/rt/comm.rs +++ b/src/libstd/rt/comm.rs @@ -159,7 +159,7 @@ impl<T> ChanOne<T> { // Port is blocked. Wake it up. let recvr = BlockedTask::cast_from_uint(task_as_state); if do_resched { - do recvr.wake().map_consume |woken_task| { + do recvr.wake().map_move |woken_task| { Scheduler::run_task(woken_task); }; } else { @@ -225,9 +225,10 @@ impl<T> Select for PortOne<T> { fn optimistic_check(&mut self) -> bool { // The optimistic check is never necessary for correctness. For testing // purposes, making it randomly return false simulates a racing sender. - use rand::{Rand, rng}; - let mut rng = rng(); - let actually_check = Rand::rand(&mut rng); + use rand::{Rand}; + let actually_check = do Local::borrow::<Scheduler, bool> |sched| { + Rand::rand(&mut sched.rng) + }; if actually_check { unsafe { (*self.packet()).state.load(Acquire) == STATE_ONE } } else { @@ -381,7 +382,7 @@ impl<T> Drop for ChanOne<T> { // The port is blocked waiting for a message we will never send. Wake it. assert!((*this.packet()).payload.is_none()); let recvr = BlockedTask::cast_from_uint(task_as_state); - do recvr.wake().map_consume |woken_task| { + do recvr.wake().map_move |woken_task| { Scheduler::run_task(woken_task); }; } @@ -508,7 +509,11 @@ impl<T> Peekable<T> for Port<T> { } } -impl<T> Select for Port<T> { +// XXX: Kind of gross. A Port<T> should be selectable so you can make an array +// of them, but a &Port<T> should also be selectable so you can select2 on it +// alongside a PortOne<U> without passing the port by value in recv_ready. + +impl<'self, T> Select for &'self Port<T> { #[inline] fn optimistic_check(&mut self) -> bool { do self.next.with_mut_ref |pone| { pone.optimistic_check() } @@ -526,12 +531,29 @@ impl<T> Select for Port<T> { } } -impl<T> SelectPort<(T, Port<T>)> for Port<T> { - fn recv_ready(self) -> Option<(T, Port<T>)> { +impl<T> Select for Port<T> { + #[inline] + fn optimistic_check(&mut self) -> bool { + (&*self).optimistic_check() + } + + #[inline] + fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool { + (&*self).block_on(sched, task) + } + + #[inline] + fn unblock_from(&mut self) -> bool { + (&*self).unblock_from() + } +} + +impl<'self, T> SelectPort<T> for &'self Port<T> { + fn recv_ready(self) -> Option<T> { match self.next.take().recv_ready() { Some(StreamPayload { val, next }) => { self.next.put_back(next); - Some((val, self)) + Some(val) } None => None } diff --git a/src/libstd/rt/env.rs b/src/libstd/rt/env.rs index 1d7ff173149..6e671742fb6 100644 --- a/src/libstd/rt/env.rs +++ b/src/libstd/rt/env.rs @@ -10,7 +10,12 @@ //! Runtime environment settings +use from_str::FromStr; use libc::{size_t, c_char, c_int}; +use option::{Some, None}; +use os; + +// OLD RT stuff pub struct Environment { /// The number of threads to use by default @@ -47,3 +52,26 @@ pub fn get() -> &Environment { extern { fn rust_get_rt_env() -> &Environment; } + +// NEW RT stuff + +// Note that these are all accessed without any synchronization. +// They are expected to be initialized once then left alone. + +static mut MIN_STACK: uint = 2000000; + +pub fn init() { + unsafe { + match os::getenv("RUST_MIN_STACK") { + Some(s) => match FromStr::from_str(s) { + Some(i) => MIN_STACK = i, + None => () + }, + None => () + } + } +} + +pub fn min_stack() -> uint { + unsafe { MIN_STACK } +} diff --git a/src/libstd/rt/io/mem.rs b/src/libstd/rt/io/mem.rs index c93945a6a9a..277897e5d2e 100644 --- a/src/libstd/rt/io/mem.rs +++ b/src/libstd/rt/io/mem.rs @@ -26,7 +26,7 @@ pub struct MemWriter { } impl MemWriter { - pub fn new() -> MemWriter { MemWriter { buf: ~[] } } + pub fn new() -> MemWriter { MemWriter { buf: vec::with_capacity(128) } } } impl Writer for MemWriter { diff --git a/src/libstd/rt/kill.rs b/src/libstd/rt/kill.rs index 3372c13b877..e07cb1425bf 100644 --- a/src/libstd/rt/kill.rs +++ b/src/libstd/rt/kill.rs @@ -402,10 +402,10 @@ impl KillHandle { || { // Prefer to check tombstones that were there first, // being "more fair" at the expense of tail-recursion. - others.take().map_consume_default(true, |f| f()) && { + others.take().map_move_default(true, |f| f()) && { let mut inner = this.take().unwrap(); (!inner.any_child_failed) && - inner.child_tombstones.take_map_default(true, |f| f()) + inner.child_tombstones.take().map_move_default(true, |f| f()) } } } @@ -424,7 +424,7 @@ impl KillHandle { let others = Cell::new(other_tombstones); // :( || { // Prefer fairness to tail-recursion, as in above case. - others.take().map_consume_default(true, |f| f()) && + others.take().map_move_default(true, |f| f()) && f.take()() } } @@ -493,7 +493,7 @@ impl Death { { use util; util::ignore(group); } // Step 1. Decide if we need to collect child failures synchronously. - do self.on_exit.take_map |on_exit| { + do self.on_exit.take().map_move |on_exit| { if success { // We succeeded, but our children might not. Need to wait for them. let mut inner = self.kill_handle.take_unwrap().unwrap(); @@ -501,7 +501,7 @@ impl Death { success = false; } else { // Lockless access to tombstones protected by unwrap barrier. - success = inner.child_tombstones.take_map_default(true, |f| f()); + success = inner.child_tombstones.take().map_move_default(true, |f| f()); } } on_exit(success); @@ -510,12 +510,12 @@ impl Death { // Step 2. Possibly alert possibly-watching parent to failure status. // Note that as soon as parent_handle goes out of scope, the parent // can successfully unwrap its handle and collect our reported status. - do self.watching_parent.take_map |mut parent_handle| { + do self.watching_parent.take().map_move |mut parent_handle| { if success { // Our handle might be None if we had an exit callback, and // already unwrapped it. But 'success' being true means no // child failed, so there's nothing to do (see below case). - do self.kill_handle.take_map |own_handle| { + do self.kill_handle.take().map_move |own_handle| { own_handle.reparent_children_to(&mut parent_handle); }; } else { @@ -590,7 +590,8 @@ impl Death { #[inline] pub fn assert_may_sleep(&self) { if self.wont_sleep != 0 { - rtabort!("illegal atomic-sleep: can't deschedule inside atomically()"); + rtabort!("illegal atomic-sleep: attempt to reschedule while \ + using an Exclusive or LittleLock"); } } } @@ -614,6 +615,7 @@ mod test { // Test cases don't care about the spare killed flag. fn make_kill_handle() -> KillHandle { let (h,_) = KillHandle::new(); h } + #[ignore(reason = "linked failure")] #[test] fn no_tombstone_success() { do run_in_newsched_task { @@ -819,6 +821,7 @@ mod test { } } + #[ignore(reason = "linked failure")] #[test] fn block_and_get_killed() { do with_test_task |mut task| { @@ -830,6 +833,7 @@ mod test { } } + #[ignore(reason = "linked failure")] #[test] fn block_already_killed() { do with_test_task |mut task| { @@ -839,6 +843,7 @@ mod test { } } + #[ignore(reason = "linked failure")] #[test] fn block_unkillably_and_get_killed() { do with_test_task |mut task| { @@ -856,6 +861,7 @@ mod test { } } + #[ignore(reason = "linked failure")] #[test] fn block_on_pipe() { // Tests the "killable" path of casting to/from uint. @@ -869,6 +875,7 @@ mod test { } } + #[ignore(reason = "linked failure")] #[test] fn block_unkillably_on_pipe() { // Tests the "indestructible" path of casting to/from uint. diff --git a/src/libstd/rt/local.rs b/src/libstd/rt/local.rs index 131507196b1..7154066e7b7 100644 --- a/src/libstd/rt/local.rs +++ b/src/libstd/rt/local.rs @@ -126,6 +126,7 @@ impl Local for IoFactoryObject { #[cfg(test)] mod test { + use option::None; use unstable::run_in_bare_thread; use rt::test::*; use super::*; @@ -137,7 +138,7 @@ mod test { do run_in_bare_thread { local_ptr::init_tls_key(); let mut sched = ~new_test_uv_sched(); - let task = ~Task::new_root(&mut sched.stack_pool, || {}); + let task = ~Task::new_root(&mut sched.stack_pool, None, || {}); Local::put(task); let task: ~Task = Local::take(); cleanup_task(task); @@ -149,11 +150,11 @@ mod test { do run_in_bare_thread { local_ptr::init_tls_key(); let mut sched = ~new_test_uv_sched(); - let task = ~Task::new_root(&mut sched.stack_pool, || {}); + let task = ~Task::new_root(&mut sched.stack_pool, None, || {}); Local::put(task); let task: ~Task = Local::take(); cleanup_task(task); - let task = ~Task::new_root(&mut sched.stack_pool, || {}); + let task = ~Task::new_root(&mut sched.stack_pool, None, || {}); Local::put(task); let task: ~Task = Local::take(); cleanup_task(task); @@ -166,7 +167,7 @@ mod test { do run_in_bare_thread { local_ptr::init_tls_key(); let mut sched = ~new_test_uv_sched(); - let task = ~Task::new_root(&mut sched.stack_pool, || {}); + let task = ~Task::new_root(&mut sched.stack_pool, None, || {}); Local::put(task); unsafe { @@ -182,7 +183,7 @@ mod test { do run_in_bare_thread { local_ptr::init_tls_key(); let mut sched = ~new_test_uv_sched(); - let task = ~Task::new_root(&mut sched.stack_pool, || {}); + let task = ~Task::new_root(&mut sched.stack_pool, None, || {}); Local::put(task); let res = do Local::borrow::<Task,bool> |_task| { diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs index 760ca8a9ada..01a52892f63 100644 --- a/src/libstd/rt/mod.rs +++ b/src/libstd/rt/mod.rs @@ -63,8 +63,7 @@ Several modules in `core` are clients of `rt`: use cell::Cell; use clone::Clone; use container::Container; -use iter::Times; -use iterator::{Iterator, IteratorUtil}; +use iterator::{Iterator, IteratorUtil, range}; use option::{Some, None}; use ptr::RawPtr; use rt::local::Local; @@ -212,6 +211,7 @@ pub fn init(argc: int, argv: **u8, crate_map: *u8) { // Need to propagate the unsafety to `start`. unsafe { args::init(argc, argv); + env::init(); logging::init(crate_map); rust_update_gc_metadata(crate_map); } @@ -246,11 +246,16 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int { let main = Cell::new(main); - // The shared list of sleeping schedulers. Schedulers wake each other - // occassionally to do new work. + // The shared list of sleeping schedulers. let sleepers = SleeperList::new(); - // The shared work queue. Temporary until work stealing is implemented. - let work_queue = WorkQueue::new(); + + // Create a work queue for each scheduler, ntimes. Create an extra + // for the main thread if that flag is set. We won't steal from it. + let mut work_queues = ~[]; + for _ in range(0u, nscheds) { + let work_queue: WorkQueue<~Task> = WorkQueue::new(); + work_queues.push(work_queue); + } // The schedulers. let mut scheds = ~[]; @@ -258,12 +263,15 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int { // sent the Shutdown message to terminate the schedulers. let mut handles = ~[]; - do nscheds.times { + for i in range(0u, nscheds) { rtdebug!("inserting a regular scheduler"); // Every scheduler is driven by an I/O event loop. let loop_ = ~UvEventLoop::new(); - let mut sched = ~Scheduler::new(loop_, work_queue.clone(), sleepers.clone()); + let mut sched = ~Scheduler::new(loop_, + work_queues[i].clone(), + work_queues.clone(), + sleepers.clone()); let handle = sched.make_handle(); scheds.push(sched); @@ -279,9 +287,14 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int { let friend_handle = friend_sched.make_handle(); scheds.push(friend_sched); + // This scheduler needs a queue that isn't part of the stealee + // set. + let work_queue = WorkQueue::new(); + let main_loop = ~UvEventLoop::new(); let mut main_sched = ~Scheduler::new_special(main_loop, - work_queue.clone(), + work_queue, + work_queues.clone(), sleepers.clone(), false, Some(friend_handle)); @@ -330,8 +343,7 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int { // In the case where we do not use a main_thread scheduler we // run the main task in one of our threads. - let mut main_task = ~Task::new_root(&mut scheds[0].stack_pool, - main.take()); + let mut main_task = ~Task::new_root(&mut scheds[0].stack_pool, None, main.take()); main_task.death.on_exit = Some(on_exit.take()); let main_task_cell = Cell::new(main_task); @@ -351,7 +363,7 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int { let sched_cell = Cell::new(sched); let thread = do Thread::start { let mut sched = sched_cell.take(); - let bootstrap_task = ~do Task::new_root(&mut sched.stack_pool) || { + let bootstrap_task = ~do Task::new_root(&mut sched.stack_pool, None) || { rtdebug!("boostraping a non-primary scheduler"); }; sched.bootstrap(bootstrap_task); @@ -368,10 +380,10 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int { let mut main_sched = main_sched.unwrap(); let home = Sched(main_sched.make_handle()); - let mut main_task = ~Task::new_root_homed(&mut main_sched.stack_pool, + let mut main_task = ~Task::new_root_homed(&mut main_sched.stack_pool, None, home, main.take()); main_task.death.on_exit = Some(on_exit.take()); - rtdebug!("boostrapping main_task"); + rtdebug!("bootstrapping main_task"); main_sched.bootstrap(main_task); } diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs index 1a75f2569b5..ce4e64c47d2 100644 --- a/src/libstd/rt/sched.rs +++ b/src/libstd/rt/sched.rs @@ -13,7 +13,6 @@ use option::{Option, Some, None}; use cast::{transmute, transmute_mut_region, transmute_mut_unsafe}; use clone::Clone; use unstable::raw; - use super::sleeper_list::SleeperList; use super::work_queue::WorkQueue; use super::stack::{StackPool}; @@ -28,6 +27,9 @@ use rt::rtio::RemoteCallback; use rt::metrics::SchedMetrics; use borrow::{to_uint}; use cell::Cell; +use rand::{XorShiftRng, RngUtil}; +use iterator::{range}; +use vec::{OwnedVector}; /// The Scheduler is responsible for coordinating execution of Coroutines /// on a single thread. When the scheduler is running it is owned by @@ -37,9 +39,11 @@ use cell::Cell; /// XXX: This creates too many callbacks to run_sched_once, resulting /// in too much allocation and too many events. pub struct Scheduler { - /// A queue of available work. Under a work-stealing policy there - /// is one per Scheduler. - work_queue: WorkQueue<~Task>, + /// There are N work queues, one per scheduler. + priv work_queue: WorkQueue<~Task>, + /// Work queues for the other schedulers. These are created by + /// cloning the core work queues. + work_queues: ~[WorkQueue<~Task>], /// The queue of incoming messages from other schedulers. /// These are enqueued by SchedHandles after which a remote callback /// is triggered to handle the message. @@ -70,7 +74,10 @@ pub struct Scheduler { run_anything: bool, /// If the scheduler shouldn't run some tasks, a friend to send /// them to. - friend_handle: Option<SchedHandle> + friend_handle: Option<SchedHandle>, + /// A fast XorShift rng for scheduler use + rng: XorShiftRng + } pub struct SchedHandle { @@ -97,10 +104,13 @@ impl Scheduler { pub fn new(event_loop: ~EventLoopObject, work_queue: WorkQueue<~Task>, + work_queues: ~[WorkQueue<~Task>], sleeper_list: SleeperList) -> Scheduler { - Scheduler::new_special(event_loop, work_queue, sleeper_list, true, None) + Scheduler::new_special(event_loop, work_queue, + work_queues, + sleeper_list, true, None) } @@ -108,6 +118,7 @@ impl Scheduler { // task field is None. pub fn new_special(event_loop: ~EventLoopObject, work_queue: WorkQueue<~Task>, + work_queues: ~[WorkQueue<~Task>], sleeper_list: SleeperList, run_anything: bool, friend: Option<SchedHandle>) @@ -120,12 +131,14 @@ impl Scheduler { no_sleep: false, event_loop: event_loop, work_queue: work_queue, + work_queues: work_queues, stack_pool: StackPool::new(), sched_task: None, cleanup_job: None, metrics: SchedMetrics::new(), run_anything: run_anything, - friend_handle: friend + friend_handle: friend, + rng: XorShiftRng::new() } } @@ -248,7 +261,7 @@ impl Scheduler { // Second activity is to try resuming a task from the queue. - let result = sched.resume_task_from_queue(); + let result = sched.do_work(); let mut sched = match result { Some(sched) => { // Failed to dequeue a task, so we return. @@ -325,7 +338,7 @@ impl Scheduler { /// As enqueue_task, but with the possibility for the blocked task to /// already have been killed. pub fn enqueue_blocked_task(&mut self, blocked_task: BlockedTask) { - do blocked_task.wake().map_consume |task| { + do blocked_task.wake().map_move |task| { self.enqueue_task(task); }; } @@ -415,47 +428,98 @@ impl Scheduler { } } - // Resume a task from the queue - but also take into account that - // it might not belong here. + // Workstealing: In this iteration of the runtime each scheduler + // thread has a distinct work queue. When no work is available + // locally, make a few attempts to steal work from the queues of + // other scheduler threads. If a few steals fail we end up in the + // old "no work" path which is fine. + + // First step in the process is to find a task. This function does + // that by first checking the local queue, and if there is no work + // there, trying to steal from the remote work queues. + fn find_work(&mut self) -> Option<~Task> { + rtdebug!("scheduler looking for work"); + match self.work_queue.pop() { + Some(task) => { + rtdebug!("found a task locally"); + return Some(task) + } + None => { + // Our naive stealing, try kinda hard. + rtdebug!("scheduler trying to steal"); + let _len = self.work_queues.len(); + return self.try_steals(2); + } + } + } + + // With no backoff try stealing n times from the queues the + // scheduler knows about. This naive implementation can steal from + // our own queue or from other special schedulers. + fn try_steals(&mut self, n: uint) -> Option<~Task> { + for _ in range(0, n) { + let index = self.rng.gen_uint_range(0, self.work_queues.len()); + let work_queues = &mut self.work_queues; + match work_queues[index].steal() { + Some(task) => { + rtdebug!("found task by stealing"); return Some(task) + } + None => () + } + }; + rtdebug!("giving up on stealing"); + return None; + } - // If we perform a scheduler action we give away the scheduler ~ - // pointer, if it is still available we return it. + // Given a task, execute it correctly. + fn process_task(~self, task: ~Task) -> Option<~Scheduler> { + let mut this = self; + let mut task = task; - fn resume_task_from_queue(~self) -> Option<~Scheduler> { + rtdebug!("processing a task"); + let home = task.take_unwrap_home(); + match home { + Sched(home_handle) => { + if home_handle.sched_id != this.sched_id() { + rtdebug!("sending task home"); + task.give_home(Sched(home_handle)); + Scheduler::send_task_home(task); + return Some(this); + } else { + rtdebug!("running task here"); + task.give_home(Sched(home_handle)); + this.resume_task_immediately(task); + return None; + } + } + AnySched if this.run_anything => { + rtdebug!("running anysched task here"); + task.give_home(AnySched); + this.resume_task_immediately(task); + return None; + } + AnySched => { + rtdebug!("sending task to friend"); + task.give_home(AnySched); + this.send_to_friend(task); + return Some(this); + } + } + } + + // Bundle the helpers together. + fn do_work(~self) -> Option<~Scheduler> { let mut this = self; - match this.work_queue.pop() { + rtdebug!("scheduler calling do work"); + match this.find_work() { Some(task) => { - let mut task = task; - let home = task.take_unwrap_home(); - match home { - Sched(home_handle) => { - if home_handle.sched_id != this.sched_id() { - task.give_home(Sched(home_handle)); - Scheduler::send_task_home(task); - return Some(this); - } else { - this.event_loop.callback(Scheduler::run_sched_once); - task.give_home(Sched(home_handle)); - this.resume_task_immediately(task); - return None; - } - } - AnySched if this.run_anything => { - this.event_loop.callback(Scheduler::run_sched_once); - task.give_home(AnySched); - this.resume_task_immediately(task); - return None; - } - AnySched => { - task.give_home(AnySched); - this.send_to_friend(task); - return Some(this); - } - } + rtdebug!("found some work! processing the task"); + return this.process_task(task); } None => { + rtdebug!("no work was found, returning the scheduler struct"); return Some(this); } } @@ -533,7 +597,7 @@ impl Scheduler { sched.enqueue_blocked_task(last_task); } }; - opt.map_consume(Local::put); + opt.map_move(Local::put); } // The primary function for changing contexts. In the current @@ -711,7 +775,6 @@ impl Scheduler { GiveTask(task, f) => f.to_fn()(self, task) } } - } // The cases for the below function. @@ -745,6 +808,8 @@ impl ClosureConverter for UnsafeTaskReceiver { #[cfg(test)] mod test { + extern mod extra; + use prelude::*; use rt::test::*; use unstable::run_in_bare_thread; @@ -833,7 +898,7 @@ mod test { let mut sched = ~new_test_uv_sched(); let sched_handle = sched.make_handle(); - let mut task = ~do Task::new_root_homed(&mut sched.stack_pool, + let mut task = ~do Task::new_root_homed(&mut sched.stack_pool, None, Sched(sched_handle)) { unsafe { *task_ran_ptr = true }; assert!(Task::on_appropriate_sched()); @@ -862,12 +927,15 @@ mod test { do run_in_bare_thread { let sleepers = SleeperList::new(); - let work_queue = WorkQueue::new(); + let normal_queue = WorkQueue::new(); + let special_queue = WorkQueue::new(); + let queues = ~[normal_queue.clone(), special_queue.clone()]; // Our normal scheduler let mut normal_sched = ~Scheduler::new( ~UvEventLoop::new(), - work_queue.clone(), + normal_queue, + queues.clone(), sleepers.clone()); let normal_handle = Cell::new(normal_sched.make_handle()); @@ -877,7 +945,8 @@ mod test { // Our special scheduler let mut special_sched = ~Scheduler::new_special( ~UvEventLoop::new(), - work_queue.clone(), + special_queue.clone(), + queues.clone(), sleepers.clone(), false, Some(friend_handle)); @@ -893,21 +962,21 @@ mod test { // 3) task not homed, sched requeues // 4) task not home, send home - let task1 = ~do Task::new_root_homed(&mut special_sched.stack_pool, + let task1 = ~do Task::new_root_homed(&mut special_sched.stack_pool, None, Sched(t1_handle)) || { rtassert!(Task::on_appropriate_sched()); }; rtdebug!("task1 id: **%u**", borrow::to_uint(task1)); - let task2 = ~do Task::new_root(&mut normal_sched.stack_pool) { + let task2 = ~do Task::new_root(&mut normal_sched.stack_pool, None) { rtassert!(Task::on_appropriate_sched()); }; - let task3 = ~do Task::new_root(&mut normal_sched.stack_pool) { + let task3 = ~do Task::new_root(&mut normal_sched.stack_pool, None) { rtassert!(Task::on_appropriate_sched()); }; - let task4 = ~do Task::new_root_homed(&mut special_sched.stack_pool, + let task4 = ~do Task::new_root_homed(&mut special_sched.stack_pool, None, Sched(t4_handle)) { rtassert!(Task::on_appropriate_sched()); }; @@ -923,7 +992,7 @@ mod test { let port = Cell::new(port); let chan = Cell::new(chan); - let normal_task = ~do Task::new_root(&mut normal_sched.stack_pool) { + let normal_task = ~do Task::new_root(&mut normal_sched.stack_pool, None) { rtdebug!("*about to submit task2*"); Scheduler::run_task(task2.take()); rtdebug!("*about to submit task4*"); @@ -938,7 +1007,7 @@ mod test { rtdebug!("normal task: %u", borrow::to_uint(normal_task)); - let special_task = ~do Task::new_root(&mut special_sched.stack_pool) { + let special_task = ~do Task::new_root(&mut special_sched.stack_pool, None) { rtdebug!("*about to submit task1*"); Scheduler::run_task(task1.take()); rtdebug!("*about to submit task3*"); diff --git a/src/libstd/rt/select.rs b/src/libstd/rt/select.rs index 006b777b71b..0e8d26e9482 100644 --- a/src/libstd/rt/select.rs +++ b/src/libstd/rt/select.rs @@ -182,6 +182,7 @@ mod test { fn select_stream() { use util; use comm::GenericChan; + use iter::Times; // Sends 10 buffered packets, and uses select to retrieve them all. // Puts the port in a different spot in the vector each time. @@ -199,9 +200,7 @@ mod test { // get it back out util::swap(port.get_mut_ref(), &mut ports[index]); // NB. Not recv(), because optimistic_check randomly fails. - let (data, new_port) = port.take_unwrap().recv_ready().unwrap(); - assert!(data == 31337); - port = Some(new_port); + assert!(port.get_ref().recv_ready().unwrap() == 31337); } } } @@ -265,6 +264,7 @@ mod test { fn select_racing_senders_helper(killable: bool, send_on_chans: ~[uint]) { use rt::test::spawntask_random; + use iter::Times; do run_in_newsched_task { // A bit of stress, since ordinarily this is just smoke and mirrors. diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs index 4c5e4bdc3c1..364439a4526 100644 --- a/src/libstd/rt/task.rs +++ b/src/libstd/rt/task.rs @@ -20,6 +20,7 @@ use libc::{c_void, uintptr_t}; use ptr; use prelude::*; use option::{Option, Some, None}; +use rt::env; use rt::kill::Death; use rt::local::Local; use rt::logging::StdErrLogger; @@ -85,12 +86,13 @@ impl Task { // A helper to build a new task using the dynamically found // scheduler and task. Only works in GreenTask context. - pub fn build_homed_child(f: ~fn(), home: SchedHome) -> ~Task { + pub fn build_homed_child(stack_size: Option<uint>, f: ~fn(), home: SchedHome) -> ~Task { let f = Cell::new(f); let home = Cell::new(home); do Local::borrow::<Task, ~Task> |running_task| { let mut sched = running_task.sched.take_unwrap(); let new_task = ~running_task.new_child_homed(&mut sched.stack_pool, + stack_size, home.take(), f.take()); running_task.sched = Some(sched); @@ -98,25 +100,26 @@ impl Task { } } - pub fn build_child(f: ~fn()) -> ~Task { - Task::build_homed_child(f, AnySched) + pub fn build_child(stack_size: Option<uint>, f: ~fn()) -> ~Task { + Task::build_homed_child(stack_size, f, AnySched) } - pub fn build_homed_root(f: ~fn(), home: SchedHome) -> ~Task { + pub fn build_homed_root(stack_size: Option<uint>, f: ~fn(), home: SchedHome) -> ~Task { let f = Cell::new(f); let home = Cell::new(home); do Local::borrow::<Task, ~Task> |running_task| { let mut sched = running_task.sched.take_unwrap(); let new_task = ~Task::new_root_homed(&mut sched.stack_pool, - home.take(), - f.take()); + stack_size, + home.take(), + f.take()); running_task.sched = Some(sched); new_task } } - pub fn build_root(f: ~fn()) -> ~Task { - Task::build_homed_root(f, AnySched) + pub fn build_root(stack_size: Option<uint>, f: ~fn()) -> ~Task { + Task::build_homed_root(stack_size, f, AnySched) } pub fn new_sched_task() -> Task { @@ -137,17 +140,20 @@ impl Task { } pub fn new_root(stack_pool: &mut StackPool, + stack_size: Option<uint>, start: ~fn()) -> Task { - Task::new_root_homed(stack_pool, AnySched, start) + Task::new_root_homed(stack_pool, stack_size, AnySched, start) } pub fn new_child(&mut self, stack_pool: &mut StackPool, + stack_size: Option<uint>, start: ~fn()) -> Task { - self.new_child_homed(stack_pool, AnySched, start) + self.new_child_homed(stack_pool, stack_size, AnySched, start) } pub fn new_root_homed(stack_pool: &mut StackPool, + stack_size: Option<uint>, home: SchedHome, start: ~fn()) -> Task { Task { @@ -160,7 +166,7 @@ impl Task { death: Death::new(), destroyed: false, name: None, - coroutine: Some(Coroutine::new(stack_pool, start)), + coroutine: Some(Coroutine::new(stack_pool, stack_size, start)), sched: None, task_type: GreenTask(Some(~home)) } @@ -168,6 +174,7 @@ impl Task { pub fn new_child_homed(&mut self, stack_pool: &mut StackPool, + stack_size: Option<uint>, home: SchedHome, start: ~fn()) -> Task { Task { @@ -181,7 +188,7 @@ impl Task { death: self.death.new_child(), destroyed: false, name: None, - coroutine: Some(Coroutine::new(stack_pool, start)), + coroutine: Some(Coroutine::new(stack_pool, stack_size, start)), sched: None, task_type: GreenTask(Some(~home)) } @@ -325,11 +332,13 @@ impl Drop for Task { impl Coroutine { - pub fn new(stack_pool: &mut StackPool, start: ~fn()) -> Coroutine { - static MIN_STACK_SIZE: uint = 3000000; // XXX: Too much stack - + pub fn new(stack_pool: &mut StackPool, stack_size: Option<uint>, start: ~fn()) -> Coroutine { + let stack_size = match stack_size { + Some(size) => size, + None => env::min_stack() + }; let start = Coroutine::build_start_wrapper(start); - let mut stack = stack_pool.take_segment(MIN_STACK_SIZE); + let mut stack = stack_pool.take_segment(stack_size); let initial_context = Context::new(start, &mut stack); Coroutine { current_stack_segment: stack, @@ -465,10 +474,10 @@ mod test { do run_in_newsched_task() { static key: local_data::Key<@~str> = &local_data::Key; local_data::set(key, @~"data"); - assert!(*local_data::get(key, |k| k.map(|&k| *k)).unwrap() == ~"data"); + assert!(*local_data::get(key, |k| k.map_move(|k| *k)).unwrap() == ~"data"); static key2: local_data::Key<@~str> = &local_data::Key; local_data::set(key2, @~"data"); - assert!(*local_data::get(key2, |k| k.map(|&k| *k)).unwrap() == ~"data"); + assert!(*local_data::get(key2, |k| k.map_move(|k| *k)).unwrap() == ~"data"); } } diff --git a/src/libstd/rt/test.rs b/src/libstd/rt/test.rs index 8b5215ae969..92366d5187f 100644 --- a/src/libstd/rt/test.rs +++ b/src/libstd/rt/test.rs @@ -15,8 +15,8 @@ use cell::Cell; use clone::Clone; use container::Container; use iterator::{Iterator, range}; -use vec::{OwnedVector, MutableVector}; use super::io::net::ip::{SocketAddr, Ipv4Addr, Ipv6Addr}; +use vec::{OwnedVector, MutableVector, ImmutableVector}; use rt::sched::Scheduler; use unstable::run_in_bare_thread; use rt::thread::Thread; @@ -29,8 +29,12 @@ use result::{Result, Ok, Err}; pub fn new_test_uv_sched() -> Scheduler { + let queue = WorkQueue::new(); + let queues = ~[queue.clone()]; + let mut sched = Scheduler::new(~UvEventLoop::new(), - WorkQueue::new(), + queue, + queues, SleeperList::new()); // Don't wait for the Shutdown message @@ -57,7 +61,7 @@ pub fn run_in_newsched_task_core(f: ~fn()) { exit_handle.take().send(Shutdown); rtassert!(exit_status); }; - let mut task = ~Task::new_root(&mut sched.stack_pool, f); + let mut task = ~Task::new_root(&mut sched.stack_pool, None, f); task.death.on_exit = Some(on_exit); sched.bootstrap(task); @@ -164,15 +168,21 @@ pub fn run_in_mt_newsched_task(f: ~fn()) { }; let sleepers = SleeperList::new(); - let work_queue = WorkQueue::new(); let mut handles = ~[]; let mut scheds = ~[]; + let mut work_queues = ~[]; for _ in range(0u, nthreads) { + let work_queue = WorkQueue::new(); + work_queues.push(work_queue); + } + + for i in range(0u, nthreads) { let loop_ = ~UvEventLoop::new(); let mut sched = ~Scheduler::new(loop_, - work_queue.clone(), + work_queues[i].clone(), + work_queues.clone(), sleepers.clone()); let handle = sched.make_handle(); @@ -190,8 +200,7 @@ pub fn run_in_mt_newsched_task(f: ~fn()) { rtassert!(exit_status); }; - let mut main_task = ~Task::new_root(&mut scheds[0].stack_pool, - f.take()); + let mut main_task = ~Task::new_root(&mut scheds[0].stack_pool, None, f.take()); main_task.death.on_exit = Some(on_exit); let mut threads = ~[]; @@ -209,7 +218,7 @@ pub fn run_in_mt_newsched_task(f: ~fn()) { while !scheds.is_empty() { let mut sched = scheds.pop(); - let bootstrap_task = ~do Task::new_root(&mut sched.stack_pool) || { + let bootstrap_task = ~do Task::new_root(&mut sched.stack_pool, None) || { rtdebug!("bootstrapping non-primary scheduler"); }; let bootstrap_task_cell = Cell::new(bootstrap_task); @@ -232,12 +241,12 @@ pub fn run_in_mt_newsched_task(f: ~fn()) { /// Test tasks will abort on failure instead of unwinding pub fn spawntask(f: ~fn()) { - Scheduler::run_task(Task::build_child(f)); + Scheduler::run_task(Task::build_child(None, f)); } /// Create a new task and run it right now. Aborts on failure pub fn spawntask_later(f: ~fn()) { - Scheduler::run_task_later(Task::build_child(f)); + Scheduler::run_task_later(Task::build_child(None, f)); } pub fn spawntask_random(f: ~fn()) { @@ -259,7 +268,7 @@ pub fn spawntask_try(f: ~fn()) -> Result<(),()> { let chan = Cell::new(chan); let on_exit: ~fn(bool) = |exit_status| chan.take().send(exit_status); - let mut new_task = Task::build_root(f); + let mut new_task = Task::build_root(None, f); new_task.death.on_exit = Some(on_exit); Scheduler::run_task(new_task); @@ -285,7 +294,7 @@ pub fn spawntask_thread(f: ~fn()) -> Thread { pub fn with_test_task(blk: ~fn(~Task) -> ~Task) { do run_in_bare_thread { let mut sched = ~new_test_uv_sched(); - let task = blk(~Task::new_root(&mut sched.stack_pool, ||{})); + let task = blk(~Task::new_root(&mut sched.stack_pool, None, ||{})); cleanup_task(task); } } |
