| author | toddaaro <github@opprobrio.us> | 2013-08-05 13:06:24 -0700 |
|---|---|---|
| committer | toddaaro <github@opprobrio.us> | 2013-08-08 14:13:41 -0700 |
| commit | af2e03998d4d06f2781ca72ec005f6913148f8bb | |
| tree | fc65de2220eb4735ef6fdf554b5c7204ec41d6b1 /src/libstd/rt | |
| parent | a0080f4e07891c89aa1f9851f8b0a3c754734fe8 | |
Enabled workstealing in the scheduler. Previously we had one global work queue shared by all schedulers. Now there is a separate work queue for each scheduler, and work is "stolen" from other queues when the local queue is exhausted.
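In outline: each scheduler pops from its own queue first and, only when that queue is empty, makes a bounded number of steal attempts against randomly chosen victim queues before giving up. Below is a minimal, self-contained sketch of that policy in modern Rust. The `StealQueues` type, the `Mutex<VecDeque>` queues (standing in for the runtime's `WorkQueue`), and the inline xorshift step (standing in for the scheduler's `XorShiftRng`) are all illustrative assumptions, not code from this patch.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

/// One deque per scheduler. Every scheduler owns an index into the
/// shared vector and may probe any queue when its own is empty.
struct StealQueues<T> {
    queues: Arc<Vec<Mutex<VecDeque<T>>>>,
    local: usize, // index of this scheduler's own queue
}

impl<T> StealQueues<T> {
    /// Counterpart of popping the local `work_queue`.
    fn pop_local(&self) -> Option<T> {
        self.queues[self.local].lock().unwrap().pop_front()
    }

    /// Counterpart of `try_steals`: n attempts against randomly chosen
    /// victims, with no backoff. A victim may be our own (empty) queue,
    /// just as in the patch.
    fn try_steals(&self, n: usize, rng: &mut u64) -> Option<T> {
        for _ in 0..n {
            // Inline xorshift step, standing in for the scheduler's XorShiftRng.
            *rng ^= *rng << 13;
            *rng ^= *rng >> 7;
            *rng ^= *rng << 17;
            let victim = (*rng % self.queues.len() as u64) as usize;
            // Stealing from the back is this sketch's choice, not the patch's.
            if let Some(task) = self.queues[victim].lock().unwrap().pop_back() {
                return Some(task); // found task by stealing
            }
        }
        None // giving up; caller falls through to the "no work" path
    }

    /// Counterpart of `find_work`: local queue first, then steal.
    fn find_work(&self, rng: &mut u64) -> Option<T> {
        self.pop_local().or_else(|| self.try_steals(2, rng))
    }
}

fn main() {
    // Four schedulers' worth of queues; only queue 2 has work.
    let queues: Arc<Vec<Mutex<VecDeque<&str>>>> =
        Arc::new((0..4).map(|_| Mutex::new(VecDeque::new())).collect());
    queues[2].lock().unwrap().push_back("task");

    let sched0 = StealQueues { queues: Arc::clone(&queues), local: 0 };
    let mut rng = 0x1234_5678_9abc_def0_u64;

    // Scheduler 0 finds nothing locally; with luck one of its two steal
    // probes hits queue 2, otherwise it reports no work, as the
    // scheduler itself would.
    println!("scheduler 0 found: {:?}", sched0.find_work(&mut rng));
}
```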
Diffstat (limited to 'src/libstd/rt')
| -rw-r--r-- | src/libstd/rt/comm.rs | 7 |
|---|---|---|
| -rw-r--r-- | src/libstd/rt/mod.rs | 32 |
| -rw-r--r-- | src/libstd/rt/sched.rs | 159 |
| -rw-r--r-- | src/libstd/rt/select.rs | 2 |
| -rw-r--r-- | src/libstd/rt/test.rs | 18 |
5 files changed, 156 insertions, 62 deletions
diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs
index 0cf223f3029..33b4b307af8 100644
--- a/src/libstd/rt/comm.rs
+++ b/src/libstd/rt/comm.rs
@@ -225,9 +225,10 @@ impl<T> Select for PortOne<T> {
     fn optimistic_check(&mut self) -> bool {
         // The optimistic check is never necessary for correctness. For testing
         // purposes, making it randomly return false simulates a racing sender.
-        use rand::{Rand, rng};
-        let mut rng = rng();
-        let actually_check = Rand::rand(&mut rng);
+        use rand::{Rand};
+        let actually_check = do Local::borrow::<Scheduler, bool> |sched| {
+            Rand::rand(&mut sched.rng)
+        };
         if actually_check {
             unsafe { (*self.packet()).state.load(Acquire) == STATE_ONE }
         } else {
diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs
index 147c75e5c41..01a52892f63 100644
--- a/src/libstd/rt/mod.rs
+++ b/src/libstd/rt/mod.rs
@@ -63,8 +63,7 @@ Several modules in `core` are clients of `rt`:
 use cell::Cell;
 use clone::Clone;
 use container::Container;
-use iter::Times;
-use iterator::{Iterator, IteratorUtil};
+use iterator::{Iterator, IteratorUtil, range};
 use option::{Some, None};
 use ptr::RawPtr;
 use rt::local::Local;
@@ -247,11 +246,16 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int {
     let main = Cell::new(main);
 
-    // The shared list of sleeping schedulers. Schedulers wake each other
-    // occassionally to do new work.
+    // The shared list of sleeping schedulers.
     let sleepers = SleeperList::new();
 
-    // The shared work queue. Temporary until work stealing is implemented.
-    let work_queue = WorkQueue::new();
+
+    // Create a work queue for each scheduler, ntimes. Create an extra
+    // for the main thread if that flag is set. We won't steal from it.
+    let mut work_queues = ~[];
+    for _ in range(0u, nscheds) {
+        let work_queue: WorkQueue<~Task> = WorkQueue::new();
+        work_queues.push(work_queue);
+    }
 
     // The schedulers.
     let mut scheds = ~[];
@@ -259,12 +263,15 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int {
     // sent the Shutdown message to terminate the schedulers.
     let mut handles = ~[];
 
-    do nscheds.times {
+    for i in range(0u, nscheds) {
         rtdebug!("inserting a regular scheduler");
 
         // Every scheduler is driven by an I/O event loop.
         let loop_ = ~UvEventLoop::new();
-        let mut sched = ~Scheduler::new(loop_, work_queue.clone(), sleepers.clone());
+        let mut sched = ~Scheduler::new(loop_,
+                                        work_queues[i].clone(),
+                                        work_queues.clone(),
+                                        sleepers.clone());
         let handle = sched.make_handle();
 
         scheds.push(sched);
@@ -280,9 +287,14 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int {
         let friend_handle = friend_sched.make_handle();
         scheds.push(friend_sched);
 
+        // This scheduler needs a queue that isn't part of the stealee
+        // set.
+        let work_queue = WorkQueue::new();
+
         let main_loop = ~UvEventLoop::new();
         let mut main_sched = ~Scheduler::new_special(main_loop,
-                                                     work_queue.clone(),
+                                                     work_queue,
+                                                     work_queues.clone(),
                                                      sleepers.clone(),
                                                      false,
                                                      Some(friend_handle));
@@ -371,7 +383,7 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int {
             let mut main_task = ~Task::new_root_homed(&mut main_sched.stack_pool, None,
                                                       home, main.take());
             main_task.death.on_exit = Some(on_exit.take());
-            rtdebug!("boostrapping main_task");
+            rtdebug!("bootstrapping main_task");
             main_sched.bootstrap(main_task);
         }
diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs
index 990e1a4a3de..ce4e64c47d2 100644
--- a/src/libstd/rt/sched.rs
+++ b/src/libstd/rt/sched.rs
@@ -13,7 +13,6 @@ use option::{Option, Some, None};
 use cast::{transmute, transmute_mut_region, transmute_mut_unsafe};
 use clone::Clone;
 use unstable::raw;
-
 use super::sleeper_list::SleeperList;
 use super::work_queue::WorkQueue;
 use super::stack::{StackPool};
@@ -28,6 +27,9 @@ use rt::rtio::RemoteCallback;
 use rt::metrics::SchedMetrics;
 use borrow::{to_uint};
 use cell::Cell;
+use rand::{XorShiftRng, RngUtil};
+use iterator::{range};
+use vec::{OwnedVector};
 
 /// The Scheduler is responsible for coordinating execution of Coroutines
 /// on a single thread. When the scheduler is running it is owned by
@@ -37,9 +39,11 @@ use cell::Cell;
 /// XXX: This creates too many callbacks to run_sched_once, resulting
 /// in too much allocation and too many events.
 pub struct Scheduler {
-    /// A queue of available work. Under a work-stealing policy there
-    /// is one per Scheduler.
-    work_queue: WorkQueue<~Task>,
+    /// There are N work queues, one per scheduler.
+    priv work_queue: WorkQueue<~Task>,
+    /// Work queues for the other schedulers. These are created by
+    /// cloning the core work queues.
+    work_queues: ~[WorkQueue<~Task>],
     /// The queue of incoming messages from other schedulers.
     /// These are enqueued by SchedHandles after which a remote callback
     /// is triggered to handle the message.
@@ -70,7 +74,10 @@ pub struct Scheduler {
     run_anything: bool,
     /// If the scheduler shouldn't run some tasks, a friend to send
    /// them to.
-    friend_handle: Option<SchedHandle>
+    friend_handle: Option<SchedHandle>,
+    /// A fast XorShift rng for scheduler use
+    rng: XorShiftRng
+
 }
 
 pub struct SchedHandle {
@@ -97,10 +104,13 @@ impl Scheduler {
 
     pub fn new(event_loop: ~EventLoopObject,
                work_queue: WorkQueue<~Task>,
+               work_queues: ~[WorkQueue<~Task>],
               sleeper_list: SleeperList)
        -> Scheduler {
 
-        Scheduler::new_special(event_loop, work_queue, sleeper_list, true, None)
+        Scheduler::new_special(event_loop, work_queue,
+                               work_queues,
+                               sleeper_list, true, None)
 
     }
@@ -108,6 +118,7 @@ impl Scheduler {
     // task field is None.
     pub fn new_special(event_loop: ~EventLoopObject,
                        work_queue: WorkQueue<~Task>,
+                       work_queues: ~[WorkQueue<~Task>],
                        sleeper_list: SleeperList,
                        run_anything: bool,
                        friend: Option<SchedHandle>)
@@ -120,12 +131,14 @@ impl Scheduler {
             no_sleep: false,
             event_loop: event_loop,
             work_queue: work_queue,
+            work_queues: work_queues,
             stack_pool: StackPool::new(),
             sched_task: None,
             cleanup_job: None,
             metrics: SchedMetrics::new(),
             run_anything: run_anything,
-            friend_handle: friend
+            friend_handle: friend,
+            rng: XorShiftRng::new()
         }
     }
 
@@ -248,7 +261,7 @@ impl Scheduler {
 
         // Second activity is to try resuming a task from the queue.
 
-        let result = sched.resume_task_from_queue();
+        let result = sched.do_work();
         let mut sched = match result {
             Some(sched) => {
                 // Failed to dequeue a task, so we return.
@@ -415,47 +428,98 @@ impl Scheduler {
         }
     }
 
-    // Resume a task from the queue - but also take into account that
-    // it might not belong here.
+    // Workstealing: In this iteration of the runtime each scheduler
+    // thread has a distinct work queue. When no work is available
+    // locally, make a few attempts to steal work from the queues of
+    // other scheduler threads. If a few steals fail we end up in the
+    // old "no work" path which is fine.
+
+    // First step in the process is to find a task. This function does
+    // that by first checking the local queue, and if there is no work
+    // there, trying to steal from the remote work queues.
+    fn find_work(&mut self) -> Option<~Task> {
+        rtdebug!("scheduler looking for work");
+        match self.work_queue.pop() {
+            Some(task) => {
+                rtdebug!("found a task locally");
+                return Some(task)
+            }
+            None => {
+                // Our naive stealing, try kinda hard.
+                rtdebug!("scheduler trying to steal");
+                let _len = self.work_queues.len();
+                return self.try_steals(2);
+            }
+        }
+    }
+
+    // With no backoff try stealing n times from the queues the
+    // scheduler knows about. This naive implementation can steal from
+    // our own queue or from other special schedulers.
+    fn try_steals(&mut self, n: uint) -> Option<~Task> {
+        for _ in range(0, n) {
+            let index = self.rng.gen_uint_range(0, self.work_queues.len());
+            let work_queues = &mut self.work_queues;
+            match work_queues[index].steal() {
+                Some(task) => {
+                    rtdebug!("found task by stealing"); return Some(task)
+                }
+                None => ()
+            }
+        };
+        rtdebug!("giving up on stealing");
+        return None;
+    }
 
-    // If we perform a scheduler action we give away the scheduler ~
-    // pointer, if it is still available we return it.
+    // Given a task, execute it correctly.
+    fn process_task(~self, task: ~Task) -> Option<~Scheduler> {
+        let mut this = self;
+        let mut task = task;
 
-    fn resume_task_from_queue(~self) -> Option<~Scheduler> {
+        rtdebug!("processing a task");
+
+        let home = task.take_unwrap_home();
+        match home {
+            Sched(home_handle) => {
+                if home_handle.sched_id != this.sched_id() {
+                    rtdebug!("sending task home");
+                    task.give_home(Sched(home_handle));
+                    Scheduler::send_task_home(task);
+                    return Some(this);
+                } else {
+                    rtdebug!("running task here");
+                    task.give_home(Sched(home_handle));
+                    this.resume_task_immediately(task);
+                    return None;
+                }
+            }
+            AnySched if this.run_anything => {
+                rtdebug!("running anysched task here");
+                task.give_home(AnySched);
+                this.resume_task_immediately(task);
+                return None;
+            }
+            AnySched => {
+                rtdebug!("sending task to friend");
+                task.give_home(AnySched);
+                this.send_to_friend(task);
+                return Some(this);
+            }
+        }
+    }
+
+    // Bundle the helpers together.
+    fn do_work(~self) -> Option<~Scheduler> {
         let mut this = self;
-        match this.work_queue.pop() {
+        rtdebug!("scheduler calling do work");
+        match this.find_work() {
             Some(task) => {
-                let mut task = task;
-                let home = task.take_unwrap_home();
-                match home {
-                    Sched(home_handle) => {
-                        if home_handle.sched_id != this.sched_id() {
-                            task.give_home(Sched(home_handle));
-                            Scheduler::send_task_home(task);
-                            return Some(this);
-                        } else {
-                            this.event_loop.callback(Scheduler::run_sched_once);
-                            task.give_home(Sched(home_handle));
-                            this.resume_task_immediately(task);
-                            return None;
-                        }
-                    }
-                    AnySched if this.run_anything => {
-                        this.event_loop.callback(Scheduler::run_sched_once);
-                        task.give_home(AnySched);
-                        this.resume_task_immediately(task);
-                        return None;
-                    }
-                    AnySched => {
-                        task.give_home(AnySched);
-                        this.send_to_friend(task);
-                        return Some(this);
-                    }
-                }
+                rtdebug!("found some work! processing the task");
+                return this.process_task(task);
             }
             None => {
+                rtdebug!("no work was found, returning the scheduler struct");
                 return Some(this);
             }
         }
@@ -711,7 +775,6 @@ impl Scheduler {
             GiveTask(task, f) => f.to_fn()(self, task)
         }
     }
-
 }
 
 // The cases for the below function.
@@ -745,6 +808,8 @@ impl ClosureConverter for UnsafeTaskReceiver {
 
 #[cfg(test)]
 mod test {
+    extern mod extra;
+
     use prelude::*;
     use rt::test::*;
     use unstable::run_in_bare_thread;
@@ -862,12 +927,15 @@ mod test {
 
         do run_in_bare_thread {
             let sleepers = SleeperList::new();
-            let work_queue = WorkQueue::new();
+            let normal_queue = WorkQueue::new();
+            let special_queue = WorkQueue::new();
+            let queues = ~[normal_queue.clone(), special_queue.clone()];
 
             // Our normal scheduler
             let mut normal_sched = ~Scheduler::new(
                 ~UvEventLoop::new(),
-                work_queue.clone(),
+                normal_queue,
+                queues.clone(),
                 sleepers.clone());
 
             let normal_handle = Cell::new(normal_sched.make_handle());
@@ -877,7 +945,8 @@ mod test {
             // Our special scheduler
             let mut special_sched = ~Scheduler::new_special(
                 ~UvEventLoop::new(),
-                work_queue.clone(),
+                special_queue.clone(),
+                queues.clone(),
                 sleepers.clone(),
                 false,
                 Some(friend_handle));
diff --git a/src/libstd/rt/select.rs b/src/libstd/rt/select.rs
index 006b777b71b..07f8ca77d9d 100644
--- a/src/libstd/rt/select.rs
+++ b/src/libstd/rt/select.rs
@@ -182,6 +182,7 @@ mod test {
     fn select_stream() {
         use util;
         use comm::GenericChan;
+        use iter::Times;
 
         // Sends 10 buffered packets, and uses select to retrieve them all.
         // Puts the port in a different spot in the vector each time.
@@ -265,6 +266,7 @@ mod test {
 
     fn select_racing_senders_helper(killable: bool, send_on_chans: ~[uint]) {
         use rt::test::spawntask_random;
+        use iter::Times;
 
         do run_in_newsched_task {
             // A bit of stress, since ordinarily this is just smoke and mirrors.
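The `process_task` helper added to sched.rs above boils down to a four-way decision on the task's home. A compact restatement of just that decision follows, where `Home`, `Task`, and `Decision` are hypothetical stand-ins for the runtime's `SchedHome`, `Task`, and the scheduler's actual actions:

```rust
/// Where a task is allowed to run (mirrors the runtime's scheduler-home notion).
enum Home {
    AnySched,
    Sched { sched_id: usize },
}

struct Task {
    home: Home,
}

/// What the scheduler decided to do with a found task.
#[derive(Debug, PartialEq)]
enum Decision {
    RunHere,      // resume the task immediately; the scheduler pointer is consumed
    SendHome,     // the task is pinned to a different scheduler
    SendToFriend, // this scheduler can't run arbitrary tasks
}

/// Mirror of `process_task`: pinned tasks run only on their home
/// scheduler; unpinned tasks run here unless `run_anything` is false.
fn process_task(my_id: usize, run_anything: bool, task: &Task) -> Decision {
    match task.home {
        Home::Sched { sched_id } if sched_id != my_id => Decision::SendHome,
        Home::Sched { .. } => Decision::RunHere,
        Home::AnySched if run_anything => Decision::RunHere,
        Home::AnySched => Decision::SendToFriend,
    }
}

fn main() {
    let pinned = Task { home: Home::Sched { sched_id: 7 } };
    let free = Task { home: Home::AnySched };

    assert_eq!(process_task(7, true, &pinned), Decision::RunHere);
    assert_eq!(process_task(0, true, &pinned), Decision::SendHome);
    assert_eq!(process_task(0, true, &free), Decision::RunHere);
    assert_eq!(process_task(0, false, &free), Decision::SendToFriend);
    println!("homing decisions behave as in the patch");
}
```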
diff --git a/src/libstd/rt/test.rs b/src/libstd/rt/test.rs
index 792ea5eb33f..92366d5187f 100644
--- a/src/libstd/rt/test.rs
+++ b/src/libstd/rt/test.rs
@@ -15,8 +15,8 @@ use cell::Cell;
 use clone::Clone;
 use container::Container;
 use iterator::{Iterator, range};
-use vec::{OwnedVector, MutableVector};
 use super::io::net::ip::{SocketAddr, Ipv4Addr, Ipv6Addr};
+use vec::{OwnedVector, MutableVector, ImmutableVector};
 use rt::sched::Scheduler;
 use unstable::run_in_bare_thread;
 use rt::thread::Thread;
@@ -29,8 +29,12 @@ use result::{Result, Ok, Err};
 
 pub fn new_test_uv_sched() -> Scheduler {
 
+    let queue = WorkQueue::new();
+    let queues = ~[queue.clone()];
+
     let mut sched = Scheduler::new(~UvEventLoop::new(),
-                                   WorkQueue::new(),
+                                   queue,
+                                   queues,
                                    SleeperList::new());
 
     // Don't wait for the Shutdown message
@@ -164,15 +168,21 @@ pub fn run_in_mt_newsched_task(f: ~fn()) {
         };
 
         let sleepers = SleeperList::new();
-        let work_queue = WorkQueue::new();
 
         let mut handles = ~[];
         let mut scheds = ~[];
+        let mut work_queues = ~[];
 
         for _ in range(0u, nthreads) {
+            let work_queue = WorkQueue::new();
+            work_queues.push(work_queue);
+        }
+
+        for i in range(0u, nthreads) {
             let loop_ = ~UvEventLoop::new();
             let mut sched = ~Scheduler::new(loop_,
-                                            work_queue.clone(),
+                                            work_queues[i].clone(),
+                                            work_queues.clone(),
                                             sleepers.clone());
             let handle = sched.make_handle();
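Finally, the wiring pattern that run_, new_test_uv_sched, and run_in_mt_newsched_task all share: create every queue up front, then hand each scheduler its own queue plus a clone of the whole vector as the stealee set. A small sketch, assuming a `WorkQueue` handle can be approximated by `Arc<Mutex<VecDeque<T>>>`; the `Scheduler` struct below is a hypothetical holder for the two queue fields only.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

// A cloneable handle to one queue, approximating the runtime's WorkQueue.
type WorkQueue<T> = Arc<Mutex<VecDeque<T>>>;

#[allow(dead_code)] // fields are illustrative only
struct Scheduler<T> {
    work_queue: WorkQueue<T>,       // this scheduler's own queue
    work_queues: Vec<WorkQueue<T>>, // handles to every queue, for stealing
}

fn main() {
    let nscheds = 4;

    // First pass: create one queue per scheduler.
    let work_queues: Vec<WorkQueue<&str>> = (0..nscheds)
        .map(|_| Arc::new(Mutex::new(VecDeque::new())))
        .collect();

    // Second pass: each scheduler gets its own queue plus a clone of
    // the whole vector, as run_ and run_in_mt_newsched_task do.
    let scheds: Vec<Scheduler<&str>> = (0..nscheds)
        .map(|i| Scheduler {
            work_queue: work_queues[i].clone(),
            work_queues: work_queues.clone(),
        })
        .collect();

    assert_eq!(scheds.len(), nscheds);
    println!("built {} schedulers sharing {} queues", scheds.len(), work_queues.len());
}
```

Note that in run_ the main scheduler's queue is deliberately created after `work_queues` is built and never pushed into it, so the other schedulers cannot steal main-thread work.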
