From c090dbf68a68027b29de734c8556e01401c9013c Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Wed, 14 Jun 2023 14:55:42 -0700 Subject: [PATCH 1/8] [SPARK-44060][SQL] Code-gen for build side outer shuffled hash join ### What changes were proposed in this pull request? Codegen of shuffled hash join of build side outer join (ie, left outer join build left or right outer join build right) ### Why are the changes needed? The implementation of https://github.com/apache/spark/pull/41398 was only for non-codegen version, and codegen was disabled in this scenario. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? New unit test in WholeStageCodegenSuite --- .../apache/spark/sql/internal/SQLConf.scala | 9 ++ .../joins/ShuffledHashJoinExec.scala | 40 +++-- .../org/apache/spark/sql/JoinSuite.scala | 140 +++++++++--------- .../execution/WholeStageCodegenSuite.scala | 88 +++++++++++ 4 files changed, 200 insertions(+), 77 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index d60f5d170e709..270508139e49b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2182,6 +2182,15 @@ object SQLConf { .booleanConf .createWithDefault(true) + val ENABLE_BUILD_SIDE_OUTER_SHUFFLED_HASH_JOIN_CODEGEN = + buildConf("spark.sql.codegen.join.buildSideOuterShuffledHashJoin.enabled") + .internal() + .doc("When true, enable code-gen for an OUTER shuffled hash join where outer side" + + " is the build side.") + .version("3.5.0") + .booleanConf + .createWithDefault(true) + val ENABLE_FULL_OUTER_SORT_MERGE_JOIN_CODEGEN = buildConf("spark.sql.codegen.join.fullOuterSortMergeJoin.enabled") .internal() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala index 8953bf19f35ca..40237d15933ec 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala @@ -340,8 +340,10 @@ case class ShuffledHashJoinExec( override def supportCodegen: Boolean = joinType match { case FullOuter => conf.getConf(SQLConf.ENABLE_FULL_OUTER_SHUFFLED_HASH_JOIN_CODEGEN) - case LeftOuter if buildSide == BuildLeft => false - case RightOuter if buildSide == BuildRight => false + case LeftOuter if buildSide == BuildLeft => + conf.getConf(SQLConf.ENABLE_BUILD_SIDE_OUTER_SHUFFLED_HASH_JOIN_CODEGEN) + case RightOuter if buildSide == BuildRight => + conf.getConf(SQLConf.ENABLE_BUILD_SIDE_OUTER_SHUFFLED_HASH_JOIN_CODEGEN) case _ => true } @@ -364,7 +366,13 @@ case class ShuffledHashJoinExec( override def doProduce(ctx: CodegenContext): String = { // Specialize `doProduce` code for full outer join, because full outer join needs to // iterate streamed and build side separately. - if (joinType != FullOuter) { + val specializedProduce = joinType match { + case FullOuter => true + case LeftOuter if buildSide == BuildLeft => true + case RightOuter if buildSide == BuildRight => true + case _ => false + } + if (!specializedProduce) { return super.doProduce(ctx) } @@ -416,12 +424,15 @@ case class ShuffledHashJoinExec( |} """.stripMargin) + val isFullOuterJoin = joinType == FullOuter val joinWithUniqueKey = codegenFullOuterJoinWithUniqueKey( ctx, (streamedRow, buildRow), (streamedInput, buildInput), streamedKeyEv, streamedKeyAnyNull, - streamedKeyExprCode.value, relationTerm, conditionCheck, consumeFullOuterJoinRow) + streamedKeyExprCode.value, relationTerm, conditionCheck, consumeFullOuterJoinRow, + isFullOuterJoin) val joinWithNonUniqueKey = codegenFullOuterJoinWithNonUniqueKey( ctx, (streamedRow, buildRow), (streamedInput, buildInput), streamedKeyEv, streamedKeyAnyNull, - streamedKeyExprCode.value, relationTerm, conditionCheck, consumeFullOuterJoinRow) + streamedKeyExprCode.value, relationTerm, conditionCheck, consumeFullOuterJoinRow, + isFullOuterJoin) s""" |if ($keyIsUnique) { @@ -445,7 +456,8 @@ case class ShuffledHashJoinExec( streamedKeyValue: ExprValue, relationTerm: String, conditionCheck: String, - consumeFullOuterJoinRow: String): String = { + consumeFullOuterJoinRow: String, + isFullOuterJoin: Boolean): String = { // Inline mutable state since not many join operations in a task val matchedKeySetClsName = classOf[BitSet].getName val matchedKeySet = ctx.addMutableState(matchedKeySetClsName, "matchedKeySet", @@ -484,7 +496,14 @@ case class ShuffledHashJoinExec( | } | } | - | $consumeFullOuterJoinRow(); + | if ($foundMatch) { + | $consumeFullOuterJoinRow(); + | } else { + | if ($isFullOuterJoin) { + | $consumeFullOuterJoinRow(); + | } + | } + | | if (shouldStop()) return; |} """.stripMargin @@ -526,7 +545,8 @@ case class ShuffledHashJoinExec( streamedKeyValue: ExprValue, relationTerm: String, conditionCheck: String, - consumeFullOuterJoinRow: String): String = { + consumeFullOuterJoinRow: String, + isFullOuterJoin: Boolean): String = { // Inline mutable state since not many join operations in a task val matchedRowSetClsName = classOf[OpenHashSet[_]].getName val matchedRowSet = ctx.addMutableState(matchedRowSetClsName, "matchedRowSet", @@ -578,7 +598,9 @@ case class ShuffledHashJoinExec( | | if (!$foundMatch) { | $buildRow = null; - | $consumeFullOuterJoinRow(); + | if ($isFullOuterJoin) { + | $consumeFullOuterJoinRow(); + | } | } | | if (shouldStop()) return; diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala index 4d0fd2e65134b..b11b1ef08e721 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -1315,78 +1315,82 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan test("SPARK-36612: Support left outer join build left or right outer join build right in " + "shuffled hash join") { - val inputDFs = Seq( - // Test unique join key - (spark.range(10).selectExpr("id as k1"), - spark.range(30).selectExpr("id as k2"), - $"k1" === $"k2"), - // Test non-unique join key - (spark.range(10).selectExpr("id % 5 as k1"), - spark.range(30).selectExpr("id % 5 as k2"), - $"k1" === $"k2"), - // Test empty build side - (spark.range(10).selectExpr("id as k1").filter("k1 < -1"), - spark.range(30).selectExpr("id as k2"), - $"k1" === $"k2"), - // Test empty stream side - (spark.range(10).selectExpr("id as k1"), - spark.range(30).selectExpr("id as k2").filter("k2 < -1"), - $"k1" === $"k2"), - // Test empty build and stream side - (spark.range(10).selectExpr("id as k1").filter("k1 < -1"), - spark.range(30).selectExpr("id as k2").filter("k2 < -1"), - $"k1" === $"k2"), - // Test string join key - (spark.range(10).selectExpr("cast(id * 3 as string) as k1"), - spark.range(30).selectExpr("cast(id as string) as k2"), - $"k1" === $"k2"), - // Test build side at right - (spark.range(30).selectExpr("cast(id / 3 as string) as k1"), - spark.range(10).selectExpr("cast(id as string) as k2"), - $"k1" === $"k2"), - // Test NULL join key - (spark.range(10).map(i => if (i % 2 == 0) i else null).selectExpr("value as k1"), - spark.range(30).map(i => if (i % 4 == 0) i else null).selectExpr("value as k2"), - $"k1" === $"k2"), - (spark.range(10).map(i => if (i % 3 == 0) i else null).selectExpr("value as k1"), - spark.range(30).map(i => if (i % 5 == 0) i else null).selectExpr("value as k2"), - $"k1" === $"k2"), - // Test multiple join keys - (spark.range(10).map(i => if (i % 2 == 0) i else null).selectExpr( - "value as k1", "cast(value % 5 as short) as k2", "cast(value * 3 as long) as k3"), - spark.range(30).map(i => if (i % 3 == 0) i else null).selectExpr( - "value as k4", "cast(value % 5 as short) as k5", "cast(value * 3 as long) as k6"), - $"k1" === $"k4" && $"k2" === $"k5" && $"k3" === $"k6") - ) + withSQLConf(SQLConf.ENABLE_BUILD_SIDE_OUTER_SHUFFLED_HASH_JOIN_CODEGEN.key -> "false") { + val inputDFs = Seq( + // Test unique join key + (spark.range(10).selectExpr("id as k1"), + spark.range(30).selectExpr("id as k2"), + $"k1" === $"k2"), + // Test non-unique join key + (spark.range(10).selectExpr("id % 5 as k1"), + spark.range(30).selectExpr("id % 5 as k2"), + $"k1" === $"k2"), + // Test empty build side + (spark.range(10).selectExpr("id as k1").filter("k1 < -1"), + spark.range(30).selectExpr("id as k2"), + $"k1" === $"k2"), + // Test empty stream side + (spark.range(10).selectExpr("id as k1"), + spark.range(30).selectExpr("id as k2").filter("k2 < -1"), + $"k1" === $"k2"), + // Test empty build and stream side + (spark.range(10).selectExpr("id as k1").filter("k1 < -1"), + spark.range(30).selectExpr("id as k2").filter("k2 < -1"), + $"k1" === $"k2"), + // Test string join key + (spark.range(10).selectExpr("cast(id * 3 as string) as k1"), + spark.range(30).selectExpr("cast(id as string) as k2"), + $"k1" === $"k2"), + // Test build side at right + (spark.range(30).selectExpr("cast(id / 3 as string) as k1"), + spark.range(10).selectExpr("cast(id as string) as k2"), + $"k1" === $"k2"), + // Test NULL join key + (spark.range(10).map(i => if (i % 2 == 0) i else null).selectExpr("value as k1"), + spark.range(30).map(i => if (i % 4 == 0) i else null).selectExpr("value as k2"), + $"k1" === $"k2"), + (spark.range(10).map(i => if (i % 3 == 0) i else null).selectExpr("value as k1"), + spark.range(30).map(i => if (i % 5 == 0) i else null).selectExpr("value as k2"), + $"k1" === $"k2"), + // Test multiple join keys + (spark.range(10).map(i => if (i % 2 == 0) i else null).selectExpr( + "value as k1", "cast(value % 5 as short) as k2", "cast(value * 3 as long) as k3"), + spark.range(30).map(i => if (i % 3 == 0) i else null).selectExpr( + "value as k4", "cast(value % 5 as short) as k5", "cast(value * 3 as long) as k6"), + $"k1" === $"k4" && $"k2" === $"k5" && $"k3" === $"k6") + ) - // test left outer with left side build - inputDFs.foreach { case (df1, df2, joinExprs) => - val smjDF = df1.hint("SHUFFLE_MERGE").join(df2, joinExprs, "leftouter") - assert(collect(smjDF.queryExecution.executedPlan) { - case _: SortMergeJoinExec => true }.size === 1) - val smjResult = smjDF.collect() + // test left outer with left side build + inputDFs.foreach { case (df1, df2, joinExprs) => + val smjDF = df1.hint("SHUFFLE_MERGE").join(df2, joinExprs, "leftouter") + assert(collect(smjDF.queryExecution.executedPlan) { + case _: SortMergeJoinExec => true + }.size === 1) + val smjResult = smjDF.collect() - val shjDF = df1.hint("SHUFFLE_HASH").join(df2, joinExprs, "leftouter") - assert(collect(shjDF.queryExecution.executedPlan) { - case _: ShuffledHashJoinExec => true - }.size === 1) - // Same result between shuffled hash join and sort merge join - checkAnswer(shjDF, smjResult) - } + val shjDF = df1.hint("SHUFFLE_HASH").join(df2, joinExprs, "leftouter") + assert(collect(shjDF.queryExecution.executedPlan) { + case _: ShuffledHashJoinExec => true + }.size === 1) + // Same result between shuffled hash join and sort merge join + checkAnswer(shjDF, smjResult) + } - // test right outer with right side build - inputDFs.foreach { case (df2, df1, joinExprs) => - val smjDF = df2.join(df1.hint("SHUFFLE_MERGE"), joinExprs, "rightouter") - assert(collect(smjDF.queryExecution.executedPlan) { - case _: SortMergeJoinExec => true }.size === 1) - val smjResult = smjDF.collect() + // test right outer with right side build + inputDFs.foreach { case (df2, df1, joinExprs) => + val smjDF = df2.join(df1.hint("SHUFFLE_MERGE"), joinExprs, "rightouter") + assert(collect(smjDF.queryExecution.executedPlan) { + case _: SortMergeJoinExec => true + }.size === 1) + val smjResult = smjDF.collect() - val shjDF = df2.join(df1.hint("SHUFFLE_HASH"), joinExprs, "rightouter") - assert(collect(shjDF.queryExecution.executedPlan) { - case _: ShuffledHashJoinExec => true - }.size === 1) - // Same result between shuffled hash join and sort merge join - checkAnswer(shjDF, smjResult) + val shjDF = df2.join(df1.hint("SHUFFLE_HASH"), joinExprs, "rightouter") + assert(collect(shjDF.queryExecution.executedPlan) { + case _: ShuffledHashJoinExec => true + }.size === 1) + // Same result between shuffled hash join and sort merge join + checkAnswer(shjDF, smjResult) + } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala index ac710c3229647..ceee6109a4d48 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala @@ -232,6 +232,94 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession } } + + test("Build-side Outer ShuffledHashJoin and SortMergeJoin should be included " + + "in WholeStageCodegen") { + val df1 = spark.range(0, 5).select($"id".as("k1")) + val df2 = spark.range(1, 11).select($"id".as("k2")) + val df3 = spark.range(2, 5).select($"id".as("k3")) + + Seq("SHUFFLE_HASH", "SHUFFLE_MERGE").foreach { hint => + // test right join with unique key from build side + val rightJoinUniqueDf = df1.join(df2.hint(hint), $"k1" === $"k2", "right_outer") + assert(rightJoinUniqueDf.queryExecution.executedPlan.collect { + case WholeStageCodegenExec(_ : ShuffledHashJoinExec) if hint == "SHUFFLE_HASH" => true + case WholeStageCodegenExec(_ : SortMergeJoinExec) if hint == "SHUFFLE_MERGE" => true + }.size === 1) + checkAnswer(rightJoinUniqueDf, Seq(Row(1, 1), Row(2, 2), Row(3, 3), Row(4, 4), + Row(null, 5), Row(null, 6), Row(null, 7), Row(null, 8), Row(null, 9), + Row(null, 10))) + assert(rightJoinUniqueDf.count() === 10) + + // test left join with unique key from build side + val leftJoinUniqueDf = df1.hint(hint).join(df2, $"k1" === $"k2", "left_outer") + assert(leftJoinUniqueDf.queryExecution.executedPlan.collect { + case WholeStageCodegenExec(_ : ShuffledHashJoinExec) if hint == "SHUFFLE_HASH" => true + case WholeStageCodegenExec(_ : SortMergeJoinExec) if hint == "SHUFFLE_MERGE" => true + }.size === 1) + checkAnswer(leftJoinUniqueDf, Seq(Row(0, null), Row(1, 1), Row(2, 2), Row(3, 3), Row(4, 4))) + assert(leftJoinUniqueDf.count() === 5) + + // test right join with non-unique key from build side + val rightJoinNonUniqueDf = df1.join(df2.hint(hint), $"k1" === $"k2" % 3, "right_outer") + assert(rightJoinNonUniqueDf.queryExecution.executedPlan.collect { + case WholeStageCodegenExec(_ : ShuffledHashJoinExec) if hint == "SHUFFLE_HASH" => true + case WholeStageCodegenExec(_ : SortMergeJoinExec) if hint == "SHUFFLE_MERGE" => true + }.size === 1) + checkAnswer(rightJoinNonUniqueDf, Seq(Row(0, 3), Row(0, 6), Row(0, 9), Row(1, 1), + Row(1, 4), Row(1, 7), Row(1, 10), Row(2, 2), Row(2, 5), Row(2, 8))) + + // test left join with non-unique key from build side + val leftJoinNonUniqueDf = df1.hint(hint).join(df2, $"k1" === $"k2" % 3, "left_outer") + assert(leftJoinNonUniqueDf.queryExecution.executedPlan.collect { + case WholeStageCodegenExec(_ : ShuffledHashJoinExec) if hint == "SHUFFLE_HASH" => true + case WholeStageCodegenExec(_ : SortMergeJoinExec) if hint == "SHUFFLE_MERGE" => true + }.size === 1) + checkAnswer(leftJoinNonUniqueDf, Seq(Row(0, 3), Row(0, 6), Row(0, 9), Row(1, 1), + Row(1, 4), Row(1, 7), Row(1, 10), Row(2, 2), Row(2, 5), Row(2, 8), Row(3, null), + Row(4, null))) + + // test right join with non-equi condition + val rightJoinWithNonEquiDf = df1.join(df2.hint(hint), + $"k1" === $"k2" % 3 && $"k1" + 3 =!= $"k2", "right_outer") + assert(rightJoinWithNonEquiDf.queryExecution.executedPlan.collect { + case WholeStageCodegenExec(_ : ShuffledHashJoinExec) if hint == "SHUFFLE_HASH" => true + case WholeStageCodegenExec(_ : SortMergeJoinExec) if hint == "SHUFFLE_MERGE" => true + }.size === 1) + checkAnswer(rightJoinWithNonEquiDf, Seq(Row(0, 6), Row(0, 9), Row(1, 1), Row(1, 7), + Row(1, 10), Row(2, 2), Row(2, 8), Row(null, 3), Row(null, 4), Row(null, 5))) + + // test left join with non-equi condition + val leftJoinWithNonEquiDf = df1.hint(hint).join(df2, + $"k1" === $"k2" % 3 && $"k1" + 3 =!= $"k2", "left_outer") + assert(leftJoinWithNonEquiDf.queryExecution.executedPlan.collect { + case WholeStageCodegenExec(_ : ShuffledHashJoinExec) if hint == "SHUFFLE_HASH" => true + case WholeStageCodegenExec(_ : SortMergeJoinExec) if hint == "SHUFFLE_MERGE" => true + }.size === 1) + checkAnswer(leftJoinWithNonEquiDf, Seq(Row(0, 6), Row(0, 9), Row(1, 1), Row(1, 7), + Row(1, 10), Row(2, 2), Row(2, 8), Row(3, null), Row(4, null))) + + // test two right joins + val twoRightJoinsDf = df1.join(df2.hint(hint), $"k1" === $"k2", "right_outer") + .join(df3.hint(hint), $"k1" === $"k3" && $"k1" + $"k3" =!= 2, "right_outer") + assert(twoRightJoinsDf.queryExecution.executedPlan.collect { + case WholeStageCodegenExec(_ : ShuffledHashJoinExec) if hint == "SHUFFLE_HASH" => true + case WholeStageCodegenExec(_ : SortMergeJoinExec) if hint == "SHUFFLE_MERGE" => true + }.size === 2) + checkAnswer(twoRightJoinsDf, Seq(Row(2, 2, 2), Row(3, 3, 3), Row(4, 4, 4))) + + // test two left joins + val twoLeftJoinsDf = df1.hint(hint).join(df2, $"k1" === $"k2", "left_outer").hint(hint) + .join(df3, $"k1" === $"k3" && $"k1" + $"k3" =!= 2, "left_outer") + assert(twoLeftJoinsDf.queryExecution.executedPlan.collect { + case WholeStageCodegenExec(_ : ShuffledHashJoinExec) if hint == "SHUFFLE_HASH" => true + case WholeStageCodegenExec(_ : SortMergeJoinExec) if hint == "SHUFFLE_MERGE" => true + }.size === 2) + checkAnswer(twoLeftJoinsDf, + Seq(Row(0, null, null), Row(1, 1, null), Row(2, 2, 2), Row(3, 3, 3), Row(4, 4, 4))) + } + } + test("Left/Right Outer SortMergeJoin should be included in WholeStageCodegen") { val df1 = spark.range(10).select($"id".as("k1")) val df2 = spark.range(4).select($"id".as("k2")) From 769266bfc538e2b5dabe8b5a46f8cc6d3e90726a Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Thu, 15 Jun 2023 09:28:43 -0700 Subject: [PATCH 2/8] Small fixups --- .../spark/sql/execution/joins/ShuffledHashJoinExec.scala | 6 +----- .../apache/spark/sql/execution/WholeStageCodegenSuite.scala | 3 +-- 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala index 40237d15933ec..9ddb612769e18 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala @@ -496,12 +496,8 @@ case class ShuffledHashJoinExec( | } | } | - | if ($foundMatch) { + | if ($foundMatch || $isFullOuterJoin) { | $consumeFullOuterJoinRow(); - | } else { - | if ($isFullOuterJoin) { - | $consumeFullOuterJoinRow(); - | } | } | | if (shouldStop()) return; diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala index ceee6109a4d48..d11a4a9c0ad35 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala @@ -233,8 +233,7 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession } - test("Build-side Outer ShuffledHashJoin and SortMergeJoin should be included " + - "in WholeStageCodegen") { + test("SPARK-44060 Code-gen for build side outer shuffled hash join") { val df1 = spark.range(0, 5).select($"id".as("k1")) val df2 = spark.range(1, 11).select($"id".as("k2")) val df3 = spark.range(2, 5).select($"id".as("k3")) From efbae5363b0383e5e5a83f8ec3d54529d0fcdd8e Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Thu, 15 Jun 2023 18:23:53 -0700 Subject: [PATCH 3/8] Add explicit config in WholeStageCodegenSuite new test --- .../execution/WholeStageCodegenSuite.scala | 158 +++++++++--------- 1 file changed, 80 insertions(+), 78 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala index d11a4a9c0ad35..0aaeedd5f06d1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala @@ -238,84 +238,86 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession val df2 = spark.range(1, 11).select($"id".as("k2")) val df3 = spark.range(2, 5).select($"id".as("k3")) - Seq("SHUFFLE_HASH", "SHUFFLE_MERGE").foreach { hint => - // test right join with unique key from build side - val rightJoinUniqueDf = df1.join(df2.hint(hint), $"k1" === $"k2", "right_outer") - assert(rightJoinUniqueDf.queryExecution.executedPlan.collect { - case WholeStageCodegenExec(_ : ShuffledHashJoinExec) if hint == "SHUFFLE_HASH" => true - case WholeStageCodegenExec(_ : SortMergeJoinExec) if hint == "SHUFFLE_MERGE" => true - }.size === 1) - checkAnswer(rightJoinUniqueDf, Seq(Row(1, 1), Row(2, 2), Row(3, 3), Row(4, 4), - Row(null, 5), Row(null, 6), Row(null, 7), Row(null, 8), Row(null, 9), - Row(null, 10))) - assert(rightJoinUniqueDf.count() === 10) - - // test left join with unique key from build side - val leftJoinUniqueDf = df1.hint(hint).join(df2, $"k1" === $"k2", "left_outer") - assert(leftJoinUniqueDf.queryExecution.executedPlan.collect { - case WholeStageCodegenExec(_ : ShuffledHashJoinExec) if hint == "SHUFFLE_HASH" => true - case WholeStageCodegenExec(_ : SortMergeJoinExec) if hint == "SHUFFLE_MERGE" => true - }.size === 1) - checkAnswer(leftJoinUniqueDf, Seq(Row(0, null), Row(1, 1), Row(2, 2), Row(3, 3), Row(4, 4))) - assert(leftJoinUniqueDf.count() === 5) - - // test right join with non-unique key from build side - val rightJoinNonUniqueDf = df1.join(df2.hint(hint), $"k1" === $"k2" % 3, "right_outer") - assert(rightJoinNonUniqueDf.queryExecution.executedPlan.collect { - case WholeStageCodegenExec(_ : ShuffledHashJoinExec) if hint == "SHUFFLE_HASH" => true - case WholeStageCodegenExec(_ : SortMergeJoinExec) if hint == "SHUFFLE_MERGE" => true - }.size === 1) - checkAnswer(rightJoinNonUniqueDf, Seq(Row(0, 3), Row(0, 6), Row(0, 9), Row(1, 1), - Row(1, 4), Row(1, 7), Row(1, 10), Row(2, 2), Row(2, 5), Row(2, 8))) - - // test left join with non-unique key from build side - val leftJoinNonUniqueDf = df1.hint(hint).join(df2, $"k1" === $"k2" % 3, "left_outer") - assert(leftJoinNonUniqueDf.queryExecution.executedPlan.collect { - case WholeStageCodegenExec(_ : ShuffledHashJoinExec) if hint == "SHUFFLE_HASH" => true - case WholeStageCodegenExec(_ : SortMergeJoinExec) if hint == "SHUFFLE_MERGE" => true - }.size === 1) - checkAnswer(leftJoinNonUniqueDf, Seq(Row(0, 3), Row(0, 6), Row(0, 9), Row(1, 1), - Row(1, 4), Row(1, 7), Row(1, 10), Row(2, 2), Row(2, 5), Row(2, 8), Row(3, null), - Row(4, null))) - - // test right join with non-equi condition - val rightJoinWithNonEquiDf = df1.join(df2.hint(hint), - $"k1" === $"k2" % 3 && $"k1" + 3 =!= $"k2", "right_outer") - assert(rightJoinWithNonEquiDf.queryExecution.executedPlan.collect { - case WholeStageCodegenExec(_ : ShuffledHashJoinExec) if hint == "SHUFFLE_HASH" => true - case WholeStageCodegenExec(_ : SortMergeJoinExec) if hint == "SHUFFLE_MERGE" => true - }.size === 1) - checkAnswer(rightJoinWithNonEquiDf, Seq(Row(0, 6), Row(0, 9), Row(1, 1), Row(1, 7), - Row(1, 10), Row(2, 2), Row(2, 8), Row(null, 3), Row(null, 4), Row(null, 5))) - - // test left join with non-equi condition - val leftJoinWithNonEquiDf = df1.hint(hint).join(df2, - $"k1" === $"k2" % 3 && $"k1" + 3 =!= $"k2", "left_outer") - assert(leftJoinWithNonEquiDf.queryExecution.executedPlan.collect { - case WholeStageCodegenExec(_ : ShuffledHashJoinExec) if hint == "SHUFFLE_HASH" => true - case WholeStageCodegenExec(_ : SortMergeJoinExec) if hint == "SHUFFLE_MERGE" => true - }.size === 1) - checkAnswer(leftJoinWithNonEquiDf, Seq(Row(0, 6), Row(0, 9), Row(1, 1), Row(1, 7), - Row(1, 10), Row(2, 2), Row(2, 8), Row(3, null), Row(4, null))) - - // test two right joins - val twoRightJoinsDf = df1.join(df2.hint(hint), $"k1" === $"k2", "right_outer") - .join(df3.hint(hint), $"k1" === $"k3" && $"k1" + $"k3" =!= 2, "right_outer") - assert(twoRightJoinsDf.queryExecution.executedPlan.collect { - case WholeStageCodegenExec(_ : ShuffledHashJoinExec) if hint == "SHUFFLE_HASH" => true - case WholeStageCodegenExec(_ : SortMergeJoinExec) if hint == "SHUFFLE_MERGE" => true - }.size === 2) - checkAnswer(twoRightJoinsDf, Seq(Row(2, 2, 2), Row(3, 3, 3), Row(4, 4, 4))) - - // test two left joins - val twoLeftJoinsDf = df1.hint(hint).join(df2, $"k1" === $"k2", "left_outer").hint(hint) - .join(df3, $"k1" === $"k3" && $"k1" + $"k3" =!= 2, "left_outer") - assert(twoLeftJoinsDf.queryExecution.executedPlan.collect { - case WholeStageCodegenExec(_ : ShuffledHashJoinExec) if hint == "SHUFFLE_HASH" => true - case WholeStageCodegenExec(_ : SortMergeJoinExec) if hint == "SHUFFLE_MERGE" => true - }.size === 2) - checkAnswer(twoLeftJoinsDf, - Seq(Row(0, null, null), Row(1, 1, null), Row(2, 2, 2), Row(3, 3, 3), Row(4, 4, 4))) + withSQLConf(SQLConf.ENABLE_BUILD_SIDE_OUTER_SHUFFLED_HASH_JOIN_CODEGEN.key -> "true") { + Seq("SHUFFLE_HASH", "SHUFFLE_MERGE").foreach { hint => + // test right join with unique key from build side + val rightJoinUniqueDf = df1.join(df2.hint(hint), $"k1" === $"k2", "right_outer") + assert(rightJoinUniqueDf.queryExecution.executedPlan.collect { + case WholeStageCodegenExec(_: ShuffledHashJoinExec) if hint == "SHUFFLE_HASH" => true + case WholeStageCodegenExec(_: SortMergeJoinExec) if hint == "SHUFFLE_MERGE" => true + }.size === 1) + checkAnswer(rightJoinUniqueDf, Seq(Row(1, 1), Row(2, 2), Row(3, 3), Row(4, 4), + Row(null, 5), Row(null, 6), Row(null, 7), Row(null, 8), Row(null, 9), + Row(null, 10))) + assert(rightJoinUniqueDf.count() === 10) + + // test left join with unique key from build side + val leftJoinUniqueDf = df1.hint(hint).join(df2, $"k1" === $"k2", "left_outer") + assert(leftJoinUniqueDf.queryExecution.executedPlan.collect { + case WholeStageCodegenExec(_: ShuffledHashJoinExec) if hint == "SHUFFLE_HASH" => true + case WholeStageCodegenExec(_: SortMergeJoinExec) if hint == "SHUFFLE_MERGE" => true + }.size === 1) + checkAnswer(leftJoinUniqueDf, Seq(Row(0, null), Row(1, 1), Row(2, 2), Row(3, 3), Row(4, 4))) + assert(leftJoinUniqueDf.count() === 5) + + // test right join with non-unique key from build side + val rightJoinNonUniqueDf = df1.join(df2.hint(hint), $"k1" === $"k2" % 3, "right_outer") + assert(rightJoinNonUniqueDf.queryExecution.executedPlan.collect { + case WholeStageCodegenExec(_: ShuffledHashJoinExec) if hint == "SHUFFLE_HASH" => true + case WholeStageCodegenExec(_: SortMergeJoinExec) if hint == "SHUFFLE_MERGE" => true + }.size === 1) + checkAnswer(rightJoinNonUniqueDf, Seq(Row(0, 3), Row(0, 6), Row(0, 9), Row(1, 1), + Row(1, 4), Row(1, 7), Row(1, 10), Row(2, 2), Row(2, 5), Row(2, 8))) + + // test left join with non-unique key from build side + val leftJoinNonUniqueDf = df1.hint(hint).join(df2, $"k1" === $"k2" % 3, "left_outer") + assert(leftJoinNonUniqueDf.queryExecution.executedPlan.collect { + case WholeStageCodegenExec(_: ShuffledHashJoinExec) if hint == "SHUFFLE_HASH" => true + case WholeStageCodegenExec(_: SortMergeJoinExec) if hint == "SHUFFLE_MERGE" => true + }.size === 1) + checkAnswer(leftJoinNonUniqueDf, Seq(Row(0, 3), Row(0, 6), Row(0, 9), Row(1, 1), + Row(1, 4), Row(1, 7), Row(1, 10), Row(2, 2), Row(2, 5), Row(2, 8), Row(3, null), + Row(4, null))) + + // test right join with non-equi condition + val rightJoinWithNonEquiDf = df1.join(df2.hint(hint), + $"k1" === $"k2" % 3 && $"k1" + 3 =!= $"k2", "right_outer") + assert(rightJoinWithNonEquiDf.queryExecution.executedPlan.collect { + case WholeStageCodegenExec(_: ShuffledHashJoinExec) if hint == "SHUFFLE_HASH" => true + case WholeStageCodegenExec(_: SortMergeJoinExec) if hint == "SHUFFLE_MERGE" => true + }.size === 1) + checkAnswer(rightJoinWithNonEquiDf, Seq(Row(0, 6), Row(0, 9), Row(1, 1), Row(1, 7), + Row(1, 10), Row(2, 2), Row(2, 8), Row(null, 3), Row(null, 4), Row(null, 5))) + + // test left join with non-equi condition + val leftJoinWithNonEquiDf = df1.hint(hint).join(df2, + $"k1" === $"k2" % 3 && $"k1" + 3 =!= $"k2", "left_outer") + assert(leftJoinWithNonEquiDf.queryExecution.executedPlan.collect { + case WholeStageCodegenExec(_: ShuffledHashJoinExec) if hint == "SHUFFLE_HASH" => true + case WholeStageCodegenExec(_: SortMergeJoinExec) if hint == "SHUFFLE_MERGE" => true + }.size === 1) + checkAnswer(leftJoinWithNonEquiDf, Seq(Row(0, 6), Row(0, 9), Row(1, 1), Row(1, 7), + Row(1, 10), Row(2, 2), Row(2, 8), Row(3, null), Row(4, null))) + + // test two right joins + val twoRightJoinsDf = df1.join(df2.hint(hint), $"k1" === $"k2", "right_outer") + .join(df3.hint(hint), $"k1" === $"k3" && $"k1" + $"k3" =!= 2, "right_outer") + assert(twoRightJoinsDf.queryExecution.executedPlan.collect { + case WholeStageCodegenExec(_: ShuffledHashJoinExec) if hint == "SHUFFLE_HASH" => true + case WholeStageCodegenExec(_: SortMergeJoinExec) if hint == "SHUFFLE_MERGE" => true + }.size === 2) + checkAnswer(twoRightJoinsDf, Seq(Row(2, 2, 2), Row(3, 3, 3), Row(4, 4, 4))) + + // test two left joins + val twoLeftJoinsDf = df1.hint(hint).join(df2, $"k1" === $"k2", "left_outer").hint(hint) + .join(df3, $"k1" === $"k3" && $"k1" + $"k3" =!= 2, "left_outer") + assert(twoLeftJoinsDf.queryExecution.executedPlan.collect { + case WholeStageCodegenExec(_: ShuffledHashJoinExec) if hint == "SHUFFLE_HASH" => true + case WholeStageCodegenExec(_: SortMergeJoinExec) if hint == "SHUFFLE_MERGE" => true + }.size === 2) + checkAnswer(twoLeftJoinsDf, + Seq(Row(0, null, null), Row(1, 1, null), Row(2, 2, 2), Row(3, 3, 3), Row(4, 4, 4))) + } } } From 216213142d9e6f56d2fc1d9421b0e68f1dd790d4 Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Fri, 16 Jun 2023 09:43:33 -0700 Subject: [PATCH 4/8] Test both codegen and non-codegen in JoinSuite --- .../org/apache/spark/sql/JoinSuite.scala | 149 +++++++++--------- 1 file changed, 76 insertions(+), 73 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala index b11b1ef08e721..7c2ab24661970 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -1315,81 +1315,83 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan test("SPARK-36612: Support left outer join build left or right outer join build right in " + "shuffled hash join") { - withSQLConf(SQLConf.ENABLE_BUILD_SIDE_OUTER_SHUFFLED_HASH_JOIN_CODEGEN.key -> "false") { - val inputDFs = Seq( - // Test unique join key - (spark.range(10).selectExpr("id as k1"), - spark.range(30).selectExpr("id as k2"), - $"k1" === $"k2"), - // Test non-unique join key - (spark.range(10).selectExpr("id % 5 as k1"), - spark.range(30).selectExpr("id % 5 as k2"), - $"k1" === $"k2"), - // Test empty build side - (spark.range(10).selectExpr("id as k1").filter("k1 < -1"), - spark.range(30).selectExpr("id as k2"), - $"k1" === $"k2"), - // Test empty stream side - (spark.range(10).selectExpr("id as k1"), - spark.range(30).selectExpr("id as k2").filter("k2 < -1"), - $"k1" === $"k2"), - // Test empty build and stream side - (spark.range(10).selectExpr("id as k1").filter("k1 < -1"), - spark.range(30).selectExpr("id as k2").filter("k2 < -1"), - $"k1" === $"k2"), - // Test string join key - (spark.range(10).selectExpr("cast(id * 3 as string) as k1"), - spark.range(30).selectExpr("cast(id as string) as k2"), - $"k1" === $"k2"), - // Test build side at right - (spark.range(30).selectExpr("cast(id / 3 as string) as k1"), - spark.range(10).selectExpr("cast(id as string) as k2"), - $"k1" === $"k2"), - // Test NULL join key - (spark.range(10).map(i => if (i % 2 == 0) i else null).selectExpr("value as k1"), - spark.range(30).map(i => if (i % 4 == 0) i else null).selectExpr("value as k2"), - $"k1" === $"k2"), - (spark.range(10).map(i => if (i % 3 == 0) i else null).selectExpr("value as k1"), - spark.range(30).map(i => if (i % 5 == 0) i else null).selectExpr("value as k2"), - $"k1" === $"k2"), - // Test multiple join keys - (spark.range(10).map(i => if (i % 2 == 0) i else null).selectExpr( - "value as k1", "cast(value % 5 as short) as k2", "cast(value * 3 as long) as k3"), - spark.range(30).map(i => if (i % 3 == 0) i else null).selectExpr( - "value as k4", "cast(value % 5 as short) as k5", "cast(value * 3 as long) as k6"), - $"k1" === $"k4" && $"k2" === $"k5" && $"k3" === $"k6") - ) - - // test left outer with left side build - inputDFs.foreach { case (df1, df2, joinExprs) => - val smjDF = df1.hint("SHUFFLE_MERGE").join(df2, joinExprs, "leftouter") - assert(collect(smjDF.queryExecution.executedPlan) { - case _: SortMergeJoinExec => true - }.size === 1) - val smjResult = smjDF.collect() - - val shjDF = df1.hint("SHUFFLE_HASH").join(df2, joinExprs, "leftouter") - assert(collect(shjDF.queryExecution.executedPlan) { - case _: ShuffledHashJoinExec => true - }.size === 1) - // Same result between shuffled hash join and sort merge join - checkAnswer(shjDF, smjResult) - } + Seq("true", "false").foreach{ codegen => + withSQLConf(SQLConf.ENABLE_BUILD_SIDE_OUTER_SHUFFLED_HASH_JOIN_CODEGEN.key -> codegen) { + val inputDFs = Seq( + // Test unique join key + (spark.range(10).selectExpr("id as k1"), + spark.range(30).selectExpr("id as k2"), + $"k1" === $"k2"), + // Test non-unique join key + (spark.range(10).selectExpr("id % 5 as k1"), + spark.range(30).selectExpr("id % 5 as k2"), + $"k1" === $"k2"), + // Test empty build side + (spark.range(10).selectExpr("id as k1").filter("k1 < -1"), + spark.range(30).selectExpr("id as k2"), + $"k1" === $"k2"), + // Test empty stream side + (spark.range(10).selectExpr("id as k1"), + spark.range(30).selectExpr("id as k2").filter("k2 < -1"), + $"k1" === $"k2"), + // Test empty build and stream side + (spark.range(10).selectExpr("id as k1").filter("k1 < -1"), + spark.range(30).selectExpr("id as k2").filter("k2 < -1"), + $"k1" === $"k2"), + // Test string join key + (spark.range(10).selectExpr("cast(id * 3 as string) as k1"), + spark.range(30).selectExpr("cast(id as string) as k2"), + $"k1" === $"k2"), + // Test build side at right + (spark.range(30).selectExpr("cast(id / 3 as string) as k1"), + spark.range(10).selectExpr("cast(id as string) as k2"), + $"k1" === $"k2"), + // Test NULL join key + (spark.range(10).map(i => if (i % 2 == 0) i else null).selectExpr("value as k1"), + spark.range(30).map(i => if (i % 4 == 0) i else null).selectExpr("value as k2"), + $"k1" === $"k2"), + (spark.range(10).map(i => if (i % 3 == 0) i else null).selectExpr("value as k1"), + spark.range(30).map(i => if (i % 5 == 0) i else null).selectExpr("value as k2"), + $"k1" === $"k2"), + // Test multiple join keys + (spark.range(10).map(i => if (i % 2 == 0) i else null).selectExpr( + "value as k1", "cast(value % 5 as short) as k2", "cast(value * 3 as long) as k3"), + spark.range(30).map(i => if (i % 3 == 0) i else null).selectExpr( + "value as k4", "cast(value % 5 as short) as k5", "cast(value * 3 as long) as k6"), + $"k1" === $"k4" && $"k2" === $"k5" && $"k3" === $"k6") + ) - // test right outer with right side build - inputDFs.foreach { case (df2, df1, joinExprs) => - val smjDF = df2.join(df1.hint("SHUFFLE_MERGE"), joinExprs, "rightouter") - assert(collect(smjDF.queryExecution.executedPlan) { - case _: SortMergeJoinExec => true - }.size === 1) - val smjResult = smjDF.collect() + // test left outer with left side build + inputDFs.foreach { case (df1, df2, joinExprs) => + val smjDF = df1.hint("SHUFFLE_MERGE").join(df2, joinExprs, "leftouter") + assert(collect(smjDF.queryExecution.executedPlan) { + case _: SortMergeJoinExec => true + }.size === 1) + val smjResult = smjDF.collect() + + val shjDF = df1.hint("SHUFFLE_HASH").join(df2, joinExprs, "leftouter") + assert(collect(shjDF.queryExecution.executedPlan) { + case _: ShuffledHashJoinExec => true + }.size === 1) + // Same result between shuffled hash join and sort merge join + checkAnswer(shjDF, smjResult) + } - val shjDF = df2.join(df1.hint("SHUFFLE_HASH"), joinExprs, "rightouter") - assert(collect(shjDF.queryExecution.executedPlan) { - case _: ShuffledHashJoinExec => true - }.size === 1) - // Same result between shuffled hash join and sort merge join - checkAnswer(shjDF, smjResult) + // test right outer with right side build + inputDFs.foreach { case (df2, df1, joinExprs) => + val smjDF = df2.join(df1.hint("SHUFFLE_MERGE"), joinExprs, "rightouter") + assert(collect(smjDF.queryExecution.executedPlan) { + case _: SortMergeJoinExec => true + }.size === 1) + val smjResult = smjDF.collect() + + val shjDF = df2.join(df1.hint("SHUFFLE_HASH"), joinExprs, "rightouter") + assert(collect(shjDF.queryExecution.executedPlan) { + case _: ShuffledHashJoinExec => true + }.size === 1) + // Same result between shuffled hash join and sort merge join + checkAnswer(shjDF, smjResult) + } } } } @@ -1442,6 +1444,7 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan } } + test("SPARK-34593: Preserve broadcast nested loop join partitioning and ordering") { withTable("t1", "t2", "t3", "t4", "t5") { spark.range(15).toDF("k").write.bucketBy(4, "k").saveAsTable("t1") From 221c1a30f25ab982b08749b1ceda2f4f23cefc19 Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Fri, 23 Jun 2023 18:59:01 -0700 Subject: [PATCH 5/8] Review comments --- .../joins/ShuffledHashJoinExec.scala | 27 ++++++++++--------- .../org/apache/spark/sql/JoinSuite.scala | 1 - 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala index 9ddb612769e18..b5fb543fa80f5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala @@ -364,8 +364,8 @@ case class ShuffledHashJoinExec( } override def doProduce(ctx: CodegenContext): String = { - // Specialize `doProduce` code for full outer join, because full outer join needs to - // iterate streamed and build side separately. + // Specialize `doProduce` code for full outer join and build-side outer join, + // because we need to iterate streamed and build side separately. val specializedProduce = joinType match { case FullOuter => true case LeftOuter if buildSide == BuildLeft => true @@ -425,11 +425,11 @@ case class ShuffledHashJoinExec( """.stripMargin) val isFullOuterJoin = joinType == FullOuter - val joinWithUniqueKey = codegenFullOuterJoinWithUniqueKey( + val joinWithUniqueKey = codegenBuildSideOrFullOuterJoinWithUniqueKey( ctx, (streamedRow, buildRow), (streamedInput, buildInput), streamedKeyEv, streamedKeyAnyNull, streamedKeyExprCode.value, relationTerm, conditionCheck, consumeFullOuterJoinRow, isFullOuterJoin) - val joinWithNonUniqueKey = codegenFullOuterJoinWithNonUniqueKey( + val joinWithNonUniqueKey = codegenBuildSideOrFullOuterJoinNonUniqueKey( ctx, (streamedRow, buildRow), (streamedInput, buildInput), streamedKeyEv, streamedKeyAnyNull, streamedKeyExprCode.value, relationTerm, conditionCheck, consumeFullOuterJoinRow, isFullOuterJoin) @@ -444,10 +444,10 @@ case class ShuffledHashJoinExec( } /** - * Generates the code for full outer join with unique join keys. - * This is code-gen version of `fullOuterJoinWithUniqueKey()`. + * Generates the code for build-side or full outer join with unique join keys. + * This is code-gen version of `buildSideOrFullOuterJoinUniqueKey()`. */ - private def codegenFullOuterJoinWithUniqueKey( + private def codegenBuildSideOrFullOuterJoinWithUniqueKey( ctx: CodegenContext, rows: (String, String), inputs: (String, String), @@ -529,10 +529,10 @@ case class ShuffledHashJoinExec( } /** - * Generates the code for full outer join with non-unique join keys. - * This is code-gen version of `fullOuterJoinWithNonUniqueKey()`. + * Generates the code for build-side or full outer join with non-unique join keys. + * This is code-gen version of `buildSideOrFullOuterJoinNonUniqueKey()`. */ - private def codegenFullOuterJoinWithNonUniqueKey( + private def codegenBuildSideOrFullOuterJoinNonUniqueKey( ctx: CodegenContext, rows: (String, String), inputs: (String, String), @@ -594,9 +594,10 @@ case class ShuffledHashJoinExec( | | if (!$foundMatch) { | $buildRow = null; - | if ($isFullOuterJoin) { - | $consumeFullOuterJoinRow(); - | } + | } + | + | if ($isFullOuterJoin) { + | $consumeFullOuterJoinRow(); | } | | if (shouldStop()) return; diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala index 7c2ab24661970..eb58a77704ee8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -1444,7 +1444,6 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan } } - test("SPARK-34593: Preserve broadcast nested loop join partitioning and ordering") { withTable("t1", "t2", "t3", "t4", "t5") { spark.range(15).toDF("k").write.bucketBy(4, "k").saveAsTable("t1") From f0eeca2a841c516a6a3902019a899406e259323a Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Mon, 26 Jun 2023 11:06:21 -0700 Subject: [PATCH 6/8] Fix error --- .../joins/ShuffledHashJoinExec.scala | 7 ++-- .../KeyGroupedPartitioningSuite.scala | 37 +++++++++++++++++++ 2 files changed, 40 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala index b5fb543fa80f5..5c559b832fd9a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala @@ -594,10 +594,9 @@ case class ShuffledHashJoinExec( | | if (!$foundMatch) { | $buildRow = null; - | } - | - | if ($isFullOuterJoin) { - | $consumeFullOuterJoinRow(); + | if ($isFullOuterJoin) { + | $consumeFullOuterJoinRow(); + | } | } | | if (shouldStop()) return; diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala index be5e1b524e565..8e3aa07b2638b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala @@ -931,6 +931,43 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { } } + test("szehon-test") { + val tbl1 = "tbl1" + val tbl2 = "tbl2" + val partitions = Array(identity("id"), identity("data")) + createTable("tbl1", schema, partitions) + + sql(s"INSERT INTO testcat.ns.$items VALUES " + + "(1, 'aa', cast('2020-01-01' as timestamp)), " + + "(2, 'bb', cast('2020-01-02' as timestamp)), " + + "(3, 'cc', cast('2020-02-03' as timestamp))") + + val purchases_partitions = Array(identity("item_id"), identity("name")) + createTable(purchases, purchases_schema, purchases_partitions) + sql(s"INSERT INTO testcat.ns.$purchases VALUES " + + "(1, 42.0, cast('2020-01-01' as timestamp)), " + + "(2, 19.5, cast('2020-02-01' as timestamp)), " + + "(4, 30.0, cast('2020-02-01' as timestamp))") + + Seq(true, false).foreach { pushDownValues => + withSQLConf(SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString) { + val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " + + s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " + + "ON i.id = p.item_id ORDER BY id, purchase_price, sale_price") + + val shuffles = collectShuffles(df.queryExecution.executedPlan) + if (pushDownValues) { + assert(shuffles.isEmpty, "should not add shuffle when partition values mismatch") + } else { + assert(shuffles.nonEmpty, "should add shuffle when partition values mismatch, and " + + "pushing down partition values is not enabled") + } + + checkAnswer(df, Seq(Row(1, "aa", 40.0, 42.0), Row(2, "bb", 10.0, 19.5))) + } + } + } + test("data source partitioning + dynamic partition filtering") { withSQLConf( SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", From 0adbfc635622d47e1291d0b20755b6463e37f831 Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Mon, 26 Jun 2023 14:22:19 -0700 Subject: [PATCH 7/8] remove unnecessary change --- .../KeyGroupedPartitioningSuite.scala | 37 ------------------- 1 file changed, 37 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala index 8e3aa07b2638b..be5e1b524e565 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala @@ -931,43 +931,6 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { } } - test("szehon-test") { - val tbl1 = "tbl1" - val tbl2 = "tbl2" - val partitions = Array(identity("id"), identity("data")) - createTable("tbl1", schema, partitions) - - sql(s"INSERT INTO testcat.ns.$items VALUES " + - "(1, 'aa', cast('2020-01-01' as timestamp)), " + - "(2, 'bb', cast('2020-01-02' as timestamp)), " + - "(3, 'cc', cast('2020-02-03' as timestamp))") - - val purchases_partitions = Array(identity("item_id"), identity("name")) - createTable(purchases, purchases_schema, purchases_partitions) - sql(s"INSERT INTO testcat.ns.$purchases VALUES " + - "(1, 42.0, cast('2020-01-01' as timestamp)), " + - "(2, 19.5, cast('2020-02-01' as timestamp)), " + - "(4, 30.0, cast('2020-02-01' as timestamp))") - - Seq(true, false).foreach { pushDownValues => - withSQLConf(SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString) { - val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " + - s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " + - "ON i.id = p.item_id ORDER BY id, purchase_price, sale_price") - - val shuffles = collectShuffles(df.queryExecution.executedPlan) - if (pushDownValues) { - assert(shuffles.isEmpty, "should not add shuffle when partition values mismatch") - } else { - assert(shuffles.nonEmpty, "should add shuffle when partition values mismatch, and " + - "pushing down partition values is not enabled") - } - - checkAnswer(df, Seq(Row(1, "aa", 40.0, 42.0), Row(2, "bb", 10.0, 19.5))) - } - } - } - test("data source partitioning + dynamic partition filtering") { withSQLConf( SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", From f9dccffee798d879ba67cb8d3997d6a2fe6a170e Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Thu, 29 Jun 2023 22:12:02 -0700 Subject: [PATCH 8/8] consumeFullOuterJoinRow => consumeOuterJoinRow --- .../joins/ShuffledHashJoinExec.scala | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala index 5c559b832fd9a..974f6f9e50c2e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala @@ -415,10 +415,10 @@ case class ShuffledHashJoinExec( case BuildLeft => buildResultVars ++ streamedResultVars case BuildRight => streamedResultVars ++ buildResultVars } - val consumeFullOuterJoinRow = ctx.freshName("consumeFullOuterJoinRow") - ctx.addNewFunction(consumeFullOuterJoinRow, + val consumeOuterJoinRow = ctx.freshName("consumeOuterJoinRow") + ctx.addNewFunction(consumeOuterJoinRow, s""" - |private void $consumeFullOuterJoinRow() throws java.io.IOException { + |private void $consumeOuterJoinRow() throws java.io.IOException { | ${metricTerm(ctx, "numOutputRows")}.add(1); | ${consume(ctx, resultVars)} |} @@ -427,11 +427,11 @@ case class ShuffledHashJoinExec( val isFullOuterJoin = joinType == FullOuter val joinWithUniqueKey = codegenBuildSideOrFullOuterJoinWithUniqueKey( ctx, (streamedRow, buildRow), (streamedInput, buildInput), streamedKeyEv, streamedKeyAnyNull, - streamedKeyExprCode.value, relationTerm, conditionCheck, consumeFullOuterJoinRow, + streamedKeyExprCode.value, relationTerm, conditionCheck, consumeOuterJoinRow, isFullOuterJoin) val joinWithNonUniqueKey = codegenBuildSideOrFullOuterJoinNonUniqueKey( ctx, (streamedRow, buildRow), (streamedInput, buildInput), streamedKeyEv, streamedKeyAnyNull, - streamedKeyExprCode.value, relationTerm, conditionCheck, consumeFullOuterJoinRow, + streamedKeyExprCode.value, relationTerm, conditionCheck, consumeOuterJoinRow, isFullOuterJoin) s""" @@ -456,7 +456,7 @@ case class ShuffledHashJoinExec( streamedKeyValue: ExprValue, relationTerm: String, conditionCheck: String, - consumeFullOuterJoinRow: String, + consumeOuterJoinRow: String, isFullOuterJoin: Boolean): String = { // Inline mutable state since not many join operations in a task val matchedKeySetClsName = classOf[BitSet].getName @@ -497,7 +497,7 @@ case class ShuffledHashJoinExec( | } | | if ($foundMatch || $isFullOuterJoin) { - | $consumeFullOuterJoinRow(); + | $consumeOuterJoinRow(); | } | | if (shouldStop()) return; @@ -515,7 +515,7 @@ case class ShuffledHashJoinExec( | // check if key index is not in matched keys set | if (!$matchedKeySet.get($rowWithIndex.getKeyIndex())) { | $buildRow = $rowWithIndex.getValue(); - | $consumeFullOuterJoinRow(); + | $consumeOuterJoinRow(); | } | | if (shouldStop()) return; @@ -541,7 +541,7 @@ case class ShuffledHashJoinExec( streamedKeyValue: ExprValue, relationTerm: String, conditionCheck: String, - consumeFullOuterJoinRow: String, + consumeOuterJoinRow: String, isFullOuterJoin: Boolean): String = { // Inline mutable state since not many join operations in a task val matchedRowSetClsName = classOf[OpenHashSet[_]].getName @@ -588,14 +588,14 @@ case class ShuffledHashJoinExec( | // set row index in matched row set | $matchedRowSet.add($rowIndex); | $foundMatch = true; - | $consumeFullOuterJoinRow(); + | $consumeOuterJoinRow(); | } | } | | if (!$foundMatch) { | $buildRow = null; | if ($isFullOuterJoin) { - | $consumeFullOuterJoinRow(); + | $consumeOuterJoinRow(); | } | } | @@ -621,7 +621,7 @@ case class ShuffledHashJoinExec( | // check if row index is not in matched row set | if (!$matchedRowSet.contains($rowIndex)) { | $buildRow = $rowWithIndex.getValue(); - | $consumeFullOuterJoinRow(); + | $consumeOuterJoinRow(); | } | | if (shouldStop()) return;