From 5b7c7064582203bb5158e8d0592da632c2839e9e Mon Sep 17 00:00:00 2001 From: Pat Hickey Date: Fri, 6 Dec 2024 12:50:16 -0800 Subject: [PATCH 01/12] wip: idk why the sleep future is dropping in this new reactor --- src/runtime/reactor.rs | 135 +++++++++++++++++++++++++++++++---------- 1 file changed, 104 insertions(+), 31 deletions(-) diff --git a/src/runtime/reactor.rs b/src/runtime/reactor.rs index 5525c8b..cae11f6 100644 --- a/src/runtime/reactor.rs +++ b/src/runtime/reactor.rs @@ -5,12 +5,72 @@ use super::{ use core::cell::RefCell; use core::future; -use core::task::Poll; -use core::task::Waker; +use core::pin::Pin; +use core::task::{Context, Poll, Waker}; use std::collections::HashMap; use std::rc::Rc; use wasi::io::poll::Pollable; +#[derive(Debug)] +struct Registration { + key: EventKey, +} + +impl Drop for Registration { + fn drop(&mut self) { + Reactor::current().deregister_event(self.key) + } +} + +#[derive(Debug, Clone)] +pub struct AsyncPollable(Rc); + +impl AsyncPollable { + pub fn wait_for(&self) -> WaitFor { + use std::sync::atomic::{AtomicUsize, Ordering}; + static COUNTER: AtomicUsize = AtomicUsize::new(0); + let unique = COUNTER.fetch_add(1, Ordering::Relaxed); + let key = self.0.key; + WaitFor { + waitee: Waitee { key, unique }, + needs_deregistration: false, + } + } +} + +#[derive(Debug, PartialEq, Eq, Hash, Clone)] +struct Waitee { + key: EventKey, + unique: usize, +} + +#[must_use = "futures do nothing unless polled or .awaited"] +#[derive(Debug)] +pub struct WaitFor { + waitee: Waitee, + needs_deregistration: bool, +} +impl future::Future for WaitFor { + type Output = (); + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let reactor = Reactor::current(); + if reactor.ready(&self.as_ref().waitee, cx.waker()) { + Poll::Ready(()) + } else { + self.as_mut().needs_deregistration = true; + Poll::Pending + } + } +} +impl Drop for WaitFor { + fn drop(&mut self) { + println!("dropping {:?}", self); + if self.needs_deregistration { + Reactor::current().deregister_waitee(&self.waitee) + } + } +} + /// Manage async system resources for WASI 0.2 #[derive(Debug, Clone)] pub struct Reactor { @@ -22,7 +82,7 @@ pub struct Reactor { #[derive(Debug)] struct InnerReactor { poller: Poller, - wakers: HashMap, + wakers: HashMap, } impl Reactor { @@ -64,39 +124,52 @@ impl Reactor { pub(crate) fn block_until(&self) { let mut reactor = self.inner.borrow_mut(); for key in reactor.poller.block_until() { - match reactor.wakers.get(&key) { - Some(waker) => waker.wake_by_ref(), - None => panic!("tried to wake the waker for non-existent `{:?}`", key), + for (waitee, waker) in reactor.wakers.iter() { + if waitee.key == key { + waker.wake_by_ref() + } } } } + /// Turn a wasi [`Pollable`] into an [`AsyncPollable`] + pub fn schedule(&self, pollable: Pollable) -> AsyncPollable { + let mut reactor = self.inner.borrow_mut(); + let key = reactor.poller.insert(pollable); + println!("schedule pollable as {key:?}"); + AsyncPollable(Rc::new(Registration { key })) + } + + fn deregister_event(&self, key: EventKey) { + let mut reactor = self.inner.borrow_mut(); + println!("deregister {key:?}",); + reactor.poller.remove(key); + } + + fn deregister_waitee(&self, waitee: &Waitee) { + let mut reactor = self.inner.borrow_mut(); + println!("deregister waker for {waitee:?}",); + reactor.wakers.remove(waitee); + } + + fn ready(&self, waitee: &Waitee, waker: &Waker) -> bool { + let mut reactor = self.inner.borrow_mut(); + let ready = reactor + .poller + .get(&waitee.key) + .expect("only live EventKey can be checked for readiness") + .ready(); + if !ready { + println!("register waker for {waitee:?}"); + reactor.wakers.insert(waitee.clone(), waker.clone()); + } + println!("ready {ready} {waitee:?}"); + ready + } + /// Wait for the pollable to resolve. pub async fn wait_for(&self, pollable: Pollable) { - let mut pollable = Some(pollable); - let mut key = None; - // This function is the core loop of our function; it will be called - // multiple times as the future is resolving. - future::poll_fn(|cx| { - // Start by taking a lock on the reactor. This is single-threaded - // and short-lived, so it will never be contended. - let mut reactor = self.inner.borrow_mut(); - - // Schedule interest in the `pollable` on the first iteration. On - // every iteration, register the waker with the reactor. - let key = key.get_or_insert_with(|| reactor.poller.insert(pollable.take().unwrap())); - reactor.wakers.insert(*key, cx.waker().clone()); - - // Check whether we're ready or need to keep waiting. If we're - // ready, we clean up after ourselves. - if reactor.poller.get(key).unwrap().ready() { - reactor.poller.remove(*key); - reactor.wakers.remove(key); - Poll::Ready(()) - } else { - Poll::Pending - } - }) - .await + let p = self.schedule(pollable); + p.wait_for().await } } From b44be3f4fe57adcb21db6bdcff97c7cc537b60d4 Mon Sep 17 00:00:00 2001 From: Pat Hickey Date: Fri, 6 Dec 2024 12:58:53 -0800 Subject: [PATCH 02/12] use of AsyncPollable in WaitFor future should increment reference count of Registration --- src/runtime/reactor.rs | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/src/runtime/reactor.rs b/src/runtime/reactor.rs index cae11f6..c89ce6b 100644 --- a/src/runtime/reactor.rs +++ b/src/runtime/reactor.rs @@ -11,7 +11,7 @@ use std::collections::HashMap; use std::rc::Rc; use wasi::io::poll::Pollable; -#[derive(Debug)] +#[derive(Debug, PartialEq, Eq, Hash)] struct Registration { key: EventKey, } @@ -22,7 +22,7 @@ impl Drop for Registration { } } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct AsyncPollable(Rc); impl AsyncPollable { @@ -30,9 +30,11 @@ impl AsyncPollable { use std::sync::atomic::{AtomicUsize, Ordering}; static COUNTER: AtomicUsize = AtomicUsize::new(0); let unique = COUNTER.fetch_add(1, Ordering::Relaxed); - let key = self.0.key; WaitFor { - waitee: Waitee { key, unique }, + waitee: Waitee { + pollable: self.clone(), + unique, + }, needs_deregistration: false, } } @@ -40,7 +42,7 @@ impl AsyncPollable { #[derive(Debug, PartialEq, Eq, Hash, Clone)] struct Waitee { - key: EventKey, + pollable: AsyncPollable, unique: usize, } @@ -125,7 +127,7 @@ impl Reactor { let mut reactor = self.inner.borrow_mut(); for key in reactor.poller.block_until() { for (waitee, waker) in reactor.wakers.iter() { - if waitee.key == key { + if waitee.pollable.0.key == key { waker.wake_by_ref() } } @@ -156,7 +158,7 @@ impl Reactor { let mut reactor = self.inner.borrow_mut(); let ready = reactor .poller - .get(&waitee.key) + .get(&waitee.pollable.0.key) .expect("only live EventKey can be checked for readiness") .ready(); if !ready { From a39473ac5d848dfe1a04d7184b305cb3e69e6f16 Mon Sep 17 00:00:00 2001 From: Pat Hickey Date: Mon, 9 Dec 2024 10:38:17 -0800 Subject: [PATCH 03/12] reactor AsyncPollable works with subscribe duration directly... --- src/runtime/polling.rs | 4 ++-- src/runtime/reactor.rs | 15 +++++++++++++++ 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/src/runtime/polling.rs b/src/runtime/polling.rs index 8ddec0a..c2de263 100644 --- a/src/runtime/polling.rs +++ b/src/runtime/polling.rs @@ -75,10 +75,10 @@ impl Poller { // to convert it back to the right keys for the wakers. Earlier we // established a positional index -> waker key relationship, so we can // go right ahead and perform a lookup there. - ready_indexes + dbg!(ready_indexes .into_iter() .map(|index| EventKey(indexes[index as usize] as u32)) - .collect() + .collect()) } } diff --git a/src/runtime/reactor.rs b/src/runtime/reactor.rs index c89ce6b..3732d21 100644 --- a/src/runtime/reactor.rs +++ b/src/runtime/reactor.rs @@ -128,6 +128,7 @@ impl Reactor { for key in reactor.poller.block_until() { for (waitee, waker) in reactor.wakers.iter() { if waitee.pollable.0.key == key { + println!("waking {key:?}"); waker.wake_by_ref() } } @@ -175,3 +176,17 @@ impl Reactor { p.wait_for().await } } + +#[cfg(test)] +mod test { + use super::*; + #[test] + fn reactor_subscribe_duration() { + crate::runtime::block_on(async { + let reactor = Reactor::current(); + let pollable = wasi::clocks::monotonic_clock::subscribe_duration(1000); + let sched = reactor.schedule(pollable); + sched.wait_for().await; + }) + } +} From 6582494fd6e906f86ab51abe5655d0bea66b7c3a Mon Sep 17 00:00:00 2001 From: Pat Hickey Date: Mon, 9 Dec 2024 15:08:21 -0800 Subject: [PATCH 04/12] inline poller into reactor, only poll on pollables that have waiters, add more unit tests --- src/runtime/mod.rs | 1 - src/runtime/polling.rs | 88 ----------------------------- src/runtime/reactor.rs | 125 ++++++++++++++++++++++++++++++++++++----- 3 files changed, 111 insertions(+), 103 deletions(-) delete mode 100644 src/runtime/polling.rs diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index cd01535..13fa657 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -11,7 +11,6 @@ #![warn(missing_docs, unreachable_pub)] mod block_on; -mod polling; mod reactor; pub use block_on::block_on; diff --git a/src/runtime/polling.rs b/src/runtime/polling.rs deleted file mode 100644 index c2de263..0000000 --- a/src/runtime/polling.rs +++ /dev/null @@ -1,88 +0,0 @@ -//! This module handles the conversion from `Pollable` -> `Future`. We do this -//! by creating an equivalent implementation to the `polling` crate. Once -//! has been resolved, this module -//! will likely no longer be needed. - -use slab::Slab; -use std::vec::Vec; -use wasi::io::poll::{poll, Pollable}; - -/// Waits for I/O events. -#[derive(Debug)] -pub(crate) struct Poller { - pub(crate) targets: Slab, -} - -impl Poller { - /// Create a new instance of `Poller` - pub(crate) fn new() -> Self { - Self::with_capacity(0) - } - - /// Create a new instance of `Poller` with a given capacity - pub(crate) fn with_capacity(capacity: usize) -> Self { - Self { - targets: Slab::with_capacity(capacity), - } - } - - /// Insert a new `Pollable` target into `Poller` - pub(crate) fn insert(&mut self, target: Pollable) -> EventKey { - let key = self.targets.insert(target); - EventKey(key as u32) - } - - /// Get a `Pollable` if it exists. - pub(crate) fn get(&self, key: &EventKey) -> Option<&Pollable> { - self.targets.get(key.0 as usize) - } - - /// Remove an instance of `Pollable` from `Poller`. - /// - /// Returns `None` if no entry was found for `key`. - pub(crate) fn remove(&mut self, key: EventKey) -> Option { - self.targets.try_remove(key.0 as usize) - } - - /// Block the current thread until a new event has triggered. - /// - /// This will clear the value of `ready_list`. - pub(crate) fn block_until(&mut self) -> Vec { - // We're about to wait for a number of pollables. When they wake we get - // the *indexes* back for the pollables whose events were available - so - // we need to be able to associate the index with the right waker. - - // We start by iterating over the pollables, and keeping note of which - // pollable belongs to which waker index - let mut indexes = Vec::with_capacity(self.targets.len()); - let mut targets = Vec::with_capacity(self.targets.len()); - for (index, target) in self.targets.iter() { - indexes.push(index); - targets.push(target); - } - - debug_assert_ne!( - targets.len(), - 0, - "Attempting to block on an empty list of pollables - without any pending work, no progress can be made and the program may spin indefinitely" - ); - - // Now that we have that association, we're ready to poll our targets. - // This will block until an event has completed. - let ready_indexes = poll(&targets); - - // Once we have the indexes for which pollables are available, we need - // to convert it back to the right keys for the wakers. Earlier we - // established a positional index -> waker key relationship, so we can - // go right ahead and perform a lookup there. - dbg!(ready_indexes - .into_iter() - .map(|index| EventKey(indexes[index as usize] as u32)) - .collect()) - } -} - -/// A key representing an entry into the poller -#[repr(transparent)] -#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)] -pub(crate) struct EventKey(pub(crate) u32); diff --git a/src/runtime/reactor.rs b/src/runtime/reactor.rs index 3732d21..2f69dd9 100644 --- a/src/runtime/reactor.rs +++ b/src/runtime/reactor.rs @@ -1,16 +1,19 @@ -use super::{ - polling::{EventKey, Poller}, - REACTOR, -}; +use super::REACTOR; use core::cell::RefCell; use core::future; use core::pin::Pin; use core::task::{Context, Poll, Waker}; +use slab::Slab; use std::collections::HashMap; use std::rc::Rc; use wasi::io::poll::Pollable; +/// A key representing an entry into the poller +#[repr(transparent)] +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)] +pub(crate) struct EventKey(pub(crate) usize); + #[derive(Debug, PartialEq, Eq, Hash)] struct Registration { key: EventKey, @@ -83,7 +86,7 @@ pub struct Reactor { /// a lock of the whole. #[derive(Debug)] struct InnerReactor { - poller: Poller, + pollables: Slab, wakers: HashMap, } @@ -105,7 +108,7 @@ impl Reactor { pub(crate) fn new() -> Self { Self { inner: Rc::new(RefCell::new(InnerReactor { - poller: Poller::new(), + pollables: Slab::new(), wakers: HashMap::new(), })), } @@ -124,8 +127,42 @@ impl Reactor { /// reason that we have to call all the wakers - even if by default they /// will do nothing. pub(crate) fn block_until(&self) { - let mut reactor = self.inner.borrow_mut(); - for key in reactor.poller.block_until() { + let reactor = self.inner.borrow(); + + // We're about to wait for a number of pollables. When they wake we get + // the *indexes* back for the pollables whose events were available - so + // we need to be able to associate the index with the right waker. + + // We start by iterating over the pollables, and keeping note of which + // pollable belongs to which waker index + let mut indexes = Vec::with_capacity(reactor.wakers.len()); + let mut targets = Vec::with_capacity(reactor.wakers.len()); + for waitee in reactor.wakers.keys() { + let pollable_index = waitee.pollable.0.key; + indexes.push(pollable_index); + targets.push(&reactor.pollables[pollable_index.0]); + } + + debug_assert_ne!( + targets.len(), + 0, + "Attempting to block on an empty list of pollables - without any pending work, no progress can be made and wasi::io::poll::poll will trap" + ); + + println!("polling for {indexes:?}"); + // Now that we have that association, we're ready to poll our targets. + // This will block until an event has completed. + let ready_indexes = wasi::io::poll::poll(&targets); + + // Once we have the indexes for which pollables are available, we need + // to convert it back to the right keys for the wakers. Earlier we + // established a positional index -> waker key relationship, so we can + // go right ahead and perform a lookup there. + let ready_keys = ready_indexes + .into_iter() + .map(|index| indexes[index as usize]); + + for key in ready_keys { for (waitee, waker) in reactor.wakers.iter() { if waitee.pollable.0.key == key { println!("waking {key:?}"); @@ -138,7 +175,7 @@ impl Reactor { /// Turn a wasi [`Pollable`] into an [`AsyncPollable`] pub fn schedule(&self, pollable: Pollable) -> AsyncPollable { let mut reactor = self.inner.borrow_mut(); - let key = reactor.poller.insert(pollable); + let key = EventKey(reactor.pollables.insert(pollable)); println!("schedule pollable as {key:?}"); AsyncPollable(Rc::new(Registration { key })) } @@ -146,7 +183,7 @@ impl Reactor { fn deregister_event(&self, key: EventKey) { let mut reactor = self.inner.borrow_mut(); println!("deregister {key:?}",); - reactor.poller.remove(key); + reactor.pollables.remove(key.0); } fn deregister_waitee(&self, waitee: &Waitee) { @@ -158,8 +195,8 @@ impl Reactor { fn ready(&self, waitee: &Waitee, waker: &Waker) -> bool { let mut reactor = self.inner.borrow_mut(); let ready = reactor - .poller - .get(&waitee.pollable.0.key) + .pollables + .get(waitee.pollable.0.key.0) .expect("only live EventKey can be checked for readiness") .ready(); if !ready { @@ -180,13 +217,73 @@ impl Reactor { #[cfg(test)] mod test { use super::*; + // Using WASMTIME_LOG, observe that this test doesn't even call poll() - the pollable is ready + // immediately. #[test] - fn reactor_subscribe_duration() { + fn subscribe_no_duration() { crate::runtime::block_on(async { let reactor = Reactor::current(); - let pollable = wasi::clocks::monotonic_clock::subscribe_duration(1000); + let pollable = wasi::clocks::monotonic_clock::subscribe_duration(0); let sched = reactor.schedule(pollable); sched.wait_for().await; }) } + // Using WASMTIME_LOG, observe that this test calls poll() until the timer is ready. + #[test] + fn subscribe_some_duration() { + crate::runtime::block_on(async { + let reactor = Reactor::current(); + let pollable = wasi::clocks::monotonic_clock::subscribe_duration(10_000_000); + let sched = reactor.schedule(pollable); + sched.wait_for().await; + }) + } + + // Using WASMTIME_LOG, observe that this test results in a single poll() on the second + // subscription, rather than spinning in poll() with first subscription, which is instantly + // ready, but not what the waker requests. + #[test] + fn subscribe_multiple_durations() { + crate::runtime::block_on(async { + let reactor = Reactor::current(); + let now = wasi::clocks::monotonic_clock::subscribe_duration(0); + let soon = wasi::clocks::monotonic_clock::subscribe_duration(10_000_000); + let now = reactor.schedule(now); + let soon = reactor.schedule(soon); + soon.wait_for().await; + drop(now) + }) + } + + // Using WASMTIME_LOG, observe that this test results in two calls to poll(), one with both + // pollables because both are awaiting, and one with just the later pollable. + #[test] + fn subscribe_multiple_durations_zipped() { + crate::runtime::block_on(async { + let reactor = Reactor::current(); + let start = wasi::clocks::monotonic_clock::now(); + let soon = wasi::clocks::monotonic_clock::subscribe_duration(10_000_000); + let later = wasi::clocks::monotonic_clock::subscribe_duration(40_000_000); + let soon = reactor.schedule(soon); + let later = reactor.schedule(later); + + futures_lite::future::zip( + async move { + soon.wait_for().await; + println!( + "*** subscribe_duration(soon) ready ({})", + wasi::clocks::monotonic_clock::now() - start + ); + }, + async move { + later.wait_for().await; + println!( + "*** subscribe_duration(later) ready ({})", + wasi::clocks::monotonic_clock::now() - start + ); + }, + ) + .await; + }) + } } From c09276377ee9557dadc5e8c3b9be563148c9f3cd Mon Sep 17 00:00:00 2001 From: Pat Hickey Date: Mon, 9 Dec 2024 15:15:03 -0800 Subject: [PATCH 05/12] get rid of debug printlns --- src/runtime/mod.rs | 2 +- src/runtime/reactor.rs | 10 +--------- src/time/mod.rs | 8 ++++++-- 3 files changed, 8 insertions(+), 12 deletions(-) diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index 13fa657..31f6fb8 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -14,7 +14,7 @@ mod block_on; mod reactor; pub use block_on::block_on; -pub use reactor::Reactor; +pub use reactor::{AsyncPollable, Reactor}; use std::cell::RefCell; // There are no threads in WASI 0.2, so this is just a safe way to thread a single reactor to all diff --git a/src/runtime/reactor.rs b/src/runtime/reactor.rs index 2f69dd9..be833a7 100644 --- a/src/runtime/reactor.rs +++ b/src/runtime/reactor.rs @@ -69,7 +69,6 @@ impl future::Future for WaitFor { } impl Drop for WaitFor { fn drop(&mut self) { - println!("dropping {:?}", self); if self.needs_deregistration { Reactor::current().deregister_waitee(&self.waitee) } @@ -149,7 +148,6 @@ impl Reactor { "Attempting to block on an empty list of pollables - without any pending work, no progress can be made and wasi::io::poll::poll will trap" ); - println!("polling for {indexes:?}"); // Now that we have that association, we're ready to poll our targets. // This will block until an event has completed. let ready_indexes = wasi::io::poll::poll(&targets); @@ -165,30 +163,26 @@ impl Reactor { for key in ready_keys { for (waitee, waker) in reactor.wakers.iter() { if waitee.pollable.0.key == key { - println!("waking {key:?}"); waker.wake_by_ref() } } } } - /// Turn a wasi [`Pollable`] into an [`AsyncPollable`] + /// Turn a Wasi [`Pollable`] into an [`AsyncPollable`] pub fn schedule(&self, pollable: Pollable) -> AsyncPollable { let mut reactor = self.inner.borrow_mut(); let key = EventKey(reactor.pollables.insert(pollable)); - println!("schedule pollable as {key:?}"); AsyncPollable(Rc::new(Registration { key })) } fn deregister_event(&self, key: EventKey) { let mut reactor = self.inner.borrow_mut(); - println!("deregister {key:?}",); reactor.pollables.remove(key.0); } fn deregister_waitee(&self, waitee: &Waitee) { let mut reactor = self.inner.borrow_mut(); - println!("deregister waker for {waitee:?}",); reactor.wakers.remove(waitee); } @@ -200,10 +194,8 @@ impl Reactor { .expect("only live EventKey can be checked for readiness") .ready(); if !ready { - println!("register waker for {waitee:?}"); reactor.wakers.insert(waitee.clone(), waker.clone()); } - println!("ready {ready} {waitee:?}"); ready } diff --git a/src/time/mod.rs b/src/time/mod.rs index 419b35e..0ddf14f 100644 --- a/src/time/mod.rs +++ b/src/time/mod.rs @@ -42,8 +42,7 @@ impl AsyncIterator for Interval { type Item = Instant; async fn next(&mut self) -> Option { - Timer::after(self.duration).await; - Some(Instant::now()) + Some(Timer::after(self.duration).await) } } @@ -86,3 +85,8 @@ impl Future for Timer { } } } + +#[cfg(test)] +mod test { + use super::*; +} From 87919ec8ddd9fd9d4398a6cdcb2050ba8d53bdca Mon Sep 17 00:00:00 2001 From: Pat Hickey Date: Mon, 9 Dec 2024 15:23:07 -0800 Subject: [PATCH 06/12] rewrite Timer in terms of AsyncPollable --- src/time/mod.rs | 38 +++++++++++++++++++++++++++----------- 1 file changed, 27 insertions(+), 11 deletions(-) diff --git a/src/time/mod.rs b/src/time/mod.rs index 0ddf14f..e046850 100644 --- a/src/time/mod.rs +++ b/src/time/mod.rs @@ -10,9 +10,15 @@ pub use instant::Instant; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; -use wasi::clocks::{monotonic_clock::subscribe_instant, wall_clock}; +use wasi::clocks::{ + monotonic_clock::{subscribe_duration, subscribe_instant}, + wall_clock, +}; -use crate::{iter::AsyncIterator, runtime::Reactor}; +use crate::{ + iter::AsyncIterator, + runtime::{AsyncPollable, Reactor}, +}; /// A measurement of the system clock, useful for talking to external entities /// like the file system or other processes. @@ -47,28 +53,26 @@ impl AsyncIterator for Interval { } #[derive(Debug)] -pub struct Timer(Option); +pub struct Timer(Option); impl Timer { pub fn never() -> Timer { Timer(None) } pub fn at(deadline: Instant) -> Timer { - Timer(Some(deadline)) + let pollable = Reactor::current().schedule(subscribe_instant(*deadline)); + Timer(Some(pollable)) } pub fn after(duration: Duration) -> Timer { - Timer(Some(Instant::now() + duration)) + let pollable = Reactor::current().schedule(subscribe_duration(*duration)); + Timer(Some(pollable)) } pub fn set_after(&mut self, duration: Duration) { *self = Self::after(duration); } pub async fn wait(&self) { - match self.0 { - Some(deadline) => { - Reactor::current() - .wait_for(subscribe_instant(*deadline)) - .await - } + match &self.0 { + Some(pollable) => pollable.wait_for().await, None => std::future::pending().await, } } @@ -89,4 +93,16 @@ impl Future for Timer { #[cfg(test)] mod test { use super::*; + + #[test] + fn timer_now() { + crate::runtime::block_on(async { + let start = Instant::now(); + let timer = Timer::at(start); + let now = timer.await; + let d = now.duration_since(start); + let d: std::time::Duration = d.into(); + println!("timer_now awaited for {} s", d.as_secs_f32()); + }); + } } From 419466c53fb78676c7627e02c7748ea6feee1c58 Mon Sep 17 00:00:00 2001 From: Pat Hickey Date: Mon, 9 Dec 2024 15:37:48 -0800 Subject: [PATCH 07/12] this timer test crashes --- src/time/mod.rs | 34 +++++++++++++++++++++------------- 1 file changed, 21 insertions(+), 13 deletions(-) diff --git a/src/time/mod.rs b/src/time/mod.rs index e046850..01bd757 100644 --- a/src/time/mod.rs +++ b/src/time/mod.rs @@ -70,11 +70,12 @@ impl Timer { pub fn set_after(&mut self, duration: Duration) { *self = Self::after(duration); } - pub async fn wait(&self) { + pub async fn wait(&self) -> Instant { match &self.0 { Some(pollable) => pollable.wait_for().await, None => std::future::pending().await, } + Instant::now() } } @@ -83,10 +84,7 @@ impl Future for Timer { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.as_ref(); let pinned = std::pin::pin!(this.wait()); - match pinned.poll(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(()) => Poll::Ready(Instant::now()), - } + pinned.poll(cx) } } @@ -94,15 +92,25 @@ impl Future for Timer { mod test { use super::*; + async fn debug_duration(what: &str, f: impl Future) { + let start = Instant::now(); + let now = f.await; + let d = now.duration_since(start); + let d: std::time::Duration = d.into(); + println!("{what} awaited for {} s", d.as_secs_f32()); + } + #[test] fn timer_now() { - crate::runtime::block_on(async { - let start = Instant::now(); - let timer = Timer::at(start); - let now = timer.await; - let d = now.duration_since(start); - let d: std::time::Duration = d.into(); - println!("timer_now awaited for {} s", d.as_secs_f32()); - }); + crate::runtime::block_on(debug_duration("timer_now", async { + Timer::at(Instant::now()).await + })); + } + + #[test] + fn timer_after_100_milliseconds() { + crate::runtime::block_on(debug_duration("timer_after_100_milliseconds", async { + Timer::after(Duration::from_millis(100)).await + })); } } From 1276ee308a9acd6b12b89304d1770e37333c5230 Mon Sep 17 00:00:00 2001 From: Pat Hickey Date: Mon, 9 Dec 2024 15:59:48 -0800 Subject: [PATCH 08/12] name the timer wait() future explicitly, now thats how task::sleep, sleep_until work --- src/runtime/mod.rs | 2 +- src/task/mod.rs | 4 ++-- src/task/sleep.rs | 44 +++-------------------------------------- src/task/sleep_until.rs | 40 +++---------------------------------- src/time/duration.rs | 5 ++--- src/time/instant.rs | 5 ++--- src/time/mod.rs | 39 ++++++++++++++++++++++++------------ 7 files changed, 39 insertions(+), 100 deletions(-) diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index 31f6fb8..0e01554 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -14,7 +14,7 @@ mod block_on; mod reactor; pub use block_on::block_on; -pub use reactor::{AsyncPollable, Reactor}; +pub use reactor::{AsyncPollable, Reactor, WaitFor}; use std::cell::RefCell; // There are no threads in WASI 0.2, so this is just a safe way to thread a single reactor to all diff --git a/src/task/mod.rs b/src/task/mod.rs index fa9f4b9..b93f9b8 100644 --- a/src/task/mod.rs +++ b/src/task/mod.rs @@ -3,5 +3,5 @@ mod sleep; mod sleep_until; -pub use sleep::{sleep, Sleep}; -pub use sleep_until::{sleep_until, SleepUntil}; +pub use sleep::sleep; +pub use sleep_until::sleep_until; diff --git a/src/task/sleep.rs b/src/task/sleep.rs index 84ad95e..5aebb5d 100644 --- a/src/task/sleep.rs +++ b/src/task/sleep.rs @@ -1,46 +1,8 @@ -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; - -use crate::time::Timer as AsyncTimer; -use pin_project_lite::pin_project; - -use crate::time::{Duration, Instant}; +use crate::time::{Duration, Timer, Wait}; /// Sleeps for the specified amount of time. /// /// This future can be `push_deadline` to be moved -pub fn sleep(dur: Duration) -> Sleep { - Sleep { - dur, - timer: AsyncTimer::after(dur.into()), - completed: false, - } -} - -pin_project! { - /// Sleeps for the specified amount of time. - #[must_use = "futures do nothing unless polled or .awaited"] - pub struct Sleep { - #[pin] - timer: AsyncTimer, - completed: bool, - dur: Duration, - } -} - -impl Future for Sleep { - type Output = Instant; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - assert!(!self.completed, "future polled after completing"); - let this = self.project(); - match this.timer.poll(cx) { - Poll::Ready(instant) => { - *this.completed = true; - Poll::Ready(instant.into()) - } - Poll::Pending => Poll::Pending, - } - } +pub fn sleep(dur: Duration) -> Wait { + Timer::after(dur).wait() } diff --git a/src/task/sleep_until.rs b/src/task/sleep_until.rs index bcadf41..03e6d3a 100644 --- a/src/task/sleep_until.rs +++ b/src/task/sleep_until.rs @@ -1,40 +1,6 @@ -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; - -use crate::time::{Instant, Timer}; -use pin_project_lite::pin_project; +use crate::time::{Instant, Timer, Wait}; /// Sleeps until the specified instant. -pub fn sleep_until(deadline: Instant) -> SleepUntil { - SleepUntil { - timer: Timer::at(deadline.into()), - completed: false, - } -} - -pin_project! { - /// Sleeps until the specified instant. - #[must_use = "futures do nothing unless polled or .awaited"] - pub struct SleepUntil { - #[pin] - timer: Timer, - completed: bool, - } -} - -impl Future for SleepUntil { - type Output = Instant; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - assert!(!self.completed, "future polled after completing"); - let this = self.project(); - match this.timer.poll(cx) { - Poll::Ready(instant) => { - *this.completed = true; - Poll::Ready(instant.into()) - } - Poll::Pending => Poll::Pending, - } - } +pub fn sleep_until(deadline: Instant) -> Wait { + Timer::at(deadline).wait() } diff --git a/src/time/duration.rs b/src/time/duration.rs index 6386154..10d8103 100644 --- a/src/time/duration.rs +++ b/src/time/duration.rs @@ -1,5 +1,4 @@ -use super::Instant; -use crate::task::Sleep; +use super::{Instant, Wait}; use std::future::IntoFuture; use std::ops::{Add, AddAssign, Sub, SubAssign}; use wasi::clocks::monotonic_clock; @@ -135,7 +134,7 @@ impl SubAssign for Duration { impl IntoFuture for Duration { type Output = Instant; - type IntoFuture = Sleep; + type IntoFuture = Wait; fn into_future(self) -> Self::IntoFuture { crate::task::sleep(self) diff --git a/src/time/instant.rs b/src/time/instant.rs index bd99f43..b9db4b9 100644 --- a/src/time/instant.rs +++ b/src/time/instant.rs @@ -1,5 +1,4 @@ -use super::Duration; -use crate::task::SleepUntil; +use super::{Duration, Wait}; use std::future::IntoFuture; use std::ops::{Add, AddAssign, Sub, SubAssign}; use wasi::clocks::monotonic_clock; @@ -85,7 +84,7 @@ impl std::ops::DerefMut for Instant { impl IntoFuture for Instant { type Output = Instant; - type IntoFuture = SleepUntil; + type IntoFuture = Wait; fn into_future(self) -> Self::IntoFuture { crate::task::sleep_until(self) diff --git a/src/time/mod.rs b/src/time/mod.rs index 01bd757..2132329 100644 --- a/src/time/mod.rs +++ b/src/time/mod.rs @@ -7,6 +7,7 @@ mod instant; pub use duration::Duration; pub use instant::Instant; +use pin_project_lite::pin_project; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; @@ -48,7 +49,7 @@ impl AsyncIterator for Interval { type Item = Instant; async fn next(&mut self) -> Option { - Some(Timer::after(self.duration).await) + Some(Timer::after(self.duration).wait().await) } } @@ -70,21 +71,33 @@ impl Timer { pub fn set_after(&mut self, duration: Duration) { *self = Self::after(duration); } - pub async fn wait(&self) -> Instant { - match &self.0 { - Some(pollable) => pollable.wait_for().await, - None => std::future::pending().await, - } - Instant::now() + pub fn wait(&self) -> Wait { + let wait_for = self.0.as_ref().map(|pollable| pollable.wait_for()); + Wait { wait_for } + } +} + +pin_project! { + /// Future created by [`Timer::wait`] + #[must_use = "futures do nothing unless polled or .awaited"] + pub struct Wait { + #[pin] + wait_for: Option } } -impl Future for Timer { +impl Future for Wait { type Output = Instant; + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.as_ref(); - let pinned = std::pin::pin!(this.wait()); - pinned.poll(cx) + let this = self.project(); + match this.wait_for.as_pin_mut() { + None => Poll::Pending, + Some(f) => match f.poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(()) => Poll::Ready(Instant::now()), + }, + } } } @@ -103,14 +116,14 @@ mod test { #[test] fn timer_now() { crate::runtime::block_on(debug_duration("timer_now", async { - Timer::at(Instant::now()).await + Timer::at(Instant::now()).wait().await })); } #[test] fn timer_after_100_milliseconds() { crate::runtime::block_on(debug_duration("timer_after_100_milliseconds", async { - Timer::after(Duration::from_millis(100)).await + Timer::after(Duration::from_millis(100)).wait().await })); } } From 6cf89c898ed1ae2d5d37e92ff7baeec59be3ad3d Mon Sep 17 00:00:00 2001 From: Pat Hickey Date: Mon, 9 Dec 2024 16:01:36 -0800 Subject: [PATCH 09/12] inline trivial sleep and sleep_until into task.rs --- src/task.rs | 13 +++++++++++++ src/task/mod.rs | 7 ------- src/task/sleep.rs | 8 -------- src/task/sleep_until.rs | 6 ------ 4 files changed, 13 insertions(+), 21 deletions(-) create mode 100644 src/task.rs delete mode 100644 src/task/mod.rs delete mode 100644 src/task/sleep.rs delete mode 100644 src/task/sleep_until.rs diff --git a/src/task.rs b/src/task.rs new file mode 100644 index 0000000..3e06fda --- /dev/null +++ b/src/task.rs @@ -0,0 +1,13 @@ +//! Types and Traits for working with asynchronous tasks. + +use crate::time::{Duration, Instant, Timer, Wait}; + +/// Sleeps for the specified amount of time. +pub fn sleep(dur: Duration) -> Wait { + Timer::after(dur).wait() +} + +/// Sleeps until the specified instant. +pub fn sleep_until(deadline: Instant) -> Wait { + Timer::at(deadline).wait() +} diff --git a/src/task/mod.rs b/src/task/mod.rs deleted file mode 100644 index b93f9b8..0000000 --- a/src/task/mod.rs +++ /dev/null @@ -1,7 +0,0 @@ -//! Types and Traits for working with asynchronous tasks. - -mod sleep; -mod sleep_until; - -pub use sleep::sleep; -pub use sleep_until::sleep_until; diff --git a/src/task/sleep.rs b/src/task/sleep.rs deleted file mode 100644 index 5aebb5d..0000000 --- a/src/task/sleep.rs +++ /dev/null @@ -1,8 +0,0 @@ -use crate::time::{Duration, Timer, Wait}; - -/// Sleeps for the specified amount of time. -/// -/// This future can be `push_deadline` to be moved -pub fn sleep(dur: Duration) -> Wait { - Timer::after(dur).wait() -} diff --git a/src/task/sleep_until.rs b/src/task/sleep_until.rs deleted file mode 100644 index 03e6d3a..0000000 --- a/src/task/sleep_until.rs +++ /dev/null @@ -1,6 +0,0 @@ -use crate::time::{Instant, Timer, Wait}; - -/// Sleeps until the specified instant. -pub fn sleep_until(deadline: Instant) -> Wait { - Timer::at(deadline).wait() -} From d359e143de565975ca0ae0315423cfc93105de76 Mon Sep 17 00:00:00 2001 From: Pat Hickey Date: Mon, 25 Nov 2024 18:23:15 -0800 Subject: [PATCH 10/12] add test showing that timeout on http::Client::send panics --- tests/http_timeout.rs | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) create mode 100644 tests/http_timeout.rs diff --git a/tests/http_timeout.rs b/tests/http_timeout.rs new file mode 100644 index 0000000..de67f1b --- /dev/null +++ b/tests/http_timeout.rs @@ -0,0 +1,23 @@ +use wstd::future::FutureExt; +use wstd::http::{Client, Method, Request}; +use wstd::time::Duration; + +#[wstd::test] +async fn http_timeout() -> Result<(), Box> { + // This get request will connect to the server, which will then wait 1 second before + // returning a response. + let request = Request::new(Method::GET, "https://postman-echo.com/delay/1".parse()?); + let result = Client::new() + .send(request) + .timeout(Duration::from_millis(500)) + .await; + + assert!(result.is_err(), "response should be an error"); + let error = result.unwrap_err(); + assert!( + matches!(error.kind(), std::io::ErrorKind::TimedOut), + "expected TimedOut error, got: {error:?>}" + ); + + Ok(()) +} From d1693d85ad4490d5f3231bbb89bb0eebf82698ac Mon Sep 17 00:00:00 2001 From: Pat Hickey Date: Thu, 12 Dec 2024 10:49:46 -0800 Subject: [PATCH 11/12] improve comments pair-programming with yosh --- src/runtime/reactor.rs | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/src/runtime/reactor.rs b/src/runtime/reactor.rs index be833a7..ac4ca38 100644 --- a/src/runtime/reactor.rs +++ b/src/runtime/reactor.rs @@ -9,11 +9,13 @@ use std::collections::HashMap; use std::rc::Rc; use wasi::io::poll::Pollable; -/// A key representing an entry into the poller +/// A key for a Pollable, which is an index into the Slab in Reactor. #[repr(transparent)] #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)] pub(crate) struct EventKey(pub(crate) usize); +/// A Registration is a reference to the Reactor's owned Pollable. When the registration is +/// dropped, the reactor will drop the Pollable resource. #[derive(Debug, PartialEq, Eq, Hash)] struct Registration { key: EventKey, @@ -25,10 +27,13 @@ impl Drop for Registration { } } +/// An AsyncPollable is a reference counted Registration. It can be cloned, and used to create +/// as many WaitFor futures on a Pollable that the user needs. #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct AsyncPollable(Rc); impl AsyncPollable { + /// Create a Future that waits for the Pollable's readiness. pub fn wait_for(&self) -> WaitFor { use std::sync::atomic::{AtomicUsize, Ordering}; static COUNTER: AtomicUsize = AtomicUsize::new(0); @@ -45,10 +50,13 @@ impl AsyncPollable { #[derive(Debug, PartialEq, Eq, Hash, Clone)] struct Waitee { + /// This needs to be a reference counted registration, because it may outlive the AsyncPollable + /// &self that it was created from. pollable: AsyncPollable, unique: usize, } +/// A Future that waits for the Pollable's readiness. #[must_use = "futures do nothing unless polled or .awaited"] #[derive(Debug)] pub struct WaitFor { @@ -138,6 +146,8 @@ impl Reactor { let mut targets = Vec::with_capacity(reactor.wakers.len()); for waitee in reactor.wakers.keys() { let pollable_index = waitee.pollable.0.key; + // FIXME: instead of storing the indexes, we can actually just stick the waker in here, + // and make the quadratic lookup at the end of this function into a linear lookup. indexes.push(pollable_index); targets.push(&reactor.pollables[pollable_index.0]); } @@ -160,6 +170,7 @@ impl Reactor { .into_iter() .map(|index| indexes[index as usize]); + // FIXME this doesn't have to be quadratic. for key in ready_keys { for (waitee, waker) in reactor.wakers.iter() { if waitee.pollable.0.key == key { From 04950cdf34e47274f16a55d8786f40814aaeb82a Mon Sep 17 00:00:00 2001 From: Pat Hickey Date: Wed, 18 Dec 2024 14:42:44 -0800 Subject: [PATCH 12/12] eliminate quadratic waker lookup --- src/runtime/reactor.rs | 23 ++++++++--------------- 1 file changed, 8 insertions(+), 15 deletions(-) diff --git a/src/runtime/reactor.rs b/src/runtime/reactor.rs index ac4ca38..831d598 100644 --- a/src/runtime/reactor.rs +++ b/src/runtime/reactor.rs @@ -141,14 +141,12 @@ impl Reactor { // we need to be able to associate the index with the right waker. // We start by iterating over the pollables, and keeping note of which - // pollable belongs to which waker index - let mut indexes = Vec::with_capacity(reactor.wakers.len()); + // pollable belongs to which waker + let mut indexed_wakers = Vec::with_capacity(reactor.wakers.len()); let mut targets = Vec::with_capacity(reactor.wakers.len()); - for waitee in reactor.wakers.keys() { + for (waitee, waker) in reactor.wakers.iter() { let pollable_index = waitee.pollable.0.key; - // FIXME: instead of storing the indexes, we can actually just stick the waker in here, - // and make the quadratic lookup at the end of this function into a linear lookup. - indexes.push(pollable_index); + indexed_wakers.push(waker); targets.push(&reactor.pollables[pollable_index.0]); } @@ -166,17 +164,12 @@ impl Reactor { // to convert it back to the right keys for the wakers. Earlier we // established a positional index -> waker key relationship, so we can // go right ahead and perform a lookup there. - let ready_keys = ready_indexes + let ready_wakers = ready_indexes .into_iter() - .map(|index| indexes[index as usize]); + .map(|index| indexed_wakers[index as usize]); - // FIXME this doesn't have to be quadratic. - for key in ready_keys { - for (waitee, waker) in reactor.wakers.iter() { - if waitee.pollable.0.key == key { - waker.wake_by_ref() - } - } + for waker in ready_wakers { + waker.wake_by_ref() } }