about summary refs log tree commit diff
diff options
context:
space:
mode:
authorBrian Anderson <banderson@mozilla.com>2013-01-08 19:46:12 -0800
committerBrian Anderson <banderson@mozilla.com>2013-01-15 18:00:19 -0800
commit090b247056a9dd2d4d4a32c631fe2f0ddd3e744d (patch)
treeb7b9d7ab483fa7150480f90c9a5ad69905753685
parent989667e545cb45265d3cb4b19500224459c3bf8c (diff)
downloadrust-090b247056a9dd2d4d4a32c631fe2f0ddd3e744d.tar.gz
rust-090b247056a9dd2d4d4a32c631fe2f0ddd3e744d.zip
Spawn new tasks onto the primary scheduler by default. #3760
-rw-r--r--src/libcore/task/mod.rs100
-rw-r--r--src/libcore/task/spawn.rs23
-rw-r--r--src/rt/rust.cpp4
-rw-r--r--src/rt/rust_builtin.cpp5
-rw-r--r--src/rt/rust_kernel.cpp91
-rw-r--r--src/rt/rust_kernel.h11
6 files changed, 145 insertions, 89 deletions
diff --git a/src/libcore/task/mod.rs b/src/libcore/task/mod.rs
index c6b0491786d..86d38a18c50 100644
--- a/src/libcore/task/mod.rs
+++ b/src/libcore/task/mod.rs
@@ -52,7 +52,7 @@ use prelude::*;
 use ptr;
 use result;
 use task::local_data_priv::{local_get, local_set};
-use task::rt::{task_id, rust_task};
+use task::rt::{task_id, sched_id, rust_task};
 use task;
 use util;
 use util::replace;
@@ -62,6 +62,12 @@ pub mod local_data;
 pub mod rt;
 pub mod spawn;
 
