From cbaf050ba5f5be8728e39c8bace69fc4cfb61412 Mon Sep 17 00:00:00 2001 From: Calvin Prewitt Date: Fri, 20 Sep 2024 12:08:36 -0500 Subject: [PATCH 1/2] added `read_to_end` to `AsyncRead` trait --- .gitignore | 2 ++ src/http/client.rs | 7 ++----- src/http/fields.rs | 2 +- src/http/response.rs | 11 ++++++----- src/io/read.rs | 21 +++++++++++++++++++++ src/net/tcp_stream.rs | 20 ++++++++++++++++---- 6 files changed, 48 insertions(+), 15 deletions(-) diff --git a/.gitignore b/.gitignore index b75a144..c3bd750 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,5 @@ target/ tmp/ Cargo.lock .DS_Store +*.swp +*.swo diff --git a/src/http/client.rs b/src/http/client.rs index 2afeba8..8f2510d 100644 --- a/src/http/client.rs +++ b/src/http/client.rs @@ -27,7 +27,7 @@ impl<'a> Client<'a> { let res = wasi::http::outgoing_handler::handle(wasi_req, None).unwrap(); // 2. Start sending the request body - io::copy(body, OutputStream::new(&self.reactor, body_stream)) + io::copy(body, OutputStream::new(self.reactor, body_stream)) .await .expect("io::copy broke oh no"); @@ -41,10 +41,7 @@ impl<'a> Client<'a> { // is to trap if we try and get the response more than once. The final // `?` is to raise the actual error if there is one. let res = res.get().unwrap().unwrap()?; - Ok(Response::try_from_incoming_response( - res, - self.reactor.clone(), - )?) + Response::try_from_incoming_response(res, self.reactor.clone()) } } diff --git a/src/http/fields.rs b/src/http/fields.rs index 71c1dc9..24f6c53 100644 --- a/src/http/fields.rs +++ b/src/http/fields.rs @@ -70,6 +70,6 @@ impl TryFrom for WasiFields { list.push((name.clone().into_owned(), value)); } } - Ok(WasiFields::from_list(&list)?) + WasiFields::from_list(&list) } } diff --git a/src/http/response.rs b/src/http/response.rs index 68b5d3e..62b80d5 100644 --- a/src/http/response.rs +++ b/src/http/response.rs @@ -122,12 +122,13 @@ impl AsyncRead for IncomingBody { self.reactor.wait_for(pollable).await; // Read the bytes from the body stream - let buf = self.body_stream.read(CHUNK_SIZE).map_err(|err| match err { - StreamError::LastOperationFailed(err) => { - std::io::Error::other(format!("{}", err.to_debug_string())) + let buf = match self.body_stream.read(CHUNK_SIZE) { + Ok(buf) => buf, + Err(StreamError::Closed) => return Ok(0), + Err(StreamError::LastOperationFailed(err)) => { + return Err(std::io::Error::other(err.to_debug_string())); } - StreamError::Closed => std::io::Error::other("Connection closed"), - })?; + }; self.buf.insert(buf) } }; diff --git a/src/io/read.rs b/src/io/read.rs index be54fcb..e8c3294 100644 --- a/src/io/read.rs +++ b/src/io/read.rs @@ -1,6 +1,27 @@ use crate::io; +const CHUNK_SIZE: usize = 2048; + /// Read bytes from a source. pub trait AsyncRead { async fn read(&mut self, buf: &mut [u8]) -> io::Result; + async fn read_to_end(&mut self, buf: &mut Vec) -> io::Result { + // total bytes written to buf + let mut n = 0; + + loop { + // grow buf, if less than default chuck size + if buf.len() < n + CHUNK_SIZE { + buf.resize(n + CHUNK_SIZE, 0u8); + } + + let len = self.read(&mut buf[n..]).await?; + if len == 0 { + buf.truncate(n); + return Ok(n); + } + + n += len; + } + } } diff --git a/src/net/tcp_stream.rs b/src/net/tcp_stream.rs index 912f409..414e084 100644 --- a/src/net/tcp_stream.rs +++ b/src/net/tcp_stream.rs @@ -32,9 +32,15 @@ impl<'a> TcpStream<'a> { impl<'a> AsyncRead for TcpStream<'a> { async fn read(&mut self, buf: &mut [u8]) -> io::Result { self.reactor.wait_for(self.input.subscribe()).await; - let slice = self.input.read(buf.len() as u64).map_err(to_io_err)?; + let slice = match self.input.read(buf.len() as u64) { + Ok(slice) => slice, + Err(StreamError::Closed) => return Ok(0), + Err(StreamError::LastOperationFailed(err)) => { + return Err(Error::other(err.to_debug_string())); + } + }; let bytes_read = slice.len(); - buf[..bytes_read].clone_from_slice(&slice); + buf[..bytes_read].copy_from_slice(&slice); Ok(bytes_read) } } @@ -42,9 +48,15 @@ impl<'a> AsyncRead for TcpStream<'a> { impl<'a> AsyncRead for &TcpStream<'a> { async fn read(&mut self, buf: &mut [u8]) -> io::Result { self.reactor.wait_for(self.input.subscribe()).await; - let slice = self.input.read(buf.len() as u64).map_err(to_io_err)?; + let slice = match self.input.read(buf.len() as u64) { + Ok(slice) => slice, + Err(StreamError::Closed) => return Ok(0), + Err(StreamError::LastOperationFailed(err)) => { + return Err(Error::other(err.to_debug_string())); + } + }; let bytes_read = slice.len(); - buf[..bytes_read].clone_from_slice(&slice); + buf[..bytes_read].copy_from_slice(&slice); Ok(bytes_read) } } From 00909fe5e01f553896cf7e7f69a2504354728375 Mon Sep 17 00:00:00 2001 From: Calvin Prewitt Date: Fri, 20 Sep 2024 12:17:08 -0500 Subject: [PATCH 2/2] removed http incoming body buffer --- src/http/response.rs | 53 +++++++++++--------------------------------- 1 file changed, 13 insertions(+), 40 deletions(-) diff --git a/src/http/response.rs b/src/http/response.rs index 62b80d5..901d8ff 100644 --- a/src/http/response.rs +++ b/src/http/response.rs @@ -5,9 +5,6 @@ use super::{Body, Headers, StatusCode}; use crate::io::AsyncRead; use crate::runtime::Reactor; -/// Stream 2kb chunks at a time -const CHUNK_SIZE: u64 = 2048; - /// An HTTP response #[derive(Debug)] pub struct Response { @@ -62,8 +59,6 @@ impl Response { .expect("cannot call `stream` twice on an incoming body"); let body = IncomingBody { - buf_offset: 0, - buf: None, reactor, body_stream, _incoming_body: incoming_body, @@ -102,9 +97,6 @@ impl Response { #[derive(Debug)] pub struct IncomingBody { reactor: Reactor, - buf: Option>, - // How many bytes have we already read from the buf? - buf_offset: usize, // IMPORTANT: the order of these fields here matters. `incoming_body` must // be dropped before `body_stream`. @@ -113,39 +105,20 @@ pub struct IncomingBody { } impl AsyncRead for IncomingBody { - async fn read(&mut self, out_buf: &mut [u8]) -> crate::io::Result { - let buf = match &mut self.buf { - Some(ref mut buf) => buf, - None => { - // Wait for an event to be ready - let pollable = self.body_stream.subscribe(); - self.reactor.wait_for(pollable).await; - - // Read the bytes from the body stream - let buf = match self.body_stream.read(CHUNK_SIZE) { - Ok(buf) => buf, - Err(StreamError::Closed) => return Ok(0), - Err(StreamError::LastOperationFailed(err)) => { - return Err(std::io::Error::other(err.to_debug_string())); - } - }; - self.buf.insert(buf) + async fn read(&mut self, buf: &mut [u8]) -> crate::io::Result { + // Wait for an event to be ready + self.reactor.wait_for(self.body_stream.subscribe()).await; + + // Read the bytes from the body stream + let slice = match self.body_stream.read(buf.len() as u64) { + Ok(slice) => slice, + Err(StreamError::Closed) => return Ok(0), + Err(StreamError::LastOperationFailed(err)) => { + return Err(std::io::Error::other(err.to_debug_string())); } }; - - // copy bytes - let len = (buf.len() - self.buf_offset).min(out_buf.len()); - let max = self.buf_offset + len; - let slice = &buf[self.buf_offset..max]; - out_buf[0..len].copy_from_slice(slice); - self.buf_offset += len; - - // reset the local slice if necessary - if self.buf_offset == buf.len() { - self.buf = None; - self.buf_offset = 0; - } - - Ok(len) + let bytes_read = slice.len(); + buf[..bytes_read].copy_from_slice(&slice); + Ok(bytes_read) } }