From 949bddc1ecc239e20871ca995f6235b538037421 Mon Sep 17 00:00:00 2001 From: SilverMira <66930495+SilverMira@users.noreply.github.com> Date: Fri, 20 Jun 2025 23:39:53 +0800 Subject: [PATCH] Allow futures independent of wasi pollables to progress This commit moves away from using noop waker for the main task. Instead, it keeps track if the main task is ready to progress again immediately after `Future::poll()`. If it is, the runtime will skip calling `wasi::io::poll::poll` if there are no wasi pollables scheduled, or otherwise call in a non-blocking manner to quickly get an update for all wasi pollables before progressing the main task again. --- src/runtime/block_on.rs | 39 ++++++++++++++----- src/runtime/reactor.rs | 83 ++++++++++++++++++++++++++++++++++++++--- 2 files changed, 108 insertions(+), 14 deletions(-) diff --git a/src/runtime/block_on.rs b/src/runtime/block_on.rs index 9a96d98..d0bbc5c 100644 --- a/src/runtime/block_on.rs +++ b/src/runtime/block_on.rs @@ -4,6 +4,7 @@ use core::future::Future; use core::pin::pin; use core::task::Waker; use core::task::{Context, Poll}; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::task::Wake; @@ -24,7 +25,8 @@ where let mut fut = pin!(fut); // Create a new context to be passed to the future. - let waker = noop_waker(); + let waker_impl = Arc::new(ReactorWaker::new()); + let waker = Waker::from(Arc::clone(&waker_impl)); let mut cx = Context::from_waker(&waker); // Either the future completes and we return, or some IO is happening @@ -32,7 +34,10 @@ where let res = loop { match fut.as_mut().poll(&mut cx) { Poll::Ready(res) => break res, - Poll::Pending => reactor.block_until(), + Poll::Pending => { + reactor.block_until(waker_impl.awake()); + waker_impl.set_awake(false); + } } }; // Clear the singleton @@ -40,14 +45,30 @@ where res } -/// Construct a new no-op waker -// NOTE: we can remove this once lands -fn noop_waker() -> Waker { - struct NoopWaker; +struct ReactorWaker { + awake: AtomicBool, +} + +impl ReactorWaker { + fn new() -> Self { + Self { + awake: AtomicBool::new(false), + } + } - impl Wake for NoopWaker { - fn wake(self: Arc) {} + #[inline] + fn set_awake(&self, awake: bool) { + self.awake.store(awake, Ordering::Relaxed); } - Waker::from(Arc::new(NoopWaker)) + #[inline] + fn awake(&self) -> bool { + self.awake.load(Ordering::Relaxed) + } +} + +impl Wake for ReactorWaker { + fn wake(self: Arc) { + self.set_awake(true); + } } diff --git a/src/runtime/reactor.rs b/src/runtime/reactor.rs index afe3deb..9222861 100644 --- a/src/runtime/reactor.rs +++ b/src/runtime/reactor.rs @@ -5,8 +5,11 @@ use core::future; use core::pin::Pin; use core::task::{Context, Poll, Waker}; use slab::Slab; +use std::cell::LazyCell; use std::collections::HashMap; use std::rc::Rc; +use std::sync::Arc; +use std::task::Wake; use wasi::io::poll::Pollable; /// A key for a `Pollable`, which is an index into the `Slab` in `Reactor`. @@ -100,6 +103,7 @@ pub struct Reactor { struct InnerReactor { pollables: Slab, wakers: HashMap, + immediate: LazyCell<(Pollable, Waker)>, } impl Reactor { @@ -122,6 +126,12 @@ impl Reactor { inner: Rc::new(RefCell::new(InnerReactor { pollables: Slab::new(), wakers: HashMap::new(), + immediate: LazyCell::new(|| { + ( + wasi::clocks::monotonic_clock::subscribe_duration(0), + noop_waker(), + ) + }), })), } } @@ -131,30 +141,49 @@ impl Reactor { /// # On Wakers and single-threaded runtimes /// /// At first glance it might seem silly that this goes through the motions - /// of calling the wakers. The main waker we create here is a `noop` waker: - /// it does nothing. However, it is common and encouraged to use wakers to + /// of calling the wakers. However, it is common and encouraged to use wakers to /// distinguish between events. Concurrency primitives may construct their /// own wakers to keep track of identity and wake more precisely. We do not /// control the wakers construted by other libraries, and it is for this /// reason that we have to call all the wakers - even if by default they /// will do nothing. - pub(crate) fn block_until(&self) { + /// + /// The [awake] argument indicates whether the main task is ready to be re-polled + /// to make progress. If it is, we will just poll all pollables without blocking + /// by including an always ready pollable, or choose to skip calling poll at all + /// if no pollables are registered. + pub(crate) fn block_until(&self, awake: bool) { let reactor = self.inner.borrow(); + // If no tasks are interested in any pollables currently, and the main task + // is already awake, run the next poll loop instead + if reactor.wakers.is_empty() && awake { + return; + } + // We're about to wait for a number of pollables. When they wake we get // the *indexes* back for the pollables whose events were available - so // we need to be able to associate the index with the right waker. // We start by iterating over the pollables, and keeping note of which // pollable belongs to which waker - let mut indexed_wakers = Vec::with_capacity(reactor.wakers.len()); - let mut targets = Vec::with_capacity(reactor.wakers.len()); + let capacity = reactor.wakers.len() + 1; + let mut indexed_wakers = Vec::with_capacity(capacity); + let mut targets = Vec::with_capacity(capacity); for (waitee, waker) in reactor.wakers.iter() { let pollable_index = waitee.pollable.0.key; indexed_wakers.push(waker); targets.push(&reactor.pollables[pollable_index.0]); } + // If the task is already awake, don't blockingly wait for the pollables, + // instead ask the host to give us an update immediately + if awake { + let (immediate, waker) = &*reactor.immediate; + indexed_wakers.push(waker); + targets.push(immediate); + } + debug_assert_ne!( targets.len(), 0, @@ -209,6 +238,18 @@ impl Reactor { } } +/// Construct a new no-op waker +// NOTE: we can remove this once lands +fn noop_waker() -> Waker { + struct NoopWaker; + + impl Wake for NoopWaker { + fn wake(self: Arc) {} + } + + Waker::from(Arc::new(NoopWaker)) +} + #[cfg(test)] mod test { use super::*; @@ -281,4 +322,36 @@ mod test { .await; }) } + + #[test] + fn progresses_wasi_independent_futures() { + crate::runtime::block_on(async { + let reactor = Reactor::current(); + let later = wasi::clocks::monotonic_clock::subscribe_duration(1_000_000_000); + let later = reactor.schedule(later); + let mut polled_before = false; + let wasi_independent_future = futures_lite::future::poll_fn(|cx| { + if polled_before { + std::task::Poll::Ready(true) + } else { + polled_before = true; + cx.waker().wake_by_ref(); + std::task::Poll::Pending + } + }); + let later = async { + later.wait_for().await; + false + }; + let wasi_independent_future_won = + futures_lite::future::race(wasi_independent_future, later).await; + assert!( + wasi_independent_future_won, + "wasi_independent_future should win the race" + ); + let soon = wasi::clocks::monotonic_clock::subscribe_duration(10_000_000); + let soon = reactor.schedule(soon); + soon.wait_for().await; + }) + } }