diff options
| author | Eric Holk <eric.holk@gmail.com> | 2012-07-10 11:58:43 -0700 |
|---|---|---|
| committer | Eric Holk <eric.holk@gmail.com> | 2012-07-10 22:00:48 -0700 |
| commit | 22e955a76add6b5a3afce936b852f476b5d6ca64 (patch) | |
| tree | e8c3984ab7482efc61e5f34bb6221e14a35c6fae | |
| parent | 594d9a055497313a098469a597ac348e47911555 (diff) | |
| download | rust-22e955a76add6b5a3afce936b852f476b5d6ca64.tar.gz rust-22e955a76add6b5a3afce936b852f476b5d6ca64.zip | |
Move streams into core.
| -rw-r--r-- | src/libcore/pipes.rs | 67 | ||||
| -rw-r--r-- | src/test/bench/shootout-k-nucleotide-pipes.rs | 47 |
2 files changed, 71 insertions, 43 deletions
diff --git a/src/libcore/pipes.rs b/src/libcore/pipes.rs index 94c99361ef8..794f2d3890b 100644 --- a/src/libcore/pipes.rs +++ b/src/libcore/pipes.rs @@ -2,6 +2,7 @@ import unsafe::{forget, reinterpret_cast, transmute}; import either::{either, left, right}; +import option::unwrap; enum state { empty, @@ -428,3 +429,69 @@ fn spawn_service_recv<T: send>( client } + +// Streams - Make pipes a little easier in general. + +proto! streamp { + open:send<T: send> { + data(T) -> open<T> + } +} + +type chan<T:send> = { mut endp: option<streamp::client::open<T>> }; +type port<T:send> = { mut endp: option<streamp::server::open<T>> }; + +fn stream<T:send>() -> (chan<T>, port<T>) { + let (c, s) = streamp::init(); + + #macro[ + [#move[x], + unsafe { let y <- *ptr::addr_of(x); y }] + ]; + + ({ mut endp: some(c) }, { mut endp: some(s) }) +} + +impl chan<T: send> for chan<T> { + fn send(+x: T) { + let mut endp = none; + endp <-> self.endp; + self.endp = some( + streamp::client::data(unwrap(endp), x)) + } +} + +impl port<T: send> for port<T> { + fn recv() -> T { + let mut endp = none; + endp <-> self.endp; + let streamp::data(x, endp) = pipes::recv(unwrap(endp)); + self.endp = some(endp); + x + } + + fn try_recv() -> option<T> { + let mut endp = none; + endp <-> self.endp; + alt pipes::try_recv(unwrap(endp)) { + some(streamp::data(x, endp)) { + self.endp = some(#move(endp)); + some(#move(x)) + } + none { none } + } + } + + pure fn peek() -> bool unchecked { + let mut endp = none; + endp <-> self.endp; + let peek = alt endp { + some(endp) { + pipes::peek(endp) + } + none { fail "peeking empty stream" } + }; + self.endp <-> endp; + peek + } +} diff --git a/src/test/bench/shootout-k-nucleotide-pipes.rs b/src/test/bench/shootout-k-nucleotide-pipes.rs index 488af7a9b23..3704bb3d780 100644 --- a/src/test/bench/shootout-k-nucleotide-pipes.rs +++ b/src/test/bench/shootout-k-nucleotide-pipes.rs @@ -9,46 +9,7 @@ import std::map; import std::map::hashmap; import std::sort; -import stream::{stream, chan, port}; - -// After a snapshot, this should move into core, or std. -mod stream { - import option::unwrap; - - proto! streamp { - open:send<T: send> { - data(T) -> open<T> - } - } - - type chan<T:send> = { mut endp: option<streamp::client::open<T>> }; - type port<T:send> = { mut endp: option<streamp::server::open<T>> }; - - fn stream<T:send>() -> (chan<T>, port<T>) { - let (c, s) = streamp::init(); - ({ mut endp: some(c) }, { mut endp: some(s) }) - } - - impl chan<T: send> for chan<T> { - fn send(+x: T) { - let mut endp = none; - endp <-> self.endp; - self.endp = some( - streamp::client::data(unwrap(endp), x)) - } - } - - impl port<T: send> for port<T> { - fn recv() -> T { - let mut endp = none; - endp <-> self.endp; - let streamp::data(x, endp) = unwrap( - pipes::try_recv(unwrap(endp))); - self.endp = some(endp); - x - } - } -} +import pipes::{stream, port, chan}; // given a map, print a sorted version of it fn sort_and_fmt(mm: hashmap<~[u8], uint>, total: uint) -> str { @@ -127,8 +88,8 @@ fn windows_with_carry(bb: ~[const u8], nn: uint, ret vec::slice(bb, len - (nn - 1u), len); } -fn make_sequence_processor(sz: uint, from_parent: stream::port<~[u8]>, - to_parent: stream::chan<str>) { +fn make_sequence_processor(sz: uint, from_parent: pipes::port<~[u8]>, + to_parent: pipes::chan<str>) { let freqs: hashmap<~[u8], uint> = map::bytes_hash(); let mut carry: ~[u8] = ~[]; @@ -190,7 +151,7 @@ fn main(args: ~[str]) { vec::push(from_child, from_child_); - let (to_child, from_parent) = stream::stream(); + let (to_child, from_parent) = pipes::stream(); do task::spawn_with(from_parent) |from_parent| { make_sequence_processor(sz, from_parent, to_parent_); |
