about summary refs log tree commit diff
diff options
context:
space:
mode:
authorPhilipp Brüschweiler <blei42@gmail.com>2012-09-14 15:01:17 +0200
committerBrian Anderson <banderson@mozilla.com>2012-09-19 14:01:53 -0700
commit68e755b1c26db09cf8e121bbbea2075f6116e279 (patch)
tree8d48dc089bb9effd2090117fd6033c4ca81470d1
parent35a935377483823ca1fbaede5a87406b494b0488 (diff)
downloadrust-68e755b1c26db09cf8e121bbbea2075f6116e279.tar.gz
rust-68e755b1c26db09cf8e121bbbea2075f6116e279.zip
core: Allocate threads on demand, not on scheduler startup
API change: rust_kernel::create_scheduler() or
rust_scheduler::rust_scheduler() respecitevly now take ownership of the
launch factory argument, it is needed to create new threads on demand.

Also renames rustrt::sched_threads() to rustrt::rust_sched_threads() for
consistency. Added rustrt::rust_max_sched_threads() to return the
maximal number of scheduled threads of the current scheduler.

Fixes #3493.
-rw-r--r--src/libcore/task.rs31
-rw-r--r--src/libstd/test.rs4
-rw-r--r--src/rt/rust_builtin.cpp8
-rw-r--r--src/rt/rust_kernel.cpp12
-rw-r--r--src/rt/rust_scheduler.cpp75
-rw-r--r--src/rt/rust_scheduler.h13
-rw-r--r--src/rt/rustrt.def.in3
-rw-r--r--src/test/run-pass/morestack6.rs4
8 files changed, 102 insertions, 48 deletions
diff --git a/src/libcore/task.rs b/src/libcore/task.rs
index 070da8ffd4a..d9b5eb15a71 100644
--- a/src/libcore/task.rs
+++ b/src/libcore/task.rs
@@ -1661,7 +1661,8 @@ extern mod rustrt {
 
     fn rust_get_sched_id() -> sched_id;
     fn rust_new_sched(num_threads: libc::uintptr_t) -> sched_id;
-    fn sched_threads() -> libc::size_t;
+    fn rust_max_sched_threads() -> libc::size_t;
+    fn rust_sched_threads() -> libc::size_t;
     fn rust_num_threads() -> libc::uintptr_t;
 
     fn get_task_id() -> task_id;
@@ -2435,10 +2436,36 @@ fn test_sched_thread_per_core() {
 
     do spawn_sched(ThreadPerCore) {
         let cores = rustrt::rust_num_threads();
-        let reported_threads = rustrt::sched_threads();
+        let reported_threads = rustrt::rust_max_sched_threads();
         assert(cores as uint == reported_threads as uint);
         chan.send(());
     }
 
     port.recv();
 }
+
+#[test]
+fn test_spawn_thread_on_demand() {
+    let (chan, port) = pipes::stream();
+
+    do spawn_sched(ManualThreads(2)) {
+        let max_threads = rustrt::rust_max_sched_threads();
+        assert(max_threads as int == 2);
+        let running_threads = rustrt::rust_sched_threads();
+        assert(running_threads as int == 1);
+
+        let (chan2, port2) = pipes::stream();
+
+        do spawn() {
+            chan2.send(());
+        }
+
+        let running_threads2 = rustrt::rust_sched_threads();
+        assert(running_threads2 as int == 2);
+
+        port2.recv();
+        chan.send(());
+    }
+
+    port.recv();
+}
diff --git a/src/libstd/test.rs b/src/libstd/test.rs
index 8692a9a440a..ad0003934e4 100644
--- a/src/libstd/test.rs
+++ b/src/libstd/test.rs
@@ -26,7 +26,7 @@ export run_tests_console;
 
 #[abi = "cdecl"]
 extern mod rustrt {
-    fn sched_threads() -> libc::size_t;
+    fn rust_max_sched_threads() -> libc::size_t;
 }
 
 // The name of a test. By convention this follows the rules for rust
@@ -327,7 +327,7 @@ const sched_overcommit : uint = 1u;
 const sched_overcommit : uint = 4u;
 
 fn get_concurrency() -> uint {
-    let threads = rustrt::sched_threads() as uint;
+    let threads = rustrt::rust_max_sched_threads() as uint;
     if threads == 1u { 1u }
     else { threads * sched_overcommit }
 }
diff --git a/src/rt/rust_builtin.cpp b/src/rt/rust_builtin.cpp
index a601908359c..f1c2afc0f4b 100644
--- a/src/rt/rust_builtin.cpp
+++ b/src/rt/rust_builtin.cpp
@@ -627,11 +627,17 @@ start_task(rust_task *target, fn_env_pair *f) {
 }
 
 extern "C" CDECL size_t
-sched_threads() {
+rust_sched_threads() {
     rust_task *task = rust_get_current_task();
     return task->sched->number_of_threads();
 }
 
+extern "C" CDECL size_t
+rust_max_sched_threads() {
+    rust_task *task = rust_get_current_task();
+    return task->sched->max_number_of_threads();
+}
+
 extern "C" CDECL rust_port*
 rust_port_take(rust_port_id id) {
     rust_task *task = rust_get_current_task();
diff --git a/src/rt/rust_kernel.cpp b/src/rt/rust_kernel.cpp
index 698ee866728..669ebd55a7c 100644
--- a/src/rt/rust_kernel.cpp
+++ b/src/rt/rust_kernel.cpp
@@ -31,9 +31,10 @@ rust_kernel::rust_kernel(rust_env *env) :
 
     // Create the single threaded scheduler that will run on the platform's
     // main thread
-    rust_manual_sched_launcher_factory launchfac;
-    osmain_scheduler = create_scheduler(&launchfac, 1, false);
-    osmain_driver = launchfac.get_driver();
+    rust_manual_sched_launcher_factory *launchfac =
+        new rust_manual_sched_launcher_factory();
+    osmain_scheduler = create_scheduler(launchfac, 1, false);
+    osmain_driver = launchfac->get_driver();
     sched_reaper.start();
 }
 
@@ -79,8 +80,9 @@ void rust_kernel::free(void *mem) {
 
 rust_sched_id
 rust_kernel::create_scheduler(size_t num_threads) {
-    rust_thread_sched_launcher_factory launchfac;
-    return create_scheduler(&launchfac, num_threads, true);
+    rust_thread_sched_launcher_factory *launchfac =
+        new rust_thread_sched_launcher_factory();
+    return create_scheduler(launchfac, num_threads, true);
 }
 
 rust_sched_id
diff --git a/src/rt/rust_scheduler.cpp b/src/rt/rust_scheduler.cpp
index 5dd1a261c0e..aa288cf3b94 100644
--- a/src/rt/rust_scheduler.cpp
+++ b/src/rt/rust_scheduler.cpp
@@ -6,34 +6,39 @@
 #include "rust_sched_launcher.h"
 
 rust_scheduler::rust_scheduler(rust_kernel *kernel,
-                               size_t num_threads,
+                               size_t max_num_threads,
                                rust_sched_id id,
                                bool allow_exit,
                                bool killed,
                                rust_sched_launcher_factory *launchfac) :
     ref_count(1),
     kernel(kernel),
-    live_threads(num_threads),
+    live_threads(0),
     live_tasks(0),
     cur_thread(0),
     may_exit(allow_exit),
-    num_threads(num_threads),
+    killed(killed),
+    launchfac(launchfac),
+    max_num_threads(max_num_threads),
     id(id)
 {
-    create_task_threads(launchfac, killed);
+    // Create the first thread
+    threads.push(create_task_thread(0));
 }
 
 void rust_scheduler::delete_this() {
     destroy_task_threads();
+    delete launchfac;
     delete this;
 }
 
 rust_sched_launcher *
-rust_scheduler::create_task_thread(rust_sched_launcher_factory *launchfac,
-                                   int id, bool killed) {
+rust_scheduler::create_task_thread(int id) {
+    live_threads++;
     rust_sched_launcher *thread = launchfac->create(this, id, killed);
-    KLOG(kernel, kern, "created task thread: " PTR ", id: %d",
-          thread, id);
+    KLOG(kernel, kern, "created task thread: " PTR
+         ", id: %d, live_threads: %d",
+         thread, id, live_threads);
     return thread;
 }
 
@@ -44,18 +49,8 @@ rust_scheduler::destroy_task_thread(rust_sched_launcher *thread) {
 }
 
 void
-rust_scheduler::create_task_threads(rust_sched_launcher_factory *launchfac,
-                                    bool killed) {
-    KLOG(kernel, kern, "Using %d scheduler threads.", num_threads);
-
-    for(size_t i = 0; i < num_threads; ++i) {
-        threads.push(create_task_thread(launchfac, i, killed));
-    }
-}
-
-void
 rust_scheduler::destroy_task_threads() {
-    for(size_t i = 0; i < num_threads; ++i) {
+    for(size_t i = 0; i < threads.size(); ++i) {
         destroy_task_thread(threads[i]);
     }
 }
@@ -63,7 +58,7 @@ rust_scheduler::destroy_task_threads() {
 void
 rust_scheduler::start_task_threads()
 {
-    for(size_t i = 0; i < num_threads; ++i) {
+    for(size_t i = 0; i < threads.size(); ++i) {
         rust_sched_launcher *thread = threads[i];
         thread->start();
     }
@@ -72,7 +67,7 @@ rust_scheduler::start_task_threads()
 void
 rust_scheduler::join_task_threads()
 {
-    for(size_t i = 0; i < num_threads; ++i) {
+    for(size_t i = 0; i < threads.size(); ++i) {
         rust_sched_launcher *thread = threads[i];
         thread->join();
     }
@@ -80,7 +75,7 @@ rust_scheduler::join_task_threads()
 
 void
 rust_scheduler::kill_all_tasks() {
-    for(size_t i = 0; i < num_threads; ++i) {
+    for(size_t i = 0; i < threads.size(); ++i) {
         rust_sched_launcher *thread = threads[i];
         thread->get_loop()->kill_all_tasks();
     }
@@ -92,10 +87,29 @@ rust_scheduler::create_task(rust_task *spawner, const char *name) {
     {
         scoped_lock with(lock);
         live_tasks++;
-        thread_no = cur_thread++;
-        if (cur_thread >= num_threads)
-            cur_thread = 0;
+
+        // Find unoccupied thread
+        for (thread_no = 0; thread_no < threads.size(); ++thread_no) {
+            if (threads[thread_no]->get_loop()->number_of_live_tasks() == 0)
+                break;
+        }
+
+        if (thread_no == threads.size()) {
+            if (threads.size() < max_num_threads) {
+                // Else create new thread
+                thread_no = threads.size();
+                rust_sched_launcher *thread = create_task_thread(thread_no);
+                thread->start();
+                threads.push(thread);
+            } else {
+                // Or use round robin allocation
+                thread_no = cur_thread++;
+                if (cur_thread >= max_num_threads)
+                    cur_thread = 0;
+            }
+        }
     }
+    KLOG(kernel, kern, "Creating task %s, on thread %d.", name, thread_no);
     kernel->register_task();
     rust_sched_launcher *thread = threads[thread_no];
     return thread->get_loop()->create_task(spawner, name);
@@ -119,17 +133,22 @@ rust_scheduler::release_task() {
 
 void
 rust_scheduler::exit() {
-    // Take a copy of num_threads. After the last thread exits this
+    // Take a copy of the number of threads. After the last thread exits this
     // scheduler will get destroyed, and our fields will cease to exist.
-    size_t current_num_threads = num_threads;
+    size_t current_num_threads = threads.size();
     for(size_t i = 0; i < current_num_threads; ++i) {
         threads[i]->get_loop()->exit();
     }
 }
 
 size_t
+rust_scheduler::max_number_of_threads() {
+    return max_num_threads;
+}
+
+size_t
 rust_scheduler::number_of_threads() {
-    return num_threads;
+    return threads.size();
 }
 
 void
diff --git a/src/rt/rust_scheduler.h b/src/rt/rust_scheduler.h
index 767ecaf7d1e..019f69f7a31 100644
--- a/src/rt/rust_scheduler.h
+++ b/src/rt/rust_scheduler.h
@@ -30,19 +30,17 @@ private:
     uintptr_t live_tasks;
     size_t cur_thread;
     bool may_exit;
+    bool killed;
 
+    rust_sched_launcher_factory *launchfac;
     array_list<rust_sched_launcher *> threads;
-    const size_t num_threads;
+    const size_t max_num_threads;
 
     rust_sched_id id;
 
-    void create_task_threads(rust_sched_launcher_factory *launchfac,
-                             bool killed);
     void destroy_task_threads();
 
-    rust_sched_launcher *
-    create_task_thread(rust_sched_launcher_factory *launchfac, int id,
-                       bool killed);
+    rust_sched_launcher *create_task_thread(int id);
     void destroy_task_thread(rust_sched_launcher *thread);
 
     void exit();
@@ -51,7 +49,7 @@ private:
     void delete_this();
 
 public:
-    rust_scheduler(rust_kernel *kernel, size_t num_threads,
+    rust_scheduler(rust_kernel *kernel, size_t max_num_threads,
                    rust_sched_id id, bool allow_exit, bool killed,
                    rust_sched_launcher_factory *launchfac);
 
@@ -62,6 +60,7 @@ public:
 
     void release_task();
 
+    size_t max_number_of_threads();
     size_t number_of_threads();
     // Called by each thread when it terminates. When all threads
     // terminate the scheduler does as well.
diff --git a/src/rt/rustrt.def.in b/src/rt/rustrt.def.in
index cb2f36fe31b..e0930dbf753 100644
--- a/src/rt/rustrt.def.in
+++ b/src/rt/rustrt.def.in
@@ -30,6 +30,7 @@ rand_new_seeded
 rand_next
 rand_seed
 rust_get_sched_id
+rust_max_sched_threads
 rust_new_sched
 rust_new_task_in_sched
 rust_num_threads
@@ -48,6 +49,7 @@ rust_port_size
 rust_process_wait
 rust_ptr_eq
 rust_run_program
+rust_sched_threads
 rust_set_exit_status
 rust_start
 rust_getcwd
@@ -58,7 +60,6 @@ rust_get_task
 rust_get_stack_segment
 rust_task_weaken
 rust_task_unweaken
-sched_threads
 shape_log_str
 start_task
 vec_reserve_shared_actual
diff --git a/src/test/run-pass/morestack6.rs b/src/test/run-pass/morestack6.rs
index 3036d4c201f..b10c18ffd3e 100644
--- a/src/test/run-pass/morestack6.rs
+++ b/src/test/run-pass/morestack6.rs
@@ -8,7 +8,7 @@ extern mod rustrt {
     fn last_os_error() -> ~str;
     fn rust_getcwd() -> ~str;
     fn get_task_id() -> libc::intptr_t;
-    fn sched_threads();
+    fn rust_max_sched_threads();
     fn rust_get_task();
 }
 
@@ -16,7 +16,7 @@ fn calllink01() { rustrt::rust_get_sched_id(); }
 fn calllink02() { rustrt::last_os_error(); }
 fn calllink03() { rustrt::rust_getcwd(); }
 fn calllink08() { rustrt::get_task_id(); }
-fn calllink09() { rustrt::sched_threads(); }
+fn calllink09() { rustrt::rust_max_sched_threads(); }
 fn calllink10() { rustrt::rust_get_task(); }
 
 fn runtest(f: fn~(), frame_backoff: u32) {