diff --git a/Cargo.lock b/Cargo.lock index b1b0abca10c9..015e6f62f532 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4273,6 +4273,7 @@ dependencies = [ "tokio", "tokio-rustls", "wasmtime", + "wasmtime-wasi", "webpki-roots", ] diff --git a/crates/cli-flags/src/lib.rs b/crates/cli-flags/src/lib.rs index ba568f0d5f7d..e859fd77be0a 100644 --- a/crates/cli-flags/src/lib.rs +++ b/crates/cli-flags/src/lib.rs @@ -74,6 +74,10 @@ pub const SUPPORTED_WASI_MODULES: &[(&str, &str)] = &[ "experimental-wasi-http", "enables support for the WASI HTTP APIs (experimental), see https://github.com/WebAssembly/wasi-http", ), + ( + "experimental-wasi-http-server", + "enables support for the WASI HTTP APIs (experimental) for web serving, see https://github.com/WebAssembly/wasi-http", + ), ]; fn init_file_per_thread_logger(prefix: &'static str) { @@ -512,6 +516,7 @@ fn parse_wasi_modules(modules: &str) -> Result { "experimental-wasi-nn" => Ok(wasi_modules.wasi_nn = enable), "experimental-wasi-threads" => Ok(wasi_modules.wasi_threads = enable), "experimental-wasi-http" => Ok(wasi_modules.wasi_http = enable), + "experimental-wasi-http-server" => Ok(wasi_modules.wasi_http_server = enable), "default" => bail!("'default' cannot be specified with other WASI modules"), _ => bail!("unsupported WASI module '{}'", module), }; @@ -549,6 +554,9 @@ pub struct WasiModules { /// Enable the experimental wasi-http implementation pub wasi_http: bool, + + /// Enable the experimental wasi-http-server implementation + pub wasi_http_server: bool, } impl Default for WasiModules { @@ -559,6 +567,7 @@ impl Default for WasiModules { wasi_nn: false, wasi_threads: false, wasi_http: false, + wasi_http_server: false, } } } @@ -572,6 +581,7 @@ impl WasiModules { wasi_crypto: false, wasi_threads: false, wasi_http: false, + wasi_http_server: false, } } } @@ -734,6 +744,7 @@ mod test { wasi_nn: false, wasi_threads: false, wasi_http: false, + wasi_http_server: false, } ); } @@ -748,7 +759,8 @@ mod test { wasi_crypto: false, wasi_nn: false, wasi_threads: false, - wasi_http: false + wasi_http: false, + wasi_http_server: false, } ); } @@ -768,6 +780,7 @@ mod test { wasi_nn: true, wasi_threads: false, wasi_http: false, + wasi_http_server: false, } ); } @@ -784,6 +797,45 @@ mod test { wasi_nn: false, wasi_threads: false, wasi_http: false, + wasi_http_server: false, + } + ); + } + + #[test] + fn test_http_client() { + let options = + CommonOptions::try_parse_from(vec!["foo", "--wasi-modules=experimental-wasi-http"]) + .unwrap(); + assert_eq!( + options.wasi_modules.unwrap(), + WasiModules { + wasi_common: true, + wasi_crypto: false, + wasi_nn: false, + wasi_threads: false, + wasi_http: true, + wasi_http_server: false, + } + ); + } + + #[test] + fn test_http_server() { + let options = CommonOptions::try_parse_from(vec![ + "foo", + "--wasi-modules=experimental-wasi-http-server", + ]) + .unwrap(); + assert_eq!( + options.wasi_modules.unwrap(), + WasiModules { + wasi_common: true, + wasi_crypto: false, + wasi_nn: false, + wasi_threads: false, + wasi_http: false, + wasi_http_server: true, } ); } diff --git a/crates/test-programs/tests/wasi-http.rs b/crates/test-programs/tests/wasi-http.rs index 0b1a4fd70567..67bd47266832 100644 --- a/crates/test-programs/tests/wasi-http.rs +++ b/crates/test-programs/tests/wasi-http.rs @@ -73,7 +73,7 @@ pub fn run(name: &str) -> anyhow::Result<()> { } wasmtime_wasi::sync::add_to_linker(&mut linker, |cx: &mut Ctx| &mut cx.wasi)?; - wasmtime_wasi_http::add_to_linker(&mut linker, |cx: &mut Ctx| &mut cx.http)?; + wasmtime_wasi_http::add_to_linker(&mut linker, |cx: &mut Ctx| &mut cx.http, false)?; // Create our wasi context. let builder = WasiCtxBuilder::new().inherit_stdio().arg(name)?; diff --git a/crates/wasi-http/Cargo.toml b/crates/wasi-http/Cargo.toml index f91f9b5233e4..b09bc9626c7a 100644 --- a/crates/wasi-http/Cargo.toml +++ b/crates/wasi-http/Cargo.toml @@ -16,7 +16,8 @@ http = { version = "0.2.9" } http-body = "1.0.0-rc.2" http-body-util = "0.1.0-rc.2" thiserror = { workspace = true } -wasmtime = { workspace = true, features = ['component-model'] } +wasmtime = { workspace = true, features = ['component-model', 'async'] } +wasmtime-wasi = { workspace = true, features = [ 'tokio' ] } # The `ring` crate, used to implement TLS, does not build on riscv64 or s390x [target.'cfg(not(any(target_arch = "riscv64", target_arch = "s390x")))'.dependencies] diff --git a/crates/wasi-http/src/component_impl.rs b/crates/wasi-http/src/component_impl.rs index b0b27941c851..d14f4236a7be 100644 --- a/crates/wasi-http/src/component_impl.rs +++ b/crates/wasi-http/src/component_impl.rs @@ -81,7 +81,23 @@ fn read_option_string( } } -fn allocate_guest_pointer(caller: &mut Caller<'_, T>, size: u32) -> anyhow::Result { +async fn allocate_guest_pointer_async( + caller: &mut Caller<'_, T>, + size: u32, +) -> anyhow::Result { + let realloc = caller + .get_export("cabi_realloc") + .ok_or_else(|| anyhow!("missing required export cabi_realloc"))?; + let func = realloc + .into_func() + .ok_or_else(|| anyhow!("cabi_realloc must be a func"))?; + let typed = func.typed::<(u32, u32, u32, u32), u32>(caller.as_context())?; + Ok(typed + .call_async(caller.as_context_mut(), (0, 0, 4, size)) + .await?) +} + +fn allocate_guest_pointer(caller: &mut Caller<'_, T>, size: u32) -> anyhow::Result { let realloc = caller .get_export("cabi_realloc") .ok_or_else(|| anyhow!("missing required export cabi_realloc"))?; @@ -92,6 +108,44 @@ fn allocate_guest_pointer(caller: &mut Caller<'_, T>, size: u32) -> anyhow::R Ok(typed.call(caller.as_context_mut(), (0, 0, 4, size))?) } +async fn write_string_to_memory_async( + caller: &mut Caller<'_, T>, + ptr: u32, + s: &String, +) -> anyhow::Result<()> { + let len: u32 = s.len().try_into()?; + let str_ptr = allocate_guest_pointer_async(caller, len).await?; + + let memory = memory_get(caller)?; + + memory.write(caller.as_context_mut(), str_ptr as _, s.as_bytes())?; + + let result: [u32; 2] = [str_ptr, len]; + let raw = u32_array_to_u8(&result); + + memory.write(caller.as_context_mut(), ptr as _, &raw)?; + Ok(()) +} + +fn write_string_to_memory( + caller: &mut Caller<'_, T>, + ptr: u32, + s: &String, +) -> anyhow::Result<()> { + let len: u32 = s.len().try_into()?; + let str_ptr = allocate_guest_pointer(caller, len)?; + + let memory = memory_get(caller)?; + + memory.write(caller.as_context_mut(), str_ptr as _, s.as_bytes())?; + + let result: [u32; 2] = [str_ptr, len]; + let raw = u32_array_to_u8(&result); + + memory.write(caller.as_context_mut(), ptr as _, &raw)?; + Ok(()) +} + fn u32_array_to_u8(arr: &[u32]) -> Vec { let mut result = std::vec::Vec::new(); for val in arr.iter() { @@ -103,9 +157,10 @@ fn u32_array_to_u8(arr: &[u32]) -> Vec { result } -pub fn add_component_to_linker( +pub fn add_component_to_linker( linker: &mut wasmtime::Linker, get_cx: impl Fn(&mut T) -> &mut WasiHttp + Send + Sync + Copy + 'static, + is_async: bool, ) -> anyhow::Result<()> { linker.func_wrap( "wasi:http/outgoing-handler", @@ -281,7 +336,7 @@ pub fn add_component_to_linker( let ctx = get_cx(caller.data_mut()); let stream = ctx.incoming_response_consume(response)?.unwrap_or(0); - let memory = memory_get(&mut caller).unwrap(); + let memory = memory_get(&mut caller)?; // First == is_some // Second == stream_id @@ -354,6 +409,15 @@ pub fn add_component_to_linker( Ok(()) }, )?; + linker.func_wrap( + "wasi:http/types", + "drop-outgoing-response", + move |mut caller: Caller<'_, T>, id: u32| -> anyhow::Result<()> { + let ctx = get_cx(caller.data_mut()); + ctx.drop_outgoing_response(id)?; + Ok(()) + }, + )?; linker.func_wrap( "wasi:http/types", "drop-incoming-response", @@ -392,28 +456,63 @@ pub fn add_component_to_linker( Ok(ctx.new_fields(vec)?) }, )?; - linker.func_wrap( - "wasi:io/streams", - "read", - move |mut caller: Caller<'_, T>, stream: u32, len: u64, ptr: u32| -> anyhow::Result<()> { - let ctx = get_cx(caller.data_mut()); - let bytes_tuple = ctx.read(stream, len)??; - let bytes = bytes_tuple.0; - let done = match bytes_tuple.1 { - true => 1, - false => 0, - }; - let body_len: u32 = bytes.len().try_into()?; - let out_ptr = allocate_guest_pointer(&mut caller, body_len)?; - let result: [u32; 4] = [0, out_ptr, body_len, done]; - let raw = u32_array_to_u8(&result); + if is_async { + linker.func_wrap3_async( + "wasi:io/streams", + "read", + move |mut caller: Caller<'_, T>, + stream: u32, + len: u64, + ptr: u32| + -> Box> + Send> { + Box::new(async move { + let ctx = get_cx(caller.data_mut()); + let bytes_tuple = ctx.read(stream, len)??; + let bytes = bytes_tuple.0; + let done = match bytes_tuple.1 { + true => 1, + false => 0, + }; + let body_len: u32 = bytes.len().try_into()?; + let out_ptr = allocate_guest_pointer_async(&mut caller, body_len).await?; + let result: [u32; 4] = [0, out_ptr, body_len, done]; + let raw = u32_array_to_u8(&result); + + let memory = memory_get(&mut caller)?; + memory.write(caller.as_context_mut(), out_ptr as _, &bytes)?; + memory.write(caller.as_context_mut(), ptr as _, &raw)?; + Ok(()) + }) + }, + )?; + } else { + linker.func_wrap( + "wasi:io/streams", + "read", + move |mut caller: Caller<'_, T>, + stream: u32, + len: u64, + ptr: u32| + -> anyhow::Result<()> { + let ctx = get_cx(caller.data_mut()); + let bytes_tuple = ctx.read(stream, len)??; + let bytes = bytes_tuple.0; + let done = match bytes_tuple.1 { + true => 1, + false => 0, + }; + let body_len: u32 = bytes.len().try_into()?; + let out_ptr = allocate_guest_pointer(&mut caller, body_len)?; + let result: [u32; 4] = [0, out_ptr, body_len, done]; + let raw = u32_array_to_u8(&result); - let memory = memory_get(&mut caller)?; - memory.write(caller.as_context_mut(), out_ptr as _, &bytes)?; - memory.write(caller.as_context_mut(), ptr as _, &raw)?; - Ok(()) - }, - )?; + let memory = memory_get(&mut caller)?; + memory.write(caller.as_context_mut(), out_ptr as _, &bytes)?; + memory.write(caller.as_context_mut(), ptr as _, &raw)?; + Ok(()) + }, + )?; + } linker.func_wrap( "wasi:io/streams", "write", @@ -437,49 +536,221 @@ pub fn add_component_to_linker( Ok(()) }, )?; - linker.func_wrap( - "wasi:http/types", - "fields-entries", - move |mut caller: Caller<'_, T>, fields: u32, out_ptr: u32| -> anyhow::Result<()> { - let ctx = get_cx(caller.data_mut()); - let entries = ctx.fields_entries(fields)?; - - let header_len = entries.len(); - let tuple_ptr = allocate_guest_pointer(&mut caller, (16 * header_len).try_into()?)?; - let mut ptr = tuple_ptr; - for item in entries.iter() { - let name = &item.0; - let value = &item.1; - let name_len: u32 = name.len().try_into()?; - let value_len: u32 = value.len().try_into()?; + if is_async { + linker.func_wrap2_async( + "wasi:http/types", + "fields-entries", + move |mut caller: Caller<'_, T>, + fields: u32, + out_ptr: u32| + -> Box> + Send> { + Box::new(async move { + let ctx = get_cx(caller.data_mut()); + let entries = ctx.fields_entries(fields)?; + + let header_len = entries.len(); + let tuple_ptr = + allocate_guest_pointer_async(&mut caller, (16 * header_len).try_into()?) + .await?; + let mut ptr = tuple_ptr; + for item in entries.iter() { + let name = &item.0; + let value = &item.1; + let name_len: u32 = name.len().try_into()?; + let value_len: u32 = value.len().try_into()?; + + let name_ptr = allocate_guest_pointer_async(&mut caller, name_len).await?; + let value_ptr = + allocate_guest_pointer_async(&mut caller, value_len).await?; + + let memory = memory_get(&mut caller)?; + memory.write(caller.as_context_mut(), name_ptr as _, &name.as_bytes())?; + memory.write(caller.as_context_mut(), value_ptr as _, value)?; + + let pair: [u32; 4] = [name_ptr, name_len, value_ptr, value_len]; + let raw_pair = u32_array_to_u8(&pair); + memory.write(caller.as_context_mut(), ptr as _, &raw_pair)?; + + ptr = ptr + 16; + } - let name_ptr = allocate_guest_pointer(&mut caller, name_len)?; - let value_ptr = allocate_guest_pointer(&mut caller, value_len)?; + let memory = memory_get(&mut caller)?; + let result: [u32; 2] = [tuple_ptr, header_len.try_into()?]; + let raw = u32_array_to_u8(&result); + memory.write(caller.as_context_mut(), out_ptr as _, &raw)?; + Ok(()) + }) + }, + )?; + } else { + linker.func_wrap( + "wasi:http/types", + "fields-entries", + move |mut caller: Caller<'_, T>, fields: u32, out_ptr: u32| -> anyhow::Result<()> { + let ctx = get_cx(caller.data_mut()); + let entries = ctx.fields_entries(fields)?; + + let header_len = entries.len(); + let tuple_ptr = allocate_guest_pointer(&mut caller, (16 * header_len).try_into()?)?; + let mut ptr = tuple_ptr; + for item in entries.iter() { + let name = &item.0; + let value = &item.1; + let name_len: u32 = name.len().try_into()?; + let value_len: u32 = value.len().try_into()?; + + let name_ptr = allocate_guest_pointer(&mut caller, name_len)?; + let value_ptr = allocate_guest_pointer(&mut caller, value_len)?; + + let memory = memory_get(&mut caller)?; + memory.write(caller.as_context_mut(), name_ptr as _, &name.as_bytes())?; + memory.write(caller.as_context_mut(), value_ptr as _, value)?; + + let pair: [u32; 4] = [name_ptr, name_len, value_ptr, value_len]; + let raw_pair = u32_array_to_u8(&pair); + memory.write(caller.as_context_mut(), ptr as _, &raw_pair)?; + + ptr = ptr + 16; + } let memory = memory_get(&mut caller)?; - memory.write(caller.as_context_mut(), name_ptr as _, &name.as_bytes())?; - memory.write(caller.as_context_mut(), value_ptr as _, value)?; - - let pair: [u32; 4] = [name_ptr, name_len, value_ptr, value_len]; - let raw_pair = u32_array_to_u8(&pair); - memory.write(caller.as_context_mut(), ptr as _, &raw_pair)?; - - ptr = ptr + 16; - } - + let result: [u32; 2] = [tuple_ptr, header_len.try_into()?]; + let raw = u32_array_to_u8(&result); + memory.write(caller.as_context_mut(), out_ptr as _, &raw)?; + Ok(()) + }, + )?; + } + linker.func_wrap( + "wasi:http/types", + "incoming-response-headers", + move |mut caller: Caller<'_, T>, handle: u32| -> anyhow::Result { + let ctx = get_cx(caller.data_mut()); + Ok(ctx.incoming_response_headers(handle)?) + }, + )?; + linker.func_wrap( + "wasi:http/types", + "incoming-request-headers", + move |mut caller: Caller<'_, T>, handle: u32| -> anyhow::Result { + let ctx = get_cx(caller.data_mut()); + let h = ctx.incoming_request_headers(handle)?; + Ok(h) + }, + )?; + if is_async { + linker.func_wrap2_async( + "wasi:http/types", + "incoming-request-authority", + move |mut caller: Caller<'_, T>, + request: u32, + ptr: u32| + -> Box> + Send> { + Box::new(async move { + let ctx = get_cx(caller.data_mut()); + let authority = ctx.incoming_request_authority(request)?; + let authority_str = authority.unwrap_or("".to_string()); + write_string_to_memory_async(&mut caller, ptr, &authority_str).await?; + Ok(()) + }) + }, + )?; + linker.func_wrap2_async( + "wasi:http/types", + "incoming-request-path-with-query", + move |mut caller: Caller<'_, T>, + request: u32, + ptr: u32| + -> Box> + Send> { + Box::new(async move { + let ctx = get_cx(caller.data_mut()); + let path = ctx.incoming_request_path_with_query(request)?; + let path_str = path.unwrap_or("".to_string()); + write_string_to_memory_async(&mut caller, ptr, &path_str).await?; + Ok(()) + }) + }, + )?; + } else { + linker.func_wrap( + "wasi:http/types", + "incoming-request-authority", + move |mut caller: Caller<'_, T>, request: u32, ptr: u32| -> anyhow::Result<()> { + let ctx = get_cx(caller.data_mut()); + let authority = ctx + .incoming_request_authority(request)? + .unwrap_or("".to_string()); + write_string_to_memory(&mut caller, ptr, &authority)?; + Ok(()) + }, + )?; + linker.func_wrap( + "wasi:http/types", + "incoming-request-path", + move |mut caller: Caller<'_, T>, request: u32, ptr: u32| -> anyhow::Result<()> { + let ctx = get_cx(caller.data_mut()); + let path = ctx + .incoming_request_path_with_query(request)? + .unwrap_or("".to_string()); + write_string_to_memory(&mut caller, ptr, &path)?; + Ok(()) + }, + )?; + } + linker.func_wrap( + "wasi:http/types", + "incoming-request-method", + move |mut caller: Caller<'_, T>, request: u32, ptr: u32| -> anyhow::Result<()> { + let ctx = get_cx(caller.data_mut()); + let method = ctx.incoming_request_method(request)?; let memory = memory_get(&mut caller)?; - let result: [u32; 2] = [tuple_ptr, header_len.try_into()?]; + let result: [u32; 1] = [method.into()]; let raw = u32_array_to_u8(&result); - memory.write(caller.as_context_mut(), out_ptr as _, &raw)?; + memory.write(caller.as_context_mut(), ptr as _, &raw)?; Ok(()) }, )?; linker.func_wrap( "wasi:http/types", - "incoming-response-headers", - move |mut caller: Caller<'_, T>, handle: u32| -> anyhow::Result { + "new-outgoing-response", + move |mut caller: Caller<'_, T>, status: i32, headers: u32| -> anyhow::Result { let ctx = get_cx(caller.data_mut()); - Ok(ctx.incoming_response_headers(handle)?) + match ctx.new_outgoing_response(status.try_into()?, headers) { + Ok(id) => Ok(id), + Err(_) => Ok(0), + } + }, + )?; + linker.func_wrap( + "wasi:http/types", + "outgoing-response-write", + move |mut caller: Caller<'_, T>, response: u32, ptr: u32| -> anyhow::Result<()> { + let ctx = get_cx(caller.data_mut()); + let result: [u32; 2] = match ctx.outgoing_response_write(response) { + Ok(id) => [0, id.unwrap()], + Err(_) => [1, 0], + }; + let raw = u32_array_to_u8(&result); + let memory = memory_get(&mut caller)?; + memory.write(caller.as_context_mut(), ptr as _, &raw)?; + Ok(()) + }, + )?; + linker.func_wrap( + "wasi:http/types", + "set-response-outparam", + move |mut caller: Caller<'_, T>, + outparam: u32, + is_err: u32, + response: u32, + _a: u32, + _b: u32| + -> anyhow::Result { + if is_err == 0 { + let ctx = get_cx(caller.data_mut()); + ctx.set_response_outparam(outparam, Ok(response))?.unwrap(); + } + Ok(0) }, )?; Ok(()) diff --git a/crates/wasi-http/src/http_server.rs b/crates/wasi-http/src/http_server.rs new file mode 100644 index 000000000000..5112a24830ed --- /dev/null +++ b/crates/wasi-http/src/http_server.rs @@ -0,0 +1,153 @@ +use crate::wasi::http::types::Method; +use crate::WasiHttp; +use http::{Request, Response, StatusCode}; +use http_body_util::Full; +use hyper::body::{Bytes, Incoming}; +use hyper::server::conn::http1; +use std::future::Future; +use std::net::SocketAddr; +use std::path::PathBuf; +use std::pin::Pin; +use tokio::net::TcpListener; +use tokio::runtime::Runtime; +use wasmtime::{AsContext, AsContextMut, Store}; + +use crate::r#struct::ActiveRequest; + +struct Host { + wasi_http: WasiHttp, + wasi: wasmtime_wasi::WasiCtx, +} + +impl Host { + pub fn new() -> Self { + Self { + wasi_http: WasiHttp::new(), + wasi: wasmtime_wasi::WasiCtxBuilder::new() + .stdin(Box::new(wasmtime_wasi::stdio::stdin())) + .build(), + } + } +} + +#[derive(Clone, Copy)] +struct HttpHandler<'a> { + engine: &'a wasmtime::Engine, + module: &'a PathBuf, +} + +impl hyper::service::Service> for HttpHandler<'_> { + type Response = Response>; + type Error = hyper::Error; + type Future = Pin> + Send>>; + + fn call(&mut self, req: Request) -> Self::Future { + let mut host = Host::new(); + let ptr = host.wasi_http.request_id_base; + host.wasi_http.request_id_base += 1; + + let mut request = ActiveRequest::new(ptr); + request.method = Method::new(req.method()); + request.scheme = match req.uri().scheme() { + Some(s) => Some(s.into()), + None => None, + }; + request.authority = match req.uri().authority() { + Some(s) => s.to_string(), + None => "".to_string(), + }; + request.path_with_query = + req.uri().path().to_string() + "?" + req.uri().query().unwrap_or(""); + + for (name, value) in req.headers().iter() { + let val = value.to_str().unwrap().to_string(); + let key = name.to_string(); + match request.headers.get_mut(&key) { + Some(vec) => vec.push(val.into()), + None => { + let mut vec = std::vec::Vec::new(); + vec.push(val.into_bytes()); + request.headers.insert(key.to_string(), vec); + } + } + } + + host.wasi_http.requests.insert(ptr, request); + let outparam_id = host.wasi_http.outparams_id_base; + host.wasi_http.outparams_id_base += 1; + + let mut linker = wasmtime::Linker::new(self.engine); + let mut store = Store::new(self.engine, host); + let path = self.module.clone(); + + Box::pin(async move { + wasmtime_wasi::tokio::add_to_linker(&mut linker, |h: &mut Host| &mut h.wasi).unwrap(); + crate::add_to_linker(&mut linker, |h: &mut Host| &mut h.wasi_http, true).unwrap(); + + let module = wasmtime::Module::from_file(&store.engine(), path).unwrap(); + let instance = linker.instantiate_async(&mut store, &module).await.unwrap(); + let func = instance + .get_func(&mut store, "wasi:http/incoming-handler#handle") + .unwrap(); + let typed = func.typed::<(u32, u32), ()>(store.as_context()).unwrap(); + typed + .call_async(store.as_context_mut(), (ptr, outparam_id)) + .await + .unwrap(); + + let host = store.data_mut(); + let response_id = host + .wasi_http + .response_outparams + .get(&outparam_id) + .unwrap() + .unwrap(); + let response = host.wasi_http.responses.get(&response_id).unwrap(); + let body = Full::::new( + host.wasi_http + .streams + .entry(response.body) + .or_default() + .into(), + ); + let code = StatusCode::from_u16(response.status).unwrap(); + let res = Ok(Response::builder().status(code).body(body).unwrap()); + res + }) + } +} + +// adapted from https://docs.rs/hyper/1.0.0-rc.3/hyper/server/conn/index.html +pub async fn async_http_server(engine: &wasmtime::Engine, module: &PathBuf) { + let addr: SocketAddr = ([127, 0, 0, 1], 8080).into(); + + let tcp_listener = TcpListener::bind(addr).await.unwrap(); + let server: HttpHandler = HttpHandler { + engine: engine, + module: module, + }; + + loop { + let (tcp_stream, _) = tcp_listener.accept().await.unwrap(); + if let Err(http_err) = http1::Builder::new() + .keep_alive(true) + .serve_connection(tcp_stream, server) + .await + { + eprintln!("Error while serving HTTP connection: {}", http_err); + } + } +} + +pub fn spawn_http_server(engine: &wasmtime::Engine, module: &PathBuf) { + let (handle, _runtime) = match tokio::runtime::Handle::try_current() { + Ok(h) => (h, None), + Err(_) => { + let rt = Runtime::new().unwrap(); + let _enter = rt.enter(); + (rt.handle().clone(), Some(rt)) + } + }; + + handle.block_on(async_http_server(engine, module)) +} diff --git a/crates/wasi-http/src/lib.rs b/crates/wasi-http/src/lib.rs index ce5211ef8c17..7853eb451b35 100644 --- a/crates/wasi-http/src/lib.rs +++ b/crates/wasi-http/src/lib.rs @@ -1,10 +1,13 @@ use crate::component_impl::add_component_to_linker; +use crate::http_server::async_http_server; +use crate::http_server::spawn_http_server; pub use crate::r#struct::WasiHttp; wasmtime::component::bindgen!({ path: "wasi-http/wit", world: "proxy"}); pub mod component_impl; pub mod http_impl; +pub mod http_server; pub mod streams_impl; pub mod r#struct; pub mod types_impl; @@ -19,9 +22,18 @@ pub fn add_to_component_linker( Ok(()) } -pub fn add_to_linker( +pub fn add_to_linker( linker: &mut wasmtime::Linker, get_cx: impl Fn(&mut T) -> &mut WasiHttp + Send + Sync + Copy + 'static, + is_async: bool, ) -> anyhow::Result<()> { - add_component_to_linker(linker, get_cx) + add_component_to_linker(linker, get_cx, is_async) +} + +pub fn run_http(engine: &wasmtime::Engine, module: &std::path::PathBuf) { + spawn_http_server(engine, module); +} + +pub async fn async_run_http(engine: &wasmtime::Engine, module: &std::path::PathBuf) { + async_http_server(engine, module).await } diff --git a/crates/wasi-http/src/struct.rs b/crates/wasi-http/src/struct.rs index d73fd7c6d562..580e8bcf54ac 100644 --- a/crates/wasi-http/src/struct.rs +++ b/crates/wasi-http/src/struct.rs @@ -8,6 +8,50 @@ pub struct Stream { pub data: BytesMut, } +impl crate::wasi::http::types::Method { + pub fn new(m: &hyper::Method) -> Self { + match m { + &hyper::Method::GET => Method::Get, + &hyper::Method::PUT => Method::Put, + &hyper::Method::POST => Method::Post, + &hyper::Method::DELETE => Method::Delete, + &hyper::Method::OPTIONS => Method::Options, + &hyper::Method::HEAD => Method::Head, + &hyper::Method::CONNECT => Method::Connect, + &hyper::Method::TRACE => Method::Trace, + &hyper::Method::PATCH => Method::Patch, + _ => panic!("unknown method!"), + } + } +} + +impl From<&http::uri::Scheme> for crate::wasi::http::types::Scheme { + fn from(s: &http::uri::Scheme) -> crate::wasi::http::types::Scheme { + match s.as_str() { + "http" => crate::wasi::http::types::Scheme::Http, + "https" => crate::wasi::http::types::Scheme::Https, + _ => panic!("unsupported scheme!"), + } + } +} + +impl From for u32 { + fn from(e: crate::wasi::http::types::Method) -> u32 { + match e { + Method::Get => 0, + Method::Head => 1, + Method::Post => 2, + Method::Put => 3, + Method::Delete => 4, + Method::Connect => 5, + Method::Options => 6, + Method::Trace => 7, + Method::Patch => 8, + _ => panic!("unknown method"), + } + } +} + #[derive(Clone)] pub struct WasiHttp { pub request_id_base: u32, @@ -15,17 +59,19 @@ pub struct WasiHttp { pub fields_id_base: u32, pub streams_id_base: u32, pub future_id_base: u32, + pub outparams_id_base: u32, + pub requests: HashMap, pub responses: HashMap, pub fields: HashMap>>>, pub streams: HashMap, pub futures: HashMap, + pub response_outparams: HashMap>, } #[derive(Clone)] pub struct ActiveRequest { pub id: u32, - pub active_request: bool, pub method: Method, pub scheme: Option, pub path_with_query: String, @@ -37,7 +83,6 @@ pub struct ActiveRequest { #[derive(Clone)] pub struct ActiveResponse { pub id: u32, - pub active_response: bool, pub status: u16, pub body: u32, pub response_headers: HashMap>>, @@ -55,7 +100,6 @@ impl ActiveRequest { pub fn new(id: u32) -> Self { Self { id, - active_request: false, method: Method::Get, scheme: Some(Scheme::Http), path_with_query: "".to_string(), @@ -70,7 +114,6 @@ impl ActiveResponse { pub fn new(id: u32) -> Self { Self { id, - active_response: false, status: 0, body: 0, response_headers: HashMap::new(), @@ -106,6 +149,12 @@ impl From for Stream { } } +impl From<&mut Stream> for bytes::Bytes { + fn from(stream: &mut Stream) -> Self { + stream.data.clone().into() + } +} + impl WasiHttp { pub fn new() -> Self { Self { @@ -114,11 +163,14 @@ impl WasiHttp { fields_id_base: 1, streams_id_base: 1, future_id_base: 1, + outparams_id_base: 1, + requests: HashMap::new(), responses: HashMap::new(), fields: HashMap::new(), streams: HashMap::new(), futures: HashMap::new(), + response_outparams: HashMap::new(), } } } diff --git a/crates/wasi-http/src/types_impl.rs b/crates/wasi-http/src/types_impl.rs index 5757acebf69a..92f48b37f091 100644 --- a/crates/wasi-http/src/types_impl.rs +++ b/crates/wasi-http/src/types_impl.rs @@ -1,4 +1,4 @@ -use crate::r#struct::{ActiveRequest, Stream}; +use crate::r#struct::{ActiveRequest, ActiveResponse, Stream}; use crate::wasi::http::types::{ Error, Fields, FutureIncomingResponse, Headers, Host, IncomingRequest, IncomingResponse, IncomingStream, Method, OutgoingRequest, OutgoingResponse, OutgoingStream, ResponseOutparam, @@ -45,13 +45,12 @@ impl Host for WasiHttp { name: String, value: Vec>, ) -> wasmtime::Result<()> { - match self.fields.get_mut(&fields) { - Some(m) => { - m.insert(name, value.clone()); - Ok(()) - } - None => bail!("fields not found"), - } + let m = self + .fields + .get_mut(&fields) + .ok_or_else(|| anyhow!("unknown fields: {fields}"))?; + m.insert(name, value.clone()); + Ok(()) } fn fields_delete(&mut self, fields: Fields, name: String) -> wasmtime::Result<()> { match self.fields.get_mut(&fields) { @@ -81,10 +80,10 @@ impl Host for WasiHttp { Ok(()) } fn fields_entries(&mut self, fields: Fields) -> wasmtime::Result)>> { - let field_map = match self.fields.get(&fields) { - Some(m) => m, - None => bail!("fields not found."), - }; + let field_map = self + .fields + .get(&fields) + .ok_or_else(|| anyhow!("unknown fields: {fields}"))?; let mut result = Vec::new(); for (name, value) in field_map { result.push((name.clone(), value[0].clone())); @@ -115,13 +114,20 @@ impl Host for WasiHttp { } fn finish_outgoing_stream( &mut self, - _s: OutgoingStream, - _trailers: Option, + s: OutgoingStream, + trailers: Option, ) -> wasmtime::Result<()> { - bail!("unimplemented: finish_outgoing_stream") + match trailers { + Some(_) => bail!("trailers not supported!"), + None => { + self.streams.remove(&s); + Ok(()) + } + } } - fn drop_incoming_request(&mut self, _request: IncomingRequest) -> wasmtime::Result<()> { - bail!("unimplemented: drop_incoming_request") + fn drop_incoming_request(&mut self, request: IncomingRequest) -> wasmtime::Result<()> { + self.requests.remove(&request); + Ok(()) } fn drop_outgoing_request(&mut self, request: OutgoingRequest) -> wasmtime::Result<()> { if let Entry::Occupied(e) = self.requests.entry(request) { @@ -130,35 +136,63 @@ impl Host for WasiHttp { } Ok(()) } - fn incoming_request_method(&mut self, _request: IncomingRequest) -> wasmtime::Result { - bail!("unimplemented: incoming_request_method") + fn incoming_request_method(&mut self, request: IncomingRequest) -> wasmtime::Result { + let r = self + .requests + .get(&request) + .ok_or_else(|| anyhow!("unknown request: {request}"))?; + Ok(r.method.clone()) } fn incoming_request_path_with_query( &mut self, - _request: IncomingRequest, + request: IncomingRequest, ) -> wasmtime::Result> { - bail!("unimplemented: incoming_request_path") + let r = self + .requests + .get(&request) + .ok_or_else(|| anyhow!("unknown request: {request}"))?; + Ok(Some(r.path_with_query.clone())) } fn incoming_request_scheme( &mut self, - _request: IncomingRequest, + request: IncomingRequest, ) -> wasmtime::Result> { - bail!("unimplemented: incoming_request_scheme") + let r = self + .requests + .get(&request) + .ok_or_else(|| anyhow!("unknown request: {request}"))?; + Ok(r.scheme.clone()) } fn incoming_request_authority( &mut self, - _request: IncomingRequest, + request: IncomingRequest, ) -> wasmtime::Result> { - bail!("unimplemented: incoming_request_authority") + let r = self + .requests + .get(&request) + .ok_or_else(|| anyhow!("unknown request: {request}"))?; + Ok(Some(r.authority.clone())) } - fn incoming_request_headers(&mut self, _request: IncomingRequest) -> wasmtime::Result { - bail!("unimplemented: incoming_request_headers") + fn incoming_request_headers(&mut self, request: IncomingRequest) -> wasmtime::Result { + let r = self + .requests + .get(&request) + .ok_or_else(|| anyhow!("unknown request: {request}"))?; + + let fields_id = self.fields_id_base; + self.fields_id_base = self.fields_id_base + 1; + self.fields.insert(fields_id, r.headers.clone()); + Ok(fields_id) } fn incoming_request_consume( &mut self, - _request: IncomingRequest, + request: IncomingRequest, ) -> wasmtime::Result> { - bail!("unimplemented: incoming_request_consume") + let r = self + .requests + .get(&request) + .ok_or_else(|| anyhow!("unknown request: {request}"))?; + Ok(Ok(r.body)) } fn new_outgoing_request( &mut self, @@ -198,15 +232,20 @@ impl Host for WasiHttp { } Ok(Ok(req.body)) } - fn drop_response_outparam(&mut self, _response: ResponseOutparam) -> wasmtime::Result<()> { - bail!("unimplemented: drop_response_outparam") + fn drop_response_outparam(&mut self, response: ResponseOutparam) -> wasmtime::Result<()> { + self.response_outparams.remove(&response); + Ok(()) } fn set_response_outparam( &mut self, - _outparam: ResponseOutparam, - _response: Result, + outparam: ResponseOutparam, + response: Result, ) -> wasmtime::Result> { - bail!("unimplemented: set_response_outparam") + match response { + Ok(r) => self.response_outparams.insert(outparam, Some(r)), + Err(_) => self.response_outparams.insert(outparam, None), + }; + Ok(Ok(())) } fn drop_incoming_response(&mut self, response: IncomingResponse) -> wasmtime::Result<()> { if let Entry::Occupied(e) = self.responses.entry(response) { @@ -215,8 +254,9 @@ impl Host for WasiHttp { } Ok(()) } - fn drop_outgoing_response(&mut self, _response: OutgoingResponse) -> wasmtime::Result<()> { - bail!("unimplemented: drop_outgoing_response") + fn drop_outgoing_response(&mut self, response: OutgoingResponse) -> wasmtime::Result<()> { + self.responses.remove(&response); + Ok(()) } fn incoming_response_status( &mut self, @@ -255,16 +295,32 @@ impl Host for WasiHttp { } fn new_outgoing_response( &mut self, - _status_code: StatusCode, - _headers: Headers, + status_code: StatusCode, + headers: Headers, ) -> wasmtime::Result { - bail!("unimplemented: new_outgoing_response") + let response_id = self.response_id_base; + self.response_id_base = self.response_id_base + 1; + let mut res = ActiveResponse::new(response_id); + res.status = status_code; + match self.fields.get(&headers) { + Some(h) => res.response_headers = h.clone(), + None => {} + }; + self.responses.insert(response_id, res); + Ok(response_id) } fn outgoing_response_write( &mut self, - _response: OutgoingResponse, + response: OutgoingResponse, ) -> wasmtime::Result> { - bail!("unimplemented: outgoing_response_write") + match self.responses.get_mut(&response) { + Some(res) => { + res.body = self.streams_id_base; + self.streams_id_base = self.streams_id_base + 1; + Ok(Ok(res.body)) + } + None => bail!("response not found"), + } } fn drop_future_incoming_response( &mut self, diff --git a/src/commands/run.rs b/src/commands/run.rs index 95c5fc43ef0b..b20e5f937cc7 100644 --- a/src/commands/run.rs +++ b/src/commands/run.rs @@ -35,7 +35,7 @@ use wasmtime_wasi_crypto::WasiCryptoCtx; use wasmtime_wasi_threads::WasiThreadsCtx; #[cfg(feature = "wasi-http")] -use wasmtime_wasi_http::WasiHttp; +use wasmtime_wasi_http::{run_http, WasiHttp}; fn parse_module(s: &OsStr) -> anyhow::Result { // Do not accept wasmtime subcommand names as the module name @@ -281,6 +281,14 @@ impl RunCommand { None => {} } + if self + .common + .wasi_modules + .unwrap_or(WasiModules::default()) + .wasi_http_server + { + config.async_support(true); + } let engine = Engine::new(&config)?; let preopen_sockets = self.compute_preopen_sockets()?; @@ -358,17 +366,29 @@ impl RunCommand { ))?; } - // Load the main wasm module. - match self - .load_main_module(&mut store, &mut linker, module, modules, &argv[0]) - .with_context(|| format!("failed to run main module `{}`", self.module.display())) + if self + .common + .wasi_modules + .unwrap_or(WasiModules::default()) + .wasi_http_server { - Ok(()) => (), - Err(e) => { - // Exit the process if Wasmtime understands the error; - // otherwise, fall back on Rust's default error printing/return - // code. - return Err(maybe_exit_on_error(e)); + #[cfg(feature = "wasi-http")] + run_http(&engine, &self.module); + #[cfg(not(feature = "wasi-http"))] + panic!("Requesting HTTP server, but wasi-http feature is not enabled."); + } else { + // Load the main wasm module. + match self + .load_main_module(&mut store, &mut linker, module, modules, &argv[0]) + .with_context(|| format!("failed to run main module `{}`", self.module.display())) + { + Ok(()) => (), + Err(e) => { + // Exit the process if Wasmtime understands the error; + // otherwise, fall back on Rust's default error printing/return + // code. + return Err(maybe_exit_on_error(e)); + } } } @@ -784,9 +804,11 @@ fn populate_with_wasi( #[cfg(feature = "wasi-http")] { let w_http = WasiHttp::new(); - wasmtime_wasi_http::add_to_linker(linker, |host: &mut Host| { - host.wasi_http.as_mut().unwrap() - })?; + wasmtime_wasi_http::add_to_linker( + linker, + |host: &mut Host| host.wasi_http.as_mut().unwrap(), + false, + )?; store.data_mut().wasi_http = Some(w_http); } }