
feat: support RangePartitioning with native shuffle #1862

Merged
mbutrovich merged 79 commits into apache:main from mbutrovich:range_partitioning on Jun 18, 2025

Conversation

@mbutrovich (Contributor) commented Jun 7, 2025

Which issue does this PR close?

Closes #458.

Rationale for this change

What changes are included in this PR?

  • New configs for hash and range partitioning with native shuffle. Range partitioning, due to its random sampling, can yield differently-ordered query results. It's unclear whether this would matter in production, but these configs make it easier to run the Spark SQL tests without falling back to Spark entirely.
  • New diffs to turn off range partitioning with native shuffle for tests that expect specific partitioning (bucket scan-related tests).
  • Range partitioning added to the shuffle_writer benchmark.
  • Removed use of DataFusion's Partitioning enum, since Comet's supported schemes don't match; there is a new CometPartitioning enum instead.
  • Moved some repeated shuffle_writer logic into individual functions.
  • New RangePartitioner class that randomly samples each input batch and determines partitioning bounds (see the sketch after this list).
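
For illustration, here is a minimal sketch of that sampling approach in Rust: reservoir-sample values from a batch, sort the sample, and pick evenly spaced sample values as partition bounds. It assumes plain i64 keys, a non-empty batch, and the rand crate; the actual RangePartitioner operates on Arrow row-encoded keys, and every name below is illustrative.

use rand::Rng;

/// Reservoir-sample up to `k` values from `batch`, sort the sample, and
/// pick `num_partitions - 1` evenly spaced values as upper bounds.
fn sample_bounds(batch: &[i64], k: usize, num_partitions: usize) -> Vec<i64> {
    let mut rng = rand::thread_rng();
    let mut reservoir: Vec<i64> = Vec::with_capacity(k);
    for (i, &v) in batch.iter().enumerate() {
        if reservoir.len() < k {
            reservoir.push(v);
        } else {
            // Replace a random slot with probability k / (i + 1), which
            // keeps every element equally likely to remain in the sample.
            let j = rng.gen_range(0..=i);
            if j < k {
                reservoir[j] = v;
            }
        }
    }
    reservoir.sort_unstable();
    (1..num_partitions)
        .map(|p| reservoir[p * reservoir.len() / num_partitions])
        .collect()
}

/// Route a key to its partition by binary search over the bounds.
fn partition_for(bounds: &[i64], key: i64) -> usize {
    bounds.partition_point(|&b| b < key)
}

Note that because this samples each batch independently, different batches can produce different bounds; this is the behavior questioned in the review discussion below.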

How are these changes tested?

  • Native tests for RangePartitioner
  • New golden plans that have CometExchange where there used to be CometColumnarExchange
  • Updated fuzz and native shuffle tests

@codecov-commenter commented Jun 7, 2025

Codecov Report

❌ Patch coverage is 84.09091% with 7 lines in your changes missing coverage. Please review.
✅ Project coverage is 58.94%. Comparing base (f09f8af) to head (d527dfe).
⚠️ Report is 1161 commits behind head on main.

Files with missing lines                              | Patch % | Lines
...t/execution/shuffle/CometNativeShuffleWriter.scala | 76.00%  | 4 Missing and 2 partials ⚠️
.../scala/org/apache/comet/serde/QueryPlanSerde.scala | 90.00%  | 0 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main    #1862      +/-   ##
============================================
+ Coverage     56.12%   58.94%   +2.81%     
- Complexity      976     1144     +168     
============================================
  Files           119      130      +11     
  Lines         11743    12823    +1080     
  Branches       2251     2412     +161     
============================================
+ Hits           6591     7558     +967     
- Misses         4012     4044      +32     
- Partials       1140     1221      +81     

☔ View full report in Codecov by Sentry.

@andygrove (Member)

I ran TPC-H benchmarks and saw shuffles with range partitioning run natively. I did not see any difference in performance compared to the last set of benchmarks I ran some time ago, but I have not compared to the main branch yet.

@mbutrovich (Contributor, Author) commented Jun 7, 2025

> I ran TPC-H benchmarks and saw shuffles with range partitioning run natively. I did not see any difference in performance compared to the last set of benchmarks I ran some time ago, but I have not compared to the main branch yet.

Thanks, Andy. I'm doing some pretty inefficient things to get around ownership issues with Rows, Vec<Row>, Vec<OwnedRow>, etc., so performance should improve once those are optimized. The shuffle_writer microbenchmark currently shows range partitioning taking almost twice as long as hash partitioning on my laptop, but I aim to fix that.
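
For context, here is a minimal sketch of the borrowing constraint behind those copies, using arrow-rs's row format (arrow::row); the scenario is illustrative, only the arrow API names are real.

use std::sync::Arc;
use arrow::array::{ArrayRef, Int64Array};
use arrow::datatypes::DataType;
use arrow::row::{OwnedRow, RowConverter, SortField};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let converter = RowConverter::new(vec![SortField::new(DataType::Int64)])?;
    let col: ArrayRef = Arc::new(Int64Array::from(vec![3, 1, 2]));
    let rows = converter.convert_columns(&[col])?;

    // `rows.row(i)` returns a `Row<'_>` that borrows from `rows`, so
    // retaining sampled keys beyond the current batch means cloning each
    // one into an `OwnedRow` - the copying overhead described above.
    let sampled: Vec<OwnedRow> =
        (0..rows.num_rows()).map(|i| rows.row(i).owned()).collect();
    assert_eq!(sampled.len(), 3);
    Ok(())
}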

mbutrovich marked this pull request as ready for review June 12, 2025 16:46
@mbutrovich (Contributor, Author)

Looking at the 3 Spark SQL test failures (all related to bucket scan) now that there are fewer 3.5.x diffs to update.

@andygrove (Member) left a comment


I didn't try to understand the reservoir sampling logic on this pass, but the PR looks great and I have been testing locally with no issues. Thanks @mbutrovich!

.booleanConf
.createWithDefault(false)

val COMET_EXEC_SHUFFLE_WITH_HASH_PARTITIONING_ENABLED: ConfigEntry[Boolean] =
Contributor

Can a user have both configs enabled? What happens?

Contributor Author

The default is both enabled. They individually control whether hash or range partitioning falls back, respectively.

Contributor

That's what I thought. Is there a way to add a unit test with both enabled?

Contributor Author

That's basically every unit test already (including the updated native shuffle suite and fuzz test).

mbutrovich merged commit 3aa3dc7 into apache:main on Jun 18, 2025
73 checks passed
mbutrovich deleted the range_partitioning branch June 18, 2025 16:18
@Kontinuation (Member)

This implementation of RangePartitioning may be incorrect. RangePartitioning should partition the input DataFrame into partitions with consecutive and non-overlapping ranges; this requires scanning the entire DataFrame to obtain each partition's range bounds before performing the actual shuffle writing.

Here is PySpark code illustrating the difference in behavior between Comet and vanilla Spark.

spark.range(0, 100000).write.format("parquet").mode("overwrite").save("range-partitioning")

df = spark.read.parquet("range-partitioning")
df_range_partitioned = df.repartitionByRange(10, "id")

df_range_partitioned.explain()

# Show the min and max of each range
def get_partition_bounds(idx, iterator):
    min_id = None
    max_id = None
    for row in iterator:
        if min_id is None or row.id < min_id:
            min_id = row.id
        if max_id is None or row.id > max_id:
            max_id = row.id
    yield idx, min_id, max_id

partition_bounds = df_range_partitioned.rdd.mapPartitionsWithIndex(get_partition_bounds).collect()

# Print the results
for partition_id, min_id, max_id in sorted(partition_bounds):
    print(f"Partition {partition_id}: min_id={min_id}, max_id={max_id}")

Comet:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- CometExchange rangepartitioning(id#17L ASC NULLS FIRST, 10), REPARTITION_BY_NUM, CometNativeShuffle, [plan_id=173]
   +- CometScan parquet [id#17L] Batched: true, DataFilters: [], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/path/to/range-p..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>

Partition 0: min_id=0, max_id=90799
Partition 1: min_id=753, max_id=91680
Partition 2: min_id=1527, max_id=92520
Partition 3: min_id=2399, max_id=93284
Partition 4: min_id=3274, max_id=94123
Partition 5: min_id=4053, max_id=94844
Partition 6: min_id=4851, max_id=95671
Partition 7: min_id=5738, max_id=96522
Partition 8: min_id=6571, max_id=97335
Partition 9: min_id=7408, max_id=99999

Spark:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Exchange rangepartitioning(id#20L ASC NULLS FIRST, 10), REPARTITION_BY_NUM, [plan_id=197]
   +- FileScan parquet [id#20L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/path/to/range-p..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>

Partition 0: min_id=0, max_id=9974
Partition 1: min_id=9975, max_id=19981
Partition 2: min_id=19982, max_id=29993
Partition 3: min_id=29994, max_id=39997
Partition 4: min_id=39998, max_id=49959
Partition 5: min_id=49960, max_id=59995
Partition 6: min_id=59996, max_id=69898
Partition 7: min_id=69899, max_id=79970
Partition 8: min_id=79971, max_id=89976
Partition 9: min_id=89977, max_id=99999

@mbutrovich (Contributor, Author) commented Jun 18, 2025

@Kontinuation thank you for bringing this up! Let me investigate. In the meantime I will open an issue, change the default to false, and put a warning on the config.