+/// A handle to a scheduler
+#[deriving_eq]
+pub enum Scheduler {
+    SchedulerHandle(sched_id)
+}
+
 /// A handle to a task
 #[deriving_eq]
 pub enum Task {
@@ -95,7 +101,21 @@ impl TaskResult : Eq {
 }
 
 /// Scheduler modes
+#[deriving_eq]
 pub enum SchedMode {
+    /// Run task on the default scheduler
+    DefaultScheduler,
+    /// Run task on the current scheduler
+    CurrentScheduler,
+    /// Run task on a specific scheduler
+    ExistingScheduler(Scheduler),
+    /**
+     * Tasks are scheduled on the main OS thread
+     *
+     * The main OS thread is the thread used to launch the runtime which,
+     * in most cases, is the process's initial thread as created by the OS.
+     */
+    PlatformThread,
     /// All tasks run in the same OS thread
     SingleThreaded,
     /// Tasks are distributed among available CPUs
@@ -104,53 +124,6 @@ pub enum SchedMode {
     ThreadPerTask,
     /// Tasks are distributed among a fixed number of OS threads
     ManualThreads(uint),
-    /**
-     * Tasks are scheduled on the main OS thread
-     *
-     * The main OS thread is the thread used to launch the runtime which,
-     * in most cases, is the process's initial thread as created by the OS.
-     */
-    PlatformThread
-}
-
-impl SchedMode : cmp::Eq {
-    pure fn eq(&self, other: &SchedMode) -> bool {
-        match (*self) {
-            SingleThreaded => {
-                match (*other) {
-                    SingleThreaded => true,
-                    _ => false
-                }
-            }
-            ThreadPerCore => {
-                match (*other) {
-                    ThreadPerCore => true,
-                    _ => false
-                }
-            }
-            ThreadPerTask => {
-                match (*other) {
-                    ThreadPerTask => true,
-                    _ => false
-                }
-            }
-            ManualThreads(e0a) => {
-                match (*other) {
-                    ManualThreads(e0b) => e0a == e0b,
-                    _ => false
-                }
-            }
-            PlatformThread => {
-                match (*other) {
-                    PlatformThread => true,
-                    _ => false
-                }
-            }
-        }
-    }
-    pure fn ne(&self, other: &SchedMode) -> bool {
-        !(*self).eq(other)
-    }
 }
 
 /**
@@ -204,7 +177,7 @@ pub type TaskOpts = {
     linked: bool,
     supervised: bool,
     mut notify_chan: Option<Chan<TaskResult>>,
-    sched: Option<SchedOpts>,
+    sched: SchedOpts,
 };
 
 /**
@@ -370,7 +343,7 @@ impl TaskBuilder {
                 linked: self.opts.linked,
                 supervised: self.opts.supervised,
                 mut notify_chan: move notify_chan,
-                sched: Some({ mode: mode, foreign_stack_size: None})
+                sched: { mode: mode, foreign_stack_size: None}
             },
             can_not_copy: None,
             .. self.consume()
@@ -486,7 +459,10 @@ pub fn default_task_opts() -> TaskOpts {
         linked: true,
         supervised: false,
         mut notify_chan: None,
-        sched: None
+        sched: {
+            mode: DefaultScheduler,
+            foreign_stack_size: None
+        }
     }
 }
 
@@ -539,10 +515,9 @@ pub fn spawn_with<A:Owned>(arg: A, f: fn~(v: A)) {
 
 pub fn spawn_sched(mode: SchedMode, f: fn~()) {
     /*!
-     * Creates a new scheduler and executes a task on it
-     *
-     * Tasks subsequently spawned by that task will also execute on
-     * the new scheduler. When there are no more tasks to execute the
+     * Creates a new task on a new or existing scheduler
+
+     * When there are no more tasks to execute the
      * scheduler terminates.
      *
      * # Failure
@@ -590,6 +565,10 @@ pub fn get_task() -> Task {
     TaskHandle(rt::get_task_id())
 }
 
+pub fn get_scheduler() -> Scheduler {
+    SchedulerHandle(rt::rust_get_sched_id())
+}
+
 /**
  * Temporarily make the task unkillable
  *
@@ -927,16 +906,19 @@ fn test_spawn_sched() {
 }
 
 #[test]
-fn test_spawn_sched_childs_on_same_sched() {
+fn test_spawn_sched_childs_on_default_sched() {
     let po = oldcomm::Port();
     let ch = oldcomm::Chan(&po);
 
+    // Assuming tests run on the default scheduler
+    let default_id = rt::rust_get_sched_id();
+
     do spawn_sched(SingleThreaded) {
         let parent_sched_id = rt::rust_get_sched_id();
         do spawn {
             let child_sched_id = rt::rust_get_sched_id();
-            // This should be on the same scheduler
-            assert parent_sched_id == child_sched_id;
+            assert parent_sched_id != child_sched_id;
+            assert child_sched_id == default_id;
             oldcomm::send(ch, ());
         };
     };
@@ -1206,7 +1188,7 @@ fn test_spawn_thread_on_demand() {
 
         let (port2, chan2) = pipes::stream();
 
-        do spawn() |move chan2| {
+        do spawn_sched(CurrentScheduler) |move chan2| {
             chan2.send(());
         }
 
diff --git a/src/libcore/task/spawn.rs b/src/libcore/task/spawn.rs
index 1c5531303e1..e3afa7c4535 100644
--- a/src/libcore/task/spawn.rs
+++ b/src/libcore/task/spawn.rs
@@ -88,6 +88,7 @@ use task::rt::rust_closure;
 use task::rt;
 use task::{Failure, ManualThreads, PlatformThread, SchedOpts, SingleThreaded};
 use task::{Success, TaskOpts, TaskResult, ThreadPerCore, ThreadPerTask};
+use task::{ExistingScheduler, SchedulerHandle};
 use task::{default_task_opts, unkillable};
 use uint;
 use util;
@@ -525,9 +526,9 @@ pub fn spawn_raw(opts: TaskOpts, f: fn~()) {
             // Agh. Get move-mode items into the closure. FIXME (#2829)
             let (child_tg, ancestors, f) = option::swap_unwrap(child_data);
             // Create child task.
-            let new_task = match opts.sched {
-              None             => rt::new_task(),
-              Some(sched_opts) => new_task_in_new_sched(sched_opts)
+            let new_task = match opts.sched.mode {
+                DefaultScheduler => rt::new_task(),
+                _ => new_task_in_sched(opts.sched)
             };
             assert !new_task.is_null();
             // Getting killed after here would leak the task.
@@ -631,12 +632,16 @@ pub fn spawn_raw(opts: TaskOpts, f: fn~()) {
         }
     }
 
-    fn new_task_in_new_sched(opts: SchedOpts) -> *rust_task {
+    fn new_task_in_sched(opts: SchedOpts) -> *rust_task {
         if opts.foreign_stack_size != None {
             fail ~"foreign_stack_size scheduler option unimplemented";
         }
 
         let num_threads = match opts.mode {
+          DefaultScheduler
+          | CurrentScheduler
+          | ExistingScheduler(*)
+          | PlatformThread => 0u, /* Won't be used */
           SingleThreaded => 1u,
           ThreadPerCore => rt::rust_num_threads(),
           ThreadPerTask => {
@@ -648,13 +653,13 @@ pub fn spawn_raw(opts: TaskOpts, f: fn~()) {
             }
             threads
           }
-          PlatformThread => 0u /* Won't be used */
         };
 
-        let sched_id = if opts.mode != PlatformThread {
-            rt::rust_new_sched(num_threads)
-        } else {
-            rt::rust_osmain_sched_id()
+        let sched_id = match opts.mode {
+            CurrentScheduler => rt::rust_get_sched_id(),
+            ExistingScheduler(SchedulerHandle(id)) => id,
+            PlatformThread => rt::rust_osmain_sched_id(),
+            _ => rt::rust_new_sched(num_threads)
         };
         rt::rust_new_task_in_sched(sched_id)
     }
diff --git a/src/rt/rust.cpp b/src/rt/rust.cpp
index f21a7441640..803da32cbc8 100644
--- a/src/rt/rust.cpp
+++ b/src/rt/rust.cpp
@@ -43,8 +43,8 @@ rust_start(uintptr_t main_fn, int argc, char **argv, void* crate_map) {
 
     rust_kernel *kernel = new rust_kernel(env);
 
-    // Create the main scheduler and the main task
-    rust_sched_id sched_id = kernel->create_scheduler(env->num_sched_threads);
+    // Create the main task
+    rust_sched_id sched_id = kernel->main_sched_id();
     rust_scheduler *sched = kernel->get_scheduler_by_id(sched_id);
     assert(sched != NULL);
     rust_task *root_task = sched->create_task(NULL, "main");
diff --git a/src/rt/rust_builtin.cpp b/src/rt/rust_builtin.cpp
index de69272aca1..cbc58e85db6 100644
--- a/src/rt/rust_builtin.cpp
+++ b/src/rt/rust_builtin.cpp
@@ -652,7 +652,10 @@ new_task_common(rust_scheduler *sched, rust_task *parent) {
 extern "C" CDECL rust_task*
 new_task() {
     rust_task *task = rust_get_current_task();
-    return new_task_common(task->sched, task);
+    rust_sched_id sched_id = task->kernel->main_sched_id();
+    rust_scheduler *sched = task->kernel->get_scheduler_by_id(sched_id);
+    assert(sched != NULL && "should always have a main scheduler");
+    return new_task_common(sched, task);
 }
 
 extern "C" CDECL rust_task*
diff --git a/src/rt/rust_kernel.cpp b/src/rt/rust_kernel.cpp
index 8871d133ea1..cc98b474ee3 100644
--- a/src/rt/rust_kernel.cpp
+++ b/src/rt/rust_kernel.cpp
@@ -30,6 +30,7 @@ rust_kernel::rust_kernel(rust_env *env) :
     rval(0),
     max_sched_id(1),
     killed(false),
+    already_exiting(false),
     sched_reaper(this),
     osmain_driver(NULL),
     non_weak_tasks(0),
@@ -38,13 +39,20 @@ rust_kernel::rust_kernel(rust_env *env) :
     env(env)
 
 {
-
     // Create the single threaded scheduler that will run on the platform's
     // main thread
-    rust_manual_sched_launcher_factory *launchfac =
+    rust_manual_sched_launcher_factory *osmain_launchfac =
         new rust_manual_sched_launcher_factory();
-    osmain_scheduler = create_scheduler(launchfac, 1, false);
-    osmain_driver = launchfac->get_driver();
+    osmain_scheduler = create_scheduler(osmain_launchfac, 1, false);
+    osmain_driver = osmain_launchfac->get_driver();
+
+    // Create the primary scheduler
+    rust_thread_sched_launcher_factory *main_launchfac =
+        new rust_thread_sched_launcher_factory();
+    main_scheduler = create_scheduler(main_launchfac,
+                                      env->num_sched_threads,
+                                      false);
+
     sched_reaper.start();
 }
 
@@ -103,15 +111,22 @@ rust_kernel::create_scheduler(rust_sched_launcher_factory *launchfac,
     {
         scoped_lock with(sched_lock);
 
+        /*if (sched_table.size() == 2) {
+            // The main and OS main schedulers may not exit while there are
+            // other schedulers
+            KLOG_("Disallowing main scheduler to exit");
+            rust_scheduler *main_sched =
+                get_scheduler_by_id_nolock(main_scheduler);
+            assert(main_sched != NULL);
+            main_sched->disallow_exit();
+        }
         if (sched_table.size() == 1) {
-            // The OS main scheduler may not exit while there are other
-            // schedulers
             KLOG_("Disallowing osmain scheduler to exit");
-            rust_scheduler *sched =
+            rust_scheduler *osmain_sched =
                 get_scheduler_by_id_nolock(osmain_scheduler);
-            assert(sched != NULL);
-            sched->disallow_exit();
-        }
+            assert(osmain_sched != NULL);
+            osmain_sched->disallow_exit();
+            }*/
 
         id = max_sched_id++;
         assert(id != INTPTR_MAX && "Hit the maximum scheduler id");
@@ -175,14 +190,21 @@ rust_kernel::wait_for_schedulers()
             sched_table.erase(iter);
             sched->join_task_threads();
             sched->deref();
+            /*if (sched_table.size() == 2) {
+                KLOG_("Allowing main scheduler to exit");
+                // It's only the main schedulers left. Tell them to exit
+                rust_scheduler *main_sched =
+                    get_scheduler_by_id_nolock(main_scheduler);
+                assert(main_sched != NULL);
+                main_sched->allow_exit();
+            }
             if (sched_table.size() == 1) {
                 KLOG_("Allowing osmain scheduler to exit");
-                // It's only the osmain scheduler left. Tell it to exit
-                rust_scheduler *sched =
+                rust_scheduler *osmain_sched =
                     get_scheduler_by_id_nolock(osmain_scheduler);
-                assert(sched != NULL);
-                sched->allow_exit();
-            }
+                assert(osmain_sched != NULL);
+                osmain_sched->allow_exit();
+            }*/
         }
         if (!sched_table.empty()) {
             sched_lock.wait();
@@ -319,12 +341,30 @@ rust_kernel::register_task() {
 }
 
 void
+rust_kernel::allow_scheduler_exit() {
+    scoped_lock with(sched_lock);
+
+    KLOG_("Allowing main scheduler to exit");
+    // It's only the main schedulers left. Tell them to exit
+    rust_scheduler *main_sched =
+        get_scheduler_by_id_nolock(main_scheduler);
+    assert(main_sched != NULL);
+    main_sched->allow_exit();
+
+    KLOG_("Allowing osmain scheduler to exit");
+    rust_scheduler *osmain_sched =
+        get_scheduler_by_id_nolock(osmain_scheduler);
+    assert(osmain_sched != NULL);
+    osmain_sched->allow_exit();
+}
+
+void
 rust_kernel::unregister_task() {
     KLOG_("Unregistering task");
     uintptr_t new_non_weak_tasks = sync::decrement(non_weak_tasks);
     KLOG_("New non-weak tasks %" PRIdPTR, new_non_weak_tasks);
     if (new_non_weak_tasks == 0) {
-        end_weak_tasks();
+        begin_shutdown();
     }
 }
 
@@ -338,7 +378,7 @@ rust_kernel::weaken_task(rust_port_id chan) {
     uintptr_t new_non_weak_tasks = sync::decrement(non_weak_tasks);
     KLOG_("New non-weak tasks %" PRIdPTR, new_non_weak_tasks);
     if (new_non_weak_tasks == 0) {
-        end_weak_tasks();
+        begin_shutdown();
     }
 }
 
@@ -374,6 +414,23 @@ rust_kernel::end_weak_tasks() {
     }
 }
 
+void
+rust_kernel::begin_shutdown() {
+    {
+        scoped_lock with(sched_lock);
+        // FIXME #4410: This shouldn't be necessary, but because of
+        // unweaken_task this may end up getting called multiple times.
+        if (already_exiting) {
+            return;
+        } else {
+            already_exiting = true;
+        }
+    }
+
+    allow_scheduler_exit();
+    end_weak_tasks();
+}
+
 bool
 rust_kernel::send_to_port(rust_port_id chan, void *sptr) {
     KLOG_("rust_port_id*_send port: 0x%" PRIxPTR, (uintptr_t) chan);
diff --git a/src/rt/rust_kernel.h b/src/rt/rust_kernel.h
index cd52bfae8d3..13fd8934172 100644
--- a/src/rt/rust_kernel.h
+++ b/src/rt/rust_kernel.h
@@ -82,7 +82,8 @@ class rust_kernel {
     lock_and_signal rval_lock;
     int rval;
 
-    // Protects max_sched_id and sched_table, join_list, killed
+    // Protects max_sched_id and sched_table, join_list, killed,
+    // already_exiting
     lock_and_signal sched_lock;
     // The next scheduler id
     rust_sched_id max_sched_id;
@@ -95,8 +96,13 @@ class rust_kernel {
     // task group fails). This propagates to all new schedulers and tasks
     // created after it is set.
     bool killed;
+    bool already_exiting;
+
 
     rust_sched_reaper sched_reaper;
+
+    // The primary scheduler
+    rust_sched_id main_scheduler;
     // The single-threaded scheduler that uses the main thread
     rust_sched_id osmain_scheduler;
     // Runs the single-threaded scheduler that executes tasks
@@ -111,7 +117,9 @@ class rust_kernel {
     std::vector<rust_port_id> weak_task_chans;
 
     rust_scheduler* get_scheduler_by_id_nolock(rust_sched_id id);
+    void allow_scheduler_exit();
     void end_weak_tasks();
+    void begin_shutdown();
 
     // Used to communicate with the process-side, global libuv loop
     uintptr_t global_loop_chan;
@@ -155,6 +163,7 @@ public:
 
     void set_exit_status(int code);
 
+    rust_sched_id main_sched_id() { return main_scheduler; }
     rust_sched_id osmain_sched_id() { return osmain_scheduler; }
 
     void register_task();