diff options
| author | Brian Anderson <banderson@mozilla.com> | 2013-06-01 13:34:05 -0700 |
|---|---|---|
| committer | Brian Anderson <banderson@mozilla.com> | 2013-06-05 22:07:40 -0700 |
| commit | 51d257fd9a6c3ce9bd02f9e30d15d91d39a5aee9 (patch) | |
| tree | 62094b6c207804d3cd90d8fc7082180c33d0a856 | |
| parent | 422f663a988370a93a6ae21b92215e49750c2e87 (diff) | |
| download | rust-51d257fd9a6c3ce9bd02f9e30d15d91d39a5aee9.tar.gz rust-51d257fd9a6c3ce9bd02f9e30d15d91d39a5aee9.zip | |
core::rt: Add SharedPort
| -rw-r--r-- | src/libstd/rt/comm.rs | 132 |
1 files changed, 132 insertions, 0 deletions
diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs index b97a4df2245..4772a8596bf 100644 --- a/src/libstd/rt/comm.rs +++ b/src/libstd/rt/comm.rs @@ -416,6 +416,61 @@ impl<T> Clone for SharedChan<T> { } } +pub struct SharedPort<T> { + // The next port on which we will receive the next port on which we will receive T + priv next_link: UnsafeAtomicRcBox<AtomicOption<PortOne<StreamPortOne<T>>>> +} + +impl<T> SharedPort<T> { + pub fn new(port: Port<T>) -> SharedPort<T> { + // Put the data port into a new link pipe + let next_data_port = port.next.take(); + let (next_link_port, next_link_chan) = oneshot(); + next_link_chan.send(next_data_port); + let next_link = AtomicOption::new(~next_link_port); + SharedPort { next_link: UnsafeAtomicRcBox::new(next_link) } + } +} + +impl<T: Owned> GenericPort<T> for SharedPort<T> { + fn recv(&self) -> T { + match self.try_recv() { + Some(val) => val, + None => { + fail!("receiving on a closed channel"); + } + } + } + + fn try_recv(&self) -> Option<T> { + unsafe { + let (next_link_port, next_link_chan) = oneshot(); + let link_port = (*self.next_link.get()).swap(~next_link_port, SeqCst); + let link_port = link_port.unwrap(); + let data_port = link_port.recv(); + let (next_data_port, res) = match data_port.try_recv() { + Some(StreamPayload { val, next }) => { + (next, Some(val)) + } + None => { + let (next_data_port, _) = oneshot(); + (next_data_port, None) + } + }; + next_link_chan.send(next_data_port); + return res; + } + } +} + +impl<T> Clone for SharedPort<T> { + fn clone(&self) -> SharedPort<T> { + SharedPort { + next_link: self.next_link.clone() + } + } +} + #[cfg(test)] mod test { use super::*; @@ -702,5 +757,82 @@ mod test { } } } + + #[test] + fn shared_port_stress() { + do run_in_mt_newsched_task { + // XXX: Removing these type annotations causes an ICE + let (end_port, end_chan) = stream::<()>(); + let (port, chan) = stream::<()>(); + let end_chan = SharedChan::new(end_chan); + let port = SharedPort::new(port); + let total = stress_factor() + 100; + for total.times { + let end_chan_clone = end_chan.clone(); + let port_clone = port.clone(); + do spawntask_random { + port_clone.recv(); + end_chan_clone.send(()); + } + } + + for total.times { + chan.send(()); + } + + for total.times { + end_port.recv(); + } + } + } + + #[test] + fn shared_port_close_simple() { + do run_in_mt_newsched_task { + let (port, chan) = stream::<()>(); + let port = SharedPort::new(port); + { let _chan = chan; } + assert!(port.try_recv().is_none()); + } + } + + #[test] + fn shared_port_close() { + do run_in_mt_newsched_task { + let (end_port, end_chan) = stream::<bool>(); + let (port, chan) = stream::<()>(); + let end_chan = SharedChan::new(end_chan); + let port = SharedPort::new(port); + let chan = SharedChan::new(chan); + let send_total = 10; + let recv_total = 20; + do spawntask_random { + for send_total.times { + let chan_clone = chan.clone(); + do spawntask_random { + chan_clone.send(()); + } + } + } + let end_chan_clone = end_chan.clone(); + do spawntask_random { + for recv_total.times { + let port_clone = port.clone(); + let end_chan_clone = end_chan_clone.clone(); + do spawntask_random { + let recvd = port_clone.try_recv().is_some(); + end_chan_clone.send(recvd); + } + } + } + + let mut recvd = 0; + for recv_total.times { + recvd += if end_port.recv() { 1 } else { 0 }; + } + + assert!(recvd == send_total); + } + } } |
