diff --git a/dev/diffs/3.4.3.diff b/dev/diffs/3.4.3.diff
index 4fd1d3ca48..0205888433 100644
--- a/dev/diffs/3.4.3.diff
+++ b/dev/diffs/3.4.3.diff
@@ -1150,7 +1150,7 @@ index 02990a7a40d..bddf5e1ccc2 100644
    }
  }
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
-index cfc8b2cc845..c6fcfd7bd08 100644
+index cfc8b2cc845..c4be7eb3731 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
 @@ -21,6 +21,7 @@ import scala.collection.mutable.ArrayBuffer
@@ -2385,7 +2385,7 @@ index d083cac48ff..3c11bcde807 100644
   import testImplicits._
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
-index 266bb343526..e58a2f49eb9 100644
+index 266bb343526..f8ad838e2b2 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
 @@ -19,15 +19,18 @@ package org.apache.spark.sql.sources
  import org.apache.spark.sql.execution.joins.SortMergeJoinExec
  import org.apache.spark.sql.functions._
  import org.apache.spark.sql.internal.SQLConf
-@@ -101,12 +104,20 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+@@ -101,12 +104,22 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
    }
  }
@@ -2419,6 +2419,7 @@ index 266bb343526..e58a2f49eb9 100644
 +    val fileScan = collect(plan) {
 +      case f: FileSourceScanExec => f
 +      case f: CometScanExec => f
++      case f: CometNativeScanExec => f
 +    }
      assert(fileScan.nonEmpty, plan)
      fileScan.head
@@ -2427,12 +2428,13 @@ index 266bb343526..e58a2f49eb9 100644
 +  private def getBucketScan(plan: SparkPlan): Boolean = getFileScan(plan) match {
 +    case fs: FileSourceScanExec => fs.bucketedScan
 +    case bs: CometScanExec => bs.bucketedScan
++    case ns: CometNativeScanExec => ns.bucketedScan
 +  }
 +
    // To verify if the bucket pruning works, this function checks two conditions:
    //   1) Check if the pruned buckets (before filtering) are empty.
   //   2) Verify the final result is the same as the expected one
-@@ -155,7 +166,8 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+@@ -155,7 +168,8 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
     val planWithoutBucketedScan = bucketedDataFrame.filter(filterCondition)
       .queryExecution.executedPlan
     val fileScan = getFileScan(planWithoutBucketedScan)
     val bucketColumnType = bucketedDataFrame.schema.apply(bucketColumnIndex).dataType
     val rowsWithInvalidBuckets = fileScan.execute().filter(row => {
-@@ -451,28 +463,54 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+@@ -451,28 +465,54 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
     val joinOperator = if (joined.sqlContext.conf.adaptiveExecutionEnabled) {
       val executedPlan =
         joined.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
       s"expected sort in the right child to be $sortRight but found\n${joinOperator.right}")
 
     // check the output partitioning
-@@ -835,11 +873,11 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+@@ -835,11 +875,11 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
     df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table")
 
     val scanDF = spark.table("bucketed_table").select("j")
-    assert(!getFileScan(scanDF.queryExecution.executedPlan).bucketedScan)
+    assert(!getBucketScan(scanDF.queryExecution.executedPlan))
     checkAnswer(scanDF, df1.select("j"))
 
     val aggDF = spark.table("bucketed_table").groupBy("j").agg(max("k"))
-    assert(!getFileScan(aggDF.queryExecution.executedPlan).bucketedScan)
+    assert(!getBucketScan(aggDF.queryExecution.executedPlan))
     checkAnswer(aggDF, df1.groupBy("j").agg(max("k")))
   }
 }
