diff options
Diffstat (limited to 'src/libstd/task/spawn.rs')
| -rw-r--r-- | src/libstd/task/spawn.rs | 88 |
1 files changed, 80 insertions, 8 deletions
diff --git a/src/libstd/task/spawn.rs b/src/libstd/task/spawn.rs index 88f214ef4c0..e6f115958fd 100644 --- a/src/libstd/task/spawn.rs +++ b/src/libstd/task/spawn.rs @@ -78,13 +78,13 @@ use cast::transmute; use cast; use cell::Cell; use container::MutableMap; -use comm::{Chan, GenericChan}; +use comm::{Chan, GenericChan, oneshot}; use hashmap::{HashSet, HashSetConsumeIterator}; use local_data; use task::local_data_priv::{local_get, local_set, OldHandle}; use task::rt::rust_task; use task::rt; -use task::{Failure}; +use task::{Failure, SingleThreaded}; use task::{Success, TaskOpts, TaskResult}; use task::unkillable; use to_bytes::IterBytes; @@ -93,9 +93,11 @@ use util; use unstable::sync::Exclusive; use rt::{OldTaskContext, TaskContext, SchedulerContext, GlobalContext, context}; use rt::local::Local; -use rt::task::Task; +use rt::task::{Task, Sched}; use rt::kill::KillHandle; use rt::sched::Scheduler; +use rt::uv::uvio::UvEventLoop; +use rt::thread::Thread; #[cfg(test)] use task::default_task_opts; #[cfg(test)] use comm; @@ -694,11 +696,81 @@ fn spawn_raw_newsched(mut opts: TaskOpts, f: ~fn()) { } }; - let mut task = if opts.watched { - Task::build_child(child_wrapper) - } else { - // An unwatched task is a new root in the exit-code propagation tree - Task::build_root(child_wrapper) + let mut task = unsafe { + if opts.sched.mode != SingleThreaded { + if opts.watched { + Task::build_child(child_wrapper) + } else { + Task::build_root(child_wrapper) + } + } else { + // Creating a 1:1 task:thread ... + let sched = Local::unsafe_borrow::<Scheduler>(); + let sched_handle = (*sched).make_handle(); + + // Create a new scheduler to hold the new task + let new_loop = ~UvEventLoop::new(); + let mut new_sched = ~Scheduler::new_special(new_loop, + (*sched).work_queue.clone(), + (*sched).sleeper_list.clone(), + false, + Some(sched_handle)); + let mut new_sched_handle = new_sched.make_handle(); + + // Allow the scheduler to exit when the pinned task exits + new_sched_handle.send(Shutdown); + + // Pin the new task to the new scheduler + let new_task = if opts.watched { + Task::build_homed_child(child_wrapper, Sched(new_sched_handle)) + } else { + Task::build_homed_root(child_wrapper, Sched(new_sched_handle)) + }; + + // Create a task that will later be used to join with the new scheduler + // thread when it is ready to terminate + let (thread_port, thread_chan) = oneshot(); + let thread_port_cell = Cell::new(thread_port); + let join_task = do Task::build_child() { + rtdebug!("running join task"); + let thread_port = thread_port_cell.take(); + let thread: Thread = thread_port.recv(); + thread.join(); + }; + + // Put the scheduler into another thread + let new_sched_cell = Cell::new(new_sched); + let orig_sched_handle_cell = Cell::new((*sched).make_handle()); + let join_task_cell = Cell::new(join_task); + + let thread = do Thread::start { + let mut new_sched = new_sched_cell.take(); + let mut orig_sched_handle = orig_sched_handle_cell.take(); + let join_task = join_task_cell.take(); + + let bootstrap_task = ~do Task::new_root(&mut new_sched.stack_pool) || { + rtdebug!("boostraping a 1:1 scheduler"); + }; + new_sched.bootstrap(bootstrap_task); + + rtdebug!("enqueing join_task"); + // Now tell the original scheduler to join with this thread + // by scheduling a thread-joining task on the original scheduler + orig_sched_handle.send(TaskFromFriend(join_task)); + + // NB: We can't simply send a message from here to another task + // because this code isn't running in a task and message passing doesn't + // work outside of tasks. Hence we're sending a scheduler message + // to execute a new task directly to a scheduler. + }; + + // Give the thread handle to the join task + thread_chan.send(thread); + + // When this task is enqueued on the current scheduler it will then get + // forwarded to the scheduler to which it is pinned + new_task + } }; if opts.notify_chan.is_some() { |
