about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--library/alloc/src/boxed.rs14
-rw-r--r--library/alloc/src/lib.rs1
-rw-r--r--library/core/src/lib.rs2
-rw-r--r--library/core/src/stream/mod.rs154
-rw-r--r--library/core/src/stream/stream/mod.rs129
-rw-r--r--library/core/src/stream/stream/next.rs30
-rw-r--r--library/std/src/lib.rs3
-rw-r--r--library/std/src/panic.rs14
8 files changed, 347 insertions, 0 deletions
diff --git a/library/alloc/src/boxed.rs b/library/alloc/src/boxed.rs
index 0aa52b35ced..e586ff89902 100644
--- a/library/alloc/src/boxed.rs
+++ b/library/alloc/src/boxed.rs
@@ -149,6 +149,7 @@ use core::ops::{
 };
 use core::pin::Pin;
 use core::ptr::{self, Unique};
+use core::stream::Stream;
 use core::task::{Context, Poll};
 
 use crate::alloc::{handle_alloc_error, AllocError, Allocator, Global, Layout, WriteCloneIntoRaw};
@@ -1618,3 +1619,16 @@ where
         F::poll(Pin::new(&mut *self), cx)
     }
 }
+
+#[unstable(feature = "async_stream", issue = "79024")]
+impl<S: ?Sized + Stream + Unpin> Stream for Box<S> {
+    type Item = S::Item;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        Pin::new(&mut **self).poll_next(cx)
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        (**self).size_hint()
+    }
+}
diff --git a/library/alloc/src/lib.rs b/library/alloc/src/lib.rs
index 8d721ed7487..e524eb05fcd 100644
--- a/library/alloc/src/lib.rs
+++ b/library/alloc/src/lib.rs
@@ -82,6 +82,7 @@
 #![feature(array_windows)]
 #![feature(allow_internal_unstable)]
 #![feature(arbitrary_self_types)]
+#![feature(async_stream)]
 #![feature(box_patterns)]
 #![feature(box_syntax)]
 #![feature(cfg_sanitize)]
diff --git a/library/core/src/lib.rs b/library/core/src/lib.rs
index 263c6c9cf0f..a4395ab57e8 100644
--- a/library/core/src/lib.rs
+++ b/library/core/src/lib.rs
@@ -254,6 +254,8 @@ pub mod panicking;
 pub mod pin;
 pub mod raw;
 pub mod result;
+#[unstable(feature = "async_stream", issue = "79024")]
+pub mod stream;
 pub mod sync;
 
 pub mod fmt;
