From 1e93be4183251ef558fc7e4e3a7352db280ff41f Mon Sep 17 00:00:00 2001 From: Emil Ejbyfeldt Date: Mon, 29 Jul 2024 08:56:08 +0200 Subject: [PATCH 1/5] fix: Support nested types in HashJoin --- .../main/scala/org/apache/comet/serde/QueryPlanSerde.scala | 5 ----- .../test/scala/org/apache/comet/exec/CometJoinSuite.scala | 7 +++++++ 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index f944b2b045..fc58c0f438 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -2724,11 +2724,6 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim return None } - if ((join.leftKeys ++ join.rightKeys).exists(_.dataType.isInstanceOf[StructType])) { - withInfo(join, "Unsupported struct data type in join keys") - return None - } - if (join.buildSide == BuildRight && join.joinType == LeftAnti) { withInfo(join, "BuildRight with LeftAnti is not supported") return None diff --git a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala index b2e225b15d..ae26f27706 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala @@ -192,6 +192,13 @@ class CometJoinSuite extends CometTestBase { // DataFusion HashJoin LeftAnti has bugs in handling nulls and is disabled for now. 
// left.join(right, left("_2") === right("_1"), "leftanti") + + // Full join: struct key + val df9 = + sql( + "SELECT /*+ SHUFFLE_HASH(tbl_b) */ * FROM tbl_a FULL JOIN tbl_b " + + "ON named_struct('1', tbl_a._2) = named_struct('1', tbl_b._1)") + checkSparkAnswerAndOperator(df9) } } } From 9b334e0c3b80ce0083239e6fc16ccb7858ebb133 Mon Sep 17 00:00:00 2001 From: Emil Ejbyfeldt Date: Wed, 7 Aug 2024 09:29:39 +0200 Subject: [PATCH 2/5] Update diffs to ignore new failing specs These need to be ignored for the same reason as the other specs in the DynamicPartitionPruningSuiteBase. Previously we did not hit the issue because the join was not supported. --- dev/diffs/3.4.3.diff | 30 ++++++++++++++++++++---------- dev/diffs/3.5.1.diff | 26 ++++++++++++++++++-------- dev/diffs/4.0.0-preview1.diff | 26 ++++++++++++++++++-------- 3 files changed, 56 insertions(+), 26 deletions(-) diff --git a/dev/diffs/3.4.3.diff b/dev/diffs/3.4.3.diff index 3c88d6a7ee..c2a7488972 100644 --- a/dev/diffs/3.4.3.diff +++ b/dev/diffs/3.4.3.diff @@ -352,7 +352,7 @@ index daef11ae4d6..9f3cc9181f2 100644 assert(exchanges.size == 2) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala -index f33432ddb6f..9cf7a9dd4e3 100644 +index f33432ddb6f..10702dba7a9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala @@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen @@ -423,7 +423,17 @@ index f33432ddb6f..9cf7a9dd4e3 100644 Given("dynamic pruning filter on the build side") withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { val df = sql( -@@ -1311,7 +1320,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1279,7 +1288,8 @@ abstract class DynamicPartitionPruningSuiteBase + } + } + +- test("SPARK-32659: Fix the data issue 
when pruning DPP on non-atomic type") { ++ test("SPARK-32659: Fix the data issue when pruning DPP on non-atomic type", ++ IgnoreComet("TODO: Support SubqueryBroadcastExec in Comet: #242")) { + Seq(NO_CODEGEN, CODEGEN_ONLY).foreach { mode => + Seq(true, false).foreach { pruning => + withSQLConf( +@@ -1311,7 +1321,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -433,7 +443,7 @@ index f33432ddb6f..9cf7a9dd4e3 100644 withSQLConf( SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true", SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true", -@@ -1470,7 +1480,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1470,7 +1481,8 @@ abstract class DynamicPartitionPruningSuiteBase checkAnswer(df, Row(3, 2) :: Row(3, 2) :: Row(3, 2) :: Row(3, 2) :: Nil) } @@ -443,7 +453,7 @@ index f33432ddb6f..9cf7a9dd4e3 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") { val df = sql( """ -@@ -1485,7 +1496,7 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1485,7 +1497,7 @@ abstract class DynamicPartitionPruningSuiteBase } test("SPARK-38148: Do not add dynamic partition pruning if there exists static partition " + @@ -452,7 +462,7 @@ index f33432ddb6f..9cf7a9dd4e3 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") { Seq( "f.store_id = 1" -> false, -@@ -1557,7 +1568,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1557,7 +1569,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -462,7 +472,7 @@ index f33432ddb6f..9cf7a9dd4e3 100644 withTable("duplicate_keys") { withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") { Seq[(Int, String)]((1, "NL"), (1, "NL"), (3, "US"), (3, "US"), (3, "US")) -@@ -1588,7 +1600,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1588,7 +1601,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -472,7 +482,7 @@ index f33432ddb6f..9cf7a9dd4e3 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") { val df = sql( 
""" -@@ -1617,7 +1630,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1617,7 +1631,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -482,7 +492,7 @@ index f33432ddb6f..9cf7a9dd4e3 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") { val df = sql( """ -@@ -1729,6 +1743,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat +@@ -1729,6 +1744,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat case s: BatchScanExec => // we use f1 col for v2 tables due to schema pruning s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1"))) @@ -2430,7 +2440,7 @@ index abe606ad9c1..2d930b64cca 100644 val tblTargetName = "tbl_target" val tblSourceQualified = s"default.$tblSourceName" diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala -index dd55fcfe42c..293e9dc2986 100644 +index dd55fcfe42c..e7fcd0a9e6a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala @@ -41,6 +41,7 @@ import org.apache.spark.sql.catalyst.plans.PlanTest @@ -2579,7 +2589,7 @@ index 1966e1e64fd..cde97a0aafe 100644 spark.sql( """ diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala -index 07361cfdce9..25b0dc3ef7e 100644 +index 07361cfdce9..6673c141c9a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -55,25 +55,53 @@ object TestHive diff --git a/dev/diffs/3.5.1.diff b/dev/diffs/3.5.1.diff index 19eda87497..cb8d409c5b 100644 --- a/dev/diffs/3.5.1.diff +++ b/dev/diffs/3.5.1.diff @@ -331,7 +331,7 @@ index c2fe31520ac..0f54b233d14 100644 assert(exchanges.size == 2) } diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala -index f33432ddb6f..9cf7a9dd4e3 100644 +index f33432ddb6f..10702dba7a9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala @@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen @@ -402,7 +402,17 @@ index f33432ddb6f..9cf7a9dd4e3 100644 Given("dynamic pruning filter on the build side") withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { val df = sql( -@@ -1311,7 +1320,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1279,7 +1288,8 @@ abstract class DynamicPartitionPruningSuiteBase + } + } + +- test("SPARK-32659: Fix the data issue when pruning DPP on non-atomic type") { ++ test("SPARK-32659: Fix the data issue when pruning DPP on non-atomic type", ++ IgnoreComet("TODO: Support SubqueryBroadcastExec in Comet: #242")) { + Seq(NO_CODEGEN, CODEGEN_ONLY).foreach { mode => + Seq(true, false).foreach { pruning => + withSQLConf( +@@ -1311,7 +1321,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -412,7 +422,7 @@ index f33432ddb6f..9cf7a9dd4e3 100644 withSQLConf( SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true", SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true", -@@ -1470,7 +1480,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1470,7 +1481,8 @@ abstract class DynamicPartitionPruningSuiteBase checkAnswer(df, Row(3, 2) :: Row(3, 2) :: Row(3, 2) :: Row(3, 2) :: Nil) } @@ -422,7 +432,7 @@ index f33432ddb6f..9cf7a9dd4e3 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") { val df = sql( """ -@@ -1485,7 +1496,7 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1485,7 +1497,7 @@ abstract class DynamicPartitionPruningSuiteBase } test("SPARK-38148: Do not add dynamic partition pruning 
if there exists static partition " + @@ -431,7 +441,7 @@ index f33432ddb6f..9cf7a9dd4e3 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") { Seq( "f.store_id = 1" -> false, -@@ -1557,7 +1568,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1557,7 +1569,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -441,7 +451,7 @@ index f33432ddb6f..9cf7a9dd4e3 100644 withTable("duplicate_keys") { withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") { Seq[(Int, String)]((1, "NL"), (1, "NL"), (3, "US"), (3, "US"), (3, "US")) -@@ -1588,7 +1600,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1588,7 +1601,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -451,7 +461,7 @@ index f33432ddb6f..9cf7a9dd4e3 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") { val df = sql( """ -@@ -1617,7 +1630,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1617,7 +1631,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -461,7 +471,7 @@ index f33432ddb6f..9cf7a9dd4e3 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") { val df = sql( """ -@@ -1729,6 +1743,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat +@@ -1729,6 +1744,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat case s: BatchScanExec => // we use f1 col for v2 tables due to schema pruning s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1"))) diff --git a/dev/diffs/4.0.0-preview1.diff b/dev/diffs/4.0.0-preview1.diff index 69016bc38e..f876f92eee 100644 --- a/dev/diffs/4.0.0-preview1.diff +++ b/dev/diffs/4.0.0-preview1.diff @@ -415,7 +415,7 @@ index 16a493b5290..3f0b70e2d59 100644 assert(exchanges.size == 2) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala -index 2c24cc7d570..d46dc5e138a 
100644 +index 2c24cc7d570..e7ea9b4a824 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala @@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen @@ -486,7 +486,17 @@ index 2c24cc7d570..d46dc5e138a 100644 Given("dynamic pruning filter on the build side") withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { val df = sql( -@@ -1311,7 +1320,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1279,7 +1288,8 @@ abstract class DynamicPartitionPruningSuiteBase + } + } + +- test("SPARK-32659: Fix the data issue when pruning DPP on non-atomic type") { ++ test("SPARK-32659: Fix the data issue when pruning DPP on non-atomic type", ++ IgnoreComet("TODO: Support SubqueryBroadcastExec in Comet: #242")) { + Seq(NO_CODEGEN, CODEGEN_ONLY).foreach { mode => + Seq(true, false).foreach { pruning => + withSQLConf( +@@ -1311,7 +1321,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -496,7 +506,7 @@ index 2c24cc7d570..d46dc5e138a 100644 withSQLConf( SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true", SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true", -@@ -1471,7 +1481,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1471,7 +1482,8 @@ abstract class DynamicPartitionPruningSuiteBase checkAnswer(df, Row(3, 2) :: Row(3, 2) :: Row(3, 2) :: Row(3, 2) :: Nil) } @@ -506,7 +516,7 @@ index 2c24cc7d570..d46dc5e138a 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") { val df = sql( """ -@@ -1486,7 +1497,7 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1486,7 +1498,7 @@ abstract class DynamicPartitionPruningSuiteBase } test("SPARK-38148: Do not add dynamic partition pruning if there exists static partition " + @@ -515,7 +525,7 @@ index 2c24cc7d570..d46dc5e138a 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") { Seq( "f.store_id = 1" -> 
false, -@@ -1558,7 +1569,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1558,7 +1570,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -525,7 +535,7 @@ index 2c24cc7d570..d46dc5e138a 100644 withTable("duplicate_keys") { withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") { Seq[(Int, String)]((1, "NL"), (1, "NL"), (3, "US"), (3, "US"), (3, "US")) -@@ -1589,7 +1601,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1589,7 +1602,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -535,7 +545,7 @@ index 2c24cc7d570..d46dc5e138a 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") { val df = sql( """ -@@ -1618,7 +1631,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1618,7 +1632,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -545,7 +555,7 @@ index 2c24cc7d570..d46dc5e138a 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") { val df = sql( """ -@@ -1730,6 +1744,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat +@@ -1730,6 +1745,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat case s: BatchScanExec => // we use f1 col for v2 tables due to schema pruning s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1"))) From bd3e92db29ef806c9dfc02d04cfff47d69a15c25 Mon Sep 17 00:00:00 2001 From: Emil Ejbyfeldt Date: Thu, 8 Aug 2024 21:42:35 +0200 Subject: [PATCH 3/5] Improve type support check --- .../main/scala/org/apache/comet/serde/QueryPlanSerde.scala | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index fc58c0f438..22dee1a329 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -2724,6 +2724,12 @@ object QueryPlanSerde extends Logging 
with ShimQueryPlanSerde with CometExprShim return None } + val keyTypes = (join.leftKeys ++ join.rightKeys).map(_.dataType) + if (keyTypes.exists(!supportedDataType(_, allowStruct = true))) { + withInfo(join, "Unsupported data type in join keys") + return None + } + if (join.buildSide == BuildRight && join.joinType == LeftAnti) { withInfo(join, "BuildRight with LeftAnti is not supported") return None From c1ab4f4035187400ab0b98f11cc62f6633049e18 Mon Sep 17 00:00:00 2001 From: Emil Ejbyfeldt Date: Thu, 8 Aug 2024 19:55:51 +0200 Subject: [PATCH 4/5] Improve tests --- .../apache/comet/exec/CometJoinSuite.scala | 58 ++++++++++++++++++- 1 file changed, 56 insertions(+), 2 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala index ae26f27706..fbd593f5a8 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala @@ -25,6 +25,7 @@ import org.scalatest.Tag import org.apache.spark.sql.CometTestBase import org.apache.spark.sql.comet.{CometBroadcastExchangeExec, CometBroadcastHashJoinExec} import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.Decimal import org.apache.comet.CometConf import org.apache.comet.CometSparkSessionExtensions.isSpark34Plus @@ -192,13 +193,66 @@ class CometJoinSuite extends CometTestBase { // DataFusion HashJoin LeftAnti has bugs in handling nulls and is disabled for now. 
// left.join(right, left("_2") === right("_1"), "leftanti") + } + } + } + } + + test("HashJoin struct key") { + withSQLConf( + "spark.sql.join.forceApplyShuffledHashJoin" -> "true", + SQLConf.PREFER_SORTMERGEJOIN.key -> "false", + SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + def manyTypes(idx: Int, v: Int) = + ( + idx, + v, + v.toLong, + v.toFloat, + v.toDouble, + v.toString, + v % 2 == 0, + v.toString().getBytes, + Decimal(v)) + + withParquetTable((0 until 10).map(i => manyTypes(i, i % 5)), "tbl_a") { + withParquetTable((0 until 10).map(i => manyTypes(i, i % 10)), "tbl_b") { // Full join: struct key - val df9 = + val df1 = sql( "SELECT /*+ SHUFFLE_HASH(tbl_b) */ * FROM tbl_a FULL JOIN tbl_b " + "ON named_struct('1', tbl_a._2) = named_struct('1', tbl_b._1)") - checkSparkAnswerAndOperator(df9) + checkSparkAnswerAndOperator(df1) + + // Full join: struct key with nulls + val df2 = + sql("SELECT /*+ SHUFFLE_HASH(tbl_b) */ * FROM tbl_a FULL JOIN tbl_b " + + "ON IF(tbl_a._1 > 5, named_struct('2', tbl_a._2), NULL) = IF(tbl_b._2 > 5, named_struct('2', tbl_b._1), NULL)") + checkSparkAnswerAndOperator(df2) + + // Full join: struct key with nulls in the struct + val df3 = + sql("SELECT /*+ SHUFFLE_HASH(tbl_b) */ * FROM tbl_a FULL JOIN tbl_b " + + "ON named_struct('2', IF(tbl_a._1 > 5, tbl_a._2, NULL)) = named_struct('2', IF(tbl_b._2 > 5, tbl_b._1, NULL))") + checkSparkAnswerAndOperator(df3) + + // Full join: nested structs + val df4 = + sql("SELECT /*+ SHUFFLE_HASH(tbl_b) */ * FROM tbl_a FULL JOIN tbl_b " + + "ON named_struct('1', named_struct('2', tbl_a._2)) = named_struct('1', named_struct('2', tbl_b._1))") + checkSparkAnswerAndOperator(df4) + + val columnCount = manyTypes(0, 0).productArity + def key(tbl: String) = + (1 to columnCount).map(i => s"${tbl}._$i").mkString("struct(", ", ", ")") + // Using several different types in the struct key + val df5 = + sql( + "SELECT /*+ SHUFFLE_HASH(tbl_b) */ * FROM 
tbl_a FULL JOIN tbl_b " + + s"ON ${key("tbl_a")} = ${key("tbl_b")}") + checkSparkAnswerAndOperator(df5) } } } From c4f2eee2306b200f880e2751bdb2907ae610796a Mon Sep 17 00:00:00 2001 From: Emil Ejbyfeldt Date: Tue, 13 Aug 2024 09:17:18 +0200 Subject: [PATCH 5/5] Remove unneeded supportedDataType guard --- .../main/scala/org/apache/comet/serde/QueryPlanSerde.scala | 6 ------ 1 file changed, 6 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 22dee1a329..fc58c0f438 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -2724,12 +2724,6 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim return None } - val keyTypes = (join.leftKeys ++ join.rightKeys).map(_.dataType) - if (keyTypes.exists(!supportedDataType(_, allowStruct = true))) { - withInfo(join, "Unsupported data type in join keys") - return None - } - if (join.buildSide == BuildRight && join.joinType == LeftAnti) { withInfo(join, "BuildRight with LeftAnti is not supported") return None