diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs index 4cad0192dc3..dae9c90c6a2 100644 --- a/rust/datafusion/src/execution/context.rs +++ b/rust/datafusion/src/execution/context.rs @@ -596,8 +596,8 @@ mod tests { use super::*; use crate::logical_plan::{col, create_udf, sum}; - use crate::physical_plan::collect; use crate::physical_plan::functions::ScalarFunctionImplementation; + use crate::physical_plan::{collect, collect_partitioned}; use crate::test; use crate::variable::VarType; use crate::{ @@ -683,14 +683,25 @@ mod tests { let logical_plan = ctx.optimize(&logical_plan)?; let physical_plan = ctx.create_physical_plan(&logical_plan)?; + println!("{:?}", physical_plan); - let results = collect(physical_plan).await?; - - // there should be one batch per partition + let results = collect_partitioned(physical_plan).await?; assert_eq!(results.len(), partition_count); - let row_count: usize = results.iter().map(|batch| batch.num_rows()).sum(); - assert_eq!(row_count, 20); + // there should be a total of 2 batches with 20 rows because the where clause filters + // out results from 2 partitions + + // note that the order of partitions is not deterministic + let mut num_batches = 0; + let mut num_rows = 0; + for partition in &results { + for batch in partition { + num_batches += 1; + num_rows += batch.num_rows(); + } + } + assert_eq!(2, num_batches); + assert_eq!(20, num_rows); Ok(()) } diff --git a/rust/datafusion/src/physical_plan/coalesce_batches.rs b/rust/datafusion/src/physical_plan/coalesce_batches.rs new file mode 100644 index 00000000000..7b71a09b955 --- /dev/null +++ b/rust/datafusion/src/physical_plan/coalesce_batches.rs @@ -0,0 +1,296 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! CoalesceBatchesExec combines small batches into larger batches for more efficient use of +//! vectorized processing by upstream operators. + +use std::any::Any; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; + +use crate::error::{DataFusionError, Result}; +use crate::physical_plan::{ + ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream, +}; + +use arrow::compute::kernels::concat::concat; +use arrow::datatypes::SchemaRef; +use arrow::error::Result as ArrowResult; +use arrow::record_batch::RecordBatch; +use async_trait::async_trait; +use futures::stream::{Stream, StreamExt}; +use log::debug; + +/// CoalesceBatchesExec combines small batches into larger batches for more efficient use of +/// vectorized processing by upstream operators. +#[derive(Debug)] +pub struct CoalesceBatchesExec { + /// The input plan + input: Arc, + /// Minimum number of rows for coalesces batches + target_batch_size: usize, +} + +impl CoalesceBatchesExec { + /// Create a new CoalesceBatchesExec + pub fn new(input: Arc, target_batch_size: usize) -> Self { + Self { + input, + target_batch_size, + } + } +} + +#[async_trait] +impl ExecutionPlan for CoalesceBatchesExec { + /// Return a reference to Any that can be used for downcasting + fn as_any(&self) -> &dyn Any { + self + } + + /// Get the schema for this execution plan + fn schema(&self) -> SchemaRef { + // The coalesce batches operator does not make any changes to the schema of its input + self.input.schema() + } + + fn children(&self) -> Vec> { + vec![self.input.clone()] + } + + /// Get the output partitioning of this plan + fn output_partitioning(&self) -> Partitioning { + // The coalesce batches operator does not make any changes to the partitioning of its input + self.input.output_partitioning() + } + + fn with_new_children( + &self, + children: Vec>, + ) -> Result> { + match children.len() { + 1 => Ok(Arc::new(CoalesceBatchesExec::new( + children[0].clone(), + self.target_batch_size, + ))), + _ => Err(DataFusionError::Internal( + "CoalesceBatchesExec wrong number of children".to_string(), + )), + } + } + + async fn execute(&self, partition: usize) -> Result { + Ok(Box::pin(CoalesceBatchesStream { + input: self.input.execute(partition).await?, + schema: self.input.schema().clone(), + target_batch_size: self.target_batch_size.clone(), + buffer: Vec::new(), + buffered_rows: 0, + })) + } +} + +struct CoalesceBatchesStream { + /// The input plan + input: SendableRecordBatchStream, + /// The input schema + schema: SchemaRef, + /// Minimum number of rows for coalesces batches + target_batch_size: usize, + /// Buffered batches + buffer: Vec, + /// Buffered row count + buffered_rows: usize, +} + +impl Stream for CoalesceBatchesStream { + type Item = ArrowResult; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + loop { + let input_batch = self.input.poll_next_unpin(cx); + match input_batch { + Poll::Ready(x) => match x { + Some(Ok(ref batch)) => { + if batch.num_rows() >= self.target_batch_size + && self.buffer.is_empty() + { + return Poll::Ready(Some(Ok(batch.clone()))); + } else if batch.num_rows() == 0 { + // discard empty batches + } else { + // add to the buffered batches + self.buffer.push(batch.clone()); + self.buffered_rows += batch.num_rows(); + // check to see if we have enough batches yet + if self.buffered_rows >= self.target_batch_size { + // combine the batches and return + let batch = concat_batches( + &self.schema, + &self.buffer, + self.buffered_rows, + )?; + // reset buffer state + self.buffer.clear(); + self.buffered_rows = 0; + // return batch + return Poll::Ready(Some(Ok(batch))); + } + } + } + None => { + // we have reached the end of the input stream but there could still + // be buffered batches + if self.buffer.is_empty() { + return Poll::Ready(None); + } else { + // combine the batches and return + let batch = concat_batches( + &self.schema, + &self.buffer, + self.buffered_rows, + )?; + // reset buffer state + self.buffer.clear(); + self.buffered_rows = 0; + // return batch + return Poll::Ready(Some(Ok(batch))); + } + } + other => return Poll::Ready(other), + }, + Poll::Pending => return Poll::Pending, + } + } + } + + fn size_hint(&self) -> (usize, Option) { + // we can't predict the size of incoming batches so re-use the size hint from the input + self.input.size_hint() + } +} + +impl RecordBatchStream for CoalesceBatchesStream { + fn schema(&self) -> SchemaRef { + self.schema.clone() + } +} + +fn concat_batches( + schema: &SchemaRef, + batches: &[RecordBatch], + row_count: usize, +) -> ArrowResult { + let mut arrays = Vec::with_capacity(schema.fields().len()); + for i in 0..schema.fields().len() { + let array = concat( + &batches + .iter() + .map(|batch| batch.column(i).as_ref()) + .collect::>(), + )?; + arrays.push(array); + } + debug!( + "Combined {} batches containing {} rows", + batches.len(), + row_count + ); + RecordBatch::try_new(schema.clone(), arrays) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::physical_plan::memory::MemoryExec; + use arrow::array::UInt32Array; + use arrow::datatypes::{DataType, Field, Schema}; + + #[tokio::test(threaded_scheduler)] + async fn test_concat_batches() -> Result<()> { + let schema = test_schema(); + let partition = create_vec_batches(&schema, 10)?; + let partitions = vec![partition]; + + let output_partitions = coalesce_batches(&schema, partitions, 20).await?; + assert_eq!(1, output_partitions.len()); + + // input is 10 batches x 8 rows (80 rows) + // expected output is batches of at least 20 rows (except for the final batch) + let batches = &output_partitions[0]; + assert_eq!(4, batches.len()); + assert_eq!(24, batches[0].num_rows()); + assert_eq!(24, batches[1].num_rows()); + assert_eq!(24, batches[2].num_rows()); + assert_eq!(8, batches[3].num_rows()); + + Ok(()) + } + + fn test_schema() -> Arc { + Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)])) + } + + fn create_vec_batches( + schema: &Arc, + num_batches: usize, + ) -> Result> { + let batch = create_batch(schema); + let mut vec = Vec::with_capacity(num_batches); + for _ in 0..num_batches { + vec.push(batch.clone()); + } + Ok(vec) + } + + fn create_batch(schema: &Arc) -> RecordBatch { + RecordBatch::try_new( + schema.clone(), + vec![Arc::new(UInt32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8]))], + ) + .unwrap() + } + + async fn coalesce_batches( + schema: &SchemaRef, + input_partitions: Vec>, + target_batch_size: usize, + ) -> Result>> { + // create physical plan + let exec = MemoryExec::try_new(&input_partitions, schema.clone(), None)?; + let exec: Arc = + Arc::new(CoalesceBatchesExec::new(Arc::new(exec), target_batch_size)); + + // execute and collect results + let output_partition_count = exec.output_partitioning().partition_count(); + let mut output_partitions = Vec::with_capacity(output_partition_count); + for i in 0..output_partition_count { + // execute this *output* partition and collect all batches + let mut stream = exec.execute(i).await?; + let mut batches = vec![]; + while let Some(result) = stream.next().await { + batches.push(result?); + } + output_partitions.push(batches); + } + Ok(output_partitions) + } +} diff --git a/rust/datafusion/src/physical_plan/mod.rs b/rust/datafusion/src/physical_plan/mod.rs index d0dcede78d0..605e5d6f44a 100644 --- a/rust/datafusion/src/physical_plan/mod.rs +++ b/rust/datafusion/src/physical_plan/mod.rs @@ -104,6 +104,26 @@ pub async fn collect(plan: Arc) -> Result> { } } +/// Execute the [ExecutionPlan] and collect the results in memory +pub async fn collect_partitioned( + plan: Arc, +) -> Result>> { + match plan.output_partitioning().partition_count() { + 0 => Ok(vec![]), + 1 => { + let it = plan.execute(0).await?; + Ok(vec![common::collect(it).await?]) + } + _ => { + let mut partitions = vec![]; + for i in 0..plan.output_partitioning().partition_count() { + partitions.push(common::collect(plan.execute(i).await?).await?) + } + Ok(partitions) + } + } +} + /// Partitioning schemes supported by operators. #[derive(Debug, Clone)] pub enum Partitioning { @@ -248,6 +268,7 @@ pub trait Accumulator: Send + Sync + Debug { pub mod aggregates; pub mod array_expressions; +pub mod coalesce_batches; pub mod common; pub mod csv; pub mod datetime_expressions; diff --git a/rust/datafusion/src/physical_plan/planner.rs b/rust/datafusion/src/physical_plan/planner.rs index f4b578a9f8c..fd414890fb6 100644 --- a/rust/datafusion/src/physical_plan/planner.rs +++ b/rust/datafusion/src/physical_plan/planner.rs @@ -26,6 +26,7 @@ use crate::logical_plan::{ DFSchema, Expr, LogicalPlan, Operator, Partitioning as LogicalPartitioning, PlanType, StringifiedPlan, UserDefinedLogicalNode, }; +use crate::physical_plan::coalesce_batches::CoalesceBatchesExec; use crate::physical_plan::explain::ExplainExec; use crate::physical_plan::expressions::{CaseExpr, Column, Literal, PhysicalSortExpr}; use crate::physical_plan::filter::FilterExec; @@ -110,6 +111,34 @@ impl DefaultPhysicalPlanner { // leaf node, children cannot be replaced Ok(plan.clone()) } else { + // wrap operators in CoalesceBatches to avoid lots of tiny batches when we have + // highly selective filters + let plan_any = plan.as_any(); + //TODO we should do this in a more generic way either by wrapping all operators + // or having an API so that operators can declare when their inputs or outputs + // need to be wrapped in a coalesce batches operator. + // See https://issues.apache.org/jira/browse/ARROW-11068 + let wrap_in_coalesce = plan_any.downcast_ref::().is_some() + || plan_any.downcast_ref::().is_some() + || plan_any.downcast_ref::().is_some(); + + //TODO we should also do this for HashAggregateExec but we need to update tests + // as part of this work - see https://issues.apache.org/jira/browse/ARROW-11068 + // || plan_any.downcast_ref::().is_some(); + + let plan = if wrap_in_coalesce { + //TODO we should add specific configuration settings for coalescing batches and + // we should do that once https://issues.apache.org/jira/browse/ARROW-11059 is + // implemented. For now, we choose half the configured batch size to avoid copies + // when a small number of rows are removed from a batch + let target_batch_size = ctx_state.config.batch_size / 2; + Arc::new(CoalesceBatchesExec::new(plan.clone(), target_batch_size)) + } else { + plan.clone() + }; + + let children = plan.children().clone(); + match plan.required_child_distribution() { Distribution::UnspecifiedDistribution => plan.with_new_children(children), Distribution::SinglePartition => plan.with_new_children(