From ea32dff17685eabc68ab7140c1d5025cb1f59506 Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Mon, 29 May 2023 22:37:15 +0800 Subject: [PATCH 1/6] [SPARK-36612][SQL] Support left outer join build left or right outer join build right in shuffled hash join --- .../spark/sql/catalyst/optimizer/joins.scala | 4 +- .../joins/ShuffledHashJoinExec.scala | 72 +++++++++++------ .../org/apache/spark/sql/JoinSuite.scala | 77 +++++++++++++++++++ 3 files changed, 127 insertions(+), 26 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala index 972be43a946aa..48b4007a89750 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala @@ -374,14 +374,14 @@ trait JoinSelectionHelper { def canBuildShuffledHashJoinLeft(joinType: JoinType): Boolean = { joinType match { - case _: InnerLike | RightOuter | FullOuter => true + case _: InnerLike | LeftOuter | FullOuter | RightOuter => true case _ => false } } def canBuildShuffledHashJoinRight(joinType: JoinType): Boolean = { joinType match { - case _: InnerLike | LeftOuter | FullOuter | + case _: InnerLike | LeftOuter | FullOuter | RightOuter | LeftSemi | LeftAnti | _: ExistenceJoin => true case _ => false } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala index cfe35d04778fb..2e52787c24192 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala @@ -57,6 +57,8 @@ case class ShuffledHashJoinExec( override def outputOrdering: Seq[SortOrder] = joinType match { case FullOuter => Nil + case LeftOuter if buildSide == BuildLeft => Nil + case RightOuter if buildSide == BuildRight => Nil case _ => super.outputOrdering } @@ -83,8 +85,10 @@ case class ShuffledHashJoinExec( iter, buildBoundKeys, taskMemoryManager = context.taskMemoryManager(), - // Full outer join needs support for NULL key in HashedRelation. - allowsNullKey = joinType == FullOuter, + // build-side outer join needs support for NULL key in HashedRelation. + allowsNullKey = joinType == FullOuter || + (joinType == LeftOuter && buildSide == BuildLeft) || + (joinType == RightOuter && buildSide == BuildRight), ignoresDuplicatedKey = ignoreDuplicatedKey) buildTime += NANOSECONDS.toMillis(System.nanoTime() - start) buildDataSize += relation.estimatedSize @@ -98,16 +102,21 @@ case class ShuffledHashJoinExec( streamedPlan.execute().zipPartitions(buildPlan.execute()) { (streamIter, buildIter) => val hashed = buildHashedRelation(buildIter) joinType match { - case FullOuter => fullOuterJoin(streamIter, hashed, numOutputRows) + case FullOuter => buildSideOuterJoin(streamIter, hashed, numOutputRows, full = true) + case LeftOuter if buildSide.equals(BuildLeft) => + buildSideOuterJoin(streamIter, hashed, numOutputRows, full = false) + case RightOuter if buildSide.equals(BuildRight) => + buildSideOuterJoin(streamIter, hashed, numOutputRows, full = false) case _ => join(streamIter, hashed, numOutputRows) } } } - private def fullOuterJoin( + private def buildSideOuterJoin( streamIter: Iterator[InternalRow], hashedRelation: HashedRelation, - numOutputRows: SQLMetric): Iterator[InternalRow] = { + numOutputRows: SQLMetric, + full: Boolean): Iterator[InternalRow] = { val joinKeys = streamSideKeyGenerator() val joinRow = new JoinedRow val (joinRowWithStream, joinRowWithBuild) = { @@ -130,11 +139,11 @@ case class ShuffledHashJoinExec( } val iter = if (hashedRelation.keyIsUnique) { - fullOuterJoinWithUniqueKey(streamIter, hashedRelation, joinKeys, joinRowWithStream, - joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow) + buildSideOuterJoinUniqueKey(streamIter, hashedRelation, joinKeys, joinRowWithStream, + joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow, full) } else { - fullOuterJoinWithNonUniqueKey(streamIter, hashedRelation, joinKeys, joinRowWithStream, - joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow) + buildSideOuterJoinNonUniqueKey(streamIter, hashedRelation, joinKeys, joinRowWithStream, + joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow, full) } val resultProj = UnsafeProjection.create(output, output) @@ -145,7 +154,7 @@ case class ShuffledHashJoinExec( } /** - * Full outer shuffled hash join with unique join keys: + * Shuffled hash join with unique join keys, where an outer side is the build side. * 1. Process rows from stream side by looking up hash relation. * Mark the matched rows from build side be looked up. * A bit set is used to track matched rows with key index. @@ -153,23 +162,30 @@ case class ShuffledHashJoinExec( * Filter out rows from build side being matched already, * by checking key index from bit set. */ - private def fullOuterJoinWithUniqueKey( + private def buildSideOuterJoinUniqueKey( streamIter: Iterator[InternalRow], hashedRelation: HashedRelation, joinKeys: UnsafeProjection, joinRowWithStream: InternalRow => JoinedRow, joinRowWithBuild: InternalRow => JoinedRow, streamNullJoinRowWithBuild: => InternalRow => JoinedRow, - buildNullRow: GenericInternalRow): Iterator[InternalRow] = { + buildNullRow: GenericInternalRow, + full: Boolean): Iterator[InternalRow] = { val matchedKeys = new BitSet(hashedRelation.maxNumKeysIndex) longMetric("buildDataSize") += matchedKeys.capacity / 8 + def noMatch = if (full) { + Some(joinRowWithBuild(buildNullRow)) + } else { + None + } + // Process stream side with looking up hash relation - val streamResultIter = streamIter.map { srow => + val streamResultIter = streamIter.flatMap { srow => joinRowWithStream(srow) val keys = joinKeys(srow) if (keys.anyNull) { - joinRowWithBuild(buildNullRow) + noMatch } else { val matched = hashedRelation.getValueWithKeyIndex(keys) if (matched != null) { @@ -178,12 +194,12 @@ case class ShuffledHashJoinExec( val joinRow = joinRowWithBuild(buildRow) if (boundCondition(joinRow)) { matchedKeys.set(keyIndex) - joinRow + Some(joinRow) } else { - joinRowWithBuild(buildNullRow) + noMatch } } else { - joinRowWithBuild(buildNullRow) + noMatch } } } @@ -205,7 +221,7 @@ case class ShuffledHashJoinExec( } /** - * Full outer shuffled hash join with non-unique join keys: + * Shuffled hash join with non-unique join keys, where an outer side is the build side. * 1. Process rows from stream side by looking up hash relation. * Mark the matched rows from build side be looked up. * A [[OpenHashSet]] (Long) is used to track matched rows with @@ -219,14 +235,15 @@ case class ShuffledHashJoinExec( * the value indices of its tuples will be 0, 1 and 2. * Note that value indices of tuples with different keys are incomparable. */ - private def fullOuterJoinWithNonUniqueKey( + private def buildSideOuterJoinNonUniqueKey( streamIter: Iterator[InternalRow], hashedRelation: HashedRelation, joinKeys: UnsafeProjection, joinRowWithStream: InternalRow => JoinedRow, joinRowWithBuild: InternalRow => JoinedRow, streamNullJoinRowWithBuild: => InternalRow => JoinedRow, - buildNullRow: GenericInternalRow): Iterator[InternalRow] = { + buildNullRow: GenericInternalRow, + full: Boolean): Iterator[InternalRow] = { val matchedRows = new OpenHashSet[Long] TaskContext.get().addTaskCompletionListener[Unit](_ => { // At the end of the task, update the task's memory usage for this @@ -252,7 +269,12 @@ case class ShuffledHashJoinExec( val joinRow = joinRowWithStream(srow) val keys = joinKeys(srow) if (keys.anyNull) { - Iterator.single(joinRowWithBuild(buildNullRow)) + // return row with build side NULL row to satisfy full outer join semantics if enabled + if (full) { + Iterator.single(joinRowWithBuild(buildNullRow)) + } else { + Iterator.empty + } } else { val buildIter = hashedRelation.getWithKeyIndex(keys) new RowIterator { @@ -272,8 +294,8 @@ case class ShuffledHashJoinExec( } // When we reach here, it means no match is found for this key. // So we need to return one row with build side NULL row, - // to satisfy the full outer join semantic. - if (!found) { + // to satisfy the full outer join semantic if enabled. + if (!found && full) { joinRowWithBuild(buildNullRow) // Set `found` to be true as we only need to return one row // but no more. @@ -314,7 +336,9 @@ case class ShuffledHashJoinExec( override def supportCodegen: Boolean = joinType match { case FullOuter => conf.getConf(SQLConf.ENABLE_FULL_OUTER_SHUFFLED_HASH_JOIN_CODEGEN) - case _ => true + case LeftOuter if buildSide == BuildLeft => false + case RightOuter if buildSide == BuildRight => false + case _ => super.supportCodegen } override def inputRDDs(): Seq[RDD[InternalRow]] = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala index 2c24296533946..12e3f2eb2022b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -1249,6 +1249,83 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan } } + test("SPARK-36612: Support left outer join build left or right outer join build right in " + + "shuffled hash join") { + val inputDFs = Seq( + // Test unique join key + (spark.range(10).selectExpr("id as k1"), + spark.range(30).selectExpr("id as k2"), + $"k1" === $"k2"), + // Test non-unique join key + (spark.range(10).selectExpr("id % 5 as k1"), + spark.range(30).selectExpr("id % 5 as k2"), + $"k1" === $"k2"), + // Test empty build side + (spark.range(10).selectExpr("id as k1").filter("k1 < -1"), + spark.range(30).selectExpr("id as k2"), + $"k1" === $"k2"), + // Test empty stream side + (spark.range(10).selectExpr("id as k1"), + spark.range(30).selectExpr("id as k2").filter("k2 < -1"), + $"k1" === $"k2"), + // Test empty build and stream side + (spark.range(10).selectExpr("id as k1").filter("k1 < -1"), + spark.range(30).selectExpr("id as k2").filter("k2 < -1"), + $"k1" === $"k2"), + // Test string join key + (spark.range(10).selectExpr("cast(id * 3 as string) as k1"), + spark.range(30).selectExpr("cast(id as string) as k2"), + $"k1" === $"k2"), + // Test build side at right + (spark.range(30).selectExpr("cast(id / 3 as string) as k1"), + spark.range(10).selectExpr("cast(id as string) as k2"), + $"k1" === $"k2"), + // Test NULL join key + (spark.range(10).map(i => if (i % 2 == 0) i else null).selectExpr("value as k1"), + spark.range(30).map(i => if (i % 4 == 0) i else null).selectExpr("value as k2"), + $"k1" === $"k2"), + (spark.range(10).map(i => if (i % 3 == 0) i else null).selectExpr("value as k1"), + spark.range(30).map(i => if (i % 5 == 0) i else null).selectExpr("value as k2"), + $"k1" === $"k2"), + // Test multiple join keys + (spark.range(10).map(i => if (i % 2 == 0) i else null).selectExpr( + "value as k1", "cast(value % 5 as short) as k2", "cast(value * 3 as long) as k3"), + spark.range(30).map(i => if (i % 3 == 0) i else null).selectExpr( + "value as k4", "cast(value % 5 as short) as k5", "cast(value * 3 as long) as k6"), + $"k1" === $"k4" && $"k2" === $"k5" && $"k3" === $"k6") + ) + + // test left outer with left side build + inputDFs.foreach { case (df1, df2, joinExprs) => + val smjDF = df1.hint("SHUFFLE_MERGE").join(df2, joinExprs, "leftouter") + assert(collect(smjDF.queryExecution.executedPlan) { + case _: SortMergeJoinExec => true }.size === 1) + val smjResult = smjDF.collect() + + val shjDF = df1.hint("SHUFFLE_HASH").join(df2, joinExprs, "leftouter") + assert(collect(shjDF.queryExecution.executedPlan) { + case _: ShuffledHashJoinExec => true + }.size === 1) + // Same result between shuffled hash join and sort merge join + checkAnswer(shjDF, smjResult) + } + + // test right outer with right side build + inputDFs.foreach { case (df2, df1, joinExprs) => + val smjDF = df2.join(df1.hint("SHUFFLE_MERGE"), joinExprs, "rightouter") + assert(collect(smjDF.queryExecution.executedPlan) { + case _: SortMergeJoinExec => true }.size === 1) + val smjResult = smjDF.collect() + + val shjDF = df2.join(df1.hint("SHUFFLE_HASH"), joinExprs, "rightouter") + assert(collect(shjDF.queryExecution.executedPlan) { + case _: ShuffledHashJoinExec => true + }.size === 1) + // Same result between shuffled hash join and sort merge join + checkAnswer(shjDF, smjResult) + } + } + test("SPARK-32649: Optimize BHJ/SHJ inner/semi join with empty hashed relation") { val inputDFs = Seq( // Test empty build side for inner join From 532964bfef1fad112d015be6dc1399d643744ebc Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Wed, 31 May 2023 12:50:02 +0800 Subject: [PATCH 2/6] Revert unnecessary change --- .../apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala index 2e52787c24192..bad02aa267492 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala @@ -338,7 +338,7 @@ case class ShuffledHashJoinExec( case FullOuter => conf.getConf(SQLConf.ENABLE_FULL_OUTER_SHUFFLED_HASH_JOIN_CODEGEN) case LeftOuter if buildSide == BuildLeft => false case RightOuter if buildSide == BuildRight => false - case _ => super.supportCodegen + case _ => true } override def inputRDDs(): Seq[RDD[InternalRow]] = { From 6089beb6f92f383ebba25915eb58c730361db065 Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Wed, 31 May 2023 23:43:17 +0800 Subject: [PATCH 3/6] Fix JoinHintSuite tests --- .../org/apache/spark/sql/JoinHintSuite.scala | 33 +++++++++++-------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala index 1792b4c32eb11..7e6140856382a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala @@ -489,7 +489,7 @@ class JoinHintSuite extends PlanTest with SharedSparkSession with AdaptiveSparkP assertShuffleHashJoin( sql(equiJoinQueryWithHint("SHUFFLE_HASH(t1, t2)" :: Nil)), BuildLeft) assertShuffleHashJoin( - sql(equiJoinQueryWithHint("SHUFFLE_HASH(t1, t2)" :: Nil, "left")), BuildRight) + sql(equiJoinQueryWithHint("SHUFFLE_HASH(t1, t2)" :: Nil, "left")), BuildLeft) assertShuffleHashJoin( sql(equiJoinQueryWithHint("SHUFFLE_HASH(t1, t2)" :: Nil, "right")), BuildLeft) @@ -507,8 +507,6 @@ class JoinHintSuite extends PlanTest with SharedSparkSession with AdaptiveSparkP BuildLeft) // Shuffle-hash hint specified but not doable - assertBroadcastHashJoin( - sql(equiJoinQueryWithHint("SHUFFLE_HASH(t1)" :: Nil, "left")), BuildRight) assertBroadcastNLJoin( sql(nonEquiJoinQueryWithHint("SHUFFLE_HASH(t1)" :: Nil)), BuildLeft) } @@ -606,13 +604,25 @@ class JoinHintSuite extends PlanTest with SharedSparkSession with AdaptiveSparkP withLogAppender(hintAppender, level = Some(Level.WARN)) { assertShuffleMergeJoin( df1.hint("BROADCAST").join(df2, $"a1" === $"b1", joinType)) + } + + val logs = hintAppender.loggingEvents.map(_.getMessage.getFormattedMessage) + .filter(_.contains("is not supported in the query:")) + assert(logs.size === 1) + logs.foreach(log => + assert(log.contains(s"build left for ${joinType.split("_").mkString(" ")} join."))) + } + + Seq("left_semi", "left_anti").foreach { joinType => + val hintAppender = new LogAppender(s"join hint build side check for $joinType") + withLogAppender(hintAppender, level = Some(Level.WARN)) { assertShuffleMergeJoin( df1.hint("SHUFFLE_HASH").join(df2, $"a1" === $"b1", joinType)) } val logs = hintAppender.loggingEvents.map(_.getMessage.getFormattedMessage) .filter(_.contains("is not supported in the query:")) - assert(logs.size === 2) + assert(logs.size === 1) logs.foreach(log => assert(log.contains(s"build left for ${joinType.split("_").mkString(" ")} join."))) } @@ -622,8 +632,6 @@ class JoinHintSuite extends PlanTest with SharedSparkSession with AdaptiveSparkP withLogAppender(hintAppender, level = Some(Level.WARN)) { assertBroadcastHashJoin( df1.join(df2.hint("BROADCAST"), $"a1" === $"b1", joinType), BuildRight) - assertShuffleHashJoin( - df1.join(df2.hint("SHUFFLE_HASH"), $"a1" === $"b1", joinType), BuildRight) } val logs = hintAppender.loggingEvents.map(_.getMessage.getFormattedMessage) @@ -631,19 +639,16 @@ class JoinHintSuite extends PlanTest with SharedSparkSession with AdaptiveSparkP assert(logs.isEmpty) } - Seq("right_outer").foreach { joinType => + Seq("left_semi", "left_anti").foreach { joinType => val hintAppender = new LogAppender(s"join hint build side check for $joinType") withLogAppender(hintAppender, level = Some(Level.WARN)) { - assertShuffleMergeJoin( - df1.join(df2.hint("BROADCAST"), $"a1" === $"b1", joinType)) - assertShuffleMergeJoin( - df1.join(df2.hint("SHUFFLE_HASH"), $"a1" === $"b1", joinType)) + assertShuffleHashJoin( + df1.join(df2.hint("SHUFFLE_HASH"), $"a1" === $"b1", joinType), BuildRight) } + val logs = hintAppender.loggingEvents.map(_.getMessage.getFormattedMessage) .filter(_.contains("is not supported in the query:")) - assert(logs.size === 2) - logs.foreach(log => - assert(log.contains(s"build right for ${joinType.split("_").mkString(" ")} join."))) + assert(logs.isEmpty) } Seq("right_outer").foreach { joinType => From 505e23439fa09409ddee3f613960b8bcc6a77ce3 Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Thu, 1 Jun 2023 23:16:11 +0800 Subject: [PATCH 4/6] Review comments --- .../sql/execution/joins/HashedRelation.scala | 1 - .../joins/ShuffledHashJoinExec.scala | 21 +++++++++++-------- .../org/apache/spark/sql/JoinHintSuite.scala | 17 ++++++++++----- 3 files changed, 24 insertions(+), 15 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala index 4d3e63282fabf..16345bb35db2b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala @@ -127,7 +127,6 @@ private[execution] object HashedRelation { * Create a HashedRelation from an Iterator of InternalRow. * * @param allowsNullKey Allow NULL keys in HashedRelation. - * This is used for full outer join in `ShuffledHashJoinExec` only. * @param ignoresDuplicatedKey Ignore rows with duplicated keys in HashedRelation. * This is only used for semi and anti join without join condition in * `ShuffledHashJoinExec` only. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala index bad02aa267492..6d146491eda94 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala @@ -56,6 +56,9 @@ case class ShuffledHashJoinExec( override def outputPartitioning: Partitioning = super[ShuffledJoin].outputPartitioning override def outputOrdering: Seq[SortOrder] = joinType match { + // For outer joins where the outer side is build-side, order cannot be guaranteed. + // The algorithm performs an additional un-ordered iteration on build-side (HashedRelation) + // to find unmatched rows to satisfy the outer join semantic. case FullOuter => Nil case LeftOuter if buildSide == BuildLeft => Nil case RightOuter if buildSide == BuildRight => Nil @@ -163,18 +166,18 @@ case class ShuffledHashJoinExec( * by checking key index from bit set. */ private def buildSideOuterJoinUniqueKey( - streamIter: Iterator[InternalRow], - hashedRelation: HashedRelation, - joinKeys: UnsafeProjection, - joinRowWithStream: InternalRow => JoinedRow, - joinRowWithBuild: InternalRow => JoinedRow, - streamNullJoinRowWithBuild: => InternalRow => JoinedRow, - buildNullRow: GenericInternalRow, - full: Boolean): Iterator[InternalRow] = { + streamIter: Iterator[InternalRow], + hashedRelation: HashedRelation, + joinKeys: UnsafeProjection, + joinRowWithStream: InternalRow => JoinedRow, + joinRowWithBuild: InternalRow => JoinedRow, + streamNullJoinRowWithBuild: => InternalRow => JoinedRow, + buildNullRow: GenericInternalRow, + isFullOuterJoin: Boolean): Iterator[InternalRow] = { val matchedKeys = new BitSet(hashedRelation.maxNumKeysIndex) longMetric("buildDataSize") += matchedKeys.capacity / 8 - def noMatch = if (full) { + def noMatch = if (isFullOuterJoin) { Some(joinRowWithBuild(buildNullRow)) } else { None diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala index 7e6140856382a..7af826583bd45 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala @@ -493,6 +493,12 @@ class JoinHintSuite extends PlanTest with SharedSparkSession with AdaptiveSparkP assertShuffleHashJoin( sql(equiJoinQueryWithHint("SHUFFLE_HASH(t1, t2)" :: Nil, "right")), BuildLeft) + // Determine build side based on hint + assertShuffleHashJoin( + sql(equiJoinQueryWithHint("SHUFFLE_HASH(t1)" :: Nil, "left")), BuildLeft) + assertShuffleHashJoin( + sql(equiJoinQueryWithHint("SHUFFLE_HASH(t2)" :: Nil, "right")), BuildRight) + // Shuffle-hash hint prioritized over shuffle-replicate-nl hint assertShuffleHashJoin( sql(equiJoinQueryWithHint("SHUFFLE_REPLICATE_NL(t2)" :: "SHUFFLE_HASH(t1)" :: Nil)), @@ -639,16 +645,17 @@ class JoinHintSuite extends PlanTest with SharedSparkSession with AdaptiveSparkP assert(logs.isEmpty) } - Seq("left_semi", "left_anti").foreach { joinType => + Seq("right_outer").foreach { joinType => val hintAppender = new LogAppender(s"join hint build side check for $joinType") withLogAppender(hintAppender, level = Some(Level.WARN)) { - assertShuffleHashJoin( - df1.join(df2.hint("SHUFFLE_HASH"), $"a1" === $"b1", joinType), BuildRight) + assertShuffleMergeJoin( + df1.join(df2.hint("BROADCAST"), $"a1" === $"b1", joinType)) } - val logs = hintAppender.loggingEvents.map(_.getMessage.getFormattedMessage) .filter(_.contains("is not supported in the query:")) - assert(logs.isEmpty) + assert(logs.size === 1) + logs.foreach(log => + assert(log.contains(s"build right for ${joinType.split("_").mkString(" ")} join."))) } Seq("right_outer").foreach { joinType => From 2940070b66a3c7811cdb97b8831ff4c9c6350762 Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Thu, 1 Jun 2023 23:38:04 +0800 Subject: [PATCH 5/6] Review fixup --- .../joins/ShuffledHashJoinExec.scala | 35 ++++++++++--------- 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala index 6d146491eda94..ffe93ae16567a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala @@ -105,11 +105,12 @@ case class ShuffledHashJoinExec( streamedPlan.execute().zipPartitions(buildPlan.execute()) { (streamIter, buildIter) => val hashed = buildHashedRelation(buildIter) joinType match { - case FullOuter => buildSideOuterJoin(streamIter, hashed, numOutputRows, full = true) + case FullOuter => buildSideOuterJoin(streamIter, hashed, numOutputRows, + isFullOuterJoin = true) case LeftOuter if buildSide.equals(BuildLeft) => - buildSideOuterJoin(streamIter, hashed, numOutputRows, full = false) + buildSideOuterJoin(streamIter, hashed, numOutputRows, isFullOuterJoin = false) case RightOuter if buildSide.equals(BuildRight) => - buildSideOuterJoin(streamIter, hashed, numOutputRows, full = false) + buildSideOuterJoin(streamIter, hashed, numOutputRows, isFullOuterJoin = false) case _ => join(streamIter, hashed, numOutputRows) } } @@ -119,7 +120,7 @@ case class ShuffledHashJoinExec( streamIter: Iterator[InternalRow], hashedRelation: HashedRelation, numOutputRows: SQLMetric, - full: Boolean): Iterator[InternalRow] = { + isFullOuterJoin: Boolean): Iterator[InternalRow] = { val joinKeys = streamSideKeyGenerator() val joinRow = new JoinedRow val (joinRowWithStream, joinRowWithBuild) = { @@ -143,10 +144,10 @@ case class ShuffledHashJoinExec( val iter = if (hashedRelation.keyIsUnique) { buildSideOuterJoinUniqueKey(streamIter, hashedRelation, joinKeys, joinRowWithStream, - joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow, full) + joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow, isFullOuterJoin) } else { buildSideOuterJoinNonUniqueKey(streamIter, hashedRelation, joinKeys, joinRowWithStream, - joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow, full) + joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow, isFullOuterJoin) } val resultProj = UnsafeProjection.create(output, output) @@ -166,14 +167,14 @@ case class ShuffledHashJoinExec( * by checking key index from bit set. */ private def buildSideOuterJoinUniqueKey( - streamIter: Iterator[InternalRow], - hashedRelation: HashedRelation, - joinKeys: UnsafeProjection, - joinRowWithStream: InternalRow => JoinedRow, - joinRowWithBuild: InternalRow => JoinedRow, - streamNullJoinRowWithBuild: => InternalRow => JoinedRow, - buildNullRow: GenericInternalRow, - isFullOuterJoin: Boolean): Iterator[InternalRow] = { + streamIter: Iterator[InternalRow], + hashedRelation: HashedRelation, + joinKeys: UnsafeProjection, + joinRowWithStream: InternalRow => JoinedRow, + joinRowWithBuild: InternalRow => JoinedRow, + streamNullJoinRowWithBuild: => InternalRow => JoinedRow, + buildNullRow: GenericInternalRow, + isFullOuterJoin: Boolean): Iterator[InternalRow] = { val matchedKeys = new BitSet(hashedRelation.maxNumKeysIndex) longMetric("buildDataSize") += matchedKeys.capacity / 8 @@ -246,7 +247,7 @@ case class ShuffledHashJoinExec( joinRowWithBuild: InternalRow => JoinedRow, streamNullJoinRowWithBuild: => InternalRow => JoinedRow, buildNullRow: GenericInternalRow, - full: Boolean): Iterator[InternalRow] = { + isFullOuterJoin: Boolean): Iterator[InternalRow] = { val matchedRows = new OpenHashSet[Long] TaskContext.get().addTaskCompletionListener[Unit](_ => { // At the end of the task, update the task's memory usage for this @@ -273,7 +274,7 @@ case class ShuffledHashJoinExec( val keys = joinKeys(srow) if (keys.anyNull) { // return row with build side NULL row to satisfy full outer join semantics if enabled - if (full) { + if (isFullOuterJoin) { Iterator.single(joinRowWithBuild(buildNullRow)) } else { Iterator.empty @@ -298,7 +299,7 @@ case class ShuffledHashJoinExec( // When we reach here, it means no match is found for this key. // So we need to return one row with build side NULL row, // to satisfy the full outer join semantic if enabled. - if (!found && full) { + if (!found && isFullOuterJoin) { joinRowWithBuild(buildNullRow) // Set `found` to be true as we only need to return one row // but no more. From d325d97d2d59f2626bb00a3de6d0da1c1e498391 Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Fri, 2 Jun 2023 15:58:50 +0800 Subject: [PATCH 6/6] Clarify 'build-side outer join' to be 'build-side or full outer join' --- .../execution/joins/ShuffledHashJoinExec.scala | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala index ffe93ae16567a..8953bf19f35ca 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala @@ -88,7 +88,7 @@ case class ShuffledHashJoinExec( iter, buildBoundKeys, taskMemoryManager = context.taskMemoryManager(), - // build-side outer join needs support for NULL key in HashedRelation. + // build-side or full outer join needs support for NULL key in HashedRelation. allowsNullKey = joinType == FullOuter || (joinType == LeftOuter && buildSide == BuildLeft) || (joinType == RightOuter && buildSide == BuildRight), @@ -105,18 +105,18 @@ case class ShuffledHashJoinExec( streamedPlan.execute().zipPartitions(buildPlan.execute()) { (streamIter, buildIter) => val hashed = buildHashedRelation(buildIter) joinType match { - case FullOuter => buildSideOuterJoin(streamIter, hashed, numOutputRows, + case FullOuter => buildSideOrFullOuterJoin(streamIter, hashed, numOutputRows, isFullOuterJoin = true) case LeftOuter if buildSide.equals(BuildLeft) => - buildSideOuterJoin(streamIter, hashed, numOutputRows, isFullOuterJoin = false) + buildSideOrFullOuterJoin(streamIter, hashed, numOutputRows, isFullOuterJoin = false) case RightOuter if buildSide.equals(BuildRight) => - buildSideOuterJoin(streamIter, hashed, numOutputRows, isFullOuterJoin = false) + buildSideOrFullOuterJoin(streamIter, hashed, numOutputRows, isFullOuterJoin = false) case _ => join(streamIter, hashed, numOutputRows) } } } - private def buildSideOuterJoin( + private def buildSideOrFullOuterJoin( streamIter: Iterator[InternalRow], hashedRelation: HashedRelation, numOutputRows: SQLMetric, @@ -143,10 +143,10 @@ case class ShuffledHashJoinExec( } val iter = if (hashedRelation.keyIsUnique) { - buildSideOuterJoinUniqueKey(streamIter, hashedRelation, joinKeys, joinRowWithStream, + buildSideOrFullOuterJoinUniqueKey(streamIter, hashedRelation, joinKeys, joinRowWithStream, joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow, isFullOuterJoin) } else { - buildSideOuterJoinNonUniqueKey(streamIter, hashedRelation, joinKeys, joinRowWithStream, + buildSideOrFullOuterJoinNonUniqueKey(streamIter, hashedRelation, joinKeys, joinRowWithStream, joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow, isFullOuterJoin) } @@ -166,7 +166,7 @@ case class ShuffledHashJoinExec( * Filter out rows from build side being matched already, * by checking key index from bit set. */ - private def buildSideOuterJoinUniqueKey( + private def buildSideOrFullOuterJoinUniqueKey( streamIter: Iterator[InternalRow], hashedRelation: HashedRelation, joinKeys: UnsafeProjection, @@ -239,7 +239,7 @@ case class ShuffledHashJoinExec( * the value indices of its tuples will be 0, 1 and 2. * Note that value indices of tuples with different keys are incomparable. */ - private def buildSideOuterJoinNonUniqueKey( + private def buildSideOrFullOuterJoinNonUniqueKey( streamIter: Iterator[InternalRow], hashedRelation: HashedRelation, joinKeys: UnsafeProjection,