diff --git a/core/src/execution/datafusion/planner.rs b/core/src/execution/datafusion/planner.rs
index a5bcf5654b..dd476205b9 100644
--- a/core/src/execution/datafusion/planner.rs
+++ b/core/src/execution/datafusion/planner.rs
@@ -39,6 +39,7 @@ use datafusion::{
     },
     physical_plan::{
         aggregates::{AggregateMode as DFAggregateMode, PhysicalGroupBy},
+        coalesce_batches::CoalesceBatchesExec,
         filter::FilterExec,
         joins::{utils::JoinFilter, HashJoinExec, PartitionMode, SortMergeJoinExec},
         limit::LocalLimitExec,
@@ -724,7 +725,31 @@ impl PhysicalPlanner {
                             .map(|r| (r, format!("col_{}", idx)))
                     })
                     .collect();
-                Ok((scans, Arc::new(ProjectionExec::try_new(exprs?, child)?)))
+
+                let projection = Arc::new(ProjectionExec::try_new(exprs?, child)?);
+                // the scan reuses batches so we need to copy them before coalescing ... if
+                // this approach looks like it has benefit then we may want to build a custom
+                // version of CoalesceBatchesExec that copies when needed
+                let copied = Arc::new(CopyExec::new(projection));
+
+                // keep this small to avoid error:
+                let target_batch_size = 1024;
+
+                /*
+                org.apache.comet.CometNativeException: range end index 8504 out of range for slice of length 8192
+                    at comet::errors::init::{{closure}}(__internal__:0)
+                    at std::panicking::rust_panic_with_hook(__internal__:0)
+                    at std::panicking::begin_panic_handler::{{closure}}(__internal__:0)
+                    at std::sys_common::backtrace::__rust_end_short_backtrace(__internal__:0)
+                    at rust_begin_unwind(__internal__:0)
+                    at core::panicking::panic_fmt(__internal__:0)
+                    at core::slice::index::slice_end_index_len_fail(__internal__:0)
+                    at comet::execution::datafusion::shuffle_writer::external_shuffle::{{closure}}(__internal__:0)
+                */
+
+                let coalesced: Arc<dyn ExecutionPlan> =
+                    Arc::new(CoalesceBatchesExec::new(copied, target_batch_size));
+                Ok((scans, coalesced))
             }
             OpStruct::Filter(filter) => {
                 assert!(children.len() == 1);
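
The pattern this diff establishes is: projection, then a deep copy, then coalescing. Below is a minimal sketch of that wrapping order, not Comet's actual code; the module path for Comet's internal `CopyExec` and the helper name `copy_then_coalesce` are assumptions for illustration. `CoalesceBatchesExec` is DataFusion's real operator and its constructor does take `(input, target_batch_size)`.

```rust
use std::sync::Arc;

use datafusion::physical_plan::{coalesce_batches::CoalesceBatchesExec, ExecutionPlan};

// Comet-internal operator; this module path is assumed for the sketch.
use crate::execution::datafusion::operators::copy::CopyExec;

/// Deep-copy `child`'s output batches before coalescing them.
///
/// Per the comment in the diff, the scan reuses its output buffers, so any
/// operator that holds batches across polls (as CoalesceBatchesExec does
/// while accumulating up to `target_batch_size` rows) must not see buffers
/// the scan will overwrite when it produces the next batch.
fn copy_then_coalesce(
    child: Arc<dyn ExecutionPlan>,
    target_batch_size: usize,
) -> Arc<dyn ExecutionPlan> {
    let copied: Arc<dyn ExecutionPlan> = Arc::new(CopyExec::new(child));
    Arc::new(CoalesceBatchesExec::new(copied, target_batch_size))
}
```

On the hard-coded `target_batch_size = 1024`: the quoted panic ("range end index 8504 out of range for slice of length 8192") comes from the shuffle writer slicing a batch, which appears to assume batches no longer than 8192 rows, while coalescing can emit batches somewhat larger than its target. Pinning the target well below 8192 presumably sidesteps that until the slicing code or a copying variant of `CoalesceBatchesExec` (as the diff's comment suggests) handles it properly.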