about summary refs log tree commit diff
path: root/src/libstd
diff options
context:
space:
mode:
authorBrian Anderson <banderson@mozilla.com>2013-09-20 18:49:31 -0700
committerBrian Anderson <banderson@mozilla.com>2013-09-25 20:05:56 -0700
commitd209717ddd260a5b0afd0dd07cddde903281f353 (patch)
tree6c04e6cd4444c87239db877c2a4ca4ba3c05d0a2 /src/libstd
parent00db6f6e7be04882b67660adcf04811511d4b8c2 (diff)
downloadrust-d209717ddd260a5b0afd0dd07cddde903281f353.tar.gz
rust-d209717ddd260a5b0afd0dd07cddde903281f353.zip
std::rt: Implement task yielding. Fix a starvation problem
Diffstat (limited to 'src/libstd')
-rw-r--r--src/libstd/rt/comm.rs11
-rw-r--r--src/libstd/rt/sched.rs157
-rw-r--r--src/libstd/task/mod.rs5
3 files changed, 141 insertions, 32 deletions
diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs
index 6336b1cbe2e..d7b44469177 100644
--- a/src/libstd/rt/comm.rs
+++ b/src/libstd/rt/comm.rs
@@ -118,6 +118,17 @@ impl<T> ChanOne<T> {
             rtassert!(!rt::in_sched_context());
         }
 
+        // In order to prevent starvation of other tasks in situations
+        // where a task sends repeatedly without ever receiving, we
+        // occassionally yield instead of doing a send immediately.
+        // Only doing this if we're doing a rescheduling send,
+        // otherwise the caller is expecting not to context switch.
+        if do_resched {
+            // XXX: This TLS hit should be combined with other uses of the scheduler below
+            let sched: ~Scheduler = Local::take();
+            sched.maybe_yield();
+        }
+
         let mut this = self;
         let mut recvr_active = true;
         let packet = this.packet();
diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs
index c3841760de3..a62d3335644 100644
--- a/src/libstd/rt/sched.rs
+++ b/src/libstd/rt/sched.rs
@@ -26,7 +26,7 @@ use rt::local::Local;
 use rt::rtio::{RemoteCallback, PausibleIdleCallback};
 use borrow::{to_uint};
 use cell::Cell;
-use rand::{XorShiftRng, Rng};
+use rand::{XorShiftRng, Rng, Rand};
 use iter::range;
 use vec::{OwnedVector};
 
@@ -78,7 +78,14 @@ pub struct Scheduler {
     /// A fast XorShift rng for scheduler use
     rng: XorShiftRng,
     /// A toggleable idle callback
-    idle_callback: Option<~PausibleIdleCallback>
+    idle_callback: Option<~PausibleIdleCallback>,
+    /// A countdown that starts at a random value and is decremented
+    /// every time a yield check is performed. When it hits 0 a task
+    /// will yield.
+    yield_check_count: uint,
+    /// A flag to tell the scheduler loop it needs to do some stealing
+    /// in order to introduce randomness as part of a yield
+    steal_for_yield: bool
 }
 
 /// An indication of how hard to work on a given operation, the difference
@@ -89,6 +96,13 @@ enum EffortLevel {
     GiveItYourBest
 }
 