diff --git a/library/core/src/stream/mod.rs b/library/core/src/stream/mod.rs
new file mode 100644
index 00000000000..48cca497292
--- /dev/null
+++ b/library/core/src/stream/mod.rs
@@ -0,0 +1,154 @@
+//! Composable asynchronous iteration.
+//!
+//! If futures are asynchronous values, then streams are asynchronous
+//! iterators. If you've found yourself with an asynchronous collection of some kind,
+//! and needed to perform an operation on the elements of said collection,
+//! you'll quickly run into 'streams'. Streams are heavily used in idiomatic
+//! asynchronous Rust code, so it's worth becoming familiar with them.
+//!
+//! Before explaining more, let's talk about how this module is structured:
+//!
+//! # Organization
+//!
+//! This module is largely organized by type:
+//!
+//! * [Traits] are the core portion: these traits define what kind of streams
+//!   exist and what you can do with them. The methods of these traits are worth
+//!   putting some extra study time into.
+//! * Functions provide some helpful ways to create some basic streams.
+//! * [Structs] are often the return types of the various methods on this
+//!   module's traits. You'll usually want to look at the method that creates
+//!   the `struct`, rather than the `struct` itself. For more detail about why,
+//!   see '[Implementing Stream](#implementing-stream)'.
+//!
+//! [Traits]: #traits
+//! [Structs]: #structs
+//!
+//! That's it! Let's dig into streams.
+//!
+//! # Stream
+//!
+//! The heart and soul of this module is the [`Stream`] trait. The core of
+//! [`Stream`] looks like this:
+//!
+//! ```
+//! # use core::task::{Context, Poll};
+//! # use core::pin::Pin;
+//! trait Stream {
+//!     type Item;
+//!     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
+//! }
+//! ```
+//!
+//! Unlike `Iterator`, `Stream` makes a distinction between the [`poll_next`]
+//! method which is used when implementing a `Stream`, and the [`next`] method
+//! which is used when consuming a stream. Consumers of `Stream` only need to
+//! consider [`next`], which when called, returns a future which yields
+//! yields [`Option`][`<Item>`].
+//!
+//! The future returned by [`next`] will yield `Some(Item)` as long as there are
+//! elements, and once they've all been exhausted, will yield `None` to indicate
+//! that iteration is finished. If we're waiting on something asynchronous to
+//! resolve, the future will wait until the stream is ready to yield again.
+//!
+//! Individual streams may choose to resume iteration, and so calling [`next`]
+//! again may or may not eventually yield `Some(Item)` again at some point.
+//!
+//! [`Stream`]'s full definition includes a number of other methods as well,
+//! but they are default methods, built on top of [`poll_next`], and so you get
+//! them for free.
+//!
+//! [`Poll`]: super::task::Poll
+//! [`poll_next`]: Stream::poll_next
+//! [`next`]: Stream::next
+//! [`<Item>`]: Stream::Item
+//!
+//! # Implementing Stream
+//!
+//! Creating a stream of your own involves two steps: creating a `struct` to
+//! hold the stream's state, and then implementing [`Stream`] for that
+//! `struct`.
+//!
+//! Let's make a stream named `Counter` which counts from `1` to `5`:
+//!
+//! ```no_run
+//! #![feature(async_stream)]
+//! # use core::stream::Stream;
+//! # use core::task::{Context, Poll};
+//! # use core::pin::Pin;
+//!
+//! // First, the struct:
+//!
+//! /// A stream which counts from one to five
+//! struct Counter {
+//!     count: usize,
+//! }
+//!
+//! // we want our count to start at one, so let's add a new() method to help.
+//! // This isn't strictly necessary, but is convenient. Note that we start
+//! // `count` at zero, we'll see why in `poll_next()`'s implementation below.
+//! impl Counter {
+//!     fn new() -> Counter {
+//!         Counter { count: 0 }
+//!     }
+//! }
+//!
+//! // Then, we implement `Stream` for our `Counter`:
+//!
+//! impl Stream for Counter {
+//!     // we will be counting with usize
+//!     type Item = usize;
+//!
+//!     // poll_next() is the only required method
+//!     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+//!         // Increment our count. This is why we started at zero.
+//!         self.count += 1;
+//!
+//!         // Check to see if we've finished counting or not.
+//!         if self.count < 6 {
+//!             Poll::Ready(Some(self.count))
+//!         } else {
+//!             Poll::Ready(None)
+//!         }
+//!     }
+//! }
+//!
+//! // And now we can use it!
+//! # async fn run() {
+//! #
+//! let mut counter = Counter::new();
+//!
+//! let x = counter.next().await.unwrap();
+//! println!("{}", x);
+//!
+//! let x = counter.next().await.unwrap();
+//! println!("{}", x);
+//!
+//! let x = counter.next().await.unwrap();
+//! println!("{}", x);
+//!
+//! let x = counter.next().await.unwrap();
+//! println!("{}", x);
+//!
+//! let x = counter.next().await.unwrap();
+//! println!("{}", x);
+//! #
+//! }
+//! ```
+//!
+//! This will print `1` through `5`, each on their own line.
+//!
+//! # Laziness
+//!
+//! Streams are *lazy*. This means that just creating a stream doesn't _do_ a
+//! whole lot. Nothing really happens until you call [`next`]. This is sometimes a
+//! source of confusion when creating a stream solely for its side effects. The
+//! compiler will warn us about this kind of behavior:
+//!
+//! ```text
+//! warning: unused result that must be used: streams do nothing unless polled
+//! ```
+
+mod stream;
+
+pub use stream::{Next, Stream};
diff --git a/library/core/src/stream/stream/mod.rs b/library/core/src/stream/stream/mod.rs
new file mode 100644
index 00000000000..3f92c2e8c1c
--- /dev/null
+++ b/library/core/src/stream/stream/mod.rs
@@ -0,0 +1,129 @@
+mod next;
+
+pub use next::Next;
+
+use crate::ops::DerefMut;
+use crate::pin::Pin;
+use crate::task::{Context, Poll};
+
+/// An interface for dealing with asynchronous iterators.
+///
+/// This is the main stream trait. For more about the concept of streams
+/// generally, please see the [module-level documentation]. In particular, you
+/// may want to know how to [implement `Stream`][impl].
+///
+/// [module-level documentation]: index.html
+/// [impl]: index.html#implementing-stream
+#[unstable(feature = "async_stream", issue = "79024")]
+#[must_use = "streams do nothing unless polled"]
+pub trait Stream {
+    /// The type of items yielded by the stream.
+    type Item;
+
+    /// Attempt to pull out the next value of this stream, registering the
+    /// current task for wakeup if the value is not yet available, and returning
+    /// `None` if the stream is exhausted.
+    ///
+    /// # Return value
+    ///
+    /// There are several possible return values, each indicating a distinct
+    /// stream state:
+    ///
+    /// - `Poll::Pending` means that this stream's next value is not ready
+    /// yet. Implementations will ensure that the current task will be notified
+    /// when the next value may be ready.
+    ///
+    /// - `Poll::Ready(Some(val))` means that the stream has successfully
+    /// produced a value, `val`, and may produce further values on subsequent
+    /// `poll_next` calls.
+    ///
+    /// - `Poll::Ready(None)` means that the stream has terminated, and
+    /// `poll_next` should not be invoked again.
+    ///
+    /// # Panics
+    ///
+    /// Once a stream has finished (returned `Ready(None)` from `poll_next`), calling its
+    /// `poll_next` method again may panic, block forever, or cause other kinds of
+    /// problems; the `Stream` trait places no requirements on the effects of
+    /// such a call. However, as the `poll_next` method is not marked `unsafe`,
+    /// Rust's usual rules apply: calls must never cause undefined behavior
+    /// (memory corruption, incorrect use of `unsafe` functions, or the like),
+    /// regardless of the stream's state.
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
+
+    /// Returns the bounds on the remaining length of the stream.
+    ///
+    /// Specifically, `size_hint()` returns a tuple where the first element
+    /// is the lower bound, and the second element is the upper bound.
+    ///
+    /// The second half of the tuple that is returned is an [`Option`]`<`[`usize`]`>`.
+    /// A [`None`] here means that either there is no known upper bound, or the
+    /// upper bound is larger than [`usize`].
+    ///
+    /// # Implementation notes
+    ///
+    /// It is not enforced that a stream implementation yields the declared
+    /// number of elements. A buggy stream may yield less than the lower bound
+    /// or more than the upper bound of elements.
+    ///
+    /// `size_hint()` is primarily intended to be used for optimizations such as
+    /// reserving space for the elements of the stream, but must not be
+    /// trusted to e.g., omit bounds checks in unsafe code. An incorrect
+    /// implementation of `size_hint()` should not lead to memory safety
+    /// violations.
+    ///
+    /// That said, the implementation should provide a correct estimation,
+    /// because otherwise it would be a violation of the trait's protocol.
+    ///
+    /// The default implementation returns `(0, `[`None`]`)` which is correct for any
+    /// stream.
+    #[inline]
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        (0, None)
+    }
+
+    /// Advances the stream and returns a future which yields the next value.
+    ///
+    /// The returned future yields [`None`] when iteration is finished.
+    /// Individual stream implementations may choose to resume iteration, and so
+    /// calling `next()` again may or may not eventually start yielding
+    /// [`Some(Item)`] again at some point.
+    ///
+    /// [`Some(Item)`]: Some
+    fn next(&mut self) -> Next<'_, Self>
+    where
+        Self: Unpin,
+    {
+        Next::new(self)
+    }
+}
+
+#[unstable(feature = "async_stream", issue = "79024")]
+impl<S: ?Sized + Stream + Unpin> Stream for &mut S {
+    type Item = S::Item;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        S::poll_next(Pin::new(&mut **self), cx)
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        (**self).size_hint()
+    }
+}
+
+#[unstable(feature = "async_stream", issue = "79024")]
+impl<P> Stream for Pin<P>
+where
+    P: DerefMut + Unpin,
+    P::Target: Stream,
+{
+    type Item = <P::Target as Stream>::Item;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        self.get_mut().as_mut().poll_next(cx)
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        (**self).size_hint()
+    }
+}
diff --git a/library/core/src/stream/stream/next.rs b/library/core/src/stream/stream/next.rs
new file mode 100644
index 00000000000..e25d44228e7
--- /dev/null
+++ b/library/core/src/stream/stream/next.rs
@@ -0,0 +1,30 @@
+use crate::future::Future;
+use crate::pin::Pin;
+use crate::stream::Stream;
+use crate::task::{Context, Poll};
+
+/// A future which advances the stream and returns the next value.
+///
+/// This `struct` is created by [`Stream::next`]. See its documentation for more.
+#[unstable(feature = "async_stream", issue = "79024")]
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct Next<'a, S: ?Sized> {
+    stream: &'a mut S,
+}
+
+impl<'a, S: ?Sized> Next<'a, S> {
+    /// Create a new instance of `Next`.
+    pub(crate) fn new(stream: &'a mut S) -> Self {
+        Self { stream }
+    }
+}
+
+#[unstable(feature = "async_stream", issue = "79024")]
+impl<S: Stream + Unpin + ?Sized> Future for Next<'_, S> {
+    type Output = Option<S::Item>;
+
+    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        Pin::new(&mut *self.stream).poll_next(cx)
+    }
+}
diff --git a/library/std/src/lib.rs b/library/std/src/lib.rs
index 5ba13c2f913..f739fffd1c0 100644
--- a/library/std/src/lib.rs
+++ b/library/std/src/lib.rs
@@ -224,6 +224,7 @@
 #![feature(allocator_internals)]
 #![feature(allow_internal_unsafe)]
 #![feature(allow_internal_unstable)]
