diff options
| author | Eric Holk <eric.holk@gmail.com> | 2012-05-16 09:47:00 -0700 |
|---|---|---|
| committer | Eric Holk <eric.holk@gmail.com> | 2012-05-22 15:31:38 -0700 |
| commit | a785f3fc95752dc2cfced5e7ccef710e189acb9c (patch) | |
| tree | d63ccde882a9ebf3270e91fb1b240bc7344025eb | |
| parent | d485f23a1a1879b6c066ae56a1ccfe1f092785fd (diff) | |
| download | rust-a785f3fc95752dc2cfced5e7ccef710e189acb9c.tar.gz rust-a785f3fc95752dc2cfced5e7ccef710e189acb9c.zip | |
Adding a module with parallel vector operations.
This should go in libstd, but currently resolve bugs make this not work.
| -rw-r--r-- | src/libstd/par.rs | 103 | ||||
| -rw-r--r-- | src/libstd/std.rc | 2 | ||||
| -rw-r--r-- | src/test/bench/graph500-bfs.rs | 116 |
3 files changed, 217 insertions, 4 deletions
diff --git a/src/libstd/par.rs b/src/libstd/par.rs new file mode 100644 index 00000000000..5160e50fd9a --- /dev/null +++ b/src/libstd/par.rs @@ -0,0 +1,103 @@ +import comm::port; +import comm::chan; +import comm::send; +import comm::recv; +import task::spawn; + +export future; +export map; +export alli; + +iface future<T: send> { + fn get() -> T; +} + +type future_<T: send> = { + mut slot : option<T>, + port : port<T>, +}; + +impl<T: send> of future<T> for future_<T> { + fn get() -> T { + alt(self.slot) { + some(x) { x } + none { + let x = recv(self.port); + self.slot = some(x); + x + } + } + } +} + + +#[doc="Executes a bit of code asynchronously. + +Returns a handle that can be used to retrieve the result at your +leisure."] +fn future<T: send>(thunk : fn~() -> T) -> future<T> { + let p = port(); + let c = chan(p); + + spawn() {|| + send(c, thunk()); + } + + {mut slot: none::<T>, port : p} as future::<T> +} + +#[doc="The maximum number of tasks this module will spawn for a single + operationg."] +const max_tasks : uint = 32u; + +#[doc="The minimum number of elements each task will process."] +const min_granularity : uint = 1024u; + +#[doc="An internal helper to map a function over a large vector and + return the intermediate results. + +This is used to build most of the other parallel vector functions, +like map or alli."] +fn map_slices<A: send, B: send>(xs: [A], f: fn~(uint, [A]) -> B) -> [B] { + let len = xs.len(); + if len < min_granularity { + // This is a small vector, fall back on the normal map. + [f(0u, xs)] + } + else { + let num_tasks = uint::min(max_tasks, len / min_granularity); + + let items_per_task = len / num_tasks; + + let mut futures = []; + let mut base = 0u; + while base < len { + let slice = vec::slice(xs, base, + uint::min(len, base + items_per_task)); + futures += [future() {|copy base| + f(base, slice) + }]; + base += items_per_task; + } + + futures.map() {|ys| + ys.get() + } + } +} + +#[doc="A parallel version of map."] +fn map<A: send, B: send>(xs: [A], f: fn~(A) -> B) -> [B] { + vec::concat(map_slices(xs) {|_base, slice| + map(slice, f) + }) +} + +#[doc="Returns true if the function holds for all elements in the vector."] +fn alli<A: send>(xs: [A], f: fn~(uint, A) -> bool) -> bool { + vec::all(map_slices(xs) {|base, slice| + slice.alli() {|i, x| + f(i + base, x) + } + }) {|x| x } +} diff --git a/src/libstd/std.rc b/src/libstd/std.rc index 40a03ae25c2..f25ceb87c3c 100644 --- a/src/libstd/std.rc +++ b/src/libstd/std.rc @@ -19,6 +19,7 @@ export bitv, deque, fun_treemap, list, map, smallintmap, sort, treemap; export rope, arena; export ebml, dbg, getopts, json, rand, sha1, term, time, prettyprint; export test, tempfile, serialization; +export par; // General io and system-services modules @@ -58,6 +59,7 @@ mod getopts; mod json; mod sha1; mod md4; +mod par; mod tempfile; mod term; mod time; diff --git a/src/test/bench/graph500-bfs.rs b/src/test/bench/graph500-bfs.rs index 3295ab2f07c..d145da651b1 100644 --- a/src/test/bench/graph500-bfs.rs +++ b/src/test/bench/graph500-bfs.rs @@ -10,6 +10,7 @@ import std::map; import std::map::hashmap; import std::deque; import std::deque::t; +//import std::par; import io::writer_util; import comm::*; import int::abs; @@ -221,13 +222,11 @@ fn validate(edges: [(node_id, node_id)], log(info, "Verifying tree and graph edges..."); - let status = tree.alli() {|u, v| + let status = par::alli(tree) {|u, v| if v == -1 || u as int == root { true } else { - log(info, #fmt("Checking for %? or %?", - (u, v), (v, u))); edges.contains((u as int, v)) || edges.contains((v, u as int)) } }; @@ -269,9 +268,118 @@ fn main() { stop - start)); let start = time::precise_time_s(); - assert(validate(graph, edges, root, bfs_tree)); + assert(validate(edges, root, bfs_tree)); let stop = time::precise_time_s(); io::stdout().write_line(#fmt("Validation completed in %? seconds.", stop - start)); } + + +// par stuff ///////////////////////////////////////////////////////// + +mod par { +import comm::port; +import comm::chan; +import comm::send; +import comm::recv; +import task::spawn; + +iface future<T: send> { + fn get() -> T; +} + +type future_<T: send> = { + mut slot : option<T>, + port : port<T>, +}; + +impl<T: send> of future<T> for future_<T> { + fn get() -> T { + get(self) + } +} + +fn get<T: send>(f: future_<T>) -> T { + alt(f.slot) { + some(x) { x } + none { + let x = recv(f.port); + f.slot = some(x); + x + } + } +} + + +#[doc="Executes a bit of code asynchronously. + +Returns a handle that can be used to retrieve the result at your +leisure."] +fn future<T: send>(thunk : fn~() -> T) -> future<T> { + let p = port(); + let c = chan(p); + + spawn() {|| + send(c, thunk()); + } + + {mut slot: none::<T>, port : p} as future::<T> +} + +#[doc="The maximum number of tasks this module will spawn for a single + operationg."] +const max_tasks : uint = 32u; + +#[doc="The minimum number of elements each task will process."] +const min_granularity : uint = 1024u; + +#[doc="An internal helper to map a function over a large vector and + return the intermediate results. + +This is used to build most of the other parallel vector functions, +like map or alli."] +fn map_slices<A: send, B: send>(xs: [A], f: fn~(uint, [A]) -> B) -> [B] { + let len = xs.len(); + if len < min_granularity { + // This is a small vector, fall back on the normal map. + [f(0u, xs)] + } + else { + let num_tasks = uint::min(max_tasks, len / min_granularity); + + let items_per_task = len / num_tasks; + + let mut futures = []; + let mut base = 0u; + while base < len { + let slice = vec::slice(xs, base, + uint::min(len, base + items_per_task)); + futures += [future() {|copy base| + f(base, slice) + }]; + base += items_per_task; + } + + futures.map() {|ys| + ys.get() + } + } +} + +#[doc="A parallel version of map."] +fn map<A: send, B: send>(xs: [A], f: fn~(A) -> B) -> [B] { + vec::concat(map_slices(xs) {|_base, slice| + map(slice, f) + }) +} + +#[doc="Returns true if the function holds for all elements in the vector."] +fn alli<A: send>(xs: [A], f: fn~(uint, A) -> bool) -> bool { + vec::all(map_slices(xs) {|base, slice| + slice.alli() {|i, x| + f(i + base, x) + } + }) {|x| x } +} +} \ No newline at end of file |
