
feat: AQE DPP for native Parquet scans with broadcast reuse #4112

Open

mbutrovich wants to merge 45 commits into apache:main from mbutrovich:aqe_dpp_parquet

Conversation

Contributor

@mbutrovich mbutrovich commented Apr 27, 2026

Which issue does this PR close?

Partially addresses #3510. Closes #4045. Related PRs: #4011 (non-AQE DPP), #4053 (scalar subquery pushdown + CometReuseSubquery), #4037 (non-AQE DPP edge case tests), #4033 (AQE DPP for Iceberg, draft).

Rationale for this change

Under AQE (the default), Spark creates SubqueryAdaptiveBroadcastExec (SAB) for DPP. Spark's PlanAdaptiveDynamicPruningFilters converts these by finding BroadcastHashJoinExec in the plan. After Comet replaces it with CometBroadcastHashJoinExec, Spark's rule can't find a match and replaces DPP with Literal.TrueLiteral, disabling partition pruning. Previously, the isAqeDynamicPruningFilter rejection caused the scan to fall back to Spark entirely, losing native acceleration for all DPP queries under AQE.

What changes are included in this PR?

Spark 3.5+: two-phase SAB conversion

Spark's PlanAdaptiveDynamicPruningFilters runs before custom queryStageOptimizerRules and converts SABs to TrueLiteral. We work around this in two phases:

  1. CometExecRule (queryStagePreparationRules, before Spark's rule): wraps SABs in CometSubqueryAdaptiveBroadcastExec so Spark's pattern match doesn't recognize them. Wraps all SABs regardless of scan type, so CometPlanAdaptiveDynamicPruningFilters can convert them for both Comet native scans and non-Comet scans (e.g., V2 BatchScan).
  2. CometPlanAdaptiveDynamicPruningFilters (queryStageOptimizerRule, after Spark's rule): converts the wrapped SABs, following Spark's decision tree:
    • exchangeReuseEnabled + matching broadcast join: CometSubqueryBroadcastExec wired to BroadcastQueryStageExec for broadcast reuse.
    • No reusable broadcast + onlyInBroadcast=true: Literal.TrueLiteral.
    • No reusable broadcast + onlyInBroadcast=false: aggregate SubqueryExec (matching PlanAdaptiveDynamicPruningFilters.scala:68-79).
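
The decision tree above can be modeled as a small pure function. This is a hypothetical, simplified sketch: the case objects stand in for CometSubqueryBroadcastExec, Literal.TrueLiteral, and the aggregate SubqueryExec; it is not Comet's actual code.

```scala
// Toy model of the SAB conversion decision tree (illustrative names only).
sealed trait DppReplacement
case object ReusedBroadcastSubquery extends DppReplacement // broadcast-reuse path
case object TrueLiteral extends DppReplacement             // pruning disabled
case object AggregateSubquery extends DppReplacement       // standalone aggregate

def convertSab(exchangeReuseEnabled: Boolean,
               foundMatchingBroadcastJoin: Boolean,
               onlyInBroadcast: Boolean): DppReplacement =
  if (exchangeReuseEnabled && foundMatchingBroadcastJoin) ReusedBroadcastSubquery
  else if (onlyInBroadcast) TrueLiteral // pruning not worth running on its own
  else AggregateSubquery                // still prune, via a separate aggregate
```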

Cross-stage broadcast search

Spark's rule is constructed with rootPlan = this (each AdaptiveSparkPlanExec's (ASPE's) own instance). Custom queryStageOptimizerRules are shared across all ASPEs without a per-ASPE rootPlan. We approximate with two searches:

  1. stagePlan (the plan arg to apply()): same-stage joins and scalar subqueries where scan and join are under one exchange.
  2. context.qe.executedPlan (the main query's ASPE): cross-stage joins where a shuffle separates scan from broadcast join.

When the broadcast is not yet materialized (cross-stage case), we follow Spark's pattern (lines 44-64): construct a new broadcast exchange, wrap in a new ASPE, and let AQE's stageCache canonicalization ensure the broadcast runs once.
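
The two-pass lookup can be sketched over a toy tree type; `Plan` and the node names below are illustrative stand-ins for Spark plan nodes, not Comet's real types.

```scala
// Hypothetical sketch of the two-pass broadcast-join search.
case class Plan(name: String, children: Seq[Plan] = Nil)

def findFirst(p: Plan, name: String): Option[Plan] =
  if (p.name == name) Some(p)
  else p.children.iterator.map(findFirst(_, name)).collectFirst { case Some(m) => m }

// 1. Search the current stage plan (scan and join under one exchange);
// 2. fall back to the main query's executed plan for cross-stage joins.
def findBroadcastJoin(stagePlan: Plan, executedPlan: Plan): Option[Plan] =
  findFirst(stagePlan, "BroadcastHashJoin")
    .orElse(findFirst(executedPlan, "BroadcastHashJoin"))
```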

Subquery deduplication via shared cache

Our rule runs after Spark's ReuseAdaptiveSubquery (which can't see our subqueries because they don't exist yet). We register DPP subqueries directly in AdaptiveExecutionContext.subqueryCache, matching ReuseAdaptiveSubquery's behavior for cross-plan reuse (e.g., main query and scalar subquery with identical DPP).
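
The reuse pattern amounts to a build-once cache keyed by the subquery's canonicalized form. A minimal sketch, with illustrative names rather than the AdaptiveExecutionContext API:

```scala
import scala.collection.mutable

// Hypothetical sketch of cross-plan subquery deduplication via a shared cache.
final class SubqueryCache {
  private val cache = mutable.HashMap.empty[String, String]
  private var builds = 0

  // Build the DPP subquery once per canonical key; later callers reuse it.
  def getOrRegister(canonicalKey: String)(build: => String): String =
    cache.getOrElseUpdate(canonicalKey, { builds += 1; build })

  def buildCount: Int = builds
}
```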

Dual-filter resolution

CometNativeScanExec.partitionFilters and CometScanExec.partitionFilters contain separate InSubqueryExec instances. CometExecRule only wraps the outer filters (the inner CometScanExec is @transient). CometPlanAdaptiveDynamicPruningFilters converts both.

Spark 3.4: narrow-tagging fallback (CometSpark34AqeDppFallbackRule)

injectQueryStageOptimizerRule is unavailable on 3.4 (SPARK-45785 added it in 3.5), so CometPlanAdaptiveDynamicPruningFilters can't run. Rewriting the SAB at queryStagePrepRule time also doesn't work: AQE rebuilds plan nodes between prep and execution in ways that drop the @transient inner scan needed for the dual-filter update.

Instead, on 3.4 we arrange for Spark's PlanAdaptiveDynamicPruningFilters to succeed on its own by tagging specific nodes to stay Spark-native. The rule only writes skip-tags; it never rewrites expressions or plan structure. Tags are honored by CometScanRule and CometExecRule, and survive AQE per-stage re-entry (same contract as the existing SKIP_COMET_SHUFFLE_TAG from #4010). Four cases:

  1. SAB + matching BHJ (non-V1 scans: Hive, V2, V2Filter): tag the BHJ's build-side BroadcastExchangeExec with SKIP_COMET_BROADCAST_TAG. Comet's BHJ conversion then fails its all-Comet-children guard and the BHJ stays Spark; Spark's rule matches via sameResult and creates SubqueryBroadcastExec.
  2. SAB + matching BHJ on V1: CometScanRule.transformV1Scan already rejects the V1 fact scan; the cascade keeps the BHJ and its BroadcastExchangeExec Spark-native. No tagging needed. V1 BHJ queries (e.g. TPC-DS Q7) behave exactly as today on 3.4 main, including Comet acceleration on dim scans below the Spark broadcast.
  3. SAB with no matching BHJ (V1 SMJ self-join, e.g. SPARK-32509 with AUTO_BROADCASTJOIN_THRESHOLD=-1): tag peer scans + their shuffles so both self-join branches end up Spark-native with matching canonical forms. Spark's rule replaces the SAB with TrueLiteral; FileSourceScanExec.doCanonicalize strips it, restoring shuffle exchange reuse.
  4. SubqueryBroadcastExec-bearing scans (AQE re-optimize): on re-optimize, ASPE.preprocessingRules (PlanAdaptiveSubqueries) fills the DPP slot with the already-materialized SubqueryBroadcastExec rather than the original SAB, and the freshly-planned main-BHJ build BroadcastExchangeExec is a new instance with no tag. The rule also scans for SubqueryBroadcastExec (descending into QueryStageExec via AdaptiveSparkPlanHelper), extracts its buildKeys, and tags the matching BHJ's build BE so AQE stageCache can dedupe with the DPP subquery's broadcast.
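
The write-tag / honor-tag contract behind these four cases can be sketched as follows. `Node` and the helper names are toy stand-ins; the real rule writes TreeNode tags on Spark plan nodes.

```scala
import scala.collection.mutable

// Hypothetical sketch of the tag contract: the 3.4 rule only writes tags;
// conversion rules consult them and leave tagged nodes Spark-native.
val SkipCometBroadcastTag = "SKIP_COMET_BROADCAST_TAG"
final case class Node(name: String, tags: mutable.Set[String] = mutable.Set.empty)

// Phase 1 (fallback rule): tag the BHJ's build-side broadcast exchange.
def tagBuildSide(exchange: Node): Unit = exchange.tags += SkipCometBroadcastTag

// Phase 2 (conversion rule): a tagged exchange stays Spark-native, so the
// parent BHJ fails its all-Comet-children guard and stays Spark too.
def convertsToComet(exchange: Node): Boolean =
  !exchange.tags.contains(SkipCometBroadcastTag)
```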

Registered via a new injectPreSpark35QueryStagePrepRuleShim (3.4 only; no-op on 3.5+). The rule asserts !isSpark35Plus at entry.

Known limitation on 3.4: cross-plan scalar-subquery DPP (same limitation Spark's own rule has on 3.4). At prep-rule time each ASPE sees only its own plan, so an SAB in a scalar subquery can't see a matching BHJ in the main query. Produces correct results via Spark's rule falling through to TrueLiteral / aggregate SubqueryExec; only broadcast reuse is lost in that edge case.

Broadcast fallback cases (3.5+)

  • Spark BHJ (Comet BHJ disabled): finds BroadcastHashJoinExec, creates SubqueryBroadcastExec via shim.
  • SMJ (no broadcast): falls back to Literal.TrueLiteral or aggregate SubqueryExec depending on onlyInBroadcast.
  • ReusedExchangeExec: BroadcastQueryStageExec.plan may be ReusedExchangeExec when AQE reuses exchanges across plans. The rule unwraps it to verify the underlying exchange type.

Other changes

  • CometBroadcastExchangeExec: handles non-Comet children (e.g., LocalTableScan after AQE re-optimization of empty broadcasts) by wrapping in CometSparkToColumnarExec.
  • CometNativeScanExec.doCanonicalize: strips DPP filters from originalPlan to prevent stale SABs from blocking exchange reuse.
  • CometShuffleExchangeExec.doCanonicalize: excludes originalPlan from canonical form (matches CometBroadcastExchangeExec).
  • CometScanUtils.filterUnusedDynamicPruningExpressions: strips unconverted SABs in addition to TrueLiteral, matching Spark's FileSourceScanExec.filterUnusedDynamicPruningExpressions.
  • ShimPrepareExecutedPlan: new shim for QueryExecution.prepareExecutedPlan (3-arg on 3.x/4.0, 2-arg on 4.1+).
  • Existing DPP tests (CometDppFallbackRepro3949Suite, CometShuffleFallbackStickinessSuite) updated to disable native scan to preserve the stageContainsDPPScan stickiness code path.
  • Removed IgnoreComet(#4045) tags from Spark's DynamicPartitionPruningSuite diffs for SPARK-32509 and SPARK-34637. Tests ported to CometExecSuite with version-specific assertions.
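
The canonicalization changes above share one idea: strip per-plan DPP noise before comparing plans, so exchange reuse isn't blocked by stale subqueries. A minimal sketch over toy filter types (not Spark expressions):

```scala
// Hypothetical sketch of DPP-aware canonicalization.
sealed trait Filter
final case class StaticFilter(column: String, value: Int) extends Filter
final case class DynamicPruningFilter(subqueryId: Int) extends Filter // e.g., a stale SAB

final case class Scan(filters: Seq[Filter]) {
  // Drop dynamic-pruning filters so two scans differing only in their
  // per-plan DPP subqueries canonicalize equal and can reuse exchanges.
  def canonicalized: Scan =
    Scan(filters.filterNot(_.isInstanceOf[DynamicPruningFilter]))
}
```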

How are these changes tested?

16 new AQE DPP tests in CometExecSuite covering BHJ / SMJ / empty broadcast / dual filters / exchange reuse / non-atomic types / cross-stage search / scalar subquery deduplication / SPARK-32509 / SPARK-34637 / SPARK-39447. SPARK-32509 and SPARK-34637 ports are un-gated: SPARK-32509 asserts 1 ReusedExchangeExec on all versions; SPARK-34637 asserts CometSubqueryBroadcastExec on 3.5-4.0 and Spark-native SubqueryBroadcastExec on 3.4 and 4.1+. The V2 BatchScan variant runs on 3.4 with an explicit hasReuse check mirroring Spark's checkPartitionPruningPredicate, exercising case 4 above. Existing non-AQE DPP tests renamed to a consistent "[non-AQE|AQE] DPP: <scenario>" format.

@mbutrovich mbutrovich self-assigned this Apr 28, 2026
@mbutrovich mbutrovich added enhancement New feature or request native_datafusion Specific to native_datafusion scan type area:scan Parquet scan / data reading labels Apr 28, 2026
@mbutrovich mbutrovich marked this pull request as ready for review April 29, 2026 20:48
Member

@andygrove andygrove left a comment


LGTM. I will run benchmarks today to confirm no regressions. Thanks @mbutrovich!



Linked issue (may be closed by this PR): Comet DPP exchange/broadcast reuse fails under AQE