
AQE DPP SAB wrapping skipped when V2 scan is wrapped in CometSparkToColumnarExec #4145

@mbutrovich

Description

Describe the bug

CometExecRule.transform walks the plan with transformUp and for each node calls convertNode(op) followed by convertSubqueryBroadcasts(converted). When convertNode wraps a V2 BatchScanExec in CometSparkToColumnarExec, the wrapped scan's runtimeFilters (where AQE DPP places its SubqueryAdaptiveBroadcastExec) are hidden from the subsequent convertSubqueryBroadcasts call. transformExpressionsUp only walks the current node's expressions, not its children's.

The SAB never gets wrapped in CometSubqueryAdaptiveBroadcastExec. Spark's PlanAdaptiveDynamicPruningFilters at queryStageOptimizerRule time then finds the unwrapped SAB, fails its sameResult match against the Comet build-side exchange, and replaces DPP with Literal.TrueLiteral.

The code in spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala:

```scala
plan.transformUp { case op =>
  val converted = convertNode(op)      // may wrap op in CometSparkToColumnarExec
  convertSubqueryBroadcasts(converted) // misses the wrapped op's runtimeFilters / partitionFilters
}
```
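A minimal self-contained sketch of the miss (toy types, not Comet or Spark code; all names here are illustrative): an expression rewrite that looks only at a node's own expressions, like `transformExpressionsUp` on the wrapper, cannot see expressions held by the child it wraps.

```scala
// Toy plan tree: the Wrapper stands in for CometSparkToColumnarExec,
// the Scan's runtimeFilters stand in for the DPP subquery expressions.
sealed trait Node { def children: Seq[Node]; def ownExprs: Seq[String] }
case class Scan(runtimeFilters: Seq[String]) extends Node {
  def children: Seq[Node] = Nil
  def ownExprs: Seq[String] = runtimeFilters
}
case class Wrapper(child: Node) extends Node {
  def children: Seq[Node] = Seq(child)
  def ownExprs: Seq[String] = Nil // the wrapper itself carries no expressions
}

// Analogue of transformExpressionsUp: visits only this node's own expressions.
def ownExprsRewritten(n: Node, f: String => String): Seq[String] =
  n.ownExprs.map(f)

// Analogue of transformAllExpressions: recurses into every node's expressions.
def allExprsRewritten(n: Node, f: String => String): Seq[String] =
  n.ownExprs.map(f) ++ n.children.flatMap(allExprsRewritten(_, f))

val wrapped = Wrapper(Scan(Seq("SubqueryAdaptiveBroadcast")))
val shallow = ownExprsRewritten(wrapped, "Comet" + _) // empty: the SAB is hidden
val deep    = allExprsRewritten(wrapped, "Comet" + _) // finds and rewrites the SAB
```

This is only a model of the traversal shapes; the real `convertSubqueryBroadcasts` operates on Spark `SparkPlan` nodes and their `SubqueryAdaptiveBroadcastExec` expressions.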

Not urgent today: V2 Parquet AQE DPP isn't supported in Spark (Comet #3510; SPARK-53439 tracks it upstream, still Open with no fix version, and apache/spark#52180 was closed without merging). No current user path hits this; filing now so it's tracked for when V2 Parquet DPP eventually lands.

Steps to reproduce

Seen while debugging the CometExecSuite test "AQE DPP: V2 BatchScan broadcast query stage creation order (SPARK-34637)" by temporarily enabling CometSparkToColumnarExec for the InMemoryBatchScan:

```scala
withSQLConf(
  CometConf.COMET_SPARK_TO_ARROW_ENABLED.key -> "true",
  CometConf.COMET_SPARK_TO_ARROW_SUPPORTED_OPERATOR_LIST.key ->
    "BatchScan,Range,InMemoryTableScan,RDDScan,OneRowRelation",
  // ...existing confs
) { /* run the test */ }
```

With those set, the plan shows the scan's runtimeFilters containing a plain SubqueryAdaptiveBroadcast (unwrapped) instead of CometSubqueryAdaptiveBroadcast, and the final plan has dynamicpruningexpression(true).

Expected behavior

The SAB should be wrapped in CometSubqueryAdaptiveBroadcastExec regardless of whether convertNode also wraps the scan in CometSparkToColumnarExec, so that CometPlanAdaptiveDynamicPruningFilters can rewrite it into CometSubqueryBroadcastExec at queryStageOptimizerRule time.

Additional context

Surfaced in #4112 (AQE DPP for native Parquet scans with broadcast reuse) while investigating a 4.1 failure in the V2 BatchScan broadcast query stage creation order (SPARK-34637) test. That test was gated with assume(!isSpark41Plus) in #4112 because Spark 4.1 elides the shuffle that gives Comet its entry point for the test's query. This bug turned up when trying to restore the entry point by wrapping the BatchScan in CometSparkToColumnarExec.

Likely fix: reorder in CometExecRule.transform so SAB/SubqueryBroadcastExec wrapping runs before convertNode:

```scala
plan.transformUp { case op =>
  val withSubs = convertSubqueryBroadcasts(op)
  convertNode(withSubs)
}
```
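A standalone sketch of why the reorder works (toy types again, not Comet code; names are illustrative stand-ins): rewriting a node's subquery expressions before wrapping it means the rewrite still sees them.

```scala
// Toy plan: Wrapper stands in for CometSparkToColumnarExec, "SAB" for the
// SubqueryAdaptiveBroadcastExec expression sitting in the scan's runtimeFilters.
sealed trait Plan { def ownExprs: Seq[String] }
case class Scan(ownExprs: Seq[String]) extends Plan
case class Wrapper(child: Plan) extends Plan {
  def ownExprs: Seq[String] = Nil // wrapper exposes no expressions of its own
}

// Analogue of convertSubqueryBroadcasts: rewrites only the node's own exprs.
def wrapSubqueries(p: Plan): Plan = p match {
  case Scan(es) => Scan(es.map(e => if (e == "SAB") "CometSAB" else e))
  case other    => other
}
// Analogue of convertNode deciding to wrap the scan.
def convertNode(p: Plan): Plan = Wrapper(p)

// Current order: wrap first, then rewrite. The wrapper has no own exprs,
// so the SAB inside the child is never touched.
val buggy = wrapSubqueries(convertNode(Scan(Seq("SAB"))))
// Proposed order: rewrite first, then wrap. The SAB gets Comet-wrapped.
val fixed = convertNode(wrapSubqueries(Scan(Seq("SAB"))))
```

The reorder leans on `transformUp` visiting children before parents, so each node's expressions are still attached to the original node when `convertSubqueryBroadcasts` runs on it.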

Or a separate pass using transformAllExpressions, which recurses into all nodes' expressions:

```scala
val converted = plan.transformUp { case op => convertNode(op) }
converted.transformAllExpressions {
  // SAB / SubqueryBroadcastExec wrapping logic
}
```
