6 changes: 6 additions & 0 deletions native/core/src/execution/planner.rs
@@ -956,6 +956,12 @@ impl PhysicalPlanner {
     self.create_expr(filter.predicate.as_ref().unwrap(), child.schema())?;

     let filter: Arc<dyn ExecutionPlan> = if filter.use_datafusion_filter {
+        assert_eq!(
+            scans.len(),
+            0,
+            "Must not use ScanExec as it may require a copy"
+        );

Comment on lines +959 to +964

Member:
My understanding is that we only reuse buffers when using native_comet to read the Parquet files. If native_iceberg_compat is used then we will still have a ScanExec reading from the JVM, but it is safe to use DataFusion's FilterExec in this case.

@parthchandra would you agree with this?

Contributor:

That is correct. native_iceberg_compat does not reuse buffers.

Arc::new(DataFusionFilterExec::try_new(
predicate,
Arc::clone(&child.native_plan),
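The hazard this assertion guards against can be illustrated with a minimal, self-contained sketch. The types below are hypothetical stand-ins, not Comet's actual `ScanExec` or Arrow buffers: the point is only that when a scan reuses one output buffer across batches, any downstream operator that holds a reference to a batch instead of copying it will silently see that batch's contents change on the next poll.

```rust
use std::cell::RefCell;
use std::rc::Rc;

// Hypothetical stand-in for a scan operator that reuses a single
// output buffer for every batch it produces.
struct ReusingScan {
    buffer: Rc<RefCell<Vec<i32>>>,
    batches: Vec<Vec<i32>>,
    next: usize,
}

impl ReusingScan {
    // Overwrites the shared buffer in place and hands back a handle to it.
    fn poll_next(&mut self) -> Option<Rc<RefCell<Vec<i32>>>> {
        let batch = self.batches.get(self.next)?.clone();
        self.next += 1;
        *self.buffer.borrow_mut() = batch;
        Some(Rc::clone(&self.buffer))
    }
}

fn main() {
    let mut scan = ReusingScan {
        buffer: Rc::new(RefCell::new(Vec::new())),
        batches: vec![vec![1, 2, 3], vec![4, 5, 6]],
        next: 0,
    };
    // A "filter" that defers copying: it just keeps the handle it was given.
    let held = scan.poll_next().unwrap();
    let before: Vec<i32> = held.borrow().clone();
    // Polling again reuses the same buffer, mutating the held batch in place.
    let _ = scan.poll_next();
    let after: Vec<i32> = held.borrow().clone();
    assert_eq!(before, vec![1, 2, 3]);
    assert_eq!(after, vec![4, 5, 6]); // the held handle now sees the new data
    println!("before={:?} after={:?}", before, after);
}
```

An operator that copies each batch before the next poll never observes this; one that borrows without copying (as DataFusion's FilterExec may) does, which is why the planner refuses to place it above a buffer-reusing scan.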
10 changes: 5 additions & 5 deletions spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -2406,19 +2406,19 @@ object QueryPlanSerde extends Logging with CometExprShim {

     // TODO this could be optimized more to stop walking the tree on hitting
     // certain operators such as join or aggregate which will copy batches
-    def containsNativeCometScan(plan: SparkPlan): Boolean = {
+    def containsNonDataFusionScan(plan: SparkPlan): Boolean = {
       plan match {
-        case w: CometScanWrapper => containsNativeCometScan(w.originalPlan)
-        case scan: CometScanExec => scan.scanImpl == CometConf.SCAN_NATIVE_COMET
+        case _: CometScanWrapper => true
Member Author:

Changed it to return true, as a native scan exec is not wrapped in CometScanWrapper.

Member Author (@rluvaton, May 27, 2025):

And when the scan is not a DataFusion scan, it will not use the DataFusion filter with native_iceberg_compat.

+        case scan: CometScanExec => scan.scanImpl != CometConf.SCAN_NATIVE_DATAFUSION
+        case _: CometNativeScanExec => false
-        case _ => plan.children.exists(containsNativeCometScan)
+        case _ => plan.children.isEmpty || plan.children.exists(containsNonDataFusionScan)
Member Author:

`exists` returns false for empty children, which means this is the data source, right? And if so, it is using the ScanExec of Rust and not the ScanExec of DataFusion.

Contributor:

> ScanExec of rust and not ScanExec of DataFusion

What does this mean?

Member Author:

We must start from a scan, right? Either a ScanExec or a DataFusion DataSourceExec. So if there are no children, this means we are at the root, for example an ExistingRDD, and therefore we must use ScanExec, as otherwise we would have converted it to CometNativeScanExec.

}
}

val filterBuilder = OperatorOuterClass.Filter
.newBuilder()
.setPredicate(cond.get)
-      .setUseDatafusionFilter(!containsNativeCometScan(op))
+      .setUseDatafusionFilter(!containsNonDataFusionScan(op))
Some(result.setFilter(filterBuilder).build())
} else {
withInfo(op, condition, child)
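The traversal debated in the thread above can be sketched as a small standalone Rust program. The `Plan` enum below is a hypothetical miniature of the Spark plan tree (not Comet's actual types); it mirrors the decision logic of `containsNonDataFusionScan`, including the contested `children.isEmpty` branch for childless leaves such as ExistingRDD.

```rust
// Hypothetical miniature plan tree mirroring containsNonDataFusionScan.
#[allow(dead_code)]
enum Plan {
    ScanWrapper,                      // CometScanWrapper: JVM-side scan, needs ScanExec
    Scan { native_datafusion: bool }, // CometScanExec with its scanImpl
    NativeScan,                       // CometNativeScanExec: DataFusion DataSourceExec
    Node(Vec<Plan>),                  // any other operator with children
}

// Returns true if the subtree contains a scan that is NOT DataFusion-native,
// i.e. a Rust-side ScanExec will feed this plan and batch buffers may be reused.
fn contains_non_datafusion_scan(plan: &Plan) -> bool {
    match plan {
        Plan::ScanWrapper => true,
        Plan::Scan { native_datafusion } => !*native_datafusion,
        Plan::NativeScan => false,
        // A node with no children is itself a data source fed through ScanExec
        // (e.g. an ExistingRDD), so it counts as non-DataFusion as well.
        Plan::Node(children) => {
            children.is_empty() || children.iter().any(contains_non_datafusion_scan)
        }
    }
}

fn main() {
    // Filter above a DataFusion-native scan: safe to use DataFusion's FilterExec.
    let safe = Plan::Node(vec![Plan::NativeScan]);
    assert!(!contains_non_datafusion_scan(&safe));
    // Filter above a childless leaf (e.g. ExistingRDD): must not use FilterExec.
    let leaf = Plan::Node(vec![]);
    assert!(contains_non_datafusion_scan(&leaf));
    println!("ok");
}
```

The serialized plan then sets `useDatafusionFilter` to the negation of this check, which is exactly what the `setUseDatafusionFilter(!containsNonDataFusionScan(op))` line in the diff does.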