-@@ -894,7 +932,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+@@ -894,7 +934,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
   }
 
   test("SPARK-29655 Read bucketed tables obeys spark.sql.shuffle.partitions") {
-    withSQLConf(
+    withSQLConf(
+
       SQLConf.SHUFFLE_PARTITIONS.key -> "5",
       SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "7") {
     val bucketSpec = Some(BucketSpec(6, Seq("i", "j"), Nil))
-@@ -913,7 +954,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+@@ -913,7 +956,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
   }
 
   test("SPARK-32767 Bucket join should work if SHUFFLE_PARTITIONS larger than bucket number") {
-    withSQLConf(
+    withSQLConf(
+
       SQLConf.SHUFFLE_PARTITIONS.key -> "9",
       SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "10") {
-@@ -943,7 +987,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+@@ -943,7 +989,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
   }
 
   test("bucket coalescing eliminates shuffle") {
-    withSQLConf(
+    withSQLConf(
+
       SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true",
       SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
     // The side with bucketedTableTestSpec1 will be coalesced to have 4 output partitions.
-@@ -1026,15 +1073,23 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+@@ -1026,15 +1075,26 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
       expectedNumShuffles: Int,
       expectedCoalescedNumBuckets: Option[Int]): Unit = {
     val plan = sql(query).queryExecution.executedPlan
     val scans = plan.collect {
       case f: FileSourceScanExec if f.optionalNumCoalescedBuckets.isDefined => f
+      case b: CometScanExec if b.optionalNumCoalescedBuckets.isDefined => b
++      case b: CometNativeScanExec if b.optionalNumCoalescedBuckets.isDefined => b
     }
     if (expectedCoalescedNumBuckets.isDefined) {
       assert(scans.length == 1)
+          assert(f.optionalNumCoalescedBuckets == expectedCoalescedNumBuckets)
+        case b: CometScanExec =>
+          assert(b.optionalNumCoalescedBuckets == expectedCoalescedNumBuckets)
++        case b: CometNativeScanExec =>
++          assert(b.optionalNumCoalescedBuckets == expectedCoalescedNumBuckets)
+      }
     } else {
       assert(scans.isEmpty)
@@ -2604,18 +2609,18 @@ index b5f6d2f9f68..277784a92af 100644
   protected override lazy val sql = spark.sql _
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
-index 1f55742cd67..42377f7cf26 100644
+index 1f55742cd67..f20129d9dd8 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
 @@ -20,6 +20,7 @@ package org.apache.spark.sql.sources
  import org.apache.spark.sql.QueryTest
  import org.apache.spark.sql.catalyst.expressions.AttributeReference
  import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
+import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec}
 import org.apache.spark.sql.execution.FileSourceScanExec
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
-@@ -71,7 +72,10 @@ abstract class DisableUnnecessaryBucketedScanSuite
+@@ -71,7 +72,11 @@ abstract class DisableUnnecessaryBucketedScanSuite
   def checkNumBucketedScan(query: String, expectedNumBucketedScan: Int): Unit = {
     val plan = sql(query).queryExecution.executedPlan
-    val bucketedScan = plan.collect { case s: FileSourceScanExec if s.bucketedScan => s }
+    val bucketedScan = collect(plan) {
+      case s: FileSourceScanExec if s.bucketedScan => s
+      case s: CometScanExec if s.bucketedScan => s
++      case s: CometNativeScanExec if s.bucketedScan => s
+    }
     assert(bucketedScan.length == expectedNumBucketedScan)
   }
diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff
index 72c41e4f82..beef445490 100644
--- a/dev/diffs/3.5.8.diff
+++ b/dev/diffs/3.5.8.diff
@@ -2587,7 +2587,7 @@ index d083cac48ff..3c11bcde807 100644
   import testImplicits._
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
-index 746f289c393..5b9e31c1fa6 100644
+index 746f289c393..7a6a88a9fce 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
 @@ -19,16 +19,19 @@ package org.apache.spark.sql.sources
  import org.apache.spark.sql.execution.joins.SortMergeJoinExec
  import org.apache.spark.sql.functions._
  import org.apache.spark.sql.internal.SQLConf
-@@ -102,12 +105,20 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+@@ -102,12 +105,22 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
    }
  }
@@ -2622,6 +2622,7 @@ index 746f289c393..5b9e31c1fa6 100644
 +    val fileScan = collect(plan) {
 +      case f: FileSourceScanExec => f
 +      case f: CometScanExec => f
++      case f: CometNativeScanExec => f
 +    }
      assert(fileScan.nonEmpty, plan)
      fileScan.head
@@ -2630,12 +2631,13 @@ index 746f289c393..5b9e31c1fa6 100644
 +  private def getBucketScan(plan: SparkPlan): Boolean = getFileScan(plan) match {
 +    case fs: FileSourceScanExec => fs.bucketedScan
 +    case bs: CometScanExec => bs.bucketedScan
++    case ns: CometNativeScanExec => ns.bucketedScan
 +  }
 +
    // To verify if the bucket pruning works, this function checks two conditions:
    //   1) Check if the pruned buckets (before filtering) are empty.
    //   2) Verify the final result is the same as the expected one
-@@ -156,7 +167,8 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+@@ -156,7 +169,8 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
     val planWithoutBucketedScan = bucketedDataFrame.filter(filterCondition)
       .queryExecution.executedPlan
     val fileScan = getFileScan(planWithoutBucketedScan)
     val bucketColumnType = bucketedDataFrame.schema.apply(bucketColumnIndex).dataType
     val rowsWithInvalidBuckets = fileScan.execute().filter(row => {
-@@ -452,28 +464,54 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+@@ -452,28 +466,54 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
     val joinOperator = if (joined.sqlContext.conf.adaptiveExecutionEnabled) {
       val executedPlan =
         joined.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
       s"expected sort in the right child to be $sortRight but found\n${joinOperator.right}")
 
     // check the output partitioning
-@@ -831,16 +869,17 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
     }
   }
 
-  test("disable bucketing when the output doesn't contain all bucketing columns") {
+  test("disable bucketing when the output doesn't contain all bucketing columns",
+    IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3319")) {
     withTable("bucketed_table") {
-@@ -836,11 +876,11 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
     df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table")
 
     val scanDF = spark.table("bucketed_table").select("j")
-    assert(!getFileScan(scanDF.queryExecution.executedPlan).bucketedScan)
+    assert(!getBucketScan(scanDF.queryExecution.executedPlan))
     checkAnswer(scanDF, df1.select("j"))
 
     val aggDF = spark.table("bucketed_table").groupBy("j").agg(max("k"))
-    assert(!getFileScan(aggDF.queryExecution.executedPlan).bucketedScan)
+    assert(!getBucketScan(aggDF.queryExecution.executedPlan))
     checkAnswer(aggDF, df1.groupBy("j").agg(max("k")))
   }
 }
-@@ -895,7 +932,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+@@ -895,7 +935,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
   }
 
   test("SPARK-29655 Read bucketed tables obeys spark.sql.shuffle.partitions") {
-    withSQLConf(
+    withSQLConf(
+
       SQLConf.SHUFFLE_PARTITIONS.key -> "5",
       SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "7") {
     val bucketSpec = Some(BucketSpec(6, Seq("i", "j"), Nil))
-@@ -914,7 +954,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+@@ -914,7 +957,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
   }
 
   test("SPARK-32767 Bucket join should work if SHUFFLE_PARTITIONS larger than bucket number") {
-    withSQLConf(
+    withSQLConf(
+
       SQLConf.SHUFFLE_PARTITIONS.key -> "9",
       SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "10") {
-@@ -944,7 +987,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+@@ -944,7 +990,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
   }
 
   test("bucket coalescing eliminates shuffle") {
-    withSQLConf(
+    withSQLConf(
+
       SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true",
       SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
     // The side with bucketedTableTestSpec1 will be coalesced to have 4 output partitions.
-@@ -1013,7 +1061,8 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
     }
   }
 
-  test("bucket coalescing is applied when join expressions match with partitioning expressions") {
+  test("bucket coalescing is applied when join expressions match with partitioning expressions",
+    IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3319")) {
     withTable("t1", "t2", "t3") {
       df1.write.format("parquet").bucketBy(8, "i", "j").saveAsTable("t1")
       df2.write.format("parquet").bucketBy(4, "i", "j").saveAsTable("t2")
-@@ -1029,15 +1075,21 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+@@ -1029,15 +1078,24 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
     Seq(true, false).foreach { aqeEnabled =>
       withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled.toString) {
         val plan = sql(query).queryExecution.executedPlan
         val scans = collect(plan) {
           case f: FileSourceScanExec if f.optionalNumCoalescedBuckets.isDefined => f
+          case b: CometScanExec if b.optionalNumCoalescedBuckets.isDefined => b
++          case b: CometNativeScanExec if b.optionalNumCoalescedBuckets.isDefined => b
         }
         if (expectedCoalescedNumBuckets.isDefined) {
           assert(scans.length == 1)
+              assert(f.optionalNumCoalescedBuckets == expectedCoalescedNumBuckets)
+            case b: CometScanExec =>
+              assert(b.optionalNumCoalescedBuckets == expectedCoalescedNumBuckets)
++            case b: CometNativeScanExec =>
++              assert(b.optionalNumCoalescedBuckets == expectedCoalescedNumBuckets)
+          }
         } else {
           assert(scans.isEmpty)
 index 6f897a9c0b7..b0723634f68 100644
   protected override lazy val sql = spark.sql _
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
 index d675503a8ba..c386a8cb686 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
 @@ -17,7 +17,8 @@
 
  package org.apache.spark.sql.sources
 
-import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.{IgnoreCometNativeDataFusion, QueryTest}
+import org.apache.spark.sql.comet.CometScanExec
 import org.apache.spark.sql.execution.FileSourceScanExec
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
 import org.apache.spark.sql.internal.SQLConf
-@@ -68,7 +69,10 @@ abstract class DisableUnnecessaryBucketedScanSuite
+@@ -68,7 +69,11 @@ abstract class DisableUnnecessaryBucketedScanSuite
   def checkNumBucketedScan(query: String, expectedNumBucketedScan: Int): Unit = {
     val plan = sql(query).queryExecution.executedPlan
-    val bucketedScan = plan.collect { case s: FileSourceScanExec if s.bucketedScan => s }
+    val bucketedScan = collect(plan) {
+      case s: FileSourceScanExec if s.bucketedScan => s
+      case s: CometScanExec if s.bucketedScan => s
++      case s: CometNativeScanExec if s.bucketedScan => s
+    }
     assert(bucketedScan.length == expectedNumBucketedScan)
   }
-@@ -83,7 +87,8 @@ abstract class DisableUnnecessaryBucketedScanSuite
     }
   }
 
-  test("SPARK-32859: disable unnecessary bucketed table scan - basic test") {
+  test("SPARK-32859: disable unnecessary bucketed table scan - basic test",
+    IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3319")) {
     withTable("t1", "t2", "t3") {
       df1.write.format("parquet").bucketBy(8, "i").saveAsTable("t1")
       df2.write.format("parquet").bucketBy(8, "i").saveAsTable("t2")
-@@ -124,7 +129,8 @@ abstract class DisableUnnecessaryBucketedScanSuite
     }
   }
 
-  test("SPARK-32859: disable unnecessary bucketed table scan - multiple joins test") {
+  test("SPARK-32859: disable unnecessary bucketed table scan - multiple joins test",
+    IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3319")) {
     withTable("t1", "t2", "t3") {
       df1.write.format("parquet").bucketBy(8, "i").saveAsTable("t1")
       df2.write.format("parquet").bucketBy(8, "i").saveAsTable("t2")
-@@ -167,7 +173,8 @@ abstract class DisableUnnecessaryBucketedScanSuite
     }
   }
 
-  test("SPARK-32859: disable unnecessary bucketed table scan - multiple bucketed columns test") {
+  test("SPARK-32859: disable unnecessary bucketed table scan - multiple bucketed columns test",
+    IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3319")) {
     withTable("t1", "t2", "t3") {
       df1.write.format("parquet").bucketBy(8, "i", "j").saveAsTable("t1")
       df2.write.format("parquet").bucketBy(8, "i", "j").saveAsTable("t2")
-@@ -198,7 +205,8 @@ abstract class DisableUnnecessaryBucketedScanSuite
     }
   }
 
-  test("SPARK-32859: disable unnecessary bucketed table scan - other operators test") {
+  test("SPARK-32859: disable unnecessary bucketed table scan - other operators test",
+    IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3319")) {
     withTable("t1", "t2", "t3") {
       df1.write.format("parquet").bucketBy(8, "i").saveAsTable("t1")
       df2.write.format("parquet").bucketBy(8, "i").saveAsTable("t2")
-@@ -239,7 +247,8 @@ abstract class DisableUnnecessaryBucketedScanSuite
     }
   }
 
-  test("Aggregates with no groupby over tables having 1 BUCKET, return multiple rows") {
+  test("Aggregates with no groupby over tables having 1 BUCKET, return multiple rows",
+    IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3319")) {
     withTable("t1") {
       withSQLConf(SQLConf.AUTO_BUCKETED_SCAN_ENABLED.key -> "true") {
         sql(
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
 index 7f6fa2a123e..c778b4e2c48 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
diff --git a/dev/diffs/4.0.1.diff b/dev/diffs/4.0.1.diff
index 7faed2825f..d6694e827f 100644
--- a/dev/diffs/4.0.1.diff
+++ b/dev/diffs/4.0.1.diff
@@ -1436,7 +1436,7 @@ index 3eeed2e4175..9f21d547c1c 100644
    }
  }
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
-index 2a0ab21ddb0..e8a5a891105 100644
+index 2a0ab21ddb0..6030e7c2b9b 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
 @@ -21,6 +21,7 @@ import scala.collection.mutable.ArrayBuffer
@@ -3080,7 +3080,7 @@ index 7838e62013d..8fa09652921 100644
   import testImplicits._
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
-index c4b09c4b289..dd5763e8405 100644
+index c4b09c4b289..75c3437788e 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
 @@ -26,10 +26,11 @@ import org.apache.spark.sql.catalyst.expressions
  import org.apache.spark.sql.catalyst.expressions._
  import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
-import org.apache.spark.sql.comet.CometScanExec
+import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec}
 import org.apache.spark.sql.execution.{FileSourceScanExec, SortExec, SparkPlan}
 import org.apache.spark.sql.execution.joins.SortMergeJoinExec
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
-@@ -103,12 +104,20 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+@@ -103,12 +104,22 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
    }
  }
 +    val fileScan = collect(plan) {
 +      case f: FileSourceScanExec => f
 +      case f: CometScanExec => f
++      case f: CometNativeScanExec => f
 +    }
      assert(fileScan.nonEmpty, plan)
      fileScan.head
 +  private def getBucketScan(plan: SparkPlan): Boolean = getFileScan(plan) match {
 +    case fs: FileSourceScanExec => fs.bucketedScan
 +    case bs: CometScanExec => bs.bucketedScan
++    case ns: CometNativeScanExec => ns.bucketedScan
 +  }
 +
    // To verify if the bucket pruning works, this function checks two conditions:
    //   1) Check if the pruned buckets (before filtering) are empty.
   //   2) Verify the final result is the same as the expected one
-@@ -157,7 +166,8 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+@@ -157,7 +168,8 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
     val planWithoutBucketedScan = bucketedDataFrame.filter(filterCondition)
       .queryExecution.executedPlan
     val fileScan = getFileScan(planWithoutBucketedScan)
     val bucketColumnType = bucketedDataFrame.schema.apply(bucketColumnIndex).dataType
     val rowsWithInvalidBuckets = fileScan.execute().filter(row => {
-@@ -454,28 +464,54 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+@@ -454,28 +466,54 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
     val joinOperator = if (joined.sparkSession.sessionState.conf.adaptiveExecutionEnabled) {
       val executedPlan =
         joined.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
       s"expected sort in the right child to be $sortRight but found\n${joinOperator.right}")
 
     // check the output partitioning
-@@ -838,11 +874,11 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+@@ -838,11 +876,11 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
     df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table")
 
     val scanDF = spark.table("bucketed_table").select("j")
-    assert(!getFileScan(scanDF.queryExecution.executedPlan).bucketedScan)
+    assert(!getBucketScan(scanDF.queryExecution.executedPlan))
     checkAnswer(scanDF, df1.select("j"))
 
     val aggDF = spark.table("bucketed_table").groupBy("j").agg(max("k"))
-    assert(!getFileScan(aggDF.queryExecution.executedPlan).bucketedScan)
+    assert(!getBucketScan(aggDF.queryExecution.executedPlan))
     checkAnswer(aggDF, df1.groupBy("j").agg(max("k")))
   }
 }
-@@ -1031,15 +1067,21 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+@@ -1031,15 +1069,24 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
     Seq(true, false).foreach { aqeEnabled =>
       withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled.toString) {
         val plan = sql(query).queryExecution.executedPlan
         val scans = collect(plan) {
           case f: FileSourceScanExec if f.optionalNumCoalescedBuckets.isDefined => f
+          case b: CometScanExec if b.optionalNumCoalescedBuckets.isDefined => b
++          case b: CometNativeScanExec if b.optionalNumCoalescedBuckets.isDefined => b
         }
         if (expectedCoalescedNumBuckets.isDefined) {
           assert(scans.length == 1)
+              assert(f.optionalNumCoalescedBuckets == expectedCoalescedNumBuckets)
+            case b: CometScanExec =>
+              assert(b.optionalNumCoalescedBuckets == expectedCoalescedNumBuckets)
++            case b: CometNativeScanExec =>
++              assert(b.optionalNumCoalescedBuckets == expectedCoalescedNumBuckets)
+          }
         } else {
           assert(scans.isEmpty)
 index 95c2fcbd7b5..e2d4a20c5d9 100644
   protected override lazy val sql = spark.sql _
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
-index c5c56f081d8..197cd241f48 100644
+index c5c56f081d8..6cc51f93b4f 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
 @@ -18,6 +18,7 @@ package org.apache.spark.sql.sources
 
  import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec}
 import org.apache.spark.sql.execution.FileSourceScanExec
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
 import org.apache.spark.sql.internal.SQLConf
-@@ -68,7 +69,10 @@ abstract class DisableUnnecessaryBucketedScanSuite
+@@ -68,7 +69,11 @@ abstract class DisableUnnecessaryBucketedScanSuite
   def checkNumBucketedScan(query: String, expectedNumBucketedScan: Int): Unit = {
     val plan = sql(query).queryExecution.executedPlan
-    val bucketedScan = plan.collect { case s: FileSourceScanExec if s.bucketedScan => s }
+    val bucketedScan = collect(plan) {
+      case s: FileSourceScanExec if s.bucketedScan => s
+      case s: CometScanExec if s.bucketedScan => s
++      case s: CometNativeScanExec if s.bucketedScan => s
+    }
     assert(bucketedScan.length == expectedNumBucketedScan)
   }