From 9c0d2efe25deab93b9ca1d88d2dc9d13a67c2c7a Mon Sep 17 00:00:00 2001 From: Dan Gohman Date: Mon, 6 Jan 2025 09:30:38 -0800 Subject: [PATCH 1/9] Add a `start_request` function for requests with streaming output bodies. --- src/http/client.rs | 90 +++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 85 insertions(+), 5 deletions(-) diff --git a/src/http/client.rs b/src/http/client.rs index b9107d9..ce193ad 100644 --- a/src/http/client.rs +++ b/src/http/client.rs @@ -1,9 +1,16 @@ -use super::{body::IncomingBody, Body, Error, Request, Response, Result}; +use super::{ + body::{BodyForthcoming, IncomingBody, OutgoingBody}, + fields::header_map_to_wasi, + Body, Error, HeaderMap, Request, Response, Result, +}; use crate::http::request::try_into_outgoing; use crate::http::response::try_from_incoming; use crate::io::{self, AsyncOutputStream, AsyncPollable}; use crate::time::Duration; -use wasi::http::types::{OutgoingBody, RequestOptions as WasiRequestOptions}; +use wasi::http::types::{ + FutureIncomingResponse as WasiFutureIncomingResponse, OutgoingBody as WasiOutgoingBody, + RequestOptions as WasiRequestOptions, +}; /// An HTTP client. // Empty for now, but permits adding support for RequestOptions soon: @@ -19,22 +26,27 @@ impl Client { } /// Send an HTTP request. + /// + /// TODO: Should this automatically add a "Content-Length" header if the + /// body size is known? + /// + /// To respond with trailers, use [`Client::start_request`] instead. pub async fn send(&self, req: Request) -> Result> { // We don't use `body::OutputBody` here because we can report I/O // errors from the `copy` directly. let (wasi_req, body) = try_into_outgoing(req)?; let wasi_body = wasi_req.body().unwrap(); - let body_stream = wasi_body.write().unwrap(); + let wasi_stream = wasi_body.write().unwrap(); // 1. Start sending the request head let res = wasi::http::outgoing_handler::handle(wasi_req, self.wasi_options()?).unwrap(); // 2. Start sending the request body - io::copy(body, AsyncOutputStream::new(body_stream)).await?; + io::copy(body, AsyncOutputStream::new(wasi_stream)).await?; // 3. Finish sending the request body let trailers = None; - OutgoingBody::finish(wasi_body, trailers).unwrap(); + WasiOutgoingBody::finish(wasi_body, trailers).unwrap(); // 4. Receive the response AsyncPollable::new(res.subscribe()).wait_for().await; @@ -46,6 +58,55 @@ impl Client { try_from_incoming(res) } + /// Start sending an HTTP request, and return an `OutgoingBody` stream to + /// write the body to. + /// + /// The returned `OutgoingBody` must be consumed by [`Client::finish`] or + /// [`Client::fail`]. + pub async fn start_request( + &self, + req: Request, + ) -> Result<(OutgoingBody, FutureIncomingResponse)> { + let (wasi_req, _body_forthcoming) = try_into_outgoing(req)?; + let wasi_body = wasi_req.body().unwrap(); + let wasi_stream = wasi_body.write().unwrap(); + + // Start sending the request head. + let res = wasi::http::outgoing_handler::handle(wasi_req, self.wasi_options()?).unwrap(); + + let outgoing_body = OutgoingBody::new(AsyncOutputStream::new(wasi_stream), wasi_body); + + Ok((outgoing_body, FutureIncomingResponse(res))) + } + + /// Finish the body, optionally with trailers. + /// + /// This is used with [`Client::start_request`]. + pub fn finish(body: OutgoingBody, trailers: Option) -> Result<()> { + let (stream, body) = body.consume(); + + // The stream is a child resource of the `OutgoingBody`, so ensure that + // it's dropped first. + drop(stream); + + let wasi_trailers = match trailers { + Some(trailers) => Some(header_map_to_wasi(&trailers)?), + None => None, + }; + + wasi::http::types::OutgoingBody::finish(body, wasi_trailers) + .expect("body length did not match Content-Length header value"); + Ok(()) + } + + /// Consume the `OutgoingBody` and indicate that the body was not + /// completed. + /// + /// This is used with [`Client::start_request`]. + pub fn fail(body: OutgoingBody) { + let (_stream, _body) = body.consume(); + } + /// Set timeout on connecting to HTTP server pub fn set_connect_timeout(&mut self, d: impl Into) { self.options_mut().connect_timeout = Some(d.into()); @@ -76,6 +137,25 @@ impl Client { } } +/// Returned from [`Client::start_request`], this represents a handle to a +/// response which has not arrived yet. Call [`FutureIncomingResponse::get`] +/// to wait for the response. +pub struct FutureIncomingResponse(WasiFutureIncomingResponse); + +impl FutureIncomingResponse { + /// Consume this `FutureIncomingResponse`, wait, and return the `Response`. + pub async fn get(self) -> Result> { + // Wait for the response. + AsyncPollable::new(self.0.subscribe()).wait_for().await; + + // NOTE: the first `unwrap` is to ensure readiness, the second `unwrap` + // is to trap if we try and get the response more than once. The final + // `?` is to raise the actual error if there is one. + let res = self.0.get().unwrap().unwrap()?; + try_from_incoming(res) + } +} + #[derive(Default, Debug)] struct RequestOptions { connect_timeout: Option, From a49c9dcadf52718f6726d25f2900d1e72f7d9c11 Mon Sep 17 00:00:00 2001 From: Dan Gohman Date: Mon, 6 Jan 2025 18:05:43 -0800 Subject: [PATCH 2/9] Add a `StreamedBody` utility. --- src/http/body.rs | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/src/http/body.rs b/src/http/body.rs index a301d13..0373357 100644 --- a/src/http/body.rs +++ b/src/http/body.rs @@ -107,6 +107,27 @@ impl> Body for BoundedBody { } } +/// An HTTP body with an unknown length +#[derive(Debug)] +pub struct StreamedBody(S); + +impl StreamedBody { + /// Wrap an `AsyncRead` impl in a type that provides a [`Body`] implementation. + pub fn new(s: S) -> Self { + Self(s) + } +} +impl AsyncRead for StreamedBody { + async fn read(&mut self, buf: &mut [u8]) -> crate::io::Result { + self.0.read(buf).await + } +} +impl Body for StreamedBody { + fn len(&self) -> Option { + None + } +} + impl Body for Empty { fn len(&self) -> Option { Some(0) From 4da851d50dd837bb197db3fbe84edb7c17ea3575 Mon Sep 17 00:00:00 2001 From: Dan Gohman Date: Mon, 6 Jan 2025 09:11:20 -0800 Subject: [PATCH 3/9] Add an HTTP client example. --- Cargo.toml | 4 ++ examples/http_client.rs | 119 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 123 insertions(+) create mode 100644 examples/http_client.rs diff --git a/Cargo.toml b/Cargo.toml index 8019adc..8dcac0c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,7 +23,9 @@ wstd-macro.workspace = true [dev-dependencies] anyhow.workspace = true +clap.workspace = true futures-lite.workspace = true +humantime.workspace = true serde_json.workspace = true [workspace] @@ -49,8 +51,10 @@ authors = [ [workspace.dependencies] anyhow = "1" cargo_metadata = "0.18.1" +clap = { version = "4.5.26", features = ["derive"] } futures-core = "0.3.19" futures-lite = "1.12.0" +humantime = "2.1.0" heck = "0.5" http = "1.1" itoa = "1" diff --git a/examples/http_client.rs b/examples/http_client.rs new file mode 100644 index 0000000..12bc685 --- /dev/null +++ b/examples/http_client.rs @@ -0,0 +1,119 @@ +use anyhow::{anyhow, Result}; +use clap::{ArgAction, Parser}; +use wstd::http::{ + body::{IncomingBody, StreamedBody}, + request::Builder, + Body, Client, Method, Request, Response, Uri, +}; + +/// Simple HTTP client +/// +/// A simple command-line HTTP client, implemented using `wstd`, using WASI. +#[derive(Parser, Debug)] +#[command(version, about)] +struct Args { + /// The URL to request + url: Uri, + + /// Forward stdin to the request body + #[arg(long)] + body: bool, + + /// Add a header to the request + #[arg(long = "header", action = ArgAction::Append, value_name = "HEADER")] + headers: Vec, + + /// Method of the request + #[arg(long, default_value = "GET")] + method: Method, + + /// Set the connect timeout + #[arg(long, value_name = "DURATION")] + connect_timeout: Option, + + /// Set the first-byte timeout + #[arg(long, value_name = "DURATION")] + first_byte_timeout: Option, + + /// Set the between-bytes timeout + #[arg(long, value_name = "DURATION")] + between_bytes_timeout: Option, +} + +#[wstd::main] +async fn main() -> Result<()> { + let args = Args::parse(); + + // Create and configure the `Client` + + let mut client = Client::new(); + + if let Some(connect_timeout) = args.connect_timeout { + client.set_connect_timeout(*connect_timeout); + } + if let Some(first_byte_timeout) = args.first_byte_timeout { + client.set_first_byte_timeout(*first_byte_timeout); + } + if let Some(between_bytes_timeout) = args.between_bytes_timeout { + client.set_between_bytes_timeout(*between_bytes_timeout); + } + + // Create and configure the request. + + let mut request = Request::builder(); + + request = request.uri(args.url).method(args.method); + + for header in args.headers { + let mut parts = header.splitn(2, ": "); + let key = parts.next().unwrap(); + let value = parts + .next() + .ok_or_else(|| anyhow!("headers must be formatted like \"key: value\""))?; + request = request.header(key, value); + } + + // Send the request. + + async fn send_request( + client: &Client, + request: Builder, + body: B, + ) -> Result> { + let request = request.body(body)?; + + eprintln!("> {} / {:?}", request.method(), request.version()); + for (key, value) in request.headers().iter() { + let value = String::from_utf8_lossy(value.as_bytes()); + eprintln!("> {key}: {value}"); + } + + Ok(client.send(request).await?) + } + let response = if args.body { + send_request(&client, request, StreamedBody::new(wstd::io::stdin())).await + } else { + send_request(&client, request, wstd::io::empty()).await + }?; + + // Print the response. + + eprintln!("< {:?} {}", response.version(), response.status()); + for (key, value) in response.headers().iter() { + let value = String::from_utf8_lossy(value.as_bytes()); + eprintln!("< {key}: {value}"); + } + + let mut body = response.into_body(); + wstd::io::copy(&mut body, wstd::io::stdout()).await?; + + let trailers = body.finish().await?; + if let Some(trailers) = trailers { + for (key, value) in trailers.iter() { + let value = String::from_utf8_lossy(value.as_bytes()); + eprintln!("< {key}: {value}"); + } + } + + Ok(()) +} From 0b2476d8f737ddc5247ea92f5e26b7a69f201394 Mon Sep 17 00:00:00 2001 From: Dan Gohman Date: Fri, 10 Jan 2025 12:45:28 -0800 Subject: [PATCH 4/9] Add a somewhat more complex HTTP client. --- examples/complex_http_client.rs | 139 ++++++++++++++++++++++++++++++++ 1 file changed, 139 insertions(+) create mode 100644 examples/complex_http_client.rs diff --git a/examples/complex_http_client.rs b/examples/complex_http_client.rs new file mode 100644 index 0000000..b165468 --- /dev/null +++ b/examples/complex_http_client.rs @@ -0,0 +1,139 @@ +use anyhow::{anyhow, Result}; +use clap::{ArgAction, Parser}; +use std::str::FromStr; +use wstd::http::{ + body::BodyForthcoming, Client, HeaderMap, HeaderName, HeaderValue, Method, Request, Uri, +}; + +/// Complex HTTP client +/// +/// A somewhat more complex command-line HTTP client, implemented using +/// `wstd`, using WASI. +#[derive(Parser, Debug)] +#[command(version, about)] +struct Args { + /// The URL to request + url: Uri, + + /// Forward stdin to the request body + #[arg(long)] + body: bool, + + /// Add a header to the request + #[arg(long = "header", action = ArgAction::Append, value_name = "HEADER")] + headers: Vec, + + /// Add a trailer to the request + #[arg(long = "trailer", action = ArgAction::Append, value_name = "TRAILER")] + trailers: Vec, + + /// Method of the request + #[arg(long, default_value = "GET")] + method: Method, + + /// Set the connect timeout + #[arg(long, value_name = "DURATION")] + connect_timeout: Option, + + /// Set the first-byte timeout + #[arg(long, value_name = "DURATION")] + first_byte_timeout: Option, + + /// Set the between-bytes timeout + #[arg(long, value_name = "DURATION")] + between_bytes_timeout: Option, +} + +#[wstd::main] +async fn main() -> Result<()> { + let args = Args::parse(); + + // Create and configure the `Client` + + let mut client = Client::new(); + + if let Some(connect_timeout) = args.connect_timeout { + client.set_connect_timeout(*connect_timeout); + } + if let Some(first_byte_timeout) = args.first_byte_timeout { + client.set_first_byte_timeout(*first_byte_timeout); + } + if let Some(between_bytes_timeout) = args.between_bytes_timeout { + client.set_between_bytes_timeout(*between_bytes_timeout); + } + + // Create and configure the request. + + let mut request = Request::builder(); + + request = request.uri(args.url).method(args.method); + + for header in args.headers { + let mut parts = header.splitn(2, ": "); + let key = parts.next().unwrap(); + let value = parts + .next() + .ok_or_else(|| anyhow!("headers must be formatted like \"key: value\""))?; + request = request.header(key, value); + } + let mut trailers = HeaderMap::new(); + for trailer in args.trailers { + let mut parts = trailer.splitn(2, ": "); + let key = parts.next().unwrap(); + let value = parts + .next() + .ok_or_else(|| anyhow!("trailers must be formatted like \"key: value\""))?; + trailers.insert(HeaderName::from_str(key)?, HeaderValue::from_str(value)?); + } + + // Send the request. + + let request = request.body(BodyForthcoming)?; + + eprintln!("> {} / {:?}", request.method(), request.version()); + for (key, value) in request.headers().iter() { + let value = String::from_utf8_lossy(value.as_bytes()); + eprintln!("> {key}: {value}"); + } + + let (mut outgoing_body, response) = client.start_request(request).await?; + + if args.body { + wstd::io::copy(wstd::io::stdin(), &mut outgoing_body).await?; + } else { + wstd::io::copy(wstd::io::empty(), &mut outgoing_body).await?; + } + + if !trailers.is_empty() { + eprintln!("..."); + } + for (key, value) in trailers.iter() { + let value = String::from_utf8_lossy(value.as_bytes()); + eprintln!("> {key}: {value}"); + } + + Client::finish(outgoing_body, Some(trailers))?; + + let response = response.get().await?; + + // Print the response. + + eprintln!("< {:?} {}", response.version(), response.status()); + for (key, value) in response.headers().iter() { + let value = String::from_utf8_lossy(value.as_bytes()); + eprintln!("< {key}: {value}"); + } + + let mut body = response.into_body(); + wstd::io::copy(&mut body, wstd::io::stdout()).await?; + + let trailers = body.finish().await?; + if let Some(trailers) = trailers { + for (key, value) in trailers.iter() { + let value = String::from_utf8_lossy(value.as_bytes()); + eprintln!("< {key}: {value}"); + } + } + + Ok(()) +} From d4cf15948b19beeb42750f25979bc89f57be0fcd Mon Sep 17 00:00:00 2001 From: Dan Gohman Date: Wed, 15 Jan 2025 09:59:16 -0800 Subject: [PATCH 5/9] Switch from `FutureIncomingResponse` to `impl Future`. --- src/http/client.rs | 51 +++++++++++++++++++++++++++------------------- 1 file changed, 30 insertions(+), 21 deletions(-) diff --git a/src/http/client.rs b/src/http/client.rs index ce193ad..e627ebf 100644 --- a/src/http/client.rs +++ b/src/http/client.rs @@ -6,7 +6,11 @@ use super::{ use crate::http::request::try_into_outgoing; use crate::http::response::try_from_incoming; use crate::io::{self, AsyncOutputStream, AsyncPollable}; +use crate::runtime::WaitFor; use crate::time::Duration; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; use wasi::http::types::{ FutureIncomingResponse as WasiFutureIncomingResponse, OutgoingBody as WasiOutgoingBody, RequestOptions as WasiRequestOptions, @@ -66,7 +70,10 @@ impl Client { pub async fn start_request( &self, req: Request, - ) -> Result<(OutgoingBody, FutureIncomingResponse)> { + ) -> Result<( + OutgoingBody, + impl Future>>, + )> { let (wasi_req, _body_forthcoming) = try_into_outgoing(req)?; let wasi_body = wasi_req.body().unwrap(); let wasi_stream = wasi_body.write().unwrap(); @@ -76,7 +83,28 @@ impl Client { let outgoing_body = OutgoingBody::new(AsyncOutputStream::new(wasi_stream), wasi_body); - Ok((outgoing_body, FutureIncomingResponse(res))) + struct IncomingResponseFuture { + subscription: WaitFor, + wasi: WasiFutureIncomingResponse, + } + impl Future for IncomingResponseFuture { + type Output = Result>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match pin_project(self.subscription).poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(response) => Poll::Ready(try_from_incoming(response)), + } + } + } + + let subscription = AsyncPollable::new(res.subscribe()).wait_for(); + let future = IncomingResponseFuture { + subscription, + wasi: res, + }; + + Ok((outgoing_body, future)) } /// Finish the body, optionally with trailers. @@ -137,25 +165,6 @@ impl Client { } } -/// Returned from [`Client::start_request`], this represents a handle to a -/// response which has not arrived yet. Call [`FutureIncomingResponse::get`] -/// to wait for the response. -pub struct FutureIncomingResponse(WasiFutureIncomingResponse); - -impl FutureIncomingResponse { - /// Consume this `FutureIncomingResponse`, wait, and return the `Response`. - pub async fn get(self) -> Result> { - // Wait for the response. - AsyncPollable::new(self.0.subscribe()).wait_for().await; - - // NOTE: the first `unwrap` is to ensure readiness, the second `unwrap` - // is to trap if we try and get the response more than once. The final - // `?` is to raise the actual error if there is one. - let res = self.0.get().unwrap().unwrap()?; - try_from_incoming(res) - } -} - #[derive(Default, Debug)] struct RequestOptions { connect_timeout: Option, From 49709b5ccfadb96adbf8f84b1d4137969696c4e4 Mon Sep 17 00:00:00 2001 From: Dan Gohman Date: Wed, 15 Jan 2025 10:15:26 -0800 Subject: [PATCH 6/9] Use `pin_project`. --- examples/complex_http_client.rs | 2 +- src/http/client.rs | 24 ++++++++++++++++++------ 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/examples/complex_http_client.rs b/examples/complex_http_client.rs index b165468..8a7b2b6 100644 --- a/examples/complex_http_client.rs +++ b/examples/complex_http_client.rs @@ -114,7 +114,7 @@ async fn main() -> Result<()> { Client::finish(outgoing_body, Some(trailers))?; - let response = response.get().await?; + let response = response.await?; // Print the response. diff --git a/src/http/client.rs b/src/http/client.rs index e627ebf..d998bf4 100644 --- a/src/http/client.rs +++ b/src/http/client.rs @@ -8,6 +8,7 @@ use crate::http::response::try_from_incoming; use crate::io::{self, AsyncOutputStream, AsyncPollable}; use crate::runtime::WaitFor; use crate::time::Duration; +use pin_project_lite::pin_project; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; @@ -83,24 +84,35 @@ impl Client { let outgoing_body = OutgoingBody::new(AsyncOutputStream::new(wasi_stream), wasi_body); - struct IncomingResponseFuture { - subscription: WaitFor, - wasi: WasiFutureIncomingResponse, + pin_project! { + struct IncomingResponseFuture { + #[pin] + subscription: Option, + wasi: WasiFutureIncomingResponse, + } } impl Future for IncomingResponseFuture { type Output = Result>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match pin_project(self.subscription).poll(cx) { + let this = self.project(); + match this.subscription.as_pin_mut().expect("make it so").poll(cx) { Poll::Pending => Poll::Pending, - Poll::Ready(response) => Poll::Ready(try_from_incoming(response)), + Poll::Ready(()) => Poll::Ready( + this.wasi + .get() + .unwrap() + .unwrap() + .map_err(Error::from) + .and_then(try_from_incoming), + ), } } } let subscription = AsyncPollable::new(res.subscribe()).wait_for(); let future = IncomingResponseFuture { - subscription, + subscription: Some(subscription), wasi: res, }; From 904bf4416d85c2acb67c3a5f7448816b31cbf0ea Mon Sep 17 00:00:00 2001 From: Dan Gohman Date: Wed, 15 Jan 2025 10:16:55 -0800 Subject: [PATCH 7/9] Simplify. --- src/http/client.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/http/client.rs b/src/http/client.rs index d998bf4..fce93d3 100644 --- a/src/http/client.rs +++ b/src/http/client.rs @@ -87,7 +87,7 @@ impl Client { pin_project! { struct IncomingResponseFuture { #[pin] - subscription: Option, + subscription: WaitFor, wasi: WasiFutureIncomingResponse, } } @@ -96,7 +96,7 @@ impl Client { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); - match this.subscription.as_pin_mut().expect("make it so").poll(cx) { + match this.subscription.poll(cx) { Poll::Pending => Poll::Pending, Poll::Ready(()) => Poll::Ready( this.wasi @@ -112,7 +112,7 @@ impl Client { let subscription = AsyncPollable::new(res.subscribe()).wait_for(); let future = IncomingResponseFuture { - subscription: Some(subscription), + subscription, wasi: res, }; From 98fcdbe4e01ac6836978787c405880f89c714831 Mon Sep 17 00:00:00 2001 From: Dan Gohman Date: Wed, 15 Jan 2025 10:32:27 -0800 Subject: [PATCH 8/9] Add myself to `authors`. --- Cargo.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/Cargo.toml b/Cargo.toml index 8dcac0c..534d287 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,6 +46,7 @@ categories = [] authors = [ "Yoshua Wuyts ", "Pat Hickey ", + "Dan Gohman ", ] [workspace.dependencies] From d8a10e433989a758a71508ccfcf9ca615aee505e Mon Sep 17 00:00:00 2001 From: Dan Gohman Date: Wed, 15 Jan 2025 10:41:40 -0800 Subject: [PATCH 9/9] Update src/http/client.rs Co-authored-by: Pat Hickey --- src/http/client.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/http/client.rs b/src/http/client.rs index fce93d3..1569d77 100644 --- a/src/http/client.rs +++ b/src/http/client.rs @@ -85,6 +85,7 @@ impl Client { let outgoing_body = OutgoingBody::new(AsyncOutputStream::new(wasi_stream), wasi_body); pin_project! { + #[must_use = "futures do nothing unless polled or .awaited"] struct IncomingResponseFuture { #[pin] subscription: WaitFor,