about summary refs log tree commit diff
path: root/src/libcore/task/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/libcore/task/mod.rs')
-rw-r--r--src/libcore/task/mod.rs258
1 files changed, 108 insertions, 150 deletions
diff --git a/src/libcore/task/mod.rs b/src/libcore/task/mod.rs
index a4d99bf5db4..aa82309c78a 100644
--- a/src/libcore/task/mod.rs
+++ b/src/libcore/task/mod.rs
@@ -43,16 +43,15 @@ use cmp;
 use cmp::Eq;
 use iter;
 use libc;
-use oldcomm;
 use option;
 use result::Result;
-use pipes::{stream, Chan, Port};
+use pipes::{stream, Chan, Port, SharedChan};
 use pipes;
 use prelude::*;
 use ptr;
 use result;
 use task::local_data_priv::{local_get, local_set};
-use task::rt::{task_id, rust_task};
+use task::rt::{task_id, sched_id, rust_task};
 use task;
 use util;
 use util::replace;
@@ -62,6 +61,12 @@ pub mod local_data;
 pub mod rt;
 pub mod spawn;
 
+/// A handle to a scheduler
+#[deriving_eq]
+pub enum Scheduler {
+    SchedulerHandle(sched_id)
+}
+
 /// A handle to a task
 #[deriving_eq]
 pub enum Task {
@@ -95,7 +100,21 @@ impl TaskResult : Eq {
 }
 
 /// Scheduler modes
+#[deriving_eq]
 pub enum SchedMode {
+    /// Run task on the default scheduler
+    DefaultScheduler,
+    /// Run task on the current scheduler
+    CurrentScheduler,
+    /// Run task on a specific scheduler
+    ExistingScheduler(Scheduler),
+    /**
+     * Tasks are scheduled on the main OS thread
+     *
+     * The main OS thread is the thread used to launch the runtime which,
+     * in most cases, is the process's initial thread as created by the OS.
+     */
+    PlatformThread,
     /// All tasks run in the same OS thread
     SingleThreaded,
     /// Tasks are distributed among available CPUs
@@ -104,53 +123,6 @@ pub enum SchedMode {
     ThreadPerTask,
     /// Tasks are distributed among a fixed number of OS threads
     ManualThreads(uint),
-    /**
-     * Tasks are scheduled on the main OS thread
-     *
-     * The main OS thread is the thread used to launch the runtime which,
-     * in most cases, is the process's initial thread as created by the OS.
-     */
-    PlatformThread
-}
-
-impl SchedMode : cmp::Eq {
-    pure fn eq(&self, other: &SchedMode) -> bool {
-        match (*self) {
-            SingleThreaded => {
-                match (*other) {
-                    SingleThreaded => true,
-                    _ => false
-                }
-            }
-            ThreadPerCore => {
-                match (*other) {
-                    ThreadPerCore => true,
-                    _ => false
-                }
-            }
-            ThreadPerTask => {
-                match (*other) {
-                    ThreadPerTask => true,
-                    _ => false
-                }
-            }
-            ManualThreads(e0a) => {
-                match (*other) {
-                    ManualThreads(e0b) => e0a == e0b,
-                    _ => false
-                }
-            }
-            PlatformThread => {
-                match (*other) {
-                    PlatformThread => true,
-                    _ => false
-                }
-            }
-        }
-    }
-    pure fn ne(&self, other: &SchedMode) -> bool {
-        !(*self).eq(other)
-    }
 }
 
 /**
@@ -204,7 +176,7 @@ pub struct TaskOpts {
     linked: bool,
     supervised: bool,
     mut notify_chan: Option<Chan<TaskResult>>,
-    sched: Option<SchedOpts>,
+    sched: SchedOpts
 }
 
 /**
@@ -369,11 +341,8 @@ impl TaskBuilder {
             opts: TaskOpts {
                 linked: self.opts.linked,
                 supervised: self.opts.supervised,
-                notify_chan: notify_chan,
-                sched: Some(SchedOpts {
-                    mode: mode,
-                    foreign_stack_size: None,
-                })
+                notify_chan: move notify_chan,
+                sched: SchedOpts { mode: mode, foreign_stack_size: None}
             },
             can_not_copy: None,
             .. self.consume()
@@ -457,18 +426,17 @@ impl TaskBuilder {
      * Fails if a future_result was already set for this task.
      */
     fn try<T: Owned>(f: fn~() -> T) -> Result<T,()> {
-        let po = oldcomm::Port();
-        let ch = oldcomm::Chan(&po);
+        let (po, ch) = stream::<T>();
         let mut result = None;
 
         let fr_task_builder = self.future_result(|+r| {
             result = Some(move r);
         });
-        do fr_task_builder.spawn |move f| {
-            oldcomm::send(ch, f());
+        do fr_task_builder.spawn |move f, move ch| {
+            ch.send(f());
         }
         match option::unwrap(move result).recv() {
-            Success => result::Ok(oldcomm::recv(po)),
+            Success => result::Ok(po.recv()),
             Failure => result::Err(())
         }
     }
@@ -489,7 +457,10 @@ pub fn default_task_opts() -> TaskOpts {
         linked: true,
         supervised: false,
         notify_chan: None,
-        sched: None
+        sched: SchedOpts {
+            mode: DefaultScheduler,
+            foreign_stack_size: None
+        }
     }
 }
 
@@ -542,10 +513,9 @@ pub fn spawn_with<A:Owned>(arg: A, f: fn~(v: A)) {
 
 pub fn spawn_sched(mode: SchedMode, f: fn~()) {
     /*!
-     * Creates a new scheduler and executes a task on it
-     *
-     * Tasks subsequently spawned by that task will also execute on
-     * the new scheduler. When there are no more tasks to execute the
+     * Creates a new task on a new or existing scheduler
+
+     * When there are no more tasks to execute the
      * scheduler terminates.
      *
      * # Failure
@@ -599,6 +569,10 @@ pub fn get_task() -> Task {
     }
 }
 
+pub fn get_scheduler() -> Scheduler {
+    SchedulerHandle(unsafe { rt::rust_get_sched_id() })
+}
+
 /**
  * Temporarily make the task unkillable
  *
@@ -711,17 +685,18 @@ fn test_cant_dup_task_builder() {
 
 #[test] #[ignore(cfg(windows))]
 fn test_spawn_unlinked_unsup_no_fail_down() { // grandchild sends on a port
-    let po = oldcomm::Port();
-    let ch = oldcomm::Chan(&po);
+    let (po, ch) = stream();
+    let ch = SharedChan(ch);
     do spawn_unlinked {
+        let ch = ch.clone();
         do spawn_unlinked {
             // Give middle task a chance to fail-but-not-kill-us.
             for iter::repeat(16) { task::yield(); }
-            oldcomm::send(ch, ()); // If killed first, grandparent hangs.
+            ch.send(()); // If killed first, grandparent hangs.
         }
         fail; // Shouldn't kill either (grand)parent or (grand)child.
     }
-    oldcomm::recv(po);
+    po.recv();
 }
 #[test] #[ignore(cfg(windows))]
 fn test_spawn_unlinked_unsup_no_fail_up() { // child unlinked fails
@@ -741,8 +716,7 @@ fn test_spawn_unlinked_sup_fail_down() {
 
 #[test] #[should_fail] #[ignore(cfg(windows))]
 fn test_spawn_linked_sup_fail_up() { // child fails; parent fails
-    let po = oldcomm::Port::<()>();
-    let _ch = oldcomm::Chan(&po);
+    let (po, _ch) = stream::<()>();
     // Unidirectional "parenting" shouldn't override bidirectional linked.
     // We have to cheat with opts - the interface doesn't support them because
     // they don't make sense (redundant with task().supervised()).
@@ -760,7 +734,7 @@ fn test_spawn_linked_sup_fail_up() { // child fails; parent fails
         .. b0
     };
     do b1.spawn { fail; }
-    oldcomm::recv(po); // We should get punted awake
+    po.recv(); // We should get punted awake
 }
 #[test] #[should_fail] #[ignore(cfg(windows))]
 fn test_spawn_linked_sup_fail_down() { // parent fails; child fails
@@ -784,11 +758,10 @@ fn test_spawn_linked_sup_fail_down() { // parent fails; child fails
 }
 #[test] #[should_fail] #[ignore(cfg(windows))]
 fn test_spawn_linked_unsup_fail_up() { // child fails; parent fails
-    let po = oldcomm::Port::<()>();
-    let _ch = oldcomm::Chan(&po);
+    let (po, _ch) = stream::<()>();
     // Default options are to spawn linked & unsupervised.
     do spawn { fail; }
-    oldcomm::recv(po); // We should get punted awake
+    po.recv(); // We should get punted awake
 }
 #[test] #[should_fail] #[ignore(cfg(windows))]
 fn test_spawn_linked_unsup_fail_down() { // parent fails; child fails
@@ -856,27 +829,25 @@ fn test_spawn_linked_sup_propagate_sibling() {
 
 #[test]
 fn test_run_basic() {
-    let po = oldcomm::Port();
-    let ch = oldcomm::Chan(&po);
+    let (po, ch) = stream::<()>();
     do task().spawn {
-        oldcomm::send(ch, ());
+        ch.send(());
     }
-    oldcomm::recv(po);
+    po.recv();
 }
 
 #[test]
 fn test_add_wrapper() {
-    let po = oldcomm::Port();
-    let ch = oldcomm::Chan(&po);
+    let (po, ch) = stream::<()>();
     let b0 = task();
     let b1 = do b0.add_wrapper |body| {
         fn~(move body) {
             body();
-            oldcomm::send(ch, ());
+            ch.send(());
         }
     };
     do b1.spawn { }
-    oldcomm::recv(po);
+    po.recv();
 }
 
 #[test]
@@ -929,52 +900,46 @@ fn test_spawn_sched_no_threads() {
 
 #[test]
 fn test_spawn_sched() {
-    let po = oldcomm::Port();
-    let ch = oldcomm::Chan(&po);
+    let (po, ch) = stream::<()>();
+    let ch = SharedChan(ch);
 
-    fn f(i: int, ch: oldcomm::Chan<()>) {
-        unsafe {
-            let parent_sched_id = rt::rust_get_sched_id();
+    fn f(i: int, ch: SharedChan<()>) {
+        let parent_sched_id = unsafe { rt::rust_get_sched_id() };
 
-            do spawn_sched(SingleThreaded) {
-                unsafe {
-                    let child_sched_id = rt::rust_get_sched_id();
-                    assert parent_sched_id != child_sched_id;
-
-                    if (i == 0) {
-                        oldcomm::send(ch, ());
-                    } else {
-                        f(i - 1, ch);
-                    }
-                }
-            };
-        }
+        do spawn_sched(SingleThreaded) {
+            let child_sched_id = unsafe { rt::rust_get_sched_id() };
+            assert parent_sched_id != child_sched_id;
+
+            if (i == 0) {
+                ch.send(());
+            } else {
+                f(i - 1, ch.clone());
+            }
+        };
 
     }
     f(10, ch);
-    oldcomm::recv(po);
+    po.recv();
 }
 
 #[test]
-fn test_spawn_sched_childs_on_same_sched() {
-    let po = oldcomm::Port();
-    let ch = oldcomm::Chan(&po);
+fn test_spawn_sched_childs_on_default_sched() {
+    let (po, ch) = stream();
+
+    // Assuming tests run on the default scheduler
+    let default_id = unsafe { rt::rust_get_sched_id() };
 
     do spawn_sched(SingleThreaded) {
-        unsafe {
-            let parent_sched_id = rt::rust_get_sched_id();
-            do spawn {
-                unsafe {
-                    let child_sched_id = rt::rust_get_sched_id();
-                    // This should be on the same scheduler
-                    assert parent_sched_id == child_sched_id;
-                    oldcomm::send(ch, ());
-                }
-            };
-        }
+        let parent_sched_id = unsafe { rt::rust_get_sched_id() };
+        do spawn {
+            let child_sched_id = unsafe { rt::rust_get_sched_id() };
+            assert parent_sched_id != child_sched_id;
+            assert child_sched_id == default_id;
+            ch.send(());
+        };
     };
 
-    oldcomm::recv(po);
+    po.recv();
 }
 
 #[nolink]
@@ -996,10 +961,8 @@ fn test_spawn_sched_blocking() {
         // without affecting other schedulers
         for iter::repeat(20u) {
 
-            let start_po = oldcomm::Port();
-            let start_ch = oldcomm::Chan(&start_po);
-            let fin_po = oldcomm::Port();
-            let fin_ch = oldcomm::Chan(&fin_po);
+            let (start_po, start_ch) = stream();
+            let (fin_po, fin_ch) = stream();
 
             let lock = testrt::rust_dbg_lock_create();
 
@@ -1007,44 +970,42 @@ fn test_spawn_sched_blocking() {
                 unsafe {
                     testrt::rust_dbg_lock_lock(lock);
 
-                    oldcomm::send(start_ch, ());
+                    start_ch.send(());
 
                     // Block the scheduler thread
                     testrt::rust_dbg_lock_wait(lock);
                     testrt::rust_dbg_lock_unlock(lock);
 
-                    oldcomm::send(fin_ch, ());
+                    fin_ch.send(());
                 }
             };
 
             // Wait until the other task has its lock
-            oldcomm::recv(start_po);
+            start_po.recv();
 
-            fn pingpong(po: oldcomm::Port<int>, ch: oldcomm::Chan<int>) {
+            fn pingpong(po: &Port<int>, ch: &Chan<int>) {
                 let mut val = 20;
                 while val > 0 {
-                    val = oldcomm::recv(po);
-                    oldcomm::send(ch, val - 1);
+                    val = po.recv();
+                    ch.send(val - 1);
                 }
             }
 
-            let setup_po = oldcomm::Port();
-            let setup_ch = oldcomm::Chan(&setup_po);
-            let parent_po = oldcomm::Port();
-            let parent_ch = oldcomm::Chan(&parent_po);
+            let (setup_po, setup_ch) = stream();
+            let (parent_po, parent_ch) = stream();
             do spawn {
-                let child_po = oldcomm::Port();
-                oldcomm::send(setup_ch, oldcomm::Chan(&child_po));
-                pingpong(child_po, parent_ch);
+                let (child_po, child_ch) = stream();
+                setup_ch.send(child_ch);
+                pingpong(&child_po, &parent_ch);
             };
 
-            let child_ch = oldcomm::recv(setup_po);
-            oldcomm::send(child_ch, 20);
-            pingpong(parent_po, child_ch);
+            let child_ch = setup_po.recv();
+            child_ch.send(20);
+            pingpong(&parent_po, &child_ch);
             testrt::rust_dbg_lock_lock(lock);
             testrt::rust_dbg_lock_signal(lock);
             testrt::rust_dbg_lock_unlock(lock);
-            oldcomm::recv(fin_po);
+            fin_po.recv();
             testrt::rust_dbg_lock_destroy(lock);
         }
     }
@@ -1052,18 +1013,17 @@ fn test_spawn_sched_blocking() {
 
 #[cfg(test)]
 fn avoid_copying_the_body(spawnfn: fn(v: fn~())) {
-    let p = oldcomm::Port::<uint>();
-    let ch = oldcomm::Chan(&p);
+    let (p, ch) = stream::<uint>();
 
     let x = ~1;
     let x_in_parent = ptr::addr_of(&(*x)) as uint;
 
     do spawnfn |move x| {
         let x_in_child = ptr::addr_of(&(*x)) as uint;
-        oldcomm::send(ch, x_in_child);
+        ch.send(x_in_child);
     }
 
-    let x_in_child = oldcomm::recv(p);
+    let x_in_child = p.recv();
     assert x_in_parent == x_in_child;
 }
 
@@ -1101,20 +1061,18 @@ fn test_avoid_copying_the_body_unlinked() {
 
 #[test]
 fn test_platform_thread() {
-    let po = oldcomm::Port();
-    let ch = oldcomm::Chan(&po);
+    let (po, ch) = stream();
     do task().sched_mode(PlatformThread).spawn {
-        oldcomm::send(ch, ());
+        ch.send(());
     }
-    oldcomm::recv(po);
+    po.recv();
 }
 
 #[test]
 #[ignore(cfg(windows))]
 #[should_fail]
 fn test_unkillable() {
-    let po = oldcomm::Port();
-    let ch = po.chan();
+    let (po, ch) = stream();
 
     // We want to do this after failing
     do spawn_unlinked {
@@ -1242,7 +1200,7 @@ fn test_spawn_thread_on_demand() {
 
             let (port2, chan2) = pipes::stream();
 
-            do spawn() |move chan2| {
+            do spawn_sched(CurrentScheduler) |move chan2| {
                 chan2.send(());
             }