diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index d7c8d74592..bf970a92b2 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -917,15 +917,15 @@ impl PhysicalPlanner { let fetch = sort.fetch.map(|num| num as usize); - let copy_exec = if can_reuse_input_batch(&child) { - Arc::new(CopyExec::new(child, CopyMode::UnpackOrDeepCopy)) - } else { - Arc::new(CopyExec::new(child, CopyMode::UnpackOrClone)) - }; + // SortExec caches batches so we need to make a copy of incoming batches. Also, + // SortExec fails in some cases if we do not unpack dictionary-encoded arrays, and + // it would be more efficient if we could avoid that. + // https://github.com/apache/datafusion-comet/issues/963 + let child = Self::wrap_in_copy_exec(child); Ok(( scans, - Arc::new(SortExec::new(exprs?, copy_exec).with_fetch(fetch)), + Arc::new(SortExec::new(exprs?, child).with_fetch(fetch)), )) } OpStruct::Scan(scan) => { @@ -1069,9 +1069,17 @@ impl PhysicalPlanner { join.join_type, &join.condition, )?; + + // HashJoinExec may cache the input batch internally. We need + // to copy the input batch to avoid the data corruption from reusing the input + // batch. We also need to unpack dictionary arrays, because the join operators + // do not support them. + let left = Self::wrap_in_copy_exec(join_params.left); + let right = Self::wrap_in_copy_exec(join_params.right); + let hash_join = Arc::new(HashJoinExec::try_new( - join_params.left, - join_params.right, + left, + right, join_params.join_on, join_params.join_filter, &join_params.join_type, @@ -1135,6 +1143,7 @@ impl PhysicalPlanner { } } + #[allow(clippy::too_many_arguments)] fn parse_join_parameters( &self, inputs: &mut Vec<Arc<GlobalRef>>, @@ -1263,21 +1272,6 @@ impl PhysicalPlanner { None }; - // DataFusion Join operators keep the input batch internally. We need - // to copy the input batch to avoid the data corruption from reusing the input - // batch. - let left = if can_reuse_input_batch(&left) { - Arc::new(CopyExec::new(left, CopyMode::UnpackOrDeepCopy)) - } else { - Arc::new(CopyExec::new(left, CopyMode::UnpackOrClone)) - }; - - let right = if can_reuse_input_batch(&right) { - Arc::new(CopyExec::new(right, CopyMode::UnpackOrDeepCopy)) - } else { - Arc::new(CopyExec::new(right, CopyMode::UnpackOrClone)) - }; - Ok(( JoinParameters { left, @@ -1290,6 +1284,16 @@ impl PhysicalPlanner { )) } + /// Wrap an ExecutionPlan in a CopyExec, which will unpack any dictionary-encoded arrays + /// and make a deep copy of other arrays if the plan re-uses batches. + fn wrap_in_copy_exec(plan: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> { + if can_reuse_input_batch(&plan) { + Arc::new(CopyExec::new(plan, CopyMode::UnpackOrDeepCopy)) + } else { + Arc::new(CopyExec::new(plan, CopyMode::UnpackOrClone)) + } + } + /// Create a DataFusion physical aggregate expression from Spark physical aggregate expression fn create_agg_expr( &self, diff --git a/native/core/src/execution/operators/copy.rs b/native/core/src/execution/operators/copy.rs index d6c095a77b..d8e75c67f1 100644 --- a/native/core/src/execution/operators/copy.rs +++ b/native/core/src/execution/operators/copy.rs @@ -50,7 +50,9 @@ pub struct CopyExec { #[derive(Debug, PartialEq, Clone)] pub enum CopyMode { + /// Perform a deep copy and also unpack dictionaries UnpackOrDeepCopy, + /// Perform a clone and also unpack dictionaries UnpackOrClone, }