diff options
| author | Tim Chevalier <chevalier@alum.wellesley.edu> | 2012-11-15 12:32:00 -0800 |
|---|---|---|
| committer | Tim Chevalier <chevalier@alum.wellesley.edu> | 2012-11-15 12:35:14 -0800 |
| commit | 9b6f025eb607aab8ea29c2033dcebe1d263ba614 (patch) | |
| tree | 81a79d92c6a0f5a876595069592ed37107dc61e9 /src/libstd/task_pool.rs | |
| parent | a0610c952f0d23927443b901241627ae910fd3cd (diff) | |
| download | rust-9b6f025eb607aab8ea29c2033dcebe1d263ba614.tar.gz rust-9b6f025eb607aab8ea29c2033dcebe1d263ba614.zip | |
Rename thread_pool to task_pool
Minor change, no review. Closes #3972
Diffstat (limited to 'src/libstd/task_pool.rs')
| -rw-r--r-- | src/libstd/task_pool.rs | 85 |
1 files changed, 85 insertions, 0 deletions
diff --git a/src/libstd/task_pool.rs b/src/libstd/task_pool.rs new file mode 100644 index 00000000000..4ed3c16c994 --- /dev/null +++ b/src/libstd/task_pool.rs @@ -0,0 +1,85 @@ +/// A task pool abstraction. Useful for achieving predictable CPU +/// parallelism. + +use pipes::{Chan, Port}; +use task::{SchedMode, SingleThreaded}; + +enum Msg<T> { + Execute(~fn(&T)), + Quit +} + +pub struct TaskPool<T> { + channels: ~[Chan<Msg<T>>], + mut next_index: uint, + + drop { + for self.channels.each |channel| { + channel.send(Quit); + } + } +} + +pub impl<T> TaskPool<T> { + /// Spawns a new task pool with `n_tasks` tasks. If the `sched_mode` + /// is None, the tasks run on this scheduler; otherwise, they run on a + /// new scheduler with the given mode. The provided `init_fn_factory` + /// returns a function which, given the index of the task, should return + /// local data to be kept around in that task. + static fn new(n_tasks: uint, + opt_sched_mode: Option<SchedMode>, + init_fn_factory: ~fn() -> ~fn(uint) -> T) -> TaskPool<T> { + assert n_tasks >= 1; + + let channels = do vec::from_fn(n_tasks) |i| { + let (chan, port) = pipes::stream::<Msg<T>>(); + let init_fn = init_fn_factory(); + + let task_body: ~fn() = |move port, move init_fn| { + let local_data = init_fn(i); + loop { + match port.recv() { + Execute(move f) => f(&local_data), + Quit => break + } + } + }; + + // Start the task. + match opt_sched_mode { + None => { + // Run on this scheduler. + task::spawn(move task_body); + } + Some(sched_mode) => { + task::task().sched_mode(sched_mode).spawn(move task_body); + } + } + + move chan + }; + + return TaskPool { channels: move channels, next_index: 0 }; + } + + /// Executes the function `f` on a task in the pool. The function + /// receives a reference to the local data returned by the `init_fn`. + fn execute(&self, f: ~fn(&T)) { + self.channels[self.next_index].send(Execute(move f)); + self.next_index += 1; + if self.next_index == self.channels.len() { self.next_index = 0; } + } +} + +#[test] +fn test_task_pool() { + let f: ~fn() -> ~fn(uint) -> uint = || { + let g: ~fn(uint) -> uint = |i| i; + move g + }; + let pool = TaskPool::new(4, Some(SingleThreaded), move f); + for 8.times { + pool.execute(|i| io::println(fmt!("Hello from thread %u!", *i))); + } +} + |
