From e7efc5bc675b7f46c31e98428ca84a3e7990462b Mon Sep 17 00:00:00 2001 From: Ben Aubin Date: Thu, 14 Jan 2021 19:33:09 -0600 Subject: [PATCH 1/6] Refactor run_async to custom future implementation --- .../lucet-runtime-internals/src/future.rs | 313 ++++++++++-------- .../lucet-runtime-internals/src/instance.rs | 34 +- .../src/instance/state.rs | 19 +- .../guests/async_hostcall/bindings.json | 3 +- .../guests/async_hostcall/hostcall_block_on.c | 11 + .../lucet-runtime-tests/src/async_hostcall.rs | 58 +++- lucet-runtime/src/lib.rs | 1 + lucet-runtime/tests/instruction_counting.rs | 6 +- 8 files changed, 282 insertions(+), 163 deletions(-) diff --git a/lucet-runtime/lucet-runtime-internals/src/future.rs b/lucet-runtime/lucet-runtime-internals/src/future.rs index 313917fd6..ecff975fc 100644 --- a/lucet-runtime/lucet-runtime-internals/src/future.rs +++ b/lucet-runtime/lucet-runtime-internals/src/future.rs @@ -6,22 +6,27 @@ use crate::vmctx::{Vmctx, VmctxInternal}; use std::any::Any; use std::future::Future; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::Context; +use std::task::Poll; +use std::{ + cell::UnsafeCell, + mem::{transmute, MaybeUninit}, +}; -/// This is the same type defined by the `futures` library, but we don't need the rest of the -/// library for this purpose. -type BoxFuture<'a, T> = Pin + Send + 'a>>; +const DEFAULT_INST_COUNT_BOUND: u64 = i64::MAX as u64; -/// A unique type that wraps a boxed future with a boxed return value. +/// A unique type that wraps a future and its returned value /// /// Type and lifetime guarantees are maintained by `Vmctx::block_on` and `Instance::run_async`. The /// user never sees this type. -struct YieldedFuture(BoxFuture<'static, ResumeVal>); +struct YieldedFuture<'a>(Pin<&'a mut (dyn Future)>); -/// A unique type for a boxed return value. The user never sees this type. 
-pub struct ResumeVal(Box); +unsafe impl<'a> Send for YieldedFuture<'a> {} -unsafe impl Send for ResumeVal {} +/// The return value for a blocked async. +/// +/// SAFETY: should only be constructed if the future has been polled to completion +struct ResumeAsync; impl Vmctx { /// Block on the result of an `async` computation from an instance run by `Instance::run_async`. @@ -47,67 +52,74 @@ impl Vmctx { /// `run_async`, avoiding problems of nesting, and allowing the current OS thread to continue /// performing other async work. /// - /// Note that this method may only be used if `Instance::run_async` was used to run the VM, - /// otherwise it will terminate the instance with `TerminationDetails::BlockOnNeedsAsync`. - pub fn block_on<'a, R>(&'a self, f: impl Future + Send + 'a) -> R + /// Note: + /// - This method may only be used if `Instance::run_async` was used to run the VM, + /// otherwise it will terminate the instance with `TerminationDetails::BlockOnNeedsAsync`. + /// - This method is not reentrant. Use `.await` rather than `block_on` within the future. + /// Calling block_on from within the future will result in a panic. + /// - It is not valid to re-enter guest code from the future, as guest execution may be paused. + pub fn block_on(&self, f: impl Future + Send) -> R where R: Any + Send + 'static, { - // Die if we aren't in Instance::run_async - match self.instance().state { - State::Running { async_context } => { - if !async_context { - panic!(TerminationDetails::BlockOnNeedsAsync) - } + // Get the std::task::Context, or die if we aren't async + let mut cx = match &self.instance().state { + State::Running { + async_context: Some(cx), + } => cx.borrow_mut(), + State::Running { + async_context: None, + } => { + panic!(TerminationDetails::BlockOnNeedsAsync); } _ => unreachable!("Access to vmctx implies instance is Running"), - } - // Wrap the Output of `f` as a boxed ResumeVal. Then, box the entire - // async computation. 
- let f = Box::pin(async move { ResumeVal(Box::new(f.await)) }); - // Change the lifetime of the async computation from `'a` to `'static. - // We need to lie about this lifetime so that `YieldedFuture` may impl - // `Any` and be passed through the yield. `Instance::run_async` - // rehydrates this lifetime to be at most as long as the Vmctx's `'a`. - // This is safe because the stack frame that `'a` is tied to gets - // frozen in place as part of `self.yield_val_expecting_val`. - let f = unsafe { - std::mem::transmute::, BoxFuture<'static, ResumeVal>>(f) }; - // Wrap the computation in `YieldedFuture` so that - // `Instance::run_async` can catch and run it. We will get the - // `ResumeVal` we applied to `f` above. - self.yield_impl::(YieldedFuture(f), false, false); - let ResumeVal(v) = self.take_resumed_val(); - // We may now downcast and unbox the returned Box into an `R` - // again. - *v.downcast().expect("run_async broke invariant") - } -} -/// A simple future that yields once. We use this to yield when a runtime bound is reached. -/// -/// Inspired by Tokio's `yield_now()`. -struct YieldNow { - yielded: bool, -} + // Wrap the future, so that we don't have to worry about sending back the return value + let ret = UnsafeCell::new(MaybeUninit::uninit()); -impl YieldNow { - fn new() -> Self { - Self { yielded: false } - } -} + let ret_ptr = ret.get(); -impl Future for YieldNow { - type Output = (); - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { - if self.yielded { - Poll::Ready(()) - } else { - self.yielded = true; - cx.waker().wake_by_ref(); - Poll::Pending + let mut f = async move { + let ret = f.await; + // SAFETY: we are the only possible writer to ret_ptr, and the future must be polled to completion before this function returns + unsafe { + std::ptr::write(ret_ptr, MaybeUninit::new(ret)); + } + ResumeAsync + }; + + // We pin the future to the stack (a requirement for being able to poll the future). 
+ // By pinning to the stack instead of using `Box::pin`, we avoid heap allocations for immediately-ready futures. + // + // SAFETY: We must uphold the invariants of Pin, namely that future does not move until it is dropped. + // By overriding the variable named `f`, it is impossible to access f again, except through the pinned reference. + let mut f = unsafe { Pin::new_unchecked(&mut f) }; + + if let Poll::Pending = f.as_mut().poll(*cx) { + // The future is pending, so we need to yield it to the async executor to be polled to completion + + // SAFTEY: YieldedFuture is marked Send, which would not normally be the case due to ownership of ret_ptr, a raw pointer. + // Safe because: + // 1) We ensure that the inner future is Send + // 2) The pointer is only written to once, preventing race conditions + // 3) the pointer is not read from until after the future is polled to completion + let f = YieldedFuture(f); + + // We need to lie about this lifetime so that `YieldedFuture` may be passed through the yield. + // `Instance::run_async` rehydrates this lifetime to be at most as long as the Vmctx's `'_`. + // This is safe because the stack frame that `'_` is tied to gets frozen in place as part of `self.yield_val_expecting_val`. + let f = unsafe { transmute::, YieldedFuture<'static>>(f) }; + + // Yield so that `Instance::run_async` can catch and execute our future. + self.yield_impl::, ResumeAsync>(f, false, false); + + // Resuming with a ResumeAsync asserts that the future has been polled to completion + let ResumeAsync = self.take_resumed_val(); } + + // SAFETY: the future must have been polled to completion + unsafe { ret.into_inner().assume_init() } } } @@ -126,14 +138,9 @@ impl InstanceHandle { /// This method permits the use of `Vmctx::block_on`, but disallows all other uses of `Vmctx:: /// yield_val_expecting_val` and family (`Vmctx::yield_`, `Vmctx::yield_expecting_val`, /// `Vmctx::yield_val`). 
- pub async fn run_async<'a>( - &'a mut self, - entrypoint: &'a str, - args: &'a [Val], - runtime_bound: Option, - ) -> Result { - let func = self.module.get_export_func(entrypoint)?; - self.run_async_internal(func, args, runtime_bound).await + pub fn run_async<'a>(&'a mut self, entrypoint: &'a str, args: &'a [Val]) -> RunAsync<'a> { + let func = self.module.get_export_func(entrypoint); + self.run_async_internal(func, args) } /// Run the module's [start function][start], if one exists. @@ -150,91 +157,123 @@ impl InstanceHandle { /// runtime between async future yields (invocations of `.poll()` on the /// underlying generated future) if `runtime_bound` is provided. This /// behaves the same way as `Instance::run_async()`. - pub async fn run_async_start<'a>( - &'a mut self, - runtime_bound: Option, - ) -> Result<(), Error> { - if !self.is_not_started() { - return Err(Error::StartAlreadyRun); - } - let start = match self.module.get_start_func()? { - Some(start) => start, - None => return Ok(()), + pub fn run_async_start<'a>(&'a mut self) -> RunAsync<'a> { + let func = if self.is_not_started() { + self.module + .get_start_func() + // Invariant: can only be in NotStarted state if a start function exists + .map(|start| start.expect("NotStarted, but no start function")) + } else { + Err(Error::StartAlreadyRun) }; - self.run_async_internal(start, &[], runtime_bound).await?; - Ok(()) + + self.run_async_internal(func, &[]) } - /// Shared async run-loop implementation for both `run_async()` and - /// `run_start_async()`. 
- async fn run_async_internal<'a>( + fn run_async_internal<'a>( &'a mut self, - func: FunctionHandle, + func: Result, args: &'a [Val], - runtime_bound: Option, - ) -> Result { - if self.is_yielded() { - return Err(Error::Unsupported( - "cannot run_async a yielded instance".to_owned(), - )); + ) -> RunAsync<'a> { + let state = match func { + Ok(func) => RunAsyncState::Start(func, args), + Err(err) => RunAsyncState::Failed(Some(err)), + }; + + RunAsync { + inst: self, + inst_count_bound: DEFAULT_INST_COUNT_BOUND, + state, } + } +} - // Store the ResumeVal here when we get it. - let mut resume_val: Option = None; - loop { - // Run the WebAssembly program - let run_result = if self.is_yielded() { - // A previous iteration of the loop stored the ResumeVal in - // `resume_val`, send it back to the guest ctx and continue - // running: - self.resume_with_val_impl( - resume_val - .take() - .expect("is_yielded implies resume_value is some"), - true, - runtime_bound, - ) - } else if self.is_bound_expired() { - self.resume_bounded( - runtime_bound.expect("should have bound if guest had expired bound"), - ) - } else { +pub struct RunAsync<'a> { + inst: &'a mut InstanceHandle, + state: RunAsyncState<'a>, + /// The instance count bound. 
Can be changed at any time, taking effect on the next guest entry + pub inst_count_bound: u64, +} + +impl<'a> RunAsync<'a> { + /// Set the instance count bound + pub fn bound_inst_count(&mut self, inst_count_bound: u64) -> &mut Self { + self.inst_count_bound = inst_count_bound; + self + } +} + +enum RunAsyncState<'a> { + Start(FunctionHandle, &'a [Val]), + Blocked(YieldedFuture<'a>), + Yielded, + Failed(Option), +} + +impl<'a> Future for RunAsync<'a> { + type Output = Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let inst_count_bound = self.inst_count_bound; + + let run_result = match self.state { + RunAsyncState::Failed(ref mut err) => { + return Poll::Ready(Err(err.take().expect("failed future polled twice"))) + } + RunAsyncState::Start(func, args) => { // This is the first iteration, call the entrypoint: - self.run_func(func, args, true, runtime_bound) - }; - match run_result? { - InternalRunResult::Normal(RunResult::Returned(rval)) => { - // Finished running, return UntypedReturnValue - return Ok(rval); - } - InternalRunResult::Normal(RunResult::Yielded(yval)) => { - // Check if the yield came from Vmctx::block_on: - if yval.is::() { - let YieldedFuture(future) = *yval.downcast::().unwrap(); + self.inst + .run_func(func, args, Some(cx), Some(inst_count_bound)) + } + RunAsyncState::Blocked(ref mut fut) => { + let resume = match fut.0.as_mut().poll(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(resume) => resume, + }; + + // Resume the instance now that the future is ready + self.inst + .resume_with_val_impl(resume, Some(cx), Some(inst_count_bound)) + } + RunAsyncState::Yielded => self.inst.resume_bounded(inst_count_bound), + }; + + match run_result { + Ok(InternalRunResult::Normal(RunResult::Returned(rval))) => { + // Finished running, return UntypedReturnValue + return Poll::Ready(Ok(rval)); + } + Ok(InternalRunResult::Normal(RunResult::Yielded(yval))) => { + match yval.downcast::>() { + Ok(future) => { // 
Rehydrate the lifetime from `'static` to `'a`, which // is morally the same lifetime as was passed into // `Vmctx::block_on`. - let future = future as BoxFuture<'a, ResumeVal>; - - // await on the computation. Store its result in - // `resume_val`. - resume_val = Some(future.await); - // Now we want to `Instance::resume_with_val` and start - // this cycle over. - } else { + let future = unsafe { + transmute::, YieldedFuture<'a>>(*future) + }; + + self.state = RunAsyncState::Blocked(future); + } + Err(_) => { // Any other yielded value is not supported - die with an error. - return Err(Error::Unsupported( + return Poll::Ready(Err(Error::Unsupported( "cannot yield anything besides a future in Instance::run_async" .to_owned(), - )); + ))); } } - InternalRunResult::BoundExpired => { - // Await on a simple future that yields once then is ready. - YieldNow::new().await - } } + Ok(InternalRunResult::BoundExpired) => { + self.state = RunAsyncState::Yielded; + + // Yield, giving control back to the async executor + cx.waker().wake_by_ref(); + } + Err(err) => return Poll::Ready(Err(err)), } + + return Poll::Pending; } } @@ -251,8 +290,8 @@ mod test { #[allow(unreachable_code)] fn _dont_run_me() { let mut _inst: InstanceHandle = unimplemented!(); - _assert_send(&_inst.run_async("", &[], None)); - _assert_send(&_inst.run_async_start(None)); + _assert_send(&_inst.run_async("", &[])); + _assert_send(&_inst.run_async_start()); } } } diff --git a/lucet-runtime/lucet-runtime-internals/src/instance.rs b/lucet-runtime/lucet-runtime-internals/src/instance.rs index cfcf9b050..ff89ab5f2 100644 --- a/lucet-runtime/lucet-runtime-internals/src/instance.rs +++ b/lucet-runtime/lucet-runtime-internals/src/instance.rs @@ -20,6 +20,7 @@ use crate::val::{UntypedRetVal, Val}; use crate::WASM_PAGE_SIZE; use libc::{c_void, pthread_self, siginfo_t, uintptr_t}; use lucet_module::InstanceRuntimeData; +use mem::transmute; use memoffset::offset_of; use std::any::Any; use std::cell::{BorrowError, 
BorrowMutError, Ref, RefCell, RefMut, UnsafeCell}; @@ -29,6 +30,7 @@ use std::mem; use std::ops::{Deref, DerefMut}; use std::ptr::{self, NonNull}; use std::sync::Arc; +use std::task; pub const LUCET_INSTANCE_MAGIC: u64 = 746_932_922; @@ -515,7 +517,7 @@ impl Instance { /// in the future. pub fn run(&mut self, entrypoint: &str, args: &[Val]) -> Result { let func = self.module.get_export_func(entrypoint)?; - Ok(self.run_func(func, &args, false, None)?.unwrap()) + Ok(self.run_func(func, &args, None, None)?.unwrap()) } /// Run a function with arguments in the guest context from the [WebAssembly function @@ -531,7 +533,7 @@ impl Instance { args: &[Val], ) -> Result { let func = self.module.get_func_from_idx(table_idx, func_idx)?; - Ok(self.run_func(func, &args, false, None)?.unwrap()) + Ok(self.run_func(func, &args, None, None)?.unwrap()) } /// Resume execution of an instance that has yielded without providing a value to the guest. @@ -562,13 +564,13 @@ impl Instance { /// The foreign code safety caveat of [`Instance::run()`](struct.Instance.html#method.run) /// applies. pub fn resume_with_val(&mut self, val: A) -> Result { - Ok(self.resume_with_val_impl(val, false, None)?.unwrap()) + Ok(self.resume_with_val_impl(val, None, None)?.unwrap()) } pub(crate) fn resume_with_val_impl( &mut self, val: A, - async_context: bool, + async_context: Option<&mut task::Context<'_>>, max_insn_count: Option, ) -> Result { match &self.state { @@ -610,7 +612,7 @@ impl Instance { )); } self.set_instruction_bound_delta(Some(max_insn_count)); - self.swap_and_return(true) + self.swap_and_return(None) } /// Run the module's [start function][start], if one exists. 
@@ -648,7 +650,7 @@ impl Instance { if !self.is_not_started() { return Err(Error::StartAlreadyRun); } - self.run_func(start, &[], false, None)?; + self.run_func(start, &[], None, None)?; } Ok(()) } @@ -1090,7 +1092,7 @@ impl Instance { &mut self, func: FunctionHandle, args: &[Val], - async_context: bool, + async_context: Option<&mut task::Context<'_>>, inst_count_bound: Option, ) -> Result { let needs_start = self.state.is_not_started() && !func.is_start_func; @@ -1191,7 +1193,10 @@ impl Instance { /// This must only be called for an instance in a ready, non-fatally faulted, or yielded state, /// or in the not-started state on the start function. The public wrappers around this function /// should make sure the state is appropriate. - fn swap_and_return(&mut self, async_context: bool) -> Result { + fn swap_and_return( + &mut self, + async_context: Option<&mut task::Context<'_>>, + ) -> Result { let is_start_func = self .entrypoint .expect("we always have an entrypoint by now") @@ -1203,7 +1208,18 @@ impl Instance { || self.state.is_yielded() || self.state.is_bound_expired() ); - self.state = State::Running { async_context }; + + self.state = State::Running { + async_context: async_context.map(|ctx| { + // SAFETY: We have to lie about the lifetime of async_context to pass it into the instance. + // As State::Running will only last for as long as this function's lifespan, this is safe. 
+ let ctx = unsafe { + transmute::<&mut task::Context<'_>, &'static mut task::Context<'static>>(ctx) + }; + + RefCell::new(ctx) + }), + }; let res = self.with_current_instance(|i| { i.with_signals_on(|i| { diff --git a/lucet-runtime/lucet-runtime-internals/src/instance/state.rs b/lucet-runtime/lucet-runtime-internals/src/instance/state.rs index 97247b0e5..f2e391c4c 100644 --- a/lucet-runtime/lucet-runtime-internals/src/instance/state.rs +++ b/lucet-runtime/lucet-runtime-internals/src/instance/state.rs @@ -2,8 +2,9 @@ use crate::instance::siginfo_ext::SiginfoExt; use crate::instance::{FaultDetails, TerminationDetails, YieldedVal}; use crate::sysdeps::UContext; use libc::{SIGBUS, SIGSEGV}; -use std::any::Any; use std::ffi::{CStr, CString}; +use std::task; +use std::{any::Any, cell::RefCell}; /// The representation of a Lucet instance's state machine. pub enum State { @@ -25,7 +26,10 @@ pub enum State { Running { /// Indicates whether the instance is running in an async context (`Instance::run_async`) /// or not. Needed by `Vmctx::block_on`. - async_context: bool, + /// + /// Safety: the context must be valid for as long as the instance remains in the running state + /// The logic in swap_and_return guarantees this. + async_context: Option>>, }, /// The instance has faulted, potentially fatally. @@ -97,10 +101,10 @@ impl std::fmt::Display for State { State::NotStarted => write!(f, "not started"), State::Ready => write!(f, "ready"), State::Running { - async_context: false, + async_context: None, } => write!(f, "running"), State::Running { - async_context: true, + async_context: Some(_), } => write!(f, "running (in async context)"), State::Faulted { details, siginfo, .. 
@@ -163,8 +167,11 @@ impl State { } pub fn is_running_async(&self) -> bool { - if let State::Running { async_context } = self { - *async_context + if let State::Running { + async_context: Some(_), + } = self + { + true } else { false } diff --git a/lucet-runtime/lucet-runtime-tests/guests/async_hostcall/bindings.json b/lucet-runtime/lucet-runtime-tests/guests/async_hostcall/bindings.json index f9837ac99..2df36b291 100644 --- a/lucet-runtime/lucet-runtime-tests/guests/async_hostcall/bindings.json +++ b/lucet-runtime/lucet-runtime-tests/guests/async_hostcall/bindings.json @@ -1,5 +1,6 @@ { "env": { - "hostcall_containing_block_on": "hostcall_containing_block_on" + "hostcall_containing_block_on": "hostcall_containing_block_on", + "hostcall_containing_yielding_block_on": "hostcall_containing_yielding_block_on" } } diff --git a/lucet-runtime/lucet-runtime-tests/guests/async_hostcall/hostcall_block_on.c b/lucet-runtime/lucet-runtime-tests/guests/async_hostcall/hostcall_block_on.c index 846e12a0f..3fddf9de7 100644 --- a/lucet-runtime/lucet-runtime-tests/guests/async_hostcall/hostcall_block_on.c +++ b/lucet-runtime/lucet-runtime-tests/guests/async_hostcall/hostcall_block_on.c @@ -1,9 +1,20 @@ #include extern void hostcall_containing_block_on(int); +extern void hostcall_containing_yielding_block_on(int); + int main(void) { hostcall_containing_block_on(1312); return 0; } + +int yielding() +{ + hostcall_containing_yielding_block_on(0); + hostcall_containing_yielding_block_on(1); + hostcall_containing_yielding_block_on(2); + hostcall_containing_yielding_block_on(3); + return 0; +} \ No newline at end of file diff --git a/lucet-runtime/lucet-runtime-tests/src/async_hostcall.rs b/lucet-runtime/lucet-runtime-tests/src/async_hostcall.rs index 2af09d06b..d8319f014 100644 --- a/lucet-runtime/lucet-runtime-tests/src/async_hostcall.rs +++ b/lucet-runtime/lucet-runtime-tests/src/async_hostcall.rs @@ -1,8 +1,8 @@ #[macro_export] macro_rules! 
async_hostcall_tests { ( $( $region_id:ident => $TestRegion:path ),* ) => { - use lucet_runtime::{vmctx::Vmctx, lucet_hostcall}; + use std::future::Future; #[lucet_hostcall] #[no_mangle] @@ -11,6 +11,31 @@ macro_rules! async_hostcall_tests { assert_eq!(asynced_value, value); } + #[lucet_hostcall] + #[no_mangle] + pub fn hostcall_containing_yielding_block_on(vmctx: &Vmctx, times: u32) { + + struct YieldingFuture { times: u32 } + + impl Future for YieldingFuture { + type Output = (); + + fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll { + if self.times == 0 { + return std::task::Poll::Ready(()) + } else { + self.get_mut().times -= 1; + + cx.waker().wake_by_ref(); + + return std::task::Poll::Pending + } + } + } + + vmctx.block_on(YieldingFuture { times }); + } + $( mod $region_id { use lucet_runtime::{DlModule, Error, Limits, Region, RegionCreate, TerminationDetails}; @@ -45,10 +70,11 @@ macro_rules! async_hostcall_tests { futures_executor::block_on( inst.run_async( "main", - &[0u32.into(), 0i32.into()], - // Run with bounded execution to test its interaction with block_on - Some(1), - )); + &[0u32.into(), 0i32.into()] + ) + // Run with bounded execution to test its interaction with block_on + .bound_inst_count(1) + ); match correct_run_res { Ok(_) => {} // expected - UntypedRetVal is (), so no reason to inspect value _ => panic!( @@ -73,9 +99,8 @@ macro_rules! async_hostcall_tests { futures_executor::block_on( inst.run_async( "main", - &[0u32.into(), 0i32.into()], - Some(1), - )); + &[0u32.into(), 0i32.into()] + ).bound_inst_count(1)); match correct_run_res_2 { Ok(_) => {} // expected _ => panic!( @@ -83,6 +108,23 @@ macro_rules! 
async_hostcall_tests { correct_run_res_2 ), } + + + + let correct_run_res_3 = + futures_executor::block_on( + inst.run_async( + "yielding", + &[] + ).bound_inst_count(10)); + + match correct_run_res_3 { + Ok(_) => {} // expected + _ => panic!( + "run_async yielding should return successfully, got {:?}", + correct_run_res_3 + ), + } } } diff --git a/lucet-runtime/src/lib.rs b/lucet-runtime/src/lib.rs index a235c9788..c732e1495 100644 --- a/lucet-runtime/src/lib.rs +++ b/lucet-runtime/src/lib.rs @@ -406,6 +406,7 @@ pub mod c_api; pub use lucet_module::{PublicKey, TrapCode}; pub use lucet_runtime_internals::alloc::{AllocStrategy, Limits, DEFAULT_SIGNAL_STACK_SIZE}; pub use lucet_runtime_internals::error::Error; +pub use lucet_runtime_internals::future::RunAsync; pub use lucet_runtime_internals::instance::signals::{ install_lucet_signal_handler, remove_lucet_signal_handler, }; diff --git a/lucet-runtime/tests/instruction_counting.rs b/lucet-runtime/tests/instruction_counting.rs index 4a5e8d267..74a8697de 100644 --- a/lucet-runtime/tests/instruction_counting.rs +++ b/lucet-runtime/tests/instruction_counting.rs @@ -187,10 +187,12 @@ fn check_instruction_count_with_periodic_yields_internal(want_start_function: bo } let yields = if want_start_function { - let future = Box::pin(inst.run_async_start(Some(1000))); + let mut future = Box::pin(inst.run_async_start()); + future.bound_inst_count(1000); future_loop(future) } else { - let future = Box::pin(inst.run_async("test_function", &[], Some(1000))); + let mut future = Box::pin(inst.run_async("test_function", &[])); + future.bound_inst_count(1000); future_loop(future) }; From 9c7acdb4af37e744199f6ac0207e413a594ec00e Mon Sep 17 00:00:00 2001 From: Ben Aubin Date: Wed, 10 Feb 2021 18:08:31 -0600 Subject: [PATCH 2/6] Remove unsafe code from run_async --- .../lucet-runtime-internals/src/future.rs | 142 ++++++------------ .../lucet-runtime-internals/src/instance.rs | 29 ++-- .../src/instance/state.rs | 10 +- 3 files changed, 61 
insertions(+), 120 deletions(-) diff --git a/lucet-runtime/lucet-runtime-internals/src/future.rs b/lucet-runtime/lucet-runtime-internals/src/future.rs index ecff975fc..96948983d 100644 --- a/lucet-runtime/lucet-runtime-internals/src/future.rs +++ b/lucet-runtime/lucet-runtime-internals/src/future.rs @@ -3,30 +3,23 @@ use crate::instance::{InstanceHandle, InternalRunResult, RunResult, State, Termi use crate::module::FunctionHandle; use crate::val::{UntypedRetVal, Val}; use crate::vmctx::{Vmctx, VmctxInternal}; -use std::any::Any; use std::future::Future; use std::pin::Pin; use std::task::Context; use std::task::Poll; -use std::{ - cell::UnsafeCell, - mem::{transmute, MaybeUninit}, -}; +use std::task::Waker; -const DEFAULT_INST_COUNT_BOUND: u64 = i64::MAX as u64; - -/// A unique type that wraps a future and its returned value -/// -/// Type and lifetime guarantees are maintained by `Vmctx::block_on` and `Instance::run_async`. The -/// user never sees this type. -struct YieldedFuture<'a>(Pin<&'a mut (dyn Future)>); +/// a representation of AsyncContext which can be freely cloned +#[doc(hidden)] +#[derive(Clone)] +pub struct AsyncContext { + waker: Waker, +} -unsafe impl<'a> Send for YieldedFuture<'a> {} +const DEFAULT_INST_COUNT_BOUND: u64 = i64::MAX as u64; -/// The return value for a blocked async. -/// -/// SAFETY: should only be constructed if the future has been polled to completion -struct ResumeAsync; +struct AsyncYielded; +struct AsyncResume; impl Vmctx { /// Block on the result of an `async` computation from an instance run by `Instance::run_async`. @@ -45,50 +38,14 @@ impl Vmctx { /// threads than otherwise would be necessary. /// /// Instead, this block_on operator is designed to work only when called within an invocation - /// of [`Instance::run_async`]. 
The `run_async` method executes instance code within a - /// trampoline, itself running within an async context, making it possible to temporarily pause - /// guest execution, jump back to the trampoline, and await there. The future given to block_on - /// is in passed back to that trampoline, and runs on the same runtime that invoked - /// `run_async`, avoiding problems of nesting, and allowing the current OS thread to continue - /// performing other async work. + /// of [`Instance::run_async`]. When a future needs to be polled, the instance will yield and + /// RunAsync will return control to the async executor. When the future is ready to be polled/make progress, + /// it will be polled from within the guest context. /// /// Note: /// - This method may only be used if `Instance::run_async` was used to run the VM, /// otherwise it will terminate the instance with `TerminationDetails::BlockOnNeedsAsync`. - /// - This method is not reentrant. Use `.await` rather than `block_on` within the future. - /// Calling block_on from within the future will result in a panic. - /// - It is not valid to re-enter guest code from the future, as guest execution may be paused. 
- pub fn block_on(&self, f: impl Future + Send) -> R - where - R: Any + Send + 'static, - { - // Get the std::task::Context, or die if we aren't async - let mut cx = match &self.instance().state { - State::Running { - async_context: Some(cx), - } => cx.borrow_mut(), - State::Running { - async_context: None, - } => { - panic!(TerminationDetails::BlockOnNeedsAsync); - } - _ => unreachable!("Access to vmctx implies instance is Running"), - }; - - // Wrap the future, so that we don't have to worry about sending back the return value - let ret = UnsafeCell::new(MaybeUninit::uninit()); - - let ret_ptr = ret.get(); - - let mut f = async move { - let ret = f.await; - // SAFETY: we are the only possible writer to ret_ptr, and the future must be polled to completion before this function returns - unsafe { - std::ptr::write(ret_ptr, MaybeUninit::new(ret)); - } - ResumeAsync - }; - + pub fn block_on(&self, mut f: impl Future) -> R { // We pin the future to the stack (a requirement for being able to poll the future). // By pinning to the stack instead of using `Box::pin`, we avoid heap allocations for immediately-ready futures. // @@ -96,30 +53,33 @@ impl Vmctx { // By overriding the variable named `f`, it is impossible to access f again, except through the pinned reference. let mut f = unsafe { Pin::new_unchecked(&mut f) }; - if let Poll::Pending = f.as_mut().poll(*cx) { - // The future is pending, so we need to yield it to the async executor to be polled to completion - - // SAFTEY: YieldedFuture is marked Send, which would not normally be the case due to ownership of ret_ptr, a raw pointer. 
- // Safe because: - // 1) We ensure that the inner future is Send - // 2) The pointer is only written to once, preventing race conditions - // 3) the pointer is not read from until after the future is polled to completion - let f = YieldedFuture(f); + loop { + // Get the AsyncContext, or die if we aren't async + let cx = match &self.instance().state { + State::Running { + async_context: Some(cx), + } => cx, + State::Running { + async_context: None, + } => { + panic!(TerminationDetails::BlockOnNeedsAsync) + } + _ => unreachable!("Access to vmctx implies instance is Running"), + }; - // We need to lie about this lifetime so that `YieldedFuture` may be passed through the yield. - // `Instance::run_async` rehydrates this lifetime to be at most as long as the Vmctx's `'_`. - // This is safe because the stack frame that `'_` is tied to gets frozen in place as part of `self.yield_val_expecting_val`. - let f = unsafe { transmute::, YieldedFuture<'static>>(f) }; + // build an std::task::Context + let mut cx = Context::from_waker(&cx.waker); - // Yield so that `Instance::run_async` can catch and execute our future. 
- self.yield_impl::, ResumeAsync>(f, false, false); + match f.as_mut().poll(&mut cx) { + Poll::Ready(ret) => return ret, + Poll::Pending => { + // The future is pending, so we need to yield to the async executor + self.yield_impl::(AsyncYielded, false, false); - // Resuming with a ResumeAsync asserts that the future has been polled to completion - let ResumeAsync = self.take_resumed_val(); + let AsyncResume = self.take_resumed_val::(); + } + } } - - // SAFETY: the future must have been polled to completion - unsafe { ret.into_inner().assume_init() } } } @@ -205,7 +165,7 @@ impl<'a> RunAsync<'a> { enum RunAsyncState<'a> { Start(FunctionHandle, &'a [Val]), - Blocked(YieldedFuture<'a>), + Blocked, Yielded, Failed(Option), } @@ -221,19 +181,22 @@ impl<'a> Future for RunAsync<'a> { return Poll::Ready(Err(err.take().expect("failed future polled twice"))) } RunAsyncState::Start(func, args) => { + let cx = AsyncContext { + waker: cx.waker().clone(), + }; + // This is the first iteration, call the entrypoint: self.inst .run_func(func, args, Some(cx), Some(inst_count_bound)) } - RunAsyncState::Blocked(ref mut fut) => { - let resume = match fut.0.as_mut().poll(cx) { - Poll::Pending => return Poll::Pending, - Poll::Ready(resume) => resume, + RunAsyncState::Blocked => { + let cx = AsyncContext { + waker: cx.waker().clone(), }; // Resume the instance now that the future is ready self.inst - .resume_with_val_impl(resume, Some(cx), Some(inst_count_bound)) + .resume_with_val_impl(AsyncResume, Some(cx), Some(inst_count_bound)) } RunAsyncState::Yielded => self.inst.resume_bounded(inst_count_bound), }; @@ -244,16 +207,9 @@ impl<'a> Future for RunAsync<'a> { return Poll::Ready(Ok(rval)); } Ok(InternalRunResult::Normal(RunResult::Yielded(yval))) => { - match yval.downcast::>() { - Ok(future) => { - // Rehydrate the lifetime from `'static` to `'a`, which - // is morally the same lifetime as was passed into - // `Vmctx::block_on`. 
- let future = unsafe { - transmute::, YieldedFuture<'a>>(*future) - }; - - self.state = RunAsyncState::Blocked(future); + match yval.downcast::() { + Ok(_) => { + self.state = RunAsyncState::Blocked; } Err(_) => { // Any other yielded value is not supported - die with an error. diff --git a/lucet-runtime/lucet-runtime-internals/src/instance.rs b/lucet-runtime/lucet-runtime-internals/src/instance.rs index ff89ab5f2..c9cd05607 100644 --- a/lucet-runtime/lucet-runtime-internals/src/instance.rs +++ b/lucet-runtime/lucet-runtime-internals/src/instance.rs @@ -7,7 +7,6 @@ pub use crate::instance::execution::{KillError, KillState, KillSuccess, KillSwit pub use crate::instance::signals::{signal_handler_none, SignalBehavior, SignalHandler}; pub use crate::instance::state::State; -use crate::alloc::Alloc; use crate::context::Context; use crate::embed_ctx::CtxMap; use crate::error::Error; @@ -18,9 +17,9 @@ use crate::region::RegionInternal; use crate::sysdeps::HOST_PAGE_SIZE_EXPECTED; use crate::val::{UntypedRetVal, Val}; use crate::WASM_PAGE_SIZE; +use crate::{alloc::Alloc, future::AsyncContext}; use libc::{c_void, pthread_self, siginfo_t, uintptr_t}; use lucet_module::InstanceRuntimeData; -use mem::transmute; use memoffset::offset_of; use std::any::Any; use std::cell::{BorrowError, BorrowMutError, Ref, RefCell, RefMut, UnsafeCell}; @@ -30,7 +29,6 @@ use std::mem; use std::ops::{Deref, DerefMut}; use std::ptr::{self, NonNull}; use std::sync::Arc; -use std::task; pub const LUCET_INSTANCE_MAGIC: u64 = 746_932_922; @@ -570,7 +568,7 @@ impl Instance { pub(crate) fn resume_with_val_impl( &mut self, val: A, - async_context: Option<&mut task::Context<'_>>, + async_context: Option, max_insn_count: Option, ) -> Result { match &self.state { @@ -604,7 +602,8 @@ impl Instance { /// applies. 
pub(crate) fn resume_bounded( &mut self, - max_insn_count: u64, + async_context: AsyncContext, + max_insn_count: u64 ) -> Result { if !self.state.is_bound_expired() { return Err(Error::InvalidArgument( @@ -612,7 +611,7 @@ impl Instance { )); } self.set_instruction_bound_delta(Some(max_insn_count)); - self.swap_and_return(None) + self.swap_and_return(Some(async_context)) } /// Run the module's [start function][start], if one exists. @@ -1092,7 +1091,7 @@ impl Instance { &mut self, func: FunctionHandle, args: &[Val], - async_context: Option<&mut task::Context<'_>>, + async_context: Option, inst_count_bound: Option, ) -> Result { let needs_start = self.state.is_not_started() && !func.is_start_func; @@ -1193,9 +1192,9 @@ impl Instance { /// This must only be called for an instance in a ready, non-fatally faulted, or yielded state, /// or in the not-started state on the start function. The public wrappers around this function /// should make sure the state is appropriate. - fn swap_and_return( + fn swap_and_return<'a>( &mut self, - async_context: Option<&mut task::Context<'_>>, + async_context: Option, ) -> Result { let is_start_func = self .entrypoint @@ -1209,17 +1208,7 @@ impl Instance { || self.state.is_bound_expired() ); - self.state = State::Running { - async_context: async_context.map(|ctx| { - // SAFETY: We have to lie about the lifetime of async_context to pass it into the instance. - // As State::Running will only last for as long as this function's lifespan, this is safe. 
- let ctx = unsafe { - transmute::<&mut task::Context<'_>, &'static mut task::Context<'static>>(ctx) - }; - - RefCell::new(ctx) - }), - }; + self.state = State::Running { async_context }; let res = self.with_current_instance(|i| { i.with_signals_on(|i| { diff --git a/lucet-runtime/lucet-runtime-internals/src/instance/state.rs b/lucet-runtime/lucet-runtime-internals/src/instance/state.rs index f2e391c4c..d6713bd97 100644 --- a/lucet-runtime/lucet-runtime-internals/src/instance/state.rs +++ b/lucet-runtime/lucet-runtime-internals/src/instance/state.rs @@ -1,10 +1,9 @@ -use crate::instance::siginfo_ext::SiginfoExt; use crate::instance::{FaultDetails, TerminationDetails, YieldedVal}; use crate::sysdeps::UContext; +use crate::{future::AsyncContext, instance::siginfo_ext::SiginfoExt}; use libc::{SIGBUS, SIGSEGV}; +use std::any::Any; use std::ffi::{CStr, CString}; -use std::task; -use std::{any::Any, cell::RefCell}; /// The representation of a Lucet instance's state machine. pub enum State { @@ -26,10 +25,7 @@ pub enum State { Running { /// Indicates whether the instance is running in an async context (`Instance::run_async`) /// or not. Needed by `Vmctx::block_on`. - /// - /// Safety: the context must be valid for as long as the instance remains in the running state - /// The logic in swap_and_return guarantees this. - async_context: Option>>, + async_context: Option, }, /// The instance has faulted, potentially fatally. 
From bd50d2efc8ac23ade62406a35e808a31accc02cc Mon Sep 17 00:00:00 2001 From: Ben Aubin Date: Wed, 10 Feb 2021 22:18:18 -0600 Subject: [PATCH 3/6] Add support to lucet_hostcall macro for async fn hostcalls --- lucet-runtime/lucet-runtime-macros/src/lib.rs | 19 +++++++++++- .../guests/async_hostcall/bindings.json | 3 +- .../guests/async_hostcall/hostcall_block_on.c | 8 +++-- .../lucet-runtime-tests/src/async_hostcall.rs | 31 ++++++++++++++++++- 4 files changed, 56 insertions(+), 5 deletions(-) diff --git a/lucet-runtime/lucet-runtime-macros/src/lib.rs b/lucet-runtime/lucet-runtime-macros/src/lib.rs index 9cf89e230..7c91b7857 100644 --- a/lucet-runtime/lucet-runtime-macros/src/lib.rs +++ b/lucet-runtime/lucet-runtime-macros/src/lib.rs @@ -101,6 +101,17 @@ pub fn lucet_hostcall(_attr: TokenStream, item: TokenStream) -> TokenStream { quote! { lucet_runtime::TerminationDetails } }; + let res_ident = quote::format_ident!("res"); + + let block_if_async = match raw_sig.asyncness.take() { + Some(_) => { + quote! { let #res_ident = vmctx.block_on(#res_ident); } + } + None => { + quote! {} + } + }; + let raw_hostcall = quote! 
{ #(#attrs)* #vis @@ -111,7 +122,13 @@ pub fn lucet_hostcall(_attr: TokenStream, item: TokenStream) -> TokenStream { let vmctx = #vmctx_mod::Vmctx::from_raw(vmctx_raw); #vmctx_mod::VmctxInternal::instance_mut(&vmctx).uninterruptable(|| { let res = std::panic::catch_unwind(move || { - #hostcall_ident(&#vmctx_mod::Vmctx::from_raw(vmctx_raw), #(#impl_args),*) + let vmctx = #vmctx_mod::Vmctx::from_raw(vmctx_raw); + + let #res_ident = #hostcall_ident(&vmctx, #(#impl_args),*); + + #block_if_async + + #res_ident }); match res { Ok(res) => res, diff --git a/lucet-runtime/lucet-runtime-tests/guests/async_hostcall/bindings.json b/lucet-runtime/lucet-runtime-tests/guests/async_hostcall/bindings.json index 2df36b291..eeba06095 100644 --- a/lucet-runtime/lucet-runtime-tests/guests/async_hostcall/bindings.json +++ b/lucet-runtime/lucet-runtime-tests/guests/async_hostcall/bindings.json @@ -1,6 +1,7 @@ { "env": { "hostcall_containing_block_on": "hostcall_containing_block_on", - "hostcall_containing_yielding_block_on": "hostcall_containing_yielding_block_on" + "hostcall_containing_yielding_block_on": "hostcall_containing_yielding_block_on", + "hostcall_async_containing_yielding_block_on": "hostcall_async_containing_yielding_block_on" } } diff --git a/lucet-runtime/lucet-runtime-tests/guests/async_hostcall/hostcall_block_on.c b/lucet-runtime/lucet-runtime-tests/guests/async_hostcall/hostcall_block_on.c index 3fddf9de7..fdea50353 100644 --- a/lucet-runtime/lucet-runtime-tests/guests/async_hostcall/hostcall_block_on.c +++ b/lucet-runtime/lucet-runtime-tests/guests/async_hostcall/hostcall_block_on.c @@ -2,7 +2,7 @@ extern void hostcall_containing_block_on(int); extern void hostcall_containing_yielding_block_on(int); - +extern int hostcall_async_containing_yielding_block_on(int, int); int main(void) { @@ -16,5 +16,9 @@ int yielding() hostcall_containing_yielding_block_on(1); hostcall_containing_yielding_block_on(2); hostcall_containing_yielding_block_on(3); + + int six = 
hostcall_async_containing_yielding_block_on(3, 6); + hostcall_async_containing_yielding_block_on(3, six); + return 0; -} \ No newline at end of file +} diff --git a/lucet-runtime/lucet-runtime-tests/src/async_hostcall.rs b/lucet-runtime/lucet-runtime-tests/src/async_hostcall.rs index d8319f014..12dbd40de 100644 --- a/lucet-runtime/lucet-runtime-tests/src/async_hostcall.rs +++ b/lucet-runtime/lucet-runtime-tests/src/async_hostcall.rs @@ -14,7 +14,6 @@ macro_rules! async_hostcall_tests { #[lucet_hostcall] #[no_mangle] pub fn hostcall_containing_yielding_block_on(vmctx: &Vmctx, times: u32) { - struct YieldingFuture { times: u32 } impl Future for YieldingFuture { @@ -36,6 +35,36 @@ macro_rules! async_hostcall_tests { vmctx.block_on(YieldingFuture { times }); } + #[lucet_hostcall] + #[no_mangle] + pub async fn hostcall_async_containing_yielding_block_on(vmctx: &Vmctx, times: u32, times_double: u32) -> u32 { + assert_eq!(times * 2, times_double); + + struct YieldingFuture { times: u32 } + + impl Future for YieldingFuture { + type Output = (); + + fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll { + if self.times == 0 { + return std::task::Poll::Ready(()) + } else { + self.get_mut().times -= 1; + + cx.waker().wake_by_ref(); + + return std::task::Poll::Pending + } + } + } + + for i in 0..times { + YieldingFuture { times: 2 }.await + } + + return times * 2; + } + $( mod $region_id { use lucet_runtime::{DlModule, Error, Limits, Region, RegionCreate, TerminationDetails}; From 7cd9d834f371f8f6008d6419e202978b0b3d4087 Mon Sep 17 00:00:00 2001 From: Ben Aubin Date: Wed, 10 Feb 2021 23:19:31 -0600 Subject: [PATCH 4/6] Cleanup and improve documentation of RunAsync --- .../lucet-runtime-internals/src/future.rs | 244 ++++++++++++------ .../lucet-runtime-internals/src/instance.rs | 2 +- 2 files changed, 167 insertions(+), 79 deletions(-) diff --git a/lucet-runtime/lucet-runtime-internals/src/future.rs 
b/lucet-runtime/lucet-runtime-internals/src/future.rs index 96948983d..61697dcf9 100644 --- a/lucet-runtime/lucet-runtime-internals/src/future.rs +++ b/lucet-runtime/lucet-runtime-internals/src/future.rs @@ -18,34 +18,60 @@ pub struct AsyncContext { const DEFAULT_INST_COUNT_BOUND: u64 = i64::MAX as u64; +/// A value representing that the guest instance yielded because it was blocked on a future. +/// +/// In the future, we could provide a value from the guest that can be accessed before resuming the future, +/// such as if we wanted to do something from the host context. struct AsyncYielded; + +/// Providing the private `AsyncResume` as a resume value certifies that +/// RunAsync upheld the invariants necessary to safely resume the instance. struct AsyncResume; +/// An error representing a failure of `try_block_on` +#[doc(hidden)] +pub enum BlockOnError { + NeedsAsyncContext, +} + +impl From for TerminationDetails { + fn from(err: BlockOnError) -> Self { + match err { + BlockOnError::NeedsAsyncContext => TerminationDetails::BlockOnNeedsAsync, + } + } +} + impl Vmctx { /// Block on the result of an `async` computation from an instance run by `Instance::run_async`. /// - /// Lucet hostcalls are synchronous `extern "C" fn` functions called from WebAssembly. In that - /// context, we cannot use `.await` directly because the hostcall is not `async`. While we could - /// block on an executor using `futures::executor::block_on` or - /// `tokio::runtime::Runtime::block_on`, that has two drawbacks: - /// - /// - If the Lucet instance was originally invoked from an async context, trying to block on the - /// same runtime will fail if the executor cannot be nested (all executors we know of have this - /// restriction). + /// While this method is supported and part of the public API, it's easiest to define the hostcall + /// function itself as async. The `#[lucet_hostcall]` macro simply calls this function. 
/// - /// - The current OS thread would be blocked on the result of the computation, rather than being - /// able to run other async tasks while awaiting. This means an application will need more - /// threads than otherwise would be necessary. - /// - /// Instead, this block_on operator is designed to work only when called within an invocation - /// of [`Instance::run_async`]. When a future needs to be polled, the instance will yield and - /// RunAsync will return control to the async executor. When the future is ready to be polled/make progress, - /// it will be polled from within the guest context. + /// There's no performance penalty for doing so: futures that are immediately ready without waiting + /// don't require a context switch, just like using `.await`. /// /// Note: /// - This method may only be used if `Instance::run_async` was used to run the VM, /// otherwise it will terminate the instance with `TerminationDetails::BlockOnNeedsAsync`. - pub fn block_on(&self, mut f: impl Future) -> R { + #[doc(hidden)] + #[inline(always)] + pub fn block_on(&self, f: impl Future) -> R { + match self.try_block_on(f) { + Ok(res) => res, + Err(err) => panic!(TerminationDetails::from(err)), + } + } + + /// Block on the result of an `async` computation from an instance run by `Instance::run_async`. + /// + /// The primary reason you may want to use `try_block_on` manually is to provide a fallback + /// implementation for if your hostcall is called from outside of an asynchronous context. + /// + /// If `Instance::run_async` is not being used to run the VM, this function will return + /// `Err(BlockOnError::NeedsAsyncContext)`. + #[doc(hidden)] + pub fn try_block_on(&self, mut f: impl Future) -> Result { // We pin the future to the stack (a requirement for being able to poll the future). // By pinning to the stack instead of using `Box::pin`, we avoid heap allocations for immediately-ready futures. 
// @@ -61,9 +87,7 @@ impl Vmctx { } => cx, State::Running { async_context: None, - } => { - panic!(TerminationDetails::BlockOnNeedsAsync) - } + } => return Err(BlockOnError::NeedsAsyncContext), _ => unreachable!("Access to vmctx implies instance is Running"), }; @@ -71,11 +95,16 @@ impl Vmctx { let mut cx = Context::from_waker(&cx.waker); match f.as_mut().poll(&mut cx) { - Poll::Ready(ret) => return ret, + Poll::Ready(ret) => return Ok(ret), Poll::Pending => { // The future is pending, so we need to yield to the async executor self.yield_impl::(AsyncYielded, false, false); + // Providing the private `AsyncResume` as a resume value certifies that + // RunAsync upheld the invarriants necessary for us to avoid a borrow check. + // + // If we resume with any other value, the instance may have been modified, and it is + // unsound to resume the instance. let AsyncResume = self.take_resumed_val::(); } } @@ -86,21 +115,36 @@ impl Vmctx { impl InstanceHandle { /// Run a WebAssembly function with arguments in the guest context at the given entrypoint. /// - /// This method is similar to `Instance::run()`, but allows the Wasm program to invoke hostcalls - /// that use `Vmctx::block_on` and provides the trampoline that `.await`s those futures on - /// behalf of the guest. + /// This method is similar to `Instance::run()`, but allows the Wasm program to invoke async hostcalls + /// and provides the trampoline that `.await`s those futures on behalf of the guest. + /// + /// To define an async hostcall, simply add an `async` modifier to your hostcall: + /// + /// ```ignore + /// #[lucet_hostcall] + /// #[no_mangle] + /// pub async fn hostcall_async(vmctx: &Vmctx) { + /// foobar().await + /// } + /// ``` + /// + /// See `[RunAsync]` for details. /// /// If `runtime_bound` is provided, it will also pause the Wasm execution and yield a future /// that resumes it after (approximately) that many Wasm opcodes have executed. 
/// /// # `Vmctx` Restrictions /// - /// This method permits the use of `Vmctx::block_on`, but disallows all other uses of `Vmctx:: + /// This method permits the use of async hostcalls, but disallows all other uses of `Vmctx:: /// yield_val_expecting_val` and family (`Vmctx::yield_`, `Vmctx::yield_expecting_val`, /// `Vmctx::yield_val`). pub fn run_async<'a>(&'a mut self, entrypoint: &'a str, args: &'a [Val]) -> RunAsync<'a> { let func = self.module.get_export_func(entrypoint); - self.run_async_internal(func, args) + + match func { + Ok(func) => self.run_async_internal(func, args), + Err(err) => self.run_async_failed(err), + } } /// Run the module's [start function][start], if one exists. @@ -117,41 +161,80 @@ impl InstanceHandle { /// runtime between async future yields (invocations of `.poll()` on the /// underlying generated future) if `runtime_bound` is provided. This /// behaves the same way as `Instance::run_async()`. + /// + /// Just like `Instance::run_start()`, hostcalls, including async hostcalls, + /// cannot be called from the instance start function. + /// + /// The result of the `RunAsync` future is unspecified, and should not be relied on. 
pub fn run_async_start<'a>(&'a mut self) -> RunAsync<'a> { - let func = if self.is_not_started() { - self.module - .get_start_func() - // Invariant: can only be in NotStarted state if a start function exists - .map(|start| start.expect("NotStarted, but no start function")) - } else { - Err(Error::StartAlreadyRun) + if !self.is_not_started() { + return self.run_async_failed(Error::StartAlreadyRun); + } + + let func = match self.module.get_start_func() { + Ok(start) => start.expect("NotStarted, but no start function"), // can only be in NotStarted state if a start function exists, + Err(err) => return self.run_async_failed(err), }; self.run_async_internal(func, &[]) } - fn run_async_internal<'a>( - &'a mut self, - func: Result, - args: &'a [Val], - ) -> RunAsync<'a> { - let state = match func { - Ok(func) => RunAsyncState::Start(func, args), - Err(err) => RunAsyncState::Failed(Some(err)), - }; + /// Returns a `RunAsync` that will asynchronously execute the guest instnace. + fn run_async_internal<'a>(&'a mut self, func: FunctionHandle, args: &'a [Val]) -> RunAsync<'a> { + RunAsync { + inst: self, + inst_count_bound: DEFAULT_INST_COUNT_BOUND, + state: RunAsyncState::Start(func, args), + } + } + /// Returns a `RunAsync` that will immediately fail with the given error, without executing the guest instance. + fn run_async_failed<'a>(&'a mut self, err: Error) -> RunAsync<'a> { RunAsync { inst: self, inst_count_bound: DEFAULT_INST_COUNT_BOUND, - state, + state: RunAsyncState::Failed(err), } } } +/// A future implementation that enables running a guest instance which can call async hostcalls. +/// +/// Lucet hostcalls are synchronous `extern "C" fn` functions called from WebAssembly. In that +/// context, we cannot use `.await` directly because the hostcall is not `async`. While we could +/// block on an executor such as `futures::executor::block_on`, that would block the OS thread, +/// preventing us from running other async tasks while awaiting. 
This means an application will need more +/// threads than otherwise would be necessary. +/// +/// `RunAsync` allows the guest to call async hostcalls just as if the guest had called the async function +/// and immediately `.await`ed it. +/// +/// To define an async hostcall, simply add the async modifier to a hostcall definition: +/// +/// ```ignore +/// #[lucet_hostcall] +/// #[no_mangle] +/// pub async fn hostcall_async(vmctx: &Vmctx) { +/// foobar().await +/// } +/// ``` +/// +/// Note: Async hostcalls may only be used if `Instance::run_async` was used to run the VM, +/// otherwise it will terminate the instance with `TerminationDetails::BlockOnNeedsAsync`. +/// +/// Behind the scenes, lucet polls the future from within the guest execution context. If the future is not immediately ready, +/// the instance will yield and return control to the async executor. Later, when the future is ready to make progress, +/// the async executor will return to the guest context, where lucet will poll the future to completion. +/// +/// Just like `.await`, there is no overhead for futures that are immediately ready (such as `async { 5 }`). +/// +/// For async hostcalls that may yield to the async executor many times, it's recommended that you use `tokio::spawn`, +/// or the equivalent from your async executor, which will spawn the task to be run from the host execution context. +/// This avoids the overhead of context switching into the guest execution context every time the future needs to make progress. pub struct RunAsync<'a> { inst: &'a mut InstanceHandle, state: RunAsyncState<'a>, - /// The instance count bound. Can be changed at any time, taking effect on the next guest entry + /// The instance count bound. 
Can be changed at any time, taking effect on the next entry to the guest execution context pub inst_count_bound: u64, } @@ -165,9 +248,15 @@ impl<'a> RunAsync<'a> { enum RunAsyncState<'a> { Start(FunctionHandle, &'a [Val]), - Blocked, - Yielded, - Failed(Option), + /// The instance is currently blocked on a future. + /// + /// We keep the async yielded around - although the value of AsyncYielded isn't used for + /// anything, there's not much additional overhead in keeping it around, and it gives us + /// the option to pass data from the yield and potentially add other types of execution + /// in the future (such as bringing back host-context future execution). + BlockedOnFuture(Box), + BoundExpired, + Failed(Error), } impl<'a> Future for RunAsync<'a> { @@ -176,60 +265,59 @@ impl<'a> Future for RunAsync<'a> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let inst_count_bound = self.inst_count_bound; - let run_result = match self.state { - RunAsyncState::Failed(ref mut err) => { - return Poll::Ready(Err(err.take().expect("failed future polled twice"))) - } - RunAsyncState::Start(func, args) => { - let cx = AsyncContext { - waker: cx.waker().clone(), - }; + let waker = cx.waker(); + let cx = AsyncContext { + waker: waker.clone(), + }; + let state = std::mem::replace( + &mut self.state, + RunAsyncState::Failed(Error::InvalidArgument("Polled an invalid future")), + ); + let run_result = match state { + RunAsyncState::Start(func, args) => { // This is the first iteration, call the entrypoint: self.inst .run_func(func, args, Some(cx), Some(inst_count_bound)) } - RunAsyncState::Blocked => { - let cx = AsyncContext { - waker: cx.waker().clone(), - }; - - // Resume the instance now that the future is ready + RunAsyncState::BlockedOnFuture(_) => { + // Resume the instance and poll the future self.inst .resume_with_val_impl(AsyncResume, Some(cx), Some(inst_count_bound)) } - RunAsyncState::Yielded => self.inst.resume_bounded(inst_count_bound), + 
RunAsyncState::BoundExpired => self.inst.resume_bounded(cx, inst_count_bound), + RunAsyncState::Failed(err) => Err(err), }; - match run_result { - Ok(InternalRunResult::Normal(RunResult::Returned(rval))) => { - // Finished running, return UntypedReturnValue - return Poll::Ready(Ok(rval)); - } + let res = match run_result { + Ok(InternalRunResult::Normal(RunResult::Returned(rval))) => Ok(rval), Ok(InternalRunResult::Normal(RunResult::Yielded(yval))) => { match yval.downcast::() { - Ok(_) => { - self.state = RunAsyncState::Blocked; + Ok(ye) => { + self.state = RunAsyncState::BlockedOnFuture(ye); + return Poll::Pending; } Err(_) => { // Any other yielded value is not supported - die with an error. - return Poll::Ready(Err(Error::Unsupported( + Err(Error::Unsupported( "cannot yield anything besides a future in Instance::run_async" .to_owned(), - ))); + )) } } } Ok(InternalRunResult::BoundExpired) => { - self.state = RunAsyncState::Yielded; - - // Yield, giving control back to the async executor - cx.waker().wake_by_ref(); + // The instruction count bound expired. Yield to the async executor and immediately wake. 
+ // + // By immediately waking, the future will be scheduled to run later (similar to tokio's yield_now()) + self.state = RunAsyncState::BoundExpired; + waker.wake_by_ref(); + return Poll::Pending; } - Err(err) => return Poll::Ready(Err(err)), - } + Err(err) => Err(err), + }; - return Poll::Pending; + Poll::Ready(res) } } diff --git a/lucet-runtime/lucet-runtime-internals/src/instance.rs b/lucet-runtime/lucet-runtime-internals/src/instance.rs index c9cd05607..eccf29b94 100644 --- a/lucet-runtime/lucet-runtime-internals/src/instance.rs +++ b/lucet-runtime/lucet-runtime-internals/src/instance.rs @@ -603,7 +603,7 @@ impl Instance { pub(crate) fn resume_bounded( &mut self, async_context: AsyncContext, - max_insn_count: u64 + max_insn_count: u64, ) -> Result { if !self.state.is_bound_expired() { return Err(Error::InvalidArgument( From 56f68e111bb3eaa19126b5e61176c329c063def7 Mon Sep 17 00:00:00 2001 From: Ben Aubin Date: Thu, 11 Feb 2021 10:57:29 -0600 Subject: [PATCH 5/6] Allow yielding from an async execution context By replacing the expected yield value from a Box> to TypeId, it becomes possible to resume instance execution when the size of a boxed type is unknown and cannot be passed as an argument. This makes it possible to yield and resume from an async context by passing the resumption value through RunAsyncState. Finally, we have to check that the context didn't change between resumption within try_block_on, in order to make sure the right Waker is scheduled and the future will be resumed when it is ready. 
--- .../lucet-runtime-internals/src/future.rs | 100 +++++++++++------ .../lucet-runtime-internals/src/instance.rs | 17 +-- .../src/instance/state.rs | 16 +-- .../lucet-runtime-internals/src/vmctx.rs | 6 +- .../guests/async_hostcall/bindings.json | 3 +- .../guests/async_hostcall/hostcall_block_on.c | 6 + .../lucet-runtime-tests/src/async_hostcall.rs | 104 +++++++++++++++++- 7 files changed, 189 insertions(+), 63 deletions(-) diff --git a/lucet-runtime/lucet-runtime-internals/src/future.rs b/lucet-runtime/lucet-runtime-internals/src/future.rs index 61697dcf9..54e9e4fd6 100644 --- a/lucet-runtime/lucet-runtime-internals/src/future.rs +++ b/lucet-runtime/lucet-runtime-internals/src/future.rs @@ -1,13 +1,14 @@ -use crate::error::Error; use crate::instance::{InstanceHandle, InternalRunResult, RunResult, State, TerminationDetails}; use crate::module::FunctionHandle; -use crate::val::{UntypedRetVal, Val}; +use crate::val::Val; use crate::vmctx::{Vmctx, VmctxInternal}; -use std::future::Future; +use crate::{error::Error, instance::EmptyYieldVal}; use std::pin::Pin; +use std::sync::Arc; use std::task::Context; use std::task::Poll; use std::task::Waker; +use std::{any::Any, future::Future}; /// a representation of AsyncContext which can be freely cloned #[doc(hidden)] @@ -19,9 +20,6 @@ pub struct AsyncContext { const DEFAULT_INST_COUNT_BOUND: u64 = i64::MAX as u64; /// A value representing that the guest instance yielded because it was blocked on a future. -/// -/// In the future, we could provide a value from the guest that can be accessed before resuming the future, -/// such as if we wanted to do something from the host context. 
struct AsyncYielded; /// Providing the private `AsyncResume` as a resume value certifies that @@ -81,10 +79,10 @@ impl Vmctx { loop { // Get the AsyncContext, or die if we aren't async - let cx = match &self.instance().state { + let arc_cx = match &self.instance().state { State::Running { async_context: Some(cx), - } => cx, + } => cx.clone(), State::Running { async_context: None, } => return Err(BlockOnError::NeedsAsyncContext), @@ -92,7 +90,7 @@ impl Vmctx { }; // build an std::task::Context - let mut cx = Context::from_waker(&cx.waker); + let mut cx = Context::from_waker(&arc_cx.waker); match f.as_mut().poll(&mut cx) { Poll::Ready(ret) => return Ok(ret), @@ -100,6 +98,22 @@ impl Vmctx { // The future is pending, so we need to yield to the async executor self.yield_impl::(AsyncYielded, false, false); + // Check that the async context hasn't changed (this could happen if the instance yielded) + match &self.instance().state { + State::Running { async_context: Some(new_cx) } => { + let same_waker = Arc::ptr_eq(&arc_cx, &new_cx) || arc_cx.waker.will_wake(&new_cx.waker); + + if !same_waker { + // The AsyncContext changed on us. This is because the instance is running from a new RunAsync. + // This probably happened because the instance yielded and was resumed up by `resume_async`. + // + // Poll the future again before yielding to the executor in order to register the new waker. + continue; + } + }, + _ => panic!("Lucet instance blocked on a future, but no longer running in async context. Make sure to use resume_async when resuming an async guest.") + } + // Providing the private `AsyncResume` as a resume value certifies that // RunAsync upheld the invarriants necessary for us to avoid a borrow check. // @@ -132,12 +146,6 @@ impl InstanceHandle { /// /// If `runtime_bound` is provided, it will also pause the Wasm execution and yield a future /// that resumes it after (approximately) that many Wasm opcodes have executed. 
- /// - /// # `Vmctx` Restrictions - /// - /// This method permits the use of async hostcalls, but disallows all other uses of `Vmctx:: - /// yield_val_expecting_val` and family (`Vmctx::yield_`, `Vmctx::yield_expecting_val`, - /// `Vmctx::yield_val`). pub fn run_async<'a>(&'a mut self, entrypoint: &'a str, args: &'a [Val]) -> RunAsync<'a> { let func = self.module.get_export_func(entrypoint); @@ -179,6 +187,38 @@ impl InstanceHandle { self.run_async_internal(func, &[]) } + /// Resume async execution of an instance that has yielded, providing a value to the guest. + /// + /// If an async execution context yields from within a future, resuming with [`Instance::resume()`], + /// [`Instance::resume_with_val()`], may panic if the instance needs to block on an async function. + /// Use this function instead, which will resume the instance within an async context. + /// + /// The provided value will be dynamically typechecked against the type the guest expects to + /// receive, and if that check fails, this call will fail with `Error::InvalidArgument`. + /// + /// See [`Instance::resume()`], [`Instance::resume_with_val()`], and [`Instance::run_async()`]. + /// + /// # Safety + /// + /// The foreign code safety caveat of [`Instance::run()`](struct.Instance.html#method.run) + /// applies. + pub fn resume_async_with_val<'a>(&'a mut self, val: impl Any + 'static + Send) -> RunAsync<'a> { + let val = Box::new(val) as Box; + + RunAsync { + inst: self, + inst_count_bound: DEFAULT_INST_COUNT_BOUND, + state: RunAsyncState::ResumeYielded(val), + } + } + + /// Resume execution of an instance that has yielded without providing a value to the guest. + /// + /// See [`Instance::resume_async_with_val()`] + pub fn resume_async<'a>(&'a mut self) -> RunAsync<'a> { + self.resume_async_with_val(EmptyYieldVal) + } + /// Returns a `RunAsync` that will asynchronously execute the guest instnace. 
fn run_async_internal<'a>(&'a mut self, func: FunctionHandle, args: &'a [Val]) -> RunAsync<'a> { RunAsync { @@ -248,19 +288,13 @@ impl<'a> RunAsync<'a> { enum RunAsyncState<'a> { Start(FunctionHandle, &'a [Val]), - /// The instance is currently blocked on a future. - /// - /// We keep the async yielded around - although the value of AsyncYielded isn't used for - /// anything, there's not much additional overhead in keeping it around, and it gives us - /// the option to pass data from the yield and potentially add other types of execution - /// in the future (such as bringing back host-context future execution). - BlockedOnFuture(Box), + ResumeYielded(Box), BoundExpired, Failed(Error), } impl<'a> Future for RunAsync<'a> { - type Output = Result; + type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let inst_count_bound = self.inst_count_bound; @@ -280,30 +314,24 @@ impl<'a> Future for RunAsync<'a> { self.inst .run_func(func, args, Some(cx), Some(inst_count_bound)) } - RunAsyncState::BlockedOnFuture(_) => { - // Resume the instance and poll the future + RunAsyncState::ResumeYielded(val) => { self.inst - .resume_with_val_impl(AsyncResume, Some(cx), Some(inst_count_bound)) + .resume_with_val_impl(val, Some(cx), Some(inst_count_bound)) } RunAsyncState::BoundExpired => self.inst.resume_bounded(cx, inst_count_bound), RunAsyncState::Failed(err) => Err(err), }; let res = match run_result { - Ok(InternalRunResult::Normal(RunResult::Returned(rval))) => Ok(rval), + Ok(InternalRunResult::Normal(r @ RunResult::Returned(_))) => Ok(r), Ok(InternalRunResult::Normal(RunResult::Yielded(yval))) => { match yval.downcast::() { - Ok(ye) => { - self.state = RunAsyncState::BlockedOnFuture(ye); + Ok(_) => { + // When this future is polled next, we'll resume the guest instance using `AsyncResume` + self.state = RunAsyncState::ResumeYielded(Box::new(AsyncResume)); return Poll::Pending; } - Err(_) => { - // Any other yielded value is not supported - die with 
an error. - Err(Error::Unsupported( - "cannot yield anything besides a future in Instance::run_async" - .to_owned(), - )) - } + Err(yval) => Ok(RunResult::Yielded(yval)), } } Ok(InternalRunResult::BoundExpired) => { diff --git a/lucet-runtime/lucet-runtime-internals/src/instance.rs b/lucet-runtime/lucet-runtime-internals/src/instance.rs index eccf29b94..1efa8a732 100644 --- a/lucet-runtime/lucet-runtime-internals/src/instance.rs +++ b/lucet-runtime/lucet-runtime-internals/src/instance.rs @@ -24,7 +24,6 @@ use memoffset::offset_of; use std::any::Any; use std::cell::{BorrowError, BorrowMutError, Ref, RefCell, RefMut, UnsafeCell}; use std::convert::TryFrom; -use std::marker::PhantomData; use std::mem; use std::ops::{Deref, DerefMut}; use std::ptr::{self, NonNull}; @@ -562,19 +561,21 @@ impl Instance { /// The foreign code safety caveat of [`Instance::run()`](struct.Instance.html#method.run) /// applies. pub fn resume_with_val(&mut self, val: A) -> Result { - Ok(self.resume_with_val_impl(val, None, None)?.unwrap()) + Ok(self + .resume_with_val_impl(Box::new(val), None, None)? + .unwrap()) } - pub(crate) fn resume_with_val_impl( + pub(crate) fn resume_with_val_impl( &mut self, - val: A, + val: Box, async_context: Option, max_insn_count: Option, ) -> Result { match &self.state { State::Yielded { expecting, .. 
} => { // make sure the resumed value is of the right type - if !expecting.is::>() { + if &(*val).type_id() != expecting { return Err(Error::InvalidArgument( "type mismatch between yielded instance expected value and resumed value", )); @@ -583,7 +584,7 @@ impl Instance { _ => return Err(Error::InvalidArgument("can only resume a yielded instance")), } - self.resumed_val = Some(Box::new(val) as Box); + self.resumed_val = Some(val); self.set_instruction_bound_delta(max_insn_count); self.swap_and_return(async_context) @@ -1208,7 +1209,9 @@ impl Instance { || self.state.is_bound_expired() ); - self.state = State::Running { async_context }; + self.state = State::Running { + async_context: async_context.map(|cx| Arc::new(cx)), + }; let res = self.with_current_instance(|i| { i.with_signals_on(|i| { diff --git a/lucet-runtime/lucet-runtime-internals/src/instance/state.rs b/lucet-runtime/lucet-runtime-internals/src/instance/state.rs index d6713bd97..36c19c394 100644 --- a/lucet-runtime/lucet-runtime-internals/src/instance/state.rs +++ b/lucet-runtime/lucet-runtime-internals/src/instance/state.rs @@ -2,7 +2,7 @@ use crate::instance::{FaultDetails, TerminationDetails, YieldedVal}; use crate::sysdeps::UContext; use crate::{future::AsyncContext, instance::siginfo_ext::SiginfoExt}; use libc::{SIGBUS, SIGSEGV}; -use std::any::Any; +use std::any::TypeId; use std::ffi::{CStr, CString}; /// The representation of a Lucet instance's state machine. @@ -25,7 +25,7 @@ pub enum State { Running { /// Indicates whether the instance is running in an async context (`Instance::run_async`) /// or not. Needed by `Vmctx::block_on`. - async_context: Option, + async_context: Option>, }, /// The instance has faulted, potentially fatally. @@ -56,11 +56,8 @@ pub enum State { /// `RunResult` before anything else happens to the instance. Yielding { val: YieldedVal, - /// A phantom value carrying the type of the expected resumption value. 
- /// - /// Concretely, this should only ever be `Box>` where `R` is the type - /// the guest expects upon resumption. - expecting: Box, + /// The type of the expected resumption value + expecting: TypeId, }, /// The instance has yielded. @@ -69,10 +66,7 @@ pub enum State { /// instance is reset. Yielded { /// A phantom value carrying the type of the expected resumption value. - /// - /// Concretely, this should only ever be `Box>` where `R` is the type - /// the guest expects upon resumption. - expecting: Box, + expecting: TypeId, }, /// The instance has reached an instruction-count bound. diff --git a/lucet-runtime/lucet-runtime-internals/src/vmctx.rs b/lucet-runtime/lucet-runtime-internals/src/vmctx.rs index 11e14937d..e7ef8bffa 100644 --- a/lucet-runtime/lucet-runtime-internals/src/vmctx.rs +++ b/lucet-runtime/lucet-runtime-internals/src/vmctx.rs @@ -13,10 +13,9 @@ use crate::instance::{ CURRENT_INSTANCE, HOST_CTX, }; use lucet_module::{FunctionHandle, GlobalValue}; -use std::any::Any; +use std::any::{Any, TypeId}; use std::borrow::{Borrow, BorrowMut}; use std::cell::{Ref, RefCell, RefMut}; -use std::marker::PhantomData; /// An opaque handle to a running instance's context. 
#[derive(Debug)] @@ -436,10 +435,9 @@ impl Vmctx { if is_bound_expiration { inst.state = State::BoundExpired; } else { - let expecting: Box> = Box::new(PhantomData); inst.state = State::Yielding { val: YieldedVal::new(val), - expecting: expecting as Box, + expecting: TypeId::of::(), }; } diff --git a/lucet-runtime/lucet-runtime-tests/guests/async_hostcall/bindings.json b/lucet-runtime/lucet-runtime-tests/guests/async_hostcall/bindings.json index eeba06095..ce0eb173b 100644 --- a/lucet-runtime/lucet-runtime-tests/guests/async_hostcall/bindings.json +++ b/lucet-runtime/lucet-runtime-tests/guests/async_hostcall/bindings.json @@ -2,6 +2,7 @@ "env": { "hostcall_containing_block_on": "hostcall_containing_block_on", "hostcall_containing_yielding_block_on": "hostcall_containing_yielding_block_on", - "hostcall_async_containing_yielding_block_on": "hostcall_async_containing_yielding_block_on" + "hostcall_async_containing_yielding_block_on": "hostcall_async_containing_yielding_block_on", + "await_manual_future": "await_manual_future" } } diff --git a/lucet-runtime/lucet-runtime-tests/guests/async_hostcall/hostcall_block_on.c b/lucet-runtime/lucet-runtime-tests/guests/async_hostcall/hostcall_block_on.c index fdea50353..5b6422a70 100644 --- a/lucet-runtime/lucet-runtime-tests/guests/async_hostcall/hostcall_block_on.c +++ b/lucet-runtime/lucet-runtime-tests/guests/async_hostcall/hostcall_block_on.c @@ -22,3 +22,9 @@ int yielding() return 0; } + +int manual_future() +{ + await_manual_future(); + return 0; +} \ No newline at end of file diff --git a/lucet-runtime/lucet-runtime-tests/src/async_hostcall.rs b/lucet-runtime/lucet-runtime-tests/src/async_hostcall.rs index 12dbd40de..065a435bd 100644 --- a/lucet-runtime/lucet-runtime-tests/src/async_hostcall.rs +++ b/lucet-runtime/lucet-runtime-tests/src/async_hostcall.rs @@ -1,8 +1,54 @@ + + +use std::future::Future; +use std::task::{Waker, Context, Poll}; +use std::sync::{Arc, Mutex}; + +enum StubFutureInner { + NeverPolled, + 
Polled(Waker), + Ready +} +#[derive(Clone)] +pub struct StubFuture(Arc>); +impl StubFuture { + pub fn new() -> Self { StubFuture(Arc::new(Mutex::new(StubFutureInner::NeverPolled))) } + pub fn make_ready(&self) { + let mut inner = self.0.lock().unwrap(); + match std::mem::replace(&mut *inner, StubFutureInner::Ready) { + StubFutureInner::Polled(waker) => { + waker.wake(); + } + _ => panic!("never polled") + } + } +} + +impl Future for StubFuture { + type Output = (); + + fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut inner = self.0.lock().unwrap(); + + match *inner { + StubFutureInner::Ready => Poll::Ready(()), + _ => { + *inner = StubFutureInner::Polled(cx.waker().clone()); + Poll::Pending + } + } + } +} + + #[macro_export] macro_rules! async_hostcall_tests { ( $( $region_id:ident => $TestRegion:path ),* ) => { use lucet_runtime::{vmctx::Vmctx, lucet_hostcall}; use std::future::Future; + use std::task::{Waker, Context}; + use $crate::async_hostcall::StubFuture; + #[lucet_hostcall] #[no_mangle] @@ -65,13 +111,22 @@ macro_rules! async_hostcall_tests { return times * 2; } + #[lucet_hostcall] + #[no_mangle] + pub async fn await_manual_future(vmctx: &Vmctx) { + vmctx.yield_(); + vmctx.get_embed_ctx_mut::>().take().unwrap().await; + } + $( mod $region_id { - use lucet_runtime::{DlModule, Error, Limits, Region, RegionCreate, TerminationDetails}; - use std::sync::Arc; + use lucet_runtime::{DlModule, Error, Limits, Region, RegionCreate, TerminationDetails, RunResult}; + use std::sync::{Arc}; use $TestRegion as TestRegion; use $crate::build::test_module_c; + use $crate::async_hostcall::StubFuture; + #[test] fn ensure_linked() { lucet_runtime::lucet_internal_ensure_linked(); @@ -138,8 +193,6 @@ macro_rules! async_hostcall_tests { ), } - - let correct_run_res_3 = futures_executor::block_on( inst.run_async( @@ -156,6 +209,49 @@ macro_rules! 
async_hostcall_tests { } } + + #[test] + fn yield_from_within_future() { + let module = test_module_c("async_hostcall", "hostcall_block_on.c") + .expect("module compiled and loaded"); + let region = ::create(1, &Limits::default()) + .expect("region can be created"); + let mut inst = region + .new_instance(module) + .expect("instance can be created"); + + inst.run_start().expect("start section runs"); + + let manual_future = StubFuture::new(); + + inst.insert_embed_ctx(Some(manual_future.clone())); + + let run_res = + futures_executor::block_on( + inst.run_async( + "manual_future", + &[] + )); + + if let Ok(RunResult::Yielded(_)) = run_res { /* expected */ } else { panic!("did not yield"); } + + // The loop within try_block_on polled the future returned by await_manual_future, + // and the waker that will be passed to poll `manuall_future` is from the old + // executor. + // + // However, we yielded from the guest prior to polling manual_future, so we + // need to spawn a thread that will wake manual_future _after_ it has been polled. + std::thread::spawn(move || { + // the instance some time, so that it polls and blocks on manual_future + std::thread::sleep(std::time::Duration::from_millis(5)); + // wake the future manually. 
this will force us to miss the wakeup + manual_future.make_ready(); + }); + + let run_res = futures_executor::block_on(inst.resume_async()); + + if let Ok(RunResult::Returned(_)) = run_res { /* expected */ } else { panic!("did not return"); } + } } )* }; From 4773acb648f6777203bf6252b205e6c096c650db Mon Sep 17 00:00:00 2001 From: Ben Aubin Date: Thu, 18 Feb 2021 12:28:55 -0600 Subject: [PATCH 6/6] Improve RunAsync documentation and formatting --- .../lucet-runtime-internals/src/future.rs | 110 ++++++++++-------- .../lucet-runtime-tests/src/async_hostcall.rs | 21 ++-- 2 files changed, 69 insertions(+), 62 deletions(-) diff --git a/lucet-runtime/lucet-runtime-internals/src/future.rs b/lucet-runtime/lucet-runtime-internals/src/future.rs index 54e9e4fd6..bb918b3a1 100644 --- a/lucet-runtime/lucet-runtime-internals/src/future.rs +++ b/lucet-runtime/lucet-runtime-internals/src/future.rs @@ -23,7 +23,7 @@ const DEFAULT_INST_COUNT_BOUND: u64 = i64::MAX as u64; struct AsyncYielded; /// Providing the private `AsyncResume` as a resume value certifies that -/// RunAsync upheld the invarriants necessary to safely resume the instance. +/// RunAsync upheld the invariants necessary to safely resume the instance. struct AsyncResume; /// An error representing a failure of `try_block_on` @@ -44,10 +44,8 @@ impl Vmctx { /// Block on the result of an `async` computation from an instance run by `Instance::run_async`. /// /// While this method is supported and part of the public API, it's easiest to define the hostcall - /// function itself as async. The `#[lucet_hostcall]` macro simply calls this function. - /// - /// There's no performance penalty for doing so: futures that are immediately ready without waiting - /// don't require a context switch, just like using `.await`. + /// function itself as async (the `#[lucet_hostcall]` macro simply calls this function). + /// If you need to provide a synchronous fallback, use [`Vmctx::try_block_on()`] instead. 
/// /// Note: /// - This method may only be used if `Instance::run_async` was used to run the VM, @@ -63,18 +61,48 @@ impl Vmctx { /// Block on the result of an `async` computation from an instance run by `Instance::run_async`. /// - /// The primary reason you may want to use `try_block_on` manually is to provide a fallback - /// implementation for if your hostcall is called from outside of an asynchronous context. + /// The easiest way to make a hostcall async is simply to add the `async` modifier: + /// + /// ```ignore + /// #[lucet_hostcall] + /// #[no_mangle] + /// pub async fn hostcall_async(vmctx: &Vmctx) { + /// foobar().await + /// } + /// ``` + /// + /// Of course, we can only expose synchronous interfaces to Wasm guests. To implement + /// async hostcalls, Lucet blocks guest execution while waiting the future to complete, + /// yielding control of the thread back the async executor so that other tasks can make + /// progress in the meantime. + /// + /// It's recommended to use the async modifier, rather than call this function directly. + /// The primary reason you may want to do so directly is if you want to provide a fallback + /// implementation for the case when calling a hostcall from outside of an async context. + /// + /// There is no additional yield to the host when the future passed is immediately ready. + /// This behavior is the same as implemented by the `.await` operator. + /// + /// The future passed is polled from within the guest execution context. If the future is not + /// immediately ready, the instance will yield and return control to the async executor. + /// Later, once the future is ready to make progress, the async executor will return us to + /// the guest context, where Lucet will resume polling the future to completion. 
+ /// + /// For async hostcalls that may yield to the async executor many times, it's recommended that + /// you use `tokio::spawn`, or the equivalent from your async executor, which will spawn the task + /// to be run from the host execution context. This avoids the overhead of context switching into + /// the guest execution context every time the future needs to make progress. /// /// If `Instance::run_async` is not being used to run the VM, this function will return /// `Err(BlockOnError::NeedsAsyncContext)`. - #[doc(hidden)] pub fn try_block_on(&self, mut f: impl Future) -> Result { // We pin the future to the stack (a requirement for being able to poll the future). - // By pinning to the stack instead of using `Box::pin`, we avoid heap allocations for immediately-ready futures. + // By pinning to the stack instead of using `Box::pin`, we avoid heap allocations for + // immediately-ready futures. // - // SAFETY: We must uphold the invariants of Pin, namely that future does not move until it is dropped. - // By overriding the variable named `f`, it is impossible to access f again, except through the pinned reference. + // SAFETY: We must uphold the invariants of Pin, namely that future does not move until + // it is dropped. By overriding the variable named `f`, it is impossible to access f again, + // except through the pinned reference. let mut f = unsafe { Pin::new_unchecked(&mut f) }; loop { @@ -98,15 +126,24 @@ impl Vmctx { // The future is pending, so we need to yield to the async executor self.yield_impl::(AsyncYielded, false, false); - // Check that the async context hasn't changed (this could happen if the instance yielded) + // Poll is synchronous and may have yielded back to this host, so it is possible that we are + // executing from a new RunAsync future (and thus, a new async executor). If the future has awoken + // the previous waker, we would miss a wakeup. 
+ // + // Rather than try to prevent this from happening at all, we check if the waker changed from under us, + // and if so, we simply poll the future again. If the future is Ready by that point, there's no need + // for another wakeup (we've already done so!). Otherwise, polling the future registers the new waker, + // allowing us to yield back to the async executor without risking a missed wakup. + // + // Note: Waking a waker unnecessarily will not cause unsafety or logic errors. In the worst case, the + // async executor may waste CPU time polling pending futures an extra time. + match &self.instance().state { State::Running { async_context: Some(new_cx) } => { let same_waker = Arc::ptr_eq(&arc_cx, &new_cx) || arc_cx.waker.will_wake(&new_cx.waker); if !same_waker { - // The AsyncContext changed on us. This is because the instance is running from a new RunAsync. - // This probably happened because the instance yielded and was resumed up by `resume_async`. - // + // The AsyncContext changed on us. // Poll the future again before yielding to the executor in order to register the new waker. continue; } @@ -115,7 +152,7 @@ impl Vmctx { } // Providing the private `AsyncResume` as a resume value certifies that - // RunAsync upheld the invarriants necessary for us to avoid a borrow check. + // RunAsync upheld the invariants necessary for us to avoid a borrow check. // // If we resume with any other value, the instance may have been modified, and it is // unsound to resume the instance. @@ -129,8 +166,7 @@ impl Vmctx { impl InstanceHandle { /// Run a WebAssembly function with arguments in the guest context at the given entrypoint. /// - /// This method is similar to `Instance::run()`, but allows the Wasm program to invoke async hostcalls - /// and provides the trampoline that `.await`s those futures on behalf of the guest. + /// This method is similar to `Instance::run()`, but allows the Wasm program to invoke async hostcalls. 
/// /// To define an async hostcall, simply add an `async` modifier to your hostcall: /// @@ -142,7 +178,7 @@ impl InstanceHandle { /// } /// ``` /// - /// See `[RunAsync]` for details. + /// See [`Vmctx::try_block_on()`] for details. /// /// If `runtime_bound` is provided, it will also pause the Wasm execution and yield a future /// that resumes it after (approximately) that many Wasm opcodes have executed. @@ -238,39 +274,11 @@ impl InstanceHandle { } } -/// A future implementation that enables running a guest instance which can call async hostcalls. -/// -/// Lucet hostcalls are synchronous `extern "C" fn` functions called from WebAssembly. In that -/// context, we cannot use `.await` directly because the hostcall is not `async`. While we could -/// block on an executor such as `futures::executor::block_on`, that would block the OS thread, -/// preventing us from running other async tasks while awaiting. This means an application will need more -/// threads than otherwise would be necessary. -/// -/// `RunAsync` allows the guest to call async hostcalls just as if the guest had called the async function -/// and immediately `.await`ed it. -/// -/// To define a async hostcall, simply add the async modifier to a hostcall definition: -/// -/// ```ignore -/// #[lucet_hostcall] -/// #[no_mangle] -/// pub async fn hostcall_async(vmctx: &Vmctx) { -/// foobar().await -/// } -/// ``` -/// -/// Note: Async hostcalls may only be used if `Instance::run_async` was used to run the VM, -/// otherwise it will terminate the instance with `TerminationDetails::BlockOnNeedsAsync`. -/// -/// Behind the scenes, lucet polls the future from within the guest execution context. If the future is not immediately ready, -/// the instance will yield and return control to the async executor. Later, when the future is ready to make progress, -/// the async executor will return to the guest context, where lucet will poll the future to completion. 
-/// -/// Just like `.await`, there is no overhead for futures that are immediately ready (such as `async { 5 }`). +/// A Future implementation that enables asynchronous execution of bounded slices of +/// WebAssembly, and of underlying async hostcalls that it invokes. /// -/// For async hostcalls that may yield to the async executor many times, it's recommended that you use `tokio::spawn`, -/// or the equivalent from your async executor, which will spawn the task to be run from the host execution context. -/// This avoids the overhead of context switching into the guest execution context every time the future needs to make progress. +/// See [`Vmctx::try_block_on()`] for more details about how this works, and how +/// to define an async hostcall. pub struct RunAsync<'a> { inst: &'a mut InstanceHandle, state: RunAsyncState<'a>, diff --git a/lucet-runtime/lucet-runtime-tests/src/async_hostcall.rs b/lucet-runtime/lucet-runtime-tests/src/async_hostcall.rs index 065a435bd..cd23836c9 100644 --- a/lucet-runtime/lucet-runtime-tests/src/async_hostcall.rs +++ b/lucet-runtime/lucet-runtime-tests/src/async_hostcall.rs @@ -1,25 +1,25 @@ - - use std::future::Future; -use std::task::{Waker, Context, Poll}; use std::sync::{Arc, Mutex}; +use std::task::{Context, Poll, Waker}; enum StubFutureInner { NeverPolled, Polled(Waker), - Ready + Ready, } #[derive(Clone)] pub struct StubFuture(Arc>); impl StubFuture { - pub fn new() -> Self { StubFuture(Arc::new(Mutex::new(StubFutureInner::NeverPolled))) } + pub fn new() -> Self { + StubFuture(Arc::new(Mutex::new(StubFutureInner::NeverPolled))) + } pub fn make_ready(&self) { let mut inner = self.0.lock().unwrap(); match std::mem::replace(&mut *inner, StubFutureInner::Ready) { StubFutureInner::Polled(waker) => { waker.wake(); } - _ => panic!("never polled") + _ => panic!("never polled"), } } } @@ -40,7 +40,6 @@ impl Future for StubFuture { } } - #[macro_export] macro_rules! 
async_hostcall_tests { ( $( $region_id:ident => $TestRegion:path ),* ) => { @@ -232,8 +231,8 @@ macro_rules! async_hostcall_tests { "manual_future", &[] )); - - if let Ok(RunResult::Yielded(_)) = run_res { /* expected */ } else { panic!("did not yield"); } + + if let Ok(RunResult::Yielded(_)) = run_res { /* expected */ } else { panic!("did not yield"); } // The loop within try_block_on polled the future returned by await_manual_future, // and the waker that will be passed to poll `manuall_future` is from the old @@ -249,8 +248,8 @@ macro_rules! async_hostcall_tests { }); let run_res = futures_executor::block_on(inst.resume_async()); - - if let Ok(RunResult::Returned(_)) = run_res { /* expected */ } else { panic!("did not return"); } + + if let Ok(RunResult::Returned(_)) = run_res { /* expected */ } else { panic!("did not return"); } } } )*