Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,10 @@
#![warn(missing_docs, unreachable_pub)]

mod block_on;
mod polling;
mod reactor;

pub use block_on::block_on;
pub use reactor::Reactor;
pub use reactor::{AsyncPollable, Reactor, WaitFor};
use std::cell::RefCell;

// There are no threads in WASI 0.2, so this is just a safe way to thread a single reactor to all
Expand Down
88 changes: 0 additions & 88 deletions src/runtime/polling.rs

This file was deleted.

259 changes: 221 additions & 38 deletions src/runtime/reactor.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,88 @@
use super::{
polling::{EventKey, Poller},
REACTOR,
};
use super::REACTOR;

use core::cell::RefCell;
use core::future;
use core::task::Poll;
use core::task::Waker;
use core::pin::Pin;
use core::task::{Context, Poll, Waker};
use slab::Slab;
use std::collections::HashMap;
use std::rc::Rc;
use wasi::io::poll::Pollable;

/// A key for a Pollable, which is an index into the Slab<Pollable> in Reactor.
#[repr(transparent)]
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
pub(crate) struct EventKey(pub(crate) usize);

/// A Registration is a reference to the Reactor's owned Pollable. When the registration is
/// dropped, the reactor will drop the Pollable resource.
#[derive(Debug, PartialEq, Eq, Hash)]
struct Registration {
key: EventKey,
}

impl Drop for Registration {
fn drop(&mut self) {
Reactor::current().deregister_event(self.key)
}
}

/// An AsyncPollable is a reference counted Registration. It can be cloned, and used to create
/// as many WaitFor futures on a Pollable that the user needs.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct AsyncPollable(Rc<Registration>);

impl AsyncPollable {
/// Create a Future that waits for the Pollable's readiness.
pub fn wait_for(&self) -> WaitFor {
use std::sync::atomic::{AtomicUsize, Ordering};
static COUNTER: AtomicUsize = AtomicUsize::new(0);
let unique = COUNTER.fetch_add(1, Ordering::Relaxed);
WaitFor {
waitee: Waitee {
pollable: self.clone(),
unique,
},
needs_deregistration: false,
}
}
}

#[derive(Debug, PartialEq, Eq, Hash, Clone)]
struct Waitee {
/// This needs to be a reference counted registration, because it may outlive the AsyncPollable
/// &self that it was created from.
pollable: AsyncPollable,
unique: usize,
}

/// A Future that waits for the Pollable's readiness.
#[must_use = "futures do nothing unless polled or .awaited"]
#[derive(Debug)]
pub struct WaitFor {
waitee: Waitee,
needs_deregistration: bool,
}
impl future::Future for WaitFor {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let reactor = Reactor::current();
if reactor.ready(&self.as_ref().waitee, cx.waker()) {
Poll::Ready(())
} else {
self.as_mut().needs_deregistration = true;
Poll::Pending
}
}
}
impl Drop for WaitFor {
fn drop(&mut self) {
if self.needs_deregistration {
Reactor::current().deregister_waitee(&self.waitee)
}
}
}