+static MAX_YIELD_CHECKS: uint = 200;
+
+fn reset_yield_check(rng: &mut XorShiftRng) -> uint {
+    let r: uint = Rand::rand(rng);
+    r % MAX_YIELD_CHECKS + 1
+}
+
 impl Scheduler {
 
     // * Initialization Functions
@@ -113,7 +127,7 @@ impl Scheduler {
                        friend: Option<SchedHandle>)
         -> Scheduler {
 
-        Scheduler {
+        let mut sched = Scheduler {
             sleeper_list: sleeper_list,
             message_queue: MessageQueue::new(),
             sleepy: false,
@@ -127,8 +141,14 @@ impl Scheduler {
             run_anything: run_anything,
             friend_handle: friend,
             rng: XorShiftRng::new(),
-            idle_callback: None
-        }
+            idle_callback: None,
+            yield_check_count: 0,
+            steal_for_yield: false
+        };
+
+        sched.yield_check_count = reset_yield_check(&mut sched.rng);
+
+        return sched;
     }
 
     // XXX: This may eventually need to be refactored so that
@@ -307,8 +327,7 @@ impl Scheduler {
             }
             Some(TaskFromFriend(task)) => {
                 rtdebug!("got a task from a friend. lovely!");
-                this.process_task(task,
-                                  Scheduler::resume_task_immediately_cl).map_move(Local::put);
+                this.process_task(task, Scheduler::resume_task_immediately_cl);
                 return None;
             }
             Some(Wake) => {
@@ -352,8 +371,8 @@ impl Scheduler {
         match this.find_work() {
             Some(task) => {
                 rtdebug!("found some work! processing the task");
-                return this.process_task(task,
-                                         Scheduler::resume_task_immediately_cl);
+                this.process_task(task, Scheduler::resume_task_immediately_cl);
+                return None;
             }
             None => {
                 rtdebug!("no work was found, returning the scheduler struct");
@@ -373,14 +392,35 @@ impl Scheduler {
     // there, trying to steal from the remote work queues.
     fn find_work(&mut self) -> Option<~Task> {
         rtdebug!("scheduler looking for work");
-        match self.work_queue.pop() {
-            Some(task) => {
-                rtdebug!("found a task locally");
-                return Some(task)
+        if !self.steal_for_yield {
+            match self.work_queue.pop() {
+                Some(task) => {
+                    rtdebug!("found a task locally");
+                    return Some(task)
+                }
+                None => {
+                    rtdebug!("scheduler trying to steal");
+                    return self.try_steals();
+                }
             }
-            None => {
-                rtdebug!("scheduler trying to steal");
-                return self.try_steals();
+        } else {
+            // During execution of the last task, it performed a 'yield',
+            // so we're doing some work stealing in order to introduce some
+            // scheduling randomness. Otherwise we would just end up popping
+            // that same task again. This is pretty lame and is to work around
+            // the problem that work stealing is not designed for 'non-strict'
+            // (non-fork-join) task parallelism.
+            self.steal_for_yield = false;
+            match self.try_steals() {
+                Some(task) => {
+                    rtdebug!("stole a task after yielding");
+                    return Some(task);
+                }
+                None => {
+                    rtdebug!("did not steal a task after yielding");
+                    // Back to business
+                    return self.find_work();
+                }
             }
         }
     }
@@ -409,7 +449,7 @@ impl Scheduler {
     // place.
 
     fn process_task(~self, task: ~Task,
-                    schedule_fn: SchedulingFn) -> Option<~Scheduler> {
+                    schedule_fn: SchedulingFn) {
         let mut this = self;
         let mut task = task;
 
@@ -422,23 +462,23 @@ impl Scheduler {
                     rtdebug!("sending task home");
                     task.give_home(Sched(home_handle));
                     Scheduler::send_task_home(task);
-                    return Some(this);
+                    Local::put(this);
                 } else {
                     rtdebug!("running task here");
                     task.give_home(Sched(home_handle));
-                    return schedule_fn(this, task);
+                    schedule_fn(this, task);
                 }
             }
             AnySched if this.run_anything => {
                 rtdebug!("running anysched task here");
                 task.give_home(AnySched);
-                return schedule_fn(this, task);
+                schedule_fn(this, task);
             }
             AnySched => {
                 rtdebug!("sending task to friend");
                 task.give_home(AnySched);
                 this.send_to_friend(task);
-                return Some(this);
+                Local::put(this);
             }
         }
     }
@@ -607,15 +647,14 @@ impl Scheduler {
 
     // * Context Swapping Helpers - Here be ugliness!
 
-    pub fn resume_task_immediately(~self, task: ~Task) -> Option<~Scheduler> {
+    pub fn resume_task_immediately(~self, task: ~Task) {
         do self.change_task_context(task) |sched, stask| {
             sched.sched_task = Some(stask);
         }
-        return None;
     }
 
     fn resume_task_immediately_cl(sched: ~Scheduler,
-                                  task: ~Task) -> Option<~Scheduler> {
+                                  task: ~Task) {
         sched.resume_task_immediately(task)
     }
 
@@ -662,11 +701,10 @@ impl Scheduler {
         }
     }
 
-    fn switch_task(sched: ~Scheduler, task: ~Task) -> Option<~Scheduler> {
+    fn switch_task(sched: ~Scheduler, task: ~Task) {
         do sched.switch_running_tasks_and_then(task) |sched, last_task| {
             sched.enqueue_blocked_task(last_task);
         };
-        return None;
     }
 
     // * Task Context Helpers
@@ -686,7 +724,7 @@ impl Scheduler {
 
     pub fn run_task(task: ~Task) {
         let sched: ~Scheduler = Local::take();
-        sched.process_task(task, Scheduler::switch_task).map_move(Local::put);
+        sched.process_task(task, Scheduler::switch_task);
     }
 
     pub fn run_task_later(next_task: ~Task) {
@@ -696,6 +734,33 @@ impl Scheduler {
         };
     }
 
+    /// Yield control to the scheduler, executing another task. This is guaranteed
+    /// to introduce some amount of randomness to the scheduler. Currently the
+    /// randomness is a result of performing a round of work stealing (which
+    /// may end up stealing from the current scheduler).
+    pub fn yield_now(~self) {
+        let mut this = self;
+        this.yield_check_count = reset_yield_check(&mut this.rng);
+        // Tell the scheduler to start stealing on the next iteration
+        this.steal_for_yield = true;
+        do this.deschedule_running_task_and_then |sched, task| {
+            sched.enqueue_blocked_task(task);
+        }
+    }
+
+    pub fn maybe_yield(~self) {
+        // The number of times to do the yield check before yielding, chosen arbitrarily.
+        let mut this = self;
+        rtassert!(this.yield_check_count > 0);
+        this.yield_check_count -= 1;
+        if this.yield_check_count == 0 {
+            this.yield_now();
+        } else {
+            Local::put(this);
+        }
+    }
+
+
     // * Utility Functions
 
     pub fn sched_id(&self) -> uint { to_uint(self) }
@@ -718,7 +783,7 @@ impl Scheduler {
 
 // Supporting types
 
-type SchedulingFn = ~fn(~Scheduler, ~Task) -> Option<~Scheduler>;
+type SchedulingFn = ~fn(~Scheduler, ~Task);
 
 pub enum SchedMessage {
     Wake,
@@ -1231,4 +1296,40 @@ mod test {
             }
         }
     }
+
+    #[test]
+    fn dont_starve_2() {
+        use rt::comm::oneshot;
+
+        do stress_factor().times {
+            do run_in_newsched_task {
+                let (port, chan) = oneshot();
+                let (_port2, chan2) = stream();
+
+                // This task should not be able to starve the other task.
+                // The sends should eventually yield.
+                do spawntask {
+                    while !port.peek() {
+                        chan2.send(());
+                    }
+                }
+
+                chan.send(());
+            }
+        }
+    }
+
+    // Regression test for a logic bug that would cause single-threaded schedulers
+    // to sleep forever after yielding and stealing another task.
+    #[test]
+    fn single_threaded_yield() {
+        use task::{spawn, spawn_sched, SingleThreaded, deschedule};
+        use num::Times;
+
+        do spawn_sched(SingleThreaded) {
+            do 5.times { deschedule(); }
+        }
+        do spawn { }
+        do spawn { }
+    }
 }
diff --git a/src/libstd/task/mod.rs b/src/libstd/task/mod.rs
index 1dbc644c8e5..e6f6536956c 100644
--- a/src/libstd/task/mod.rs
+++ b/src/libstd/task/mod.rs
@@ -542,12 +542,9 @@ pub fn deschedule() {
     use rt::local::Local;
     use rt::sched::Scheduler;
 
-    // FIXME #6842: What does yield really mean in newsched?
     // FIXME(#7544): Optimize this, since we know we won't block.
     let sched: ~Scheduler = Local::take();
-    do sched.deschedule_running_task_and_then |sched, task| {
-        sched.enqueue_blocked_task(task);
-    }
+    sched.yield_now();
 }
 
 pub fn failing() -> bool {