about summary refs log tree commit diff
diff options
context:
space:
mode:
authorAlex Crichton <alex@alexcrichton.com>2013-12-21 22:15:04 -0800
committerAlex Crichton <alex@alexcrichton.com>2014-01-15 11:21:56 -0800
commitadb895a34f6d0b925b8ef877289ca6e3c4d854d4 (patch)
treefc5a2b0a6930d08c9b96e4ef24bdf5ff31adc3ec
parent900893112570eea5a01c0573ae1fa1e3a72397e9 (diff)
downloadrust-adb895a34f6d0b925b8ef877289ca6e3c4d854d4.tar.gz
rust-adb895a34f6d0b925b8ef877289ca6e3c4d854d4.zip
Allow more "error" values in try_recv()
This should allow callers to know whether the channel was empty or disconnected
without having to block.

Closes #11087
-rw-r--r--src/libextra/comm.rs14
-rw-r--r--src/libextra/sync.rs9
-rw-r--r--src/libgreen/sched.rs5
-rw-r--r--src/libstd/comm/mod.rs104
-rw-r--r--src/libstd/comm/select.rs9
-rw-r--r--src/libstd/io/signal.rs3
-rw-r--r--src/libstd/io/timer.rs5
7 files changed, 114 insertions, 35 deletions
diff --git a/src/libextra/comm.rs b/src/libextra/comm.rs
index 52b5bedb7ea..bd1a46ae9bf 100644
--- a/src/libextra/comm.rs
+++ b/src/libextra/comm.rs
@@ -16,6 +16,8 @@ Higher level communication abstractions.
 
 #[allow(missing_doc)];
 
