diff --git a/src/futures/bufread/generic/decoder.rs b/src/futures/bufread/generic/decoder.rs index ff39b163..d19daae5 100644 --- a/src/futures/bufread/generic/decoder.rs +++ b/src/futures/bufread/generic/decoder.rs @@ -70,6 +70,9 @@ impl Decoder { State::Decoding => { let input = ready!(this.reader.as_mut().poll_fill_buf(cx))?; if input.is_empty() { + // Avoid attempting to reinitialise the decoder if the reader + // has returned EOF. + *this.multiple_members = false; State::Flushing } else { let mut input = PartialBuffer::new(input); diff --git a/src/tokio/bufread/generic/decoder.rs b/src/tokio/bufread/generic/decoder.rs index e9ef44e8..2f4d8c7f 100644 --- a/src/tokio/bufread/generic/decoder.rs +++ b/src/tokio/bufread/generic/decoder.rs @@ -70,6 +70,9 @@ impl Decoder { State::Decoding => { let input = ready!(this.reader.as_mut().poll_fill_buf(cx))?; if input.is_empty() { + // Avoid attempting to reinitialise the decoder if the reader + // has returned EOF. + *this.multiple_members = false; State::Flushing } else { let mut input = PartialBuffer::new(input); diff --git a/src/tokio_02/bufread/generic/decoder.rs b/src/tokio_02/bufread/generic/decoder.rs index 7e876770..38dc068d 100644 --- a/src/tokio_02/bufread/generic/decoder.rs +++ b/src/tokio_02/bufread/generic/decoder.rs @@ -70,6 +70,9 @@ impl Decoder { State::Decoding => { let input = ready!(this.reader.as_mut().poll_fill_buf(cx))?; if input.is_empty() { + // Avoid attempting to reinitialise the decoder if the reader + // has returned EOF. + *this.multiple_members = false; State::Flushing } else { let mut input = PartialBuffer::new(input); diff --git a/src/tokio_03/bufread/generic/decoder.rs b/src/tokio_03/bufread/generic/decoder.rs index b643c636..796218c9 100644 --- a/src/tokio_03/bufread/generic/decoder.rs +++ b/src/tokio_03/bufread/generic/decoder.rs @@ -70,6 +70,9 @@ impl Decoder { State::Decoding => { let input = ready!(this.reader.as_mut().poll_fill_buf(cx))?; if input.is_empty() { + // Avoid attempting to reinitialise the decoder if the reader + // has returned EOF. + *this.multiple_members = false; State::Flushing } else { let mut input = PartialBuffer::new(input); diff --git a/tests/utils/impls.rs b/tests/utils/impls.rs index a1b1ac8c..18cea2eb 100644 --- a/tests/utils/impls.rs +++ b/tests/utils/impls.rs @@ -13,13 +13,13 @@ pub mod futures { pub mod bufread { pub use futures::io::AsyncBufRead; - use crate::utils::InputStream; + use crate::utils::{InputStream, TrackEof}; use futures::stream::{StreamExt as _, TryStreamExt as _}; pub fn from(input: &InputStream) -> impl AsyncBufRead { // By using the stream here we ensure that each chunk will require a separate // read/poll_fill_buf call to process to help test reading multiple chunks. - input.stream().map(Ok).into_async_read() + TrackEof::new(input.stream().map(Ok).into_async_read()) } } @@ -100,13 +100,13 @@ pub mod tokio_02 { pub mod bufread { pub use tokio_02::io::AsyncBufRead; - use crate::utils::InputStream; + use crate::utils::{InputStream, TrackEof}; use tokio_02::io::stream_reader; pub fn from(input: &InputStream) -> impl AsyncBufRead { // By using the stream here we ensure that each chunk will require a separate // read/poll_fill_buf call to process to help test reading multiple chunks. - stream_reader(input.bytes_05_stream()) + TrackEof::new(stream_reader(input.bytes_05_stream())) } } @@ -169,14 +169,14 @@ pub mod tokio_02 { #[cfg(feature = "tokio-03")] pub mod tokio_03 { pub mod bufread { - use crate::utils::InputStream; + use crate::utils::{InputStream, TrackEof}; pub use tokio_03::io::AsyncBufRead; use tokio_util_04::io::StreamReader; pub fn from(input: &InputStream) -> impl AsyncBufRead { // By using the stream here we ensure that each chunk will require a separate // read/poll_fill_buf call to process to help test reading multiple chunks. - StreamReader::new(input.bytes_05_stream()) + TrackEof::new(StreamReader::new(input.bytes_05_stream())) } } @@ -239,7 +239,7 @@ pub mod tokio_03 { #[cfg(feature = "tokio")] pub mod tokio { pub mod bufread { - use crate::utils::InputStream; + use crate::utils::{InputStream, TrackEof}; use bytes::Bytes; use futures::stream::StreamExt; pub use tokio::io::AsyncBufRead; @@ -248,7 +248,9 @@ pub mod tokio { pub fn from(input: &InputStream) -> impl AsyncBufRead { // By using the stream here we ensure that each chunk will require a separate // read/poll_fill_buf call to process to help test reading multiple chunks. - StreamReader::new(input.stream().map(Bytes::from).map(std::io::Result::Ok)) + TrackEof::new(StreamReader::new( + input.stream().map(Bytes::from).map(std::io::Result::Ok), + )) } } diff --git a/tests/utils/mod.rs b/tests/utils/mod.rs index a83a1506..0f6de461 100644 --- a/tests/utils/mod.rs +++ b/tests/utils/mod.rs @@ -8,13 +8,14 @@ mod tokio_03_ext; #[cfg(feature = "tokio")] mod tokio_ext; mod track_closed; +mod track_eof; #[macro_use] mod test_cases; pub mod algos; pub mod impls; -pub use self::{input_stream::InputStream, track_closed::TrackClosed}; +pub use self::{input_stream::InputStream, track_closed::TrackClosed, track_eof::TrackEof}; pub use async_compression::Level; pub use futures::{executor::block_on, pin_mut, stream::Stream}; pub use std::{future::Future, io::Result, iter::FromIterator, pin::Pin}; diff --git a/tests/utils/track_eof.rs b/tests/utils/track_eof.rs new file mode 100644 index 00000000..d2101d13 --- /dev/null +++ b/tests/utils/track_eof.rs @@ -0,0 +1,184 @@ +#[cfg_attr(not(feature = "all-implementations"), allow(unused))] +use std::{ + io::Result, + pin::Pin, + task::{Context, Poll}, +}; + +pub struct TrackEof { + inner: R, + eof: bool, +} + +impl TrackEof { + pub fn new(inner: R) -> Self { + Self { inner, eof: false } + } + + pub fn project(self: Pin<&mut Self>) -> (Pin<&mut R>, &mut bool) { + let Self { inner, eof } = Pin::into_inner(self); + (Pin::new(inner), eof) + } +} + +#[cfg(feature = "futures-io")] +impl futures::io::AsyncRead for TrackEof { + fn poll_read(self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll> { + let (inner, eof) = self.project(); + assert!(!*eof); + match inner.poll_read(cx, buf) { + Poll::Ready(Ok(0)) => { + if !buf.is_empty() { + *eof = true; + } + Poll::Ready(Ok(0)) + } + other => other, + } + } +} + +#[cfg(feature = "futures-io")] +impl futures::io::AsyncBufRead for TrackEof { + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let (inner, eof) = self.project(); + assert!(!*eof); + match inner.poll_fill_buf(cx) { + Poll::Ready(Ok(buf)) => { + if buf.is_empty() { + *eof = true; + } + Poll::Ready(Ok(buf)) + } + other => other, + } + } + + fn consume(self: Pin<&mut Self>, amt: usize) { + self.project().0.consume(amt) + } +} + +#[cfg(feature = "tokio-02")] +impl tokio_02::io::AsyncRead for TrackEof { + fn poll_read(self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll> { + let (inner, eof) = self.project(); + assert!(!*eof); + match inner.poll_read(cx, buf) { + Poll::Ready(Ok(0)) => { + if !buf.is_empty() { + *eof = true; + } + Poll::Ready(Ok(0)) + } + other => other, + } + } +} + +#[cfg(feature = "tokio-02")] +impl tokio_02::io::AsyncBufRead for TrackEof { + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let (inner, eof) = self.project(); + assert!(!*eof); + match inner.poll_fill_buf(cx) { + Poll::Ready(Ok(buf)) => { + if buf.is_empty() { + *eof = true; + } + Poll::Ready(Ok(buf)) + } + other => other, + } + } + + fn consume(self: Pin<&mut Self>, amt: usize) { + self.project().0.consume(amt) + } +} + +#[cfg(feature = "tokio-03")] +impl tokio_03::io::AsyncRead for TrackEof { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &mut tokio_03::io::ReadBuf, + ) -> Poll> { + let (inner, eof) = self.project(); + assert!(!*eof); + let len = buf.filled().len(); + match inner.poll_read(cx, buf) { + Poll::Ready(Ok(())) => { + if buf.filled().len() == len && buf.remaining() > 0 { + *eof = true; + } + Poll::Ready(Ok(())) + } + other => other, + } + } +} + +#[cfg(feature = "tokio-03")] +impl tokio_03::io::AsyncBufRead for TrackEof { + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let (inner, eof) = self.project(); + assert!(!*eof); + match inner.poll_fill_buf(cx) { + Poll::Ready(Ok(buf)) => { + if buf.is_empty() { + *eof = true; + } + Poll::Ready(Ok(buf)) + } + other => other, + } + } + + fn consume(self: Pin<&mut Self>, amt: usize) { + self.project().0.consume(amt) + } +} + +#[cfg(feature = "tokio")] +impl tokio::io::AsyncRead for TrackEof { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &mut tokio::io::ReadBuf, + ) -> Poll> { + let (inner, eof) = self.project(); + assert!(!*eof); + let len = buf.filled().len(); + match inner.poll_read(cx, buf) { + Poll::Ready(Ok(())) => { + if buf.filled().len() == len && buf.remaining() > 0 { + *eof = true; + } + Poll::Ready(Ok(())) + } + other => other, + } + } +} + +#[cfg(feature = "tokio")] +impl tokio::io::AsyncBufRead for TrackEof { + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let (inner, eof) = self.project(); + assert!(!*eof); + match inner.poll_fill_buf(cx) { + Poll::Ready(Ok(buf)) => { + if buf.is_empty() { + *eof = true; + } + Poll::Ready(Ok(buf)) + } + other => other, + } + } + + fn consume(self: Pin<&mut Self>, amt: usize) { + self.project().0.consume(amt) + } +}