about summary refs log tree commit diff
path: root/src/libstd/rt
diff options
context:
space:
mode:
authortoddaaro <github@opprobrio.us>2013-08-05 13:06:24 -0700
committertoddaaro <github@opprobrio.us>2013-08-08 14:13:41 -0700
commitaf2e03998d4d06f2781ca72ec005f6913148f8bb (patch)
treefc65de2220eb4735ef6fdf554b5c7204ec41d6b1 /src/libstd/rt
parenta0080f4e07891c89aa1f9851f8b0a3c754734fe8 (diff)
downloadrust-af2e03998d4d06f2781ca72ec005f6913148f8bb.tar.gz
rust-af2e03998d4d06f2781ca72ec005f6913148f8bb.zip
Enabled workstealing in the scheduler. Previously we had one global work queue shared by each scheduler. Now there is a separate work queue for each scheduler, and work is "stolen" from other queues when it is exhausted locally.
Diffstat (limited to 'src/libstd/rt')
-rw-r--r--src/libstd/rt/comm.rs7
-rw-r--r--src/libstd/rt/mod.rs32
-rw-r--r--src/libstd/rt/sched.rs159
-rw-r--r--src/libstd/rt/select.rs2
-rw-r--r--src/libstd/rt/test.rs18
5 files changed, 156 insertions, 62 deletions
diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs
index 0cf223f3029..33b4b307af8 100644
--- a/src/libstd/rt/comm.rs
+++ b/src/libstd/rt/comm.rs
@@ -225,9 +225,10 @@ impl<T> Select for PortOne<T> {
     fn optimistic_check(&mut self) -> bool {
         // The optimistic check is never necessary for correctness. For testing
         // purposes, making it randomly return false simulates a racing sender.
-        use rand::{Rand, rng};
-        let mut rng = rng();
-        let actually_check = Rand::rand(&mut rng);
+        use rand::{Rand};
+        let actually_check = do Local::borrow::<Scheduler, bool> |sched| {
+            Rand::rand(&mut sched.rng)
+        };
         if actually_check {
             unsafe { (*self.packet()).state.load(Acquire) == STATE_ONE }
         } else {
diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs
index 147c75e5c41..01a52892f63 100644
--- a/src/libstd/rt/mod.rs
+++ b/src/libstd/rt/mod.rs
@@ -63,8 +63,7 @@ Several modules in `core` are clients of `rt`:
 use cell::Cell;
 use clone::Clone;
 use container::Container;
-use iter::Times;
-use iterator::{Iterator, IteratorUtil};
+use iterator::{Iterator, IteratorUtil, range};
 use option::{Some, None};
 use ptr::RawPtr;
 use rt::local::Local;
@@ -247,11 +246,16 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int {
 
     let main = Cell::new(main);
 
-    // The shared list of sleeping schedulers. Schedulers wake each other
-    // occassionally to do new work.
+    // The shared list of sleeping schedulers.
     let sleepers = SleeperList::new();
-    // The shared work queue. Temporary until work stealing is implemented.
-    let work_queue = WorkQueue::new();
+
+    // Create a work queue for each scheduler, ntimes. Create an extra
+    // for the main thread if that flag is set. We won't steal from it.
+    let mut work_queues = ~[];
+    for _ in range(0u, nscheds) {
+        let work_queue: WorkQueue<~Task> = WorkQueue::new();
+        work_queues.push(work_queue);
+    }
 
     // The schedulers.
     let mut scheds = ~[];
@@ -259,12 +263,15 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int {
     // sent the Shutdown message to terminate the schedulers.
     let mut handles = ~[];
 
-    do nscheds.times {
+    for i in range(0u, nscheds) {
         rtdebug!("inserting a regular scheduler");
 
         // Every scheduler is driven by an I/O event loop.
         let loop_ = ~UvEventLoop::new();
-        let mut sched = ~Scheduler::new(loop_, work_queue.clone(), sleepers.clone());
+        let mut sched = ~Scheduler::new(loop_,
+                                        work_queues[i].clone(),
+                                        work_queues.clone(),
+                                        sleepers.clone());
         let handle = sched.make_handle();
 
         scheds.push(sched);
@@ -280,9 +287,14 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int {
         let friend_handle = friend_sched.make_handle();
         scheds.push(friend_sched);
 
+        // This scheduler needs a queue that isn't part of the stealee
+        // set.
+        let work_queue = WorkQueue::new();
+
         let main_loop = ~UvEventLoop::new();
         let mut main_sched = ~Scheduler::new_special(main_loop,
-                                                     work_queue.clone(),
+                                                     work_queue,
+                                                     work_queues.clone(),
                                                      sleepers.clone(),
                                                      false,
                                                      Some(friend_handle));
@@ -371,7 +383,7 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int {
         let mut main_task = ~Task::new_root_homed(&mut main_sched.stack_pool, None,
                                                   home, main.take());
         main_task.death.on_exit = Some(on_exit.take());
-        rtdebug!("boostrapping main_task");
+        rtdebug!("bootstrapping main_task");
 
         main_sched.bootstrap(main_task);
     }
diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs
index 990e1a4a3de..ce4e64c47d2 100644
--- a/src/libstd/rt/sched.rs
+++ b/src/libstd/rt/sched.rs
@@ -13,7 +13,6 @@ use option::{Option, Some, None};
 use cast::{transmute, transmute_mut_region, transmute_mut_unsafe};
 use clone::Clone;
 use unstable::raw;
-
 use super::sleeper_list::SleeperList;
 use super::work_queue::WorkQueue;
 use super::stack::{StackPool};
@@ -28,6 +27,9 @@ use rt::rtio::RemoteCallback;
 use rt::metrics::SchedMetrics;
 use borrow::{to_uint};
 use cell::Cell;
+use rand::{XorShiftRng, RngUtil};
+use iterator::{range};
+use vec::{OwnedVector};
 
 /// The Scheduler is responsible for coordinating execution of Coroutines
 /// on a single thread. When the scheduler is running it is owned by
@@ -37,9 +39,11 @@ use cell::Cell;
 /// XXX: This creates too many callbacks to run_sched_once, resulting
 /// in too much allocation and too many events.
 pub struct Scheduler {
-    /// A queue of available work. Under a work-stealing policy there
-    /// is one per Scheduler.
-    work_queue: WorkQueue<~Task>,
+    /// There are N work queues, one per scheduler.
+    priv work_queue: WorkQueue<~Task>,
+    /// Work queues for the other schedulers. These are created by
+    /// cloning the core work queues.
+    work_queues: ~[WorkQueue<~Task>],
     /// The queue of incoming messages from other schedulers.
     /// These are enqueued by SchedHandles after which a remote callback
     /// is triggered to handle the message.
@@ -70,7 +74,10 @@ pub struct Scheduler {
     run_anything: bool,
     /// If the scheduler shouldn't run some tasks, a friend to send
     /// them to.
-    friend_handle: Option<SchedHandle>
+    friend_handle: Option<SchedHandle>,
+    /// A fast XorShift rng for scheduler use
+    rng: XorShiftRng
+
 }
 
 pub struct SchedHandle {
@@ -97,10 +104,13 @@ impl Scheduler {
 
     pub fn new(event_loop: ~EventLoopObject,
                work_queue: WorkQueue<~Task>,
+               work_queues: ~[WorkQueue<~Task>],
                sleeper_list: SleeperList)
         -> Scheduler {
 
-        Scheduler::new_special(event_loop, work_queue, sleeper_list, true, None)
+        Scheduler::new_special(event_loop, work_queue,
+                               work_queues,
+                               sleeper_list, true, None)
 
     }
 
@@ -108,6 +118,7 @@ impl Scheduler {
     // task field is None.
     pub fn new_special(event_loop: ~EventLoopObject,
                        work_queue: WorkQueue<~Task>,
+                       work_queues: ~[WorkQueue<~Task>],
                        sleeper_list: SleeperList,
                        run_anything: bool,
                        friend: Option<SchedHandle>)
@@ -120,12 +131,14 @@ impl Scheduler {
             no_sleep: false,
             event_loop: event_loop,
             work_queue: work_queue,
+            work_queues: work_queues,
             stack_pool: StackPool::new(),
             sched_task: None,
             cleanup_job: None,
             metrics: SchedMetrics::new(),
             run_anything: run_anything,
-            friend_handle: friend
+            friend_handle: friend,
+            rng: XorShiftRng::new()
         }
     }
 
@@ -248,7 +261,7 @@ impl Scheduler {
 
         // Second activity is to try resuming a task from the queue.
 
-        let result = sched.resume_task_from_queue();
+        let result = sched.do_work();
         let mut sched = match result {
             Some(sched) => {
                 // Failed to dequeue a task, so we return.
@@ -415,47 +428,98 @@ impl Scheduler {
         }
     }
 
-    // Resume a task from the queue - but also take into account that
-    // it might not belong here.
+    // Workstealing: In this iteration of the runtime each scheduler
+    // thread has a distinct work queue. When no work is available
+    // locally, make a few attempts to steal work from the queues of
+    // other scheduler threads. If a few steals fail we end up in the
+    // old "no work" path which is fine.
+
+    // First step in the process is to find a task. This function does
+    // that by first checking the local queue, and if there is no work
+    // there, trying to steal from the remote work queues.
+    fn find_work(&mut self) -> Option<~Task> {
+        rtdebug!("scheduler looking for work");
+        match self.work_queue.pop() {
+            Some(task) => {
+                rtdebug!("found a task locally");
+                return Some(task)
+            }
+            None => {
+                // Our naive stealing, try kinda hard.
+                rtdebug!("scheduler trying to steal");
+                let _len = self.work_queues.len();
+                return self.try_steals(2);
+            }
+        }
+    }
+
+    // With no backoff try stealing n times from the queues the
+    // scheduler knows about. This naive implementation can steal from
+    // our own queue or from other special schedulers.
+    fn try_steals(&mut self, n: uint) -> Option<~Task> {
+        for _ in range(0, n) {
+            let index = self.rng.gen_uint_range(0, self.work_queues.len());
+            let work_queues = &mut self.work_queues;
+            match work_queues[index].steal() {
+                Some(task) => {
+                    rtdebug!("found task by stealing"); return Some(task)
+                }
+                None => ()
+            }
+        };
+        rtdebug!("giving up on stealing");
+        return None;
+    }
 
-    // If we perform a scheduler action we give away the scheduler ~
-    // pointer, if it is still available we return it.
+    // Given a task, execute it correctly.
+    fn process_task(~self, task: ~Task) -> Option<~Scheduler> {
+        let mut this = self;
+        let mut task = task;
 
-    fn resume_task_from_queue(~self) -> Option<~Scheduler> {
+        rtdebug!("processing a task");
 
+        let home = task.take_unwrap_home();
+        match home {
+            Sched(home_handle) => {
+                if home_handle.sched_id != this.sched_id() {
+                    rtdebug!("sending task home");
+                    task.give_home(Sched(home_handle));
+                    Scheduler::send_task_home(task);
+                    return Some(this);
+                } else {
+                    rtdebug!("running task here");
+                    task.give_home(Sched(home_handle));
+                    this.resume_task_immediately(task);
+                    return None;
+                }
+            }
+            AnySched if this.run_anything => {
+                rtdebug!("running anysched task here");
+                task.give_home(AnySched);
+                this.resume_task_immediately(task);
+                return None;
+            }
+            AnySched => {
+                rtdebug!("sending task to friend");
+                task.give_home(AnySched);
+                this.send_to_friend(task);
+                return Some(this);
+            }
+        }
+    }
+
+    // Bundle the helpers together.
+    fn do_work(~self) -> Option<~Scheduler> {
         let mut this = self;
 
-        match this.work_queue.pop() {
+        rtdebug!("scheduler calling do work");
+        match this.find_work() {
             Some(task) => {
-                let mut task = task;
-                let home = task.take_unwrap_home();
-                match home {
-                    Sched(home_handle) => {
-                        if home_handle.sched_id != this.sched_id() {
-                            task.give_home(Sched(home_handle));
-                            Scheduler::send_task_home(task);
-                            return Some(this);
-                        } else {
-                            this.event_loop.callback(Scheduler::run_sched_once);
-                            task.give_home(Sched(home_handle));
-                            this.resume_task_immediately(task);
-                            return None;
-                        }
-                    }
-                    AnySched if this.run_anything => {
-                        this.event_loop.callback(Scheduler::run_sched_once);
-                        task.give_home(AnySched);
-                        this.resume_task_immediately(task);
-                        return None;
-                    }
-                    AnySched => {
-                        task.give_home(AnySched);
-                        this.send_to_friend(task);
-                        return Some(this);
-                    }
-                }
+                rtdebug!("found some work! processing the task");
+                return this.process_task(task);
             }
             None => {
+                rtdebug!("no work was found, returning the scheduler struct");
                 return Some(this);
             }
         }
@@ -711,7 +775,6 @@ impl Scheduler {
             GiveTask(task, f) => f.to_fn()(self, task)
         }
     }
-
 }
 
 // The cases for the below function.
@@ -745,6 +808,8 @@ impl ClosureConverter for UnsafeTaskReceiver {
 
 #[cfg(test)]
 mod test {
+    extern mod extra;
+
     use prelude::*;
     use rt::test::*;
     use unstable::run_in_bare_thread;
@@ -862,12 +927,15 @@ mod test {
         do run_in_bare_thread {
 
             let sleepers = SleeperList::new();
-            let work_queue = WorkQueue::new();
+            let normal_queue = WorkQueue::new();
+            let special_queue = WorkQueue::new();
+            let queues = ~[normal_queue.clone(), special_queue.clone()];
 
             // Our normal scheduler
             let mut normal_sched = ~Scheduler::new(
                 ~UvEventLoop::new(),
-                work_queue.clone(),
+                normal_queue,
+                queues.clone(),
                 sleepers.clone());
 
             let normal_handle = Cell::new(normal_sched.make_handle());
@@ -877,7 +945,8 @@ mod test {
             // Our special scheduler
             let mut special_sched = ~Scheduler::new_special(
                 ~UvEventLoop::new(),
-                work_queue.clone(),
+                special_queue.clone(),
+                queues.clone(),
                 sleepers.clone(),
                 false,
                 Some(friend_handle));
diff --git a/src/libstd/rt/select.rs b/src/libstd/rt/select.rs
index 006b777b71b..07f8ca77d9d 100644
--- a/src/libstd/rt/select.rs
+++ b/src/libstd/rt/select.rs
@@ -182,6 +182,7 @@ mod test {
     fn select_stream() {
         use util;
         use comm::GenericChan;
+        use iter::Times;
 
         // Sends 10 buffered packets, and uses select to retrieve them all.
         // Puts the port in a different spot in the vector each time.
@@ -265,6 +266,7 @@ mod test {
 
         fn select_racing_senders_helper(killable: bool, send_on_chans: ~[uint]) {
             use rt::test::spawntask_random;
+            use iter::Times;
 
             do run_in_newsched_task {
                 // A bit of stress, since ordinarily this is just smoke and mirrors.
diff --git a/src/libstd/rt/test.rs b/src/libstd/rt/test.rs
index 792ea5eb33f..92366d5187f 100644
--- a/src/libstd/rt/test.rs
+++ b/src/libstd/rt/test.rs
@@ -15,8 +15,8 @@ use cell::Cell;
 use clone::Clone;
 use container::Container;
 use iterator::{Iterator, range};
-use vec::{OwnedVector, MutableVector};
 use super::io::net::ip::{SocketAddr, Ipv4Addr, Ipv6Addr};
+use vec::{OwnedVector, MutableVector, ImmutableVector};
 use rt::sched::Scheduler;
 use unstable::run_in_bare_thread;
 use rt::thread::Thread;
@@ -29,8 +29,12 @@ use result::{Result, Ok, Err};
 
 pub fn new_test_uv_sched() -> Scheduler {
 
+    let queue = WorkQueue::new();
+    let queues = ~[queue.clone()];
+
     let mut sched = Scheduler::new(~UvEventLoop::new(),
-                                   WorkQueue::new(),
+                                   queue,
+                                   queues,
                                    SleeperList::new());
 
     // Don't wait for the Shutdown message
@@ -164,15 +168,21 @@ pub fn run_in_mt_newsched_task(f: ~fn()) {
         };
 
         let sleepers = SleeperList::new();
-        let work_queue = WorkQueue::new();
 
         let mut handles = ~[];
         let mut scheds = ~[];
+        let mut work_queues = ~[];
 
         for _ in range(0u, nthreads) {
+            let work_queue = WorkQueue::new();
+            work_queues.push(work_queue);
+        }
+
+        for i in range(0u, nthreads) {
             let loop_ = ~UvEventLoop::new();
             let mut sched = ~Scheduler::new(loop_,
-                                            work_queue.clone(),
+                                            work_queues[i].clone(),
+                                            work_queues.clone(),
                                             sleepers.clone());
             let handle = sched.make_handle();