From 2c8ccc692085cc49ddc2c9f5235dc3eab6ed7df9 Mon Sep 17 00:00:00 2001 From: Trevor Elliott Date: Fri, 21 Jul 2023 16:29:42 -0700 Subject: [PATCH 01/18] Add a helper for writing stdin tests --- crates/wasi/Cargo.toml | 3 + crates/wasi/src/preview2/stdio.rs | 142 ++++++++++++++++++++++-------- 2 files changed, 109 insertions(+), 36 deletions(-) diff --git a/crates/wasi/Cargo.toml b/crates/wasi/Cargo.toml index 8eb123055740..ab6655a79a02 100644 --- a/crates/wasi/Cargo.toml +++ b/crates/wasi/Cargo.toml @@ -42,6 +42,9 @@ tokio = { workspace = true, features = ["time", "sync", "io-std", "io-util", "rt [target.'cfg(unix)'.dependencies] rustix = { workspace = true, features = ["fs"], optional = true } +[target.'cfg(unix)'.dev-dependencies] +libc = { workspace = true } + [target.'cfg(windows)'.dependencies] io-extras = { workspace = true } windows-sys = { workspace = true } diff --git a/crates/wasi/src/preview2/stdio.rs b/crates/wasi/src/preview2/stdio.rs index 8386f6611ae8..9cc349ccb2ad 100644 --- a/crates/wasi/src/preview2/stdio.rs +++ b/crates/wasi/src/preview2/stdio.rs @@ -23,45 +23,115 @@ pub fn stderr() -> Stderr { #[cfg(all(unix, test))] mod test { + use libc; + use std::fs::File; + use std::io::{BufRead, BufReader, Write}; + use std::os::fd::FromRawFd; + + fn test_child_stdin(child: T, parent: P) + where + T: FnOnce(File), + P: FnOnce(File, BufReader), + { + unsafe { + // Make pipe for emulating stdin. + let mut stdin_fds: [libc::c_int; 2] = [0; 2]; + assert_eq!( + libc::pipe(stdin_fds.as_mut_ptr()), + 0, + "Failed to create stdin pipe" + ); + let [stdin_read, stdin_write] = stdin_fds; + + // Make pipe for getting results. + let mut result_fds: [libc::c_int; 2] = [0; 2]; + assert_eq!( + libc::pipe(result_fds.as_mut_ptr()), + 0, + "Failed to create result pipe" + ); + let [result_read, result_write] = result_fds; + + let child_pid = libc::fork(); + if child_pid == 0 { + libc::close(stdin_write); + libc::close(result_read); + + libc::close(libc::STDIN_FILENO); + libc::dup2(stdin_read, libc::STDIN_FILENO); + + let result_write = File::from_raw_fd(result_write); + child(result_write); + } else { + libc::close(stdin_read); + libc::close(result_write); + + let stdin_write = File::from_raw_fd(stdin_write); + let result_read = BufReader::new(File::from_raw_fd(result_read)); + parent(stdin_write, result_read); + } + } + } + // This could even be parameterized somehow to use the worker thread stdin vs the asyncfd // stdin. #[test] fn test_stdin_by_forking() { - // Make pipe for emulating stdin. - // Make pipe for getting results. - // Fork. - // When child: - // close stdin fd. - // use dup2 to turn the pipe recv end into the stdin fd. - // in a tokio runtime: - // let stdin = super::stdin(); - // // Make sure the initial state is that stdin is not ready: - // if timeout(stdin.ready().await).is_timeout() { - // send "start\n" on result pipe. - // } - // loop { - // match timeout(stdin.ready().await) { - // Ok => { - // let bytes = stdin.read(); - // if bytes == ending sentinel: - // exit - // if bytes == some other sentinel: - // return and go back to the thing where we start the tokio runtime, - // testing that when creating a new super::stdin() it works correctly - // send "got: {bytes:?}\n" on result pipe. - // } - // Err => { - // send "timed out\n" on result pipe. - // } - // } - // } - // When parent: - // wait to recv "start\n" on result pipe (or the child process exits) - // send some bytes to child stdin. - // make sure we get back "got {bytes:?}" on result pipe (or the child process exits) - // sleep for a while. - // make sure we get back "timed out" on result pipe (or the child process exits) - // send some bytes again. and etc. - // + test_child_stdin( + |mut result_write| { + // in a tokio runtime: + // let stdin = super::stdin(); + // // Make sure the initial state is that stdin is not ready: + // if timeout(stdin.ready().await).is_timeout() { + // send "start\n" on result pipe. + // } + // loop { + // match timeout(stdin.ready().await) { + // Ok => { + // let bytes = stdin.read(); + // if bytes == ending sentinel: + // exit + // if bytes == some other sentinel: + // return and go back to the thing where we start the tokio runtime, + // testing that when creating a new super::stdin() it works correctly + // send "got: {bytes:?}\n" on result pipe. + // } + // Err => { + // send "timed out\n" on result pipe. + // } + // } + // } + + tokio::runtime::Runtime::new().unwrap().block_on(async { + use tokio::io::AsyncReadExt; + + let mut stdin = tokio::io::stdin(); + + let mut buf = [0u8; 1024]; + { + let r = tokio::time::timeout( + std::time::Duration::from_millis(100), + stdin.read(&mut buf[..1]), + ) + .await; + assert!(r.is_err(), "stdin available too soon"); + } + + writeln!(&mut result_write, "start").unwrap(); + }); + }, + |mut stdin_write, mut result_read| { + // wait to recv "start\n" on result pipe (or the child process exits) + let mut line = String::new(); + result_read.read_line(&mut line).unwrap(); + assert_eq!(line, "start\n"); + + // send some bytes to child stdin. + // make sure we get back "got {bytes:?}" on result pipe (or the child process exits) + // sleep for a while. + // make sure we get back "timed out" on result pipe (or the child process exits) + // send some bytes again. and etc. + }, + ) } } From ff96ac38e6a764346fae52baff203b526637ef17 Mon Sep 17 00:00:00 2001 From: Trevor Elliott Date: Fri, 21 Jul 2023 16:58:47 -0700 Subject: [PATCH 02/18] Test a use of preview2::stdio::Stdin on unix --- crates/wasi/src/preview2/stdio.rs | 88 ++++++++++++++++--------------- 1 file changed, 46 insertions(+), 42 deletions(-) diff --git a/crates/wasi/src/preview2/stdio.rs b/crates/wasi/src/preview2/stdio.rs index 9cc349ccb2ad..291b3f71d143 100644 --- a/crates/wasi/src/preview2/stdio.rs +++ b/crates/wasi/src/preview2/stdio.rs @@ -23,6 +23,7 @@ pub fn stderr() -> Stderr { #[cfg(all(unix, test))] mod test { + use crate::preview2::{StreamState, HostInputStream}; use libc; use std::fs::File; use std::io::{BufRead, BufReader, Write}; @@ -79,58 +80,61 @@ mod test { fn test_stdin_by_forking() { test_child_stdin( |mut result_write| { - // in a tokio runtime: - // let stdin = super::stdin(); - // // Make sure the initial state is that stdin is not ready: - // if timeout(stdin.ready().await).is_timeout() { - // send "start\n" on result pipe. - // } - // loop { - // match timeout(stdin.ready().await) { - // Ok => { - // let bytes = stdin.read(); - // if bytes == ending sentinel: - // exit - // if bytes == some other sentinel: - // return and go back to the thing where we start the tokio runtime, - // testing that when creating a new super::stdin() it works correctly - // send "got: {bytes:?}\n" on result pipe. - // } - // Err => { - // send "timed out\n" on result pipe. - // } - // } - // } - tokio::runtime::Runtime::new().unwrap().block_on(async { - use tokio::io::AsyncReadExt; - - let mut stdin = tokio::io::stdin(); - - let mut buf = [0u8; 1024]; - { - let r = tokio::time::timeout( - std::time::Duration::from_millis(100), - stdin.read(&mut buf[..1]), - ) - .await; - assert!(r.is_err(), "stdin available too soon"); - } + let mut stdin = super::stdin(); + + assert!( + tokio::time::timeout(std::time::Duration::from_millis(100), stdin.ready()) + .await + .is_err(), + "stdin available too soon" + ); writeln!(&mut result_write, "start").unwrap(); + + let mut buffer = String::new(); + loop { + println!("child: waiting for stdin to be ready"); + stdin.ready().await.unwrap(); + + println!("child: reading input"); + let (bytes, status) = stdin.read(1024).unwrap(); + + println!("child: {:?}, {:?}", bytes, status); + + // We can't effectively test for the case where stdin was closed. + assert_eq!(status, StreamState::Open); + + buffer.push_str(std::str::from_utf8(bytes.as_ref()).unwrap()); + if let Some((line,rest)) = buffer.split_once('\n') { + + if line == "all done" { + writeln!(&mut result_write, "done").unwrap(); + break; + } else { + writeln!(&mut result_write, "{}", line).unwrap(); + } + + buffer = rest.to_owned(); + } + } }); }, |mut stdin_write, mut result_read| { - // wait to recv "start\n" on result pipe (or the child process exits) let mut line = String::new(); result_read.read_line(&mut line).unwrap(); assert_eq!(line, "start\n"); - // send some bytes to child stdin. - // make sure we get back "got {bytes:?}" on result pipe (or the child process exits) - // sleep for a while. - // make sure we get back "timed out" on result pipe (or the child process exits) - // send some bytes again. and etc. + writeln!(&mut stdin_write, "some bytes").unwrap(); + line.clear(); + result_read.read_line(&mut line).unwrap(); + assert_eq!(line, "some bytes\n"); + + writeln!(&mut stdin_write, "all done").unwrap(); + + line.clear(); + result_read.read_line(&mut line).unwrap(); + assert_eq!(line, "done\n"); }, ) } From 50fb0ed7726dc94d2528bcf36ed04c22b230a514 Mon Sep 17 00:00:00 2001 From: Trevor Elliott Date: Fri, 21 Jul 2023 17:08:08 -0700 Subject: [PATCH 03/18] Abstract the stdin test, and apply it to tokio::stdin and worker_thread_stdin --- crates/wasi/src/preview2/stdio.rs | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/crates/wasi/src/preview2/stdio.rs b/crates/wasi/src/preview2/stdio.rs index 291b3f71d143..c5c473fb98ac 100644 --- a/crates/wasi/src/preview2/stdio.rs +++ b/crates/wasi/src/preview2/stdio.rs @@ -23,7 +23,7 @@ pub fn stderr() -> Stderr { #[cfg(all(unix, test))] mod test { - use crate::preview2::{StreamState, HostInputStream}; + use crate::preview2::{HostInputStream, StreamState}; use libc; use std::fs::File; use std::io::{BufRead, BufReader, Write}; @@ -76,12 +76,15 @@ mod test { // This could even be parameterized somehow to use the worker thread stdin vs the asyncfd // stdin. - #[test] - fn test_stdin_by_forking() { + fn test_stdin_by_forking(mk_stdin: T) + where + S: HostInputStream, + T: FnOnce() -> S, + { test_child_stdin( |mut result_write| { tokio::runtime::Runtime::new().unwrap().block_on(async { - let mut stdin = super::stdin(); + let mut stdin = mk_stdin(); assert!( tokio::time::timeout(std::time::Duration::from_millis(100), stdin.ready()) @@ -106,8 +109,7 @@ mod test { assert_eq!(status, StreamState::Open); buffer.push_str(std::str::from_utf8(bytes.as_ref()).unwrap()); - if let Some((line,rest)) = buffer.split_once('\n') { - + if let Some((line, rest)) = buffer.split_once('\n') { if line == "all done" { writeln!(&mut result_write, "done").unwrap(); break; @@ -138,4 +140,16 @@ mod test { }, ) } + + #[test] + fn test_async_fd_stdin() { + test_stdin_by_forking(super::stdin); + } + + #[test] + // worker_thread_stdin is currently under development + #[should_panic] + fn test_worker_thread_stdin() { + test_stdin_by_forking(super::worker_thread_stdin::stdin); + } } From 922a90e6bb1017b64520e75b58f6982cfb211e6f Mon Sep 17 00:00:00 2001 From: Pat Hickey Date: Mon, 24 Jul 2023 16:47:16 -0700 Subject: [PATCH 04/18] re-implement worker thread stdin with a watch --- crates/wasi/src/preview2/stdio.rs | 2 - .../src/preview2/stdio/worker_thread_stdin.rs | 151 +++++++++--------- 2 files changed, 72 insertions(+), 81 deletions(-) diff --git a/crates/wasi/src/preview2/stdio.rs b/crates/wasi/src/preview2/stdio.rs index c5c473fb98ac..ad96ec8fa05e 100644 --- a/crates/wasi/src/preview2/stdio.rs +++ b/crates/wasi/src/preview2/stdio.rs @@ -147,8 +147,6 @@ mod test { } #[test] - // worker_thread_stdin is currently under development - #[should_panic] fn test_worker_thread_stdin() { test_stdin_by_forking(super::worker_thread_stdin::stdin); } diff --git a/crates/wasi/src/preview2/stdio/worker_thread_stdin.rs b/crates/wasi/src/preview2/stdio/worker_thread_stdin.rs index 353b5c090e62..457600cde931 100644 --- a/crates/wasi/src/preview2/stdio/worker_thread_stdin.rs +++ b/crates/wasi/src/preview2/stdio/worker_thread_stdin.rs @@ -1,7 +1,9 @@ use crate::preview2::{HostInputStream, StreamState}; use anyhow::{Context, Error}; -use bytes::Bytes; -use tokio::sync::{mpsc, oneshot}; +use bytes::{Bytes, BytesMut}; +use std::io::Read; +use std::sync::Arc; +use tokio::sync::watch; // wasmtime cant use std::sync::OnceLock yet because of a llvm regression in // 1.70. when 1.71 is released, we can switch to using std here. @@ -13,105 +15,96 @@ use std::sync::Mutex; // this instance registers the process's stdin fd with epoll, which will // return an error if an fd is registered more than once. struct GlobalStdin { - tx: mpsc::Sender>>, - // FIXME use a Watch to check for readiness instead of sending a oneshot sender + rx: watch::Receiver<()>, + state: Arc>, } -static STDIN: OnceLock> = OnceLock::new(); +struct StdinState { + buffer: BytesMut, + error: Option, + closed: bool, +} + +static STDIN: OnceLock = OnceLock::new(); + +fn create() -> GlobalStdin { + let (tx, rx) = watch::channel(()); + + let state = Arc::new(Mutex::new(StdinState { + buffer: BytesMut::new(), + error: None, + closed: false, + })); + + let ret = GlobalStdin { + state: state.clone(), + rx, + }; -fn create() -> Mutex { - let (tx, mut rx) = mpsc::channel::>>(1); - std::thread::spawn(move || { - use std::io::BufRead; - // A client is interested in stdin's readiness. - // Don't care about the None case - the GlobalStdin sender on the other - // end of this pipe will live forever, because it lives inside the OnceLock. - while let Some(msg) = rx.blocking_recv() { - // Fill buf - can we skip this if its - // already filled? - // also, this could block forever and the - // client could give up. in that case, - // another client may want to start waiting - let r = std::io::stdin() - .lock() - .fill_buf() - .map(|_| ()) - .map_err(anyhow::Error::from); - // tell the client stdin is ready for reading. - // don't care if the client happens to have died. - let _ = msg.send(r); + std::thread::spawn(move || loop { + let mut bytes = BytesMut::zeroed(1024); + match std::io::stdin().lock().read(&mut bytes) { + Ok(nbytes) => { + bytes.truncate(nbytes); + { + let mut locked = state.lock().unwrap(); + locked.buffer.extend_from_slice(&bytes); + } + tx.send(()).expect("at least one rx exists"); + } + Err(e) => { + { + let mut locked = state.lock().unwrap(); + if locked.error.is_none() { + locked.error = Some(e) + } + locked.closed = true; + } + tx.send(()).expect("at least one rx exists"); + } } }); - - Mutex::new(GlobalStdin { tx }) + ret } pub struct Stdin; impl Stdin { - fn get_global() -> &'static Mutex { + fn get_global() -> &'static GlobalStdin { STDIN.get_or_init(|| create()) } } pub fn stdin() -> Stdin { - // This implementation still needs to be fixed, and we need better test coverage. - // We are deferring that work to a future PR. - // https://github.com/bytecodealliance/wasmtime/pull/6556#issuecomment-1646232646 - panic!("worker-thread based stdin is not yet implemented"); - // Stdin + Stdin } #[async_trait::async_trait] impl HostInputStream for Stdin { fn read(&mut self, size: usize) -> Result<(Bytes, StreamState), Error> { - use std::io::Read; - let mut buf = vec![0; size]; - // FIXME: this is actually blocking. This whole implementation is likely bogus as a result - let nbytes = std::io::stdin().read(&mut buf)?; - buf.truncate(nbytes); - Ok(( - buf.into(), - if nbytes > 0 { - StreamState::Open - } else { - StreamState::Closed - }, - )) + let mut locked = Stdin::get_global().state.lock().unwrap(); + if let Some(e) = locked.error.take() { + return Err(e.into()); + } + let size = locked.buffer.len().min(size); + let bytes = locked.buffer.split_to(size); + let state = if locked.buffer.is_empty() && locked.closed { + StreamState::Closed + } else { + StreamState::Open + }; + Ok((bytes.freeze(), state)) } async fn ready(&mut self) -> Result<(), Error> { - use mpsc::error::TrySendError; - use std::future::Future; - use std::pin::Pin; - use std::task::{Context, Poll}; - - // Custom Future impl takes the std mutex in each invocation of poll. - // Required so we don't have to use a tokio mutex, which we can't take from - // inside a sync context in Self::read. - // - // Take the lock, attempt to - struct Send(Option>>); - impl Future for Send { - type Output = anyhow::Result<()>; - fn poll(mut self: Pin<&mut Self>, _: &mut Context) -> Poll { - let locked = Stdin::get_global().lock().unwrap(); - let to_send = self.as_mut().0.take().expect("to_send should be some"); - match locked.tx.try_send(to_send) { - Ok(()) => Poll::Ready(Ok(())), - Err(TrySendError::Full(to_send)) => { - self.as_mut().0.replace(to_send); - Poll::Pending - } - Err(TrySendError::Closed(_)) => { - Poll::Ready(Err(anyhow::anyhow!("channel to GlobalStdin closed"))) - } - } + let g = Stdin::get_global(); + // Make sure we dont hold this lock across the await: + { + let locked = g.state.lock().unwrap(); + if !locked.buffer.is_empty() || locked.error.is_some() || locked.closed { + return Ok(()); } - } + }; - let (result_tx, rx) = oneshot::channel::>(); - Box::pin(Send(Some(result_tx))) - .await - .context("sending message to worker thread")?; - rx.await.expect("channel is always alive") + let mut rx = g.rx.clone(); + rx.changed().await.context("stdin sender died") } } From bae032deffd0e81d2194cbe2cc44abab47b5e013 Mon Sep 17 00:00:00 2001 From: Pat Hickey Date: Mon, 24 Jul 2023 16:47:56 -0700 Subject: [PATCH 05/18] poll_oneoff_stdio should work on windows now --- crates/test-programs/tests/wasi-preview2-components-sync.rs | 1 - crates/test-programs/tests/wasi-preview2-components.rs | 1 - 2 files changed, 2 deletions(-) diff --git a/crates/test-programs/tests/wasi-preview2-components-sync.rs b/crates/test-programs/tests/wasi-preview2-components-sync.rs index 85e8cea10ca4..37a03d2aa862 100644 --- a/crates/test-programs/tests/wasi-preview2-components-sync.rs +++ b/crates/test-programs/tests/wasi-preview2-components-sync.rs @@ -245,7 +245,6 @@ fn poll_oneoff_files() { run("poll_oneoff_files", false).unwrap() } -#[cfg_attr(windows, should_panic)] #[test_log::test] fn poll_oneoff_stdio() { run("poll_oneoff_stdio", true).unwrap() diff --git a/crates/test-programs/tests/wasi-preview2-components.rs b/crates/test-programs/tests/wasi-preview2-components.rs index 021438d55814..947a9eea04e2 100644 --- a/crates/test-programs/tests/wasi-preview2-components.rs +++ b/crates/test-programs/tests/wasi-preview2-components.rs @@ -251,7 +251,6 @@ async fn poll_oneoff_files() { run("poll_oneoff_files", false).await.unwrap() } -#[cfg_attr(windows, should_panic)] #[test_log::test(tokio::test(flavor = "multi_thread"))] async fn poll_oneoff_stdio() { run("poll_oneoff_stdio", true).await.unwrap() From 6b999138c2a08eb2a2ce56124e9065ef6046c377 Mon Sep 17 00:00:00 2001 From: Pat Hickey Date: Mon, 24 Jul 2023 16:51:02 -0700 Subject: [PATCH 06/18] comments --- crates/wasi/src/preview2/stdio/worker_thread_stdin.rs | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/crates/wasi/src/preview2/stdio/worker_thread_stdin.rs b/crates/wasi/src/preview2/stdio/worker_thread_stdin.rs index 457600cde931..1b6f00924417 100644 --- a/crates/wasi/src/preview2/stdio/worker_thread_stdin.rs +++ b/crates/wasi/src/preview2/stdio/worker_thread_stdin.rs @@ -11,16 +11,19 @@ use once_cell::sync::OnceCell as OnceLock; use std::sync::Mutex; -// We need a single global instance of the AsyncFd because creating -// this instance registers the process's stdin fd with epoll, which will -// return an error if an fd is registered more than once. struct GlobalStdin { + // Watch receiver impls Clone, so any interested readers can make a copy and .changed().await. rx: watch::Receiver<()>, + // Worker thread and receivers share this state state: Arc>, } + struct StdinState { + // Bytes read off stdin. buffer: BytesMut, + // Error read off stdin, if any. error: Option, + // If an error has occured in the past, we consider the stream closed. closed: bool, } @@ -66,8 +69,10 @@ fn create() -> GlobalStdin { ret } +/// Only public interface is the [`HostInputStream`] impl. pub struct Stdin; impl Stdin { + // Private! Only required internally. fn get_global() -> &'static GlobalStdin { STDIN.get_or_init(|| create()) } From e883eb43eab88dda08be6e88a30cf45beb17fe2e Mon Sep 17 00:00:00 2001 From: Pat Hickey Date: Mon, 24 Jul 2023 17:30:51 -0700 Subject: [PATCH 07/18] test: show that restarting --- crates/wasi/src/preview2/stdio.rs | 94 ++++++++++++------- .../src/preview2/stdio/worker_thread_stdin.rs | 60 +++++++----- 2 files changed, 96 insertions(+), 58 deletions(-) diff --git a/crates/wasi/src/preview2/stdio.rs b/crates/wasi/src/preview2/stdio.rs index ad96ec8fa05e..92130c03578f 100644 --- a/crates/wasi/src/preview2/stdio.rs +++ b/crates/wasi/src/preview2/stdio.rs @@ -79,48 +79,58 @@ mod test { fn test_stdin_by_forking(mk_stdin: T) where S: HostInputStream, - T: FnOnce() -> S, + T: Fn() -> S, { test_child_stdin( |mut result_write| { - tokio::runtime::Runtime::new().unwrap().block_on(async { - let mut stdin = mk_stdin(); - - assert!( - tokio::time::timeout(std::time::Duration::from_millis(100), stdin.ready()) + let mut running = true; + while running { + tokio::runtime::Runtime::new().unwrap().block_on(async { + let mut stdin = mk_stdin(); + + assert!( + tokio::time::timeout( + std::time::Duration::from_millis(100), + stdin.ready() + ) .await .is_err(), - "stdin available too soon" - ); - - writeln!(&mut result_write, "start").unwrap(); - - let mut buffer = String::new(); - loop { - println!("child: waiting for stdin to be ready"); - stdin.ready().await.unwrap(); - - println!("child: reading input"); - let (bytes, status) = stdin.read(1024).unwrap(); - - println!("child: {:?}, {:?}", bytes, status); - - // We can't effectively test for the case where stdin was closed. - assert_eq!(status, StreamState::Open); - - buffer.push_str(std::str::from_utf8(bytes.as_ref()).unwrap()); - if let Some((line, rest)) = buffer.split_once('\n') { - if line == "all done" { - writeln!(&mut result_write, "done").unwrap(); - break; - } else { - writeln!(&mut result_write, "{}", line).unwrap(); + "stdin available too soon" + ); + + writeln!(&mut result_write, "start").unwrap(); + + let mut buffer = String::new(); + loop { + println!("child: waiting for stdin to be ready"); + stdin.ready().await.unwrap(); + + println!("child: reading input"); + let (bytes, status) = stdin.read(1024).unwrap(); + + println!("child: {:?}, {:?}", bytes, status); + + // We can't effectively test for the case where stdin was closed. + assert_eq!(status, StreamState::Open); + + buffer.push_str(std::str::from_utf8(bytes.as_ref()).unwrap()); + if let Some((line, rest)) = buffer.split_once('\n') { + if line == "all done" { + writeln!(&mut result_write, "done").unwrap(); + running = false; + break; + } else if line == "restart" { + writeln!(&mut result_write, "restarting").unwrap(); + break; + } else { + writeln!(&mut result_write, "{}", line).unwrap(); + } + + buffer = rest.to_owned(); } - - buffer = rest.to_owned(); } - } - }); + }); + } }, |mut stdin_write, mut result_read| { let mut line = String::new(); @@ -132,6 +142,20 @@ mod test { result_read.read_line(&mut line).unwrap(); assert_eq!(line, "some bytes\n"); + writeln!(&mut stdin_write, "restart").unwrap(); + line.clear(); + result_read.read_line(&mut line).unwrap(); + assert_eq!(line, "restarting\n"); + line.clear(); + + result_read.read_line(&mut line).unwrap(); + assert_eq!(line, "start\n"); + + writeln!(&mut stdin_write, "more bytes").unwrap(); + line.clear(); + result_read.read_line(&mut line).unwrap(); + assert_eq!(line, "more bytes\n"); + writeln!(&mut stdin_write, "all done").unwrap(); line.clear(); diff --git a/crates/wasi/src/preview2/stdio/worker_thread_stdin.rs b/crates/wasi/src/preview2/stdio/worker_thread_stdin.rs index 1b6f00924417..c3adc5f0f350 100644 --- a/crates/wasi/src/preview2/stdio/worker_thread_stdin.rs +++ b/crates/wasi/src/preview2/stdio/worker_thread_stdin.rs @@ -1,5 +1,5 @@ use crate::preview2::{HostInputStream, StreamState}; -use anyhow::{Context, Error}; +use anyhow::Error; use bytes::{Bytes, BytesMut}; use std::io::Read; use std::sync::Arc; @@ -12,12 +12,16 @@ use once_cell::sync::OnceCell as OnceLock; use std::sync::Mutex; struct GlobalStdin { - // Watch receiver impls Clone, so any interested readers can make a copy and .changed().await. - rx: watch::Receiver<()>, - // Worker thread and receivers share this state + // Worker thread uses this to notify of new events. Ready checks use this + // to create a new Receiver via .subscribe(). The newly created receiver + // will only wait for events created after the call to subscribe(). + tx: Arc>, + // Worker thread and receivers share this state to get bytes read off + // stdin, or the error/closed state. state: Arc>, } +#[derive(Debug)] struct StdinState { // Bytes read off stdin. buffer: BytesMut, @@ -30,7 +34,8 @@ struct StdinState { static STDIN: OnceLock = OnceLock::new(); fn create() -> GlobalStdin { - let (tx, rx) = watch::channel(()); + let (tx, _rx) = watch::channel(()); + let tx = Arc::new(tx); let state = Arc::new(Mutex::new(StdinState { buffer: BytesMut::new(), @@ -40,31 +45,30 @@ fn create() -> GlobalStdin { let ret = GlobalStdin { state: state.clone(), - rx, + tx: tx.clone(), }; std::thread::spawn(move || loop { let mut bytes = BytesMut::zeroed(1024); match std::io::stdin().lock().read(&mut bytes) { Ok(nbytes) => { + // Append to the buffer: bytes.truncate(nbytes); - { - let mut locked = state.lock().unwrap(); - locked.buffer.extend_from_slice(&bytes); - } - tx.send(()).expect("at least one rx exists"); + let mut locked = state.lock().unwrap(); + locked.buffer.extend_from_slice(&bytes); } Err(e) => { - { - let mut locked = state.lock().unwrap(); - if locked.error.is_none() { - locked.error = Some(e) - } - locked.closed = true; + // Set the error, and mark the stream as closed: + let mut locked = state.lock().unwrap(); + if locked.error.is_none() { + locked.error = Some(e) } - tx.send(()).expect("at least one rx exists"); + locked.closed = true; } } + // Receivers may or may not exist - fine if they dont, new + // ones will be created with subscribe() + let _ = tx.send(()); }); ret } @@ -85,7 +89,9 @@ pub fn stdin() -> Stdin { #[async_trait::async_trait] impl HostInputStream for Stdin { fn read(&mut self, size: usize) -> Result<(Bytes, StreamState), Error> { - let mut locked = Stdin::get_global().state.lock().unwrap(); + let g = Stdin::get_global(); + let mut locked = g.state.lock().unwrap(); + if let Some(e) = locked.error.take() { return Err(e.into()); } @@ -101,15 +107,23 @@ impl HostInputStream for Stdin { async fn ready(&mut self) -> Result<(), Error> { let g = Stdin::get_global(); - // Make sure we dont hold this lock across the await: - { + + // Block makes sure we dont hold the mutex across the await: + let mut rx = { let locked = g.state.lock().unwrap(); + // read() will only return (empty, open) when the buffer is empty, + // AND there is no error AND the stream is still open: if !locked.buffer.is_empty() || locked.error.is_some() || locked.closed { return Ok(()); } + // Sender will take the mutex before updating the state of + // subscribe, so this ensures we will only await for any stdin + // events that are recorded after we drop the mutex: + g.tx.subscribe() }; - let mut rx = g.rx.clone(); - rx.changed().await.context("stdin sender died") + rx.changed().await.expect("impossible for sender to drop"); + + Ok(()) } } From 0b693d90d49c59f17f6d6f2f3b8dcb389c2c5089 Mon Sep 17 00:00:00 2001 From: Pat Hickey Date: Tue, 25 Jul 2023 10:37:24 -0700 Subject: [PATCH 08/18] reap tasks from AsyncReadStream and AsyncWriteStream when dropped the tasks will exit on their own if the receiver drops, but this should terminate them while they are awaiting on read/write. --- crates/wasi/src/preview2/pipe.rs | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/crates/wasi/src/preview2/pipe.rs b/crates/wasi/src/preview2/pipe.rs index 34eb8a34f03a..e579072c9f90 100644 --- a/crates/wasi/src/preview2/pipe.rs +++ b/crates/wasi/src/preview2/pipe.rs @@ -102,6 +102,7 @@ pub struct AsyncReadStream { state: StreamState, buffer: Option>, receiver: tokio::sync::mpsc::Receiver>, + join_handle: tokio::task::JoinHandle<()>, } impl AsyncReadStream { @@ -109,7 +110,7 @@ impl AsyncReadStream { /// provided by this struct, the argument must impl [`tokio::io::AsyncRead`]. pub fn new(mut reader: T) -> Self { let (sender, receiver) = tokio::sync::mpsc::channel(1); - crate::preview2::spawn(async move { + let join_handle = crate::preview2::spawn(async move { loop { use tokio::io::AsyncReadExt; let mut buf = bytes::BytesMut::with_capacity(4096); @@ -130,10 +131,17 @@ impl AsyncReadStream { state: StreamState::Open, buffer: None, receiver, + join_handle, } } } +impl Drop for AsyncReadStream { + fn drop(&mut self) { + self.join_handle.abort() + } +} + #[async_trait::async_trait] impl HostInputStream for AsyncReadStream { fn read(&mut self, size: usize) -> Result<(Bytes, StreamState), Error> { @@ -213,6 +221,7 @@ pub struct AsyncWriteStream { state: Option, sender: tokio::sync::mpsc::Sender, result_receiver: tokio::sync::mpsc::Receiver>, + join_handle: tokio::task::JoinHandle<()>, } impl AsyncWriteStream { @@ -222,7 +231,7 @@ impl AsyncWriteStream { let (sender, mut receiver) = tokio::sync::mpsc::channel::(1); let (result_sender, result_receiver) = tokio::sync::mpsc::channel(1); - crate::preview2::spawn(async move { + let join_handle = crate::preview2::spawn(async move { 'outer: loop { use tokio::io::AsyncWriteExt; match receiver.recv().await { @@ -260,6 +269,7 @@ impl AsyncWriteStream { state: Some(WriteState::Ready), sender, result_receiver, + join_handle, } } @@ -282,6 +292,12 @@ impl AsyncWriteStream { } } +impl Drop for AsyncWriteStream { + fn drop(&mut self) { + self.join_handle.abort() + } +} + #[async_trait::async_trait] impl HostOutputStream for AsyncWriteStream { fn write(&mut self, bytes: Bytes) -> Result<(usize, StreamState), anyhow::Error> { From f74d6c7354b082d803fa1980b05f6a86c72b02fd Mon Sep 17 00:00:00 2001 From: Pat Hickey Date: Tue, 25 Jul 2023 16:58:30 -0700 Subject: [PATCH 09/18] rewrite most of the asyncfd based stdin this has still got at least one serious bug, noted with FIXME. also has a bunch of changes which can be backed out because they were chasing red herrings. the major bug was that, for some inexplicable reason, i didn't actually set the fd to nonblocking just prior to asyncfd creation. switching from the fdflags.difference to fdflags.union operator fixed that bug. --- crates/wasi/src/preview2/mod.rs | 15 +-- crates/wasi/src/preview2/stdio.rs | 105 +++++++++++-------- crates/wasi/src/preview2/stdio/unix.rs | 136 ++++++++++++++++--------- 3 files changed, 159 insertions(+), 97 deletions(-) diff --git a/crates/wasi/src/preview2/mod.rs b/crates/wasi/src/preview2/mod.rs index 28271d26070a..30c12ae0ee7c 100644 --- a/crates/wasi/src/preview2/mod.rs +++ b/crates/wasi/src/preview2/mod.rs @@ -148,13 +148,14 @@ pub mod bindings { pub use self::_internal_rest::wasi::*; } -static RUNTIME: once_cell::sync::Lazy = once_cell::sync::Lazy::new(|| { - tokio::runtime::Builder::new_multi_thread() - .enable_time() - .enable_io() - .build() - .unwrap() -}); +pub(crate) static RUNTIME: once_cell::sync::Lazy = + once_cell::sync::Lazy::new(|| { + tokio::runtime::Builder::new_multi_thread() + .enable_time() + .enable_io() + .build() + .unwrap() + }); pub(crate) fn spawn(f: F) -> tokio::task::JoinHandle where diff --git a/crates/wasi/src/preview2/stdio.rs b/crates/wasi/src/preview2/stdio.rs index 92130c03578f..571bff9cc06d 100644 --- a/crates/wasi/src/preview2/stdio.rs +++ b/crates/wasi/src/preview2/stdio.rs @@ -83,54 +83,70 @@ mod test { { test_child_stdin( |mut result_write| { - let mut running = true; - while running { - tokio::runtime::Runtime::new().unwrap().block_on(async { - let mut stdin = mk_stdin(); - - assert!( - tokio::time::timeout( - std::time::Duration::from_millis(100), - stdin.ready() - ) - .await - .is_err(), - "stdin available too soon" - ); - - writeln!(&mut result_write, "start").unwrap(); - - let mut buffer = String::new(); - loop { - println!("child: waiting for stdin to be ready"); - stdin.ready().await.unwrap(); - - println!("child: reading input"); - let (bytes, status) = stdin.read(1024).unwrap(); - - println!("child: {:?}, {:?}", bytes, status); - - // We can't effectively test for the case where stdin was closed. - assert_eq!(status, StreamState::Open); - - buffer.push_str(std::str::from_utf8(bytes.as_ref()).unwrap()); - if let Some((line, rest)) = buffer.split_once('\n') { - if line == "all done" { - writeln!(&mut result_write, "done").unwrap(); - running = false; - break; - } else if line == "restart" { - writeln!(&mut result_write, "restarting").unwrap(); - break; - } else { - writeln!(&mut result_write, "{}", line).unwrap(); + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap() + .block_on(async { + let mut running = true; + while running { + println!("child: creating stdin"); + let mut stdin = mk_stdin(); + + println!("child: checking that stdin is not ready"); + assert!( + tokio::time::timeout( + std::time::Duration::from_millis(100), + stdin.ready() + ) + .await + .is_err(), + "stdin available too soon" + ); + + writeln!(&mut result_write, "start").unwrap(); + + println!("child: started"); + + let mut buffer = String::new(); + loop { + println!("child: waiting for stdin to be ready"); + stdin.ready().await.unwrap(); + + println!("child: reading input"); + let (bytes, status) = stdin.read(1024).unwrap(); + + println!("child: {:?}, {:?}", bytes, status); + + // We can't effectively test for the case where stdin was closed. + assert_eq!(status, StreamState::Open); + + println!("sleeping"); + tokio::task::yield_now().await; + //tokio::time::sleep(std::time::Duration::from_millis(1)).await; + println!("done sleeping"); + + buffer.push_str(std::str::from_utf8(bytes.as_ref()).unwrap()); + if let Some((line, rest)) = buffer.split_once('\n') { + if line == "all done" { + writeln!(&mut result_write, "done").unwrap(); + println!("child: exiting..."); + running = false; + break; + } else if line == "restart" { + writeln!(&mut result_write, "restarting").unwrap(); + println!("child: restarting..."); + + break; + } else { + writeln!(&mut result_write, "{}", line).unwrap(); + } + + buffer = rest.to_owned(); } - - buffer = rest.to_owned(); } } }); - } }, |mut stdin_write, mut result_read| { let mut line = String::new(); @@ -151,6 +167,7 @@ mod test { result_read.read_line(&mut line).unwrap(); assert_eq!(line, "start\n"); + println!("parent: writing `more bytes`"); writeln!(&mut stdin_write, "more bytes").unwrap(); line.clear(); result_read.read_line(&mut line).unwrap(); diff --git a/crates/wasi/src/preview2/stdio/unix.rs b/crates/wasi/src/preview2/stdio/unix.rs index 3888b5cdf96d..343198b01c61 100644 --- a/crates/wasi/src/preview2/stdio/unix.rs +++ b/crates/wasi/src/preview2/stdio/unix.rs @@ -1,34 +1,84 @@ use crate::preview2::{pipe::AsyncReadStream, HostInputStream, StreamState}; -use anyhow::Error; +use anyhow::{Context as _, Error}; use bytes::Bytes; use futures::ready; use std::future::Future; use std::io::{self, Read}; use std::pin::Pin; +use std::sync::{Arc, Mutex, OnceLock, Weak}; use std::task::{Context, Poll}; use tokio::io::unix::AsyncFd; use tokio::io::{AsyncRead, ReadBuf}; -// wasmtime cant use std::sync::OnceLock yet because of a llvm regression in -// 1.70. when 1.71 is released, we can switch to using std here. -use once_cell::sync::OnceCell as OnceLock; - -use std::sync::Mutex; - // We need a single global instance of the AsyncFd because creating // this instance registers the process's stdin fd with epoll, which will // return an error if an fd is registered more than once. -struct GlobalStdin(Mutex); -static STDIN: OnceLock = OnceLock::new(); +static STDIN: OnceLock> = OnceLock::new(); + +struct GlobalStdin(Weak>); + +pub struct Stdin(Arc>); +pub fn stdin() -> Stdin { + GlobalStdin::get().unwrap() +} impl GlobalStdin { - fn new() -> anyhow::Result { - Ok(Self(Mutex::new(AsyncReadStream::new(InnerStdin::new()?)))) + fn upgrade(&self) -> Option { + Weak::upgrade(&self.0).map(Stdin) + } + + fn new() -> anyhow::Result<(Self, Stdin)> { + use crate::preview2::RUNTIME; + let inner = match tokio::runtime::Handle::try_current() { + Ok(_) => AsyncReadStream::new(InnerStdin::new()?), + Err(_) => { + let _enter = RUNTIME.enter(); + RUNTIME.block_on(async { + Ok::<_, anyhow::Error>(AsyncReadStream::new(InnerStdin::new()?)) + })? + } + }; + let strong = Arc::new(Mutex::new(inner)); + let global = GlobalStdin(Arc::downgrade(&strong)); + Ok((global, Stdin(strong))) } - fn read(&self, size: usize) -> Result<(Bytes, StreamState), Error> { + + fn get() -> anyhow::Result { + match STDIN.get() { + None => { + let (global, strong) = + Self::new().context("creating global stdin resource for first time")?; + match STDIN.set(Mutex::new(global)) { + Ok(_) => Ok(strong), + Err(_) => panic!("fixme: lost race?"), + } + } + Some(g) => { + let mut g = g.lock().unwrap(); + match g.upgrade() { + Some(strong) => Ok(strong), + None => { + // BUG: the Arc can go to zero but the AsyncFd hasnt finished dropping yet, + // so this will fail sometimes because epoll hasnt yet had the fd + // unregistered + let (global, strong) = + Self::new().context("re-creating global stdin resource")?; + *g = global; + Ok(strong) + } + } + } + } + } +} + +#[async_trait::async_trait] +impl crate::preview2::HostInputStream for Stdin { + fn read(&mut self, size: usize) -> Result<(Bytes, StreamState), Error> { HostInputStream::read(&mut *self.0.lock().unwrap(), size) } - fn ready<'a>(&'a self) -> impl Future> + 'a { + + async fn ready(&mut self) -> Result<(), Error> { // Custom Future impl takes the std mutex in each invocation of poll. // Required so we don't have to use a tokio mutex, which we can't take from // inside a sync context in Self::read. @@ -37,7 +87,7 @@ impl GlobalStdin { // then releasing the lock is acceptable here because the ready() future // is only ever going to await on a single channel recv, plus some management // of a state machine (for buffering). - struct Ready<'a>(&'a GlobalStdin); + struct Ready<'a>(&'a Stdin); impl<'a> Future for Ready<'a> { type Output = Result<(), Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { @@ -47,45 +97,35 @@ impl GlobalStdin { fut.poll(cx) } } - Ready(self) + Ready(self).await } } -pub struct Stdin; -impl Stdin { - fn get_global() -> &'static GlobalStdin { - // Creation must be running in a tokio context to succeed. - match tokio::runtime::Handle::try_current() { - Ok(_) => STDIN.get_or_init(|| { - GlobalStdin::new().expect("creating AsyncFd for stdin in existing tokio context") - }), - Err(_) => STDIN.get_or_init(|| { - crate::preview2::in_tokio(async { - GlobalStdin::new() - .expect("creating AsyncFd for stdin in internal tokio context") - }) - }), - } +struct MyStdin(std::os::fd::RawFd); +impl MyStdin { + fn new() -> Self { + MyStdin(libc::STDIN_FILENO) } } - -pub fn stdin() -> Stdin { - Stdin +impl std::os::fd::AsRawFd for MyStdin { + fn as_raw_fd(&self) -> std::os::fd::RawFd { + self.0 + } } - -#[async_trait::async_trait] -impl crate::preview2::HostInputStream for Stdin { - fn read(&mut self, size: usize) -> Result<(Bytes, StreamState), Error> { - Self::get_global().read(size) +impl rustix::fd::AsFd for MyStdin { + fn as_fd(&self) -> rustix::fd::BorrowedFd<'_> { + unsafe { rustix::fd::BorrowedFd::borrow_raw(self.0) } } +} - async fn ready(&mut self) -> Result<(), Error> { - Self::get_global().ready().await +impl Read for MyStdin { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + Ok(rustix::io::read(self, buf)?) } } struct InnerStdin { - inner: AsyncFd, + inner: AsyncFd, } impl InnerStdin { @@ -93,16 +133,16 @@ impl InnerStdin { use rustix::fs::OFlags; use std::os::fd::AsRawFd; - let stdin = std::io::stdin(); + let stdin = MyStdin::new(); let borrowed_fd = unsafe { rustix::fd::BorrowedFd::borrow_raw(stdin.as_raw_fd()) }; let flags = rustix::fs::fcntl_getfl(borrowed_fd)?; if !flags.contains(OFlags::NONBLOCK) { - rustix::fs::fcntl_setfl(borrowed_fd, flags.difference(OFlags::NONBLOCK))?; + rustix::fs::fcntl_setfl(borrowed_fd, flags.union(OFlags::NONBLOCK))?; } Ok(Self { - inner: AsyncFd::new(std::io::stdin())?, + inner: AsyncFd::new(MyStdin::new())?, }) } } @@ -122,8 +162,12 @@ impl AsyncRead for InnerStdin { buf.advance(len); return Poll::Ready(Ok(())); } - Ok(Err(err)) => return Poll::Ready(Err(err)), - Err(_would_block) => continue, + Ok(Err(err)) => { + return Poll::Ready(Err(err)); + } + Err(_would_block) => { + continue; + } } } } From c61a9e3ae3b209c9f63c38d1166233af1c5bf8b6 Mon Sep 17 00:00:00 2001 From: Trevor Elliott Date: Wed, 26 Jul 2023 11:34:06 -0700 Subject: [PATCH 10/18] Refactor to avoid `as_mut` and `.0 .0` --- crates/wasi/src/preview2/stdio/unix.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/crates/wasi/src/preview2/stdio/unix.rs b/crates/wasi/src/preview2/stdio/unix.rs index 343198b01c61..3c9a0ada5758 100644 --- a/crates/wasi/src/preview2/stdio/unix.rs +++ b/crates/wasi/src/preview2/stdio/unix.rs @@ -87,17 +87,19 @@ impl crate::preview2::HostInputStream for Stdin { // then releasing the lock is acceptable here because the ready() future // is only ever going to await on a single channel recv, plus some management // of a state machine (for buffering). - struct Ready<'a>(&'a Stdin); + struct Ready<'a> { + handle: &'a Stdin, + } impl<'a> Future for Ready<'a> { type Output = Result<(), Error>; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { - let mut locked = self.as_mut().0 .0.lock().unwrap(); + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let mut locked = self.handle.0.lock().unwrap(); let fut = locked.ready(); tokio::pin!(fut); fut.poll(cx) } } - Ready(self).await + Ready { handle: self }.await } } From c458cb57dd7fb0860b053dd862eb4320bf86892b Mon Sep 17 00:00:00 2001 From: Trevor Elliott Date: Thu, 27 Jul 2023 14:37:17 -0700 Subject: [PATCH 11/18] Rework the stdin implementation to hold the stream in a mutex --- crates/wasi/src/preview2/stdio/unix.rs | 101 +++++-------------------- 1 file changed, 20 insertions(+), 81 deletions(-) diff --git a/crates/wasi/src/preview2/stdio/unix.rs b/crates/wasi/src/preview2/stdio/unix.rs index 3c9a0ada5758..4efd89cbfd2e 100644 --- a/crates/wasi/src/preview2/stdio/unix.rs +++ b/crates/wasi/src/preview2/stdio/unix.rs @@ -1,11 +1,11 @@ use crate::preview2::{pipe::AsyncReadStream, HostInputStream, StreamState}; -use anyhow::{Context as _, Error}; +use anyhow::Error; use bytes::Bytes; use futures::ready; use std::future::Future; use std::io::{self, Read}; use std::pin::Pin; -use std::sync::{Arc, Mutex, OnceLock, Weak}; +use std::sync::{Arc, Mutex, OnceLock}; use std::task::{Context, Poll}; use tokio::io::unix::AsyncFd; use tokio::io::{AsyncRead, ReadBuf}; @@ -13,63 +13,25 @@ use tokio::io::{AsyncRead, ReadBuf}; // We need a single global instance of the AsyncFd because creating // this instance registers the process's stdin fd with epoll, which will // return an error if an fd is registered more than once. -static STDIN: OnceLock> = OnceLock::new(); - -struct GlobalStdin(Weak>); +static STDIN: OnceLock = OnceLock::new(); +#[derive(Clone)] pub struct Stdin(Arc>); -pub fn stdin() -> Stdin { - GlobalStdin::get().unwrap() -} -impl GlobalStdin { - fn upgrade(&self) -> Option { - Weak::upgrade(&self.0).map(Stdin) - } - - fn new() -> anyhow::Result<(Self, Stdin)> { - use crate::preview2::RUNTIME; - let inner = match tokio::runtime::Handle::try_current() { - Ok(_) => AsyncReadStream::new(InnerStdin::new()?), - Err(_) => { - let _enter = RUNTIME.enter(); - RUNTIME.block_on(async { - Ok::<_, anyhow::Error>(AsyncReadStream::new(InnerStdin::new()?)) - })? - } - }; - let strong = Arc::new(Mutex::new(inner)); - let global = GlobalStdin(Arc::downgrade(&strong)); - Ok((global, Stdin(strong))) - } - - fn get() -> anyhow::Result { - match STDIN.get() { - None => { - let (global, strong) = - Self::new().context("creating global stdin resource for first time")?; - match STDIN.set(Mutex::new(global)) { - Ok(_) => Ok(strong), - Err(_) => panic!("fixme: lost race?"), - } - } - Some(g) => { - let mut g = g.lock().unwrap(); - match g.upgrade() { - Some(strong) => Ok(strong), - None => { - // BUG: the Arc can go to zero but the AsyncFd hasnt finished dropping yet, - // so this will fail sometimes because epoll hasnt yet had the fd - // unregistered - let (global, strong) = - Self::new().context("re-creating global stdin resource")?; - *g = global; - Ok(strong) - } +pub fn stdin() -> Stdin { + STDIN + .get_or_init(|| { + use crate::preview2::RUNTIME; + let inner = match tokio::runtime::Handle::try_current() { + Ok(_) => AsyncReadStream::new(InnerStdin::new().unwrap()), + Err(_) => { + let _enter = RUNTIME.enter(); + RUNTIME.block_on(async { AsyncReadStream::new(InnerStdin::new().unwrap()) }) } - } - } - } + }; + Stdin(Arc::new(Mutex::new(inner))) + }) + .clone() } #[async_trait::async_trait] @@ -103,31 +65,8 @@ impl crate::preview2::HostInputStream for Stdin { } } -struct MyStdin(std::os::fd::RawFd); -impl MyStdin { - fn new() -> Self { - MyStdin(libc::STDIN_FILENO) - } -} -impl std::os::fd::AsRawFd for MyStdin { - fn as_raw_fd(&self) -> std::os::fd::RawFd { - self.0 - } -} -impl rustix::fd::AsFd for MyStdin { - fn as_fd(&self) -> rustix::fd::BorrowedFd<'_> { - unsafe { rustix::fd::BorrowedFd::borrow_raw(self.0) } - } -} - -impl Read for MyStdin { - fn read(&mut self, buf: &mut [u8]) -> std::io::Result { - Ok(rustix::io::read(self, buf)?) - } -} - struct InnerStdin { - inner: AsyncFd, + inner: AsyncFd, } impl InnerStdin { @@ -135,7 +74,7 @@ impl InnerStdin { use rustix::fs::OFlags; use std::os::fd::AsRawFd; - let stdin = MyStdin::new(); + let stdin = std::io::stdin(); let borrowed_fd = unsafe { rustix::fd::BorrowedFd::borrow_raw(stdin.as_raw_fd()) }; let flags = rustix::fs::fcntl_getfl(borrowed_fd)?; @@ -144,7 +83,7 @@ impl InnerStdin { } Ok(Self { - inner: AsyncFd::new(MyStdin::new())?, + inner: AsyncFd::new(stdin)?, }) } } From 43e5dfd2d0a5272640c87992fb44f92cb728f940 Mon Sep 17 00:00:00 2001 From: Trevor Elliott Date: Thu, 27 Jul 2023 15:26:47 -0700 Subject: [PATCH 12/18] Detect when the task backing stdin has exited --- crates/wasi/src/preview2/pipe.rs | 2 +- crates/wasi/src/preview2/stdio/unix.rs | 44 ++++++++++++++++++-------- 2 files changed, 32 insertions(+), 14 deletions(-) diff --git a/crates/wasi/src/preview2/pipe.rs b/crates/wasi/src/preview2/pipe.rs index e579072c9f90..f8d64546c635 100644 --- a/crates/wasi/src/preview2/pipe.rs +++ b/crates/wasi/src/preview2/pipe.rs @@ -102,7 +102,7 @@ pub struct AsyncReadStream { state: StreamState, buffer: Option>, receiver: tokio::sync::mpsc::Receiver>, - join_handle: tokio::task::JoinHandle<()>, + pub(crate) join_handle: tokio::task::JoinHandle<()>, } impl AsyncReadStream { diff --git a/crates/wasi/src/preview2/stdio/unix.rs b/crates/wasi/src/preview2/stdio/unix.rs index 4efd89cbfd2e..9e24efc95f50 100644 --- a/crates/wasi/src/preview2/stdio/unix.rs +++ b/crates/wasi/src/preview2/stdio/unix.rs @@ -19,19 +19,37 @@ static STDIN: OnceLock = OnceLock::new(); pub struct Stdin(Arc>); pub fn stdin() -> Stdin { - STDIN - .get_or_init(|| { - use crate::preview2::RUNTIME; - let inner = match tokio::runtime::Handle::try_current() { - Ok(_) => AsyncReadStream::new(InnerStdin::new().unwrap()), - Err(_) => { - let _enter = RUNTIME.enter(); - RUNTIME.block_on(async { AsyncReadStream::new(InnerStdin::new().unwrap()) }) - } - }; - Stdin(Arc::new(Mutex::new(inner))) - }) - .clone() + fn init_stdin() -> AsyncReadStream { + use crate::preview2::RUNTIME; + match tokio::runtime::Handle::try_current() { + Ok(_) => AsyncReadStream::new(InnerStdin::new().unwrap()), + Err(_) => { + let _enter = RUNTIME.enter(); + RUNTIME.block_on(async { AsyncReadStream::new(InnerStdin::new().unwrap()) }) + } + } + } + + let handle = STDIN + .get_or_init(|| Stdin(Arc::new(Mutex::new(init_stdin())))) + .clone(); + + { + let mut guard = handle.0.lock().unwrap(); + + // The backing task exited. This can happen in two cases: + // + // 1. the task crashed + // 2. the runtime has exited and been restarted in the same process + // + // As we can't tell the difference between these two, we assume the latter and restart the + // task. + if guard.join_handle.is_finished() { + *guard = init_stdin(); + } + } + + handle } #[async_trait::async_trait] From a484a93d3aeb196e7fc22c321915aa574d6dffe8 Mon Sep 17 00:00:00 2001 From: Trevor Elliott Date: Thu, 27 Jul 2023 15:27:02 -0700 Subject: [PATCH 13/18] Have the thread managing stdin exit when it encounters EOF --- crates/wasi/src/preview2/stdio/worker_thread_stdin.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/crates/wasi/src/preview2/stdio/worker_thread_stdin.rs b/crates/wasi/src/preview2/stdio/worker_thread_stdin.rs index c3adc5f0f350..b4fde02465e3 100644 --- a/crates/wasi/src/preview2/stdio/worker_thread_stdin.rs +++ b/crates/wasi/src/preview2/stdio/worker_thread_stdin.rs @@ -51,6 +51,10 @@ fn create() -> GlobalStdin { std::thread::spawn(move || loop { let mut bytes = BytesMut::zeroed(1024); match std::io::stdin().lock().read(&mut bytes) { + // Reading `0` indicates that stdin has reached EOF, so we break + // the loop to allow the thread to exit. + Ok(0) => break, + Ok(nbytes) => { // Append to the buffer: bytes.truncate(nbytes); From dc835d954d71b3306838868e2ba25da3d92f9065 Mon Sep 17 00:00:00 2001 From: Trevor Elliott Date: Thu, 27 Jul 2023 15:27:16 -0700 Subject: [PATCH 14/18] Refactor the test to restart the runtime as well --- crates/wasi/src/preview2/stdio.rs | 163 +++++++++++++++++------------- 1 file changed, 94 insertions(+), 69 deletions(-) diff --git a/crates/wasi/src/preview2/stdio.rs b/crates/wasi/src/preview2/stdio.rs index 571bff9cc06d..546d2b83a469 100644 --- a/crates/wasi/src/preview2/stdio.rs +++ b/crates/wasi/src/preview2/stdio.rs @@ -83,82 +83,87 @@ mod test { { test_child_stdin( |mut result_write| { - tokio::runtime::Builder::new_multi_thread() - .enable_all() - .build() - .unwrap() - .block_on(async { - let mut running = true; - while running { - println!("child: creating stdin"); - let mut stdin = mk_stdin(); - - println!("child: checking that stdin is not ready"); - assert!( - tokio::time::timeout( - std::time::Duration::from_millis(100), - stdin.ready() - ) - .await - .is_err(), - "stdin available too soon" - ); - - writeln!(&mut result_write, "start").unwrap(); - - println!("child: started"); - - let mut buffer = String::new(); - loop { - println!("child: waiting for stdin to be ready"); - stdin.ready().await.unwrap(); - - println!("child: reading input"); - let (bytes, status) = stdin.read(1024).unwrap(); - - println!("child: {:?}, {:?}", bytes, status); - - // We can't effectively test for the case where stdin was closed. - assert_eq!(status, StreamState::Open); - - println!("sleeping"); - tokio::task::yield_now().await; - //tokio::time::sleep(std::time::Duration::from_millis(1)).await; - println!("done sleeping"); - - buffer.push_str(std::str::from_utf8(bytes.as_ref()).unwrap()); - if let Some((line, rest)) = buffer.split_once('\n') { - if line == "all done" { - writeln!(&mut result_write, "done").unwrap(); - println!("child: exiting..."); - running = false; - break; - } else if line == "restart" { - writeln!(&mut result_write, "restarting").unwrap(); - println!("child: restarting..."); - - break; - } else { - writeln!(&mut result_write, "{}", line).unwrap(); + let mut child_running = true; + while child_running { + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap() + .block_on(async { + 'task: loop { + println!("child: creating stdin"); + let mut stdin = mk_stdin(); + + println!("child: checking that stdin is not ready"); + assert!( + tokio::time::timeout( + std::time::Duration::from_millis(100), + stdin.ready() + ) + .await + .is_err(), + "stdin available too soon" + ); + + writeln!(&mut result_write, "start").unwrap(); + + println!("child: started"); + + let mut buffer = String::new(); + loop { + println!("child: waiting for stdin to be ready"); + stdin.ready().await.unwrap(); + + println!("child: reading input"); + let (bytes, status) = stdin.read(1024).unwrap(); + + println!("child: {:?}, {:?}", bytes, status); + + // We can't effectively test for the case where stdin was closed. + assert_eq!(status, StreamState::Open); + + buffer.push_str(std::str::from_utf8(bytes.as_ref()).unwrap()); + if let Some((line, rest)) = buffer.split_once('\n') { + if line == "all done" { + writeln!(&mut result_write, "done").unwrap(); + println!("child: exiting..."); + child_running = false; + break 'task; + } else if line == "restart_runtime" { + writeln!(&mut result_write, "restarting").unwrap(); + println!("child: restarting runtime..."); + break 'task; + } else if line == "restart_task" { + writeln!(&mut result_write, "restarting").unwrap(); + println!("child: restarting task..."); + continue 'task; + } else { + writeln!(&mut result_write, "{}", line).unwrap(); + } + + buffer = rest.to_owned(); } - - buffer = rest.to_owned(); } } - } - }); + }); + println!("runtime exited"); + } + println!("child exited"); }, |mut stdin_write, mut result_read| { let mut line = String::new(); result_read.read_line(&mut line).unwrap(); assert_eq!(line, "start\n"); - writeln!(&mut stdin_write, "some bytes").unwrap(); - line.clear(); - result_read.read_line(&mut line).unwrap(); - assert_eq!(line, "some bytes\n"); + for i in 0..5 { + let message = format!("some bytes {}\n", i); + stdin_write.write_all(message.as_bytes()).unwrap(); + line.clear(); + result_read.read_line(&mut line).unwrap(); + assert_eq!(line, message); + } - writeln!(&mut stdin_write, "restart").unwrap(); + writeln!(&mut stdin_write, "restart_task").unwrap(); line.clear(); result_read.read_line(&mut line).unwrap(); assert_eq!(line, "restarting\n"); @@ -167,11 +172,31 @@ mod test { result_read.read_line(&mut line).unwrap(); assert_eq!(line, "start\n"); - println!("parent: writing `more bytes`"); - writeln!(&mut stdin_write, "more bytes").unwrap(); + for i in 0..10 { + let message = format!("more bytes {}\n", i); + stdin_write.write_all(message.as_bytes()).unwrap(); + line.clear(); + result_read.read_line(&mut line).unwrap(); + assert_eq!(line, message); + } + + writeln!(&mut stdin_write, "restart_runtime").unwrap(); line.clear(); result_read.read_line(&mut line).unwrap(); - assert_eq!(line, "more bytes\n"); + assert_eq!(line, "restarting\n"); + line.clear(); + + result_read.read_line(&mut line).unwrap(); + assert_eq!(line, "start\n"); + + for i in 0..17 { + let message = format!("even more bytes {}\n", i); + stdin_write.write_all(message.as_bytes()).unwrap(); + line.clear(); + result_read.read_line(&mut line).unwrap(); + assert_eq!(line, message); + } + writeln!(&mut stdin_write, "all done").unwrap(); From f001df3bb9baf202f03da3d0ca41144b5365b4c3 Mon Sep 17 00:00:00 2001 From: Trevor Elliott Date: Thu, 27 Jul 2023 15:27:40 -0700 Subject: [PATCH 15/18] Format prtest:full --- crates/wasi/src/preview2/stdio.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/wasi/src/preview2/stdio.rs b/crates/wasi/src/preview2/stdio.rs index 546d2b83a469..81083d2e0bfc 100644 --- a/crates/wasi/src/preview2/stdio.rs +++ b/crates/wasi/src/preview2/stdio.rs @@ -197,7 +197,6 @@ mod test { assert_eq!(line, message); } - writeln!(&mut stdin_write, "all done").unwrap(); line.clear(); From d7cfc396b356a551f26a02350cbd7629e292d917 Mon Sep 17 00:00:00 2001 From: Pat Hickey Date: Fri, 28 Jul 2023 10:38:46 -0700 Subject: [PATCH 16/18] internal tokio runtime used for sync interfaces is now new_current_thread --- crates/wasi/src/preview2/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/wasi/src/preview2/mod.rs b/crates/wasi/src/preview2/mod.rs index 30c12ae0ee7c..0d60e947372d 100644 --- a/crates/wasi/src/preview2/mod.rs +++ b/crates/wasi/src/preview2/mod.rs @@ -150,7 +150,7 @@ pub mod bindings { pub(crate) static RUNTIME: once_cell::sync::Lazy = once_cell::sync::Lazy::new(|| { - tokio::runtime::Builder::new_multi_thread() + tokio::runtime::Builder::new_current_thread() .enable_time() .enable_io() .build() From 81a1fbb3c56f6f7387dce1511b0a89344eea2fde Mon Sep 17 00:00:00 2001 From: Trevor Elliott Date: Mon, 31 Jul 2023 14:50:03 -0700 Subject: [PATCH 17/18] Remove println from test for stdout --- crates/test-programs/wasi-tests/src/bin/poll_oneoff_stdio.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/test-programs/wasi-tests/src/bin/poll_oneoff_stdio.rs b/crates/test-programs/wasi-tests/src/bin/poll_oneoff_stdio.rs index a66eb94ffe28..048289376a58 100644 --- a/crates/test-programs/wasi-tests/src/bin/poll_oneoff_stdio.rs +++ b/crates/test-programs/wasi-tests/src/bin/poll_oneoff_stdio.rs @@ -69,7 +69,6 @@ unsafe fn test_stdin_read() { } fn writable_subs(h: &HashMap) -> Vec { - println!("writable subs: {:?}", h); h.iter() .map(|(ud, fd)| wasi::Subscription { userdata: *ud, From 6f86ea299c4863e2270192156de4141040404985 Mon Sep 17 00:00:00 2001 From: Trevor Elliott Date: Mon, 31 Jul 2023 14:50:13 -0700 Subject: [PATCH 18/18] Remove unneeded `vec!` --- crates/test-programs/wasi-tests/src/bin/poll_oneoff_stdio.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/test-programs/wasi-tests/src/bin/poll_oneoff_stdio.rs b/crates/test-programs/wasi-tests/src/bin/poll_oneoff_stdio.rs index 048289376a58..2ddc00475281 100644 --- a/crates/test-programs/wasi-tests/src/bin/poll_oneoff_stdio.rs +++ b/crates/test-programs/wasi-tests/src/bin/poll_oneoff_stdio.rs @@ -86,7 +86,7 @@ fn writable_subs(h: &HashMap) -> Vec { unsafe fn test_stdout_stderr_write() { let mut writable: HashMap = - vec![(1, STDOUT_FD), (2, STDERR_FD)].into_iter().collect(); + [(1, STDOUT_FD), (2, STDERR_FD)].into_iter().collect(); let clock = wasi::Subscription { userdata: CLOCK_ID,