Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 2 additions & 31 deletions src/http/client.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::{response::IncomingBody, Body, Error, Request, Response, Result};
use crate::io::{self, AsyncWrite};
use crate::io::{self, AsyncOutputStream};
use crate::runtime::Reactor;
use crate::time::Duration;
use wasi::http::types::{OutgoingBody, RequestOptions as WasiRequestOptions};
Expand Down Expand Up @@ -27,9 +27,7 @@ impl Client {
let res = wasi::http::outgoing_handler::handle(wasi_req, self.wasi_options()?).unwrap();

// 2. Start sending the request body
io::copy(body, OutputStream::new(body_stream))
.await
.expect("io::copy broke oh no");
io::copy(body, AsyncOutputStream::new(body_stream)).await?;

// 3. Finish sending the request body
let trailers = None;
Expand Down Expand Up @@ -74,33 +72,6 @@ impl Client {
}
}

struct OutputStream {
stream: wasi::http::types::OutputStream,
}

impl OutputStream {
fn new(stream: wasi::http::types::OutputStream) -> Self {
Self { stream }
}
}

impl AsyncWrite for OutputStream {
async fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let max = self.stream.check_write().unwrap() as usize;
let max = max.min(buf.len());
let buf = &buf[0..max];
self.stream.write(buf).unwrap();
Reactor::current().wait_for(self.stream.subscribe()).await;
Ok(max)
}

async fn flush(&mut self) -> io::Result<()> {
self.stream.flush().unwrap();
Reactor::current().wait_for(self.stream.subscribe()).await;
Ok(())
}
}

#[derive(Default, Debug)]
struct RequestOptions {
connect_timeout: Option<Duration>,
Expand Down
9 changes: 9 additions & 0 deletions src/http/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ impl fmt::Debug for Error {
ErrorVariant::HeaderName(e) => write!(f, "header name error: {e:?}"),
ErrorVariant::HeaderValue(e) => write!(f, "header value error: {e:?}"),
ErrorVariant::Method(e) => write!(f, "method error: {e:?}"),
ErrorVariant::BodyIo(e) => write!(f, "body error: {e:?}"),
ErrorVariant::Other(e) => write!(f, "{e}"),
}
}
Expand All @@ -37,6 +38,7 @@ impl fmt::Display for Error {
ErrorVariant::HeaderName(e) => write!(f, "header name error: {e}"),
ErrorVariant::HeaderValue(e) => write!(f, "header value error: {e}"),
ErrorVariant::Method(e) => write!(f, "method error: {e}"),
ErrorVariant::BodyIo(e) => write!(f, "body error: {e}"),
ErrorVariant::Other(e) => write!(f, "{e}"),
}
}
Expand Down Expand Up @@ -100,12 +102,19 @@ impl From<InvalidMethod> for Error {
}
}

impl From<std::io::Error> for Error {
fn from(e: std::io::Error) -> Error {
ErrorVariant::BodyIo(e).into()
}
}

#[derive(Debug)]
pub enum ErrorVariant {
WasiHttp(WasiHttpErrorCode),
WasiHeader(WasiHttpHeaderError),
HeaderName(InvalidHeaderName),
HeaderValue(InvalidHeaderValue),
Method(InvalidMethod),
BodyIo(std::io::Error),
Other(String),
}
54 changes: 4 additions & 50 deletions src/http/response.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
use wasi::http::types::{IncomingBody as WasiIncomingBody, IncomingResponse};
use wasi::io::streams::{InputStream, StreamError};

use super::{fields::header_map_from_wasi, Body, Error, HeaderMap, Result, StatusCode};
use crate::io::AsyncRead;
use crate::runtime::Reactor;

/// Stream 2kb chunks at a time
const CHUNK_SIZE: u64 = 2048;
use crate::io::{AsyncInputStream, AsyncRead};