/// Manage async system resources for WASI 0.2
#[derive(Debug, Clone)]
pub struct Reactor {
Expand All @@ -21,8 +93,8 @@ pub struct Reactor {
/// a lock of the whole.
#[derive(Debug)]
struct InnerReactor {
poller: Poller,
wakers: HashMap<EventKey, Waker>,
pollables: Slab<Pollable>,
wakers: HashMap<Waitee, Waker>,
}

impl Reactor {
Expand All @@ -43,7 +115,7 @@ impl Reactor {
pub(crate) fn new() -> Self {
Self {
inner: Rc::new(RefCell::new(InnerReactor {
poller: Poller::new(),
pollables: Slab::new(),
wakers: HashMap::new(),
})),
}
Expand All @@ -62,41 +134,152 @@ impl Reactor {
/// reason that we have to call all the wakers - even if by default they
/// will do nothing.
pub(crate) fn block_until(&self) {
let reactor = self.inner.borrow();

// We're about to wait for a number of pollables. When they wake we get
// the *indexes* back for the pollables whose events were available - so
// we need to be able to associate the index with the right waker.

// We start by iterating over the pollables, and keeping note of which
// pollable belongs to which waker
let mut indexed_wakers = Vec::with_capacity(reactor.wakers.len());
let mut targets = Vec::with_capacity(reactor.wakers.len());
for (waitee, waker) in reactor.wakers.iter() {
let pollable_index = waitee.pollable.0.key;
indexed_wakers.push(waker);
targets.push(&reactor.pollables[pollable_index.0]);
}

debug_assert_ne!(
targets.len(),
0,
"Attempting to block on an empty list of pollables - without any pending work, no progress can be made and wasi::io::poll::poll will trap"
);

// Now that we have that association, we're ready to poll our targets.
// This will block until an event has completed.
let ready_indexes = wasi::io::poll::poll(&targets);

// Once we have the indexes for which pollables are available, we need
// to convert it back to the right keys for the wakers. Earlier we
// established a positional index -> waker key relationship, so we can
// go right ahead and perform a lookup there.
let ready_wakers = ready_indexes
.into_iter()
.map(|index| indexed_wakers[index as usize]);

for waker in ready_wakers {
waker.wake_by_ref()
}
}

/// Turn a Wasi [`Pollable`] into an [`AsyncPollable`]
pub fn schedule(&self, pollable: Pollable) -> AsyncPollable {
let mut reactor = self.inner.borrow_mut();
let key = EventKey(reactor.pollables.insert(pollable));
AsyncPollable(Rc::new(Registration { key }))
}

fn deregister_event(&self, key: EventKey) {
let mut reactor = self.inner.borrow_mut();
reactor.pollables.remove(key.0);
}

fn deregister_waitee(&self, waitee: &Waitee) {
let mut reactor = self.inner.borrow_mut();
reactor.wakers.remove(waitee);
}

fn ready(&self, waitee: &Waitee, waker: &Waker) -> bool {
let mut reactor = self.inner.borrow_mut();
for key in reactor.poller.block_until() {
match reactor.wakers.get(&key) {
Some(waker) => waker.wake_by_ref(),
None => panic!("tried to wake the waker for non-existent `{:?}`", key),
}
let ready = reactor
.pollables
.get(waitee.pollable.0.key.0)
.expect("only live EventKey can be checked for readiness")
.ready();
if !ready {
reactor.wakers.insert(waitee.clone(), waker.clone());
}
ready
}

/// Wait for the pollable to resolve.
pub async fn wait_for(&self, pollable: Pollable) {
let mut pollable = Some(pollable);
let mut key = None;
// This function is the core loop of our function; it will be called
// multiple times as the future is resolving.
future::poll_fn(|cx| {
// Start by taking a lock on the reactor. This is single-threaded
// and short-lived, so it will never be contended.
let mut reactor = self.inner.borrow_mut();

// Schedule interest in the `pollable` on the first iteration. On
// every iteration, register the waker with the reactor.
let key = key.get_or_insert_with(|| reactor.poller.insert(pollable.take().unwrap()));
reactor.wakers.insert(*key, cx.waker().clone());

// Check whether we're ready or need to keep waiting. If we're
// ready, we clean up after ourselves.
if reactor.poller.get(key).unwrap().ready() {
reactor.poller.remove(*key);
reactor.wakers.remove(key);
Poll::Ready(())
} else {
Poll::Pending
}
let p = self.schedule(pollable);
p.wait_for().await
}
}

#[cfg(test)]
mod test {
use super::*;
// Using WASMTIME_LOG, observe that this test doesn't even call poll() - the pollable is ready
// immediately.
#[test]
fn subscribe_no_duration() {
crate::runtime::block_on(async {
let reactor = Reactor::current();
let pollable = wasi::clocks::monotonic_clock::subscribe_duration(0);
let sched = reactor.schedule(pollable);
sched.wait_for().await;
})
}
// Using WASMTIME_LOG, observe that this test calls poll() until the timer is ready.
#[test]
fn subscribe_some_duration() {
crate::runtime::block_on(async {
let reactor = Reactor::current();
let pollable = wasi::clocks::monotonic_clock::subscribe_duration(10_000_000);
let sched = reactor.schedule(pollable);
sched.wait_for().await;
})
}

// Using WASMTIME_LOG, observe that this test results in a single poll() on the second
// subscription, rather than spinning in poll() with first subscription, which is instantly
// ready, but not what the waker requests.
#[test]
fn subscribe_multiple_durations() {
crate::runtime::block_on(async {
let reactor = Reactor::current();
let now = wasi::clocks::monotonic_clock::subscribe_duration(0);
let soon = wasi::clocks::monotonic_clock::subscribe_duration(10_000_000);
let now = reactor.schedule(now);
let soon = reactor.schedule(soon);
soon.wait_for().await;
drop(now)
})
}

// Using WASMTIME_LOG, observe that this test results in two calls to poll(), one with both
// pollables because both are awaiting, and one with just the later pollable.
#[test]
fn subscribe_multiple_durations_zipped() {
crate::runtime::block_on(async {
let reactor = Reactor::current();
let start = wasi::clocks::monotonic_clock::now();
let soon = wasi::clocks::monotonic_clock::subscribe_duration(10_000_000);
let later = wasi::clocks::monotonic_clock::subscribe_duration(40_000_000);
let soon = reactor.schedule(soon);
let later = reactor.schedule(later);

futures_lite::future::zip(
async move {
soon.wait_for().await;
println!(
"*** subscribe_duration(soon) ready ({})",
wasi::clocks::monotonic_clock::now() - start
);
},
async move {
later.wait_for().await;
println!(
"*** subscribe_duration(later) ready ({})",
wasi::clocks::monotonic_clock::now() - start
);
},
)
.await;
})
.await
}
}
Loading