Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 20 additions & 10 deletions dev/diffs/3.4.3.diff
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ index daef11ae4d6..9f3cc9181f2 100644
assert(exchanges.size == 2)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
index f33432ddb6f..9cf7a9dd4e3 100644
index f33432ddb6f..10702dba7a9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen
Expand Down Expand Up @@ -423,7 +423,17 @@ index f33432ddb6f..9cf7a9dd4e3 100644
Given("dynamic pruning filter on the build side")
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
val df = sql(
@@ -1311,7 +1320,8 @@ abstract class DynamicPartitionPruningSuiteBase
@@ -1279,7 +1288,8 @@ abstract class DynamicPartitionPruningSuiteBase
}
}

- test("SPARK-32659: Fix the data issue when pruning DPP on non-atomic type") {
+ test("SPARK-32659: Fix the data issue when pruning DPP on non-atomic type",
+ IgnoreComet("TODO: Support SubqueryBroadcastExec in Comet: #242")) {
Seq(NO_CODEGEN, CODEGEN_ONLY).foreach { mode =>
Seq(true, false).foreach { pruning =>
withSQLConf(
@@ -1311,7 +1321,8 @@ abstract class DynamicPartitionPruningSuiteBase
}
}

Expand All @@ -433,7 +443,7 @@ index f33432ddb6f..9cf7a9dd4e3 100644
withSQLConf(
SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true",
@@ -1470,7 +1480,8 @@ abstract class DynamicPartitionPruningSuiteBase
@@ -1470,7 +1481,8 @@ abstract class DynamicPartitionPruningSuiteBase
checkAnswer(df, Row(3, 2) :: Row(3, 2) :: Row(3, 2) :: Row(3, 2) :: Nil)
}

Expand All @@ -443,7 +453,7 @@ index f33432ddb6f..9cf7a9dd4e3 100644
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") {
val df = sql(
"""
@@ -1485,7 +1496,7 @@ abstract class DynamicPartitionPruningSuiteBase
@@ -1485,7 +1497,7 @@ abstract class DynamicPartitionPruningSuiteBase
}

test("SPARK-38148: Do not add dynamic partition pruning if there exists static partition " +
Expand All @@ -452,7 +462,7 @@ index f33432ddb6f..9cf7a9dd4e3 100644
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") {
Seq(
"f.store_id = 1" -> false,
@@ -1557,7 +1568,8 @@ abstract class DynamicPartitionPruningSuiteBase
@@ -1557,7 +1569,8 @@ abstract class DynamicPartitionPruningSuiteBase
}
}

Expand All @@ -462,7 +472,7 @@ index f33432ddb6f..9cf7a9dd4e3 100644
withTable("duplicate_keys") {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") {
Seq[(Int, String)]((1, "NL"), (1, "NL"), (3, "US"), (3, "US"), (3, "US"))
@@ -1588,7 +1600,8 @@ abstract class DynamicPartitionPruningSuiteBase
@@ -1588,7 +1601,8 @@ abstract class DynamicPartitionPruningSuiteBase
}
}

Expand All @@ -472,7 +482,7 @@ index f33432ddb6f..9cf7a9dd4e3 100644
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") {
val df = sql(
"""
@@ -1617,7 +1630,8 @@ abstract class DynamicPartitionPruningSuiteBase
@@ -1617,7 +1631,8 @@ abstract class DynamicPartitionPruningSuiteBase
}
}

Expand All @@ -482,7 +492,7 @@ index f33432ddb6f..9cf7a9dd4e3 100644
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") {
val df = sql(
"""
@@ -1729,6 +1743,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
@@ -1729,6 +1744,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
case s: BatchScanExec =>
// we use f1 col for v2 tables due to schema pruning
s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1")))
Expand Down Expand Up @@ -2430,7 +2440,7 @@ index abe606ad9c1..2d930b64cca 100644
val tblTargetName = "tbl_target"
val tblSourceQualified = s"default.$tblSourceName"
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index dd55fcfe42c..293e9dc2986 100644
index dd55fcfe42c..e7fcd0a9e6a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -41,6 +41,7 @@ import org.apache.spark.sql.catalyst.plans.PlanTest
Expand Down Expand Up @@ -2579,7 +2589,7 @@ index 1966e1e64fd..cde97a0aafe 100644
spark.sql(
"""
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 07361cfdce9..25b0dc3ef7e 100644
index 07361cfdce9..6673c141c9a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -55,25 +55,53 @@ object TestHive
Expand Down
26 changes: 18 additions & 8 deletions dev/diffs/3.5.1.diff
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ index c2fe31520ac..0f54b233d14 100644
assert(exchanges.size == 2)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
index f33432ddb6f..9cf7a9dd4e3 100644
index f33432ddb6f..10702dba7a9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen
Expand Down Expand Up @@ -402,7 +402,17 @@ index f33432ddb6f..9cf7a9dd4e3 100644
Given("dynamic pruning filter on the build side")
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
val df = sql(
@@ -1311,7 +1320,8 @@ abstract class DynamicPartitionPruningSuiteBase
@@ -1279,7 +1288,8 @@ abstract class DynamicPartitionPruningSuiteBase
}
}

- test("SPARK-32659: Fix the data issue when pruning DPP on non-atomic type") {
+ test("SPARK-32659: Fix the data issue when pruning DPP on non-atomic type",
+ IgnoreComet("TODO: Support SubqueryBroadcastExec in Comet: #242")) {
Seq(NO_CODEGEN, CODEGEN_ONLY).foreach { mode =>
Seq(true, false).foreach { pruning =>
withSQLConf(
@@ -1311,7 +1321,8 @@ abstract class DynamicPartitionPruningSuiteBase
}
}

Expand All @@ -412,7 +422,7 @@ index f33432ddb6f..9cf7a9dd4e3 100644
withSQLConf(
SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true",
@@ -1470,7 +1480,8 @@ abstract class DynamicPartitionPruningSuiteBase
@@ -1470,7 +1481,8 @@ abstract class DynamicPartitionPruningSuiteBase
checkAnswer(df, Row(3, 2) :: Row(3, 2) :: Row(3, 2) :: Row(3, 2) :: Nil)
}

Expand All @@ -422,7 +432,7 @@ index f33432ddb6f..9cf7a9dd4e3 100644
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") {
val df = sql(
"""
@@ -1485,7 +1496,7 @@ abstract class DynamicPartitionPruningSuiteBase
@@ -1485,7 +1497,7 @@ abstract class DynamicPartitionPruningSuiteBase
}

test("SPARK-38148: Do not add dynamic partition pruning if there exists static partition " +
Expand All @@ -431,7 +441,7 @@ index f33432ddb6f..9cf7a9dd4e3 100644
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") {
Seq(
"f.store_id = 1" -> false,
@@ -1557,7 +1568,8 @@ abstract class DynamicPartitionPruningSuiteBase
@@ -1557,7 +1569,8 @@ abstract class DynamicPartitionPruningSuiteBase
}
}

Expand All @@ -441,7 +451,7 @@ index f33432ddb6f..9cf7a9dd4e3 100644
withTable("duplicate_keys") {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") {
Seq[(Int, String)]((1, "NL"), (1, "NL"), (3, "US"), (3, "US"), (3, "US"))
@@ -1588,7 +1600,8 @@ abstract class DynamicPartitionPruningSuiteBase
@@ -1588,7 +1601,8 @@ abstract class DynamicPartitionPruningSuiteBase
}
}

Expand All @@ -451,7 +461,7 @@ index f33432ddb6f..9cf7a9dd4e3 100644
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") {
val df = sql(
"""
@@ -1617,7 +1630,8 @@ abstract class DynamicPartitionPruningSuiteBase
@@ -1617,7 +1631,8 @@ abstract class DynamicPartitionPruningSuiteBase
}
}

Expand All @@ -461,7 +471,7 @@ index f33432ddb6f..9cf7a9dd4e3 100644
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") {
val df = sql(
"""
@@ -1729,6 +1743,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
@@ -1729,6 +1744,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
case s: BatchScanExec =>
// we use f1 col for v2 tables due to schema pruning
s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1")))
Expand Down
26 changes: 18 additions & 8 deletions dev/diffs/4.0.0-preview1.diff
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ index 16a493b5290..3f0b70e2d59 100644
assert(exchanges.size == 2)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
index 2c24cc7d570..d46dc5e138a 100644
index 2c24cc7d570..e7ea9b4a824 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen
Expand Down Expand Up @@ -486,7 +486,17 @@ index 2c24cc7d570..d46dc5e138a 100644
Given("dynamic pruning filter on the build side")
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
val df = sql(
@@ -1311,7 +1320,8 @@ abstract class DynamicPartitionPruningSuiteBase
@@ -1279,7 +1288,8 @@ abstract class DynamicPartitionPruningSuiteBase
}
}

- test("SPARK-32659: Fix the data issue when pruning DPP on non-atomic type") {
+ test("SPARK-32659: Fix the data issue when pruning DPP on non-atomic type",
+ IgnoreComet("TODO: Support SubqueryBroadcastExec in Comet: #242")) {
Seq(NO_CODEGEN, CODEGEN_ONLY).foreach { mode =>
Seq(true, false).foreach { pruning =>
withSQLConf(
@@ -1311,7 +1321,8 @@ abstract class DynamicPartitionPruningSuiteBase
}
}

Expand All @@ -496,7 +506,7 @@ index 2c24cc7d570..d46dc5e138a 100644
withSQLConf(
SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true",
@@ -1471,7 +1481,8 @@ abstract class DynamicPartitionPruningSuiteBase
@@ -1471,7 +1482,8 @@ abstract class DynamicPartitionPruningSuiteBase
checkAnswer(df, Row(3, 2) :: Row(3, 2) :: Row(3, 2) :: Row(3, 2) :: Nil)
}

Expand All @@ -506,7 +516,7 @@ index 2c24cc7d570..d46dc5e138a 100644
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") {
val df = sql(
"""
@@ -1486,7 +1497,7 @@ abstract class DynamicPartitionPruningSuiteBase
@@ -1486,7 +1498,7 @@ abstract class DynamicPartitionPruningSuiteBase
}

test("SPARK-38148: Do not add dynamic partition pruning if there exists static partition " +
Expand All @@ -515,7 +525,7 @@ index 2c24cc7d570..d46dc5e138a 100644
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") {
Seq(
"f.store_id = 1" -> false,
@@ -1558,7 +1569,8 @@ abstract class DynamicPartitionPruningSuiteBase
@@ -1558,7 +1570,8 @@ abstract class DynamicPartitionPruningSuiteBase
}
}

Expand All @@ -525,7 +535,7 @@ index 2c24cc7d570..d46dc5e138a 100644
withTable("duplicate_keys") {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") {
Seq[(Int, String)]((1, "NL"), (1, "NL"), (3, "US"), (3, "US"), (3, "US"))
@@ -1589,7 +1601,8 @@ abstract class DynamicPartitionPruningSuiteBase
@@ -1589,7 +1602,8 @@ abstract class DynamicPartitionPruningSuiteBase
}
}

Expand All @@ -535,7 +545,7 @@ index 2c24cc7d570..d46dc5e138a 100644
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") {
val df = sql(
"""
@@ -1618,7 +1631,8 @@ abstract class DynamicPartitionPruningSuiteBase
@@ -1618,7 +1632,8 @@ abstract class DynamicPartitionPruningSuiteBase
}
}

Expand All @@ -545,7 +555,7 @@ index 2c24cc7d570..d46dc5e138a 100644
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") {
val df = sql(
"""
@@ -1730,6 +1744,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
@@ -1730,6 +1745,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
case s: BatchScanExec =>
// we use f1 col for v2 tables due to schema pruning
s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1")))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2724,11 +2724,6 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
return None
}

if ((join.leftKeys ++ join.rightKeys).exists(_.dataType.isInstanceOf[StructType])) {
withInfo(join, "Unsupported struct data type in join keys")
return None
}
Comment thread
eejbyfeldt marked this conversation as resolved.

if (join.buildSide == BuildRight && join.joinType == LeftAnti) {
withInfo(join, "BuildRight with LeftAnti is not supported")
return None
Expand Down
61 changes: 61 additions & 0 deletions spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.scalatest.Tag
import org.apache.spark.sql.CometTestBase
import org.apache.spark.sql.comet.{CometBroadcastExchangeExec, CometBroadcastHashJoinExec}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.Decimal

import org.apache.comet.CometConf
import org.apache.comet.CometSparkSessionExtensions.isSpark34Plus
Expand Down Expand Up @@ -197,6 +198,66 @@ class CometJoinSuite extends CometTestBase {
}
}

test("HashJoin struct key") {
withSQLConf(
"spark.sql.join.forceApplyShuffledHashJoin" -> "true",
SQLConf.PREFER_SORTMERGEJOIN.key -> "false",
SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {

def manyTypes(idx: Int, v: Int) =
(
idx,
v,
v.toLong,
v.toFloat,
v.toDouble,
v.toString,
v % 2 == 0,
v.toString().getBytes,
Decimal(v))

withParquetTable((0 until 10).map(i => manyTypes(i, i % 5)), "tbl_a") {
withParquetTable((0 until 10).map(i => manyTypes(i, i % 10)), "tbl_b") {
// Full join: struct key
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for adding the test. I wonder if we should make this more comprehensive to cover structs containing different types, nulls, and nested structs?

Also, what happens with structs containing unsupported types such as array and map? Do we still fall back for those? It would be good to have a test for this case as well.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call — I will add more test cases. Created #797 to make it easier to create nulls of struct type.

Also, what happens with structs containing unsupported types such as array and map?

Do you mean unsupported by Comet here? I think the answer for map is that Spark does not support joining on map, so it will fail during the Spark planning stage. I will add a test for array, but I guess that requires something like #793 first since we do not support reading arrays from Parquet. Or is there some other way of testing with arrays?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point about map not being supported by Spark.

I think we should fall back for array for now because we don't really support array yet.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do fall back currently. Is there some way — or is it even desirable — to add a test for that?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we could improve our test framework to make it easier to test for fallback, but it is already possible with code like this (must be used after calling collect on a DataFrame). I think we can improve the tests in a future PR.

 val str =
          new ExtendedExplainInfo().generateExtendedInfo(df.queryExecution.executedPlan)
        assert(str.contains(expectedMessage))

val df1 =
sql(
"SELECT /*+ SHUFFLE_HASH(tbl_b) */ * FROM tbl_a FULL JOIN tbl_b " +
"ON named_struct('1', tbl_a._2) = named_struct('1', tbl_b._1)")
checkSparkAnswerAndOperator(df1)

// Full join: struct key with nulls
val df2 =
sql("SELECT /*+ SHUFFLE_HASH(tbl_b) */ * FROM tbl_a FULL JOIN tbl_b " +
"ON IF(tbl_a._1 > 5, named_struct('2', tbl_a._2), NULL) = IF(tbl_b._2 > 5, named_struct('2', tbl_b._1), NULL)")
checkSparkAnswerAndOperator(df2)

// Full join: struct key with nulls in the struct
val df3 =
sql("SELECT /*+ SHUFFLE_HASH(tbl_b) */ * FROM tbl_a FULL JOIN tbl_b " +
"ON named_struct('2', IF(tbl_a._1 > 5, tbl_a._2, NULL)) = named_struct('2', IF(tbl_b._2 > 5, tbl_b._1, NULL))")
checkSparkAnswerAndOperator(df3)

// Full join: nested structs
val df4 =
sql("SELECT /*+ SHUFFLE_HASH(tbl_b) */ * FROM tbl_a FULL JOIN tbl_b " +
"ON named_struct('1', named_struct('2', tbl_a._2)) = named_struct('1', named_struct('2', tbl_b._1))")
checkSparkAnswerAndOperator(df4)

val columnCount = manyTypes(0, 0).productArity
def key(tbl: String) =
(1 to columnCount).map(i => s"${tbl}._$i").mkString("struct(", ", ", ")")
// Using several different types in the struct key
val df5 =
sql(
"SELECT /*+ SHUFFLE_HASH(tbl_b) */ * FROM tbl_a FULL JOIN tbl_b " +
s"ON ${key("tbl_a")} = ${key("tbl_b")}")
checkSparkAnswerAndOperator(df5)
}
}
}
}

test("HashJoin with join filter") {
withSQLConf(
SQLConf.PREFER_SORTMERGEJOIN.key -> "false",
Expand Down