about summary refs log tree commit diff
diff options
context:
space:
mode:
authorEric Holk <eric.holk@gmail.com>2012-07-10 11:58:43 -0700
committerEric Holk <eric.holk@gmail.com>2012-07-10 22:00:48 -0700
commit22e955a76add6b5a3afce936b852f476b5d6ca64 (patch)
treee8c3984ab7482efc61e5f34bb6221e14a35c6fae
parent594d9a055497313a098469a597ac348e47911555 (diff)
downloadrust-22e955a76add6b5a3afce936b852f476b5d6ca64.tar.gz
rust-22e955a76add6b5a3afce936b852f476b5d6ca64.zip
Move streams into core.
-rw-r--r--src/libcore/pipes.rs67
-rw-r--r--src/test/bench/shootout-k-nucleotide-pipes.rs47
2 files changed, 71 insertions, 43 deletions
diff --git a/src/libcore/pipes.rs b/src/libcore/pipes.rs
index 94c99361ef8..794f2d3890b 100644
--- a/src/libcore/pipes.rs
+++ b/src/libcore/pipes.rs
@@ -2,6 +2,7 @@
 
 import unsafe::{forget, reinterpret_cast, transmute};
 import either::{either, left, right};
+import option::unwrap;
 
 enum state {
     empty,
@@ -428,3 +429,69 @@ fn spawn_service_recv<T: send>(
 
     client
 }
+
+// Streams - Make pipes a little easier in general.
+
+proto! streamp {
+    open:send<T: send> {
+        data(T) -> open<T>
+    }
+}
+
+type chan<T:send> = { mut endp: option<streamp::client::open<T>> };
+type port<T:send> = { mut endp: option<streamp::server::open<T>> };
+
+fn stream<T:send>() -> (chan<T>, port<T>) {
+    let (c, s) = streamp::init();
+
+    #macro[
+        [#move[x],
+         unsafe { let y <- *ptr::addr_of(x); y }]
+    ];
+
+    ({ mut endp: some(c) }, { mut endp: some(s) })
+}
+
+impl chan<T: send> for chan<T> {
+    fn send(+x: T) {
+        let mut endp = none;
+        endp <-> self.endp;
+        self.endp = some(
+            streamp::client::data(unwrap(endp), x))
+    }
+}
+
+impl port<T: send> for port<T> {
+    fn recv() -> T {
+        let mut endp = none;
+        endp <-> self.endp;
+        let streamp::data(x, endp) = pipes::recv(unwrap(endp));
+        self.endp = some(endp);
+        x
+    }
+
+    fn try_recv() -> option<T> {
+        let mut endp = none;
+        endp <-> self.endp;
+        alt pipes::try_recv(unwrap(endp)) {
+          some(streamp::data(x, endp)) {
+            self.endp = some(#move(endp));
+            some(#move(x))
+          }
+          none { none }
+        }
+    }
+
+    pure fn peek() -> bool unchecked {
+        let mut endp = none;
+        endp <-> self.endp;
+        let peek = alt endp {
+          some(endp) {
+            pipes::peek(endp)
+          }
+          none { fail "peeking empty stream" }
+        };
+        self.endp <-> endp;
+        peek
+    }
+}
diff --git a/src/test/bench/shootout-k-nucleotide-pipes.rs b/src/test/bench/shootout-k-nucleotide-pipes.rs
index 488af7a9b23..3704bb3d780 100644
--- a/src/test/bench/shootout-k-nucleotide-pipes.rs
+++ b/src/test/bench/shootout-k-nucleotide-pipes.rs
@@ -9,46 +9,7 @@ import std::map;
 import std::map::hashmap;
 import std::sort;
 
-import stream::{stream, chan, port};
-
-// After a snapshot, this should move into core, or std.
-mod stream {
-    import option::unwrap;
-
-    proto! streamp {
-        open:send<T: send> {
-            data(T) -> open<T>
-        }
-    }
-
-    type chan<T:send> = { mut endp: option<streamp::client::open<T>> };
-    type port<T:send> = { mut endp: option<streamp::server::open<T>> };
-
-    fn stream<T:send>() -> (chan<T>, port<T>) {
-        let (c, s) = streamp::init();
-        ({ mut endp: some(c) }, { mut endp: some(s) })
-    }
-
-    impl chan<T: send> for chan<T> {
-        fn send(+x: T) {
-            let mut endp = none;
-            endp <-> self.endp;
-            self.endp = some(
-                streamp::client::data(unwrap(endp), x))
-        }
-    }
-
-    impl port<T: send> for port<T> {
-        fn recv() -> T {
-            let mut endp = none;
-            endp <-> self.endp;
-            let streamp::data(x, endp) = unwrap(
-                pipes::try_recv(unwrap(endp)));
-            self.endp = some(endp);
-            x
-        }
-    }
-}
+import pipes::{stream, port, chan};
 
 // given a map, print a sorted version of it
 fn sort_and_fmt(mm: hashmap<~[u8], uint>, total: uint) -> str { 
@@ -127,8 +88,8 @@ fn windows_with_carry(bb: ~[const u8], nn: uint,
    ret vec::slice(bb, len - (nn - 1u), len); 
 }
 
-fn make_sequence_processor(sz: uint, from_parent: stream::port<~[u8]>,
-                           to_parent: stream::chan<str>) {
+fn make_sequence_processor(sz: uint, from_parent: pipes::port<~[u8]>,
+                           to_parent: pipes::chan<str>) {
    
    let freqs: hashmap<~[u8], uint> = map::bytes_hash();
    let mut carry: ~[u8] = ~[];
@@ -190,7 +151,7 @@ fn main(args: ~[str]) {
 
         vec::push(from_child, from_child_);
 
-        let (to_child, from_parent) = stream::stream();
+        let (to_child, from_parent) = pipes::stream();
 
         do task::spawn_with(from_parent) |from_parent| {
             make_sequence_processor(sz, from_parent, to_parent_);