diff options
Diffstat (limited to 'src/libcore/task/mod.rs')
| -rw-r--r-- | src/libcore/task/mod.rs | 258 |
1 files changed, 108 insertions, 150 deletions
diff --git a/src/libcore/task/mod.rs b/src/libcore/task/mod.rs index a4d99bf5db4..aa82309c78a 100644 --- a/src/libcore/task/mod.rs +++ b/src/libcore/task/mod.rs @@ -43,16 +43,15 @@ use cmp; use cmp::Eq; use iter; use libc; -use oldcomm; use option; use result::Result; -use pipes::{stream, Chan, Port}; +use pipes::{stream, Chan, Port, SharedChan}; use pipes; use prelude::*; use ptr; use result; use task::local_data_priv::{local_get, local_set}; -use task::rt::{task_id, rust_task}; +use task::rt::{task_id, sched_id, rust_task}; use task; use util; use util::replace; @@ -62,6 +61,12 @@ pub mod local_data; pub mod rt; pub mod spawn; +/// A handle to a scheduler +#[deriving_eq] +pub enum Scheduler { + SchedulerHandle(sched_id) +} + /// A handle to a task #[deriving_eq] pub enum Task { @@ -95,7 +100,21 @@ impl TaskResult : Eq { } /// Scheduler modes +#[deriving_eq] pub enum SchedMode { + /// Run task on the default scheduler + DefaultScheduler, + /// Run task on the current scheduler + CurrentScheduler, + /// Run task on a specific scheduler + ExistingScheduler(Scheduler), + /** + * Tasks are scheduled on the main OS thread + * + * The main OS thread is the thread used to launch the runtime which, + * in most cases, is the process's initial thread as created by the OS. + */ + PlatformThread, /// All tasks run in the same OS thread SingleThreaded, /// Tasks are distributed among available CPUs @@ -104,53 +123,6 @@ pub enum SchedMode { ThreadPerTask, /// Tasks are distributed among a fixed number of OS threads ManualThreads(uint), - /** - * Tasks are scheduled on the main OS thread - * - * The main OS thread is the thread used to launch the runtime which, - * in most cases, is the process's initial thread as created by the OS. - */ - PlatformThread -} - -impl SchedMode : cmp::Eq { - pure fn eq(&self, other: &SchedMode) -> bool { - match (*self) { - SingleThreaded => { - match (*other) { - SingleThreaded => true, - _ => false - } - } - ThreadPerCore => { - match (*other) { - ThreadPerCore => true, - _ => false - } - } - ThreadPerTask => { - match (*other) { - ThreadPerTask => true, - _ => false - } - } - ManualThreads(e0a) => { - match (*other) { - ManualThreads(e0b) => e0a == e0b, - _ => false - } - } - PlatformThread => { - match (*other) { - PlatformThread => true, - _ => false - } - } - } - } - pure fn ne(&self, other: &SchedMode) -> bool { - !(*self).eq(other) - } } /** @@ -204,7 +176,7 @@ pub struct TaskOpts { linked: bool, supervised: bool, mut notify_chan: Option<Chan<TaskResult>>, - sched: Option<SchedOpts>, + sched: SchedOpts } /** @@ -369,11 +341,8 @@ impl TaskBuilder { opts: TaskOpts { linked: self.opts.linked, supervised: self.opts.supervised, - notify_chan: notify_chan, - sched: Some(SchedOpts { - mode: mode, - foreign_stack_size: None, - }) + notify_chan: move notify_chan, + sched: SchedOpts { mode: mode, foreign_stack_size: None} }, can_not_copy: None, .. self.consume() @@ -457,18 +426,17 @@ impl TaskBuilder { * Fails if a future_result was already set for this task. */ fn try<T: Owned>(f: fn~() -> T) -> Result<T,()> { - let po = oldcomm::Port(); - let ch = oldcomm::Chan(&po); + let (po, ch) = stream::<T>(); let mut result = None; let fr_task_builder = self.future_result(|+r| { result = Some(move r); }); - do fr_task_builder.spawn |move f| { - oldcomm::send(ch, f()); + do fr_task_builder.spawn |move f, move ch| { + ch.send(f()); } match option::unwrap(move result).recv() { - Success => result::Ok(oldcomm::recv(po)), + Success => result::Ok(po.recv()), Failure => result::Err(()) } } @@ -489,7 +457,10 @@ pub fn default_task_opts() -> TaskOpts { linked: true, supervised: false, notify_chan: None, - sched: None + sched: SchedOpts { + mode: DefaultScheduler, + foreign_stack_size: None + } } } @@ -542,10 +513,9 @@ pub fn spawn_with<A:Owned>(arg: A, f: fn~(v: A)) { pub fn spawn_sched(mode: SchedMode, f: fn~()) { /*! - * Creates a new scheduler and executes a task on it - * - * Tasks subsequently spawned by that task will also execute on - * the new scheduler. When there are no more tasks to execute the + * Creates a new task on a new or existing scheduler + + * When there are no more tasks to execute the * scheduler terminates. * * # Failure @@ -599,6 +569,10 @@ pub fn get_task() -> Task { } } +pub fn get_scheduler() -> Scheduler { + SchedulerHandle(unsafe { rt::rust_get_sched_id() }) +} + /** * Temporarily make the task unkillable * @@ -711,17 +685,18 @@ fn test_cant_dup_task_builder() { #[test] #[ignore(cfg(windows))] fn test_spawn_unlinked_unsup_no_fail_down() { // grandchild sends on a port - let po = oldcomm::Port(); - let ch = oldcomm::Chan(&po); + let (po, ch) = stream(); + let ch = SharedChan(ch); do spawn_unlinked { + let ch = ch.clone(); do spawn_unlinked { // Give middle task a chance to fail-but-not-kill-us. for iter::repeat(16) { task::yield(); } - oldcomm::send(ch, ()); // If killed first, grandparent hangs. + ch.send(()); // If killed first, grandparent hangs. } fail; // Shouldn't kill either (grand)parent or (grand)child. } - oldcomm::recv(po); + po.recv(); } #[test] #[ignore(cfg(windows))] fn test_spawn_unlinked_unsup_no_fail_up() { // child unlinked fails @@ -741,8 +716,7 @@ fn test_spawn_unlinked_sup_fail_down() { #[test] #[should_fail] #[ignore(cfg(windows))] fn test_spawn_linked_sup_fail_up() { // child fails; parent fails - let po = oldcomm::Port::<()>(); - let _ch = oldcomm::Chan(&po); + let (po, _ch) = stream::<()>(); // Unidirectional "parenting" shouldn't override bidirectional linked. // We have to cheat with opts - the interface doesn't support them because // they don't make sense (redundant with task().supervised()). @@ -760,7 +734,7 @@ fn test_spawn_linked_sup_fail_up() { // child fails; parent fails .. b0 }; do b1.spawn { fail; } - oldcomm::recv(po); // We should get punted awake + po.recv(); // We should get punted awake } #[test] #[should_fail] #[ignore(cfg(windows))] fn test_spawn_linked_sup_fail_down() { // parent fails; child fails @@ -784,11 +758,10 @@ fn test_spawn_linked_sup_fail_down() { // parent fails; child fails } #[test] #[should_fail] #[ignore(cfg(windows))] fn test_spawn_linked_unsup_fail_up() { // child fails; parent fails - let po = oldcomm::Port::<()>(); - let _ch = oldcomm::Chan(&po); + let (po, _ch) = stream::<()>(); // Default options are to spawn linked & unsupervised. do spawn { fail; } - oldcomm::recv(po); // We should get punted awake + po.recv(); // We should get punted awake } #[test] #[should_fail] #[ignore(cfg(windows))] fn test_spawn_linked_unsup_fail_down() { // parent fails; child fails @@ -856,27 +829,25 @@ fn test_spawn_linked_sup_propagate_sibling() { #[test] fn test_run_basic() { - let po = oldcomm::Port(); - let ch = oldcomm::Chan(&po); + let (po, ch) = stream::<()>(); do task().spawn { - oldcomm::send(ch, ()); + ch.send(()); } - oldcomm::recv(po); + po.recv(); } #[test] fn test_add_wrapper() { - let po = oldcomm::Port(); - let ch = oldcomm::Chan(&po); + let (po, ch) = stream::<()>(); let b0 = task(); let b1 = do b0.add_wrapper |body| { fn~(move body) { body(); - oldcomm::send(ch, ()); + ch.send(()); } }; do b1.spawn { } - oldcomm::recv(po); + po.recv(); } #[test] @@ -929,52 +900,46 @@ fn test_spawn_sched_no_threads() { #[test] fn test_spawn_sched() { - let po = oldcomm::Port(); - let ch = oldcomm::Chan(&po); + let (po, ch) = stream::<()>(); + let ch = SharedChan(ch); - fn f(i: int, ch: oldcomm::Chan<()>) { - unsafe { - let parent_sched_id = rt::rust_get_sched_id(); + fn f(i: int, ch: SharedChan<()>) { + let parent_sched_id = unsafe { rt::rust_get_sched_id() }; - do spawn_sched(SingleThreaded) { - unsafe { - let child_sched_id = rt::rust_get_sched_id(); - assert parent_sched_id != child_sched_id; - - if (i == 0) { - oldcomm::send(ch, ()); - } else { - f(i - 1, ch); - } - } - }; - } + do spawn_sched(SingleThreaded) { + let child_sched_id = unsafe { rt::rust_get_sched_id() }; + assert parent_sched_id != child_sched_id; + + if (i == 0) { + ch.send(()); + } else { + f(i - 1, ch.clone()); + } + }; } f(10, ch); - oldcomm::recv(po); + po.recv(); } #[test] -fn test_spawn_sched_childs_on_same_sched() { - let po = oldcomm::Port(); - let ch = oldcomm::Chan(&po); +fn test_spawn_sched_childs_on_default_sched() { + let (po, ch) = stream(); + + // Assuming tests run on the default scheduler + let default_id = unsafe { rt::rust_get_sched_id() }; do spawn_sched(SingleThreaded) { - unsafe { - let parent_sched_id = rt::rust_get_sched_id(); - do spawn { - unsafe { - let child_sched_id = rt::rust_get_sched_id(); - // This should be on the same scheduler - assert parent_sched_id == child_sched_id; - oldcomm::send(ch, ()); - } - }; - } + let parent_sched_id = unsafe { rt::rust_get_sched_id() }; + do spawn { + let child_sched_id = unsafe { rt::rust_get_sched_id() }; + assert parent_sched_id != child_sched_id; + assert child_sched_id == default_id; + ch.send(()); + }; }; - oldcomm::recv(po); + po.recv(); } #[nolink] @@ -996,10 +961,8 @@ fn test_spawn_sched_blocking() { // without affecting other schedulers for iter::repeat(20u) { - let start_po = oldcomm::Port(); - let start_ch = oldcomm::Chan(&start_po); - let fin_po = oldcomm::Port(); - let fin_ch = oldcomm::Chan(&fin_po); + let (start_po, start_ch) = stream(); + let (fin_po, fin_ch) = stream(); let lock = testrt::rust_dbg_lock_create(); @@ -1007,44 +970,42 @@ fn test_spawn_sched_blocking() { unsafe { testrt::rust_dbg_lock_lock(lock); - oldcomm::send(start_ch, ()); + start_ch.send(()); // Block the scheduler thread testrt::rust_dbg_lock_wait(lock); testrt::rust_dbg_lock_unlock(lock); - oldcomm::send(fin_ch, ()); + fin_ch.send(()); } }; // Wait until the other task has its lock - oldcomm::recv(start_po); + start_po.recv(); - fn pingpong(po: oldcomm::Port<int>, ch: oldcomm::Chan<int>) { + fn pingpong(po: &Port<int>, ch: &Chan<int>) { let mut val = 20; while val > 0 { - val = oldcomm::recv(po); - oldcomm::send(ch, val - 1); + val = po.recv(); + ch.send(val - 1); } } - let setup_po = oldcomm::Port(); - let setup_ch = oldcomm::Chan(&setup_po); - let parent_po = oldcomm::Port(); - let parent_ch = oldcomm::Chan(&parent_po); + let (setup_po, setup_ch) = stream(); + let (parent_po, parent_ch) = stream(); do spawn { - let child_po = oldcomm::Port(); - oldcomm::send(setup_ch, oldcomm::Chan(&child_po)); - pingpong(child_po, parent_ch); + let (child_po, child_ch) = stream(); + setup_ch.send(child_ch); + pingpong(&child_po, &parent_ch); }; - let child_ch = oldcomm::recv(setup_po); - oldcomm::send(child_ch, 20); - pingpong(parent_po, child_ch); + let child_ch = setup_po.recv(); + child_ch.send(20); + pingpong(&parent_po, &child_ch); testrt::rust_dbg_lock_lock(lock); testrt::rust_dbg_lock_signal(lock); testrt::rust_dbg_lock_unlock(lock); - oldcomm::recv(fin_po); + fin_po.recv(); testrt::rust_dbg_lock_destroy(lock); } } @@ -1052,18 +1013,17 @@ fn test_spawn_sched_blocking() { #[cfg(test)] fn avoid_copying_the_body(spawnfn: fn(v: fn~())) { - let p = oldcomm::Port::<uint>(); - let ch = oldcomm::Chan(&p); + let (p, ch) = stream::<uint>(); let x = ~1; let x_in_parent = ptr::addr_of(&(*x)) as uint; do spawnfn |move x| { let x_in_child = ptr::addr_of(&(*x)) as uint; - oldcomm::send(ch, x_in_child); + ch.send(x_in_child); } - let x_in_child = oldcomm::recv(p); + let x_in_child = p.recv(); assert x_in_parent == x_in_child; } @@ -1101,20 +1061,18 @@ fn test_avoid_copying_the_body_unlinked() { #[test] fn test_platform_thread() { - let po = oldcomm::Port(); - let ch = oldcomm::Chan(&po); + let (po, ch) = stream(); do task().sched_mode(PlatformThread).spawn { - oldcomm::send(ch, ()); + ch.send(()); } - oldcomm::recv(po); + po.recv(); } #[test] #[ignore(cfg(windows))] #[should_fail] fn test_unkillable() { - let po = oldcomm::Port(); - let ch = po.chan(); + let (po, ch) = stream(); // We want to do this after failing do spawn_unlinked { @@ -1242,7 +1200,7 @@ fn test_spawn_thread_on_demand() { let (port2, chan2) = pipes::stream(); - do spawn() |move chan2| { + do spawn_sched(CurrentScheduler) |move chan2| { chan2.send(()); } |
