From eb1d0cc2d36b498da200f25e6141fb6d1eb0cd90 Mon Sep 17 00:00:00 2001 From: eta Date: Thu, 12 May 2022 18:32:05 +0100 Subject: [PATCH 1/3] bufread::generic::Decoder: don't reinitialize on reader EOF If `multiple_members` is enabled, the `bufread::generic::Decoder` will attempt to reinitialise the decoder inside `State::Flushing`, even if the reason it entered that state was due to the reader returning an EOF. This will result in an attempt to read past EOF, which is highly undesirable to say the least. To fix this, force `multiple_members` to `false` when we get an EOF condition from the reader. --- src/futures/bufread/generic/decoder.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/futures/bufread/generic/decoder.rs b/src/futures/bufread/generic/decoder.rs index ff39b163..d19daae5 100644 --- a/src/futures/bufread/generic/decoder.rs +++ b/src/futures/bufread/generic/decoder.rs @@ -70,6 +70,9 @@ impl Decoder { State::Decoding => { let input = ready!(this.reader.as_mut().poll_fill_buf(cx))?; if input.is_empty() { + // Avoid attempting to reinitialise the decoder if the reader + // has returned EOF. + *this.multiple_members = false; State::Flushing } else { let mut input = PartialBuffer::new(input); From 701a3a34511a2ea5916b95de3b99d89bdac7ad0f Mon Sep 17 00:00:00 2001 From: Wim Looman Date: Fri, 13 May 2022 10:57:54 +0200 Subject: [PATCH 2/3] Add assertion that we do not read past EOF --- tests/utils/impls.rs | 18 ++-- tests/utils/mod.rs | 3 +- tests/utils/track_eof.rs | 184 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 196 insertions(+), 9 deletions(-) create mode 100644 tests/utils/track_eof.rs diff --git a/tests/utils/impls.rs b/tests/utils/impls.rs index a1b1ac8c..18cea2eb 100644 --- a/tests/utils/impls.rs +++ b/tests/utils/impls.rs @@ -13,13 +13,13 @@ pub mod futures { pub mod bufread { pub use futures::io::AsyncBufRead; - use crate::utils::InputStream; + use crate::utils::{InputStream, TrackEof}; use futures::stream::{StreamExt as _, TryStreamExt as _}; pub fn from(input: &InputStream) -> impl AsyncBufRead { // By using the stream here we ensure that each chunk will require a separate // read/poll_fill_buf call to process to help test reading multiple chunks. - input.stream().map(Ok).into_async_read() + TrackEof::new(input.stream().map(Ok).into_async_read()) } } @@ -100,13 +100,13 @@ pub mod tokio_02 { pub mod bufread { pub use tokio_02::io::AsyncBufRead; - use crate::utils::InputStream; + use crate::utils::{InputStream, TrackEof}; use tokio_02::io::stream_reader; pub fn from(input: &InputStream) -> impl AsyncBufRead { // By using the stream here we ensure that each chunk will require a separate // read/poll_fill_buf call to process to help test reading multiple chunks. - stream_reader(input.bytes_05_stream()) + TrackEof::new(stream_reader(input.bytes_05_stream())) } } @@ -169,14 +169,14 @@ pub mod tokio_02 { #[cfg(feature = "tokio-03")] pub mod tokio_03 { pub mod bufread { - use crate::utils::InputStream; + use crate::utils::{InputStream, TrackEof}; pub use tokio_03::io::AsyncBufRead; use tokio_util_04::io::StreamReader; pub fn from(input: &InputStream) -> impl AsyncBufRead { // By using the stream here we ensure that each chunk will require a separate // read/poll_fill_buf call to process to help test reading multiple chunks. - StreamReader::new(input.bytes_05_stream()) + TrackEof::new(StreamReader::new(input.bytes_05_stream())) } } @@ -239,7 +239,7 @@ pub mod tokio_03 { #[cfg(feature = "tokio")] pub mod tokio { pub mod bufread { - use crate::utils::InputStream; + use crate::utils::{InputStream, TrackEof}; use bytes::Bytes; use futures::stream::StreamExt; pub use tokio::io::AsyncBufRead; @@ -248,7 +248,9 @@ pub mod tokio { pub fn from(input: &InputStream) -> impl AsyncBufRead { // By using the stream here we ensure that each chunk will require a separate // read/poll_fill_buf call to process to help test reading multiple chunks. - StreamReader::new(input.stream().map(Bytes::from).map(std::io::Result::Ok)) + TrackEof::new(StreamReader::new( + input.stream().map(Bytes::from).map(std::io::Result::Ok), + )) } } diff --git a/tests/utils/mod.rs b/tests/utils/mod.rs index a83a1506..0f6de461 100644 --- a/tests/utils/mod.rs +++ b/tests/utils/mod.rs @@ -8,13 +8,14 @@ mod tokio_03_ext; #[cfg(feature = "tokio")] mod tokio_ext; mod track_closed; +mod track_eof; #[macro_use] mod test_cases; pub mod algos; pub mod impls; -pub use self::{input_stream::InputStream, track_closed::TrackClosed}; +pub use self::{input_stream::InputStream, track_closed::TrackClosed, track_eof::TrackEof}; pub use async_compression::Level; pub use futures::{executor::block_on, pin_mut, stream::Stream}; pub use std::{future::Future, io::Result, iter::FromIterator, pin::Pin}; diff --git a/tests/utils/track_eof.rs b/tests/utils/track_eof.rs new file mode 100644 index 00000000..d2101d13 --- /dev/null +++ b/tests/utils/track_eof.rs @@ -0,0 +1,184 @@ +#[cfg_attr(not(feature = "all-implementations"), allow(unused))] +use std::{ + io::Result, + pin::Pin, + task::{Context, Poll}, +}; + +pub struct TrackEof { + inner: R, + eof: bool, +} + +impl TrackEof { + pub fn new(inner: R) -> Self { + Self { inner, eof: false } + } + + pub fn project(self: Pin<&mut Self>) -> (Pin<&mut R>, &mut bool) { + let Self { inner, eof } = Pin::into_inner(self); + (Pin::new(inner), eof) + } +} + +#[cfg(feature = "futures-io")] +impl futures::io::AsyncRead for TrackEof { + fn poll_read(self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll> { + let (inner, eof) = self.project(); + assert!(!*eof); + match inner.poll_read(cx, buf) { + Poll::Ready(Ok(0)) => { + if !buf.is_empty() { + *eof = true; + } + Poll::Ready(Ok(0)) + } + other => other, + } + } +} + +#[cfg(feature = "futures-io")] +impl futures::io::AsyncBufRead for TrackEof { + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let (inner, eof) = self.project(); + assert!(!*eof); + match inner.poll_fill_buf(cx) { + Poll::Ready(Ok(buf)) => { + if buf.is_empty() { + *eof = true; + } + Poll::Ready(Ok(buf)) + } + other => other, + } + } + + fn consume(self: Pin<&mut Self>, amt: usize) { + self.project().0.consume(amt) + } +} + +#[cfg(feature = "tokio-02")] +impl tokio_02::io::AsyncRead for TrackEof { + fn poll_read(self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll> { + let (inner, eof) = self.project(); + assert!(!*eof); + match inner.poll_read(cx, buf) { + Poll::Ready(Ok(0)) => { + if !buf.is_empty() { + *eof = true; + } + Poll::Ready(Ok(0)) + } + other => other, + } + } +} + +#[cfg(feature = "tokio-02")] +impl tokio_02::io::AsyncBufRead for TrackEof { + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let (inner, eof) = self.project(); + assert!(!*eof); + match inner.poll_fill_buf(cx) { + Poll::Ready(Ok(buf)) => { + if buf.is_empty() { + *eof = true; + } + Poll::Ready(Ok(buf)) + } + other => other, + } + } + + fn consume(self: Pin<&mut Self>, amt: usize) { + self.project().0.consume(amt) + } +} + +#[cfg(feature = "tokio-03")] +impl tokio_03::io::AsyncRead for TrackEof { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &mut tokio_03::io::ReadBuf, + ) -> Poll> { + let (inner, eof) = self.project(); + assert!(!*eof); + let len = buf.filled().len(); + match inner.poll_read(cx, buf) { + Poll::Ready(Ok(())) => { + if buf.filled().len() == len && buf.remaining() > 0 { + *eof = true; + } + Poll::Ready(Ok(())) + } + other => other, + } + } +} + +#[cfg(feature = "tokio-03")] +impl tokio_03::io::AsyncBufRead for TrackEof { + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let (inner, eof) = self.project(); + assert!(!*eof); + match inner.poll_fill_buf(cx) { + Poll::Ready(Ok(buf)) => { + if buf.is_empty() { + *eof = true; + } + Poll::Ready(Ok(buf)) + } + other => other, + } + } + + fn consume(self: Pin<&mut Self>, amt: usize) { + self.project().0.consume(amt) + } +} + +#[cfg(feature = "tokio")] +impl tokio::io::AsyncRead for TrackEof { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &mut tokio::io::ReadBuf, + ) -> Poll> { + let (inner, eof) = self.project(); + assert!(!*eof); + let len = buf.filled().len(); + match inner.poll_read(cx, buf) { + Poll::Ready(Ok(())) => { + if buf.filled().len() == len && buf.remaining() > 0 { + *eof = true; + } + Poll::Ready(Ok(())) + } + other => other, + } + } +} + +#[cfg(feature = "tokio")] +impl tokio::io::AsyncBufRead for TrackEof { + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let (inner, eof) = self.project(); + assert!(!*eof); + match inner.poll_fill_buf(cx) { + Poll::Ready(Ok(buf)) => { + if buf.is_empty() { + *eof = true; + } + Poll::Ready(Ok(buf)) + } + other => other, + } + } + + fn consume(self: Pin<&mut Self>, amt: usize) { + self.project().0.consume(amt) + } +} From e7246738762de34bbb820662c3923f8f2610ac95 Mon Sep 17 00:00:00 2001 From: Wim Looman Date: Sun, 15 May 2022 14:40:17 +0200 Subject: [PATCH 3/3] Don't reinitialize on reader EOF for other implementations --- src/tokio/bufread/generic/decoder.rs | 3 +++ src/tokio_02/bufread/generic/decoder.rs | 3 +++ src/tokio_03/bufread/generic/decoder.rs | 3 +++ 3 files changed, 9 insertions(+) diff --git a/src/tokio/bufread/generic/decoder.rs b/src/tokio/bufread/generic/decoder.rs index e9ef44e8..2f4d8c7f 100644 --- a/src/tokio/bufread/generic/decoder.rs +++ b/src/tokio/bufread/generic/decoder.rs @@ -70,6 +70,9 @@ impl Decoder { State::Decoding => { let input = ready!(this.reader.as_mut().poll_fill_buf(cx))?; if input.is_empty() { + // Avoid attempting to reinitialise the decoder if the reader + // has returned EOF. + *this.multiple_members = false; State::Flushing } else { let mut input = PartialBuffer::new(input); diff --git a/src/tokio_02/bufread/generic/decoder.rs b/src/tokio_02/bufread/generic/decoder.rs index 7e876770..38dc068d 100644 --- a/src/tokio_02/bufread/generic/decoder.rs +++ b/src/tokio_02/bufread/generic/decoder.rs @@ -70,6 +70,9 @@ impl Decoder { State::Decoding => { let input = ready!(this.reader.as_mut().poll_fill_buf(cx))?; if input.is_empty() { + // Avoid attempting to reinitialise the decoder if the reader + // has returned EOF. + *this.multiple_members = false; State::Flushing } else { let mut input = PartialBuffer::new(input); diff --git a/src/tokio_03/bufread/generic/decoder.rs b/src/tokio_03/bufread/generic/decoder.rs index b643c636..796218c9 100644 --- a/src/tokio_03/bufread/generic/decoder.rs +++ b/src/tokio_03/bufread/generic/decoder.rs @@ -70,6 +70,9 @@ impl Decoder { State::Decoding => { let input = ready!(this.reader.as_mut().poll_fill_buf(cx))?; if input.is_empty() { + // Avoid attempting to reinitialise the decoder if the reader + // has returned EOF. + *this.multiple_members = false; State::Flushing } else { let mut input = PartialBuffer::new(input);