about summary refs log tree commit diff
path: root/src/libstd/task
diff options
context:
space:
mode:
authorBrian Anderson <banderson@mozilla.com>2013-07-30 19:02:21 -0700
committerBrian Anderson <banderson@mozilla.com>2013-08-02 00:26:45 -0700
commitc3fa411459cdcc70c5893e44209320762cdc26d1 (patch)
treeb7f034ab9df835a03e8e5776e2299bcc7b09799a /src/libstd/task
parent5890fcf87295d5b7a8f4ffa8d9918f755f72baf8 (diff)
downloadrust-c3fa411459cdcc70c5893e44209320762cdc26d1.tar.gz
rust-c3fa411459cdcc70c5893e44209320762cdc26d1.zip
std: Implement SingleThreaded spawn mode for newsched
Diffstat (limited to 'src/libstd/task')
-rw-r--r--src/libstd/task/mod.rs23
-rw-r--r--src/libstd/task/spawn.rs88
2 files changed, 98 insertions, 13 deletions
diff --git a/src/libstd/task/mod.rs b/src/libstd/task/mod.rs
index 7a864ecb867..19acedb56dd 100644
--- a/src/libstd/task/mod.rs
+++ b/src/libstd/task/mod.rs
@@ -971,16 +971,29 @@ fn test_try_fail() {
     }
 }
 
