diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index cd01535..0e01554 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -11,11 +11,10 @@ #![warn(missing_docs, unreachable_pub)] mod block_on; -mod polling; mod reactor; pub use block_on::block_on; -pub use reactor::Reactor; +pub use reactor::{AsyncPollable, Reactor, WaitFor}; use std::cell::RefCell; // There are no threads in WASI 0.2, so this is just a safe way to thread a single reactor to all diff --git a/src/runtime/polling.rs b/src/runtime/polling.rs deleted file mode 100644 index 8ddec0a..0000000 --- a/src/runtime/polling.rs +++ /dev/null @@ -1,88 +0,0 @@ -//! This module handles the conversion from `Pollable` -> `Future`. We do this -//! by creating an equivalent implementation to the `polling` crate. Once -//! has been resolved, this module -//! will likely no longer be needed. - -use slab::Slab; -use std::vec::Vec; -use wasi::io::poll::{poll, Pollable}; - -/// Waits for I/O events. -#[derive(Debug)] -pub(crate) struct Poller { - pub(crate) targets: Slab, -} - -impl Poller { - /// Create a new instance of `Poller` - pub(crate) fn new() -> Self { - Self::with_capacity(0) - } - - /// Create a new instance of `Poller` with a given capacity - pub(crate) fn with_capacity(capacity: usize) -> Self { - Self { - targets: Slab::with_capacity(capacity), - } - } - - /// Insert a new `Pollable` target into `Poller` - pub(crate) fn insert(&mut self, target: Pollable) -> EventKey { - let key = self.targets.insert(target); - EventKey(key as u32) - } - - /// Get a `Pollable` if it exists. - pub(crate) fn get(&self, key: &EventKey) -> Option<&Pollable> { - self.targets.get(key.0 as usize) - } - - /// Remove an instance of `Pollable` from `Poller`. - /// - /// Returns `None` if no entry was found for `key`. - pub(crate) fn remove(&mut self, key: EventKey) -> Option { - self.targets.try_remove(key.0 as usize) - } - - /// Block the current thread until a new event has triggered. - /// - /// This will clear the value of `ready_list`. - pub(crate) fn block_until(&mut self) -> Vec { - // We're about to wait for a number of pollables. When they wake we get - // the *indexes* back for the pollables whose events were available - so - // we need to be able to associate the index with the right waker. - - // We start by iterating over the pollables, and keeping note of which - // pollable belongs to which waker index - let mut indexes = Vec::with_capacity(self.targets.len()); - let mut targets = Vec::with_capacity(self.targets.len()); - for (index, target) in self.targets.iter() { - indexes.push(index); - targets.push(target); - } - - debug_assert_ne!( - targets.len(), - 0, - "Attempting to block on an empty list of pollables - without any pending work, no progress can be made and the program may spin indefinitely" - ); - - // Now that we have that association, we're ready to poll our targets. - // This will block until an event has completed. - let ready_indexes = poll(&targets); - - // Once we have the indexes for which pollables are available, we need - // to convert it back to the right keys for the wakers. Earlier we - // established a positional index -> waker key relationship, so we can - // go right ahead and perform a lookup there. - ready_indexes - .into_iter() - .map(|index| EventKey(indexes[index as usize] as u32)) - .collect() - } -} - -/// A key representing an entry into the poller -#[repr(transparent)] -#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)] -pub(crate) struct EventKey(pub(crate) u32); diff --git a/src/runtime/reactor.rs b/src/runtime/reactor.rs index 5525c8b..831d598 100644 --- a/src/runtime/reactor.rs +++ b/src/runtime/reactor.rs @@ -1,16 +1,88 @@ -use super::{ - polling::{EventKey, Poller}, - REACTOR, -}; +use super::REACTOR; use core::cell::RefCell; use core::future; -use core::task::Poll; -use core::task::Waker; +use core::pin::Pin; +use core::task::{Context, Poll, Waker}; +use slab::Slab; use std::collections::HashMap; use std::rc::Rc; use wasi::io::poll::Pollable; +/// A key for a Pollable, which is an index into the Slab in Reactor. +#[repr(transparent)] +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)] +pub(crate) struct EventKey(pub(crate) usize); + +/// A Registration is a reference to the Reactor's owned Pollable. When the registration is +/// dropped, the reactor will drop the Pollable resource. +#[derive(Debug, PartialEq, Eq, Hash)] +struct Registration { + key: EventKey, +} + +impl Drop for Registration { + fn drop(&mut self) { + Reactor::current().deregister_event(self.key) + } +} + +/// An AsyncPollable is a reference counted Registration. It can be cloned, and used to create +/// as many WaitFor futures on a Pollable that the user needs. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct AsyncPollable(Rc); + +impl AsyncPollable { + /// Create a Future that waits for the Pollable's readiness. + pub fn wait_for(&self) -> WaitFor { + use std::sync::atomic::{AtomicUsize, Ordering}; + static COUNTER: AtomicUsize = AtomicUsize::new(0); + let unique = COUNTER.fetch_add(1, Ordering::Relaxed); + WaitFor { + waitee: Waitee { + pollable: self.clone(), + unique, + }, + needs_deregistration: false, + } + } +} + +#[derive(Debug, PartialEq, Eq, Hash, Clone)] +struct Waitee { + /// This needs to be a reference counted registration, because it may outlive the AsyncPollable + /// &self that it was created from. + pollable: AsyncPollable, + unique: usize, +} + +/// A Future that waits for the Pollable's readiness. +#[must_use = "futures do nothing unless polled or .awaited"] +#[derive(Debug)] +pub struct WaitFor { + waitee: Waitee, + needs_deregistration: bool, +} +impl future::Future for WaitFor { + type Output = (); + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let reactor = Reactor::current(); + if reactor.ready(&self.as_ref().waitee, cx.waker()) { + Poll::Ready(()) + } else { + self.as_mut().needs_deregistration = true; + Poll::Pending + } + } +} +impl Drop for WaitFor { + fn drop(&mut self) { + if self.needs_deregistration { + Reactor::current().deregister_waitee(&self.waitee) + } + } +} + /// Manage async system resources for WASI 0.2 #[derive(Debug, Clone)] pub struct Reactor { @@ -21,8 +93,8 @@ pub struct Reactor { /// a lock of the whole. #[derive(Debug)] struct InnerReactor { - poller: Poller, - wakers: HashMap, + pollables: Slab, + wakers: HashMap, } impl Reactor { @@ -43,7 +115,7 @@ impl Reactor { pub(crate) fn new() -> Self { Self { inner: Rc::new(RefCell::new(InnerReactor { - poller: Poller::new(), + pollables: Slab::new(), wakers: HashMap::new(), })), } @@ -62,41 +134,152 @@ impl Reactor { /// reason that we have to call all the wakers - even if by default they /// will do nothing. pub(crate) fn block_until(&self) { + let reactor = self.inner.borrow(); + + // We're about to wait for a number of pollables. When they wake we get + // the *indexes* back for the pollables whose events were available - so + // we need to be able to associate the index with the right waker. + + // We start by iterating over the pollables, and keeping note of which + // pollable belongs to which waker + let mut indexed_wakers = Vec::with_capacity(reactor.wakers.len()); + let mut targets = Vec::with_capacity(reactor.wakers.len()); + for (waitee, waker) in reactor.wakers.iter() { + let pollable_index = waitee.pollable.0.key; + indexed_wakers.push(waker); + targets.push(&reactor.pollables[pollable_index.0]); + } + + debug_assert_ne!( + targets.len(), + 0, + "Attempting to block on an empty list of pollables - without any pending work, no progress can be made and wasi::io::poll::poll will trap" + ); + + // Now that we have that association, we're ready to poll our targets. + // This will block until an event has completed. + let ready_indexes = wasi::io::poll::poll(&targets); + + // Once we have the indexes for which pollables are available, we need + // to convert it back to the right keys for the wakers. Earlier we + // established a positional index -> waker key relationship, so we can + // go right ahead and perform a lookup there. + let ready_wakers = ready_indexes + .into_iter() + .map(|index| indexed_wakers[index as usize]); + + for waker in ready_wakers { + waker.wake_by_ref() + } + } + + /// Turn a Wasi [`Pollable`] into an [`AsyncPollable`] + pub fn schedule(&self, pollable: Pollable) -> AsyncPollable { + let mut reactor = self.inner.borrow_mut(); + let key = EventKey(reactor.pollables.insert(pollable)); + AsyncPollable(Rc::new(Registration { key })) + } + + fn deregister_event(&self, key: EventKey) { + let mut reactor = self.inner.borrow_mut(); + reactor.pollables.remove(key.0); + } + + fn deregister_waitee(&self, waitee: &Waitee) { + let mut reactor = self.inner.borrow_mut(); + reactor.wakers.remove(waitee); + } + + fn ready(&self, waitee: &Waitee, waker: &Waker) -> bool { let mut reactor = self.inner.borrow_mut(); - for key in reactor.poller.block_until() { - match reactor.wakers.get(&key) { - Some(waker) => waker.wake_by_ref(), - None => panic!("tried to wake the waker for non-existent `{:?}`", key), - } + let ready = reactor + .pollables + .get(waitee.pollable.0.key.0) + .expect("only live EventKey can be checked for readiness") + .ready(); + if !ready { + reactor.wakers.insert(waitee.clone(), waker.clone()); } + ready } /// Wait for the pollable to resolve. pub async fn wait_for(&self, pollable: Pollable) { - let mut pollable = Some(pollable); - let mut key = None; - // This function is the core loop of our function; it will be called - // multiple times as the future is resolving. - future::poll_fn(|cx| { - // Start by taking a lock on the reactor. This is single-threaded - // and short-lived, so it will never be contended. - let mut reactor = self.inner.borrow_mut(); - - // Schedule interest in the `pollable` on the first iteration. On - // every iteration, register the waker with the reactor. - let key = key.get_or_insert_with(|| reactor.poller.insert(pollable.take().unwrap())); - reactor.wakers.insert(*key, cx.waker().clone()); - - // Check whether we're ready or need to keep waiting. If we're - // ready, we clean up after ourselves. - if reactor.poller.get(key).unwrap().ready() { - reactor.poller.remove(*key); - reactor.wakers.remove(key); - Poll::Ready(()) - } else { - Poll::Pending - } + let p = self.schedule(pollable); + p.wait_for().await + } +} + +#[cfg(test)] +mod test { + use super::*; + // Using WASMTIME_LOG, observe that this test doesn't even call poll() - the pollable is ready + // immediately. + #[test] + fn subscribe_no_duration() { + crate::runtime::block_on(async { + let reactor = Reactor::current(); + let pollable = wasi::clocks::monotonic_clock::subscribe_duration(0); + let sched = reactor.schedule(pollable); + sched.wait_for().await; + }) + } + // Using WASMTIME_LOG, observe that this test calls poll() until the timer is ready. + #[test] + fn subscribe_some_duration() { + crate::runtime::block_on(async { + let reactor = Reactor::current(); + let pollable = wasi::clocks::monotonic_clock::subscribe_duration(10_000_000); + let sched = reactor.schedule(pollable); + sched.wait_for().await; + }) + } + + // Using WASMTIME_LOG, observe that this test results in a single poll() on the second + // subscription, rather than spinning in poll() with first subscription, which is instantly + // ready, but not what the waker requests. + #[test] + fn subscribe_multiple_durations() { + crate::runtime::block_on(async { + let reactor = Reactor::current(); + let now = wasi::clocks::monotonic_clock::subscribe_duration(0); + let soon = wasi::clocks::monotonic_clock::subscribe_duration(10_000_000); + let now = reactor.schedule(now); + let soon = reactor.schedule(soon); + soon.wait_for().await; + drop(now) + }) + } + + // Using WASMTIME_LOG, observe that this test results in two calls to poll(), one with both + // pollables because both are awaiting, and one with just the later pollable. + #[test] + fn subscribe_multiple_durations_zipped() { + crate::runtime::block_on(async { + let reactor = Reactor::current(); + let start = wasi::clocks::monotonic_clock::now(); + let soon = wasi::clocks::monotonic_clock::subscribe_duration(10_000_000); + let later = wasi::clocks::monotonic_clock::subscribe_duration(40_000_000); + let soon = reactor.schedule(soon); + let later = reactor.schedule(later); + + futures_lite::future::zip( + async move { + soon.wait_for().await; + println!( + "*** subscribe_duration(soon) ready ({})", + wasi::clocks::monotonic_clock::now() - start + ); + }, + async move { + later.wait_for().await; + println!( + "*** subscribe_duration(later) ready ({})", + wasi::clocks::monotonic_clock::now() - start + ); + }, + ) + .await; }) - .await } } diff --git a/src/task.rs b/src/task.rs new file mode 100644 index 0000000..3e06fda --- /dev/null +++ b/src/task.rs @@ -0,0 +1,13 @@ +//! Types and Traits for working with asynchronous tasks. + +use crate::time::{Duration, Instant, Timer, Wait}; + +/// Sleeps for the specified amount of time. +pub fn sleep(dur: Duration) -> Wait { + Timer::after(dur).wait() +} + +/// Sleeps until the specified instant. +pub fn sleep_until(deadline: Instant) -> Wait { + Timer::at(deadline).wait() +} diff --git a/src/task/mod.rs b/src/task/mod.rs deleted file mode 100644 index fa9f4b9..0000000 --- a/src/task/mod.rs +++ /dev/null @@ -1,7 +0,0 @@ -//! Types and Traits for working with asynchronous tasks. - -mod sleep; -mod sleep_until; - -pub use sleep::{sleep, Sleep}; -pub use sleep_until::{sleep_until, SleepUntil}; diff --git a/src/task/sleep.rs b/src/task/sleep.rs deleted file mode 100644 index 84ad95e..0000000 --- a/src/task/sleep.rs +++ /dev/null @@ -1,46 +0,0 @@ -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; - -use crate::time::Timer as AsyncTimer; -use pin_project_lite::pin_project; - -use crate::time::{Duration, Instant}; - -/// Sleeps for the specified amount of time. -/// -/// This future can be `push_deadline` to be moved -pub fn sleep(dur: Duration) -> Sleep { - Sleep { - dur, - timer: AsyncTimer::after(dur.into()), - completed: false, - } -} - -pin_project! { - /// Sleeps for the specified amount of time. - #[must_use = "futures do nothing unless polled or .awaited"] - pub struct Sleep { - #[pin] - timer: AsyncTimer, - completed: bool, - dur: Duration, - } -} - -impl Future for Sleep { - type Output = Instant; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - assert!(!self.completed, "future polled after completing"); - let this = self.project(); - match this.timer.poll(cx) { - Poll::Ready(instant) => { - *this.completed = true; - Poll::Ready(instant.into()) - } - Poll::Pending => Poll::Pending, - } - } -} diff --git a/src/task/sleep_until.rs b/src/task/sleep_until.rs deleted file mode 100644 index bcadf41..0000000 --- a/src/task/sleep_until.rs +++ /dev/null @@ -1,40 +0,0 @@ -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; - -use crate::time::{Instant, Timer}; -use pin_project_lite::pin_project; - -/// Sleeps until the specified instant. -pub fn sleep_until(deadline: Instant) -> SleepUntil { - SleepUntil { - timer: Timer::at(deadline.into()), - completed: false, - } -} - -pin_project! { - /// Sleeps until the specified instant. - #[must_use = "futures do nothing unless polled or .awaited"] - pub struct SleepUntil { - #[pin] - timer: Timer, - completed: bool, - } -} - -impl Future for SleepUntil { - type Output = Instant; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - assert!(!self.completed, "future polled after completing"); - let this = self.project(); - match this.timer.poll(cx) { - Poll::Ready(instant) => { - *this.completed = true; - Poll::Ready(instant.into()) - } - Poll::Pending => Poll::Pending, - } - } -} diff --git a/src/time/duration.rs b/src/time/duration.rs index 6386154..10d8103 100644 --- a/src/time/duration.rs +++ b/src/time/duration.rs @@ -1,5 +1,4 @@ -use super::Instant; -use crate::task::Sleep; +use super::{Instant, Wait}; use std::future::IntoFuture; use std::ops::{Add, AddAssign, Sub, SubAssign}; use wasi::clocks::monotonic_clock; @@ -135,7 +134,7 @@ impl SubAssign for Duration { impl IntoFuture for Duration { type Output = Instant; - type IntoFuture = Sleep; + type IntoFuture = Wait; fn into_future(self) -> Self::IntoFuture { crate::task::sleep(self) diff --git a/src/time/instant.rs b/src/time/instant.rs index bd99f43..b9db4b9 100644 --- a/src/time/instant.rs +++ b/src/time/instant.rs @@ -1,5 +1,4 @@ -use super::Duration; -use crate::task::SleepUntil; +use super::{Duration, Wait}; use std::future::IntoFuture; use std::ops::{Add, AddAssign, Sub, SubAssign}; use wasi::clocks::monotonic_clock; @@ -85,7 +84,7 @@ impl std::ops::DerefMut for Instant { impl IntoFuture for Instant { type Output = Instant; - type IntoFuture = SleepUntil; + type IntoFuture = Wait; fn into_future(self) -> Self::IntoFuture { crate::task::sleep_until(self) diff --git a/src/time/mod.rs b/src/time/mod.rs index 419b35e..2132329 100644 --- a/src/time/mod.rs +++ b/src/time/mod.rs @@ -7,12 +7,19 @@ mod instant; pub use duration::Duration; pub use instant::Instant; +use pin_project_lite::pin_project; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; -use wasi::clocks::{monotonic_clock::subscribe_instant, wall_clock}; +use wasi::clocks::{ + monotonic_clock::{subscribe_duration, subscribe_instant}, + wall_clock, +}; -use crate::{iter::AsyncIterator, runtime::Reactor}; +use crate::{ + iter::AsyncIterator, + runtime::{AsyncPollable, Reactor}, +}; /// A measurement of the system clock, useful for talking to external entities /// like the file system or other processes. @@ -42,47 +49,81 @@ impl AsyncIterator for Interval { type Item = Instant; async fn next(&mut self) -> Option { - Timer::after(self.duration).await; - Some(Instant::now()) + Some(Timer::after(self.duration).wait().await) } } #[derive(Debug)] -pub struct Timer(Option); +pub struct Timer(Option); impl Timer { pub fn never() -> Timer { Timer(None) } pub fn at(deadline: Instant) -> Timer { - Timer(Some(deadline)) + let pollable = Reactor::current().schedule(subscribe_instant(*deadline)); + Timer(Some(pollable)) } pub fn after(duration: Duration) -> Timer { - Timer(Some(Instant::now() + duration)) + let pollable = Reactor::current().schedule(subscribe_duration(*duration)); + Timer(Some(pollable)) } pub fn set_after(&mut self, duration: Duration) { *self = Self::after(duration); } - pub async fn wait(&self) { - match self.0 { - Some(deadline) => { - Reactor::current() - .wait_for(subscribe_instant(*deadline)) - .await - } - None => std::future::pending().await, - } + pub fn wait(&self) -> Wait { + let wait_for = self.0.as_ref().map(|pollable| pollable.wait_for()); + Wait { wait_for } + } +} + +pin_project! { + /// Future created by [`Timer::wait`] + #[must_use = "futures do nothing unless polled or .awaited"] + pub struct Wait { + #[pin] + wait_for: Option } } -impl Future for Timer { +impl Future for Wait { type Output = Instant; + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.as_ref(); - let pinned = std::pin::pin!(this.wait()); - match pinned.poll(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(()) => Poll::Ready(Instant::now()), + let this = self.project(); + match this.wait_for.as_pin_mut() { + None => Poll::Pending, + Some(f) => match f.poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(()) => Poll::Ready(Instant::now()), + }, } } } + +#[cfg(test)] +mod test { + use super::*; + + async fn debug_duration(what: &str, f: impl Future) { + let start = Instant::now(); + let now = f.await; + let d = now.duration_since(start); + let d: std::time::Duration = d.into(); + println!("{what} awaited for {} s", d.as_secs_f32()); + } + + #[test] + fn timer_now() { + crate::runtime::block_on(debug_duration("timer_now", async { + Timer::at(Instant::now()).wait().await + })); + } + + #[test] + fn timer_after_100_milliseconds() { + crate::runtime::block_on(debug_duration("timer_after_100_milliseconds", async { + Timer::after(Duration::from_millis(100)).wait().await + })); + } +} diff --git a/tests/http_timeout.rs b/tests/http_timeout.rs new file mode 100644 index 0000000..de67f1b --- /dev/null +++ b/tests/http_timeout.rs @@ -0,0 +1,23 @@ +use wstd::future::FutureExt; +use wstd::http::{Client, Method, Request}; +use wstd::time::Duration; + +#[wstd::test] +async fn http_timeout() -> Result<(), Box> { + // This get request will connect to the server, which will then wait 1 second before + // returning a response. + let request = Request::new(Method::GET, "https://postman-echo.com/delay/1".parse()?); + let result = Client::new() + .send(request) + .timeout(Duration::from_millis(500)) + .await; + + assert!(result.is_err(), "response should be an error"); + let error = result.unwrap_err(); + assert!( + matches!(error.kind(), std::io::ErrorKind::TimedOut), + "expected TimedOut error, got: {error:?>}" + ); + + Ok(()) +}