+use std::comm;
+
 /// An extension of `pipes::stream` that allows both sending and receiving.
 pub struct DuplexStream<T, U> {
     priv chan: Chan<T>,
@@ -40,7 +42,7 @@ impl<T:Send,U:Send> DuplexStream<T, U> {
     pub fn recv(&self) -> U {
         self.port.recv()
     }
-    pub fn try_recv(&self) -> Option<U> {
+    pub fn try_recv(&self) -> comm::TryRecvResult<U> {
         self.port.try_recv()
     }
     pub fn recv_opt(&self) -> Option<U> {
@@ -77,11 +79,11 @@ impl<T: Send> SyncPort<T> {
         })
     }
 
-    pub fn try_recv(&self) -> Option<T> {
-        self.duplex_stream.try_recv().map(|val| {
-            self.duplex_stream.try_send(());
-            val
-        })
+    pub fn try_recv(&self) -> comm::TryRecvResult<T> {
+        match self.duplex_stream.try_recv() {
+            comm::Data(t) => { self.duplex_stream.try_send(()); comm::Data(t) }
+            state => state,
+        }
     }
 }
 
diff --git a/src/libextra/sync.rs b/src/libextra/sync.rs
index 12566ac8551..044e5e9e509 100644
--- a/src/libextra/sync.rs
+++ b/src/libextra/sync.rs
@@ -19,6 +19,7 @@
 
 
 use std::borrow;
+use std::comm;
 use std::unstable::sync::Exclusive;
 use std::sync::arc::UnsafeArc;
 use std::sync::atomics;
@@ -49,7 +50,7 @@ impl WaitQueue {
     // Signals one live task from the queue.
     fn signal(&self) -> bool {
         match self.head.try_recv() {
-            Some(ch) => {
+            comm::Data(ch) => {
                 // Send a wakeup signal. If the waiter was killed, its port will
                 // have closed. Keep trying until we get a live task.
                 if ch.try_send_deferred(()) {
@@ -58,7 +59,7 @@ impl WaitQueue {
                     self.signal()
                 }
             }
-            None => false
+            _ => false
         }
     }
 
@@ -66,12 +67,12 @@ impl WaitQueue {
         let mut count = 0;
         loop {
             match self.head.try_recv() {
-                None => break,
-                Some(ch) => {
+                comm::Data(ch) => {
                     if ch.try_send_deferred(()) {
                         count += 1;
                     }
                 }
+                _ => break
             }
         }
         count
diff --git a/src/libgreen/sched.rs b/src/libgreen/sched.rs
index b0b88e4be79..1ae4d07af18 100644
--- a/src/libgreen/sched.rs
+++ b/src/libgreen/sched.rs
@@ -958,6 +958,7 @@ fn new_sched_rng() -> XorShiftRng {
 
 #[cfg(test)]
 mod test {
+    use std::comm;
     use std::task::TaskOpts;
     use std::rt::Runtime;
     use std::rt::task::Task;
@@ -1376,7 +1377,7 @@ mod test {
             // This task should not be able to starve the sender;
             // The sender should get stolen to another thread.
             do spawn {
-                while port.try_recv().is_none() { }
+                while port.try_recv() != comm::Data(()) { }
             }
 
             chan.send(());
@@ -1393,7 +1394,7 @@ mod test {
             // This task should not be able to starve the other task.
             // The sends should eventually yield.
             do spawn {
-                while port.try_recv().is_none() {
+                while port.try_recv() != comm::Data(()) {
                     chan2.send(());
                 }
             }
diff --git a/src/libstd/comm/mod.rs b/src/libstd/comm/mod.rs
index bf37e5fca6a..bf9e28f3e97 100644
--- a/src/libstd/comm/mod.rs
+++ b/src/libstd/comm/mod.rs
@@ -251,6 +251,7 @@ macro_rules! test (
             #[allow(unused_imports)];
 
             use native;
+            use comm::*;
             use prelude::*;
             use super::*;
             use super::super::*;
@@ -323,6 +324,20 @@ pub struct SharedChan<T> {
     priv queue: mpsc::Producer<T, Packet>,
 }
 
+/// This enumeration is the list of the possible reasons that try_recv could not
+/// return data when called.
+#[deriving(Eq, Clone)]
+pub enum TryRecvResult<T> {
+    /// This channel is currently empty, but the sender(s) have not yet
+    /// disconnected, so data may yet become available.
+    Empty,
+    /// This channel's sending half has become disconnected, and there will
+    /// never be any more data received on this channel
+    Disconnected,
+    /// The channel had some data and we successfully popped it
+    Data(T),
+}
+
 ///////////////////////////////////////////////////////////////////////////////
 // Internal struct definitions
 ///////////////////////////////////////////////////////////////////////////////
@@ -739,11 +754,11 @@ impl<T: Send> Port<T> {
     /// block on a port.
     ///
     /// This function cannot fail.
-    pub fn try_recv(&self) -> Option<T> {
+    pub fn try_recv(&self) -> TryRecvResult<T> {
         self.try_recv_inc(true)
     }
 
-    fn try_recv_inc(&self, increment: bool) -> Option<T> {
+    fn try_recv_inc(&self, increment: bool) -> TryRecvResult<T> {
         // This is a "best effort" situation, so if a queue is inconsistent just
         // don't worry about it.
         let this = unsafe { cast::transmute_mut(self) };
@@ -807,7 +822,35 @@ impl<T: Send> Port<T> {
         if increment && ret.is_some() {
             unsafe { (*this.queue.packet()).steals += 1; }
         }
-        return ret;
+        match ret {
+            Some(t) => Data(t),
+            None => {
+                // It's possible that between the time that we saw the queue was
+                // empty and here the other side disconnected. It's also
+                // possible for us to see the disconnection here while there is
+                // data in the queue. It's pretty backwards-thinking to return
+                // Disconnected when there's actually data on the queue, so if
+                // we see a disconnected state be sure to check again to be 100%
+                // sure that there's no data in the queue.
+                let cnt = unsafe { (*this.queue.packet()).cnt.load(Relaxed) };
+                if cnt != DISCONNECTED { return Empty }
+
+                let ret = match this.queue {
+                    SPSC(ref mut queue) => queue.pop(),
+                    MPSC(ref mut queue) => match queue.pop() {
+                        mpsc::Data(t) => Some(t),
+                        mpsc::Empty => None,
+                        mpsc::Inconsistent => {
+                            fail!("inconsistent with no senders?!");
+                        }
+                    }
+                };
+                match ret {
+                    Some(data) => Data(data),
+                    None => Disconnected,
+                }
+            }
+        }
     }
 
     /// Attempt to wait for a value on this port, but does not fail if the
@@ -824,7 +867,11 @@ impl<T: Send> Port<T> {
     /// the value found on the port is returned.
     pub fn recv_opt(&self) -> Option<T> {
         // optimistic preflight check (scheduling is expensive)
-        match self.try_recv() { None => {}, data => return data }
+        match self.try_recv() {
+            Empty => {},
+            Disconnected => return None,
+            Data(t) => return Some(t),
+        }
 
         let packet;
         let this;
@@ -843,12 +890,11 @@ impl<T: Send> Port<T> {
             });
         }
 
-        let data = self.try_recv_inc(false);
-        if data.is_none() &&
-           unsafe { (*packet).cnt.load(SeqCst) } != DISCONNECTED {
-            fail!("bug: woke up too soon {}", unsafe { (*packet).cnt.load(SeqCst) });
+        match self.try_recv_inc(false) {
+            Data(t) => Some(t),
+            Empty => fail!("bug: woke up too soon"),
+            Disconnected => None,
         }
-        return data;
     }
 
     /// Returns an iterator which will block waiting for messages, but never
@@ -1005,7 +1051,10 @@ mod test {
             for _ in range(0, AMT * NTHREADS) {
                 assert_eq!(p.recv(), 1);
             }
-            assert_eq!(p.try_recv(), None);
+            match p.try_recv() {
+                Data(..) => fail!(),
+                _ => {}
+            }
             c1.send(());
         }
 
@@ -1129,7 +1178,7 @@ mod test {
     test!(fn oneshot_single_thread_try_recv_open() {
         let (port, chan) = Chan::<int>::new();
         chan.send(10);
-        assert!(port.try_recv() == Some(10));
+        assert!(port.recv_opt() == Some(10));
     })
 
     test!(fn oneshot_single_thread_try_recv_closed() {
@@ -1140,21 +1189,21 @@ mod test {
 
     test!(fn oneshot_single_thread_peek_data() {
         let (port, chan) = Chan::<int>::new();
-        assert!(port.try_recv().is_none());
+        assert_eq!(port.try_recv(), Empty)
         chan.send(10);
-        assert!(port.try_recv().is_some());
+        assert_eq!(port.try_recv(), Data(10));
     })
 
     test!(fn oneshot_single_thread_peek_close() {
         let (port, chan) = Chan::<int>::new();
         { let _c = chan; }
-        assert!(port.try_recv().is_none());
-        assert!(port.try_recv().is_none());
+        assert_eq!(port.try_recv(), Disconnected);
+        assert_eq!(port.try_recv(), Disconnected);
     })
 
     test!(fn oneshot_single_thread_peek_open() {
         let (port, _) = Chan::<int>::new();
-        assert!(port.try_recv().is_none());
+        assert_eq!(port.try_recv(), Empty);
     })
 
     test!(fn oneshot_multi_task_recv_then_send() {
@@ -1321,4 +1370,27 @@ mod test {
         drop(chan);
         assert_eq!(count_port.recv(), 4);
     })
+
+    test!(fn try_recv_states() {
+        let (p, c) = Chan::<int>::new();
+        let (p1, c1) = Chan::<()>::new();
+        let (p2, c2) = Chan::<()>::new();
+        do spawn {
+            p1.recv();
+            c.send(1);
+            c2.send(());
+            p1.recv();
+            drop(c);
+            c2.send(());
+        }
+
+        assert_eq!(p.try_recv(), Empty);
+        c1.send(());
+        p2.recv();
+        assert_eq!(p.try_recv(), Data(1));
+        assert_eq!(p.try_recv(), Empty);
+        c1.send(());
+        p2.recv();
+        assert_eq!(p.try_recv(), Disconnected);
+    })
 }
diff --git a/src/libstd/comm/select.rs b/src/libstd/comm/select.rs
index 302c9d9ea46..fa5ec1d3e30 100644
--- a/src/libstd/comm/select.rs
+++ b/src/libstd/comm/select.rs
@@ -45,6 +45,7 @@
 #[allow(dead_code)];
 
 use cast;
+use comm;
 use iter::Iterator;
 use kinds::Send;
 use ops::Drop;
@@ -279,7 +280,9 @@ impl<'port, T: Send> Handle<'port, T> {
     pub fn recv_opt(&mut self) -> Option<T> { self.port.recv_opt() }
     /// Immediately attempt to receive a value on a port, this function will
     /// never block. Has the same semantics as `Port.try_recv`.
-    pub fn try_recv(&mut self) -> Option<T> { self.port.try_recv() }
+    pub fn try_recv(&mut self) -> comm::TryRecvResult<T> {
+        self.port.try_recv()
+    }
 }
 
 #[unsafe_destructor]
@@ -409,8 +412,8 @@ mod test {
             a = p1.recv() => { assert_eq!(a, 1); },
             a = p2.recv() => { assert_eq!(a, 2); }
         )
-        assert_eq!(p1.try_recv(), None);
-        assert_eq!(p2.try_recv(), None);
+        assert_eq!(p1.try_recv(), Empty);
+        assert_eq!(p2.try_recv(), Empty);
         c3.send(());
     })
 
diff --git a/src/libstd/io/signal.rs b/src/libstd/io/signal.rs
index 34b4ed5e1ef..0f05254b034 100644
--- a/src/libstd/io/signal.rs
+++ b/src/libstd/io/signal.rs
@@ -144,6 +144,7 @@ impl Listener {
 #[cfg(test)]
 mod test {
     use libc;
+    use comm::Empty;
     use io::timer;
     use super::{Listener, Interrupt};
 
@@ -194,7 +195,7 @@ mod test {
         s2.unregister(Interrupt);
         sigint();
         timer::sleep(10);
-        assert!(s2.port.try_recv().is_none());
+        assert_eq!(s2.port.try_recv(), Empty);
     }
 
     #[cfg(windows)]
diff --git a/src/libstd/io/timer.rs b/src/libstd/io/timer.rs
index 7c9aa28bfe9..d156a7460e1 100644
--- a/src/libstd/io/timer.rs
+++ b/src/libstd/io/timer.rs
@@ -123,7 +123,7 @@ mod test {
         let port1 = timer.oneshot(10000);
         let port = timer.oneshot(1);
         port.recv();
-        assert_eq!(port1.try_recv(), None);
+        assert!(port1.recv_opt().is_none());
     }
 
     #[test]
@@ -131,8 +131,7 @@ mod test {
         let mut timer = Timer::new().unwrap();
         let port = timer.oneshot(100000000000);
         timer.sleep(1); // this should invalidate the port
-
-        assert_eq!(port.try_recv(), None);
+        assert!(port.recv_opt().is_none());
     }
 
     #[test]