about summary refs log tree commit diff
diff options
context:
space:
mode:
authorBrian Anderson <banderson@mozilla.com>2013-06-01 13:34:05 -0700
committerBrian Anderson <banderson@mozilla.com>2013-06-05 22:07:40 -0700
commit51d257fd9a6c3ce9bd02f9e30d15d91d39a5aee9 (patch)
tree62094b6c207804d3cd90d8fc7082180c33d0a856
parent422f663a988370a93a6ae21b92215e49750c2e87 (diff)
downloadrust-51d257fd9a6c3ce9bd02f9e30d15d91d39a5aee9.tar.gz
rust-51d257fd9a6c3ce9bd02f9e30d15d91d39a5aee9.zip
core::rt: Add SharedPort
-rw-r--r--src/libstd/rt/comm.rs132
1 files changed, 132 insertions, 0 deletions
diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs
index b97a4df2245..4772a8596bf 100644
--- a/src/libstd/rt/comm.rs
+++ b/src/libstd/rt/comm.rs
@@ -416,6 +416,61 @@ impl<T> Clone for SharedChan<T> {
     }
 }
 
+pub struct SharedPort<T> {
+    // The next port on which we will receive the next port on which we will receive T
+    priv next_link: UnsafeAtomicRcBox<AtomicOption<PortOne<StreamPortOne<T>>>>
+}
+
+impl<T> SharedPort<T> {
+    pub fn new(port: Port<T>) -> SharedPort<T> {
+        // Put the data port into a new link pipe
+        let next_data_port = port.next.take();
+        let (next_link_port, next_link_chan) = oneshot();
+        next_link_chan.send(next_data_port);
+        let next_link = AtomicOption::new(~next_link_port);
+        SharedPort { next_link: UnsafeAtomicRcBox::new(next_link) }
+    }
+}
+
+impl<T: Owned> GenericPort<T> for SharedPort<T> {
+    fn recv(&self) -> T {
+        match self.try_recv() {
+            Some(val) => val,
+            None => {
+                fail!("receiving on a closed channel");
+            }
+        }
+    }
+
+    fn try_recv(&self) -> Option<T> {
+        unsafe {
+            let (next_link_port, next_link_chan) = oneshot();
+            let link_port = (*self.next_link.get()).swap(~next_link_port, SeqCst);
+            let link_port = link_port.unwrap();
+            let data_port = link_port.recv();
+            let (next_data_port, res) = match data_port.try_recv() {
+                Some(StreamPayload { val, next }) => {
+                    (next, Some(val))
+                }
+                None => {
+                    let (next_data_port, _) = oneshot();
+                    (next_data_port, None)
+                }
+            };
+            next_link_chan.send(next_data_port);
+            return res;
+        }
+    }
+}
+
+impl<T> Clone for SharedPort<T> {
+    fn clone(&self) -> SharedPort<T> {
+        SharedPort {
+            next_link: self.next_link.clone()
+        }
+    }
+}
+
 #[cfg(test)]
 mod test {
     use super::*;
@@ -702,5 +757,82 @@ mod test {
             }
         }
     }
+
+    #[test]
+    fn shared_port_stress() {
+        do run_in_mt_newsched_task {
+            // XXX: Removing these type annotations causes an ICE
+            let (end_port, end_chan) = stream::<()>();
+            let (port, chan) = stream::<()>();
+            let end_chan = SharedChan::new(end_chan);
+            let port = SharedPort::new(port);
+            let total = stress_factor() + 100;
+            for total.times {
+                let end_chan_clone = end_chan.clone();
+                let port_clone = port.clone();
+                do spawntask_random {
+                    port_clone.recv();
+                    end_chan_clone.send(());
+                }
+            }
+
+            for total.times {
+                chan.send(());
+            }
+
+            for total.times {
+                end_port.recv();
+            }
+        }
+    }
+
+    #[test]
+    fn shared_port_close_simple() {
+        do run_in_mt_newsched_task {
+            let (port, chan) = stream::<()>();
+            let port = SharedPort::new(port);
+            { let _chan = chan; }
+            assert!(port.try_recv().is_none());
+        }
+    }
+
+    #[test]
+    fn shared_port_close() {
+        do run_in_mt_newsched_task {
+            let (end_port, end_chan) = stream::<bool>();
+            let (port, chan) = stream::<()>();
+            let end_chan = SharedChan::new(end_chan);
+            let port = SharedPort::new(port);
+            let chan = SharedChan::new(chan);
+            let send_total = 10;
+            let recv_total = 20;
+            do spawntask_random {
+                for send_total.times {
+                    let chan_clone = chan.clone();
+                    do spawntask_random {
+                        chan_clone.send(());
+                    }
+                }
+            }
+            let end_chan_clone = end_chan.clone();
+            do spawntask_random {
+                for recv_total.times {
+                    let port_clone = port.clone();
+                    let end_chan_clone = end_chan_clone.clone();
+                    do spawntask_random {
+                        let recvd = port_clone.try_recv().is_some();
+                        end_chan_clone.send(recvd);
+                    }
+                }
+            }
+
+            let mut recvd = 0;
+            for recv_total.times {
+                recvd += if end_port.recv() { 1 } else { 0 };
+            }
+
+            assert!(recvd == send_total);
+        }
+    }
 }