From c7b3f257bce3f17b3034b0def6ef97b95f45a121 Mon Sep 17 00:00:00 2001 From: Brendan Burns Date: Tue, 18 Apr 2023 04:12:31 +0000 Subject: [PATCH 1/5] Make streams owned by request/response that they are tied to. --- crates/wasi-http/src/http_impl.rs | 13 ++++++-- crates/wasi-http/src/streams_impl.rs | 47 +++++++++++++++++++++++----- crates/wasi-http/src/struct.rs | 17 +++++++++- crates/wasi-http/src/types_impl.rs | 16 ++++++++-- 4 files changed, 80 insertions(+), 13 deletions(-) diff --git a/crates/wasi-http/src/http_impl.rs b/crates/wasi-http/src/http_impl.rs index 69c94d8489f9..171b5897192b 100644 --- a/crates/wasi-http/src/http_impl.rs +++ b/crates/wasi-http/src/http_impl.rs @@ -1,5 +1,5 @@ use crate::r#struct::ActiveResponse; -pub use crate::r#struct::WasiHttp; +use crate::r#struct::{Stream, WasiHttp}; use crate::types::{RequestOptions, Scheme}; #[cfg(not(any(target_arch = "riscv64", target_arch = "s390x")))] use anyhow::anyhow; @@ -183,7 +183,8 @@ impl WasiHttp { let body = Full::::new( self.streams .get(&request.body) - .unwrap_or(&Bytes::new()) + .unwrap_or(&Stream::new()) + .data .clone(), ); let t = timeout(first_bytes_timeout, sender.send_request(call.body(body)?)).await?; @@ -222,7 +223,13 @@ impl WasiHttp { } response.body = self.streams_id_base; self.streams_id_base = self.streams_id_base + 1; - self.streams.insert(response.body, buf.freeze()); + self.streams.insert( + response.body, + Stream { + closed: false, + data: buf.freeze(), + }, + ); self.responses.insert(response_id, response); Ok(response_id) } diff --git a/crates/wasi-http/src/streams_impl.rs b/crates/wasi-http/src/streams_impl.rs index 6b88fae7628b..35531882dc8b 100644 --- a/crates/wasi-http/src/streams_impl.rs +++ b/crates/wasi-http/src/streams_impl.rs @@ -1,4 +1,5 @@ use crate::poll::Pollable; +use crate::r#struct::Stream; use crate::streams::{InputStream, OutputStream, StreamError}; use crate::WasiHttp; use anyhow::{anyhow, bail}; @@ -11,10 +12,14 @@ impl crate::streams::Host for WasiHttp { stream: InputStream, len: u64, ) -> wasmtime::Result, bool), StreamError>> { - let s = self + let st = self .streams .get_mut(&stream) .ok_or_else(|| anyhow!("stream not found: {stream}"))?; + if st.closed { + bail!("stream is dropped!"); + } + let s = &mut st.data; if len == 0 { Ok(Ok((bytes::Bytes::new().to_vec(), s.len() > 0))) } else if s.len() > len.try_into()? { @@ -31,10 +36,14 @@ impl crate::streams::Host for WasiHttp { stream: InputStream, len: u64, ) -> wasmtime::Result> { - let s = self + let st = self .streams .get_mut(&stream) .ok_or_else(|| anyhow!("stream not found: {stream}"))?; + if st.closed { + bail!("stream is dropped!"); + } + let s = &mut st.data; if len == 0 { Ok(Ok((0, s.len() > 0))) } else if s.len() > len.try_into()? { @@ -52,7 +61,11 @@ impl crate::streams::Host for WasiHttp { } fn drop_input_stream(&mut self, stream: InputStream) -> wasmtime::Result<()> { - self.streams.remove(&stream); + let st = self + .streams + .get_mut(&stream) + .ok_or_else(|| anyhow!("stream not found: {stream}"))?; + st.closed = true; Ok(()) } @@ -62,14 +75,30 @@ impl crate::streams::Host for WasiHttp { buf: Vec, ) -> wasmtime::Result> { match self.streams.get(&this) { - Some(data) => { + Some(st) => { + if st.closed { + bail!("stream is dropped!"); + } + let data = &st.data; let mut new = bytes::BytesMut::with_capacity(data.len() + buf.len()); new.put(data.clone()); new.put(bytes::Bytes::from(buf.clone())); - self.streams.insert(this, new.freeze()); + self.streams.insert( + this, + Stream { + closed: false, + data: new.freeze(), + }, + ); } None => { - self.streams.insert(this, bytes::Bytes::from(buf.clone())); + self.streams.insert( + this, + Stream { + closed: false, + data: bytes::Bytes::from(buf.clone()), + }, + ); } } Ok(Ok(buf.len().try_into()?)) @@ -111,7 +140,11 @@ impl crate::streams::Host for WasiHttp { } fn drop_output_stream(&mut self, stream: OutputStream) -> wasmtime::Result<()> { - self.streams.remove(&stream); + let st = self + .streams + .get_mut(&stream) + .ok_or_else(|| anyhow!("stream not found: {stream}"))?; + st.closed = true; Ok(()) } } diff --git a/crates/wasi-http/src/struct.rs b/crates/wasi-http/src/struct.rs index 0cb124504888..d259fce299cf 100644 --- a/crates/wasi-http/src/struct.rs +++ b/crates/wasi-http/src/struct.rs @@ -2,6 +2,12 @@ use crate::types::{Method, Scheme}; use bytes::Bytes; use std::collections::HashMap; +#[derive(Clone)] +pub struct Stream { + pub closed: bool, + pub data: Bytes, +} + #[derive(Clone)] pub struct WasiHttp { pub request_id_base: u32, @@ -11,7 +17,7 @@ pub struct WasiHttp { pub requests: HashMap, pub responses: HashMap, pub fields: HashMap>>, - pub streams: HashMap, + pub streams: HashMap, } #[derive(Clone)] @@ -66,6 +72,15 @@ impl ActiveResponse { } } +impl Stream { + pub fn new() -> Self { + Self { + closed: false, + data: Bytes::new(), + } + } +} + impl WasiHttp { pub fn new() -> Self { Self { diff --git a/crates/wasi-http/src/types_impl.rs b/crates/wasi-http/src/types_impl.rs index 721dc4cb9e30..3915a1ba3c7f 100644 --- a/crates/wasi-http/src/types_impl.rs +++ b/crates/wasi-http/src/types_impl.rs @@ -123,7 +123,13 @@ impl crate::types::Host for WasiHttp { bail!("unimplemented: drop_incoming_request") } fn drop_outgoing_request(&mut self, request: OutgoingRequest) -> wasmtime::Result<()> { - self.requests.remove(&request); + match self.requests.get(&request) { + Some(r) => { + self.streams.remove(&r.body); + self.requests.remove(&request); + } + None => { /* pass */ } + } Ok(()) } fn incoming_request_method(&mut self, _request: IncomingRequest) -> wasmtime::Result { @@ -206,7 +212,13 @@ impl crate::types::Host for WasiHttp { bail!("unimplemented: set_response_outparam") } fn drop_incoming_response(&mut self, response: IncomingResponse) -> wasmtime::Result<()> { - self.responses.remove(&response); + match self.responses.get(&response) { + Some(r) => { + self.streams.remove(&r.body); + self.responses.remove(&response); + } + None => { /* pass */ } + } Ok(()) } fn drop_outgoing_response(&mut self, _response: OutgoingResponse) -> wasmtime::Result<()> { From a72d8e0978d4179ab41acaddd29cade9914c46a9 Mon Sep 17 00:00:00 2001 From: Brendan Burns Date: Wed, 19 Apr 2023 21:50:47 +0000 Subject: [PATCH 2/5] Address comments, fix tests. --- crates/wasi-http/src/http_impl.rs | 2 +- crates/wasi-http/src/streams_impl.rs | 28 ++++++++-------------------- crates/wasi-http/src/struct.rs | 10 ++++++++-- crates/wasi-http/src/types_impl.rs | 23 +++++++++-------------- 4 files changed, 26 insertions(+), 37 deletions(-) diff --git a/crates/wasi-http/src/http_impl.rs b/crates/wasi-http/src/http_impl.rs index 171b5897192b..4a327f6d4eba 100644 --- a/crates/wasi-http/src/http_impl.rs +++ b/crates/wasi-http/src/http_impl.rs @@ -183,7 +183,7 @@ impl WasiHttp { let body = Full::::new( self.streams .get(&request.body) - .unwrap_or(&Stream::new()) + .unwrap_or(&Stream::default()) .data .clone(), ); diff --git a/crates/wasi-http/src/streams_impl.rs b/crates/wasi-http/src/streams_impl.rs index 35531882dc8b..fbfef635c424 100644 --- a/crates/wasi-http/src/streams_impl.rs +++ b/crates/wasi-http/src/streams_impl.rs @@ -1,5 +1,4 @@ use crate::poll::Pollable; -use crate::r#struct::Stream; use crate::streams::{InputStream, OutputStream, StreamError}; use crate::WasiHttp; use anyhow::{anyhow, bail}; @@ -74,34 +73,23 @@ impl crate::streams::Host for WasiHttp { this: OutputStream, buf: Vec, ) -> wasmtime::Result> { + let len = buf.len(); match self.streams.get(&this) { Some(st) => { if st.closed { bail!("stream is dropped!"); } - let data = &st.data; - let mut new = bytes::BytesMut::with_capacity(data.len() + buf.len()); - new.put(data.clone()); - new.put(bytes::Bytes::from(buf.clone())); - self.streams.insert( - this, - Stream { - closed: false, - data: new.freeze(), - }, - ); + let new_len = st.data.len() + len; + let mut new = bytes::BytesMut::with_capacity(new_len); + new.put(st.data.clone()); + new.put(bytes::Bytes::from(buf)); + self.streams.insert(this, new.freeze().into()); } None => { - self.streams.insert( - this, - Stream { - closed: false, - data: bytes::Bytes::from(buf.clone()), - }, - ); + self.streams.insert(this, bytes::Bytes::from(buf).into()); } } - Ok(Ok(buf.len().try_into()?)) + Ok(Ok(len.try_into()?)) } fn write_zeroes( diff --git a/crates/wasi-http/src/struct.rs b/crates/wasi-http/src/struct.rs index d259fce299cf..9e38af307549 100644 --- a/crates/wasi-http/src/struct.rs +++ b/crates/wasi-http/src/struct.rs @@ -2,7 +2,7 @@ use crate::types::{Method, Scheme}; use bytes::Bytes; use std::collections::HashMap; -#[derive(Clone)] +#[derive(Clone, Default)] pub struct Stream { pub closed: bool, pub data: Bytes, @@ -74,9 +74,15 @@ impl ActiveResponse { impl Stream { pub fn new() -> Self { + Self::default() + } +} + +impl From for Stream { + fn from(bytes: Bytes) -> Self { Self { closed: false, - data: Bytes::new(), + data: bytes, } } } diff --git a/crates/wasi-http/src/types_impl.rs b/crates/wasi-http/src/types_impl.rs index 3915a1ba3c7f..e9079de7dee3 100644 --- a/crates/wasi-http/src/types_impl.rs +++ b/crates/wasi-http/src/types_impl.rs @@ -1,5 +1,5 @@ use crate::poll::Pollable; -use crate::r#struct::ActiveRequest; +use crate::r#struct::{ActiveRequest, Stream}; use crate::types::{ Error, Fields, FutureIncomingResponse, Headers, IncomingRequest, IncomingResponse, IncomingStream, Method, OutgoingRequest, OutgoingResponse, OutgoingStream, ResponseOutparam, @@ -7,7 +7,7 @@ use crate::types::{ }; use crate::WasiHttp; use anyhow::{anyhow, bail}; -use std::collections::HashMap; +use std::collections::{hash_map::Entry, HashMap}; impl crate::types::Host for WasiHttp { fn drop_fields(&mut self, fields: Fields) -> wasmtime::Result<()> { @@ -123,12 +123,9 @@ impl crate::types::Host for WasiHttp { bail!("unimplemented: drop_incoming_request") } fn drop_outgoing_request(&mut self, request: OutgoingRequest) -> wasmtime::Result<()> { - match self.requests.get(&request) { - Some(r) => { - self.streams.remove(&r.body); - self.requests.remove(&request); - } - None => { /* pass */ } + if let Entry::Occupied(e) = self.requests.entry(request) { + let r = e.remove(); + self.streams.remove(&r.body); } Ok(()) } @@ -198,6 +195,7 @@ impl crate::types::Host for WasiHttp { if req.body == 0 { req.body = self.streams_id_base; self.streams_id_base = self.streams_id_base + 1; + self.streams.insert(req.body, Stream::default()); } Ok(Ok(req.body)) } @@ -212,12 +210,9 @@ impl crate::types::Host for WasiHttp { bail!("unimplemented: set_response_outparam") } fn drop_incoming_response(&mut self, response: IncomingResponse) -> wasmtime::Result<()> { - match self.responses.get(&response) { - Some(r) => { - self.streams.remove(&r.body); - self.responses.remove(&response); - } - None => { /* pass */ } + if let Entry::Occupied(e) = self.responses.entry(response) { + let r = e.remove(); + self.streams.remove(&r.body); } Ok(()) } From 14b25d22d65b645f9c708caabf5488c9ae3c5d5d Mon Sep 17 00:00:00 2001 From: Brendan Burns Date: Thu, 20 Apr 2023 16:57:09 +0000 Subject: [PATCH 3/5] Address comment. --- crates/wasi-http/src/http_impl.rs | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/crates/wasi-http/src/http_impl.rs b/crates/wasi-http/src/http_impl.rs index 4a327f6d4eba..45b4cb68a72e 100644 --- a/crates/wasi-http/src/http_impl.rs +++ b/crates/wasi-http/src/http_impl.rs @@ -223,13 +223,7 @@ impl WasiHttp { } response.body = self.streams_id_base; self.streams_id_base = self.streams_id_base + 1; - self.streams.insert( - response.body, - Stream { - closed: false, - data: buf.freeze(), - }, - ); + self.streams.insert(response.body, buf.freeze().into()); self.responses.insert(response_id, response); Ok(response_id) } From 22a2dd513f6e38b41e5c41c3fe7067a5505bbb2d Mon Sep 17 00:00:00 2001 From: Brendan Burns Date: Thu, 20 Apr 2023 12:52:59 -0700 Subject: [PATCH 4/5] Update crates/wasi-http/src/streams_impl.rs Co-authored-by: Pat Hickey --- crates/wasi-http/src/streams_impl.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/wasi-http/src/streams_impl.rs b/crates/wasi-http/src/streams_impl.rs index fbfef635c424..796acbd5b217 100644 --- a/crates/wasi-http/src/streams_impl.rs +++ b/crates/wasi-http/src/streams_impl.rs @@ -77,7 +77,7 @@ impl crate::streams::Host for WasiHttp { match self.streams.get(&this) { Some(st) => { if st.closed { - bail!("stream is dropped!"); + bail!("cannot write to closed stream"); } let new_len = st.data.len() + len; let mut new = bytes::BytesMut::with_capacity(new_len); From bf2c3ba24d87f747dec793ace82b1aa4885463f1 Mon Sep 17 00:00:00 2001 From: Brendan Burns Date: Thu, 20 Apr 2023 20:20:18 +0000 Subject: [PATCH 5/5] Switch to BytesMut --- crates/wasi-http/src/http_impl.rs | 3 ++- crates/wasi-http/src/streams_impl.rs | 20 ++++---------------- crates/wasi-http/src/struct.rs | 8 +++++--- 3 files changed, 11 insertions(+), 20 deletions(-) diff --git a/crates/wasi-http/src/http_impl.rs b/crates/wasi-http/src/http_impl.rs index 45b4cb68a72e..5b0e3862d909 100644 --- a/crates/wasi-http/src/http_impl.rs +++ b/crates/wasi-http/src/http_impl.rs @@ -185,7 +185,8 @@ impl WasiHttp { .get(&request.body) .unwrap_or(&Stream::default()) .data - .clone(), + .clone() + .freeze(), ); let t = timeout(first_bytes_timeout, sender.send_request(call.body(body)?)).await?; let mut res = t?; diff --git a/crates/wasi-http/src/streams_impl.rs b/crates/wasi-http/src/streams_impl.rs index fbfef635c424..9e9d1e59f722 100644 --- a/crates/wasi-http/src/streams_impl.rs +++ b/crates/wasi-http/src/streams_impl.rs @@ -2,7 +2,6 @@ use crate::poll::Pollable; use crate::streams::{InputStream, OutputStream, StreamError}; use crate::WasiHttp; use anyhow::{anyhow, bail}; -use bytes::BufMut; use std::vec::Vec; impl crate::streams::Host for WasiHttp { @@ -74,21 +73,10 @@ impl crate::streams::Host for WasiHttp { buf: Vec, ) -> wasmtime::Result> { let len = buf.len(); - match self.streams.get(&this) { - Some(st) => { - if st.closed { - bail!("stream is dropped!"); - } - let new_len = st.data.len() + len; - let mut new = bytes::BytesMut::with_capacity(new_len); - new.put(st.data.clone()); - new.put(bytes::Bytes::from(buf)); - self.streams.insert(this, new.freeze().into()); - } - None => { - self.streams.insert(this, bytes::Bytes::from(buf).into()); - } - } + self.streams.entry(this) + .or_default() + .data + .extend_from_slice(buf.as_slice()); Ok(Ok(len.try_into()?)) } diff --git a/crates/wasi-http/src/struct.rs b/crates/wasi-http/src/struct.rs index 9e38af307549..574be6c8e5fa 100644 --- a/crates/wasi-http/src/struct.rs +++ b/crates/wasi-http/src/struct.rs @@ -1,11 +1,11 @@ use crate::types::{Method, Scheme}; -use bytes::Bytes; +use bytes::{BufMut, Bytes, BytesMut}; use std::collections::HashMap; #[derive(Clone, Default)] pub struct Stream { pub closed: bool, - pub data: Bytes, + pub data: BytesMut, } #[derive(Clone)] @@ -80,9 +80,11 @@ impl Stream { impl From for Stream { fn from(bytes: Bytes) -> Self { + let mut buf = BytesMut::with_capacity(bytes.len()); + buf.put(bytes); Self { closed: false, - data: bytes, + data: buf, } } }