Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/futures/bufread/generic/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ impl<R: AsyncBufRead, D: Decode> Decoder<R, D> {
State::Decoding => {
let input = ready!(this.reader.as_mut().poll_fill_buf(cx))?;
if input.is_empty() {
// Avoid attempting to reinitialise the decoder if the reader
// has returned EOF.
*this.multiple_members = false;
Comment thread
Nemo157 marked this conversation as resolved.
State::Flushing
} else {
let mut input = PartialBuffer::new(input);
Expand Down
3 changes: 3 additions & 0 deletions src/tokio/bufread/generic/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ impl<R: AsyncBufRead, D: Decode> Decoder<R, D> {
State::Decoding => {
let input = ready!(this.reader.as_mut().poll_fill_buf(cx))?;
if input.is_empty() {
// Avoid attempting to reinitialise the decoder if the reader
// has returned EOF.
*this.multiple_members = false;
State::Flushing
} else {
let mut input = PartialBuffer::new(input);
Expand Down
3 changes: 3 additions & 0 deletions src/tokio_02/bufread/generic/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ impl<R: AsyncBufRead, D: Decode> Decoder<R, D> {
State::Decoding => {
let input = ready!(this.reader.as_mut().poll_fill_buf(cx))?;
if input.is_empty() {
// Avoid attempting to reinitialise the decoder if the reader
// has returned EOF.
*this.multiple_members = false;
State::Flushing
} else {
let mut input = PartialBuffer::new(input);
Expand Down
3 changes: 3 additions & 0 deletions src/tokio_03/bufread/generic/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ impl<R: AsyncBufRead, D: Decode> Decoder<R, D> {
State::Decoding => {
let input = ready!(this.reader.as_mut().poll_fill_buf(cx))?;
if input.is_empty() {
// Avoid attempting to reinitialise the decoder if the reader
// has returned EOF.
*this.multiple_members = false;
State::Flushing
} else {
let mut input = PartialBuffer::new(input);
Expand Down
18 changes: 10 additions & 8 deletions tests/utils/impls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ pub mod futures {
pub mod bufread {
pub use futures::io::AsyncBufRead;

use crate::utils::InputStream;
use crate::utils::{InputStream, TrackEof};
use futures::stream::{StreamExt as _, TryStreamExt as _};

pub fn from(input: &InputStream) -> impl AsyncBufRead {
// By using the stream here we ensure that each chunk will require a separate
// read/poll_fill_buf call to process to help test reading multiple chunks.
input.stream().map(Ok).into_async_read()
TrackEof::new(input.stream().map(Ok).into_async_read())
}
}

Expand Down Expand Up @@ -100,13 +100,13 @@ pub mod tokio_02 {
pub mod bufread {
pub use tokio_02::io::AsyncBufRead;

use crate::utils::InputStream;
use crate::utils::{InputStream, TrackEof};
use tokio_02::io::stream_reader;

pub fn from(input: &InputStream) -> impl AsyncBufRead {
// By using the stream here we ensure that each chunk will require a separate
// read/poll_fill_buf call to process to help test reading multiple chunks.
stream_reader(input.bytes_05_stream())
TrackEof::new(stream_reader(input.bytes_05_stream()))
}
}

Expand Down Expand Up @@ -169,14 +169,14 @@ pub mod tokio_02 {
#[cfg(feature = "tokio-03")]
pub mod tokio_03 {
pub mod bufread {
use crate::utils::InputStream;
use crate::utils::{InputStream, TrackEof};
pub use tokio_03::io::AsyncBufRead;
use tokio_util_04::io::StreamReader;

pub fn from(input: &InputStream) -> impl AsyncBufRead {
// By using the stream here we ensure that each chunk will require a separate
// read/poll_fill_buf call to process to help test reading multiple chunks.
StreamReader::new(input.bytes_05_stream())
TrackEof::new(StreamReader::new(input.bytes_05_stream()))
}
}

Expand Down Expand Up @@ -239,7 +239,7 @@ pub mod tokio_03 {
#[cfg(feature = "tokio")]
pub mod tokio {
pub mod bufread {
use crate::utils::InputStream;
use crate::utils::{InputStream, TrackEof};
use bytes::Bytes;
use futures::stream::StreamExt;
pub use tokio::io::AsyncBufRead;
Expand All @@ -248,7 +248,9 @@ pub mod tokio {
pub fn from(input: &InputStream) -> impl AsyncBufRead {
// By using the stream here we ensure that each chunk will require a separate
// read/poll_fill_buf call to process to help test reading multiple chunks.
StreamReader::new(input.stream().map(Bytes::from).map(std::io::Result::Ok))
TrackEof::new(StreamReader::new(
input.stream().map(Bytes::from).map(std::io::Result::Ok),
))
}
}

Expand Down
3 changes: 2 additions & 1 deletion tests/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ mod tokio_03_ext;
#[cfg(feature = "tokio")]
mod tokio_ext;
mod track_closed;
mod track_eof;
#[macro_use]
mod test_cases;

pub mod algos;
pub mod impls;

pub use self::{input_stream::InputStream, track_closed::TrackClosed};
pub use self::{input_stream::InputStream, track_closed::TrackClosed, track_eof::TrackEof};
pub use async_compression::Level;
pub use futures::{executor::block_on, pin_mut, stream::Stream};
pub use std::{future::Future, io::Result, iter::FromIterator, pin::Pin};
Expand Down
184 changes: 184 additions & 0 deletions tests/utils/track_eof.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
#[cfg_attr(not(feature = "all-implementations"), allow(unused))]
use std::{
io::Result,
pin::Pin,
task::{Context, Poll},
};

pub struct TrackEof<R> {
inner: R,
eof: bool,
}

impl<R: Unpin> TrackEof<R> {
pub fn new(inner: R) -> Self {
Self { inner, eof: false }
}

pub fn project(self: Pin<&mut Self>) -> (Pin<&mut R>, &mut bool) {
let Self { inner, eof } = Pin::into_inner(self);
(Pin::new(inner), eof)
}
}

#[cfg(feature = "futures-io")]
impl<R: futures::io::AsyncRead + Unpin> futures::io::AsyncRead for TrackEof<R> {
fn poll_read(self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll<Result<usize>> {
let (inner, eof) = self.project();
assert!(!*eof);
match inner.poll_read(cx, buf) {
Poll::Ready(Ok(0)) => {
if !buf.is_empty() {
*eof = true;
}
Poll::Ready(Ok(0))
}
other => other,
}
}
}

#[cfg(feature = "futures-io")]
impl<R: futures::io::AsyncBufRead + Unpin> futures::io::AsyncBufRead for TrackEof<R> {
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<&[u8]>> {
let (inner, eof) = self.project();
assert!(!*eof);
match inner.poll_fill_buf(cx) {
Poll::Ready(Ok(buf)) => {
if buf.is_empty() {
*eof = true;
}
Poll::Ready(Ok(buf))
}
other => other,
}
}

fn consume(self: Pin<&mut Self>, amt: usize) {
self.project().0.consume(amt)
}
}

#[cfg(feature = "tokio-02")]
impl<R: tokio_02::io::AsyncRead + Unpin> tokio_02::io::AsyncRead for TrackEof<R> {
fn poll_read(self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll<Result<usize>> {
let (inner, eof) = self.project();
assert!(!*eof);
match inner.poll_read(cx, buf) {
Poll::Ready(Ok(0)) => {
if !buf.is_empty() {
*eof = true;
}
Poll::Ready(Ok(0))
}
other => other,
}
}
}

#[cfg(feature = "tokio-02")]
impl<R: tokio_02::io::AsyncBufRead + Unpin> tokio_02::io::AsyncBufRead for TrackEof<R> {
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<&[u8]>> {
let (inner, eof) = self.project();
assert!(!*eof);
match inner.poll_fill_buf(cx) {
Poll::Ready(Ok(buf)) => {
if buf.is_empty() {
*eof = true;
}
Poll::Ready(Ok(buf))
}
other => other,
}
}

fn consume(self: Pin<&mut Self>, amt: usize) {
self.project().0.consume(amt)
}
}

#[cfg(feature = "tokio-03")]
impl<R: tokio_03::io::AsyncRead + Unpin> tokio_03::io::AsyncRead for TrackEof<R> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context,
buf: &mut tokio_03::io::ReadBuf,
) -> Poll<Result<()>> {
let (inner, eof) = self.project();
assert!(!*eof);
let len = buf.filled().len();
match inner.poll_read(cx, buf) {
Poll::Ready(Ok(())) => {
if buf.filled().len() == len && buf.remaining() > 0 {
*eof = true;
}
Poll::Ready(Ok(()))
}
other => other,
}
}
}

#[cfg(feature = "tokio-03")]
impl<R: tokio_03::io::AsyncBufRead + Unpin> tokio_03::io::AsyncBufRead for TrackEof<R> {
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<&[u8]>> {
let (inner, eof) = self.project();
assert!(!*eof);
match inner.poll_fill_buf(cx) {
Poll::Ready(Ok(buf)) => {
if buf.is_empty() {
*eof = true;
}
Poll::Ready(Ok(buf))
}
other => other,
}
}

fn consume(self: Pin<&mut Self>, amt: usize) {
self.project().0.consume(amt)
}
}

#[cfg(feature = "tokio")]
impl<R: tokio::io::AsyncRead + Unpin> tokio::io::AsyncRead for TrackEof<R> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context,
buf: &mut tokio::io::ReadBuf,
) -> Poll<Result<()>> {
let (inner, eof) = self.project();
assert!(!*eof);
let len = buf.filled().len();
match inner.poll_read(cx, buf) {
Poll::Ready(Ok(())) => {
if buf.filled().len() == len && buf.remaining() > 0 {
*eof = true;
}
Poll::Ready(Ok(()))
}
other => other,
}
}
}

#[cfg(feature = "tokio")]
impl<R: tokio::io::AsyncBufRead + Unpin> tokio::io::AsyncBufRead for TrackEof<R> {
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<&[u8]>> {
let (inner, eof) = self.project();
assert!(!*eof);
match inner.poll_fill_buf(cx) {
Poll::Ready(Ok(buf)) => {
if buf.is_empty() {
*eof = true;
}
Poll::Ready(Ok(buf))
}
other => other,
}
}

fn consume(self: Pin<&mut Self>, amt: usize) {
self.project().0.consume(amt)
}
}