From 7977adbae6629a31f5f422f2afa2cc0d5b400f3e Mon Sep 17 00:00:00 2001 From: Ximo Guanter Date: Thu, 25 Feb 2021 17:38:31 +0100 Subject: [PATCH 1/2] ARROW-11784: [Rust][DataFusion] CoalesceBatchesStream doesn't honor Stream interface --- .../datafusion/src/physical_plan/coalesce_batches.rs | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/rust/datafusion/src/physical_plan/coalesce_batches.rs b/rust/datafusion/src/physical_plan/coalesce_batches.rs index 9f36fd8f794..1dd3c93f7fc 100644 --- a/rust/datafusion/src/physical_plan/coalesce_batches.rs +++ b/rust/datafusion/src/physical_plan/coalesce_batches.rs @@ -111,6 +111,7 @@ impl ExecutionPlan for CoalesceBatchesExec { target_batch_size: self.target_batch_size, buffer: Vec::new(), buffered_rows: 0, + is_closed: false, })) } } @@ -126,6 +127,8 @@ struct CoalesceBatchesStream { buffer: Vec, /// Buffered row count buffered_rows: usize, + /// Whether the stream has finished returning all of its data or not + is_closed: bool } impl Stream for CoalesceBatchesStream { @@ -135,6 +138,9 @@ impl Stream for CoalesceBatchesStream { mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { + if self.is_closed { + return Poll::Ready(None); + } loop { let input_batch = self.input.poll_next_unpin(cx); match input_batch { @@ -167,6 +173,7 @@ impl Stream for CoalesceBatchesStream { } } None => { + self.is_closed = true; // we have reached the end of the input stream but there could still // be buffered batches if self.buffer.is_empty() { @@ -234,7 +241,7 @@ pub fn concat_batches( #[cfg(test)] mod tests { use super::*; - use crate::physical_plan::memory::MemoryExec; + use crate::physical_plan::{memory::MemoryExec, repartition::RepartitionExec}; use arrow::array::UInt32Array; use arrow::datatypes::{DataType, Field, Schema}; @@ -244,7 +251,7 @@ mod tests { let partition = create_vec_batches(&schema, 10); let partitions = vec![partition]; - let output_partitions = coalesce_batches(&schema, partitions, 20).await?; + let output_partitions = coalesce_batches(&schema, partitions, 21).await?; assert_eq!(1, output_partitions.len()); // input is 10 batches x 8 rows (80 rows) @@ -287,6 +294,7 @@ mod tests { ) -> Result>> { // create physical plan let exec = MemoryExec::try_new(&input_partitions, schema.clone(), None)?; + let exec = RepartitionExec::try_new(Arc::new(exec), Partitioning::RoundRobinBatch(1))?; let exec: Arc = Arc::new(CoalesceBatchesExec::new(Arc::new(exec), target_batch_size)); From 9d5243f4eeac53fd93feb166863ba50873071da0 Mon Sep 17 00:00:00 2001 From: Ximo Guanter Date: Thu, 25 Feb 2021 22:30:02 +0100 Subject: [PATCH 2/2] fmt --- rust/datafusion/src/physical_plan/coalesce_batches.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/rust/datafusion/src/physical_plan/coalesce_batches.rs b/rust/datafusion/src/physical_plan/coalesce_batches.rs index 1dd3c93f7fc..b91e0b672eb 100644 --- a/rust/datafusion/src/physical_plan/coalesce_batches.rs +++ b/rust/datafusion/src/physical_plan/coalesce_batches.rs @@ -128,7 +128,7 @@ struct CoalesceBatchesStream { /// Buffered row count buffered_rows: usize, /// Whether the stream has finished returning all of its data or not - is_closed: bool + is_closed: bool, } impl Stream for CoalesceBatchesStream { @@ -294,7 +294,8 @@ mod tests { ) -> Result>> { // create physical plan let exec = MemoryExec::try_new(&input_partitions, schema.clone(), None)?; - let exec = RepartitionExec::try_new(Arc::new(exec), Partitioning::RoundRobinBatch(1))?; + let exec = + RepartitionExec::try_new(Arc::new(exec), Partitioning::RoundRobinBatch(1))?; let exec: Arc = Arc::new(CoalesceBatchesExec::new(Arc::new(exec), target_batch_size));