diff --git a/crates/test-programs/command-tests/src/bin/stream_pollable_lifetimes.rs b/crates/test-programs/command-tests/src/bin/stream_pollable_lifetimes.rs new file mode 100644 index 000000000000..1ccc7106436d --- /dev/null +++ b/crates/test-programs/command-tests/src/bin/stream_pollable_lifetimes.rs @@ -0,0 +1,28 @@ +use command_tests::wasi::cli_base::environment; +use command_tests::wasi::cli_base::stdin; +use command_tests::wasi::io::streams; +use command_tests::wasi::poll::poll; + +fn main() { + let args = environment::get_arguments(); + + if args == &["correct"] { + let stdin: streams::InputStream = stdin::get_stdin(); + let stdin_pollable = streams::subscribe_to_input_stream(stdin); + let ready = poll::poll_oneoff(&[stdin_pollable]); + assert_eq!(ready, &[true]); + poll::drop_pollable(stdin_pollable); + streams::drop_input_stream(stdin); + } else if args == &["trap"] { + let stdin: streams::InputStream = stdin::get_stdin(); + let stdin_pollable = streams::subscribe_to_input_stream(stdin); + let ready = poll::poll_oneoff(&[stdin_pollable]); + assert_eq!(ready, &[true]); + streams::drop_input_stream(stdin); + unreachable!( + "execution should have trapped in line above when stream dropped before pollable" + ); + } else { + panic!("bad value for args: expected `[\"correct\"]` or `[\"trap\"]`, got {args:?}") + } +} diff --git a/crates/test-programs/command-tests/src/lib.rs b/crates/test-programs/command-tests/src/lib.rs new file mode 100644 index 000000000000..13a414db4af1 --- /dev/null +++ b/crates/test-programs/command-tests/src/lib.rs @@ -0,0 +1 @@ +wit_bindgen::generate!("test-command" in "../../wasi/wit"); diff --git a/crates/test-programs/tests/command.rs b/crates/test-programs/tests/command.rs index 9dae33b0a409..c779b998abfd 100644 --- a/crates/test-programs/tests/command.rs +++ b/crates/test-programs/tests/command.rs @@ -443,3 +443,50 @@ async fn read_only() -> Result<()> { .await? .map_err(|()| anyhow::anyhow!("command returned with failing exit status")) } + +#[test_log::test(tokio::test(flavor = "multi_thread"))] +async fn stream_pollable_lifetimes() -> Result<()> { + // Test program has two modes, dispatching based on argument. + { + // Correct execution: should succeed + let mut table = Table::new(); + let wasi = WasiCtxBuilder::new() + .set_args(&["correct"]) + .set_stdin(MemoryInputPipe::new(" ".into())) + .build(&mut table)?; + + let (mut store, command) = instantiate( + get_component("stream_pollable_lifetimes"), + CommandCtx { table, wasi }, + ) + .await?; + + command + .call_run(&mut store) + .await? + .map_err(|()| anyhow::anyhow!("command returned with failing exit status"))?; + } + { + // Incorrect execution: should trap with a TableError::HasChildren + let mut table = Table::new(); + let wasi = WasiCtxBuilder::new() + .set_args(&["trap"]) + .set_stdin(MemoryInputPipe::new(" ".into())) + .build(&mut table)?; + + let (mut store, command) = instantiate( + get_component("stream_pollable_lifetimes"), + CommandCtx { table, wasi }, + ) + .await?; + + let trap = command + .call_run(&mut store) + .await + .err() + .expect("should trap"); + use wasmtime_wasi::preview2::TableError; + assert!(matches!(trap.downcast_ref(), Some(TableError::HasChildren))); + } + Ok(()) +} diff --git a/crates/wasi/src/preview2/mod.rs b/crates/wasi/src/preview2/mod.rs index 69f2e437a2f2..50fa5aedb4a2 100644 --- a/crates/wasi/src/preview2/mod.rs +++ b/crates/wasi/src/preview2/mod.rs @@ -37,7 +37,7 @@ pub use self::filesystem::{DirPerms, FilePerms}; pub use self::poll::{ClosureFuture, HostPollable, MakeFuture, PollableFuture, TablePollableExt}; pub use self::random::{thread_rng, Deterministic}; pub use self::stream::{HostInputStream, HostOutputStream, StreamState, TableStreamExt}; -pub use self::table::{Table, TableError}; +pub use self::table::{OccupiedEntry, Table, TableError}; pub use cap_fs_ext::SystemTimeSpec; pub use cap_rand::RngCore; diff --git a/crates/wasi/src/preview2/poll.rs b/crates/wasi/src/preview2/poll.rs index 883edf63d138..a8329b8f2ab7 100644 --- a/crates/wasi/src/preview2/poll.rs +++ b/crates/wasi/src/preview2/poll.rs @@ -23,18 +23,6 @@ pub enum HostPollable { /// Create a Future by calling a fn on another resource in the table. This /// indirection means the created Future can use a mut borrow of another /// resource in the Table (e.g. a stream) - /// - /// FIXME: we currently aren't tracking the lifetime of the resource along - /// with this entry, which means that this index could be occupied by something - /// unrelated by the time we poll it again. This is a crash vector, because - /// the [`MakeFuture`] would panic if the type of the index has changed, and - /// would yield undefined behavior otherwise. We'll likely fix this by making - /// the parent resources of a pollable clean up their pollable entries when - /// they are destroyed (e.g. the HostInputStream would track the pollables it - /// has created). - /// - /// WARNING: do not deploy this library to production until the above issue has - /// been fixed. TableEntry { index: u32, make_future: MakeFuture }, /// Create a future by calling an owned, static closure. This is used for /// pollables which do not share state with another resource in the Table @@ -50,7 +38,10 @@ pub trait TablePollableExt { impl TablePollableExt for Table { fn push_host_pollable(&mut self, p: HostPollable) -> Result { - self.push(Box::new(p)) + match p { + HostPollable::TableEntry { index, .. } => self.push_child(Box::new(p), index), + HostPollable::Closure { .. } => self.push(Box::new(p)), + } } fn get_host_pollable_mut(&mut self, fd: u32) -> Result<&mut HostPollable, TableError> { self.get_mut::(fd) diff --git a/crates/wasi/src/preview2/preview1/mod.rs b/crates/wasi/src/preview2/preview1/mod.rs index 0bbbe1c9b67d..63ff1a4fbfd6 100644 --- a/crates/wasi/src/preview2/preview1/mod.rs +++ b/crates/wasi/src/preview2/preview1/mod.rs @@ -527,18 +527,9 @@ impl TryFrom for types::Error { } } -impl From for types::Errno { - fn from(err: TableError) -> Self { - match err { - TableError::Full => types::Errno::Nomem, - TableError::NotPresent | TableError::WrongType => types::Errno::Badf, - } - } -} - impl From for types::Error { fn from(err: TableError) -> Self { - types::Errno::from(err).into() + types::Error::trap(err.into()) } } diff --git a/crates/wasi/src/preview2/preview2/filesystem.rs b/crates/wasi/src/preview2/preview2/filesystem.rs index 6d582e1e62f5..27f40c7d9393 100644 --- a/crates/wasi/src/preview2/preview2/filesystem.rs +++ b/crates/wasi/src/preview2/preview2/filesystem.rs @@ -9,17 +9,8 @@ use filesystem::ErrorCode; mod sync; impl From for filesystem::Error { - fn from(error: TableError) -> filesystem::Error { - match error { - TableError::Full => filesystem::Error::trap(anyhow::anyhow!(error)), - TableError::NotPresent | TableError::WrongType => ErrorCode::BadDescriptor.into(), - } - } -} - -impl From for filesystem::Error { - fn from(error: tokio::task::JoinError) -> Self { - Self::trap(anyhow::anyhow!(error)) + fn from(error: TableError) -> Self { + Self::trap(error.into()) } } diff --git a/crates/wasi/src/preview2/preview2/io.rs b/crates/wasi/src/preview2/preview2/io.rs index 5b7e58fff168..04fb8ed81f1e 100644 --- a/crates/wasi/src/preview2/preview2/io.rs +++ b/crates/wasi/src/preview2/preview2/io.rs @@ -23,13 +23,7 @@ impl From for streams::Error { impl From for streams::Error { fn from(error: TableError) -> streams::Error { - match error { - TableError::Full => streams::Error::trap(anyhow!(error)), - TableError::NotPresent | TableError::WrongType => { - // wit definition needs to define a badf-equiv variant: - StreamError { dummy: 0 }.into() - } - } + streams::Error::trap(anyhow!(error)) } } diff --git a/crates/wasi/src/preview2/stream.rs b/crates/wasi/src/preview2/stream.rs index 1df5de0346c5..625d4a3a1d32 100644 --- a/crates/wasi/src/preview2/stream.rs +++ b/crates/wasi/src/preview2/stream.rs @@ -193,7 +193,7 @@ impl TableStreamExt for Table { let occ = self.entry(fd)?; match occ.get().downcast_ref::() { Some(InternalInputStream::Host(_)) => { - let (_, any) = occ.remove_entry(); + let any = occ.remove_entry()?; match *any.downcast().expect("downcast checked above") { InternalInputStream::Host(h) => Ok(h), _ => unreachable!("variant checked above"), @@ -219,7 +219,7 @@ impl TableStreamExt for Table { let occ = self.entry(fd)?; match occ.get().downcast_ref::() { Some(InternalOutputStream::Host(_)) => { - let (_, any) = occ.remove_entry(); + let any = occ.remove_entry()?; match *any.downcast().expect("downcast checked above") { InternalOutputStream::Host(h) => Ok(h), _ => unreachable!("variant checked above"), diff --git a/crates/wasi/src/preview2/table.rs b/crates/wasi/src/preview2/table.rs index 814fdf3f15c4..7761dd74b1e2 100644 --- a/crates/wasi/src/preview2/table.rs +++ b/crates/wasi/src/preview2/table.rs @@ -1,5 +1,5 @@ use std::any::Any; -use std::collections::HashMap; +use std::collections::{BTreeSet, HashMap}; #[derive(thiserror::Error, Debug)] pub enum TableError { @@ -9,6 +9,8 @@ pub enum TableError { NotPresent, #[error("value is of another type")] WrongType, + #[error("entry still has children")] + HasChildren, } /// The `Table` type is designed to map u32 handles to resources. The table is now part of the @@ -20,21 +22,127 @@ pub enum TableError { /// up. Right now it is just an approximation. #[derive(Debug)] pub struct Table { - pub(crate) map: HashMap>, + map: HashMap, next_key: u32, } +/// This structure tracks parent and child relationships for a given table entry. +/// +/// Parents and children are referred to by table index. We maintain the +/// following invariants to prevent orphans and cycles: +/// * parent can only be assigned on creating the entry. +/// * parent, if some, must exist when creating the entry. +/// * whenever a child is created, its index is added to children. +/// * whenever a child is deleted, its index is removed from children. +/// * an entry with children may not be deleted. +#[derive(Debug)] +struct TableEntry { + /// The entry in the table, as a boxed dynamically-typed object + entry: Box, + /// The index of the parent of this entry, if it has one. + parent: Option, + /// The indicies of any children of this entry. + children: BTreeSet, +} + +impl TableEntry { + fn new(entry: Box, parent: Option) -> Self { + Self { + entry, + parent, + children: BTreeSet::new(), + } + } + fn add_child(&mut self, child: u32) { + debug_assert!(!self.children.contains(&child)); + self.children.insert(child); + } + fn remove_child(&mut self, child: u32) { + let was_removed = self.children.remove(&child); + debug_assert!(was_removed); + } +} + +/// Like [`std::collections::hash_map::OccupiedEntry`], with a subset of +/// methods available in order to uphold [`Table`] invariants. +pub struct OccupiedEntry<'a> { + table: &'a mut Table, + index: u32, +} +impl<'a> OccupiedEntry<'a> { + /// Get the dynamically-typed reference to the resource. + pub fn get(&self) -> &(dyn Any + Send + Sync + 'static) { + self.table.map.get(&self.index).unwrap().entry.as_ref() + } + /// Get the dynamically-typed mutable reference to the resource. + pub fn get_mut(&mut self) -> &mut (dyn Any + Send + Sync + 'static) { + self.table.map.get_mut(&self.index).unwrap().entry.as_mut() + } + /// Remove the resource from the table, returning the contents of the + /// resource. + /// May fail with [`TableError::HasChildren`] if the entry has any + /// children, see [`Table::push_child`]. + /// If this method fails, the [`OccupiedEntry`] is consumed, but the + /// resource remains in the table. + pub fn remove_entry(self) -> Result, TableError> { + self.table.delete_entry(self.index).map(|e| e.entry) + } +} + impl Table { - /// Create an empty table. New insertions will begin at 3, above stdio. + /// Create an empty table pub fn new() -> Self { Table { map: HashMap::new(), - next_key: 3, // 0, 1 and 2 are reserved for stdio + // 0, 1 and 2 are formerly (preview 1) for stdio. To prevent users from assuming these + // indicies are still valid ways to access stdio, they are deliberately left empty. + // Once we have a full implementation of resources, this confusion should hopefully be + // impossible :) + next_key: 3, } } /// Insert a resource at the next available index. - pub fn push(&mut self, a: Box) -> Result { + pub fn push(&mut self, entry: Box) -> Result { + self.push_(TableEntry::new(entry, None)) + } + + /// Insert a resource at the next available index, and track that it has a + /// parent resource. + /// + /// The parent must exist to create a child. All children resources must + /// be destroyed before a parent can be destroyed - otherwise [`Table::delete`] + /// or [`OccupiedEntry::remove_entry`] will fail with + /// [`TableError::HasChildren`]. + /// + /// Parent-child relationships are tracked inside the table to ensure that + /// a parent resource is not deleted while it has live children. This + /// allows child resources to hold "references" to a parent by table + /// index, to avoid needing e.g. an `Arc>` and the associated + /// locking overhead and design issues, such as child existence extending + /// lifetime of parent referent even after parent resource is destroyed, + /// possibility for deadlocks. + /// + /// Parent-child relationships may not be modified once created. There + /// is no way to observe these relationships through the [`Table`] methods + /// except for erroring on deletion, or the [`std::fmt::Debug`] impl. + pub fn push_child( + &mut self, + entry: Box, + parent: u32, + ) -> Result { + if !self.contains_key(parent) { + return Err(TableError::NotPresent); + } + let child = self.push_(TableEntry::new(entry, Some(parent)))?; + self.map + .get_mut(&parent) + .expect("parent existence assured above") + .add_child(child); + Ok(child) + } + + fn push_(&mut self, e: TableEntry) -> Result { // NOTE: The performance of this new key calculation could be very bad once keys wrap // around. if self.map.len() == u32::MAX as usize { @@ -46,7 +154,7 @@ impl Table { if self.map.contains_key(&key) { continue; } - self.map.insert(key, a); + self.map.insert(key, e); return Ok(key); } } @@ -60,7 +168,7 @@ impl Table { /// Note: this will always fail if the resource is already borrowed. pub fn is(&self, key: u32) -> bool { if let Some(r) = self.map.get(&key) { - r.is::() + r.entry.is::() } else { false } @@ -71,7 +179,9 @@ impl Table { /// results in a trapping error. pub fn get(&self, key: u32) -> Result<&T, TableError> { if let Some(r) = self.map.get(&key) { - r.downcast_ref::().ok_or_else(|| TableError::WrongType) + r.entry + .downcast_ref::() + .ok_or_else(|| TableError::WrongType) } else { Err(TableError::NotPresent) } @@ -81,42 +191,76 @@ impl Table { /// reference can be borrowed at any given time. Borrow failure results in a trapping error. pub fn get_mut(&mut self, key: u32) -> Result<&mut T, TableError> { if let Some(r) = self.map.get_mut(&key) { - r.downcast_mut::().ok_or_else(|| TableError::WrongType) + r.entry + .downcast_mut::() + .ok_or_else(|| TableError::WrongType) } else { Err(TableError::NotPresent) } } - /// Get an [`std::collections::hash_map::OccupiedEntry`] corresponding to - /// a table entry, if it exists. This allows you to remove or replace the - /// entry based on its contents. - pub fn entry( - &mut self, - key: u32, - ) -> Result< - std::collections::hash_map::OccupiedEntry>, - TableError, - > { - use std::collections::hash_map::Entry; - match self.map.entry(key) { - Entry::Occupied(occ) => Ok(occ), - Entry::Vacant(_) => Err(TableError::NotPresent), + /// Get an [`OccupiedEntry`] corresponding to a table entry, if it exists. This allows you to + /// remove or replace the entry based on its contents. The methods available are a subset of + /// [`std::collections::hash_map::OccupiedEntry`] - it does not give access to the key, it + /// restricts replacing the entry to items of the same type, and it does not allow for deletion. + pub fn entry(&mut self, index: u32) -> Result { + if self.map.contains_key(&index) { + Ok(OccupiedEntry { table: self, index }) + } else { + Err(TableError::NotPresent) } } - /// Remove a resource at a given index from the table. - pub fn delete(&mut self, key: u32) -> Result { - // Optimistically attempt to remove the value stored under key - match self + fn delete_entry(&mut self, key: u32) -> Result { + if !self .map - .remove(&key) + .get(&key) .ok_or(TableError::NotPresent)? - .downcast::() + .children + .is_empty() { + return Err(TableError::HasChildren); + } + let e = self.map.remove(&key).unwrap(); + if let Some(parent) = e.parent { + // Remove deleted resource from parent's child list. + // Parent must still be present because it cant be deleted while still having + // children: + self.map + .get_mut(&parent) + .expect("missing parent") + .remove_child(key); + } + Ok(e) + } + + /// Remove a resource at a given index from the table. + /// + /// If this method fails, the resource remains in the table. + /// + /// May fail with [`TableError::HasChildren`] if the resource has any live + /// children. + pub fn delete(&mut self, key: u32) -> Result { + let e = self.delete_entry(key)?; + match e.entry.downcast::() { Ok(v) => Ok(*v), - Err(v) => { - // Insert the value back, since the downcast failed - self.map.insert(key, v); + Err(entry) => { + // Re-insert into parent list + if let Some(parent) = e.parent { + self.map + .get_mut(&parent) + .expect("already checked parent exists") + .add_child(key); + } + // Insert the value back + self.map.insert( + key, + TableEntry { + entry, + children: e.children, + parent: e.parent, + }, + ); Err(TableError::WrongType) } } @@ -133,7 +277,7 @@ impl Table { let item = self .map .get_mut(&k) - .map(Box::as_mut) + .map(|e| Box::as_mut(&mut e.entry)) // Safety: extending the lifetime of the mutable reference. .map(|item| unsafe { &mut *(item as *mut dyn Any) }) .ok_or(TableError::NotPresent); diff --git a/crates/wasi/wit/test.wit b/crates/wasi/wit/test.wit index a362fd4d906e..9c0fda58d0b8 100644 --- a/crates/wasi/wit/test.wit +++ b/crates/wasi/wit/test.wit @@ -17,3 +17,12 @@ world test-reactor { use wasi:filesystem/filesystem.{descriptor-stat} export pass-an-imported-record: func(d: descriptor-stat) -> string } + +world test-command { + import wasi:poll/poll + import wasi:io/streams + import wasi:cli-base/environment + import wasi:cli-base/stdin + import wasi:cli-base/stdout + import wasi:cli-base/stderr +}