about summary refs log tree commit diff
path: root/src
diff options
context:
space:
mode:
authorbors <bors@rust-lang.org>2016-07-21 22:39:48 -0700
committerGitHub <noreply@github.com>2016-07-21 22:39:48 -0700
commit0d7597588d5aa7993dcd26cb0d773ebd27384d91 (patch)
treeb7f81a0c81d8e6c346c47e648724da25f83a991f /src
parentd46ed83e2e410d6c144657ef284f7ab649e3b70d (diff)
parent05af033b7fec63638497a9780e6b323d327d1e17 (diff)
downloadrust-0d7597588d5aa7993dcd26cb0d773ebd27384d91.tar.gz
rust-0d7597588d5aa7993dcd26cb0d773ebd27384d91.zip
Auto merge of #34724 - mitchmindtree:mpsc_receiver_try_recv, r=alexcrichton
Add a method to the mpsc::Receiver for producing a non-blocking iterator

Currently, the `mpsc::Receiver` offers methods for receiving values in both blocking (`recv`) and non-blocking (`try_recv`) flavours. However only blocking iteration over values is supported. This PR adds a non-blocking iterator to complement the `try_recv` method, just as the blocking iterator complements the `recv` method.

Use-case
-------------

I predominantly use rust in my work on real-time systems and in particular real-time audio generation/processing. I use `mpsc::channel`s to communicate between threads in a purely non-blocking manner. I.e. I might send messages from the GUI thread to the audio thread to update the state of the dsp-graph, or from the audio thread to the GUI thread to display the RMS of each node. These are just a couple examples (I'm probably using 30+ channels across my various projects). I almost exclusively use the `mpsc::Receiver::try_recv` method to avoid blocking any of the real-time threads and causing unwanted glitching/stuttering. Now that I mention it, I can't think of a single time that I personally have used the `recv` method (though I can of course see why it would be useful, and perhaps the common case for many people).

As a result of this experience, I can't help but feel there is a large hole in the `Receiver` API.

| blocking | non-blocking |
|------------|--------------------|
| `recv` | `try_recv` |
| `iter` | 🙀   |

For the most part, I've been working around this using `while let Ok(v) = r.try_recv() { ... }`, however as nice as this is, it is clearly no match for the Iterator API.

As an example, in the majority of my channel use cases I only want to check for *n* number of messages before breaking from the loop so that I don't miss the audio IO callback or hog the GUI thread for too long when an unexpectedly large number of messages are sent. Currently, I have to write something like this:

```rust
let mut take = 100;
while let Ok(msg) = rx.try_recv() {
    // Do stuff with msg
    if take == 0 {
        break;
    }
    take -= 1;
}
```

or wrap the `try_recv` call in a `Range<usize>`/`FilterMap` iterator combo.

On the other hand, this PR would allow for the following:

```rust
for msg in rx.try_iter().take(100) {
    // Do stuff with msg
}
```

I imagine this might also be useful to game devs, embedded or anyone doing message passing across real-time threads.
Diffstat (limited to 'src')
-rw-r--r--src/libstd/sync/mpsc/mod.rs56
1 files changed, 56 insertions, 0 deletions
diff --git a/src/libstd/sync/mpsc/mod.rs b/src/libstd/sync/mpsc/mod.rs
index d96fd6228e6..11f785dffd1 100644
--- a/src/libstd/sync/mpsc/mod.rs
+++ b/src/libstd/sync/mpsc/mod.rs
@@ -311,6 +311,17 @@ pub struct Iter<'a, T: 'a> {
     rx: &'a Receiver<T>
 }
 
+/// An iterator that attempts to yield all pending values for a receiver.
+/// `None` will be returned when there are no pending values remaining or
+/// if the corresponding channel has hung up.
+///
+/// This Iterator will never block the caller in order to wait for data to
+/// become available. Instead, it will return `None`.
+#[unstable(feature = "receiver_try_iter", issue = "34931")]
+pub struct TryIter<'a, T: 'a> {
+    rx: &'a Receiver<T>
+}
+
 /// An owning iterator over messages on a receiver, this iterator will block
 /// whenever `next` is called, waiting for a new message, and `None` will be
 /// returned when the corresponding channel has hung up.
@@ -982,6 +993,16 @@ impl<T> Receiver<T> {
     pub fn iter(&self) -> Iter<T> {
         Iter { rx: self }
     }
+
+    /// Returns an iterator that will attempt to yield all pending values.
+    /// It will return `None` if there are no more pending values or if the
+    /// channel has hung up. The iterator will never `panic!` or block the
+    /// user by waiting for values.
+    #[unstable(feature = "receiver_try_iter", issue = "34931")]
+    pub fn try_iter(&self) -> TryIter<T> {
+        TryIter { rx: self }
+    }
+
 }
 
 impl<T> select::Packet for Receiver<T> {
@@ -1077,6 +1098,13 @@ impl<'a, T> Iterator for Iter<'a, T> {
     fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
 }
 
+#[unstable(feature = "receiver_try_iter", issue = "34931")]
+impl<'a, T> Iterator for TryIter<'a, T> {
+    type Item = T;
+
+    fn next(&mut self) -> Option<T> { self.rx.try_recv().ok() }
+}
+
 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
 impl<'a, T> IntoIterator for &'a Receiver<T> {
     type Item = T;
@@ -1815,6 +1843,34 @@ mod tests {
     }
 
     #[test]
+    fn test_recv_try_iter() {
+        let (request_tx, request_rx) = channel();
+        let (response_tx, response_rx) = channel();
+
+        // Request `x`s until we have `6`.
+        let t = thread::spawn(move|| {
+            let mut count = 0;
+            loop {
+                for x in response_rx.try_iter() {
+                    count += x;
+                    if count == 6 {
+                        return count;
+                    }
+                }
+                request_tx.send(()).unwrap();
+            }
+        });
+
+        for _ in request_rx.iter() {
+            if response_tx.send(2).is_err() {
+                break;
+            }
+        }
+
+        assert_eq!(t.join().unwrap(), 6);
+    }
+
+    #[test]
     fn test_recv_into_iter_owned() {
         let mut iter = {
           let (tx, rx) = channel::<i32>();