From bc92f6e50bf1e088992e92481504c0de25e506a5 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Mon, 26 May 2025 16:55:20 +0300 Subject: [PATCH 1/2] chore: add assertion that not using comet scan but using native scan --- native/core/src/execution/planner.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 202c187d35..641ce97a90 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -956,6 +956,12 @@ impl PhysicalPlanner { self.create_expr(filter.predicate.as_ref().unwrap(), child.schema())?; let filter: Arc = if filter.use_datafusion_filter { + assert_eq!( + scans.len(), + 0, + "Must not use ScanExec as it may requires a copy" + ); + Arc::new(DataFusionFilterExec::try_new( predicate, Arc::clone(&child.native_plan), From cceb32a2cc6b12a7190bbb0735dbcbed7f528fe8 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Tue, 27 May 2025 17:52:30 +0300 Subject: [PATCH 2/2] fix: use native DF filter only when using native DF scan --- .../scala/org/apache/comet/serde/QueryPlanSerde.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 1da3e984bc..aae893208c 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -2406,19 +2406,19 @@ object QueryPlanSerde extends Logging with CometExprShim { // TODO this could be optimized more to stop walking the tree on hitting // certain operators such as join or aggregate which will copy batches - def containsNativeCometScan(plan: SparkPlan): Boolean = { + def containsNonDataFusionScan(plan: SparkPlan): Boolean = { plan match { - case w: CometScanWrapper => containsNativeCometScan(w.originalPlan) - case scan: CometScanExec => scan.scanImpl == CometConf.SCAN_NATIVE_COMET + case _: CometScanWrapper => true + case scan: CometScanExec => scan.scanImpl != CometConf.SCAN_NATIVE_DATAFUSION case _: CometNativeScanExec => false - case _ => plan.children.exists(containsNativeCometScan) + case _ => plan.children.isEmpty || plan.children.exists(containsNonDataFusionScan) } } val filterBuilder = OperatorOuterClass.Filter .newBuilder() .setPredicate(cond.get) - .setUseDatafusionFilter(!containsNativeCometScan(op)) + .setUseDatafusionFilter(!containsNonDataFusionScan(op)) Some(result.setFilter(filterBuilder).build()) } else { withInfo(op, condition, child)