From 4b96a1cf04e1511bb40eebeeb2abfa0495ee82c8 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 29 Dec 2020 10:05:48 -0700 Subject: [PATCH 01/10] Add new coalesce batches operator --- .../src/physical_plan/coalesce_batches.rs | 135 ++++++++++++++++++ rust/datafusion/src/physical_plan/mod.rs | 1 + 2 files changed, 136 insertions(+) create mode 100644 rust/datafusion/src/physical_plan/coalesce_batches.rs diff --git a/rust/datafusion/src/physical_plan/coalesce_batches.rs b/rust/datafusion/src/physical_plan/coalesce_batches.rs new file mode 100644 index 00000000000..50d73e3b6fb --- /dev/null +++ b/rust/datafusion/src/physical_plan/coalesce_batches.rs @@ -0,0 +1,135 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! CoalesceBatchesExec combines small batches into larger batches for more efficient use of +//! vectorized processing by upstream operators. + +use std::any::Any; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; + +use crate::error::{DataFusionError, Result}; +use crate::physical_plan::{ + ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream, +}; + +use arrow::datatypes::SchemaRef; +use arrow::error::Result as ArrowResult; +use arrow::record_batch::RecordBatch; +use async_trait::async_trait; +use futures::stream::{Stream, StreamExt}; + +#[derive(Debug)] +struct CoalesceBatchesExec { + /// The input plan + input: Arc, + /// Minimum number of rows for coalesces batches + target_batch_size: usize, +} + +impl CoalesceBatchesExec { + fn new(input: Arc, target_batch_size: usize) -> Self { + Self { + input, + target_batch_size, + } + } +} + +#[async_trait] +impl ExecutionPlan for CoalesceBatchesExec { + /// Return a reference to Any that can be used for downcasting + fn as_any(&self) -> &dyn Any { + self + } + + /// Get the schema for this execution plan + fn schema(&self) -> SchemaRef { + // The coalesce batches operator does not make any changes to the schema of its input + self.input.schema() + } + + fn children(&self) -> Vec> { + vec![self.input.clone()] + } + + /// Get the output partitioning of this plan + fn output_partitioning(&self) -> Partitioning { + // The coalesce batches operator does not make any changes to the partitioning of its input + self.input.output_partitioning() + } + + fn with_new_children( + &self, + children: Vec>, + ) -> Result> { + match children.len() { + 1 => Ok(Arc::new(CoalesceBatchesExec::new( + children[0].clone(), + self.target_batch_size, + ))), + _ => Err(DataFusionError::Internal( + "CoalesceBatchesExec wrong number of children".to_string(), + )), + } + } + + async fn execute(&self, partition: usize) -> Result { + Ok(Box::pin(CoalesceBatchesStream { + input: self.input.execute(partition).await?, + schema: self.input.schema().clone(), + 
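            // NOTE: `target_batch_size` is a plain `usize` (a `Copy` type), so the `.clone()`
            // on the next line is a no-op copy that clippy's `clone_on_copy` lint would flag;
            // `target_batch_size: self.target_batch_size` would suffice. Similarly, `schema()`
            // already returns a `SchemaRef` (an `Arc`), so cloning it only bumps a reference count.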
target_batch_size: self.target_batch_size.clone(), + })) + } +} + +struct CoalesceBatchesStream { + /// The input plan + input: SendableRecordBatchStream, + /// The input schema + schema: SchemaRef, + /// Minimum number of rows for coalesces batches + target_batch_size: usize, +} + +impl Stream for CoalesceBatchesStream { + type Item = ArrowResult; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + self.input.poll_next_unpin(cx).map(|x| match x { + Some(Ok(batch)) => { + unimplemented!() + } + other => other, + }) + } + + fn size_hint(&self) -> (usize, Option) { + // same number of record batches + self.input.size_hint() + } +} + +impl RecordBatchStream for CoalesceBatchesStream { + fn schema(&self) -> SchemaRef { + self.schema.clone() + } +} diff --git a/rust/datafusion/src/physical_plan/mod.rs b/rust/datafusion/src/physical_plan/mod.rs index d0dcede78d0..9cc0e3054e0 100644 --- a/rust/datafusion/src/physical_plan/mod.rs +++ b/rust/datafusion/src/physical_plan/mod.rs @@ -248,6 +248,7 @@ pub trait Accumulator: Send + Sync + Debug { pub mod aggregates; pub mod array_expressions; +pub mod coalesce_batches; pub mod common; pub mod csv; pub mod datetime_expressions; From b32ec1d525e17fef5940b839f18d89a39864cfe4 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 29 Dec 2020 10:23:01 -0700 Subject: [PATCH 02/10] coalesce batches operator code complete --- .../src/physical_plan/coalesce_batches.rs | 64 +++++++++++++++++-- 1 file changed, 59 insertions(+), 5 deletions(-) diff --git a/rust/datafusion/src/physical_plan/coalesce_batches.rs b/rust/datafusion/src/physical_plan/coalesce_batches.rs index 50d73e3b6fb..6779cf2c346 100644 --- a/rust/datafusion/src/physical_plan/coalesce_batches.rs +++ b/rust/datafusion/src/physical_plan/coalesce_batches.rs @@ -28,6 +28,7 @@ use crate::physical_plan::{ ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream, }; +use arrow::array::{make_array, MutableArrayData}; use arrow::datatypes::SchemaRef; use arrow::error::Result as ArrowResult; use arrow::record_batch::RecordBatch; @@ -94,6 +95,8 @@ impl ExecutionPlan for CoalesceBatchesExec { input: self.input.execute(partition).await?, schema: self.input.schema().clone(), target_batch_size: self.target_batch_size.clone(), + buffer: Vec::new(), + buffered_rows: 0, })) } } @@ -105,6 +108,10 @@ struct CoalesceBatchesStream { schema: SchemaRef, /// Minimum number of rows for coalesces batches target_batch_size: usize, + /// Buffered batches + buffer: Vec, + /// Buffered row count + buffered_rows: usize, } impl Stream for CoalesceBatchesStream { @@ -114,15 +121,62 @@ impl Stream for CoalesceBatchesStream { mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - self.input.poll_next_unpin(cx).map(|x| match x { - Some(Ok(batch)) => { - unimplemented!() + loop { + let input_batch = self.input.poll_next_unpin(cx); + match input_batch { + Poll::Ready(x) => match x { + Some(Ok(ref batch)) => { + if batch.num_rows() >= self.target_batch_size + && self.buffer.is_empty() + { + return Poll::Ready(Some(Ok(batch.clone()))); + } else { + // add to the buffered batches + self.buffer.push(batch.clone()); + self.buffered_rows += batch.num_rows(); + // check to see if we have enough batches yet + if self.buffered_rows >= self.target_batch_size { + // combine the batches and return + let mut arrays = + Vec::with_capacity(self.schema.fields().len()); + for i in 0..self.schema.fields().len() { + let source_arrays = self + .buffer + .iter() + .map(|batch| 
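                                    // for each buffered batch, borrow column `i`'s underlying
                                    // `ArrayData`; these become the sources that `MutableArrayData`
                                    // copies from below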
batch.column(i).data_ref().as_ref()) + .collect(); + let mut array_data = MutableArrayData::new( + source_arrays, + true, + self.buffered_rows, + ); + for j in 0..self.buffer.len() { + array_data.extend( + j, + 0, + self.buffer[j].num_rows(), + ); + } + let data = array_data.freeze(); + arrays.push(make_array(Arc::new(data))); + } + let batch = + RecordBatch::try_new(self.schema.clone(), arrays)?; + self.buffer.clear(); + self.buffered_rows = 0; + return Poll::Ready(Some(Ok(batch))); + } + } + } + other => return Poll::Ready(other), + }, + Poll::Pending => return Poll::Pending, } - other => other, - }) + } } fn size_hint(&self) -> (usize, Option) { + //TODO need to do something here? // same number of record batches self.input.size_hint() } From cd9aac0f6995338964fc650962033efcdf409668 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 29 Dec 2020 10:47:28 -0700 Subject: [PATCH 03/10] integrate with optimizer --- .../src/physical_plan/coalesce_batches.rs | 18 ++++++++++++++++-- rust/datafusion/src/physical_plan/planner.rs | 12 ++++++++++++ 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/rust/datafusion/src/physical_plan/coalesce_batches.rs b/rust/datafusion/src/physical_plan/coalesce_batches.rs index 6779cf2c346..ec1486700fc 100644 --- a/rust/datafusion/src/physical_plan/coalesce_batches.rs +++ b/rust/datafusion/src/physical_plan/coalesce_batches.rs @@ -34,9 +34,12 @@ use arrow::error::Result as ArrowResult; use arrow::record_batch::RecordBatch; use async_trait::async_trait; use futures::stream::{Stream, StreamExt}; +use log::debug; +/// CoalesceBatchesExec combines small batches into larger batches for more efficient use of +/// vectorized processing by upstream operators. #[derive(Debug)] -struct CoalesceBatchesExec { +pub struct CoalesceBatchesExec { /// The input plan input: Arc, /// Minimum number of rows for coalesces batches @@ -44,7 +47,8 @@ struct CoalesceBatchesExec { } impl CoalesceBatchesExec { - fn new(input: Arc, target_batch_size: usize) -> Self { + /// Create a new CoalesceBatchesExec + pub fn new(input: Arc, target_batch_size: usize) -> Self { Self { input, target_batch_size, @@ -162,8 +166,18 @@ impl Stream for CoalesceBatchesStream { } let batch = RecordBatch::try_new(self.schema.clone(), arrays)?; + + debug!( + "Combined {} batches containing {} rows", + self.buffer.len(), + self.buffered_rows + ); + + // reset buffer state self.buffer.clear(); self.buffered_rows = 0; + + // return batch return Poll::Ready(Some(Ok(batch))); } } diff --git a/rust/datafusion/src/physical_plan/planner.rs b/rust/datafusion/src/physical_plan/planner.rs index f4b578a9f8c..6caa16e39b5 100644 --- a/rust/datafusion/src/physical_plan/planner.rs +++ b/rust/datafusion/src/physical_plan/planner.rs @@ -26,6 +26,7 @@ use crate::logical_plan::{ DFSchema, Expr, LogicalPlan, Operator, Partitioning as LogicalPartitioning, PlanType, StringifiedPlan, UserDefinedLogicalNode, }; +use crate::physical_plan::coalesce_batches::CoalesceBatchesExec; use crate::physical_plan::explain::ExplainExec; use crate::physical_plan::expressions::{CaseExpr, Column, Literal, PhysicalSortExpr}; use crate::physical_plan::filter::FilterExec; @@ -100,6 +101,7 @@ impl DefaultPhysicalPlanner { plan: Arc, ctx_state: &ExecutionContextState, ) -> Result> { + let children = plan .children() .iter() @@ -110,6 +112,16 @@ impl DefaultPhysicalPlanner { // leaf node, children cannot be replaced Ok(plan.clone()) } else { + // wrap filter in coalesce batches + let plan = if plan.as_any().downcast_ref::().is_some() { + let 
target_batch_size = ctx_state.config.batch_size; + Arc::new(CoalesceBatchesExec::new(plan.clone(), target_batch_size)) + } else { + plan.clone() + }; + + let children = plan.children().clone(); + match plan.required_child_distribution() { Distribution::UnspecifiedDistribution => plan.with_new_children(children), Distribution::SinglePartition => plan.with_new_children( From 5b3302d5bd365a35f3cc29455d6d763c2c0bf351 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 29 Dec 2020 10:47:37 -0700 Subject: [PATCH 04/10] format --- rust/datafusion/src/physical_plan/planner.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/rust/datafusion/src/physical_plan/planner.rs b/rust/datafusion/src/physical_plan/planner.rs index 6caa16e39b5..8086a69ec54 100644 --- a/rust/datafusion/src/physical_plan/planner.rs +++ b/rust/datafusion/src/physical_plan/planner.rs @@ -101,7 +101,6 @@ impl DefaultPhysicalPlanner { plan: Arc, ctx_state: &ExecutionContextState, ) -> Result> { - let children = plan .children() .iter() From 100bb99b26ecfa8e1331f72e3c8fe71a20c31c73 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 29 Dec 2020 11:27:17 -0700 Subject: [PATCH 05/10] discard empty batches, document size_hint --- rust/datafusion/src/physical_plan/coalesce_batches.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/rust/datafusion/src/physical_plan/coalesce_batches.rs b/rust/datafusion/src/physical_plan/coalesce_batches.rs index ec1486700fc..507f22b9a5e 100644 --- a/rust/datafusion/src/physical_plan/coalesce_batches.rs +++ b/rust/datafusion/src/physical_plan/coalesce_batches.rs @@ -135,9 +135,11 @@ impl Stream for CoalesceBatchesStream { { return Poll::Ready(Some(Ok(batch.clone()))); } else { - // add to the buffered batches - self.buffer.push(batch.clone()); - self.buffered_rows += batch.num_rows(); + // add to the buffered batches (if non-empty) + if batch.num_rows() > 0 { + self.buffer.push(batch.clone()); + self.buffered_rows += batch.num_rows(); + } // check to see if we have enough batches yet if self.buffered_rows >= self.target_batch_size { // combine the batches and return @@ -190,8 +192,7 @@ impl Stream for CoalesceBatchesStream { } fn size_hint(&self) -> (usize, Option) { - //TODO need to do something here? 
- // same number of record batches + // we can't predict the size of incoming batches so re-use the size hint from the input self.input.size_hint() } } From f92ec211ffbcb6e2d4a76354ae8d9bcb7b451552 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 29 Dec 2020 12:21:37 -0700 Subject: [PATCH 06/10] handle end of stream --- .../src/physical_plan/coalesce_batches.rs | 97 +++++++++++-------- 1 file changed, 59 insertions(+), 38 deletions(-) diff --git a/rust/datafusion/src/physical_plan/coalesce_batches.rs b/rust/datafusion/src/physical_plan/coalesce_batches.rs index 507f22b9a5e..48ff7d1ac88 100644 --- a/rust/datafusion/src/physical_plan/coalesce_batches.rs +++ b/rust/datafusion/src/physical_plan/coalesce_batches.rs @@ -134,56 +134,47 @@ impl Stream for CoalesceBatchesStream { && self.buffer.is_empty() { return Poll::Ready(Some(Ok(batch.clone()))); + } else if batch.num_rows() == 0 { + // discard empty batches } else { - // add to the buffered batches (if non-empty) - if batch.num_rows() > 0 { - self.buffer.push(batch.clone()); - self.buffered_rows += batch.num_rows(); - } + // add to the buffered batches + self.buffer.push(batch.clone()); + self.buffered_rows += batch.num_rows(); // check to see if we have enough batches yet if self.buffered_rows >= self.target_batch_size { // combine the batches and return - let mut arrays = - Vec::with_capacity(self.schema.fields().len()); - for i in 0..self.schema.fields().len() { - let source_arrays = self - .buffer - .iter() - .map(|batch| batch.column(i).data_ref().as_ref()) - .collect(); - let mut array_data = MutableArrayData::new( - source_arrays, - true, - self.buffered_rows, - ); - for j in 0..self.buffer.len() { - array_data.extend( - j, - 0, - self.buffer[j].num_rows(), - ); - } - let data = array_data.freeze(); - arrays.push(make_array(Arc::new(data))); - } - let batch = - RecordBatch::try_new(self.schema.clone(), arrays)?; - - debug!( - "Combined {} batches containing {} rows", - self.buffer.len(), - self.buffered_rows - ); - + let batch = concat_batches( + &self.schema, + &self.buffer, + self.buffered_rows, + )?; // reset buffer state self.buffer.clear(); self.buffered_rows = 0; - // return batch return Poll::Ready(Some(Ok(batch))); } } } + None => { + // we have reached the end of the input stream but there could still + // be buffered batches + if self.buffer.is_empty() { + return Poll::Ready(None); + } else { + // combine the batches and return + let batch = concat_batches( + &self.schema, + &self.buffer, + self.buffered_rows, + )?; + // reset buffer state + self.buffer.clear(); + self.buffered_rows = 0; + // return batch + return Poll::Ready(Some(Ok(batch))); + } + } other => return Poll::Ready(other), }, Poll::Pending => return Poll::Pending, @@ -202,3 +193,33 @@ impl RecordBatchStream for CoalesceBatchesStream { self.schema.clone() } } + +fn concat_batches( + schema: &SchemaRef, + batches: &[RecordBatch], + row_count: usize, +) -> ArrowResult { + let mut arrays = Vec::with_capacity(schema.fields().len()); + for i in 0..schema.fields().len() { + let source_arrays = batches + .iter() + .map(|batch| batch.column(i).data_ref().as_ref()) + .collect(); + let mut array_data = MutableArrayData::new( + source_arrays, + true, //TODO + row_count, + ); + for j in 0..batches.len() { + array_data.extend(j, 0, batches[j].num_rows()); + } + let data = array_data.freeze(); + arrays.push(make_array(Arc::new(data))); + } + debug!( + "Combined {} batches containing {} rows", + batches.len(), + row_count + ); + RecordBatch::try_new(schema.clone(), arrays) 
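    // NOTE on the `true` passed to `MutableArrayData::new` above: that second argument is the
    // `use_nulls` flag, so null buffers are allocated even when every source array is non-null;
    // the `//TODO` presumably refers to deriving this from the schema fields' nullability instead.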
+} From e1b39b4819bac362dd02217b5a393daf0691fe50 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 29 Dec 2020 12:26:26 -0700 Subject: [PATCH 07/10] use concat --- .../src/physical_plan/coalesce_batches.rs | 23 +++++++------------ 1 file changed, 8 insertions(+), 15 deletions(-) diff --git a/rust/datafusion/src/physical_plan/coalesce_batches.rs b/rust/datafusion/src/physical_plan/coalesce_batches.rs index 48ff7d1ac88..437d34d7078 100644 --- a/rust/datafusion/src/physical_plan/coalesce_batches.rs +++ b/rust/datafusion/src/physical_plan/coalesce_batches.rs @@ -28,7 +28,7 @@ use crate::physical_plan::{ ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream, }; -use arrow::array::{make_array, MutableArrayData}; +use arrow::compute::kernels::concat::concat; use arrow::datatypes::SchemaRef; use arrow::error::Result as ArrowResult; use arrow::record_batch::RecordBatch; @@ -201,20 +201,13 @@ fn concat_batches( ) -> ArrowResult { let mut arrays = Vec::with_capacity(schema.fields().len()); for i in 0..schema.fields().len() { - let source_arrays = batches - .iter() - .map(|batch| batch.column(i).data_ref().as_ref()) - .collect(); - let mut array_data = MutableArrayData::new( - source_arrays, - true, //TODO - row_count, - ); - for j in 0..batches.len() { - array_data.extend(j, 0, batches[j].num_rows()); - } - let data = array_data.freeze(); - arrays.push(make_array(Arc::new(data))); + let array = concat( + &batches + .iter() + .map(|batch| batch.column(i).as_ref()) + .collect::>(), + )?; + arrays.push(array); } debug!( "Combined {} batches containing {} rows", From b9e47aca479023155e052e4649e0348879e6e5ce Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 29 Dec 2020 12:44:21 -0700 Subject: [PATCH 08/10] add unit test --- .../src/physical_plan/coalesce_batches.rs | 77 +++++++++++++++++++ 1 file changed, 77 insertions(+) diff --git a/rust/datafusion/src/physical_plan/coalesce_batches.rs b/rust/datafusion/src/physical_plan/coalesce_batches.rs index 437d34d7078..1d85c83165d 100644 --- a/rust/datafusion/src/physical_plan/coalesce_batches.rs +++ b/rust/datafusion/src/physical_plan/coalesce_batches.rs @@ -216,3 +216,80 @@ fn concat_batches( ); RecordBatch::try_new(schema.clone(), arrays) } + +#[cfg(test)] +mod tests { + use super::*; + use crate::physical_plan::memory::MemoryExec; + use arrow::array::UInt32Array; + use arrow::datatypes::{DataType, Field, Schema}; + + #[tokio::test(threaded_scheduler)] + async fn test_concat_batches() -> Result<()> { + let schema = test_schema(); + let partition = create_vec_batches(&schema, 10)?; + let partitions = vec![partition]; + + let output_partitions = coalesce_batches(&schema, partitions, 20).await?; + assert_eq!(1, output_partitions.len()); + + // input is 10 batches x 8 rows (80 rows) + // expected output is batches of at least 20 rows (except for the final batch) + let batches = &output_partitions[0]; + assert_eq!(4, batches.len()); + assert_eq!(24, batches[0].num_rows()); + assert_eq!(24, batches[1].num_rows()); + assert_eq!(24, batches[2].num_rows()); + assert_eq!(8, batches[3].num_rows()); + + Ok(()) + } + + fn test_schema() -> Arc { + Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)])) + } + + fn create_vec_batches( + schema: &Arc, + num_batches: usize, + ) -> Result> { + let batch = create_batch(schema); + let mut vec = Vec::with_capacity(num_batches); + for _ in 0..num_batches { + vec.push(batch.clone()); + } + Ok(vec) + } + + fn create_batch(schema: &Arc) -> RecordBatch { + RecordBatch::try_new( + schema.clone(), 
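            // a single `UInt32` column holding the eight values 1..=8; `create_vec_batches`
            // above clones this batch `num_batches` times, giving the 10 x 8 = 80 input rows
            // used by `test_concat_batches`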
+ vec![Arc::new(UInt32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8]))], + ) + .unwrap() + } + + async fn coalesce_batches( + schema: &SchemaRef, + input_partitions: Vec>, + target_batch_size: usize, + ) -> Result>> { + // create physical plan + let exec = MemoryExec::try_new(&input_partitions, schema.clone(), None)?; + let exec: Arc = + Arc::new(CoalesceBatchesExec::new(Arc::new(exec), target_batch_size)); + + // execute and collect results + let mut output_partitions = vec![]; + for i in 0..exec.output_partitioning().partition_count() { + // execute this *output* partition and collect all batches + let mut stream = exec.execute(i).await?; + let mut batches = vec![]; + while let Some(result) = stream.next().await { + batches.push(result?); + } + output_partitions.push(batches); + } + Ok(output_partitions) + } +} From c4668b26f61fb777feb5361f7a5a25763f9cbde0 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 29 Dec 2020 13:06:18 -0700 Subject: [PATCH 09/10] fix regression --- rust/datafusion/src/execution/context.rs | 23 +++++++++++++++++------ rust/datafusion/src/physical_plan/mod.rs | 20 ++++++++++++++++++++ 2 files changed, 37 insertions(+), 6 deletions(-) diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs index 4cad0192dc3..dae9c90c6a2 100644 --- a/rust/datafusion/src/execution/context.rs +++ b/rust/datafusion/src/execution/context.rs @@ -596,8 +596,8 @@ mod tests { use super::*; use crate::logical_plan::{col, create_udf, sum}; - use crate::physical_plan::collect; use crate::physical_plan::functions::ScalarFunctionImplementation; + use crate::physical_plan::{collect, collect_partitioned}; use crate::test; use crate::variable::VarType; use crate::{ @@ -683,14 +683,25 @@ mod tests { let logical_plan = ctx.optimize(&logical_plan)?; let physical_plan = ctx.create_physical_plan(&logical_plan)?; + println!("{:?}", physical_plan); - let results = collect(physical_plan).await?; - - // there should be one batch per partition + let results = collect_partitioned(physical_plan).await?; assert_eq!(results.len(), partition_count); - let row_count: usize = results.iter().map(|batch| batch.num_rows()).sum(); - assert_eq!(row_count, 20); + // there should be a total of 2 batches with 20 rows because the where clause filters + // out results from 2 partitions + + // note that the order of partitions is not deterministic + let mut num_batches = 0; + let mut num_rows = 0; + for partition in &results { + for batch in partition { + num_batches += 1; + num_rows += batch.num_rows(); + } + } + assert_eq!(2, num_batches); + assert_eq!(20, num_rows); Ok(()) } diff --git a/rust/datafusion/src/physical_plan/mod.rs b/rust/datafusion/src/physical_plan/mod.rs index 9cc0e3054e0..605e5d6f44a 100644 --- a/rust/datafusion/src/physical_plan/mod.rs +++ b/rust/datafusion/src/physical_plan/mod.rs @@ -104,6 +104,26 @@ pub async fn collect(plan: Arc) -> Result> { } } +/// Execute the [ExecutionPlan] and collect the results in memory +pub async fn collect_partitioned( + plan: Arc, +) -> Result>> { + match plan.output_partitioning().partition_count() { + 0 => Ok(vec![]), + 1 => { + let it = plan.execute(0).await?; + Ok(vec![common::collect(it).await?]) + } + _ => { + let mut partitions = vec![]; + for i in 0..plan.output_partitioning().partition_count() { + partitions.push(common::collect(plan.execute(i).await?).await?) + } + Ok(partitions) + } + } +} + /// Partitioning schemes supported by operators. 
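// Illustrative sketch (not part of this patch): `collect_partitioned` preserves the partition
// structure (one `Vec<RecordBatch>` per output partition), whereas `collect` flattens all
// partitions into a single `Vec<RecordBatch>`. Assuming `plan: Arc<dyn ExecutionPlan>`:
//
//     let per_partition = collect_partitioned(plan.clone()).await?;
//     assert_eq!(per_partition.len(), plan.output_partitioning().partition_count());
//     let flattened = collect(plan).await?;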
 #[derive(Debug, Clone)]
 pub enum Partitioning {

From bdbbfa20689953310761268114249b77a7883b9c Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 30 Dec 2020 08:07:30 -0700
Subject: [PATCH 10/10] address PR feedback

---
 .../src/physical_plan/coalesce_batches.rs    |  5 ++--
 rust/datafusion/src/physical_plan/planner.rs | 24 ++++++++++++++++---
 2 files changed, 24 insertions(+), 5 deletions(-)

diff --git a/rust/datafusion/src/physical_plan/coalesce_batches.rs b/rust/datafusion/src/physical_plan/coalesce_batches.rs
index 1d85c83165d..7b71a09b955 100644
--- a/rust/datafusion/src/physical_plan/coalesce_batches.rs
+++ b/rust/datafusion/src/physical_plan/coalesce_batches.rs
@@ -280,8 +280,9 @@ mod tests {
             Arc::new(CoalesceBatchesExec::new(Arc::new(exec), target_batch_size));

         // execute and collect results
-        let mut output_partitions = vec![];
-        for i in 0..exec.output_partitioning().partition_count() {
+        let output_partition_count = exec.output_partitioning().partition_count();
+        let mut output_partitions = Vec::with_capacity(output_partition_count);
+        for i in 0..output_partition_count {
             // execute this *output* partition and collect all batches
             let mut stream = exec.execute(i).await?;
             let mut batches = vec![];
diff --git a/rust/datafusion/src/physical_plan/planner.rs b/rust/datafusion/src/physical_plan/planner.rs
index 8086a69ec54..fd414890fb6 100644
--- a/rust/datafusion/src/physical_plan/planner.rs
+++ b/rust/datafusion/src/physical_plan/planner.rs
@@ -111,9 +111,27 @@ impl DefaultPhysicalPlanner {
             // leaf node, children cannot be replaced
             Ok(plan.clone())
         } else {
-            // wrap filter in coalesce batches
-            let plan = if plan.as_any().downcast_ref::<FilterExec>().is_some() {
-                let target_batch_size = ctx_state.config.batch_size;
+            // wrap operators in CoalesceBatches to avoid lots of tiny batches when we have
+            // highly selective filters
+            let plan_any = plan.as_any();
+            //TODO we should do this in a more generic way either by wrapping all operators
+            // or having an API so that operators can declare when their inputs or outputs
+            // need to be wrapped in a coalesce batches operator.
+            // See https://issues.apache.org/jira/browse/ARROW-11068
+            let wrap_in_coalesce = plan_any.downcast_ref::<FilterExec>().is_some()
+                || plan_any.downcast_ref::<HashJoinExec>().is_some()
+                || plan_any.downcast_ref::<RepartitionExec>().is_some();
+
+            //TODO we should also do this for HashAggregateExec but we need to update tests
+            // as part of this work - see https://issues.apache.org/jira/browse/ARROW-11068
+            // || plan_any.downcast_ref::<HashAggregateExec>().is_some();
+
+            let plan = if wrap_in_coalesce {
+                //TODO we should add specific configuration settings for coalescing batches and
+                // we should do that once https://issues.apache.org/jira/browse/ARROW-11059 is
+                // implemented. For now, we choose half the configured batch size to avoid copies
+                // when a small number of rows are removed from a batch
+                let target_batch_size = ctx_state.config.batch_size / 2;
                 Arc::new(CoalesceBatchesExec::new(plan.clone(), target_batch_size))
             } else {
                 plan.clone()
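// Illustrative sketch (not part of this patch): the coalesce target is derived from the
// session's configured batch size, so it can be tuned indirectly through `ExecutionConfig`;
// the builder calls below are assumed from the DataFusion API of this era.
//
//     let config = ExecutionConfig::new().with_batch_size(4096);
//     let mut ctx = ExecutionContext::with_config(config);
//     // FilterExec (and the other wrapped operators) now get a CoalesceBatchesExec parent
//     // with a target of 4096 / 2 = 2048 rows.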