about summary refs log tree commit diff
diff options
context:
space:
mode:
authorEric Holk <eric.holk@gmail.com>2012-07-03 17:33:20 -0700
committerEric Holk <eric.holk@gmail.com>2012-07-06 10:42:39 -0700
commita787f4001388a394d5219b74113a718d980e4c90 (patch)
tree907807154819ee7859c66d230c54deaf0446d1c4
parent89bdd481e59c5416f75668bc1b4782b57a167333 (diff)
downloadrust-a787f4001388a394d5219b74113a718d980e4c90.tar.gz
rust-a787f4001388a394d5219b74113a718d980e4c90.zip
Select on pipes.
Updating syntax and test cases.
-rw-r--r--src/libcore/pipes.rs142
-rw-r--r--src/libcore/vec.rs43
-rw-r--r--src/test/bench/msgsend-ring-contracts.rs2
-rw-r--r--src/test/run-pass/pipe-manual-2.rs178
-rw-r--r--src/test/run-pass/pipe-manual-3.rs178
-rw-r--r--src/test/run-pass/pipe-select.rs125
6 files changed, 284 insertions, 384 deletions
diff --git a/src/libcore/pipes.rs b/src/libcore/pipes.rs
index 88f69555439..f1617c59ac7 100644
--- a/src/libcore/pipes.rs
+++ b/src/libcore/pipes.rs
@@ -9,23 +9,29 @@ enum state {
     terminated
 }
 
