diff options
| author | Tomás Vallotton <tvallotton@uc.cl> | 2023-11-30 18:11:02 -0300 |
|---|---|---|
| committer | Tomás Vallotton <tvallotton@uc.cl> | 2024-01-20 10:13:08 -0300 |
| commit | 60a08196b6f473da17fc280a8545f1b62097b4fa (patch) | |
| tree | 7bd013ef94cb3e51724302edf6d888483ad9bae3 | |
| parent | 314384b5fb21f3c163f2b871ddceabdd88319958 (diff) | |
| download | rust-60a08196b6f473da17fc280a8545f1b62097b4fa.tar.gz rust-60a08196b6f473da17fc280a8545f1b62097b4fa.zip | |
feat: add LocalWaker type, ContextBuilder type, and LocalWake trait.
| -rw-r--r-- | library/alloc/src/lib.rs | 1 | ||||
| -rw-r--r-- | library/alloc/src/task.rs | 165 | ||||
| -rw-r--r-- | library/core/src/task/mod.rs | 2 | ||||
| -rw-r--r-- | library/core/src/task/wake.rs | 361 |
4 files changed, 504 insertions, 25 deletions
diff --git a/library/alloc/src/lib.rs b/library/alloc/src/lib.rs index 78629b39d34..c8b4cebdf89 100644 --- a/library/alloc/src/lib.rs +++ b/library/alloc/src/lib.rs @@ -135,6 +135,7 @@ #![feature(iter_next_chunk)] #![feature(iter_repeat_n)] #![feature(layout_for_ptr)] +#![feature(local_waker)] #![feature(maybe_uninit_slice)] #![feature(maybe_uninit_uninit_array)] #![feature(maybe_uninit_uninit_array_transpose)] diff --git a/library/alloc/src/task.rs b/library/alloc/src/task.rs index 5d9772b878b..16ae5241da8 100644 --- a/library/alloc/src/task.rs +++ b/library/alloc/src/task.rs @@ -7,8 +7,9 @@ //! `#[cfg(target_has_atomic = "ptr")]`. use core::mem::ManuallyDrop; -use core::task::{RawWaker, RawWakerVTable, Waker}; +use core::task::{LocalWaker, RawWaker, RawWakerVTable, Waker}; +use crate::rc::Rc; use crate::sync::Arc; /// The implementation of waking a task on an executor. @@ -152,3 +153,165 @@ fn raw_waker<W: Wake + Send + Sync + 'static>(waker: Arc<W>) -> RawWaker { &RawWakerVTable::new(clone_waker::<W>, wake::<W>, wake_by_ref::<W>, drop_waker::<W>), ) } + +/// An analogous trait to `Wake` but used to construct a `LocalWaker`. This API +/// works in exactly the same way as `Wake`, except that it uses an `Rc` instead +/// of an `Arc`, and the result is a `LocalWaker` instead of a `Waker`. +/// +/// The benefits of using `LocalWaker` over `Waker` are that it allows the local waker +/// to hold data that does not implement `Send` and `Sync`. Additionally, it saves calls +/// to `Arc::clone`, which requires atomic synchronization. +/// + +/// # Examples +/// +/// A +/// +/// This is a simplified example of a `spawn` and a `block_on` function. The `spawn` function +/// is used to push new tasks onto the run queue, while the block on function will remove them +/// and poll them. When a task is woken, it will put itself back on the run queue to be polled by the executor. +/// +/// **Note:** A real world example would interlieve poll calls with calls to an io reactor to wait for events instead +/// of spinning on a loop. +/// +/// ```rust +/// use std::task::{LocalWake, ContextBuilder, LocalWaker}; +/// use std::future::Future; +/// use std::pin::Pin; +/// use std::rc::Rc; +/// use std::cell::RefCell; +/// use std::collections::VecDeque; +/// +/// +/// thread_local! { +/// // A queue containing all tasks ready to do progress +/// static RUN_QUEUE: RefCell<VecDeque<Rc<Task>>> = RefCell::default(); +/// } +/// +/// type BoxedFuture = Pin<Box<dyn Future<Output = ()>>>; +/// +/// struct Task(RefCell<BoxedFuture>); +/// +/// impl LocalWake for Task { +/// fn wake(self: Rc<Self>) { +/// RUN_QUEUE.with_borrow_mut(|queue| { +/// queue.push_back(self) +/// }) +/// } +/// } +/// +/// fn spawn<F>(future: F) +/// where +/// F: Future<Output=()> + 'static + Send + Sync +/// { +/// let task = Rc::new(Box::pin(future)); +/// RUN_QUEUE.with_borrow_mut(|queue| { +/// queue.push_back(task) +/// }); +/// } +/// +/// fn block_on<F>(future: F) +/// where +/// F: Future<Output=()> + 'static + Sync + Send +/// { +/// spawn(future); +/// loop { +/// let Some(task) = RUN_QUEUE.with_borrow_mut(|queue|queue.pop_front()) else { +/// // we exit, since there are no more tasks remaining on the queue +/// return; +/// }; +/// // cast the Rc<Task> into a `LocalWaker` +/// let waker: LocalWaker = task.into(); +/// // Build the context using `ContextBuilder` +/// let mut cx = ContextBuilder::new() +/// .local_waker(&waker) +/// .build(); +/// +/// // Poll the task +/// task.0 +/// .borrow_mut() +/// .as_mut() +/// .poll(&mut cx); +/// } +/// } +/// ``` +/// +#[unstable(feature = "local_waker", issue = "none")] +pub trait LocalWake { + /// Wake this task. + #[unstable(feature = "local_waker", issue = "none")] + fn wake(self: Rc<Self>); + + /// Wake this task without consuming the local waker. + /// + /// If an executor supports a cheaper way to wake without consuming the + /// waker, it should override this method. By default, it clones the + /// [`Rc`] and calls [`wake`] on the clone. + /// + /// [`wake`]: Rc::wake + #[unstable(feature = "local_waker", issue = "none")] + fn wake_by_ref(self: &Rc<Self>) { + self.clone().wake(); + } +} + +#[unstable(feature = "local_waker", issue = "none")] +impl<W: LocalWake + 'static> From<Rc<W>> for LocalWaker { + /// Use a `Wake`-able type as a `LocalWaker`. + /// + /// No heap allocations or atomic operations are used for this conversion. + fn from(waker: Rc<W>) -> LocalWaker { + // SAFETY: This is safe because raw_waker safely constructs + // a RawWaker from Rc<W>. + unsafe { LocalWaker::from_raw(local_raw_waker(waker)) } + } +} +#[allow(ineffective_unstable_trait_impl)] +#[unstable(feature = "local_waker", issue = "none")] +impl<W: LocalWake + 'static> From<Rc<W>> for RawWaker { + /// Use a `Wake`-able type as a `RawWaker`. + /// + /// No heap allocations or atomic operations are used for this conversion. + fn from(waker: Rc<W>) -> RawWaker { + local_raw_waker(waker) + } +} + +// NB: This private function for constructing a RawWaker is used, rather than +// inlining this into the `From<Rc<W>> for RawWaker` impl, to ensure that +// the safety of `From<Rc<W>> for Waker` does not depend on the correct +// trait dispatch - instead both impls call this function directly and +// explicitly. +#[inline(always)] +fn local_raw_waker<W: LocalWake + 'static>(waker: Rc<W>) -> RawWaker { + // Increment the reference count of the Rc to clone it. + unsafe fn clone_waker<W: LocalWake + 'static>(waker: *const ()) -> RawWaker { + unsafe { Rc::increment_strong_count(waker as *const W) }; + RawWaker::new( + waker as *const (), + &RawWakerVTable::new(clone_waker::<W>, wake::<W>, wake_by_ref::<W>, drop_waker::<W>), + ) + } + + // Wake by value, moving the Rc into the LocalWake::wake function + unsafe fn wake<W: LocalWake + 'static>(waker: *const ()) { + let waker = unsafe { Rc::from_raw(waker as *const W) }; + <W as LocalWake>::wake(waker); + } + + // Wake by reference, wrap the waker in ManuallyDrop to avoid dropping it + unsafe fn wake_by_ref<W: LocalWake + 'static>(waker: *const ()) { + let waker = unsafe { ManuallyDrop::new(Rc::from_raw(waker as *const W)) }; + <W as LocalWake>::wake_by_ref(&waker); + } + + // Decrement the reference count of the Rc on drop + unsafe fn drop_waker<W: LocalWake + 'static>(waker: *const ()) { + unsafe { Rc::decrement_strong_count(waker as *const W) }; + } + + RawWaker::new( + Rc::into_raw(waker) as *const (), + &RawWakerVTable::new(clone_waker::<W>, wake::<W>, wake_by_ref::<W>, drop_waker::<W>), + ) +} diff --git a/library/core/src/task/mod.rs b/library/core/src/task/mod.rs index 3f0080e3832..f1a789e32a7 100644 --- a/library/core/src/task/mod.rs +++ b/library/core/src/task/mod.rs @@ -8,7 +8,7 @@ pub use self::poll::Poll; mod wake; #[stable(feature = "futures_api", since = "1.36.0")] -pub use self::wake::{Context, RawWaker, RawWakerVTable, Waker}; +pub use self::wake::{Context, ContextBuilder, LocalWaker, RawWaker, RawWakerVTable, Waker}; mod ready; #[stable(feature = "ready_macro", since = "1.64.0")] diff --git a/library/core/src/task/wake.rs b/library/core/src/task/wake.rs index 077852b0120..4dedcbe47a0 100644 --- a/library/core/src/task/wake.rs +++ b/library/core/src/task/wake.rs @@ -1,5 +1,7 @@ #![stable(feature = "futures_api", since = "1.36.0")] +use crate::mem::transmute; + use crate::fmt; use crate::marker::PhantomData; use crate::ptr; @@ -60,6 +62,21 @@ impl RawWaker { pub fn vtable(&self) -> &'static RawWakerVTable { self.vtable } + + #[unstable(feature = "noop_waker", issue = "98286")] + const NOOP: RawWaker = { + const VTABLE: RawWakerVTable = RawWakerVTable::new( + // Cloning just returns a new no-op raw waker + |_| RawWaker::NOOP, + // `wake` does nothing + |_| {}, + // `wake_by_ref` does nothing + |_| {}, + // Dropping does nothing as we don't allocate anything + |_| {}, + ); + RawWaker::new(ptr::null(), &VTABLE) + }; } /// A virtual function pointer table (vtable) that specifies the behavior @@ -177,7 +194,8 @@ impl RawWakerVTable { #[stable(feature = "futures_api", since = "1.36.0")] #[lang = "Context"] pub struct Context<'a> { - waker: &'a Waker, + waker: Option<&'a Waker>, + local_waker: Option<&'a LocalWaker>, // Ensure we future-proof against variance changes by forcing // the lifetime to be invariant (argument-position lifetimes // are contravariant while return-position lifetimes are @@ -195,16 +213,36 @@ impl<'a> Context<'a> { #[must_use] #[inline] pub const fn from_waker(waker: &'a Waker) -> Self { - Context { waker, _marker: PhantomData, _marker2: PhantomData } + ContextBuilder::new().waker(waker).build() } /// Returns a reference to the [`Waker`] for the current task. + /// + /// Note that if the waker does not need to be sent across threads, it + /// is preferable to call `local_waker`, which is more portable and + /// potentially more efficient. + /// + /// # Panics + /// This function will panic if no `Waker` was set on the context. This happens if + /// the executor does not support working with thread safe wakers. An alternative + /// may be to call [`.local_waker()`](Context::local_waker) instead. #[stable(feature = "futures_api", since = "1.36.0")] #[rustc_const_unstable(feature = "const_waker", issue = "102012")] #[must_use] #[inline] pub const fn waker(&self) -> &'a Waker { - &self.waker + &self + .waker + .expect("no waker was set on this context, consider calling `local_waker` instead.") + } + /// Returns a reference to the [`LocalWaker`] for the current task. + #[unstable(feature = "local_waker", issue = "none")] + pub fn local_waker(&self) -> &'a LocalWaker { + // Safety: + // It is safe to transmute a `&Waker` into a `&LocalWaker` since both are a transparent + // wrapper around a local waker. Also, the Option<&Waker> here cannot be None since it is + // impossible to construct a Context without any waker set. + self.local_waker.unwrap_or_else(|| unsafe { transmute(self.waker) }) } } @@ -215,6 +253,94 @@ impl fmt::Debug for Context<'_> { } } +/// A Builder used to construct a `Context` instance +/// with support for `LocalWaker`. +/// +/// # Examples +/// ``` +/// #![feature(local_waker)] +/// #![feature(noop_waker)] +/// use std::task::{ContextBuilder, LocalWaker, Waker}; +/// +/// let local_waker = LocalWaker::noop(); +/// let waker = Waker::noop(); +/// +/// let context = ContextBuilder::default() +/// .local_waker(&local_waker) +/// .waker(&waker) +/// .build(); +/// ``` +#[unstable(feature = "local_waker", issue = "none")] +#[derive(Default, Debug)] +pub struct ContextBuilder<'a> { + waker: Option<&'a Waker>, + local_waker: Option<&'a LocalWaker>, +} + +impl<'a> ContextBuilder<'a> { + /// Creates a new empty `ContextBuilder`. + #[inline] + #[rustc_const_unstable(feature = "const_waker", issue = "102012")] + #[unstable(feature = "local_waker", issue = "none")] + pub const fn new() -> Self { + ContextBuilder { waker: None, local_waker: None } + } + + /// This field is used to set the value of the waker on `Context`. + #[inline] + #[rustc_const_unstable(feature = "const_waker", issue = "102012")] + #[unstable(feature = "local_waker", issue = "none")] + pub const fn waker(self, waker: &'a Waker) -> Self { + Self { waker: Some(waker), ..self } + } + + /// This method is used to set the value for the local waker on `Context`. + /// + /// # Examples + /// ``` + /// #![feature(local_waker)] + /// #![feature(noop_waker)] + /// + /// use std::task; + /// use std::pin; + /// use std::future::Future; + /// + /// let local_waker = task::LocalWaker::noop(); + /// + /// let mut context = task::ContextBuilder::new() + /// .local_waker(&local_waker) + /// .build(); + /// + /// let future = pin::pin!(async { 20 }); + /// + /// let poll = future.poll(&mut context); + /// + /// assert_eq!(poll, task::Poll::Ready(20)); + /// ``` + #[inline] + #[unstable(feature = "local_waker", issue = "none")] + #[rustc_const_unstable(feature = "const_waker", issue = "102012")] + pub const fn local_waker(self, local_waker: &'a LocalWaker) -> Self { + Self { local_waker: Some(local_waker), ..self } + } + + /// Builds the `Context`. + /// + /// # Panics + /// Panics if no `Waker` or `LocalWaker` is set. + #[inline] + #[unstable(feature = "local_waker", issue = "none")] + #[rustc_const_unstable(feature = "const_waker", issue = "102012")] + pub const fn build(self) -> Context<'a> { + let ContextBuilder { waker, local_waker } = self; + assert!( + waker.is_some() || local_waker.is_some(), + "at least one waker must be set with either the `local_waker` or `waker` methods on `ContextBuilder`." + ); + Context { waker, local_waker, _marker: PhantomData, _marker2: PhantomData } + } +} + /// A `Waker` is a handle for waking up a task by notifying its executor that it /// is ready to be run. /// @@ -229,7 +355,8 @@ impl fmt::Debug for Context<'_> { /// Implements [`Clone`], [`Send`], and [`Sync`]; therefore, a waker may be invoked /// from any thread, including ones not in any way managed by the executor. For example, /// this might be done to wake a future when a blocking function call completes on another -/// thread. +/// thread. If the waker does not need to be moved across threads, it is better to use +/// [`LocalWaker`], which the executor may use to skip unnecessary memory synchronization. /// /// Note that it is preferable to use `waker.clone_from(&new_waker)` instead /// of `*waker = new_waker.clone()`, as the former will avoid cloning the waker @@ -354,25 +481,8 @@ impl Waker { #[must_use] #[unstable(feature = "noop_waker", issue = "98286")] pub const fn noop() -> &'static Waker { - // Ideally all this data would be explicitly `static` because it is used by reference and - // only ever needs one copy. But `const fn`s (and `const` items) cannot refer to statics, - // even though their values can be promoted to static. (That might change; see #119618.) - // An alternative would be a `pub static NOOP: &Waker`, but associated static items are not - // currently allowed either, and making it non-associated would be unergonomic. - const VTABLE: RawWakerVTable = RawWakerVTable::new( - // Cloning just returns a new no-op raw waker - |_| RAW, - // `wake` does nothing - |_| {}, - // `wake_by_ref` does nothing - |_| {}, - // Dropping does nothing as we don't allocate anything - |_| {}, - ); - const RAW: RawWaker = RawWaker::new(ptr::null(), &VTABLE); - const WAKER_REF: &Waker = &Waker { waker: RAW }; - - WAKER_REF + const WAKER: &Waker = &Waker { waker: RawWaker::NOOP }; + WAKER } /// Get a reference to the underlying [`RawWaker`]. @@ -425,3 +535,208 @@ impl fmt::Debug for Waker { .finish() } } + +/// A `LocalWaker` is analogous to a [`Waker`], but it does not implement [`Send`] or [`Sync`]. +/// This handle encapsulates a [`RawWaker`] instance, which defines the +/// executor-specific wakeup behavior. +/// +/// Local wakers can be requested from a `Context` with the [`local_waker`] method. +/// +/// The typical life of a `LocalWaker` is that it is constructed by an executor, wrapped in a +/// [`Context`], then passed to [`Future::poll()`]. Then, if the future chooses to return +/// [`Poll::Pending`], it must also store the waker somehow and call [`Waker::wake()`] when +/// the future should be polled again. +/// +/// Implements [`Clone`], but neither [`Send`] nor [`Sync`]; therefore, a local waker may +/// not be moved to other threads. In general, when deciding to use wakers or local wakers, +/// local wakers are preferable unless the waker needs to be sent across threads. This is because +/// wakers can incur in additional cost related to memory synchronization, and not all executors +/// may support wakers. +/// +/// Note that it is preferable to use `local_waker.clone_from(&new_waker)` instead +/// of `*local_waker = new_waker.clone()`, as the former will avoid cloning the waker +/// unnecessarily if the two wakers [wake the same task](Self::will_wake). +/// +/// # Examples +/// +/// ``` +/// #![feature(local_waker)] +/// use std::future::{Future, poll_fn}; +/// use std::task::Poll; +/// +/// // a future that returns pending once. +/// fn yield_now() -> impl Future<Output=()> + Unpin { +/// let mut yielded = false; +/// poll_fn(move |cx| { +/// if !yielded { +/// yielded = true; +/// cx.local_waker().wake_by_ref(); +/// return Poll::Pending; +/// } +/// return Poll::Ready(()) +/// }) +/// } +/// # async { +/// yield_now().await; +/// # }; +/// ``` +/// +/// [`Future::poll()`]: core::future::Future::poll +/// [`Poll::Pending`]: core::task::Poll::Pending +/// [`local_waker`]: core::task::Context::local_waker +#[unstable(feature = "local_waker", issue = "none")] +#[repr(transparent)] +pub struct LocalWaker { + waker: RawWaker, +} + +#[unstable(feature = "local_waker", issue = "none")] +impl Unpin for LocalWaker {} + +impl LocalWaker { + /// Creates a new `LocalWaker` from [`RawWaker`]. + /// + /// The behavior of the returned `Waker` is undefined if the contract defined + /// in [`RawWaker`]'s and [`RawWakerVTable`]'s documentation is not upheld. + /// Therefore this method is unsafe. + #[inline] + #[must_use] + #[stable(feature = "futures_api", since = "1.36.0")] + #[rustc_const_unstable(feature = "const_waker", issue = "102012")] + pub const unsafe fn from_raw(waker: RawWaker) -> LocalWaker { + Self { waker } + } + + /// Wake up the task associated with this `LocalWaker`. + /// + /// As long as the executor keeps running and the task is not finished, it is + /// guaranteed that each invocation of [`wake()`](Self::wake) (or + /// [`wake_by_ref()`](Self::wake_by_ref)) will be followed by at least one + /// [`poll()`] of the task to which this `Waker` belongs. This makes + /// it possible to temporarily yield to other tasks while running potentially + /// unbounded processing loops. + /// + /// Note that the above implies that multiple wake-ups may be coalesced into a + /// single [`poll()`] invocation by the runtime. + /// + /// Also note that yielding to competing tasks is not guaranteed: it is the + /// executor’s choice which task to run and the executor may choose to run the + /// current task again. + /// + /// [`poll()`]: crate::future::Future::poll + #[inline] + #[stable(feature = "futures_api", since = "1.36.0")] + pub fn wake(self) { + // The actual wakeup call is delegated through a virtual function call + // to the implementation which is defined by the executor. + let wake = self.waker.vtable.wake; + let data = self.waker.data; + + // Don't call `drop` -- the waker will be consumed by `wake`. + crate::mem::forget(self); + + // SAFETY: This is safe because `Waker::from_raw` is the only way + // to initialize `wake` and `data` requiring the user to acknowledge + // that the contract of `RawWaker` is upheld. + unsafe { (wake)(data) }; + } + + /// Creates a new `LocalWaker` that does nothing when `wake` is called. + /// + /// This is mostly useful for writing tests that need a [`Context`] to poll + /// some futures, but are not expecting those futures to wake the waker or + /// do not need to do anything specific if it happens. + /// + /// # Examples + /// + /// ``` + /// #![feature(local_waker)] + /// #![feature(noop_waker)] + /// + /// use std::future::Future; + /// use std::task::{ContextBuilder, LocalWaker}; + /// + /// let mut cx = task::ContextBuilder::new() + /// .local_waker(LocalWaker::noop()) + /// .build(); + /// + /// let mut future = Box::pin(async { 10 }); + /// assert_eq!(future.as_mut().poll(&mut cx), task::Poll::Ready(10)); + /// ``` + #[inline] + #[must_use] + #[unstable(feature = "noop_waker", issue = "98286")] + pub const fn noop() -> &'static LocalWaker { + const WAKER: &LocalWaker = &LocalWaker { waker: RawWaker::NOOP }; + WAKER + } + + /// Get a reference to the underlying [`RawWaker`]. + #[inline] + #[must_use] + #[unstable(feature = "waker_getters", issue = "87021")] + pub fn as_raw(&self) -> &RawWaker { + &self.waker + } + + /// Returns `true` if this `LocalWaker` and another `LocalWaker` would awake the same task. + /// + /// This function works on a best-effort basis, and may return false even + /// when the `Waker`s would awaken the same task. However, if this function + /// returns `true`, it is guaranteed that the `Waker`s will awaken the same task. + /// + /// This function is primarily used for optimization purposes — for example, + /// this type's [`clone_from`](Self::clone_from) implementation uses it to + /// avoid cloning the waker when they would wake the same task anyway. + #[inline] + #[must_use] + #[stable(feature = "futures_api", since = "1.36.0")] + pub fn will_wake(&self, other: &LocalWaker) -> bool { + self.waker == other.waker + } + + /// Wake up the task associated with this `LocalWaker` without consuming the `LocalWaker`. + /// + /// This is similar to [`wake()`](Self::wake), but may be slightly less efficient in + /// the case where an owned `Waker` is available. This method should be preferred to + /// calling `waker.clone().wake()`. + #[inline] + #[stable(feature = "futures_api", since = "1.36.0")] + pub fn wake_by_ref(&self) { + // The actual wakeup call is delegated through a virtual function call + // to the implementation which is defined by the executor. + + // SAFETY: see `wake` + unsafe { (self.waker.vtable.wake_by_ref)(self.waker.data) } + } +} +#[unstable(feature = "local_waker", issue = "none")] +impl Clone for LocalWaker { + #[inline] + fn clone(&self) -> Self { + LocalWaker { + // SAFETY: This is safe because `Waker::from_raw` is the only way + // to initialize `clone` and `data` requiring the user to acknowledge + // that the contract of [`RawWaker`] is upheld. + waker: unsafe { (self.waker.vtable.clone)(self.waker.data) }, + } + } + + #[inline] + fn clone_from(&mut self, source: &Self) { + if !self.will_wake(source) { + *self = source.clone(); + } + } +} + +#[stable(feature = "futures_api", since = "1.36.0")] +impl fmt::Debug for LocalWaker { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let vtable_ptr = self.waker.vtable as *const RawWakerVTable; + f.debug_struct("LocalWaker") + .field("data", &self.waker.data) + .field("vtable", &vtable_ptr) + .finish() + } +} |