+#[cfg(test)]
+fn get_sched_id() -> int {
+    if context() == OldTaskContext {
+        unsafe {
+            rt::rust_get_sched_id() as int
+        }
+    } else {
+        do Local::borrow::<::rt::sched::Scheduler, int> |sched| {
+            sched.sched_id() as int
+        }
+    }
+}
+
 #[test]
 fn test_spawn_sched() {
     let (po, ch) = stream::<()>();
     let ch = SharedChan::new(ch);
 
     fn f(i: int, ch: SharedChan<()>) {
-        let parent_sched_id = unsafe { rt::rust_get_sched_id() };
+        let parent_sched_id = get_sched_id();
 
         do spawn_sched(SingleThreaded) {
-            let child_sched_id = unsafe { rt::rust_get_sched_id() };
+            let child_sched_id = get_sched_id();
             assert!(parent_sched_id != child_sched_id);
 
             if (i == 0) {
@@ -1000,15 +1013,15 @@ fn test_spawn_sched_childs_on_default_sched() {
     let (po, ch) = stream();
 
     // Assuming tests run on the default scheduler
-    let default_id = unsafe { rt::rust_get_sched_id() };
+    let default_id = get_sched_id();
 
     let ch = Cell::new(ch);
     do spawn_sched(SingleThreaded) {
-        let parent_sched_id = unsafe { rt::rust_get_sched_id() };
+        let parent_sched_id = get_sched_id();
         let ch = Cell::new(ch.take());
         do spawn {
             let ch = ch.take();
-            let child_sched_id = unsafe { rt::rust_get_sched_id() };
+            let child_sched_id = get_sched_id();
             assert!(parent_sched_id != child_sched_id);
             assert_eq!(child_sched_id, default_id);
             ch.send(());
diff --git a/src/libstd/task/spawn.rs b/src/libstd/task/spawn.rs
index 88f214ef4c0..e6f115958fd 100644
--- a/src/libstd/task/spawn.rs
+++ b/src/libstd/task/spawn.rs
@@ -78,13 +78,13 @@ use cast::transmute;
 use cast;
 use cell::Cell;
 use container::MutableMap;
-use comm::{Chan, GenericChan};
+use comm::{Chan, GenericChan, oneshot};
 use hashmap::{HashSet, HashSetConsumeIterator};
 use local_data;
 use task::local_data_priv::{local_get, local_set, OldHandle};
 use task::rt::rust_task;
 use task::rt;
-use task::{Failure};
+use task::{Failure, SingleThreaded};
 use task::{Success, TaskOpts, TaskResult};
 use task::unkillable;
 use to_bytes::IterBytes;
@@ -93,9 +93,11 @@ use util;
 use unstable::sync::Exclusive;
 use rt::{OldTaskContext, TaskContext, SchedulerContext, GlobalContext, context};
 use rt::local::Local;
-use rt::task::Task;
+use rt::task::{Task, Sched};
 use rt::kill::KillHandle;
 use rt::sched::Scheduler;
+use rt::uv::uvio::UvEventLoop;
+use rt::thread::Thread;
 
 #[cfg(test)] use task::default_task_opts;
 #[cfg(test)] use comm;
@@ -694,11 +696,81 @@ fn spawn_raw_newsched(mut opts: TaskOpts, f: ~fn()) {
         }
     };
 
-    let mut task = if opts.watched {
-        Task::build_child(child_wrapper)
-    } else {
-        // An unwatched task is a new root in the exit-code propagation tree
-        Task::build_root(child_wrapper)
+    let mut task = unsafe {
+        if opts.sched.mode != SingleThreaded {
+            if opts.watched {
+                Task::build_child(child_wrapper)
+            } else {
+                Task::build_root(child_wrapper)
+            }
+        } else {
+            // Creating a 1:1 task:thread ...
+            let sched = Local::unsafe_borrow::<Scheduler>();
+            let sched_handle = (*sched).make_handle();
+
+            // Create a new scheduler to hold the new task
+            let new_loop = ~UvEventLoop::new();
+            let mut new_sched = ~Scheduler::new_special(new_loop,
+                                                        (*sched).work_queue.clone(),
+                                                        (*sched).sleeper_list.clone(),
+                                                        false,
+                                                        Some(sched_handle));
+            let mut new_sched_handle = new_sched.make_handle();
+
+            // Allow the scheduler to exit when the pinned task exits
+            new_sched_handle.send(Shutdown);
+
+            // Pin the new task to the new scheduler
+            let new_task = if opts.watched {
+                Task::build_homed_child(child_wrapper, Sched(new_sched_handle))
+            } else {
+                Task::build_homed_root(child_wrapper, Sched(new_sched_handle))
+            };
+
+            // Create a task that will later be used to join with the new scheduler
+            // thread when it is ready to terminate
+            let (thread_port, thread_chan) = oneshot();
+            let thread_port_cell = Cell::new(thread_port);
+            let join_task = do Task::build_child() {
+                rtdebug!("running join task");
+                let thread_port = thread_port_cell.take();
+                let thread: Thread = thread_port.recv();
+                thread.join();
+            };
+
+            // Put the scheduler into another thread
+            let new_sched_cell = Cell::new(new_sched);
+            let orig_sched_handle_cell = Cell::new((*sched).make_handle());
+            let join_task_cell = Cell::new(join_task);
+
+            let thread = do Thread::start {
+                let mut new_sched = new_sched_cell.take();
+                let mut orig_sched_handle = orig_sched_handle_cell.take();
+                let join_task = join_task_cell.take();
+
+                let bootstrap_task = ~do Task::new_root(&mut new_sched.stack_pool) || {
+                    rtdebug!("boostraping a 1:1 scheduler");
+                };
+                new_sched.bootstrap(bootstrap_task);
+
+                rtdebug!("enqueing join_task");
+                // Now tell the original scheduler to join with this thread
+                // by scheduling a thread-joining task on the original scheduler
+                orig_sched_handle.send(TaskFromFriend(join_task));
+
+                // NB: We can't simply send a message from here to another task
+                // because this code isn't running in a task and message passing doesn't
+                // work outside of tasks. Hence we're sending a scheduler message
+                // to execute a new task directly to a scheduler.
+            };
+
+            // Give the thread handle to the join task
+            thread_chan.send(thread);
+
+            // When this task is enqueued on the current scheduler it will then get
+            // forwarded to the scheduler to which it is pinned
+            new_task
+        }
     };
 
     if opts.notify_chan.is_some() {