diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 202c187d35..641ce97a90 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -956,6 +956,12 @@ impl PhysicalPlanner { self.create_expr(filter.predicate.as_ref().unwrap(), child.schema())?; let filter: Arc<dyn ExecutionPlan> = if filter.use_datafusion_filter { + assert_eq!( + scans.len(), + 0, + "Must not use ScanExec as it may require a copy" + ); + Arc::new(DataFusionFilterExec::try_new( predicate, Arc::clone(&child.native_plan), diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 1da3e984bc..aae893208c 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -2406,19 +2406,19 @@ object QueryPlanSerde extends Logging with CometExprShim { // TODO this could be optimized more to stop walking the tree on hitting // certain operators such as join or aggregate which will copy batches - def containsNativeCometScan(plan: SparkPlan): Boolean = { + def containsNonDataFusionScan(plan: SparkPlan): Boolean = { plan match { - case w: CometScanWrapper => containsNativeCometScan(w.originalPlan) - case scan: CometScanExec => scan.scanImpl == CometConf.SCAN_NATIVE_COMET + case _: CometScanWrapper => true + case scan: CometScanExec => scan.scanImpl != CometConf.SCAN_NATIVE_DATAFUSION case _: CometNativeScanExec => false - case _ => plan.children.exists(containsNativeCometScan) + case _ => plan.children.isEmpty || plan.children.exists(containsNonDataFusionScan) } } val filterBuilder = OperatorOuterClass.Filter .newBuilder() .setPredicate(cond.get) - .setUseDatafusionFilter(!containsNativeCometScan(op)) + .setUseDatafusionFilter(!containsNonDataFusionScan(op)) Some(result.setFilter(filterBuilder).build()) } else { withInfo(op, condition, child)