-type packet<T: send> = {
+type packet_header = {
     mut state: state,
     mut blocked_task: option<*rust_task>,
+};
+
+type packet<T: send> = {
+    header: packet_header,
     mut payload: option<T>
 };
 
 fn packet<T: send>() -> *packet<T> unsafe {
     let p: *packet<T> = unsafe::transmute(~{
-        mut state: empty,
-        mut blocked_task: none::<task::task>,
+        header: {
+            mut state: empty,
+            mut blocked_task: none::<task::task>,
+        },
         mut payload: none::<T>
     });
     p
 }
 
 #[abi = "rust-intrinsic"]
-native mod rusti {
+extern mod rusti {
     fn atomic_xchng(&dst: int, src: int) -> int;
     fn atomic_xchng_acq(&dst: int, src: int) -> int;
     fn atomic_xchng_rel(&dst: int, src: int) -> int;
@@ -33,7 +39,7 @@ native mod rusti {
 
 type rust_task = libc::c_void;
 
-native mod rustrt {
+extern mod rustrt {
     #[rust_stack]
     fn rust_get_task() -> *rust_task;
 
@@ -71,7 +77,7 @@ fn send<T: send>(-p: send_packet<T>, -payload: T) {
     let p = unsafe { uniquify(p_) };
     assert (*p).payload == none;
     (*p).payload <- some(payload);
-    let old_state = swap_state_rel((*p).state, full);
+    let old_state = swap_state_rel(p.header.state, full);
     alt old_state {
       empty {
         // Yay, fastpath.
@@ -82,9 +88,10 @@ fn send<T: send>(-p: send_packet<T>, -payload: T) {
       full { fail "duplicate send" }
       blocked {
         #debug("waking up task for %?", p_);
-        alt p.blocked_task {
+        alt p.header.blocked_task {
           some(task) {
-            rustrt::task_signal_event(task, p_ as *libc::c_void);
+            rustrt::task_signal_event(
+                task, ptr::addr_of(p.header) as *libc::c_void);
           }
           none { fail "blocked packet has no task" }
         }
@@ -104,20 +111,20 @@ fn recv<T: send>(-p: recv_packet<T>) -> option<T> {
     let p = unsafe { uniquify(p_) };
     let this = rustrt::rust_get_task();
     rustrt::task_clear_event_reject(this);
-    p.blocked_task = some(this);
+    p.header.blocked_task = some(this);
     loop {
-        let old_state = swap_state_acq((*p).state,
+        let old_state = swap_state_acq(p.header.state,
                                        blocked);
         #debug("%?", old_state);
         alt old_state {
           empty {
             #debug("no data available on %?, going to sleep.", p_);
             rustrt::task_wait_event(this);
-            #debug("woke up, p.state = %?", p.state);
-            if p.state == full {
+            #debug("woke up, p.state = %?", p.header.state);
+            if p.header.state == full {
                 let mut payload = none;
                 payload <-> (*p).payload;
-                p.state = terminated;
+                p.header.state = terminated;
                 ret some(option::unwrap(payload))
             }
           }
@@ -125,7 +132,7 @@ fn recv<T: send>(-p: recv_packet<T>) -> option<T> {
           full {
             let mut payload = none;
             payload <-> (*p).payload;
-            p.state = terminated;
+            p.header.state = terminated;
             ret some(option::unwrap(payload))
           }
           terminated {
@@ -138,7 +145,7 @@ fn recv<T: send>(-p: recv_packet<T>) -> option<T> {
 
 fn sender_terminate<T: send>(p: *packet<T>) {
     let p = unsafe { uniquify(p) };
-    alt swap_state_rel((*p).state, terminated) {
+    alt swap_state_rel(p.header.state, terminated) {
       empty | blocked {
         // The receiver will eventually clean up.
         unsafe { forget(p) }
@@ -155,7 +162,7 @@ fn sender_terminate<T: send>(p: *packet<T>) {
 
 fn receiver_terminate<T: send>(p: *packet<T>) {
     let p = unsafe { uniquify(p) };
-    alt swap_state_rel((*p).state, terminated) {
+    alt swap_state_rel(p.header.state, terminated) {
       empty {
         // the sender will clean up
         unsafe { forget(p) }
@@ -170,15 +177,106 @@ fn receiver_terminate<T: send>(p: *packet<T>) {
     }
 }
 
+impl private_methods for packet_header {
+    // Returns the old state.
+    fn mark_blocked(this: *rust_task) -> state {
+        self.blocked_task = some(this);
+        swap_state_acq(self.state, blocked)
+    }
+
+    fn unblock() {
+        alt swap_state_acq(self.state, empty) {
+          empty | blocked { }
+          terminated { self.state = terminated; }
+          full { self.state = full; }
+        }
+    }
+}
+
+#[doc = "Returns when one of the packet headers reports data is
+available."]
+fn wait_many(pkts: ~[&a.packet_header]) -> uint {
+    let this = rustrt::rust_get_task();
+
+    rustrt::task_clear_event_reject(this);
+    let mut data_avail = false;
+    let mut ready_packet = pkts.len();
+    for pkts.eachi |i, p| {
+        let old = p.mark_blocked(this);
+        alt old {
+          full | terminated {
+            data_avail = true;
+            ready_packet = i;
+            p.state = old;
+            break;
+          }
+          blocked { fail "blocking on blocked packet" }
+          empty { }
+        }
+    }
+
+    while !data_avail {
+        #debug("sleeping on %? packets", pkts.len());
+        let event = rustrt::task_wait_event(this) as *packet_header;
+        let pos = vec::position(pkts, |p| ptr::addr_of(*p) == event);
+
+        alt pos {
+          some(i) {
+            ready_packet = i;
+            data_avail = true;
+          }
+          none {
+            #debug("ignoring spurious event, %?", event);
+          }
+        }
+    }
+
+    #debug("%?", pkts[ready_packet]);
+
+    for pkts.each |p| { p.unblock() }
+
+    #debug("%?, %?", ready_packet, pkts[ready_packet]);
+
+    assert pkts[ready_packet].state == full
+        || pkts[ready_packet].state == terminated;
+
+    ready_packet
+}
+
+#[doc = "Waits on a set of endpoints. Returns a message, its index,
+ and a list of the remaining endpoints."]
+fn select<T: send>(+endpoints: ~[recv_packet<T>])
+    -> (uint, option<T>, ~[recv_packet<T>])
+{
+    let endpoints = vec::map_consume(
+        endpoints,
+        |p| unsafe { uniquify(p.unwrap()) });
+    let endpoints_r = vec::view(endpoints, 0, endpoints.len());
+    let ready = wait_many(endpoints_r.map_r(|p| &p.header));
+    let mut remaining = ~[];
+    let mut result = none;
+    do vec::consume(endpoints) |i, p| {
+        let p = recv_packet(unsafe { unsafe::transmute(p) });
+        if i == ready {
+            result = recv(p);
+        }
+        else {
+            vec::push(remaining, p);
+        }
+    }
+
+    (ready, result, remaining)
+}
+
 class send_packet<T: send> {
     let mut p: option<*packet<T>>;
     new(p: *packet<T>) {
-        //#error("take send %?", p);
+        //#debug("take send %?", p);
         self.p = some(p);
     }
     drop {
         //if self.p != none {
-        //    #error("drop send %?", option::get(self.p));
+        //    #debug("drop send %?", option::get(self.p));
         //}
         if self.p != none {
             let mut p = none;
@@ -196,12 +294,12 @@ class send_packet<T: send> {
 class recv_packet<T: send> {
     let mut p: option<*packet<T>>;
     new(p: *packet<T>) {
-        //#error("take recv %?", p);
+        //#debug("take recv %?", p);
         self.p = some(p);
     }
     drop {
         //if self.p != none {
-        //    #error("drop recv %?", option::get(self.p));
+        //    #debug("drop recv %?", option::get(self.p));
         //}
         if self.p != none {
             let mut p = none;
@@ -222,7 +320,7 @@ fn entangle<T: send>() -> (send_packet<T>, recv_packet<T>) {
 }
 
 fn spawn_service<T: send>(
-    init: native fn() -> (send_packet<T>, recv_packet<T>),
+    init: extern fn() -> (send_packet<T>, recv_packet<T>),
     +service: fn~(+recv_packet<T>))
     -> send_packet<T>
 {
@@ -241,7 +339,7 @@ fn spawn_service<T: send>(
 }
 
 fn spawn_service_recv<T: send>(
-    init: native fn() -> (recv_packet<T>, send_packet<T>),
+    init: extern fn() -> (recv_packet<T>, send_packet<T>),
     +service: fn~(+send_packet<T>))
     -> recv_packet<T>
 {
diff --git a/src/libcore/vec.rs b/src/libcore/vec.rs
index b3b9d089fea..eb3d52edc24 100644
--- a/src/libcore/vec.rs
+++ b/src/libcore/vec.rs
@@ -6,6 +6,7 @@ import libc::size_t;
 
 export append;
 export append_one;
+export consume;
 export init_op;
 export is_empty;
 export is_not_empty;
@@ -40,6 +41,7 @@ export grow_set;
 export map;
 export mapi;
 export map2;
+export map_consume;
 export flat_map;
 export filter_map;
 export filter;
@@ -261,8 +263,8 @@ pure fn slice<T: copy>(v: &[const T], start: uint, end: uint) -> ~[T] {
     ret result;
 }
 
-/// Return a slice that points into another slice.
-pure fn view<T: copy>(v: &[const T], start: uint, end: uint) -> &a.[T] {
+#[doc = "Return a slice that points into another slice."]
+pure fn view<T>(v: &[const T], start: uint, end: uint) -> &a.[T] {
     assert (start <= end);
     assert (end <= len(v));
     do unpack_slice(v) |p, _len| {
@@ -373,7 +375,7 @@ fn rsplitn<T: copy>(v: &[T], n: uint, f: fn(T) -> bool) -> ~[~[T]] {
 /// Removes the first element from a vector and return it
 fn shift<T>(&v: ~[T]) -> T {
     let ln = len::<T>(v);
-    assert (ln > 0u);
+    assert (ln > 0);
 
     let mut vv = ~[];
     v <-> vv;
@@ -384,12 +386,12 @@ fn shift<T>(&v: ~[T]) -> T {
             let vv = unsafe::to_ptr(vv);
             rr <- *vv;
 
-            for uint::range(1u, ln) |i| {
+            for uint::range(1, ln) |i| {
                 let r <- *ptr::offset(vv, i);
                 push(v, r);
             }
         }
-        unsafe::set_len(vv, 0u);
+        unsafe::set_len(vv, 0);
 
         rr
     }
@@ -404,6 +406,17 @@ fn unshift<T>(&v: ~[T], +x: T) {
     }
 }
 
+fn consume<T>(+v: ~[T], f: fn(uint, +T)) unsafe {
+    do unpack_slice(v) |p, ln| {
+        for uint::range(0, ln) |i| {
+            let x <- *ptr::offset(p, i);
+            f(i, x);
+        }
+    }
+
+    unsafe::set_len(v, 0);
+}
+
 /// Remove the last element from a vector and return it
 fn pop<T>(&v: ~[const T]) -> T {
     let ln = len(v);
@@ -575,6 +588,14 @@ pure fn map<T, U>(v: &[T], f: fn(T) -> U) -> ~[U] {
     ret result;
 }
 
+fn map_consume<T, U>(+v: ~[T], f: fn(+T) -> U) -> ~[U] {
+    let mut result = ~[];
+    do consume(v) |_i, x| {
+        vec::push(result, f(x));
+    }
+    result
+}
+
 /// Apply a function to each element of a vector and return the results
 pure fn mapi<T, U>(v: &[T], f: fn(uint, T) -> U) -> ~[U] {
     let mut result = ~[];
@@ -1277,6 +1298,18 @@ impl extensions/&<T> for &[T] {
     pure fn mapi<U>(f: fn(uint, T) -> U) -> ~[U] {
         mapi(self, f)
     }
+
+    #[inline]
+    fn map_r<U>(f: fn(x: &self.T) -> U) -> ~[U] {
+        let mut r = ~[];
+        let mut i = 0;
+        while i < self.len() {
+            push(r, f(&self[i]));
+            i += 1;
+        }
+        r
+    }
+
     /**
      * Returns true if the function returns true for all elements.
      *
diff --git a/src/test/bench/msgsend-ring-contracts.rs b/src/test/bench/msgsend-ring-contracts.rs
index 99265353fe5..9ad3025d33a 100644
--- a/src/test/bench/msgsend-ring-contracts.rs
+++ b/src/test/bench/msgsend-ring-contracts.rs
@@ -119,7 +119,7 @@ fn main(args: [str]/~) {
     thread_ring(0u, msg_per_task, option::unwrap(num_chan), num_port);
 
     // synchronize
-    for futures.each |f| { f.get() };
+    for futures.each |f| { future::get(f) };
 
     let stop = time::precise_time_s();
 
diff --git a/src/test/run-pass/pipe-manual-2.rs b/src/test/run-pass/pipe-manual-2.rs
index 0619a7b6b44..0803bbfe531 100644
--- a/src/test/run-pass/pipe-manual-2.rs
+++ b/src/test/run-pass/pipe-manual-2.rs
@@ -13,184 +13,6 @@ At some point, we'll need to add support for select.
 
 */
 
-// Hopefully someday we'll move this into core.
-mod pipes {
-    import unsafe::{forget, reinterpret_cast};
-
-    enum state {
-        empty,
-        full,
-        blocked,
-        terminated
-    }
-
-    type packet<T: send> = {
-        mut state: state,
-        mut blocked_task: option<task::task>,
-        mut payload: option<T>
-    };
-
-    fn packet<T: send>() -> *packet<T> unsafe {
-        let p: *packet<T> = unsafe::transmute(~{
-            mut state: empty,
-            mut blocked_task: none::<task::task>,
-            mut payload: none::<T>
-        });
-        p
-    }
-
-    #[abi = "rust-intrinsic"]
-    native mod rusti {
-        fn atomic_xchng(&dst: int, src: int) -> int;
-        fn atomic_xchng_acq(&dst: int, src: int) -> int;
-        fn atomic_xchng_rel(&dst: int, src: int) -> int;
-    }
-
-    // We should consider moving this to core::unsafe, although I
-    // suspect graydon would want us to use void pointers instead.
-    unsafe fn uniquify<T>(x: *T) -> ~T {
-        unsafe { unsafe::reinterpret_cast(x) }
-    }
-
-    fn swap_state_acq(&dst: state, src: state) -> state {
-        unsafe {
-            reinterpret_cast(rusti::atomic_xchng_acq(
-                *(ptr::mut_addr_of(dst) as *mut int),
-                src as int))
-        }
-    }
-
-    fn swap_state_rel(&dst: state, src: state) -> state {
-        unsafe {
-            reinterpret_cast(rusti::atomic_xchng_rel(
-                *(ptr::mut_addr_of(dst) as *mut int),
-                src as int))
-        }
-    }
-
-    fn send<T: send>(-p: send_packet<T>, -payload: T) {
-        let p = p.unwrap();
-        let p = unsafe { uniquify(p) };
-        assert (*p).payload == none;
-        (*p).payload <- some(payload);
-        let old_state = swap_state_rel((*p).state, full);
-        alt old_state {
-          empty {
-            // Yay, fastpath.
-
-            // The receiver will eventually clean this up.
-            unsafe { forget(p); }
-          }
-          full { fail "duplicate send" }
-          blocked {
-            // FIXME: once the target will actually block, tell the
-            // scheduler to wake it up.
-
-            // The receiver will eventually clean this up.
-            unsafe { forget(p); }
-          }
-          terminated {
-            // The receiver will never receive this. Rely on drop_glue
-            // to clean everything up.
-          }
-        }
-    }
-
-    fn recv<T: send>(-p: recv_packet<T>) -> option<T> {
-        let p = p.unwrap();
-        let p = unsafe { uniquify(p) };
-        loop {
-            let old_state = swap_state_acq((*p).state,
-                                           blocked);
-            alt old_state {
-              empty | blocked { task::yield(); }
-              full {
-                let mut payload = none;
-                payload <-> (*p).payload;
-                ret some(option::unwrap(payload))
-              }
-              terminated {
-                assert old_state == terminated;
-                ret none;
-              }
-            }
-        }
-    }
-
-    fn sender_terminate<T: send>(p: *packet<T>) {
-        let p = unsafe { uniquify(p) };
-        alt swap_state_rel((*p).state, terminated) {
-          empty | blocked {
-            // The receiver will eventually clean up.
-            unsafe { forget(p) }
-          }
-          full {
-            // This is impossible
-            fail "you dun goofed"
-          }
-          terminated {
-            // I have to clean up, use drop_glue
-          }
-        }
-    }
-
-    fn receiver_terminate<T: send>(p: *packet<T>) {
-        let p = unsafe { uniquify(p) };
-        alt swap_state_rel((*p).state, terminated) {
-          empty {
-            // the sender will clean up
-            unsafe { forget(p) }
-          }
-          blocked {
-            // this shouldn't happen.
-            fail "terminating a blocked packet"
-          }
-          terminated | full {
-            // I have to clean up, use drop_glue
-          }
-        }
-    }
-
-    class send_packet<T: send> {
-        let mut p: option<*packet<T>>;
-        new(p: *packet<T>) { self.p = some(p); }
-        drop {
-            if self.p != none {
-                let mut p = none;
-                p <-> self.p;
-                sender_terminate(option::unwrap(p))
-            }
-        }
-        fn unwrap() -> *packet<T> {
-            let mut p = none;
-            p <-> self.p;
-            option::unwrap(p)
-        }
-    }
-
-    class recv_packet<T: send> {
-        let mut p: option<*packet<T>>;
-        new(p: *packet<T>) { self.p = some(p); }
-        drop {
-            if self.p != none {
-                let mut p = none;
-                p <-> self.p;
-                receiver_terminate(option::unwrap(p))
-            }
-        }
-        fn unwrap() -> *packet<T> {
-            let mut p = none;
-            p <-> self.p;
-            option::unwrap(p)
-        }
-    }
-
-    fn entangle<T: send>() -> (send_packet<T>, recv_packet<T>) {
-        let p = packet();
-        (send_packet(p), recv_packet(p))
-    }
-}
-
 mod pingpong {
     enum ping = *pipes::packet<pong>;
     enum pong = *pipes::packet<ping>;
diff --git a/src/test/run-pass/pipe-manual-3.rs b/src/test/run-pass/pipe-manual-3.rs
index 905063ea713..9f05038aadc 100644
--- a/src/test/run-pass/pipe-manual-3.rs
+++ b/src/test/run-pass/pipe-manual-3.rs
@@ -15,184 +15,6 @@ This file does horrible things to pretend we have self-move.
 
 */
 
-// Hopefully someday we'll move this into core.
-mod pipes {
-    import unsafe::{forget, reinterpret_cast};
-
-    enum state {
-        empty,
-        full,
-        blocked,
-        terminated
-    }
-
-    type packet<T: send> = {
-        mut state: state,
-        mut blocked_task: option<task::task>,
-        mut payload: option<T>
-    };
-
-    fn packet<T: send>() -> *packet<T> unsafe {
-        let p: *packet<T> = unsafe::transmute(~{
-            mut state: empty,
-            mut blocked_task: none::<task::task>,
-            mut payload: none::<T>
-        });
-        p
-    }
-
-    #[abi = "rust-intrinsic"]
-    native mod rusti {
-        fn atomic_xchng(&dst: int, src: int) -> int;
-        fn atomic_xchng_acq(&dst: int, src: int) -> int;
-        fn atomic_xchng_rel(&dst: int, src: int) -> int;
-    }
-
-    // We should consider moving this to core::unsafe, although I
-    // suspect graydon would want us to use void pointers instead.
-    unsafe fn uniquify<T>(x: *T) -> ~T {
-        unsafe { unsafe::reinterpret_cast(x) }
-    }
-
-    fn swap_state_acq(&dst: state, src: state) -> state {
-        unsafe {
-            reinterpret_cast(rusti::atomic_xchng_acq(
-                *(ptr::mut_addr_of(dst) as *mut int),
-                src as int))
-        }
-    }
-
-    fn swap_state_rel(&dst: state, src: state) -> state {
-        unsafe {
-            reinterpret_cast(rusti::atomic_xchng_rel(
-                *(ptr::mut_addr_of(dst) as *mut int),
-                src as int))
-        }
-    }
-
-    fn send<T: send>(-p: send_packet<T>, -payload: T) {
-        let p = p.unwrap();
-        let p = unsafe { uniquify(p) };
-        assert (*p).payload == none;
-        (*p).payload <- some(payload);
-        let old_state = swap_state_rel((*p).state, full);
-        alt old_state {
-          empty {
-            // Yay, fastpath.
-
-            // The receiver will eventually clean this up.
-            unsafe { forget(p); }
-          }
-          full { fail "duplicate send" }
-          blocked {
-            // FIXME: once the target will actually block, tell the
-            // scheduler to wake it up.
-
-            // The receiver will eventually clean this up.
-            unsafe { forget(p); }
-          }
-          terminated {
-            // The receiver will never receive this. Rely on drop_glue
-            // to clean everything up.
-          }
-        }
-    }
-
-    fn recv<T: send>(-p: recv_packet<T>) -> option<T> {
-        let p = p.unwrap();
-        let p = unsafe { uniquify(p) };
-        loop {
-            let old_state = swap_state_acq((*p).state,
-                                           blocked);
-            alt old_state {
-              empty | blocked { task::yield(); }
-              full {
-                let mut payload = none;
-                payload <-> (*p).payload;
-                ret some(option::unwrap(payload))
-              }
-              terminated {
-                assert old_state == terminated;
-                ret none;
-              }
-            }
-        }
-    }
-
-    fn sender_terminate<T: send>(p: *packet<T>) {
-        let p = unsafe { uniquify(p) };
-        alt swap_state_rel((*p).state, terminated) {
-          empty | blocked {
-            // The receiver will eventually clean up.
-            unsafe { forget(p) }
-          }
-          full {
-            // This is impossible
-            fail "you dun goofed"
-          }
-          terminated {
-            // I have to clean up, use drop_glue
-          }
-        }
-    }
-
-    fn receiver_terminate<T: send>(p: *packet<T>) {
-        let p = unsafe { uniquify(p) };
-        alt swap_state_rel((*p).state, terminated) {
-          empty {
-            // the sender will clean up
-            unsafe { forget(p) }
-          }
-          blocked {
-            // this shouldn't happen.
-            fail "terminating a blocked packet"
-          }
-          terminated | full {
-            // I have to clean up, use drop_glue
-          }
-        }
-    }
-
-    class send_packet<T: send> {
-        let mut p: option<*packet<T>>;
-        new(p: *packet<T>) { self.p = some(p); }
-        drop {
-            if self.p != none {
-                let mut p = none;
-                p <-> self.p;
-                sender_terminate(option::unwrap(p))
-            }
-        }
-        fn unwrap() -> *packet<T> {
-            let mut p = none;
-            p <-> self.p;
-            option::unwrap(p)
-        }
-    }
-
-    class recv_packet<T: send> {
-        let mut p: option<*packet<T>>;
-        new(p: *packet<T>) { self.p = some(p); }
-        drop {
-            if self.p != none {
-                let mut p = none;
-                p <-> self.p;
-                receiver_terminate(option::unwrap(p))
-            }
-        }
-        fn unwrap() -> *packet<T> {
-            let mut p = none;
-            p <-> self.p;
-            option::unwrap(p)
-        }
-    }
-
-    fn entangle<T: send>() -> (send_packet<T>, recv_packet<T>) {
-        let p = packet();
-        (send_packet(p), recv_packet(p))
-    }
-}
-
 mod pingpong {
     enum ping { ping, }
     enum ping_message = *pipes::packet<pong_message>;
diff --git a/src/test/run-pass/pipe-select.rs b/src/test/run-pass/pipe-select.rs
new file mode 100644
index 00000000000..163cfa95488
--- /dev/null
+++ b/src/test/run-pass/pipe-select.rs
@@ -0,0 +1,125 @@
+use std;
+import std::timer::sleep;
+import std::uv;
+
+import pipes::{recv, select};
+
+// Compiled by pipec
+mod oneshot {
+    fn init() -> (client::waiting, server::waiting) { pipes::entangle() }
+    enum waiting { signal(server::signaled), }
+    enum signaled { }
+    mod client {
+        fn signal(-pipe: waiting) -> signaled {
+            let (c, s) = pipes::entangle();
+            let message = oneshot::signal(s);
+            pipes::send(pipe, message);
+            c
+        }
+        type waiting = pipes::send_packet<oneshot::waiting>;
+        type signaled = pipes::send_packet<oneshot::signaled>;
+    }
+    mod server {
+        impl recv for waiting {
+            fn recv() -> extern fn(-waiting) -> oneshot::waiting {
+                fn recv(-pipe: waiting) -> oneshot::waiting {
+                    option::unwrap(pipes::recv(pipe))
+                }
+                recv
+            }
+        }
+        type waiting = pipes::recv_packet<oneshot::waiting>;
+        impl recv for signaled {
+            fn recv() -> extern fn(-signaled) -> oneshot::signaled {
+                fn recv(-pipe: signaled) -> oneshot::signaled {
+                    option::unwrap(pipes::recv(pipe))
+                }
+                recv
+            }
+        }
+        type signaled = pipes::recv_packet<oneshot::signaled>;
+    }
+}
+
+mod stream {
+    fn init<T: send>() -> (client::stream<T>, server::stream<T>) {
+        pipes::entangle()
+    }
+    enum stream<T: send> { send(T, server::stream<T>), }
+    mod client {
+        fn send<T: send>(+pipe: stream<T>, +x_0: T) -> stream<T> {
+            {
+                let (c, s) = pipes::entangle();
+                let message = stream::send(x_0, s);
+                pipes::send(pipe, message);
+                c
+            }
+        }
+        type stream<T: send> = pipes::send_packet<stream::stream<T>>;
+    }
+    mod server {
+        impl recv<T: send> for stream<T> {
+            fn recv() -> extern fn(+stream<T>) -> stream::stream<T> {
+                fn recv<T: send>(+pipe: stream<T>) -> stream::stream<T> {
+                    option::unwrap(pipes::recv(pipe))
+                }
+                recv
+            }
+        }
+        type stream<T: send> = pipes::recv_packet<stream::stream<T>>;
+    }
+}
+
+fn main() {
+    import oneshot::client::*;
+    import stream::client::*;
+
+    let iotask = uv::global_loop::get();
+    
+    #macro[
+        [#recv[chan],
+         chan.recv()(chan)]
+    ];
+
+    let c = pipes::spawn_service(stream::init, |p| { 
+        #error("waiting for pipes");
+        let stream::send(x, p) = option::unwrap(recv(p));
+        #error("got pipes");
+        let (left, right) : (oneshot::server::waiting,
+                             oneshot::server::waiting)
+            = x;
+        #error("selecting");
+        let (i, _, _) = select(~[left, right]);
+        #error("selected");
+        assert i == 0;
+
+        #error("waiting for pipes");
+        let stream::send(x, _) = option::unwrap(recv(p));
+        #error("got pipes");
+        let (left, right) : (oneshot::server::waiting,
+                             oneshot::server::waiting)
+            = x;
+        #error("selecting");
+        let (i, _, _) = select(~[left, right]);
+        #error("selected");
+        assert i == 1;
+    });
+
+    let (c1, p1) = oneshot::init();
+    let (c2, p2) = oneshot::init();
+
+    let c = send(c, (p1, p2));
+    
+    sleep(iotask, 1000);
+
+    signal(c1);
+
+    let (c1, p1) = oneshot::init();
+    let (c2, p2) = oneshot::init();
+
+    send(c, (p1, p2));
+
+    sleep(iotask, 1000);
+
+    signal(c2);
+}
\ No newline at end of file