Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion core/src/execution/datafusion/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use datafusion::{
},
physical_plan::{
aggregates::{AggregateMode as DFAggregateMode, PhysicalGroupBy},
coalesce_batches::CoalesceBatchesExec,
filter::FilterExec,
joins::{utils::JoinFilter, HashJoinExec, PartitionMode, SortMergeJoinExec},
limit::LocalLimitExec,
Expand Down Expand Up @@ -724,7 +725,31 @@ impl PhysicalPlanner {
.map(|r| (r, format!("col_{}", idx)))
})
.collect();
Ok((scans, Arc::new(ProjectionExec::try_new(exprs?, child)?)))

let projection = Arc::new(ProjectionExec::try_new(exprs?, child)?);
// The scan reuses batches, so we need to copy them before coalescing.
// If this approach turns out to be beneficial, we may want to build a custom
// version of CoalesceBatchesExec that copies when needed.
let copied = Arc::new(CopyExec::new(projection));

// Keep this small to avoid the error shown in the stack trace below:
let target_batch_size = 1024;

/*
org.apache.comet.CometNativeException: range end index 8504 out of range for slice of length 8192
at comet::errors::init::{{closure}}(__internal__:0)
at std::panicking::rust_panic_with_hook(__internal__:0)
at std::panicking::begin_panic_handler::{{closure}}(__internal__:0)
at std::sys_common::backtrace::__rust_end_short_backtrace(__internal__:0)
at rust_begin_unwind(__internal__:0)
at core::panicking::panic_fmt(__internal__:0)
at core::slice::index::slice_end_index_len_fail(__internal__:0)
at comet::execution::datafusion::shuffle_writer::external_shuffle::{{closure}}(__internal__:0)
*/

let coalesced: Arc<dyn ExecutionPlan> =
Arc::new(CoalesceBatchesExec::new(copied, target_batch_size));
Ok((scans, coalesced))
}
OpStruct::Filter(filter) => {
assert!(children.len() == 1);
Expand Down