diff --git a/datafusion/core/src/physical_optimizer/coalesce_batches.rs b/datafusion/core/src/physical_optimizer/coalesce_batches.rs
index da0e44c8de4e3..f3af9a9d80f1a 100644
--- a/datafusion/core/src/physical_optimizer/coalesce_batches.rs
+++ b/datafusion/core/src/physical_optimizer/coalesce_batches.rs
@@ -18,7 +18,7 @@
 //! CoalesceBatches optimizer that groups batches together rows
 //! in bigger batches to avoid overhead with small batches
 
-use std::sync::Arc;
+use std::{any::Any, sync::Arc};
 
 use crate::{
     config::ConfigOptions,
@@ -29,8 +29,15 @@ use crate::{
     },
 };
 
-use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
+use crate::arrow::util::bit_util::ceil;
 use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_plan::{
+    aggregates::AggregateExec,
+    joins::SortMergeJoinExec,
+    limit::{GlobalLimitExec, LocalLimitExec},
+    sorts::{sort::SortExec, sort_preserving_merge::SortPreservingMergeExec},
+    windows::WindowAggExec,
+};
 
 /// Optimizer rule that introduces CoalesceBatchesExec to avoid overhead with small batches that
 /// are produced by highly selective filters
@@ -43,27 +50,37 @@ impl CoalesceBatches {
         Self::default()
     }
 }
 
-impl PhysicalOptimizerRule for CoalesceBatches {
-    fn optimize(
-        &self,
-        plan: Arc<dyn ExecutionPlan>,
-        config: &ConfigOptions,
-    ) -> Result<Arc<dyn ExecutionPlan>> {
-        if !config.execution.coalesce_batches {
-            return Ok(plan);
-        }
-        let target_batch_size = config.execution.batch_size;
-        plan.transform_up(|plan| {
-            let plan_any = plan.as_any();
-            // The goal here is to detect operators that could produce small batches and only
-            // wrap those ones with a CoalesceBatchesExec operator. An alternate approach here
-            // would be to build the coalescing logic directly into the operators
-            // See https://github.com/apache/datafusion/issues/139
-            let wrap_in_coalesce = plan_any.downcast_ref::<FilterExec>().is_some()
-                || plan_any.downcast_ref::<HashJoinExec>().is_some()
+#[inline]
+fn get_limit(plan: &dyn Any) -> Option<usize> {
+    if let Some(limit_exec) = plan.downcast_ref::<GlobalLimitExec>() {
+        limit_exec.fetch().map(|fetch| limit_exec.skip() + fetch)
+    } else {
+        plan.downcast_ref::<LocalLimitExec>()
+            .map(|limit_exec| limit_exec.fetch())
+    }
+}
+
+/// If you add a new operator that needs to scan the whole table, list it here
+#[inline]
+fn is_scan_all_node(plan: &dyn Any) -> bool {
+    plan.downcast_ref::<AggregateExec>().is_some()
+        || plan.downcast_ref::<SortMergeJoinExec>().is_some()
+        || plan.downcast_ref::<SortExec>().is_some()
+        || plan.downcast_ref::<SortPreservingMergeExec>().is_some()
+        || plan.downcast_ref::<WindowAggExec>().is_some()
+}
+
+#[inline]
+fn need_wrap_in_coalesce(plan: &dyn Any) -> bool {
+    // The goal here is to detect operators that could produce small batches and only
+    // wrap those ones with a CoalesceBatchesExec operator. An alternate approach here
+    // would be to build the coalescing logic directly into the operators
+    // See https://github.com/apache/arrow-datafusion/issues/139
+    plan.downcast_ref::<FilterExec>().is_some()
+        || plan.downcast_ref::<HashJoinExec>().is_some()
         // Don't need to add CoalesceBatchesExec after a round robin RepartitionExec
-        || plan_any
+        || plan
             .downcast_ref::<RepartitionExec>()
             .map(|repart_exec| {
                 !matches!(
@@ -71,17 +88,96 @@ impl PhysicalOptimizerRule for CoalesceBatches {
                     Partitioning::RoundRobinBatch(_)
                 )
             })
-            .unwrap_or(false);
-        if wrap_in_coalesce {
-            Ok(Transformed::yes(Arc::new(CoalesceBatchesExec::new(
-                plan,
-                target_batch_size,
-            ))))
-        } else {
-            Ok(Transformed::no(plan))
+            .unwrap_or(false)
+}
+
+fn wrap_in_coalesce_rewrite_inner(
+    limit: Option<usize>,
+    has_scan_all_node: bool,
+    partition: usize,
+    default_batch_size: usize,
+    plan: Arc<dyn ExecutionPlan>,
+) -> Result<Arc<dyn ExecutionPlan>> {
+    let children = plan
+        .children()
+        .iter()
+        .map(|&child| {
+            // Prefer the child's own limit, if it has one
+            let limit = match get_limit(child.as_any()) {
+                None => limit,
+                v => v,
+            };
+            wrap_in_coalesce_rewrite_inner(
+                limit,
+                has_scan_all_node || is_scan_all_node(child.as_any()),
+                partition,
+                default_batch_size,
+                child.clone(),
+            )
+        })
+        .collect::<Result<Vec<_>>>()?;
+
+    let mut wrap_in_coalesce = need_wrap_in_coalesce(plan.as_any());
+
+    // Take `limit / partition` as CoalesceBatchesExec's fetch; it limits the maximum number of rows to fetch
+    // If the entire table needs to be scanned, the limit does not take effect
+    let fetch = if has_scan_all_node {
+        None
+    } else {
+        limit.map(|limit| {
+            // If the limit is small enough, this optimization is not performed
+            if limit < partition * 16 {
+                wrap_in_coalesce = false;
             }
+            ceil(limit, partition)
         })
-        .data()
+    };
+
+    let plan = if children.is_empty() {
+        plan
+    } else {
+        plan.with_new_children(children)?
+    };
+
+    Ok(if wrap_in_coalesce {
+        Arc::new(CoalesceBatchesExec::new(plan, default_batch_size).with_fetch(fetch))
+    } else {
+        plan
+    })
+}
+
+fn wrap_in_coalesce_rewrite(
+    mut partition: usize,
+    default_batch_size: usize,
+    plan: Arc<dyn ExecutionPlan>,
+) -> Result<Arc<dyn ExecutionPlan>> {
+    // The partition count is at least 1
+    if partition == 0 {
+        partition = 1;
+    }
+    wrap_in_coalesce_rewrite_inner(
+        get_limit(plan.as_any()),
+        is_scan_all_node(plan.as_any()),
+        partition,
+        default_batch_size,
+        plan,
+    )
+}
+
+impl PhysicalOptimizerRule for CoalesceBatches {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        config: &ConfigOptions,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        if !config.execution.coalesce_batches {
+            return Ok(plan);
+        }
+        wrap_in_coalesce_rewrite(
+            config.execution.target_partitions,
+            config.execution.batch_size,
+            plan,
+        )
     }
 
     fn name(&self) -> &str {
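
A rough, self-contained sketch of the per-partition fetch arithmetic the new rule applies (plain Rust, not part of the patch; per_partition_fetch and the example numbers are illustrative only, the real rule walks the plan tree as shown above):

/// Illustrative only: given a plan-level limit and the configured partition count,
/// compute the fetch that would be handed to CoalesceBatchesExec::with_fetch,
/// mirroring the arithmetic in wrap_in_coalesce_rewrite_inner.
fn per_partition_fetch(limit: Option<usize>, partitions: usize) -> Option<usize> {
    // The rule clamps the partition count to at least 1.
    let partitions = partitions.max(1);
    limit.and_then(|limit| {
        if limit < partitions * 16 {
            // The limit is so small that no CoalesceBatchesExec is added at all.
            None
        } else {
            // Same rounding-up division as arrow::util::bit_util::ceil.
            Some((limit + partitions - 1) / partitions)
        }
    })
}

fn main() {
    // LIMIT 1000 over 8 target partitions: each partition coalesces at most 125 rows.
    assert_eq!(per_partition_fetch(Some(1000), 8), Some(125));
    // LIMIT 100 over 8 partitions (100 < 8 * 16): too small, coalescing is skipped.
    assert_eq!(per_partition_fetch(Some(100), 8), None);
    // No limit, or a scan-all operator (e.g. SortExec) below the limit: fetch stays unbounded.
    assert_eq!(per_partition_fetch(None, 8), None);
}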