dev/diffs/3.4.3.diff: 32 changes (19 additions & 13 deletions)
@@ -1150,7 +1150,7 @@ index 02990a7a40d..bddf5e1ccc2 100644
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
-index cfc8b2cc845..c6fcfd7bd08 100644
+index cfc8b2cc845..c4be7eb3731 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
@@ -21,6 +21,7 @@ import scala.collection.mutable.ArrayBuffer
@@ -2385,7 +2385,7 @@ index d083cac48ff..3c11bcde807 100644
import testImplicits._

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
-index 266bb343526..e58a2f49eb9 100644
+index 266bb343526..f8ad838e2b2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -19,15 +19,18 @@ package org.apache.spark.sql.sources
@@ -2409,7 +2409,7 @@ index 266bb343526..e58a2f49eb9 100644
import org.apache.spark.sql.execution.joins.SortMergeJoinExec
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
-@@ -101,12 +104,20 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+@@ -101,12 +104,22 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
}
}

@@ -2419,6 +2419,7 @@ index 266bb343526..e58a2f49eb9 100644
+ val fileScan = collect(plan) {
+ case f: FileSourceScanExec => f
+ case f: CometScanExec => f
++ case f: CometNativeScanExec => f
+ }
assert(fileScan.nonEmpty, plan)
fileScan.head
@@ -2427,12 +2428,13 @@ index 266bb343526..e58a2f49eb9 100644
+ private def getBucketScan(plan: SparkPlan): Boolean = getFileScan(plan) match {
+ case fs: FileSourceScanExec => fs.bucketedScan
+ case bs: CometScanExec => bs.bucketedScan
++ case ns: CometNativeScanExec => ns.bucketedScan
+ }
+
// To verify if the bucket pruning works, this function checks two conditions:
// 1) Check if the pruned buckets (before filtering) are empty.
// 2) Verify the final result is the same as the expected one
-@@ -155,7 +166,8 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+@@ -155,7 +168,8 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
val planWithoutBucketedScan = bucketedDataFrame.filter(filterCondition)
.queryExecution.executedPlan
val fileScan = getFileScan(planWithoutBucketedScan)
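Aside: the getFileScan/getBucketScan changes above all reduce to one collect-and-match pattern over the physical plan, extended to cover Comet's two scan operators alongside FileSourceScanExec. Below is a minimal standalone sketch of that pattern using stand-in case classes (hypothetical names, not Spark's or Comet's real classes); the only fact taken from the patch is that each scan type exposes a bucketedScan flag.

```scala
// Stand-in plan nodes; the real suite matches FileSourceScanExec,
// CometScanExec, and CometNativeScanExec instead.
object ScanCollectSketch {
  sealed trait PlanNode { def children: Seq[PlanNode] }
  case class Exchange(children: Seq[PlanNode]) extends PlanNode
  case class FileSourceScan(bucketedScan: Boolean) extends PlanNode { val children: Seq[PlanNode] = Nil }
  case class CometScan(bucketedScan: Boolean) extends PlanNode { val children: Seq[PlanNode] = Nil }
  case class CometNativeScan(bucketedScan: Boolean) extends PlanNode { val children: Seq[PlanNode] = Nil }

  // Recursively collect every node the partial function matches, in the
  // spirit of the suite's collect(plan) { ... } helper.
  def collect[T](plan: PlanNode)(pf: PartialFunction[PlanNode, T]): Seq[T] =
    pf.lift(plan).toSeq ++ plan.children.flatMap(collect(_)(pf))

  // Mirrors getFileScan: one partial function covers all three scan types.
  def fileScans(plan: PlanNode): Seq[PlanNode] = collect(plan) {
    case f: FileSourceScan  => f
    case c: CometScan       => c
    case n: CometNativeScan => n
  }

  // Mirrors getBucketScan: read the bucketedScan flag back per scan type.
  def isBucketedScan(scan: PlanNode): Boolean = scan match {
    case f: FileSourceScan  => f.bucketedScan
    case c: CometScan       => c.bucketedScan
    case n: CometNativeScan => n.bucketedScan
    case _                  => false // non-scan nodes are never bucketed
  }
}
```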
@@ -2442,7 +2444,7 @@ index 266bb343526..e58a2f49eb9 100644

val bucketColumnType = bucketedDataFrame.schema.apply(bucketColumnIndex).dataType
val rowsWithInvalidBuckets = fileScan.execute().filter(row => {
-@@ -451,28 +463,54 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+@@ -451,28 +465,54 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
val joinOperator = if (joined.sqlContext.conf.adaptiveExecutionEnabled) {
val executedPlan =
joined.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
@@ -2505,7 +2507,7 @@ index 266bb343526..e58a2f49eb9 100644
s"expected sort in the right child to be $sortRight but found\n${joinOperator.right}")

// check the output partitioning
-@@ -835,11 +873,11 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+@@ -835,11 +875,11 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table")

val scanDF = spark.table("bucketed_table").select("j")
@@ -2519,7 +2521,7 @@ index 266bb343526..e58a2f49eb9 100644
checkAnswer(aggDF, df1.groupBy("j").agg(max("k")))
}
}
-@@ -894,7 +932,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+@@ -894,7 +934,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
}

test("SPARK-29655 Read bucketed tables obeys spark.sql.shuffle.partitions") {
@@ -2530,7 +2532,7 @@ index 266bb343526..e58a2f49eb9 100644
SQLConf.SHUFFLE_PARTITIONS.key -> "5",
SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "7") {
val bucketSpec = Some(BucketSpec(6, Seq("i", "j"), Nil))
-@@ -913,7 +954,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+@@ -913,7 +956,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
}

test("SPARK-32767 Bucket join should work if SHUFFLE_PARTITIONS larger than bucket number") {
@@ -2541,7 +2543,7 @@ index 266bb343526..e58a2f49eb9 100644
SQLConf.SHUFFLE_PARTITIONS.key -> "9",
SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "10") {

-@@ -943,7 +987,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+@@ -943,7 +989,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
}

test("bucket coalescing eliminates shuffle") {
@@ -2552,7 +2554,7 @@ index 266bb343526..e58a2f49eb9 100644
SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true",
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
// The side with bucketedTableTestSpec1 will be coalesced to have 4 output partitions.
-@@ -1026,15 +1073,23 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+@@ -1026,15 +1075,26 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
expectedNumShuffles: Int,
expectedCoalescedNumBuckets: Option[Int]): Unit = {
val plan = sql(query).queryExecution.executedPlan
@@ -2565,6 +2567,7 @@ index 266bb343526..e58a2f49eb9 100644
val scans = plan.collect {
case f: FileSourceScanExec if f.optionalNumCoalescedBuckets.isDefined => f
+ case b: CometScanExec if b.optionalNumCoalescedBuckets.isDefined => b
++ case b: CometNativeScanExec if b.optionalNumCoalescedBuckets.isDefined => b
}
if (expectedCoalescedNumBuckets.isDefined) {
assert(scans.length == 1)
@@ -2574,6 +2577,8 @@ index 266bb343526..e58a2f49eb9 100644
+ assert(f.optionalNumCoalescedBuckets == expectedCoalescedNumBuckets)
+ case b: CometScanExec =>
+ assert(b.optionalNumCoalescedBuckets == expectedCoalescedNumBuckets)
++ case b: CometNativeScanExec =>
++ assert(b.optionalNumCoalescedBuckets == expectedCoalescedNumBuckets)
+ }
} else {
assert(scans.isEmpty)
@@ -2604,25 +2609,26 @@ index b5f6d2f9f68..277784a92af 100644

protected override lazy val sql = spark.sql _
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
-index 1f55742cd67..42377f7cf26 100644
+index 1f55742cd67..f20129d9dd8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.sources
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
-+import org.apache.spark.sql.comet.CometScanExec
++import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec}
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
-@@ -71,7 +72,10 @@ abstract class DisableUnnecessaryBucketedScanSuite
+@@ -71,7 +72,11 @@ abstract class DisableUnnecessaryBucketedScanSuite

def checkNumBucketedScan(query: String, expectedNumBucketedScan: Int): Unit = {
val plan = sql(query).queryExecution.executedPlan
- val bucketedScan = collect(plan) { case s: FileSourceScanExec if s.bucketedScan => s }
+ val bucketedScan = collect(plan) {
+ case s: FileSourceScanExec if s.bucketedScan => s
+ case s: CometScanExec if s.bucketedScan => s
++ case s: CometNativeScanExec if s.bucketedScan => s
+ }
assert(bucketedScan.length == expectedNumBucketedScan)
}
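Usage sketch for the same pattern as checkNumBucketedScan exercises it, again with the hypothetical stand-ins from the earlier sketch rather than the suite's real plan and API:

```scala
// A plan with one bucketed Comet native scan and one non-bucketed Spark scan.
import ScanCollectSketch._
val plan = Exchange(Seq(
  CometNativeScan(bucketedScan = true),
  FileSourceScan(bucketedScan = false)))
// Equivalent of: bucketedScan.length == expectedNumBucketedScan
assert(fileScans(plan).count(isBucketedScan) == 1)
```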