/// An HTTP response
#[derive(Debug)]
Expand Down Expand Up @@ -57,9 +52,7 @@ impl Response<IncomingBody> {

let body = IncomingBody {
kind,
buf_offset: 0,
buf: None,
body_stream,
body_stream: AsyncInputStream::new(body_stream),
_incoming_body: incoming_body,
};

Expand Down Expand Up @@ -96,54 +89,15 @@ impl<B: Body> Response<B> {
#[derive(Debug)]
pub struct IncomingBody {
kind: BodyKind,
buf: Option<Vec<u8>>,
// How many bytes have we already read from the buf?
buf_offset: usize,

// IMPORTANT: the order of these fields here matters. `body_stream` must
// be dropped before `_incoming_body`.
body_stream: InputStream,
body_stream: AsyncInputStream,
_incoming_body: WasiIncomingBody,
}

impl AsyncRead for IncomingBody {
async fn read(&mut self, out_buf: &mut [u8]) -> crate::io::Result<usize> {
let buf = match &mut self.buf {
Some(ref mut buf) => buf,
None => {
// Wait for an event to be ready
let pollable = self.body_stream.subscribe();
Reactor::current().wait_for(pollable).await;

// Read the bytes from the body stream
let buf = match self.body_stream.read(CHUNK_SIZE) {
Ok(buf) => buf,
Err(StreamError::Closed) => return Ok(0),
Err(StreamError::LastOperationFailed(err)) => {
return Err(std::io::Error::other(format!(
"last operation failed: {}",
err.to_debug_string()
)))
}
};
self.buf.insert(buf)
}
};

// copy bytes
let len = (buf.len() - self.buf_offset).min(out_buf.len());
let max = self.buf_offset + len;
let slice = &buf[self.buf_offset..max];
out_buf[0..len].copy_from_slice(slice);
self.buf_offset += len;

// reset the local slice if necessary
if self.buf_offset == buf.len() {
self.buf = None;
self.buf_offset = 0;
}

Ok(len)
self.body_stream.read(out_buf).await
}
}

Expand Down
5 changes: 5 additions & 0 deletions src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,18 @@ mod cursor;
mod empty;
mod read;
mod seek;
mod stdio;
mod streams;
mod write;

pub use crate::runtime::AsyncPollable;
pub use copy::*;
pub use cursor::*;
pub use empty::*;
pub use read::*;
pub use seek::*;
pub use stdio::*;
pub use streams::*;
pub use write::*;

/// The error type for I/O operations.
Expand Down
139 changes: 139 additions & 0 deletions src/io/stdio.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
use super::{AsyncInputStream, AsyncOutputStream};
use std::cell::LazyCell;
use wasi::cli::terminal_input::TerminalInput;
use wasi::cli::terminal_output::TerminalOutput;

/// Use the program's stdin as an `AsyncInputStream`.
#[derive(Debug)]
pub struct Stdin {
stream: AsyncInputStream,
terminput: LazyCell<Option<TerminalInput>>,
}

/// Get the program's stdin for use as an `AsyncInputStream`.
pub fn stdin() -> Stdin {
let stream = AsyncInputStream::new(wasi::cli::stdin::get_stdin());
Stdin {
stream,
terminput: LazyCell::new(|| wasi::cli::terminal_stdin::get_terminal_stdin()),
}
}

impl std::ops::Deref for Stdin {
type Target = AsyncInputStream;
fn deref(&self) -> &AsyncInputStream {
&self.stream
}
}
impl std::ops::DerefMut for Stdin {
fn deref_mut(&mut self) -> &mut AsyncInputStream {
&mut self.stream
}
}

impl Stdin {
/// Check if stdin is a terminal.
pub fn is_terminal(&self) -> bool {
LazyCell::force(&self.terminput).is_some()
}
}

/// Use the program's stdout as an `AsyncOutputStream`.
#[derive(Debug)]
pub struct Stdout {
stream: AsyncOutputStream,
termoutput: LazyCell<Option<TerminalOutput>>,
}

/// Get the program's stdout for use as an `AsyncOutputStream`.
pub fn stdout() -> Stdout {
let stream = AsyncOutputStream::new(wasi::cli::stdout::get_stdout());
Stdout {
stream,
termoutput: LazyCell::new(|| wasi::cli::terminal_stdout::get_terminal_stdout()),
}
}

impl Stdout {
/// Check if stdout is a terminal.
pub fn is_terminal(&self) -> bool {
LazyCell::force(&self.termoutput).is_some()
}
}

impl std::ops::Deref for Stdout {
type Target = AsyncOutputStream;
fn deref(&self) -> &AsyncOutputStream {
&self.stream
}
}
impl std::ops::DerefMut for Stdout {
fn deref_mut(&mut self) -> &mut AsyncOutputStream {
&mut self.stream
}
}

/// Use the program's stdout as an `AsyncOutputStream`.
#[derive(Debug)]
pub struct Stderr {
stream: AsyncOutputStream,
termoutput: LazyCell<Option<TerminalOutput>>,
}

/// Get the program's stdout for use as an `AsyncOutputStream`.
pub fn stderr() -> Stderr {
let stream = AsyncOutputStream::new(wasi::cli::stderr::get_stderr());
Stderr {
stream,
termoutput: LazyCell::new(|| wasi::cli::terminal_stderr::get_terminal_stderr()),
}
}

impl Stderr {
/// Check if stderr is a terminal.
pub fn is_terminal(&self) -> bool {
LazyCell::force(&self.termoutput).is_some()
}
}

impl std::ops::Deref for Stderr {
type Target = AsyncOutputStream;
fn deref(&self) -> &AsyncOutputStream {
&self.stream
}
}
impl std::ops::DerefMut for Stderr {
fn deref_mut(&mut self) -> &mut AsyncOutputStream {
&mut self.stream
}
}

#[cfg(test)]
mod test {
use crate::io::AsyncWrite;
use crate::runtime::block_on;
#[test]
// No internal predicate. Run test with --nocapture and inspect output manually.
fn stdout_println_hello_world() {
block_on(async {
let mut stdout = super::stdout();
let term = if stdout.is_terminal() { "is" } else { "is not" };
stdout
.write_all(format!("hello, world! stdout {term} a terminal\n",).as_bytes())
.await
.unwrap();
})
}
#[test]
// No internal predicate. Run test with --nocapture and inspect output manually.
fn stderr_println_hello_world() {
block_on(async {
let mut stdout = super::stdout();
let term = if stdout.is_terminal() { "is" } else { "is not" };
stdout
.write_all(format!("hello, world! stderr {term} a terminal\n",).as_bytes())
.await
.unwrap();
})
}
}
Loading
Loading