From 433f9b6e99866e14e07e1918b3ddbf47ce07e335 Mon Sep 17 00:00:00 2001
From: ackingliu
Date: Wed, 14 Aug 2024 20:46:27 +0800
Subject: [PATCH 1/6] feat: optimize CoalesceBatches in limit

---
 .../physical_optimizer/coalesce_batches.rs | 150 ++++++++++++++----
 1 file changed, 119 insertions(+), 31 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/coalesce_batches.rs b/datafusion/core/src/physical_optimizer/coalesce_batches.rs
index da0e44c8de4e3..93b7c0d399fb6 100644
--- a/datafusion/core/src/physical_optimizer/coalesce_batches.rs
+++ b/datafusion/core/src/physical_optimizer/coalesce_batches.rs
@@ -18,7 +18,7 @@
 //! CoalesceBatches optimizer that groups batches together rows
 //! in bigger batches to avoid overhead with small batches

-use std::sync::Arc;
+use std::{any::Any, sync::Arc};

 use crate::{
     config::ConfigOptions,
@@ -29,8 +29,15 @@ use crate::{
     },
 };

-use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
+use crate::arrow::util::bit_util::ceil;
 use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_plan::{
+    aggregates::AggregateExec,
+    joins::SortMergeJoinExec,
+    limit::{GlobalLimitExec, LocalLimitExec},
+    sorts::{sort::SortExec, sort_preserving_merge::SortPreservingMergeExec},
+    windows::WindowAggExec,
+};

 /// Optimizer rule that introduces CoalesceBatchesExec to avoid overhead with small batches that
 /// are produced by highly selective filters
@@ -43,27 +50,36 @@ impl CoalesceBatches {
         Self::default()
     }
 }
-impl PhysicalOptimizerRule for CoalesceBatches {
-    fn optimize(
-        &self,
-        plan: Arc<dyn ExecutionPlan>,
-        config: &ConfigOptions,
-    ) -> Result<Arc<dyn ExecutionPlan>> {
-        if !config.execution.coalesce_batches {
-            return Ok(plan);
-        }
+
+#[inline]
+fn get_limit(plan: &dyn Any) -> Option<usize> {
+    if let Some(limit_exec) = plan.downcast_ref::<GlobalLimitExec>() {
+        limit_exec.fetch()
+    } else {
+        plan.downcast_ref::<LocalLimitExec>()
+            .map(|limit_exec| limit_exec.fetch())
+    }
+}

-        let target_batch_size = config.execution.batch_size;
-        plan.transform_up(|plan| {
-            let plan_any = plan.as_any();
-            // The goal here is to detect operators that could produce small batches and only
-            // wrap those ones with a CoalesceBatchesExec operator. An alternate approach here
-            // would be to build the coalescing logic directly into the operators
-            // See https://github.com/apache/datafusion/issues/139
-            let wrap_in_coalesce = plan_any.downcast_ref::<FilterExec>().is_some()
-                || plan_any.downcast_ref::<HashJoinExec>().is_some()
+#[inline]
+fn need_scan_all(plan: &dyn Any) -> bool {
+    plan.downcast_ref::<AggregateExec>().is_some()
+        || plan.downcast_ref::<SortExec>().is_some()
+        || plan.downcast_ref::<SortPreservingMergeExec>().is_some()
+        || plan.downcast_ref::<SortMergeJoinExec>().is_some()
+        || plan.downcast_ref::<WindowAggExec>().is_some()
+}
+
+#[inline]
+fn need_wrap_in_coalesce(plan: &dyn Any) -> bool {
+    // The goal here is to detect operators that could produce small batches and only
+    // wrap those ones with a CoalesceBatchesExec operator. An alternate approach here
+    // would be to build the coalescing logic directly into the operators
+    // See https://github.com/apache/arrow-datafusion/issues/139
+    plan.downcast_ref::<FilterExec>().is_some()
+        || plan.downcast_ref::<HashJoinExec>().is_some()
                 // Don't need to add CoalesceBatchesExec after a round robin RepartitionExec
-                || plan_any
+                || plan
                     .downcast_ref::<RepartitionExec>()
                     .map(|repart_exec| {
                         !matches!(
@@ -71,17 +87,89 @@ impl PhysicalOptimizerRule for CoalesceBatches {
                             Partitioning::RoundRobinBatch(_)
                         )
                     })
-                    .unwrap_or(false);
-            if wrap_in_coalesce {
-                Ok(Transformed::yes(Arc::new(CoalesceBatchesExec::new(
-                    plan,
-                    target_batch_size,
-                ))))
-            } else {
-                Ok(Transformed::no(plan))
-            }
+                    .unwrap_or(false)
+}
+
+fn wrap_in_coalesce_rewrite_inner(
+    mut limit: Option<usize>,
+    partition: usize,
+    default_batch_size: usize,
+    plan: Arc<dyn ExecutionPlan>,
+) -> Result<Arc<dyn ExecutionPlan>> {
+    // If the entire table needs to be scanned, the limit at the upper level does not take effect
+    if need_scan_all(plan.as_any()) {
+        limit = None
+    }
+    let children = plan
+        .children()
+        .iter()
+        .map(|&child| {
+            // Switch to the downstream limit if the child defines one
+            let limit = match get_limit(child.as_any()) {
+                None => limit,
+                v => v,
+            };
+            wrap_in_coalesce_rewrite_inner(
+                limit,
+                partition,
+                default_batch_size,
+                child.clone(),
+            )
         })
-        .data()
+        .collect::<Result<Vec<_>>>()?;
+
+    let wrap_in_coalesce = need_wrap_in_coalesce(plan.as_any());
+
+    // Take the smaller of `limit/partition` and `default_batch_size` as target_batch_size
+    let target_batch_size = match limit {
+        Some(limit) => std::cmp::min(ceil(limit, partition), default_batch_size),
+        None => default_batch_size,
+    };
+
+    let plan = if children.is_empty() {
+        plan
+    } else {
+        plan.with_new_children(children)?
+    };
+
+    Ok(if wrap_in_coalesce {
+        Arc::new(CoalesceBatchesExec::new(plan, target_batch_size))
+    } else {
+        plan
+    })
+}
+
+fn wrap_in_coalesce_rewrite(
+    mut partition: usize,
+    default_batch_size: usize,
+    plan: Arc<dyn ExecutionPlan>,
+) -> Result<Arc<dyn ExecutionPlan>> {
+    // The partition count is at least 1
+    if partition == 0 {
+        partition = 1;
+    }
+    wrap_in_coalesce_rewrite_inner(
+        get_limit(plan.as_any()),
+        partition,
+        default_batch_size,
+        plan,
+    )
+}
+
+impl PhysicalOptimizerRule for CoalesceBatches {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        config: &ConfigOptions,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        if !config.execution.coalesce_batches {
+            return Ok(plan);
+        }
+        wrap_in_coalesce_rewrite(
+            config.execution.target_partitions,
+            config.execution.batch_size,
+            plan,
+        )
     }

     fn name(&self) -> &str {

From b71b51a90e59c88eca8d5f5e69539636678ad7da Mon Sep 17 00:00:00 2001
From: ackingliu
Date: Thu, 15 Aug 2024 10:27:56 +0800
Subject: [PATCH 2/6] add with_fetch in CoalesceBatchesExec

---
 .../core/src/physical_optimizer/coalesce_batches.rs | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/coalesce_batches.rs b/datafusion/core/src/physical_optimizer/coalesce_batches.rs
index 93b7c0d399fb6..66295406b0015 100644
--- a/datafusion/core/src/physical_optimizer/coalesce_batches.rs
+++ b/datafusion/core/src/physical_optimizer/coalesce_batches.rs
@@ -120,10 +120,13 @@ fn wrap_in_coalesce_rewrite_inner(

     let wrap_in_coalesce = need_wrap_in_coalesce(plan.as_any());

-    // Take the smaller of `limit/partition` and `default_batch_size` as target_batch_size
-    let target_batch_size = match limit {
-        Some(limit) => std::cmp::min(ceil(limit, partition), default_batch_size),
-        None => default_batch_size,
+    let (target_batch_size, fetch) = match limit {
+        Some(limit) => (
+            default_batch_size,
+            // Take the smaller of `limit/partition` and `default_batch_size` as CoalesceBatchesExec's fetch
+            Some(std::cmp::min(ceil(limit, partition), default_batch_size)),
+        ),
+        None => (default_batch_size, None),
     };

     let plan = if children.is_empty() {
@@ -133,7 +136,7 @@ fn wrap_in_coalesce_rewrite_inner(
     };

     Ok(if wrap_in_coalesce {
-        Arc::new(CoalesceBatchesExec::new(plan, target_batch_size))
+        Arc::new(CoalesceBatchesExec::new(plan, target_batch_size).with_fetch(fetch))
     } else {
         plan
     })

From b56fcea086852f6794cd3fafca0b0a5e0a921bfd Mon Sep 17 00:00:00 2001
From: ackingliu
Date: Thu, 15 Aug 2024 10:33:58 +0800
Subject: [PATCH 3/6] ceil(limit, partition) as fetch

---
 datafusion/core/src/physical_optimizer/coalesce_batches.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/coalesce_batches.rs b/datafusion/core/src/physical_optimizer/coalesce_batches.rs
index 66295406b0015..d2847b8787bbd 100644
--- a/datafusion/core/src/physical_optimizer/coalesce_batches.rs
+++ b/datafusion/core/src/physical_optimizer/coalesce_batches.rs
@@ -123,8 +123,8 @@ fn wrap_in_coalesce_rewrite_inner(
     let (target_batch_size, fetch) = match limit {
         Some(limit) => (
             default_batch_size,
-            // Take the smaller of `limit/partition` and `default_batch_size` as CoalesceBatchesExec's fetch
-            Some(std::cmp::min(ceil(limit, partition), default_batch_size)),
+            // Take `limit/partition` as CoalesceBatchesExec's fetch; it limits the maximum number of rows to fetch
+            Some(ceil(limit, partition)),
         ),
         None => (default_batch_size, None),
     };

From 6b6e031b779f888f0dc177a6361d86bdd3071fcc Mon Sep 17 00:00:00 2001
From: ackingliu
Date: Thu, 15 Aug 2024 11:28:53 +0800
Subject: [PATCH 4/6] optimize small limit

---
 .../physical_optimizer/coalesce_batches.rs | 22 +++++++++----------
 1 file changed, 11 insertions(+), 11 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/coalesce_batches.rs b/datafusion/core/src/physical_optimizer/coalesce_batches.rs
index d2847b8787bbd..53968db58d7a8 100644
--- a/datafusion/core/src/physical_optimizer/coalesce_batches.rs
+++ b/datafusion/core/src/physical_optimizer/coalesce_batches.rs
@@ -118,16 +118,16 @@ fn wrap_in_coalesce_rewrite_inner(
         })
         .collect::<Result<Vec<_>>>()?;

-    let wrap_in_coalesce = need_wrap_in_coalesce(plan.as_any());
-
-    let (target_batch_size, fetch) = match limit {
-        Some(limit) => (
-            default_batch_size,
-            // Take `limit/partition` as CoalesceBatchesExec's fetch; it limits the maximum number of rows to fetch
-            Some(ceil(limit, partition)),
-        ),
-        None => (default_batch_size, None),
-    };
+    let mut wrap_in_coalesce = need_wrap_in_coalesce(plan.as_any());
+
+    // Take `limit/partition` as CoalesceBatchesExec's fetch; it limits the maximum number of rows to fetch
+    let fetch = limit.map(|limit| {
+        // If the limit is small enough, this optimization is not performed
+        if limit < partition * 16 {
+            wrap_in_coalesce = false;
+        }
+        ceil(limit, partition)
+    });

     let plan = if children.is_empty() {
         plan
@@ -136,7 +136,7 @@ fn wrap_in_coalesce_rewrite_inner(
     };

     Ok(if wrap_in_coalesce {
-        Arc::new(CoalesceBatchesExec::new(plan, target_batch_size).with_fetch(fetch))
+        Arc::new(CoalesceBatchesExec::new(plan, default_batch_size).with_fetch(fetch))
     } else {
         plan
     })

From a7077bb3c2b51d8992e1664377f9b58556b6858b Mon Sep 17 00:00:00 2001
From: ackingliu
Date: Thu, 15 Aug 2024 14:16:28 +0800
Subject: [PATCH 5/6] add offset

---
 datafusion/core/src/physical_optimizer/coalesce_batches.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/datafusion/core/src/physical_optimizer/coalesce_batches.rs b/datafusion/core/src/physical_optimizer/coalesce_batches.rs
index 53968db58d7a8..ea32967a2fd93 100644
--- a/datafusion/core/src/physical_optimizer/coalesce_batches.rs
+++ b/datafusion/core/src/physical_optimizer/coalesce_batches.rs
@@ -54,7 +54,7 @@ impl CoalesceBatches {
 #[inline]
 fn get_limit(plan: &dyn Any) -> Option<usize> {
     if let Some(limit_exec) = plan.downcast_ref::<GlobalLimitExec>() {
-        limit_exec.fetch()
+        limit_exec.fetch().map(|fetch| limit_exec.skip() + fetch)
     } else {
         plan.downcast_ref::<LocalLimitExec>()
             .map(|limit_exec| limit_exec.fetch())

From 34e9af9d144072341fefe36aa69294a6bbba4c07 Mon Sep 17 00:00:00 2001
From: ackingliu
Date: Thu, 15 Aug 2024 16:49:43 +0800
Subject: [PATCH 6/6] refactor scan all function

---
 .../physical_optimizer/coalesce_batches.rs | 31 +++++++++++--------
 1 file changed, 18 insertions(+), 13 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/coalesce_batches.rs b/datafusion/core/src/physical_optimizer/coalesce_batches.rs
index ea32967a2fd93..f3af9a9d80f1a 100644
--- a/datafusion/core/src/physical_optimizer/coalesce_batches.rs
+++ b/datafusion/core/src/physical_optimizer/coalesce_batches.rs
@@ -61,8 +61,9 @@ fn get_limit(plan: &dyn Any) -> Option<usize> {
     }
 }

+/// If a new operator needs to scan the whole table, add it here
 #[inline]
-fn need_scan_all(plan: &dyn Any) -> bool {
+fn is_scan_all_node(plan: &dyn Any) -> bool {
     plan.downcast_ref::<AggregateExec>().is_some()
         || plan.downcast_ref::<SortExec>().is_some()
         || plan.downcast_ref::<SortPreservingMergeExec>().is_some()
@@ -91,15 +92,12 @@ fn need_wrap_in_coalesce(plan: &dyn Any) -> bool {
 }

 fn wrap_in_coalesce_rewrite_inner(
-    mut limit: Option<usize>,
+    limit: Option<usize>,
+    has_scan_all_node: bool,
     partition: usize,
     default_batch_size: usize,
     plan: Arc<dyn ExecutionPlan>,
 ) -> Result<Arc<dyn ExecutionPlan>> {
-    // If the entire table needs to be scanned, the limit at the upper level does not take effect
-    if need_scan_all(plan.as_any()) {
-        limit = None
-    }
     let children = plan
         .children()
         .iter()
@@ -111,6 +109,7 @@ fn wrap_in_coalesce_rewrite_inner(
             };
             wrap_in_coalesce_rewrite_inner(
                 limit,
+                has_scan_all_node || is_scan_all_node(child.as_any()),
                 partition,
                 default_batch_size,
                 child.clone(),
@@ -121,13 +120,18 @@ fn wrap_in_coalesce_rewrite_inner(
     let mut wrap_in_coalesce = need_wrap_in_coalesce(plan.as_any());

     // Take `limit/partition` as CoalesceBatchesExec's fetch; it limits the maximum number of rows to fetch
-    let fetch = limit.map(|limit| {
-        // If the limit is small enough, this optimization is not performed
-        if limit < partition * 16 {
-            wrap_in_coalesce = false;
-        }
-        ceil(limit, partition)
-    });
+    // If the entire table needs to be scanned, the limit does not take effect
+    let fetch = if has_scan_all_node {
+        None
+    } else {
+        limit.map(|limit| {
+            // If the limit is small enough, this optimization is not performed
+            if limit < partition * 16 {
+                wrap_in_coalesce = false;
+            }
+            ceil(limit, partition)
+        })
+    };

     let plan = if children.is_empty() {
         plan
@@ -153,6 +157,7 @@ fn wrap_in_coalesce_rewrite(
     }
     wrap_in_coalesce_rewrite_inner(
         get_limit(plan.as_any()),
+        is_scan_all_node(plan.as_any()),
        partition,
        default_batch_size,
        plan,
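
As a sanity check outside the patch series, the fetch arithmetic the series converges on can be reproduced in a few lines. The sketch below is a minimal standalone Rust program, not code from the patches: the helper names are hypothetical, usize::div_ceil stands in for arrow's bit_util::ceil, and the partition * 16 threshold mirrors the small-limit guard introduced in PATCH 4.

/// Hypothetical helper mirroring the per-partition fetch arithmetic that
/// `wrap_in_coalesce_rewrite_inner` ends up with after PATCH 6.
fn per_partition_fetch(
    limit: Option<usize>,
    has_scan_all_node: bool,
    partition: usize,
) -> Option<usize> {
    // The partition count is clamped to at least 1, as in `wrap_in_coalesce_rewrite`.
    let partition = partition.max(1);
    if has_scan_all_node {
        // Aggregates, sorts, sort-merge joins and window operators below the limit
        // must consume all of their input, so the limit cannot bound the fetch.
        return None;
    }
    limit.map(|limit| limit.div_ceil(partition))
}

/// The small-limit guard from PATCH 4: below `partition * 16` rows the batches are
/// already tiny and wrapping the plan in CoalesceBatchesExec is skipped.
fn limit_too_small_to_coalesce(limit: usize, partition: usize) -> bool {
    limit < partition.max(1) * 16
}

fn main() {
    // LIMIT 1000 over 8 target partitions -> fetch ceil(1000 / 8) = 125 rows per partition.
    assert_eq!(per_partition_fetch(Some(1000), false, 8), Some(125));
    // A scan-all operator below the limit -> the limit does not apply.
    assert_eq!(per_partition_fetch(Some(1000), true, 8), None);
    // LIMIT 10 with 8 partitions is below 8 * 16 = 128, so the wrapper is not added.
    assert!(limit_too_small_to_coalesce(10, 8));
    println!("fetch arithmetic matches the patch series");
}

Compiling this with rustc and running it exercises the three cases the series distinguishes: a plain limit, a limit above a scan-all operator, and a limit too small to be worth coalescing.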