about summary refs log tree commit diff
path: root/src/libstd/rt
diff options
context:
space:
mode:
authorBrian Anderson <banderson@mozilla.com>2013-06-01 14:03:38 -0700
committerBrian Anderson <banderson@mozilla.com>2013-06-05 22:07:40 -0700
commitece38b3c7e16be1bedb45e552a127fe75bdb726a (patch)
tree1c2f1a333ba70446e93ccc7fef2c2e9879a635fc /src/libstd/rt
parent51d257fd9a6c3ce9bd02f9e30d15d91d39a5aee9 (diff)
downloadrust-ece38b3c7e16be1bedb45e552a127fe75bdb726a.tar.gz
rust-ece38b3c7e16be1bedb45e552a127fe75bdb726a.zip
core::rt: Add `MegaPipe`, an unbounded, multiple producer/consumer, lock-free queue
Diffstat (limited to 'src/libstd/rt')
-rw-r--r--src/libstd/rt/comm.rs71
1 files changed, 71 insertions, 0 deletions
diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs
index 4772a8596bf..ef2091f789c 100644
--- a/src/libstd/rt/comm.rs
+++ b/src/libstd/rt/comm.rs
@@ -471,6 +471,44 @@ impl<T> Clone for SharedPort<T> {
     }
 }
 
+// XXX: Need better name
+type MegaPipe<T> = (SharedPort<T>, SharedChan<T>);
+
+pub fn megapipe<T: Owned>() -> MegaPipe<T> {
+    let (port, chan) = stream();
+    (SharedPort::new(port), SharedChan::new(chan))
+}
+
+impl<T: Owned> GenericChan<T> for MegaPipe<T> {
+    fn send(&self, val: T) {
+        match *self {
+            (_, ref c) => c.send(val)
+        }
+    }
+}
+
+impl<T: Owned> GenericSmartChan<T> for MegaPipe<T> {
+    fn try_send(&self, val: T) -> bool {
+        match *self {
+            (_, ref c) => c.try_send(val)
+        }
+    }
+}
+
+impl<T: Owned> GenericPort<T> for MegaPipe<T> {
+    fn recv(&self) -> T {
+        match *self {
+            (ref p, _) => p.recv()
+        }
+    }
+
+    fn try_recv(&self) -> Option<T> {
+        match *self {
+            (ref p, _) => p.try_recv()
+        }
+    }
+}
+
 #[cfg(test)]
 mod test {
     use super::*;
@@ -834,5 +872,38 @@ mod test {
             assert!(recvd == send_total);
         }
     }
+
+    #[test]
+    fn megapipe_stress() {
+        use rand;
+        use rand::RngUtil;
+
+        do run_in_mt_newsched_task {
+            let (end_port, end_chan) = stream::<()>();
+            let end_chan = SharedChan::new(end_chan);
+            let pipe = megapipe();
+            let total = stress_factor() + 10;
+            let mut rng = rand::rng();
+            for total.times {
+                let msgs = rng.gen_uint_range(0, 10);
+                let pipe_clone = pipe.clone();
+                let end_chan_clone = end_chan.clone();
+                do spawntask_random {
+                    for msgs.times {
+                        pipe_clone.send(());
+                    }
+                    for msgs.times {
+                        pipe_clone.recv();
+                    }
+                }
+
+                end_chan_clone.send(());
+            }
+
+            for total.times {
+                end_port.recv();
+            }
+        }
+    }
 }