about summary refs log tree commit diff
path: root/src/libstd
diff options
context:
space:
mode:
authorAlex Crichton <alex@alexcrichton.com>2013-12-21 22:15:04 -0800
committerAlex Crichton <alex@alexcrichton.com>2014-01-15 11:21:56 -0800
commitadb895a34f6d0b925b8ef877289ca6e3c4d854d4 (patch)
treefc5a2b0a6930d08c9b96e4ef24bdf5ff31adc3ec /src/libstd
parent900893112570eea5a01c0573ae1fa1e3a72397e9 (diff)
downloadrust-adb895a34f6d0b925b8ef877289ca6e3c4d854d4.tar.gz
rust-adb895a34f6d0b925b8ef877289ca6e3c4d854d4.zip
Allow more "error" values in try_recv()
This should allow callers to know whether the channel was empty or disconnected
without having to block.

Closes #11087
Diffstat (limited to 'src/libstd')
-rw-r--r--src/libstd/comm/mod.rs104
-rw-r--r--src/libstd/comm/select.rs9
-rw-r--r--src/libstd/io/signal.rs3
-rw-r--r--src/libstd/io/timer.rs5
4 files changed, 98 insertions, 23 deletions
diff --git a/src/libstd/comm/mod.rs b/src/libstd/comm/mod.rs
index bf37e5fca6a..bf9e28f3e97 100644
--- a/src/libstd/comm/mod.rs
+++ b/src/libstd/comm/mod.rs
@@ -251,6 +251,7 @@ macro_rules! test (
             #[allow(unused_imports)];
 
             use native;
+            use comm::*;
             use prelude::*;
             use super::*;
             use super::super::*;
@@ -323,6 +324,20 @@ pub struct SharedChan<T> {
     priv queue: mpsc::Producer<T, Packet>,
 }
 
+/// This enumeration is the list of the possible reasons that try_recv could not
+/// return data when called.
+#[deriving(Eq, Clone)]
+pub enum TryRecvResult<T> {
+    /// This channel is currently empty, but the sender(s) have not yet
+    /// disconnected, so data may yet become available.
+    Empty,
+    /// This channel's sending half has become disconnected, and there will
+    /// never be any more data received on this channel
+    Disconnected,
+    /// The channel had some data and we successfully popped it
+    Data(T),
+}
+
 ///////////////////////////////////////////////////////////////////////////////
 // Internal struct definitions
 ///////////////////////////////////////////////////////////////////////////////
@@ -739,11 +754,11 @@ impl<T: Send> Port<T> {
     /// block on a port.
     ///
     /// This function cannot fail.
-    pub fn try_recv(&self) -> Option<T> {
+    pub fn try_recv(&self) -> TryRecvResult<T> {
         self.try_recv_inc(true)
     }
 
-    fn try_recv_inc(&self, increment: bool) -> Option<T> {
+    fn try_recv_inc(&self, increment: bool) -> TryRecvResult<T> {
         // This is a "best effort" situation, so if a queue is inconsistent just
         // don't worry about it.
         let this = unsafe { cast::transmute_mut(self) };
@@ -807,7 +822,35 @@ impl<T: Send> Port<T> {
         if increment && ret.is_some() {
             unsafe { (*this.queue.packet()).steals += 1; }
         }
-        return ret;
+        match ret {
+            Some(t) => Data(t),
+            None => {
+                // It's possible that between the time that we saw the queue was
+                // empty and here the other side disconnected. It's also
+                // possible for us to see the disconnection here while there is
+                // data in the queue. It's pretty backwards-thinking to return
+                // Disconnected when there's actually data on the queue, so if
+                // we see a disconnected state be sure to check again to be 100%
+                // sure that there's no data in the queue.
+                let cnt = unsafe { (*this.queue.packet()).cnt.load(Relaxed) };
+                if cnt != DISCONNECTED { return Empty }
+
+                let ret = match this.queue {
+                    SPSC(ref mut queue) => queue.pop(),
+                    MPSC(ref mut queue) => match queue.pop() {
+                        mpsc::Data(t) => Some(t),
+                        mpsc::Empty => None,
+                        mpsc::Inconsistent => {
+                            fail!("inconsistent with no senders?!");
+                        }
+                    }
+                };
+                match ret {
+                    Some(data) => Data(data),
+                    None => Disconnected,
+                }
+            }
+        }
     }
 
     /// Attempt to wait for a value on this port, but does not fail if the
@@ -824,7 +867,11 @@ impl<T: Send> Port<T> {
     /// the value found on the port is returned.
     pub fn recv_opt(&self) -> Option<T> {
         // optimistic preflight check (scheduling is expensive)
-        match self.try_recv() { None => {}, data => return data }
+        match self.try_recv() {
+            Empty => {},
+            Disconnected => return None,
+            Data(t) => return Some(t),
+        }
 
         let packet;
         let this;
@@ -843,12 +890,11 @@ impl<T: Send> Port<T> {
             });
         }
 
-        let data = self.try_recv_inc(false);
-        if data.is_none() &&
-           unsafe { (*packet).cnt.load(SeqCst) } != DISCONNECTED {
-            fail!("bug: woke up too soon {}", unsafe { (*packet).cnt.load(SeqCst) });
+        match self.try_recv_inc(false) {
+            Data(t) => Some(t),
+            Empty => fail!("bug: woke up too soon"),
+            Disconnected => None,
         }
-        return data;
     }
 
     /// Returns an iterator which will block waiting for messages, but never
@@ -1005,7 +1051,10 @@ mod test {
             for _ in range(0, AMT * NTHREADS) {
                 assert_eq!(p.recv(), 1);
             }
-            assert_eq!(p.try_recv(), None);
+            match p.try_recv() {
+                Data(..) => fail!(),
+                _ => {}
+            }
             c1.send(());
         }
 
@@ -1129,7 +1178,7 @@ mod test {
     test!(fn oneshot_single_thread_try_recv_open() {
         let (port, chan) = Chan::<int>::new();
         chan.send(10);
-        assert!(port.try_recv() == Some(10));
+        assert!(port.recv_opt() == Some(10));
     })
 
     test!(fn oneshot_single_thread_try_recv_closed() {
@@ -1140,21 +1189,21 @@ mod test {
 
     test!(fn oneshot_single_thread_peek_data() {
         let (port, chan) = Chan::<int>::new();
-        assert!(port.try_recv().is_none());
+        assert_eq!(port.try_recv(), Empty)
         chan.send(10);
-        assert!(port.try_recv().is_some());
+        assert_eq!(port.try_recv(), Data(10));
     })
 
     test!(fn oneshot_single_thread_peek_close() {
         let (port, chan) = Chan::<int>::new();
         { let _c = chan; }
-        assert!(port.try_recv().is_none());
-        assert!(port.try_recv().is_none());
+        assert_eq!(port.try_recv(), Disconnected);
+        assert_eq!(port.try_recv(), Disconnected);
     })
 
     test!(fn oneshot_single_thread_peek_open() {
         let (port, _) = Chan::<int>::new();
-        assert!(port.try_recv().is_none());
+        assert_eq!(port.try_recv(), Empty);
     })
 
     test!(fn oneshot_multi_task_recv_then_send() {
@@ -1321,4 +1370,27 @@ mod test {
         drop(chan);
         assert_eq!(count_port.recv(), 4);
     })
+
+    test!(fn try_recv_states() {
+        let (p, c) = Chan::<int>::new();
+        let (p1, c1) = Chan::<()>::new();
+        let (p2, c2) = Chan::<()>::new();
+        do spawn {
+            p1.recv();
+            c.send(1);
+            c2.send(());
+            p1.recv();
+            drop(c);
+            c2.send(());
+        }
+
+        assert_eq!(p.try_recv(), Empty);
+        c1.send(());
+        p2.recv();
+        assert_eq!(p.try_recv(), Data(1));
+        assert_eq!(p.try_recv(), Empty);
+        c1.send(());
+        p2.recv();
+        assert_eq!(p.try_recv(), Disconnected);
+    })
 }
diff --git a/src/libstd/comm/select.rs b/src/libstd/comm/select.rs
index 302c9d9ea46..fa5ec1d3e30 100644
--- a/src/libstd/comm/select.rs
+++ b/src/libstd/comm/select.rs
@@ -45,6 +45,7 @@
 #[allow(dead_code)];
 
 use cast;
+use comm;
 use iter::Iterator;
 use kinds::Send;
 use ops::Drop;
@@ -279,7 +280,9 @@ impl<'port, T: Send> Handle<'port, T> {
     pub fn recv_opt(&mut self) -> Option<T> { self.port.recv_opt() }
     /// Immediately attempt to receive a value on a port, this function will
     /// never block. Has the same semantics as `Port.try_recv`.
-    pub fn try_recv(&mut self) -> Option<T> { self.port.try_recv() }
+    pub fn try_recv(&mut self) -> comm::TryRecvResult<T> {
+        self.port.try_recv()
+    }
 }
 
 #[unsafe_destructor]
@@ -409,8 +412,8 @@ mod test {
             a = p1.recv() => { assert_eq!(a, 1); },
             a = p2.recv() => { assert_eq!(a, 2); }
         )
-        assert_eq!(p1.try_recv(), None);
-        assert_eq!(p2.try_recv(), None);
+        assert_eq!(p1.try_recv(), Empty);
+        assert_eq!(p2.try_recv(), Empty);
         c3.send(());
     })
 
diff --git a/src/libstd/io/signal.rs b/src/libstd/io/signal.rs
index 34b4ed5e1ef..0f05254b034 100644
--- a/src/libstd/io/signal.rs
+++ b/src/libstd/io/signal.rs
@@ -144,6 +144,7 @@ impl Listener {
 #[cfg(test)]
 mod test {
     use libc;
+    use comm::Empty;
     use io::timer;
     use super::{Listener, Interrupt};
 
@@ -194,7 +195,7 @@ mod test {
         s2.unregister(Interrupt);
         sigint();
         timer::sleep(10);
-        assert!(s2.port.try_recv().is_none());
+        assert_eq!(s2.port.try_recv(), Empty);
     }
 
     #[cfg(windows)]
diff --git a/src/libstd/io/timer.rs b/src/libstd/io/timer.rs
index 7c9aa28bfe9..d156a7460e1 100644
--- a/src/libstd/io/timer.rs
+++ b/src/libstd/io/timer.rs
@@ -123,7 +123,7 @@ mod test {
         let port1 = timer.oneshot(10000);
         let port = timer.oneshot(1);
         port.recv();
-        assert_eq!(port1.try_recv(), None);
+        assert!(port1.recv_opt().is_none());
     }
 
     #[test]
@@ -131,8 +131,7 @@ mod test {
         let mut timer = Timer::new().unwrap();
         let port = timer.oneshot(100000000000);
         timer.sleep(1); // this should invalidate the port
-
-        assert_eq!(port.try_recv(), None);
+        assert!(port.recv_opt().is_none());
     }
 
     #[test]