diff options
| author | Brian Anderson <banderson@mozilla.com> | 2013-06-01 14:03:38 -0700 |
|---|---|---|
| committer | Brian Anderson <banderson@mozilla.com> | 2013-06-05 22:07:40 -0700 |
| commit | ece38b3c7e16be1bedb45e552a127fe75bdb726a (patch) | |
| tree | 1c2f1a333ba70446e93ccc7fef2c2e9879a635fc /src/libstd/rt | |
| parent | 51d257fd9a6c3ce9bd02f9e30d15d91d39a5aee9 (diff) | |
| download | rust-ece38b3c7e16be1bedb45e552a127fe75bdb726a.tar.gz rust-ece38b3c7e16be1bedb45e552a127fe75bdb726a.zip | |
core::rt: Add `MegaPipe`, an unbounded, multiple producer/consumer, lock-free queue
Diffstat (limited to 'src/libstd/rt')
| -rw-r--r-- | src/libstd/rt/comm.rs | 71 |
1 files changed, 71 insertions, 0 deletions
diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs index 4772a8596bf..ef2091f789c 100644 --- a/src/libstd/rt/comm.rs +++ b/src/libstd/rt/comm.rs @@ -471,6 +471,44 @@ impl<T> Clone for SharedPort<T> { } } +// XXX: Need better name +type MegaPipe<T> = (SharedPort<T>, SharedChan<T>); + +pub fn megapipe<T: Owned>() -> MegaPipe<T> { + let (port, chan) = stream(); + (SharedPort::new(port), SharedChan::new(chan)) +} + +impl<T: Owned> GenericChan<T> for MegaPipe<T> { + fn send(&self, val: T) { + match *self { + (_, ref c) => c.send(val) + } + } +} + +impl<T: Owned> GenericSmartChan<T> for MegaPipe<T> { + fn try_send(&self, val: T) -> bool { + match *self { + (_, ref c) => c.try_send(val) + } + } +} + +impl<T: Owned> GenericPort<T> for MegaPipe<T> { + fn recv(&self) -> T { + match *self { + (ref p, _) => p.recv() + } + } + + fn try_recv(&self) -> Option<T> { + match *self { + (ref p, _) => p.try_recv() + } + } +} + #[cfg(test)] mod test { use super::*; @@ -834,5 +872,38 @@ mod test { assert!(recvd == send_total); } } + + #[test] + fn megapipe_stress() { + use rand; + use rand::RngUtil; + + do run_in_mt_newsched_task { + let (end_port, end_chan) = stream::<()>(); + let end_chan = SharedChan::new(end_chan); + let pipe = megapipe(); + let total = stress_factor() + 10; + let mut rng = rand::rng(); + for total.times { + let msgs = rng.gen_uint_range(0, 10); + let pipe_clone = pipe.clone(); + let end_chan_clone = end_chan.clone(); + do spawntask_random { + for msgs.times { + pipe_clone.send(()); + } + for msgs.times { + pipe_clone.recv(); + } + } + + end_chan_clone.send(()); + } + + for total.times { + end_port.recv(); + } + } + } } |
