156 changes: 126 additions & 30 deletions datafusion/core/src/physical_optimizer/coalesce_batches.rs
@@ -18,7 +18,7 @@
//! CoalesceBatches optimizer that groups small batches together into
//! bigger batches to avoid the overhead of processing many small batches

-use std::sync::Arc;
+use std::{any::Any, sync::Arc};

use crate::{
config::ConfigOptions,
@@ -29,8 +29,15 @@ use crate::{
},
};

-use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
+use crate::arrow::util::bit_util::ceil;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_plan::{
+    aggregates::AggregateExec,
+    joins::SortMergeJoinExec,
+    limit::{GlobalLimitExec, LocalLimitExec},
+    sorts::{sort::SortExec, sort_preserving_merge::SortPreservingMergeExec},
+    windows::WindowAggExec,
+};

/// Optimizer rule that introduces CoalesceBatchesExec to avoid overhead with small batches that
/// are produced by highly selective filters
@@ -43,45 +50,134 @@ impl CoalesceBatches {
Self::default()
}
}
-impl PhysicalOptimizerRule for CoalesceBatches {
-    fn optimize(
-        &self,
-        plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
-        config: &ConfigOptions,
-    ) -> Result<Arc<dyn crate::physical_plan::ExecutionPlan>> {
-        if !config.execution.coalesce_batches {
-            return Ok(plan);
-        }
-
-        let target_batch_size = config.execution.batch_size;
-        plan.transform_up(|plan| {
-            let plan_any = plan.as_any();
-            // The goal here is to detect operators that could produce small batches and only
-            // wrap those ones with a CoalesceBatchesExec operator. An alternate approach here
-            // would be to build the coalescing logic directly into the operators
-            // See https://github.com/apache/datafusion/issues/139
-            let wrap_in_coalesce = plan_any.downcast_ref::<FilterExec>().is_some()
-                || plan_any.downcast_ref::<HashJoinExec>().is_some()
+#[inline]
+fn get_limit(plan: &dyn Any) -> Option<usize> {
+    if let Some(limit_exec) = plan.downcast_ref::<GlobalLimitExec>() {
+        limit_exec.fetch().map(|fetch| limit_exec.skip() + fetch)
+    } else {
+        plan.downcast_ref::<LocalLimitExec>()
+            .map(|limit_exec| limit_exec.fetch())
+    }
+}
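A note on the arithmetic in get_limit: a GlobalLimitExec needs skip + fetch rows from its input, because the skipped rows still have to be produced before they are discarded. A minimal standalone sketch in plain Rust (hypothetical helper, no DataFusion types):

/// Total rows a limit node needs from its input; `None` means unlimited.
fn total_rows_needed(skip: usize, fetch: Option<usize>) -> Option<usize> {
    fetch.map(|fetch| skip + fetch)
}

fn main() {
    assert_eq!(total_rows_needed(10, Some(100)), Some(110)); // OFFSET 10 LIMIT 100
    assert_eq!(total_rows_needed(10, None), None); // OFFSET without LIMIT: unbounded
}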

+/// If you have a new operator that needs to scan the whole table, add it here
+#[inline]
+fn is_scan_all_node(plan: &dyn Any) -> bool {
+    plan.downcast_ref::<SortMergeJoinExec>().is_some()
+        || plan.downcast_ref::<AggregateExec>().is_some()
+        || plan.downcast_ref::<SortExec>().is_some()
+        || plan.downcast_ref::<SortPreservingMergeExec>().is_some()
+        || plan.downcast_ref::<WindowAggExec>().is_some()
+}
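These operators are pipeline breakers: they must consume all of their input before emitting the first output row, so a limit sitting above them says nothing about how many rows they read. A standalone sketch with a hypothetical mini plan tree (not DataFusion types) showing why such a node voids the limit:

enum Node {
    Limit(usize, Box<Node>),
    Sort(Box<Node>),   // must see every input row before emitting any output
    Filter(Box<Node>), // streams rows through, so a limit above still applies
    Scan,
}

/// Walk down the tree, carrying the nearest limit until a blocking node voids it.
fn scan_row_cap(node: &Node, limit: Option<usize>) -> Option<usize> {
    match node {
        Node::Limit(n, child) => scan_row_cap(child, Some(*n)),
        Node::Sort(child) => scan_row_cap(child, None),
        Node::Filter(child) => scan_row_cap(child, limit),
        Node::Scan => limit,
    }
}

fn main() {
    // LIMIT 100 -> Filter -> Scan: the scan needs at most ~100 matching rows.
    let streaming = Node::Limit(100, Box::new(Node::Filter(Box::new(Node::Scan))));
    assert_eq!(scan_row_cap(&streaming, None), Some(100));
    // LIMIT 100 -> Sort -> Scan: the sort reads everything, so no cap survives.
    let blocking = Node::Limit(100, Box::new(Node::Sort(Box::new(Node::Scan))));
    assert_eq!(scan_row_cap(&blocking, None), None);
}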

+#[inline]
+fn need_wrap_in_coalesce(plan: &dyn Any) -> bool {
+    // The goal here is to detect operators that could produce small batches and only
+    // wrap those ones with a CoalesceBatchesExec operator. An alternate approach here
+    // would be to build the coalescing logic directly into the operators
+    // See https://github.com/apache/arrow-datafusion/issues/139
+    plan.downcast_ref::<FilterExec>().is_some()
+        || plan.downcast_ref::<HashJoinExec>().is_some()
-                // Don't need to add CoalesceBatchesExec after a round robin RepartitionExec
-                || plan_any
-                    .downcast_ref::<RepartitionExec>()
-                    .map(|repart_exec| {
-                        !matches!(
-                            repart_exec.partitioning().clone(),
-                            Partitioning::RoundRobinBatch(_)
-                        )
-                    })
-                    .unwrap_or(false);
-            if wrap_in_coalesce {
-                Ok(Transformed::yes(Arc::new(CoalesceBatchesExec::new(
-                    plan,
-                    target_batch_size,
-                ))))
-            } else {
-                Ok(Transformed::no(plan))
-            }
-        })
-        .data()
-    }
+        // Don't need to add CoalesceBatchesExec after a round robin RepartitionExec
+        || plan
+            .downcast_ref::<RepartitionExec>()
+            .map(|repart_exec| {
+                !matches!(
+                    repart_exec.partitioning().clone(),
+                    Partitioning::RoundRobinBatch(_)
+                )
+            })
+            .unwrap_or(false)
+}
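The predicate reduces to a small decision table: wrap operators that can shrink batches (selective filters, hash-join probes, hash repartitions) and skip those that forward batches intact. A standalone sketch with hypothetical operator tags (not DataFusion types):

enum Op {
    Filter,
    HashJoin,
    RepartitionHash,
    RepartitionRoundRobin,
    Scan,
}

/// Mirrors need_wrap_in_coalesce: only operators that may shrink batches qualify.
fn wraps(op: &Op) -> bool {
    matches!(op, Op::Filter | Op::HashJoin | Op::RepartitionHash)
}

fn main() {
    assert!(wraps(&Op::Filter)); // selective filters can emit tiny batches
    assert!(wraps(&Op::HashJoin));
    assert!(wraps(&Op::RepartitionHash));
    assert!(!wraps(&Op::RepartitionRoundRobin)); // forwards whole batches unchanged
    assert!(!wraps(&Op::Scan));
}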

+fn wrap_in_coalesce_rewrite_inner(
+    limit: Option<usize>,
+    has_scan_all_node: bool,
+    partition: usize,
+    default_batch_size: usize,
+    plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
+) -> Result<Arc<dyn crate::physical_plan::ExecutionPlan>> {
+    let children = plan
+        .children()
+        .iter()
+        .map(|&child| {
+            // A limit declared on the child itself overrides the one inherited from above
+            let limit = match get_limit(child.as_any()) {
+                None => limit,
+                v => v,
+            };
+            wrap_in_coalesce_rewrite_inner(
+                limit,
+                has_scan_all_node || is_scan_all_node(child.as_any()),
+                partition,
+                default_batch_size,
+                child.clone(),
+            )
+        })
+        .collect::<Result<Vec<_>>>()?;

+    let mut wrap_in_coalesce = need_wrap_in_coalesce(plan.as_any());
+
+    // Use `limit / partition` as CoalesceBatchesExec's fetch: it caps the number of rows
+    // each partition fetches. If the entire table must be scanned, the limit cannot apply.
+    let fetch = if has_scan_all_node {
+        None
+    } else {
+        limit.map(|limit| {
+            // If the limit is small enough, this optimization is not worth performing
+            if limit < partition * 16 {
+                wrap_in_coalesce = false;
+            }
+            ceil(limit, partition)
+        })
+    };

+    let plan = if children.is_empty() {
+        plan
+    } else {
+        plan.with_new_children(children)?
+    };
+
+    Ok(if wrap_in_coalesce {
+        Arc::new(CoalesceBatchesExec::new(plan, default_batch_size).with_fetch(fetch))
+    } else {
+        plan
+    })
+}
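Worked numbers for the fetch computation above, as a standalone sketch in plain Rust (usize::div_ceil stands in for the arrow ceil helper; the partition * 16 cutoff is copied from the rule):

/// Returns (per-partition fetch, whether wrapping is still worthwhile).
fn per_partition_fetch(limit: Option<usize>, partitions: usize) -> (Option<usize>, bool) {
    match limit {
        // A tiny limit finishes before batches ever grow, so skip coalescing.
        Some(n) if n < partitions * 16 => (Some(n.div_ceil(partitions)), false),
        Some(n) => (Some(n.div_ceil(partitions)), true),
        None => (None, true),
    }
}

fn main() {
    // LIMIT 1000 over 8 partitions: each partition needs at most ceil(1000 / 8) = 125 rows.
    assert_eq!(per_partition_fetch(Some(1000), 8), (Some(125), true));
    // LIMIT 50 over 8 partitions: 50 < 8 * 16 = 128, so wrapping is skipped.
    assert_eq!(per_partition_fetch(Some(50), 8), (Some(7), false));
}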

+fn wrap_in_coalesce_rewrite(
+    mut partition: usize,
+    default_batch_size: usize,
+    plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
+) -> Result<Arc<dyn crate::physical_plan::ExecutionPlan>> {
+    // The partition count is at least 1
+    if partition == 0 {
+        partition = 1;
+    }
+    wrap_in_coalesce_rewrite_inner(
+        get_limit(plan.as_any()),
+        is_scan_all_node(plan.as_any()),
+        partition,
+        default_batch_size,
+        plan,
+    )
+}
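One subtlety in wrap_in_coalesce_rewrite_inner: the match get_limit(child.as_any()) { None => limit, v => v } pattern simply says a limit declared on the child wins over the inherited one, i.e. Option::or. A standalone equivalent:

/// A limit found on the node itself overrides the inherited one.
fn effective_limit(inherited: Option<usize>, own: Option<usize>) -> Option<usize> {
    own.or(inherited)
}

fn main() {
    assert_eq!(effective_limit(Some(100), Some(10)), Some(10)); // a nested LIMIT 10 wins
    assert_eq!(effective_limit(Some(100), None), Some(100)); // otherwise keep the outer limit
}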

+impl PhysicalOptimizerRule for CoalesceBatches {
+    fn optimize(
+        &self,
+        plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
+        config: &ConfigOptions,
+    ) -> Result<Arc<dyn crate::physical_plan::ExecutionPlan>> {
+        if !config.execution.coalesce_batches {
+            return Ok(plan);
+        }
+        wrap_in_coalesce_rewrite(
+            config.execution.target_partitions,
+            config.execution.batch_size,
+            plan,
+        )
+    }

fn name(&self) -> &str {
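Stepping back from the diff, the three values the rewritten rule reads from ConfigOptions map to session-level settings. A hedged usage sketch (method names as in recent DataFusion releases; the version this PR targets may differ):

use datafusion::prelude::{SessionConfig, SessionContext};

fn main() {
    let config = SessionConfig::new()
        .with_coalesce_batches(true) // execution.coalesce_batches: enables this rule
        .with_batch_size(8192) // execution.batch_size: the coalesce target
        .with_target_partitions(8); // execution.target_partitions: `partition` above
    let _ctx = SessionContext::new_with_config(config);
}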