+#![feature(async_stream)]
 #![feature(arbitrary_self_types)]
 #![feature(array_error_internals)]
 #![feature(asm)]
@@ -448,6 +449,8 @@ pub use core::ptr;
 pub use core::raw;
 #[stable(feature = "rust1", since = "1.0.0")]
 pub use core::result;
+#[unstable(feature = "async_stream", issue = "79024")]
+pub use core::stream;
 #[stable(feature = "i128", since = "1.26.0")]
 #[allow(deprecated, deprecated_in_future)]
 pub use core::u128;
diff --git a/library/std/src/panic.rs b/library/std/src/panic.rs
index d18b94b6c1a..66e363bf67b 100644
--- a/library/std/src/panic.rs
+++ b/library/std/src/panic.rs
@@ -12,6 +12,7 @@ use crate::panicking;
 use crate::pin::Pin;
 use crate::ptr::{NonNull, Unique};
 use crate::rc::Rc;
+use crate::stream::Stream;
 use crate::sync::atomic;
 use crate::sync::{Arc, Mutex, RwLock};
 use crate::task::{Context, Poll};
@@ -340,6 +341,19 @@ impl<F: Future> Future for AssertUnwindSafe<F> {
     }
 }
 
+#[unstable(feature = "async_stream", issue = "79024")]
+impl<S: Stream> Stream for AssertUnwindSafe<S> {
+    type Item = S::Item;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
+        unsafe { self.map_unchecked_mut(|x| &mut x.0) }.poll_next(cx)
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        self.0.size_hint()
+    }
+}
+
 /// Invokes a closure, capturing the cause of an unwinding panic if one occurs.
 ///
 /// This function will return `Ok` with the closure's result if the closure