diff --git a/dev/diffs/3.4.3.diff b/dev/diffs/3.4.3.diff index 3c88d6a7ee..c2a7488972 100644 --- a/dev/diffs/3.4.3.diff +++ b/dev/diffs/3.4.3.diff @@ -352,7 +352,7 @@ index daef11ae4d6..9f3cc9181f2 100644 assert(exchanges.size == 2) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala -index f33432ddb6f..9cf7a9dd4e3 100644 +index f33432ddb6f..10702dba7a9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala @@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen @@ -423,7 +423,17 @@ index f33432ddb6f..9cf7a9dd4e3 100644 Given("dynamic pruning filter on the build side") withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { val df = sql( -@@ -1311,7 +1320,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1279,7 +1288,8 @@ abstract class DynamicPartitionPruningSuiteBase + } + } + +- test("SPARK-32659: Fix the data issue when pruning DPP on non-atomic type") { ++ test("SPARK-32659: Fix the data issue when pruning DPP on non-atomic type", ++ IgnoreComet("TODO: Support SubqueryBroadcastExec in Comet: #242")) { + Seq(NO_CODEGEN, CODEGEN_ONLY).foreach { mode => + Seq(true, false).foreach { pruning => + withSQLConf( +@@ -1311,7 +1321,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -433,7 +443,7 @@ index f33432ddb6f..9cf7a9dd4e3 100644 withSQLConf( SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true", SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true", -@@ -1470,7 +1480,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1470,7 +1481,8 @@ abstract class DynamicPartitionPruningSuiteBase checkAnswer(df, Row(3, 2) :: Row(3, 2) :: Row(3, 2) :: Row(3, 2) :: Nil) } @@ -443,7 +453,7 @@ index f33432ddb6f..9cf7a9dd4e3 100644 
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") { val df = sql( """ -@@ -1485,7 +1496,7 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1485,7 +1497,7 @@ abstract class DynamicPartitionPruningSuiteBase } test("SPARK-38148: Do not add dynamic partition pruning if there exists static partition " + @@ -452,7 +462,7 @@ index f33432ddb6f..9cf7a9dd4e3 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") { Seq( "f.store_id = 1" -> false, -@@ -1557,7 +1568,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1557,7 +1569,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -462,7 +472,7 @@ index f33432ddb6f..9cf7a9dd4e3 100644 withTable("duplicate_keys") { withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") { Seq[(Int, String)]((1, "NL"), (1, "NL"), (3, "US"), (3, "US"), (3, "US")) -@@ -1588,7 +1600,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1588,7 +1601,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -472,7 +482,7 @@ index f33432ddb6f..9cf7a9dd4e3 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") { val df = sql( """ -@@ -1617,7 +1630,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1617,7 +1631,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -482,7 +492,7 @@ index f33432ddb6f..9cf7a9dd4e3 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") { val df = sql( """ -@@ -1729,6 +1743,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat +@@ -1729,6 +1744,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat case s: BatchScanExec => // we use f1 col for v2 tables due to schema pruning s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1"))) @@ -2430,7 +2440,7 @@ index abe606ad9c1..2d930b64cca 100644 val tblTargetName = "tbl_target" val tblSourceQualified = s"default.$tblSourceName" diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala -index dd55fcfe42c..293e9dc2986 100644 +index dd55fcfe42c..e7fcd0a9e6a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala @@ -41,6 +41,7 @@ import org.apache.spark.sql.catalyst.plans.PlanTest @@ -2579,7 +2589,7 @@ index 1966e1e64fd..cde97a0aafe 100644 spark.sql( """ diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala -index 07361cfdce9..25b0dc3ef7e 100644 +index 07361cfdce9..6673c141c9a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -55,25 +55,53 @@ object TestHive diff --git a/dev/diffs/3.5.1.diff b/dev/diffs/3.5.1.diff index 19eda87497..cb8d409c5b 100644 --- a/dev/diffs/3.5.1.diff +++ b/dev/diffs/3.5.1.diff @@ -331,7 +331,7 @@ index c2fe31520ac..0f54b233d14 100644 assert(exchanges.size == 2) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala -index f33432ddb6f..9cf7a9dd4e3 100644 +index f33432ddb6f..10702dba7a9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala @@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen @@ -402,7 +402,17 @@ index f33432ddb6f..9cf7a9dd4e3 100644 Given("dynamic pruning filter on the build side") withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { val df = sql( -@@ -1311,7 +1320,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1279,7 +1288,8 @@ abstract class DynamicPartitionPruningSuiteBase + 
} + } + +- test("SPARK-32659: Fix the data issue when pruning DPP on non-atomic type") { ++ test("SPARK-32659: Fix the data issue when pruning DPP on non-atomic type", ++ IgnoreComet("TODO: Support SubqueryBroadcastExec in Comet: #242")) { + Seq(NO_CODEGEN, CODEGEN_ONLY).foreach { mode => + Seq(true, false).foreach { pruning => + withSQLConf( +@@ -1311,7 +1321,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -412,7 +422,7 @@ index f33432ddb6f..9cf7a9dd4e3 100644 withSQLConf( SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true", SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true", -@@ -1470,7 +1480,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1470,7 +1481,8 @@ abstract class DynamicPartitionPruningSuiteBase checkAnswer(df, Row(3, 2) :: Row(3, 2) :: Row(3, 2) :: Row(3, 2) :: Nil) } @@ -422,7 +432,7 @@ index f33432ddb6f..9cf7a9dd4e3 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") { val df = sql( """ -@@ -1485,7 +1496,7 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1485,7 +1497,7 @@ abstract class DynamicPartitionPruningSuiteBase } test("SPARK-38148: Do not add dynamic partition pruning if there exists static partition " + @@ -431,7 +441,7 @@ index f33432ddb6f..9cf7a9dd4e3 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") { Seq( "f.store_id = 1" -> false, -@@ -1557,7 +1568,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1557,7 +1569,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -441,7 +451,7 @@ index f33432ddb6f..9cf7a9dd4e3 100644 withTable("duplicate_keys") { withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") { Seq[(Int, String)]((1, "NL"), (1, "NL"), (3, "US"), (3, "US"), (3, "US")) -@@ -1588,7 +1600,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1588,7 +1601,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -451,7 +461,7 @@ index f33432ddb6f..9cf7a9dd4e3 100644 
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") { val df = sql( """ -@@ -1617,7 +1630,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1617,7 +1631,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -461,7 +471,7 @@ index f33432ddb6f..9cf7a9dd4e3 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") { val df = sql( """ -@@ -1729,6 +1743,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat +@@ -1729,6 +1744,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat case s: BatchScanExec => // we use f1 col for v2 tables due to schema pruning s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1"))) diff --git a/dev/diffs/4.0.0-preview1.diff b/dev/diffs/4.0.0-preview1.diff index 69016bc38e..f876f92eee 100644 --- a/dev/diffs/4.0.0-preview1.diff +++ b/dev/diffs/4.0.0-preview1.diff @@ -415,7 +415,7 @@ index 16a493b5290..3f0b70e2d59 100644 assert(exchanges.size == 2) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala -index 2c24cc7d570..d46dc5e138a 100644 +index 2c24cc7d570..e7ea9b4a824 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala @@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen @@ -486,7 +486,17 @@ index 2c24cc7d570..d46dc5e138a 100644 Given("dynamic pruning filter on the build side") withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { val df = sql( -@@ -1311,7 +1320,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1279,7 +1288,8 @@ abstract class DynamicPartitionPruningSuiteBase + } + } + +- test("SPARK-32659: Fix the data issue when pruning DPP on non-atomic type") { ++ test("SPARK-32659: Fix the data issue when pruning DPP on non-atomic 
type", ++ IgnoreComet("TODO: Support SubqueryBroadcastExec in Comet: #242")) { + Seq(NO_CODEGEN, CODEGEN_ONLY).foreach { mode => + Seq(true, false).foreach { pruning => + withSQLConf( +@@ -1311,7 +1321,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -496,7 +506,7 @@ index 2c24cc7d570..d46dc5e138a 100644 withSQLConf( SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true", SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true", -@@ -1471,7 +1481,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1471,7 +1482,8 @@ abstract class DynamicPartitionPruningSuiteBase checkAnswer(df, Row(3, 2) :: Row(3, 2) :: Row(3, 2) :: Row(3, 2) :: Nil) } @@ -506,7 +516,7 @@ index 2c24cc7d570..d46dc5e138a 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") { val df = sql( """ -@@ -1486,7 +1497,7 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1486,7 +1498,7 @@ abstract class DynamicPartitionPruningSuiteBase } test("SPARK-38148: Do not add dynamic partition pruning if there exists static partition " + @@ -515,7 +525,7 @@ index 2c24cc7d570..d46dc5e138a 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") { Seq( "f.store_id = 1" -> false, -@@ -1558,7 +1569,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1558,7 +1570,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -525,7 +535,7 @@ index 2c24cc7d570..d46dc5e138a 100644 withTable("duplicate_keys") { withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") { Seq[(Int, String)]((1, "NL"), (1, "NL"), (3, "US"), (3, "US"), (3, "US")) -@@ -1589,7 +1601,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1589,7 +1602,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -535,7 +545,7 @@ index 2c24cc7d570..d46dc5e138a 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") { val df = sql( """ -@@ -1618,7 +1631,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1618,7 +1632,8 @@ abstract class 
DynamicPartitionPruningSuiteBase } } @@ -545,7 +555,7 @@ index 2c24cc7d570..d46dc5e138a 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") { val df = sql( """ -@@ -1730,6 +1744,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat +@@ -1730,6 +1745,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat case s: BatchScanExec => // we use f1 col for v2 tables due to schema pruning s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1"))) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index f944b2b045..fc58c0f438 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -2724,11 +2724,6 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim return None } - if ((join.leftKeys ++ join.rightKeys).exists(_.dataType.isInstanceOf[StructType])) { - withInfo(join, "Unsupported struct data type in join keys") - return None - } - if (join.buildSide == BuildRight && join.joinType == LeftAnti) { withInfo(join, "BuildRight with LeftAnti is not supported") return None diff --git a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala index b2e225b15d..fbd593f5a8 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala @@ -25,6 +25,7 @@ import org.scalatest.Tag import org.apache.spark.sql.CometTestBase import org.apache.spark.sql.comet.{CometBroadcastExchangeExec, CometBroadcastHashJoinExec} import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.Decimal import org.apache.comet.CometConf import org.apache.comet.CometSparkSessionExtensions.isSpark34Plus @@ -197,6 +198,66 @@ class 
CometJoinSuite extends CometTestBase {
     }
   }
 
+  test("HashJoin struct key") {
+    // Struct join keys: plain, NULL struct, NULL field, nested, and mixed field types.
+    withSQLConf(
+      "spark.sql.join.forceApplyShuffledHashJoin" -> "true",
+      SQLConf.PREFER_SORTMERGEJOIN.key -> "false",
+      SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+      // Row: idx, then v as Int, Long, Float, Double, String, Boolean (even?), bytes, Decimal.
+      def manyTypes(idx: Int, v: Int) =
+        (
+          idx,
+          v,
+          v.toLong,
+          v.toFloat,
+          v.toDouble,
+          v.toString,
+          v % 2 == 0,
+          // Explicit charset: the no-arg getBytes uses the JVM default encoding.
+          v.toString.getBytes(java.nio.charset.StandardCharsets.UTF_8),
+          Decimal(v))
+
+      // FULL JOIN of tbl_a and tbl_b on `left = right`, with a SHUFFLE_HASH hint on tbl_b.
+      def fullJoinOn(left: String, right: String) =
+        sql("SELECT /*+ SHUFFLE_HASH(tbl_b) */ * FROM tbl_a FULL JOIN tbl_b " +
+          s"ON $left = $right")
+
+      withParquetTable((0 until 10).map(i => manyTypes(i, i % 5)), "tbl_a") {
+        withParquetTable((0 until 10).map(i => manyTypes(i, i % 10)), "tbl_b") {
+          // Full join: struct key
+          checkSparkAnswerAndOperator(
+            fullJoinOn("named_struct('1', tbl_a._2)", "named_struct('1', tbl_b._1)"))
+
+          // Full join: struct key with nulls
+          checkSparkAnswerAndOperator(
+            fullJoinOn(
+              "IF(tbl_a._1 > 5, named_struct('2', tbl_a._2), NULL)",
+              "IF(tbl_b._2 > 5, named_struct('2', tbl_b._1), NULL)"))
+
+          // Full join: struct key with nulls in the struct
+          checkSparkAnswerAndOperator(
+            fullJoinOn(
+              "named_struct('2', IF(tbl_a._1 > 5, tbl_a._2, NULL))",
+              "named_struct('2', IF(tbl_b._2 > 5, tbl_b._1, NULL))"))
+
+          // Full join: nested structs
+          checkSparkAnswerAndOperator(
+            fullJoinOn(
+              "named_struct('1', named_struct('2', tbl_a._2))",
+              "named_struct('1', named_struct('2', tbl_b._1))"))
+
+          // Full join: struct key built from every column, i.e. several different types
+          val columnCount = manyTypes(0, 0).productArity
+          def key(tbl: String) =
+            (1 to columnCount).map(i => s"${tbl}._$i").mkString("struct(", ", ", ")")
+          checkSparkAnswerAndOperator(fullJoinOn(key("tbl_a"), key("tbl_b")))
+        }
+      }
+    }
+  }
+
   test("HashJoin with join filter") {
     withSQLConf(
       SQLConf.PREFER_SORTMERGEJOIN.key -> "false",