Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 8 additions & 18 deletions dev/diffs/3.4.3.diff
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ index daef11ae4d6..9f3cc9181f2 100644
assert(exchanges.size == 2)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
index f33432ddb6f..b7a5fd72f7d 100644
index f33432ddb6f..914afa6b01d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen
Expand Down Expand Up @@ -458,17 +458,7 @@ index f33432ddb6f..b7a5fd72f7d 100644
case _ => Nil
}
}
@@ -1027,7 +1037,8 @@ abstract class DynamicPartitionPruningSuiteBase
}
}

- test("avoid reordering broadcast join keys to match input hash partitioning") {
+ test("avoid reordering broadcast join keys to match input hash partitioning",
+ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1839")) {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
withTable("large", "dimTwo", "dimThree") {
@@ -1204,10 +1215,16 @@ abstract class DynamicPartitionPruningSuiteBase
@@ -1204,10 +1214,16 @@ abstract class DynamicPartitionPruningSuiteBase

val plan = df.queryExecution.executedPlan
val countSubqueryBroadcasts =
Expand All @@ -487,35 +477,35 @@ index f33432ddb6f..b7a5fd72f7d 100644

assert(countSubqueryBroadcasts == 1)
assert(countReusedSubqueryBroadcasts == 1)
@@ -1215,7 +1232,8 @@ abstract class DynamicPartitionPruningSuiteBase
@@ -1215,7 +1231,8 @@ abstract class DynamicPartitionPruningSuiteBase
}

test("SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " +
- "canonicalization and exchange reuse") {
+ "canonicalization and exchange reuse",
+ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1839")) {
+ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/4045")) {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
val df = sql(
@@ -1423,7 +1441,8 @@ abstract class DynamicPartitionPruningSuiteBase
@@ -1423,7 +1440,8 @@ abstract class DynamicPartitionPruningSuiteBase
}
}

- test("SPARK-34637: DPP side broadcast query stage is created firstly") {
+ test("SPARK-34637: DPP side broadcast query stage is created firstly",
+ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1839")) {
+ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/4045")) {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
val df = sql(
""" WITH v as (
@@ -1577,6 +1596,7 @@ abstract class DynamicPartitionPruningSuiteBase
@@ -1577,6 +1595,7 @@ abstract class DynamicPartitionPruningSuiteBase

val subqueryBroadcastExecs = collectWithSubqueries(df.queryExecution.executedPlan) {
case s: SubqueryBroadcastExec => s
+ case s: CometSubqueryBroadcastExec => s
}
assert(subqueryBroadcastExecs.size === 1)
subqueryBroadcastExecs.foreach { subqueryBroadcastExec =>
@@ -1729,6 +1749,10 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
@@ -1729,6 +1748,10 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
case s: BatchScanExec =>
// we use f1 col for v2 tables due to schema pruning
s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1")))
Expand Down
26 changes: 8 additions & 18 deletions dev/diffs/3.5.8.diff
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ index c4fb4fa943c..a04b23870a8 100644
assert(exchanges.size == 2)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
index f33432ddb6f..b7a5fd72f7d 100644
index f33432ddb6f..914afa6b01d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen
Expand Down Expand Up @@ -439,17 +439,7 @@ index f33432ddb6f..b7a5fd72f7d 100644
case _ => Nil
}
}
@@ -1027,7 +1037,8 @@ abstract class DynamicPartitionPruningSuiteBase
}
}

- test("avoid reordering broadcast join keys to match input hash partitioning") {
+ test("avoid reordering broadcast join keys to match input hash partitioning",
+ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1839")) {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
withTable("large", "dimTwo", "dimThree") {
@@ -1204,10 +1215,16 @@ abstract class DynamicPartitionPruningSuiteBase
@@ -1204,10 +1214,16 @@ abstract class DynamicPartitionPruningSuiteBase

val plan = df.queryExecution.executedPlan
val countSubqueryBroadcasts =
Expand All @@ -468,35 +458,35 @@ index f33432ddb6f..b7a5fd72f7d 100644

assert(countSubqueryBroadcasts == 1)
assert(countReusedSubqueryBroadcasts == 1)
@@ -1215,7 +1232,8 @@ abstract class DynamicPartitionPruningSuiteBase
@@ -1215,7 +1231,8 @@ abstract class DynamicPartitionPruningSuiteBase
}

test("SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " +
- "canonicalization and exchange reuse") {
+ "canonicalization and exchange reuse",
+ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1839")) {
+ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/4045")) {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
val df = sql(
@@ -1423,7 +1441,8 @@ abstract class DynamicPartitionPruningSuiteBase
@@ -1423,7 +1440,8 @@ abstract class DynamicPartitionPruningSuiteBase
}
}

- test("SPARK-34637: DPP side broadcast query stage is created firstly") {
+ test("SPARK-34637: DPP side broadcast query stage is created firstly",
+ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1839")) {
+ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/4045")) {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
val df = sql(
""" WITH v as (
@@ -1577,6 +1596,7 @@ abstract class DynamicPartitionPruningSuiteBase
@@ -1577,6 +1595,7 @@ abstract class DynamicPartitionPruningSuiteBase

val subqueryBroadcastExecs = collectWithSubqueries(df.queryExecution.executedPlan) {
case s: SubqueryBroadcastExec => s
+ case s: CometSubqueryBroadcastExec => s
}
assert(subqueryBroadcastExecs.size === 1)
subqueryBroadcastExecs.foreach { subqueryBroadcastExec =>
@@ -1729,6 +1749,10 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
@@ -1729,6 +1748,10 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
case s: BatchScanExec =>
// we use f1 col for v2 tables due to schema pruning
s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1")))
Expand Down
35 changes: 12 additions & 23 deletions dev/diffs/4.0.1.diff
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,7 @@ index 81713c777bc..b5f92ed9742 100644
assert(exchanges.size == 2)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
index 2c24cc7d570..753737a1057 100644
index 2c24cc7d570..5a1fe7017c3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen
Expand Down Expand Up @@ -630,17 +630,7 @@ index 2c24cc7d570..753737a1057 100644
case _ => Nil
}
}
@@ -1027,7 +1037,8 @@ abstract class DynamicPartitionPruningSuiteBase
}
}

- test("avoid reordering broadcast join keys to match input hash partitioning") {
+ test("avoid reordering broadcast join keys to match input hash partitioning",
+ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1839")) {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
withTable("large", "dimTwo", "dimThree") {
@@ -1204,10 +1215,16 @@ abstract class DynamicPartitionPruningSuiteBase
@@ -1204,10 +1214,16 @@ abstract class DynamicPartitionPruningSuiteBase

val plan = df.queryExecution.executedPlan
val countSubqueryBroadcasts =
Expand All @@ -659,43 +649,43 @@ index 2c24cc7d570..753737a1057 100644

assert(countSubqueryBroadcasts == 1)
assert(countReusedSubqueryBroadcasts == 1)
@@ -1215,7 +1232,8 @@ abstract class DynamicPartitionPruningSuiteBase
@@ -1215,7 +1231,8 @@ abstract class DynamicPartitionPruningSuiteBase
}

test("SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " +
- "canonicalization and exchange reuse") {
+ "canonicalization and exchange reuse",
+ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1839")) {
+ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/4045")) {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
val df = sql(
@@ -1330,6 +1348,7 @@ abstract class DynamicPartitionPruningSuiteBase
@@ -1330,6 +1347,7 @@ abstract class DynamicPartitionPruningSuiteBase
}

test("Subquery reuse across the whole plan",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3313"),
DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
@@ -1424,7 +1443,8 @@ abstract class DynamicPartitionPruningSuiteBase
@@ -1424,7 +1442,8 @@ abstract class DynamicPartitionPruningSuiteBase
}
}

- test("SPARK-34637: DPP side broadcast query stage is created firstly") {
+ test("SPARK-34637: DPP side broadcast query stage is created firstly",
+ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1839")) {
+ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/4045")) {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
val df = sql(
""" WITH v as (
@@ -1578,6 +1598,7 @@ abstract class DynamicPartitionPruningSuiteBase
@@ -1578,6 +1597,7 @@ abstract class DynamicPartitionPruningSuiteBase

val subqueryBroadcastExecs = collectWithSubqueries(df.queryExecution.executedPlan) {
case s: SubqueryBroadcastExec => s
+ case s: CometSubqueryBroadcastExec => s
}
assert(subqueryBroadcastExecs.size === 1)
subqueryBroadcastExecs.foreach { subqueryBroadcastExec =>
@@ -1730,6 +1751,10 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
@@ -1730,6 +1750,10 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
case s: BatchScanExec =>
// we use f1 col for v2 tables due to schema pruning
s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1")))
Expand Down Expand Up @@ -1319,24 +1309,23 @@ index 0df7f806272..92390bd819f 100644

test("non-matching optional group") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index 2e33f6505ab..3a8b154b565 100644
index 2e33f6505ab..949fdea0003 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -23,11 +23,13 @@ import org.apache.spark.SparkRuntimeException
@@ -23,10 +23,12 @@ import org.apache.spark.SparkRuntimeException
import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, Join, LogicalPlan, Project, Sort, Union}
+import org.apache.spark.sql.comet.{CometNativeColumnarToRowExec, CometNativeScanExec, CometScanExec}
import org.apache.spark.sql.execution._
+import org.apache.spark.sql.IgnoreCometNativeDataFusion
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, DisableAdaptiveExecution}
import org.apache.spark.sql.execution.datasources.FileScanRDD
-import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
import org.apache.spark.sql.execution.joins.{BaseJoinExec, BroadcastHashJoinExec, BroadcastNestedLoopJoinExec}
+import org.apache.spark.sql.IgnoreCometNativeDataFusion
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession

@@ -1529,6 +1531,18 @@ class SubquerySuite extends QueryTest
fs.inputRDDs().forall(
_.asInstanceOf[FileScanRDD].filePartitions.forall(
Expand Down
Loading