Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@ target/
tmp/
Cargo.lock
.DS_Store
*.swp
*.swo
7 changes: 2 additions & 5 deletions src/http/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ impl<'a> Client<'a> {
let res = wasi::http::outgoing_handler::handle(wasi_req, None).unwrap();

// 2. Start sending the request body
io::copy(body, OutputStream::new(&self.reactor, body_stream))
io::copy(body, OutputStream::new(self.reactor, body_stream))
.await
.expect("io::copy broke oh no");

Expand All @@ -41,10 +41,7 @@ impl<'a> Client<'a> {
// is to trap if we try and get the response more than once. The final
// `?` is to raise the actual error if there is one.
let res = res.get().unwrap().unwrap()?;
Ok(Response::try_from_incoming_response(
res,
self.reactor.clone(),
)?)
Response::try_from_incoming_response(res, self.reactor.clone())
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/http/fields.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,6 @@ impl TryFrom<Fields> for WasiFields {
list.push((name.clone().into_owned(), value));
}
}
Ok(WasiFields::from_list(&list)?)
WasiFields::from_list(&list)
}
}
52 changes: 13 additions & 39 deletions src/http/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,6 @@ use super::{Body, Headers, StatusCode};
use crate::io::AsyncRead;
use crate::runtime::Reactor;

/// Stream 2kb chunks at a time
const CHUNK_SIZE: u64 = 2048;

/// An HTTP response
#[derive(Debug)]
pub struct Response<B: Body> {
Expand Down Expand Up @@ -62,8 +59,6 @@ impl Response<IncomingBody> {
.expect("cannot call `stream` twice on an incoming body");

let body = IncomingBody {
buf_offset: 0,
buf: None,
reactor,
body_stream,
_incoming_body: incoming_body,
Expand Down Expand Up @@ -102,9 +97,6 @@ impl<B: Body> Response<B> {
#[derive(Debug)]
pub struct IncomingBody {
reactor: Reactor,
buf: Option<Vec<u8>>,
// How many bytes have we already read from the buf?
buf_offset: usize,

// IMPORTANT: the order of these fields here matters. `incoming_body` must
// be dropped before `body_stream`.
Expand All @@ -113,38 +105,20 @@ pub struct IncomingBody {
}

impl AsyncRead for IncomingBody {
async fn read(&mut self, out_buf: &mut [u8]) -> crate::io::Result<usize> {
let buf = match &mut self.buf {
Some(ref mut buf) => buf,
None => {
// Wait for an event to be ready
let pollable = self.body_stream.subscribe();
self.reactor.wait_for(pollable).await;

// Read the bytes from the body stream
let buf = self.body_stream.read(CHUNK_SIZE).map_err(|err| match err {
StreamError::LastOperationFailed(err) => {
std::io::Error::other(format!("{}", err.to_debug_string()))
}
StreamError::Closed => std::io::Error::other("Connection closed"),
})?;
self.buf.insert(buf)
async fn read(&mut self, buf: &mut [u8]) -> crate::io::Result<usize> {
// Wait for an event to be ready
self.reactor.wait_for(self.body_stream.subscribe()).await;

// Read the bytes from the body stream
let slice = match self.body_stream.read(buf.len() as u64) {
Ok(slice) => slice,
Err(StreamError::Closed) => return Ok(0),
Err(StreamError::LastOperationFailed(err)) => {
return Err(std::io::Error::other(err.to_debug_string()));
}
};

// copy bytes
let len = (buf.len() - self.buf_offset).min(out_buf.len());
let max = self.buf_offset + len;
let slice = &buf[self.buf_offset..max];
out_buf[0..len].copy_from_slice(slice);
self.buf_offset += len;

// reset the local slice if necessary
if self.buf_offset == buf.len() {
self.buf = None;
self.buf_offset = 0;
}

Ok(len)
let bytes_read = slice.len();
buf[..bytes_read].copy_from_slice(&slice);
Ok(bytes_read)
}
}
21 changes: 21 additions & 0 deletions src/io/read.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,27 @@
use crate::io;

const CHUNK_SIZE: usize = 2048;

/// Read bytes from a source.
pub trait AsyncRead {
async fn read(&mut self, buf: &mut [u8]) -> io::Result<usize>;
async fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
// total bytes written to buf
let mut n = 0;

loop {
// grow buf, if less than default chuck size
if buf.len() < n + CHUNK_SIZE {
buf.resize(n + CHUNK_SIZE, 0u8);
}

let len = self.read(&mut buf[n..]).await?;
if len == 0 {
buf.truncate(n);
return Ok(n);
}

n += len;
}
}
}
20 changes: 16 additions & 4 deletions src/net/tcp_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,31 @@ impl<'a> TcpStream<'a> {
impl<'a> AsyncRead for TcpStream<'a> {
async fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.reactor.wait_for(self.input.subscribe()).await;
let slice = self.input.read(buf.len() as u64).map_err(to_io_err)?;
let slice = match self.input.read(buf.len() as u64) {
Ok(slice) => slice,
Err(StreamError::Closed) => return Ok(0),
Err(StreamError::LastOperationFailed(err)) => {
return Err(Error::other(err.to_debug_string()));
}
};
let bytes_read = slice.len();
buf[..bytes_read].clone_from_slice(&slice);
buf[..bytes_read].copy_from_slice(&slice);
Ok(bytes_read)
}
}

impl<'a> AsyncRead for &TcpStream<'a> {
async fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.reactor.wait_for(self.input.subscribe()).await;
let slice = self.input.read(buf.len() as u64).map_err(to_io_err)?;
let slice = match self.input.read(buf.len() as u64) {
Ok(slice) => slice,
Err(StreamError::Closed) => return Ok(0),
Err(StreamError::LastOperationFailed(err)) => {
return Err(Error::other(err.to_debug_string()));
}
};
let bytes_read = slice.len();
buf[..bytes_read].clone_from_slice(&slice);
buf[..bytes_read].copy_from_slice(&slice);
Ok(bytes_read)
}
}
Expand Down