Skip to content
50 changes: 27 additions & 23 deletions native/core/src/execution/datafusion/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -917,15 +917,15 @@ impl PhysicalPlanner {

let fetch = sort.fetch.map(|num| num as usize);

let copy_exec = if can_reuse_input_batch(&child) {
Arc::new(CopyExec::new(child, CopyMode::UnpackOrDeepCopy))
} else {
Arc::new(CopyExec::new(child, CopyMode::UnpackOrClone))
};
// SortExec caches batches so we need to make a copy of incoming batches. Also,
// SortExec fails in some cases if we do not unpack dictionary-encoded arrays, and
// it would be more efficient if we could avoid that.
// https://github.com/apache/datafusion-comet/issues/963
let child = Self::wrap_in_copy_exec(child);
Copy link
Copy Markdown
Contributor

@mbutrovich mbutrovich Sep 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a similar comment here as you added with HashJoin on why Sort needs a CopyExec?


Ok((
scans,
Arc::new(SortExec::new(exprs?, copy_exec).with_fetch(fetch)),
Arc::new(SortExec::new(exprs?, child).with_fetch(fetch)),
))
}
OpStruct::Scan(scan) => {
Expand Down Expand Up @@ -1069,9 +1069,17 @@ impl PhysicalPlanner {
join.join_type,
&join.condition,
)?;

// HashJoinExec may cache the input batch internally. We need
// to copy the input batch to avoid the data corruption from reusing the input
// batch. We also need to unpack dictionary arrays, because the join operators
// do not support them.
let left = Self::wrap_in_copy_exec(join_params.left);
let right = Self::wrap_in_copy_exec(join_params.right);

let hash_join = Arc::new(HashJoinExec::try_new(
join_params.left,
join_params.right,
left,
right,
join_params.join_on,
join_params.join_filter,
&join_params.join_type,
Expand Down Expand Up @@ -1135,6 +1143,7 @@ impl PhysicalPlanner {
}
}

#[allow(clippy::too_many_arguments)]
fn parse_join_parameters(
&self,
inputs: &mut Vec<Arc<GlobalRef>>,
Expand Down Expand Up @@ -1263,21 +1272,6 @@ impl PhysicalPlanner {
None
};

// DataFusion Join operators keep the input batch internally. We need
// to copy the input batch to avoid the data corruption from reusing the input
// batch.
let left = if can_reuse_input_batch(&left) {
Arc::new(CopyExec::new(left, CopyMode::UnpackOrDeepCopy))
} else {
Arc::new(CopyExec::new(left, CopyMode::UnpackOrClone))
};

let right = if can_reuse_input_batch(&right) {
Arc::new(CopyExec::new(right, CopyMode::UnpackOrDeepCopy))
} else {
Arc::new(CopyExec::new(right, CopyMode::UnpackOrClone))
};

Comment on lines -1266 to -1280
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is specific to HashJoinExec and not to SortMergeJoinExec, so I moved it out of this code, which is common to both joins.

Ok((
JoinParameters {
left,
Expand All @@ -1290,6 +1284,16 @@ impl PhysicalPlanner {
))
}

/// Wrap an ExecutionPlan in a CopyExec so downstream operators receive safe batches.
///
/// Dictionary-encoded arrays are always unpacked. If the child plan re-uses its
/// input batches (per `can_reuse_input_batch`), other arrays are deep-copied as
/// well; otherwise a cheap clone suffices.
fn wrap_in_copy_exec(plan: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
    // Pick the copy mode first, then build the wrapper once.
    let mode = if can_reuse_input_batch(&plan) {
        CopyMode::UnpackOrDeepCopy
    } else {
        CopyMode::UnpackOrClone
    };
    Arc::new(CopyExec::new(plan, mode))
}

/// Create a DataFusion physical aggregate expression from Spark physical aggregate expression
fn create_agg_expr(
&self,
Expand Down
2 changes: 2 additions & 0 deletions native/core/src/execution/operators/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ pub struct CopyExec {

/// Controls how `CopyExec` materializes incoming batches. In both modes,
/// dictionary-encoded arrays are unpacked; the modes differ in how the
/// remaining arrays are handled.
#[derive(Debug, PartialEq, Clone)]
pub enum CopyMode {
    /// Perform a deep copy and also unpack dictionaries
    UnpackOrDeepCopy,
    /// Perform a clone and also unpack dictionaries
    UnpackOrClone,
}

Expand Down