[Spark SQL] Enable all tests in DynamicPartitionPruningSuite #1839

@andygrove

Description

Describe the bug

Depends on #121

The following tests in DynamicPartitionPruningSuite pass when using native shuffle (because the queries fall back to Spark) but fail when using columnar shuffle (because more of the plan runs natively). A configuration sketch for switching between the two shuffle modes follows the list.

  • SPARK-34637: DPP side broadcast query stage is created firstly
  • avoid reordering broadcast join keys to match input hash partitioning
  • canonicalization and exchange reuse
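
For anyone reproducing this locally, something along these lines should switch between the two shuffle implementations. This is a minimal sketch: the config keys follow the Comet documentation, and exact names and values may differ between Comet versions.

```scala
// Hedged sketch: a local session wired up for Comet, toggling shuffle mode.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.plugins", "org.apache.spark.CometPlugin")
  // Comet shuffle requires its own shuffle manager (class name per Comet docs).
  .config("spark.shuffle.manager",
    "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
  .config("spark.comet.exec.enabled", "true")
  .config("spark.comet.exec.shuffle.enabled", "true")
  // "native" reproduces the passing case (queries fall back to Spark);
  // "jvm" selects the columnar shuffle that triggers the failures.
  .config("spark.comet.exec.shuffle.mode", "native")
  .getOrCreate()
```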

Steps to reproduce

No response

Expected behavior

Using "SPARK-34637: DPP side broadcast query stage is created firstly" as an example:

The plan with native shuffle has a SubqueryBroadcast under the FileScan, feeding the dynamicpruningexpression in the scan's PartitionFilters:

PLAN: AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(3) BroadcastHashJoin [store_id#5284], [store_id#5296], Inner, BuildRight, false
   :- *(3) HashAggregate(keys=[store_id#5284], functions=[], output=[store_id#5284])
   :  +- AQEShuffleRead coalesced
   :     +- ShuffleQueryStage 0
   :        +- Exchange hashpartitioning(store_id#5284, 5), ENSURE_REQUIREMENTS, [plan_id=726]
   :           +- *(1) HashAggregate(keys=[store_id#5284], functions=[], output=[store_id#5284])
   :              +- *(1) Project [store_id#5284]
   :                 +- *(1) Filter (isnotnull(units_sold#5283) AND (units_sold#5283 = 70))
   :                    +- *(1) ColumnarToRow
   :                       +- FileScan parquet spark_catalog.default.fact_stats[units_sold#5283,store_id#5284] Batched: true, DataFilters: [isnotnull(units_sold#5283), (units_sold#5283 = 70)], Format: Parquet, Location: InMemoryFileIndex(25 paths)[file:/home/andy/git/apache/apache-spark/sql/core/spark-warehouse/org...., PartitionFilters: [isnotnull(store_id#5284), dynamicpruningexpression(store_id#5284 IN dynamicpruning#5297)], PushedFilters: [IsNotNull(units_sold), EqualTo(units_sold,70)], ReadSchema: struct<units_sold:int>
   :                             +- SubqueryBroadcast dynamicpruning#5297, 0, [store_id#5296], [id=#706]
   :                                +- AdaptiveSparkPlan isFinalPlan=true
                                    +- == Final Plan ==
                                       BroadcastQueryStage 1
                                       +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=818]
                                          +- *(1) CometColumnarToRow
                                             +- CometHashAggregate [store_id#5296], [store_id#5296]
                                                +- AQEShuffleRead coalesced
                                                   +- ShuffleQueryStage 0
                                                      +- ReusedExchange [store_id#5296], CometExchange hashpartitioning(store_id#5296, 5), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=749]
                                    +- == Initial Plan ==
                                       BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=703]
                                       +- CometHashAggregate [store_id#5296], [store_id#5296]
                                          +- CometExchange hashpartitioning(store_id#5296, 5), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=701]
                                             +- CometHashAggregate [store_id#5296], [store_id#5296]
                                                +- CometProject [store_id#5296], [store_id#5296]
                                                   +- CometFilter [units_sold#5295, store_id#5296], (isnotnull(units_sold#5295) AND (units_sold#5295 = 70))
                                                      +- CometScan parquet spark_catalog.default.fact_stats[units_sold#5295,store_id#5296] Batched: true, DataFilters: [isnotnull(units_sold#5295), (units_sold#5295 = 70)], Format: CometParquet, Location: InMemoryFileIndex(25 paths)[file:/home/andy/git/apache/apache-spark/sql/core/spark-warehouse/org...., PartitionFilters: [isnotnull(store_id#5296)], PushedFilters: [IsNotNull(units_sold), EqualTo(units_sold,70)], ReadSchema: struct<units_sold:int>
   +- BroadcastQueryStage 3
      +- ReusedExchange [store_id#5296], BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=818]

The plan with columnar shuffle does not have a SubqueryBroadcast under the FileScan; the pruning filter has degenerated to dynamicpruningexpression(true), so partition pruning is effectively disabled:

PLAN: AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(2) CometColumnarToRow
   +- CometBroadcastHashJoin [store_id#5284], [store_id#5296], Inner, BuildRight
      :- CometHashAggregate [store_id#5284], [store_id#5284]
      :  +- AQEShuffleRead coalesced
      :     +- ShuffleQueryStage 0
      :        +- CometColumnarExchange hashpartitioning(store_id#5284, 5), ENSURE_REQUIREMENTS, CometColumnarShuffle, [plan_id=671]
      :           +- *(1) HashAggregate(keys=[store_id#5284], functions=[], output=[store_id#5284])
      :              +- *(1) Project [store_id#5284]
      :                 +- *(1) Filter (isnotnull(units_sold#5283) AND (units_sold#5283 = 70))
      :                    +- *(1) ColumnarToRow
      :                       +- FileScan parquet spark_catalog.default.fact_stats[units_sold#5283,store_id#5284] Batched: true, DataFilters: [isnotnull(units_sold#5283), (units_sold#5283 = 70)], Format: Parquet, Location: InMemoryFileIndex(25 paths)[file:/home/andy/git/apache/apache-spark/sql/core/spark-warehouse/org...., PartitionFilters: [isnotnull(store_id#5284), dynamicpruningexpression(true)], PushedFilters: [IsNotNull(units_sold), EqualTo(units_sold,70)], ReadSchema: struct<units_sold:int>
      +- BroadcastQueryStage 5
         +- ReusedExchange [store_id#5296], CometBroadcastExchange [store_id#5296]
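
The difference shows up in the scan's PartitionFilters: store_id#5284 IN dynamicpruning#5297 in the first plan versus dynamicpruningexpression(true) in the second. A check along these lines could tell the two apart programmatically. This is a minimal sketch with a hypothetical helper (hasDppSubquery), not the suite's own assertion logic, and a Comet-native scan node would need an extra case.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, Literal}
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec

// Hypothetical helper: true if some file scan still carries a live DPP filter.
def hasDppSubquery(df: DataFrame): Boolean = {
  // Unwrap the final plan chosen by AQE.
  val plan = df.queryExecution.executedPlan match {
    case a: AdaptiveSparkPlanExec => a.executedPlan
    case p => p
  }
  plan.collect { case scan: FileSourceScanExec => scan }.exists { scan =>
    scan.partitionFilters.exists {
      // dynamicpruningexpression(true): pruning was planned but then dropped.
      case DynamicPruningExpression(Literal.TrueLiteral) => false
      // Any other pruning expression still wraps the DPP subquery.
      case DynamicPruningExpression(_) => true
      case _ => false
    }
  }
}
```

Against the plans above, this would return true for the native-shuffle plan and false for the columnar-shuffle one.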

Additional context

No response
