about summary refs log tree commit diff
path: root/src/libstd/task_pool.rs
diff options
context:
space:
mode:
authorTim Chevalier <chevalier@alum.wellesley.edu>2012-11-15 12:32:00 -0800
committerTim Chevalier <chevalier@alum.wellesley.edu>2012-11-15 12:35:14 -0800
commit9b6f025eb607aab8ea29c2033dcebe1d263ba614 (patch)
tree81a79d92c6a0f5a876595069592ed37107dc61e9 /src/libstd/task_pool.rs
parenta0610c952f0d23927443b901241627ae910fd3cd (diff)
downloadrust-9b6f025eb607aab8ea29c2033dcebe1d263ba614.tar.gz
rust-9b6f025eb607aab8ea29c2033dcebe1d263ba614.zip
Rename thread_pool to task_pool
Minor change, no review.

Closes #3972
Diffstat (limited to 'src/libstd/task_pool.rs')
-rw-r--r--src/libstd/task_pool.rs85
1 files changed, 85 insertions, 0 deletions
diff --git a/src/libstd/task_pool.rs b/src/libstd/task_pool.rs
new file mode 100644
index 00000000000..4ed3c16c994
--- /dev/null
+++ b/src/libstd/task_pool.rs
@@ -0,0 +1,85 @@
+/// A task pool abstraction. Useful for achieving predictable CPU
+/// parallelism.
+
+use pipes::{Chan, Port};
+use task::{SchedMode, SingleThreaded};
+
+enum Msg<T> {
+    Execute(~fn(&T)),
+    Quit
+}
+
+pub struct TaskPool<T> {
+    channels: ~[Chan<Msg<T>>],
+    mut next_index: uint,
+
+    drop {
+        for self.channels.each |channel| {
+            channel.send(Quit);
+        }
+    }
+}
+
+pub impl<T> TaskPool<T> {
+    /// Spawns a new task pool with `n_tasks` tasks. If the `sched_mode`
+    /// is None, the tasks run on this scheduler; otherwise, they run on a
+    /// new scheduler with the given mode. The provided `init_fn_factory`
+    /// returns a function which, given the index of the task, should return
+    /// local data to be kept around in that task.
+    static fn new(n_tasks: uint,
+                  opt_sched_mode: Option<SchedMode>,
+                  init_fn_factory: ~fn() -> ~fn(uint) -> T) -> TaskPool<T> {
+        assert n_tasks >= 1;
+
+        let channels = do vec::from_fn(n_tasks) |i| {
+            let (chan, port) = pipes::stream::<Msg<T>>();
+            let init_fn = init_fn_factory();
+
+            let task_body: ~fn() = |move port, move init_fn| {
+                let local_data = init_fn(i);
+                loop {
+                    match port.recv() {
+                        Execute(move f) => f(&local_data),
+                        Quit => break
+                    }
+                }
+            };
+
+            // Start the task.
+            match opt_sched_mode {
+                None => {
+                    // Run on this scheduler.
+                    task::spawn(move task_body);
+                }
+                Some(sched_mode) => {
+                    task::task().sched_mode(sched_mode).spawn(move task_body);
+                }
+            }
+
+            move chan
+        };
+
+        return TaskPool { channels: move channels, next_index: 0 };
+    }
+
+    /// Executes the function `f` on a task in the pool. The function
+    /// receives a reference to the local data returned by the `init_fn`.
+    fn execute(&self, f: ~fn(&T)) {
+        self.channels[self.next_index].send(Execute(move f));
+        self.next_index += 1;
+        if self.next_index == self.channels.len() { self.next_index = 0; }
+    }
+}
+
+#[test]
+fn test_task_pool() {
+    let f: ~fn() -> ~fn(uint) -> uint = || {
+        let g: ~fn(uint) -> uint = |i| i;
+        move g
+    };
+    let pool = TaskPool::new(4, Some(SingleThreaded), move f);
+    for 8.times {
+        pool.execute(|i| io::println(fmt!("Hello from thread %u!", *i)));
+    }
+}
+