diff --git a/dev/diffs/3.4.3.diff b/dev/diffs/3.4.3.diff index f77d4f21ac..3f04dd43b8 100644 --- a/dev/diffs/3.4.3.diff +++ b/dev/diffs/3.4.3.diff @@ -417,7 +417,7 @@ index daef11ae4d6..9f3cc9181f2 100644 assert(exchanges.size == 2) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala -index f33432ddb6f..b7a5fd72f7d 100644 +index f33432ddb6f..914afa6b01d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala @@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen @@ -458,17 +458,7 @@ index f33432ddb6f..b7a5fd72f7d 100644 case _ => Nil } } -@@ -1027,7 +1037,8 @@ abstract class DynamicPartitionPruningSuiteBase - } - } - -- test("avoid reordering broadcast join keys to match input hash partitioning") { -+ test("avoid reordering broadcast join keys to match input hash partitioning", -+ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1839")) { - withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false", - SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { - withTable("large", "dimTwo", "dimThree") { -@@ -1204,10 +1215,16 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1204,10 +1214,16 @@ abstract class DynamicPartitionPruningSuiteBase val plan = df.queryExecution.executedPlan val countSubqueryBroadcasts = @@ -487,27 +477,27 @@ index f33432ddb6f..b7a5fd72f7d 100644 assert(countSubqueryBroadcasts == 1) assert(countReusedSubqueryBroadcasts == 1) -@@ -1215,7 +1232,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1215,7 +1231,8 @@ abstract class DynamicPartitionPruningSuiteBase } test("SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " + - "canonicalization and exchange reuse") { + "canonicalization and exchange reuse", -+ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1839")) { ++ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/4045")) { withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { val df = sql( -@@ -1423,7 +1441,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1423,7 +1440,8 @@ abstract class DynamicPartitionPruningSuiteBase } } - test("SPARK-34637: DPP side broadcast query stage is created firstly") { + test("SPARK-34637: DPP side broadcast query stage is created firstly", -+ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1839")) { ++ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/4045")) { withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { val df = sql( """ WITH v as ( -@@ -1577,6 +1596,7 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1577,6 +1595,7 @@ abstract class DynamicPartitionPruningSuiteBase val subqueryBroadcastExecs = collectWithSubqueries(df.queryExecution.executedPlan) { case s: SubqueryBroadcastExec => s @@ -515,7 +505,7 @@ index f33432ddb6f..b7a5fd72f7d 100644 } assert(subqueryBroadcastExecs.size === 1) subqueryBroadcastExecs.foreach { subqueryBroadcastExec => -@@ -1729,6 +1749,10 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat +@@ -1729,6 +1748,10 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat case s: BatchScanExec => // we use f1 col for v2 tables due to schema pruning s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1"))) diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff index e4af1fd202..00a6963dd9 100644 --- a/dev/diffs/3.5.8.diff +++ b/dev/diffs/3.5.8.diff @@ -398,7 +398,7 @@ index c4fb4fa943c..a04b23870a8 100644 assert(exchanges.size == 2) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala -index f33432ddb6f..b7a5fd72f7d 100644 +index f33432ddb6f..914afa6b01d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala @@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen @@ -439,17 +439,7 @@ index f33432ddb6f..b7a5fd72f7d 100644 case _ => Nil } } -@@ -1027,7 +1037,8 @@ abstract class DynamicPartitionPruningSuiteBase - } - } - -- test("avoid reordering broadcast join keys to match input hash partitioning") { -+ test("avoid reordering broadcast join keys to match input hash partitioning", -+ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1839")) { - withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false", - SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { - withTable("large", "dimTwo", "dimThree") { -@@ -1204,10 +1215,16 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1204,10 +1214,16 @@ abstract class DynamicPartitionPruningSuiteBase val plan = df.queryExecution.executedPlan val countSubqueryBroadcasts = @@ -468,27 +458,27 @@ index f33432ddb6f..b7a5fd72f7d 100644 assert(countSubqueryBroadcasts == 1) assert(countReusedSubqueryBroadcasts == 1) -@@ -1215,7 +1232,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1215,7 +1231,8 @@ abstract class DynamicPartitionPruningSuiteBase } test("SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " + - "canonicalization and exchange reuse") { + "canonicalization and exchange reuse", -+ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1839")) { ++ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/4045")) { withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { val df = sql( -@@ -1423,7 +1441,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1423,7 +1440,8 @@ abstract class DynamicPartitionPruningSuiteBase } } - test("SPARK-34637: DPP side broadcast query stage is created firstly") { + test("SPARK-34637: DPP side broadcast query stage is created firstly", -+ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1839")) { ++ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/4045")) { withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { val df = sql( """ WITH v as ( -@@ -1577,6 +1596,7 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1577,6 +1595,7 @@ abstract class DynamicPartitionPruningSuiteBase val subqueryBroadcastExecs = collectWithSubqueries(df.queryExecution.executedPlan) { case s: SubqueryBroadcastExec => s @@ -496,7 +486,7 @@ index f33432ddb6f..b7a5fd72f7d 100644 } assert(subqueryBroadcastExecs.size === 1) subqueryBroadcastExecs.foreach { subqueryBroadcastExec => -@@ -1729,6 +1749,10 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat +@@ -1729,6 +1748,10 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat case s: BatchScanExec => // we use f1 col for v2 tables due to schema pruning s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1"))) diff --git a/dev/diffs/4.0.1.diff b/dev/diffs/4.0.1.diff index 700cb148b2..919b7d51a8 100644 --- a/dev/diffs/4.0.1.diff +++ b/dev/diffs/4.0.1.diff @@ -589,7 +589,7 @@ index 81713c777bc..b5f92ed9742 100644 assert(exchanges.size == 2) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala -index 2c24cc7d570..753737a1057 100644 +index 2c24cc7d570..5a1fe7017c3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala @@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen @@ -630,17 +630,7 @@ index 2c24cc7d570..753737a1057 100644 case _ => Nil } } -@@ -1027,7 +1037,8 @@ abstract class DynamicPartitionPruningSuiteBase - } - } - -- test("avoid reordering broadcast join keys to match input hash partitioning") { -+ test("avoid reordering broadcast join keys to match input hash partitioning", -+ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1839")) { - withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false", - SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { - withTable("large", "dimTwo", "dimThree") { -@@ -1204,10 +1215,16 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1204,10 +1214,16 @@ abstract class DynamicPartitionPruningSuiteBase val plan = df.queryExecution.executedPlan val countSubqueryBroadcasts = @@ -659,17 +649,17 @@ index 2c24cc7d570..753737a1057 100644 assert(countSubqueryBroadcasts == 1) assert(countReusedSubqueryBroadcasts == 1) -@@ -1215,7 +1232,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1215,7 +1231,8 @@ abstract class DynamicPartitionPruningSuiteBase } test("SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " + - "canonicalization and exchange reuse") { + "canonicalization and exchange reuse", -+ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1839")) { ++ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/4045")) { withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { val df = sql( -@@ -1330,6 +1348,7 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1330,6 +1347,7 @@ abstract class DynamicPartitionPruningSuiteBase } test("Subquery reuse across the whole plan", @@ -677,17 +667,17 @@ index 2c24cc7d570..753737a1057 100644 DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) { withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true", SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false", -@@ -1424,7 +1443,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1424,7 +1442,8 @@ abstract class DynamicPartitionPruningSuiteBase } } - test("SPARK-34637: DPP side broadcast query stage is created firstly") { + test("SPARK-34637: DPP side broadcast query stage is created firstly", -+ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1839")) { ++ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/4045")) { withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { val df = sql( """ WITH v as ( -@@ -1578,6 +1598,7 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1578,6 +1597,7 @@ abstract class DynamicPartitionPruningSuiteBase val subqueryBroadcastExecs = collectWithSubqueries(df.queryExecution.executedPlan) { case s: SubqueryBroadcastExec => s @@ -695,7 +685,7 @@ index 2c24cc7d570..753737a1057 100644 } assert(subqueryBroadcastExecs.size === 1) subqueryBroadcastExecs.foreach { subqueryBroadcastExec => -@@ -1730,6 +1751,10 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat +@@ -1730,6 +1750,10 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat case s: BatchScanExec => // we use f1 col for v2 tables due to schema pruning s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1"))) @@ -1319,24 +1309,23 @@ index 0df7f806272..92390bd819f 100644 test("non-matching optional group") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala -index 2e33f6505ab..3a8b154b565 100644 +index 2e33f6505ab..949fdea0003 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala -@@ -23,11 +23,13 @@ import org.apache.spark.SparkRuntimeException +@@ -23,10 +23,12 @@ import org.apache.spark.SparkRuntimeException import org.apache.spark.sql.catalyst.expressions.SubqueryExpression import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi} import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, Join, LogicalPlan, Project, Sort, Union} +import org.apache.spark.sql.comet.{CometNativeColumnarToRowExec, CometNativeScanExec, CometScanExec} import org.apache.spark.sql.execution._ ++import org.apache.spark.sql.IgnoreCometNativeDataFusion import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, DisableAdaptiveExecution} import org.apache.spark.sql.execution.datasources.FileScanRDD -import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec +import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike import org.apache.spark.sql.execution.joins.{BaseJoinExec, BroadcastHashJoinExec, BroadcastNestedLoopJoinExec} -+import org.apache.spark.sql.IgnoreCometNativeDataFusion import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession - @@ -1529,6 +1531,18 @@ class SubquerySuite extends QueryTest fs.inputRDDs().forall( _.asInstanceOf[FileScanRDD].filePartitions.forall(