Merged

79 commits
93d77ee
determine_bounds for f64.
mbutrovich Jun 5, 2025
29fa8cc
Generic determine_bounds.
mbutrovich Jun 5, 2025
0871dab
reservoir_sample.
mbutrovich Jun 5, 2025
ff18a39
Change reservoir_sample to return a RecordBatch since that makes arbi…
mbutrovich Jun 6, 2025
784b416
Checkpoint with plan serialization.
mbutrovich Jun 6, 2025
4871058
Add CometPartitioning.
mbutrovich Jun 6, 2025
d7cbb54
reservoir_sample_indices.
mbutrovich Jun 6, 2025
a2b4d29
checkpoint on sampling batch, converting it to rows, sorting, and the…
mbutrovich Jun 6, 2025
fc3dea6
Checkpoint on bounds for rows.
mbutrovich Jun 6, 2025
a50aad9
Cleanup.
mbutrovich Jun 6, 2025
cbdaf82
Cleanup.
mbutrovich Jun 7, 2025
c17a028
Stash bounds and converter.
mbutrovich Jun 7, 2025
090d7b0
Add partition_indices.
mbutrovich Jun 7, 2025
f71ff39
Use scratch space.
mbutrovich Jun 7, 2025
9fa063a
It works with a lot of copy-pasted code.
mbutrovich Jun 7, 2025
402f446
Cleanup.
mbutrovich Jun 7, 2025
c9b49f9
More testings.
mbutrovich Jun 7, 2025
509ef66
clean up warnings.
mbutrovich Jun 7, 2025
13b542d
clean up warnings.
mbutrovich Jun 7, 2025
e3a28e3
Update test.
mbutrovich Jun 7, 2025
ab03b5c
More testing.
mbutrovich Jun 7, 2025
1a887b5
More testing.
mbutrovich Jun 7, 2025
3fe5ad0
More testing.
mbutrovich Jun 7, 2025
cb5931d
Update shuffle_writer benchmark.
mbutrovich Jun 7, 2025
45cafd2
Remove assertion in tight loop.
mbutrovich Jun 7, 2025
842ab42
Remove OwnedRows transformations to improve performance.
mbutrovich Jun 7, 2025
037daae
Fix logic.
mbutrovich Jun 7, 2025
caa7cc2
reservoir_sample_fuzz test, remove HashSet validation from reservoir_…
mbutrovich Jun 8, 2025
7c58754
Fix missing include.
mbutrovich Jun 8, 2025
eb6fb03
determine_bounds_fuzz test.
mbutrovich Jun 8, 2025
3eca544
More tests.
mbutrovich Jun 8, 2025
f270ab0
Checkpoint pulling partition indices for batch over to RangePartition…
mbutrovich Jun 8, 2025
ed1f894
Checkpoint before adding more tests and trying to refactor shuffle_wr…
mbutrovich Jun 8, 2025
0c6910b
shuffle writer uses external partition mapper.
mbutrovich Jun 8, 2025
7a0bff9
Move bounds generation to RangePartitioner.
mbutrovich Jun 9, 2025
d2f3c69
Break out some repeated code into functions within MultiPartitionShuf…
mbutrovich Jun 9, 2025
cca57b3
Merge branch 'main' into range_partitioning
mbutrovich Jun 9, 2025
a69b564
Update CometFuzzTestSuite.
mbutrovich Jun 9, 2025
6526887
Update golden plans.
mbutrovich Jun 9, 2025
19466c1
Update CometFuzzTestSuite.
mbutrovich Jun 9, 2025
4a451cf
Reduce warnings.
mbutrovich Jun 9, 2025
b39ccb0
clippy
mbutrovich Jun 9, 2025
6f7f0eb
More clippy.
mbutrovich Jun 9, 2025
0d24b56
Merge branch 'main' into range_partitioning
mbutrovich Jun 9, 2025
49ad618
More clippy that my local runs do not show.
mbutrovich Jun 9, 2025
de956ee
Get RangePartitioning sample size from SQLConf.
mbutrovich Jun 10, 2025
dc250fd
Fast path generate_bounds for sample_size >= num_rows.
mbutrovich Jun 10, 2025
c8df6cb
Seed reservoir sampling from partition number like Spark.
mbutrovich Jun 10, 2025
295f532
Docs and refactoring to shuffle_writer. More to do yet.
mbutrovich Jun 10, 2025
b5e4a6e
Update docs and fix sampled columns in generate_bounds.
mbutrovich Jun 10, 2025
ad79a0f
Reduce test time.
mbutrovich Jun 10, 2025
50ab569
More docs and more testing.
mbutrovich Jun 10, 2025
18a62cf
Add warning for large sampleSize.
mbutrovich Jun 10, 2025
1383699
More test docs.
mbutrovich Jun 10, 2025
6e850d4
More tests.
mbutrovich Jun 10, 2025
4663d29
Update test.
mbutrovich Jun 11, 2025
640c6ef
Address feedback.
mbutrovich Jun 11, 2025
a19ac49
Added configs, extended CometNativeShuffleSuite.
mbutrovich Jun 11, 2025
47cb983
Update tests.
mbutrovich Jun 11, 2025
727ef16
Update docs, update expression checking for range partitioning.
mbutrovich Jun 11, 2025
88e80a5
avoid converting to vector of Row for every batch.
mbutrovich Jun 11, 2025
8228429
Simplify CometPartitioning enum.
mbutrovich Jun 11, 2025
b0d9a76
Merge branch 'main' into range_partitioning
mbutrovich Jun 11, 2025
9dc2863
Merge branch 'main' into range_partitioning
mbutrovich Jun 12, 2025
545b42e
Merge branch 'main' into range_partitioning
mbutrovich Jun 12, 2025
02c5ec7
Merge branch 'main' into range_partitioning
mbutrovich Jun 13, 2025
bdb67af
Clippy.
mbutrovich Jun 13, 2025
1852db2
Merge branch 'main' into range_partitioning
mbutrovich Jun 13, 2025
4746937
Better message on fallback.
mbutrovich Jun 13, 2025
385bad7
3.5.6 diff.
mbutrovich Jun 13, 2025
20895e2
3.4.3 diff.
mbutrovich Jun 13, 2025
9074335
4.0.0-preview1 diff.
mbutrovich Jun 13, 2025
0ed15ac
Add comments to diffs.
mbutrovich Jun 14, 2025
b8d1aca
Merge tests.
mbutrovich Jun 14, 2025
cc22548
Change math in determine_bounds_for_rows to see if that makes Miri ha…
mbutrovich Jun 17, 2025
399c87c
Miri-specific loop bounds.
mbutrovich Jun 17, 2025
5fc29cc
Add comments about Miri.
mbutrovich Jun 17, 2025
46a0842
Reduce Miri loop iterations further.
mbutrovich Jun 17, 2025
d527dfe
Revert math changes since the Spark SQL test 22160 doesn't like it, a…
mbutrovich Jun 17, 2025
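The commit trail above builds the native sampling path: reservoir-sample each input partition, sort the samples, and derive range bounds from them. The following is a minimal sketch of the reservoir step (Algorithm R) in Rust, using a hand-rolled LCG so it stays dependency-free; it is illustrative only — Comet's actual `reservoir_sample` operates on Arrow `RecordBatch` rows and, as one commit notes, seeds the RNG from the partition number the way Spark does.

```rust
/// Tiny deterministic LCG so this sketch needs no external crates.
/// (Hypothetical helper; not Comet's actual RNG.)
struct Lcg(u64);

impl Lcg {
    fn next(&mut self) -> u64 {
        // Constants from Knuth's MMIX LCG.
        self.0 = self
            .0
            .wrapping_mul(6364136223846793005)
            .wrapping_add(1442695040888963407);
        self.0
    }

    /// Roughly uniform value in [0, bound).
    fn next_below(&mut self, bound: u64) -> u64 {
        self.next() % bound
    }
}

/// Keep a uniform random sample of up to `k` items from `items`,
/// seeded per partition so reruns of the same partition are reproducible.
fn reservoir_sample<T: Copy>(items: &[T], k: usize, partition: u64) -> Vec<T> {
    let mut rng = Lcg(0x9E3779B97F4A7C15 ^ partition);
    // Fill the reservoir with the first k items.
    let mut reservoir: Vec<T> = items.iter().take(k).copied().collect();
    // Each later item i replaces a reservoir slot with probability k / (i + 1).
    for (i, &item) in items.iter().enumerate().skip(k) {
        let j = rng.next_below((i + 1) as u64) as usize;
        if j < k {
            reservoir[j] = item;
        }
    }
    reservoir
}

fn main() {
    let data: Vec<u64> = (0..1000).collect();
    let sample = reservoir_sample(&data, 10, 3);
    assert_eq!(sample.len(), 10);
    // Same partition seed => same sample, mirroring Spark-style reproducibility.
    assert_eq!(sample, reservoir_sample(&data, 10, 3));
    println!("sample = {:?}", sample);
}
```

With fewer than `k` input rows the reservoir simply holds everything, which matches the "fast path for sample_size >= num_rows" commit above in spirit.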
16 changes: 16 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -307,6 +307,18 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(false)

val COMET_EXEC_SHUFFLE_WITH_HASH_PARTITIONING_ENABLED: ConfigEntry[Boolean] =
Contributor: Can a user have both configs enabled? What happens?

Author (mbutrovich): The default is both enabled. They individually control whether hash or range partitioning falls back, respectively.

Contributor: That's what I thought. Is there a way to add a unit test with both enabled?

Author (mbutrovich): That's basically every unit test already (including the updated native shuffle suite and fuzz test).

conf("spark.comet.native.shuffle.partitioning.hash.enabled")
.doc("Whether to enable hash partitioning for Comet native shuffle.")
.booleanConf
.createWithDefault(true)

val COMET_EXEC_SHUFFLE_WITH_RANGE_PARTITIONING_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.native.shuffle.partitioning.range.enabled")
.doc("Whether to enable range partitioning for Comet native shuffle.")
.booleanConf
.createWithDefault(true)
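To make the two flags concrete: they are independent, both default to `true`, and each one only controls whether that partitioning scheme runs natively or falls back to Spark's JVM shuffle. A hypothetical `spark-defaults.conf` fragment (values chosen for illustration):

```properties
# Keep native hash-partitioned shuffle, but force range partitioning
# to fall back to Spark's JVM implementation.
spark.comet.native.shuffle.partitioning.hash.enabled=true
spark.comet.native.shuffle.partitioning.range.enabled=false
```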

val COMET_EXEC_SHUFFLE_COMPRESSION_CODEC: ConfigEntry[String] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compression.codec")
.doc(
@@ -770,11 +782,13 @@ private[comet] abstract class ConfigEntry[T](

/**
* Retrieves the config value from the current thread-local [[SQLConf]]
*
* @return
*/
def get(): T = get(SQLConf.get)

def defaultValue: Option[T] = None

def defaultValueString: String

override def toString: String = {
@@ -793,6 +807,7 @@ private[comet] class ConfigEntryWithDefault[T](
version: String)
extends ConfigEntry(key, valueConverter, stringConverter, doc, isPublic, version) {
override def defaultValue: Option[T] = Some(_defaultValue)

override def defaultValueString: String = stringConverter(_defaultValue)

def get(conf: SQLConf): T = {
@@ -828,6 +843,7 @@ private[comet] class OptionalConfigEntry[T](
}

private[comet] case class ConfigBuilder(key: String) {

import ConfigHelpers._

var _public = true
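Downstream of these configs, the native range shuffle turns the sorted samples into partition bounds and then maps each row to a partition by binary search. A simplified Rust sketch of both steps (hypothetical names; Spark's real `RangePartitioner.determineBounds` additionally weights each sample by how much of its input partition was sampled):

```rust
/// Choose `partitions - 1` upper bounds from collected samples by taking
/// evenly spaced quantiles of the sorted sample, skipping duplicates.
fn determine_bounds<T: Ord + Copy>(mut samples: Vec<T>, partitions: usize) -> Vec<T> {
    let mut bounds = Vec::with_capacity(partitions.saturating_sub(1));
    if samples.is_empty() {
        return bounds; // no samples => everything lands in partition 0
    }
    samples.sort();
    for p in 1..partitions {
        let idx = p * samples.len() / partitions;
        let candidate = samples[idx];
        // Duplicate bounds would create empty partitions; skip them.
        if bounds.last() != Some(&candidate) {
            bounds.push(candidate);
        }
    }
    bounds
}

/// Map a key to a partition index by binary-searching the bounds:
/// keys <= bounds[i] (and > bounds[i-1]) go to partition i.
fn partition_index<T: Ord>(bounds: &[T], key: &T) -> usize {
    bounds.partition_point(|b| b < key)
}

fn main() {
    // Unsorted input on purpose; determine_bounds sorts internally.
    let samples: Vec<i64> = (0..100).rev().collect();
    let bounds = determine_bounds(samples, 4);
    assert_eq!(bounds, vec![25, 50, 75]);
    assert_eq!(partition_index(&bounds, &10), 0);
    assert_eq!(partition_index(&bounds, &26), 1);
    assert_eq!(partition_index(&bounds, &99), 3);
    println!("bounds = {:?}", bounds);
}
```

Deduplicating candidate bounds is why heavily skewed keys can yield fewer than `partitions` output partitions, the same behavior Spark's JVM range partitioner exhibits.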
82 changes: 75 additions & 7 deletions dev/diffs/3.4.3.diff
@@ -2282,10 +2282,17 @@ index d083cac48ff..3c11bcde807 100644
import testImplicits._

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index 266bb343526..c3e3d155813 100644
index 266bb343526..6675cf7b636 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -24,10 +24,11 @@ import org.apache.spark.sql.catalyst.catalog.BucketSpec
@@ -19,15 +19,18 @@ package org.apache.spark.sql.sources

import scala.util.Random

+import org.apache.comet.CometConf
+
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
@@ -2299,7 +2306,7 @@ index 266bb343526..c3e3d155813 100644
import org.apache.spark.sql.execution.joins.SortMergeJoinExec
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
@@ -101,12 +102,20 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
@@ -101,12 +104,20 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
}
}

@@ -2322,7 +2329,7 @@ index 266bb343526..c3e3d155813 100644
// To verify if the bucket pruning works, this function checks two conditions:
// 1) Check if the pruned buckets (before filtering) are empty.
// 2) Verify the final result is the same as the expected one
@@ -155,7 +164,8 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
@@ -155,7 +166,8 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
val planWithoutBucketedScan = bucketedDataFrame.filter(filterCondition)
.queryExecution.executedPlan
val fileScan = getFileScan(planWithoutBucketedScan)
@@ -2332,7 +2339,7 @@ index 266bb343526..c3e3d155813 100644

val bucketColumnType = bucketedDataFrame.schema.apply(bucketColumnIndex).dataType
val rowsWithInvalidBuckets = fileScan.execute().filter(row => {
@@ -451,28 +461,49 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
@@ -451,28 +463,49 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
val joinOperator = if (joined.sqlContext.conf.adaptiveExecutionEnabled) {
val executedPlan =
joined.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
@@ -2390,7 +2397,7 @@ index 266bb343526..c3e3d155813 100644
s"expected sort in the right child to be $sortRight but found\n${joinOperator.right}")

// check the output partitioning
@@ -835,11 +866,11 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
@@ -835,11 +868,11 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table")

val scanDF = spark.table("bucketed_table").select("j")
@@ -2404,7 +2411,40 @@ index 266bb343526..c3e3d155813 100644
checkAnswer(aggDF, df1.groupBy("j").agg(max("k")))
}
}
@@ -1026,15 +1057,23 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
@@ -894,7 +927,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
}

test("SPARK-29655 Read bucketed tables obeys spark.sql.shuffle.partitions") {
+ // Range partitioning uses random samples, so per-partition comparisons do not always yield
+ // the same results. Disable Comet native range partitioning.
withSQLConf(
+ CometConf.COMET_EXEC_SHUFFLE_WITH_RANGE_PARTITIONING_ENABLED.key -> "false",
SQLConf.SHUFFLE_PARTITIONS.key -> "5",
SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "7") {
val bucketSpec = Some(BucketSpec(6, Seq("i", "j"), Nil))
@@ -913,7 +949,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
}

test("SPARK-32767 Bucket join should work if SHUFFLE_PARTITIONS larger than bucket number") {
+ // Range partitioning uses random samples, so per-partition comparisons do not always yield
+ // the same results. Disable Comet native range partitioning.
withSQLConf(
+ CometConf.COMET_EXEC_SHUFFLE_WITH_RANGE_PARTITIONING_ENABLED.key -> "false",
SQLConf.SHUFFLE_PARTITIONS.key -> "9",
SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "10") {

@@ -943,7 +982,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
}

test("bucket coalescing eliminates shuffle") {
+ // Range partitioning uses random samples, so per-partition comparisons do not always yield
+ // the same results. Disable Comet native range partitioning.
withSQLConf(
+ CometConf.COMET_EXEC_SHUFFLE_WITH_RANGE_PARTITIONING_ENABLED.key -> "false",
SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true",
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
// The side with bucketedTableTestSpec1 will be coalesced to have 4 output partitions.
@@ -1026,15 +1068,23 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
expectedNumShuffles: Int,
expectedCoalescedNumBuckets: Option[Int]): Unit = {
val plan = sql(query).queryExecution.executedPlan
@@ -2816,6 +2856,34 @@ index 52abd248f3a..7a199931a08 100644
case h: HiveTableScanExec => h.partitionPruningPred.collect {
case d: DynamicPruningExpression => d.child
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index a902cb3a69e..800a3acbe99 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -24,6 +24,7 @@ import java.sql.{Date, Timestamp}
import java.util.{Locale, Set}

import com.google.common.io.Files
+import org.apache.comet.CometConf
import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.{SparkException, TestUtils}
@@ -838,8 +839,13 @@ abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHi
}

test("SPARK-2554 SumDistinct partial aggregation") {
- checkAnswer(sql("SELECT sum( distinct key) FROM src group by key order by key"),
- sql("SELECT distinct key FROM src order by key").collect().toSeq)
+ // Range partitioning uses random samples, so per-partition comparisons do not always yield
+ // the same results. Disable Comet native range partitioning.
+ withSQLConf(CometConf.COMET_EXEC_SHUFFLE_WITH_RANGE_PARTITIONING_ENABLED.key -> "false")
+ {
+ checkAnswer(sql("SELECT sum( distinct key) FROM src group by key order by key"),
+ sql("SELECT distinct key FROM src order by key").collect().toSeq)
+ }
}

test("SPARK-4963 DataFrame sample on mutable row return wrong result") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 07361cfdce9..b4d53dbe900 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
83 changes: 76 additions & 7 deletions dev/diffs/3.5.6.diff
@@ -2278,10 +2278,18 @@ index d083cac48ff..3c11bcde807 100644
import testImplicits._

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index 746f289c393..0c99d028163 100644
index 746f289c393..a90106a1463 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -25,10 +25,11 @@ import org.apache.spark.sql.catalyst.expressions
@@ -19,16 +19,19 @@ package org.apache.spark.sql.sources

import scala.util.Random

+import org.apache.comet.CometConf
+
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.catalyst.types.DataTypeUtils
@@ -2295,7 +2303,7 @@ index 746f289c393..0c99d028163 100644
import org.apache.spark.sql.execution.joins.SortMergeJoinExec
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
@@ -102,12 +103,20 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
@@ -102,12 +105,20 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
}
}

@@ -2318,7 +2326,7 @@ index 746f289c393..0c99d028163 100644
// To verify if the bucket pruning works, this function checks two conditions:
// 1) Check if the pruned buckets (before filtering) are empty.
// 2) Verify the final result is the same as the expected one
@@ -156,7 +165,8 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
@@ -156,7 +167,8 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
val planWithoutBucketedScan = bucketedDataFrame.filter(filterCondition)
.queryExecution.executedPlan
val fileScan = getFileScan(planWithoutBucketedScan)
@@ -2328,7 +2336,7 @@ index 746f289c393..0c99d028163 100644

val bucketColumnType = bucketedDataFrame.schema.apply(bucketColumnIndex).dataType
val rowsWithInvalidBuckets = fileScan.execute().filter(row => {
@@ -452,28 +462,49 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
@@ -452,28 +464,49 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
val joinOperator = if (joined.sqlContext.conf.adaptiveExecutionEnabled) {
val executedPlan =
joined.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
@@ -2386,7 +2394,7 @@ index 746f289c393..0c99d028163 100644
s"expected sort in the right child to be $sortRight but found\n${joinOperator.right}")

// check the output partitioning
@@ -836,11 +867,11 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
@@ -836,11 +869,11 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table")

val scanDF = spark.table("bucketed_table").select("j")
@@ -2400,7 +2408,40 @@ index 746f289c393..0c99d028163 100644
checkAnswer(aggDF, df1.groupBy("j").agg(max("k")))
}
}
@@ -1029,15 +1060,21 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
@@ -895,7 +928,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
}

test("SPARK-29655 Read bucketed tables obeys spark.sql.shuffle.partitions") {
+ // Range partitioning uses random samples, so per-partition comparisons do not always yield
+ // the same results. Disable Comet native range partitioning.
withSQLConf(
+ CometConf.COMET_EXEC_SHUFFLE_WITH_RANGE_PARTITIONING_ENABLED.key -> "false",
SQLConf.SHUFFLE_PARTITIONS.key -> "5",
SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "7") {
val bucketSpec = Some(BucketSpec(6, Seq("i", "j"), Nil))
@@ -914,7 +950,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
}

test("SPARK-32767 Bucket join should work if SHUFFLE_PARTITIONS larger than bucket number") {
+ // Range partitioning uses random samples, so per-partition comparisons do not always yield
+ // the same results. Disable Comet native range partitioning.
withSQLConf(
+ CometConf.COMET_EXEC_SHUFFLE_WITH_RANGE_PARTITIONING_ENABLED.key -> "false",
SQLConf.SHUFFLE_PARTITIONS.key -> "9",
SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "10") {

@@ -944,7 +983,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
}

test("bucket coalescing eliminates shuffle") {
+ // Range partitioning uses random samples, so per-partition comparisons do not always yield
+ // the same results. Disable Comet native range partitioning.
withSQLConf(
+ CometConf.COMET_EXEC_SHUFFLE_WITH_RANGE_PARTITIONING_ENABLED.key -> "false",
SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true",
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
// The side with bucketedTableTestSpec1 will be coalesced to have 4 output partitions.
@@ -1029,15 +1071,21 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
Seq(true, false).foreach { aqeEnabled =>
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled.toString) {
val plan = sql(query).queryExecution.executedPlan
@@ -2830,6 +2871,34 @@ index 549431ef4f4..e48f1730da6 100644
withTempDir { dir =>
withSQLConf(
"parquet.crypto.factory.class" ->
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 6160c3e5f6c..0956d7d9edc 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -24,6 +24,7 @@ import java.sql.{Date, Timestamp}
import java.util.{Locale, Set}

import com.google.common.io.Files
+import org.apache.comet.CometConf
import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.{SparkException, TestUtils}
@@ -838,8 +839,13 @@ abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHi
}

test("SPARK-2554 SumDistinct partial aggregation") {
- checkAnswer(sql("SELECT sum( distinct key) FROM src group by key order by key"),
- sql("SELECT distinct key FROM src order by key").collect().toSeq)
+ // Range partitioning uses random samples, so per-partition comparisons do not always yield
+ // the same results. Disable Comet native range partitioning.
+ withSQLConf(CometConf.COMET_EXEC_SHUFFLE_WITH_RANGE_PARTITIONING_ENABLED.key -> "false")
+ {
+ checkAnswer(sql("SELECT sum( distinct key) FROM src group by key order by key"),
+ sql("SELECT distinct key FROM src order by key").collect().toSeq)
+ }
}

test("SPARK-4963 DataFrame sample on mutable row return wrong result") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 1d646f40b3e..7f2cdb8f061 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala