From a2d5d6de483f86eac9a6ef49ad37e6d00c635ba4 Mon Sep 17 00:00:00 2001 From: mcdull-zhang Date: Tue, 21 Nov 2023 21:51:59 +0800 Subject: [PATCH 1/2] first --- .../spark/sql/execution/joins/HashJoin.scala | 5 ++-- .../org/apache/spark/sql/JoinSuite.scala | 23 +++++++++++++++++++ 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala index 3ae76a1db22b2..c04332a5a3f79 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala @@ -138,9 +138,8 @@ trait HashJoin extends JoinCodegenSupport { UnsafeProjection.create(streamedBoundKeys) @transient protected[this] lazy val boundCondition = if (condition.isDefined) { - if (joinType == FullOuter && buildSide == BuildLeft) { - // Put join left side before right side. This is to be consistent with - // `ShuffledHashJoinExec.fullOuterJoin`. + if ((joinType == FullOuter || joinType == LeftOuter) && buildSide == BuildLeft) { + // Put join left side before right side. This is to be consistent with ShuffledHashJoinExec. Predicate.create(condition.get, buildPlan.output ++ streamedPlan.output).eval _ } else { Predicate.create(condition.get, streamedPlan.output ++ buildPlan.output).eval _ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala index 909a05ce26f78..6c1357a543eeb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -1755,4 +1755,27 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan cached.unpersist() } } + + test("SPARK-46037: When Left Join build Left, ShuffledHashJoinExec may " + + "result in incorrect results") { + withSQLConf(SQLConf.ENABLE_BUILD_SIDE_OUTER_SHUFFLED_HASH_JOIN_CODEGEN.key -> "false") { + val df1 = sql( + """ + |SELECT /*+ SHUFFLE_HASH(t1) */ * + |FROM testData t1 + |LEFT OUTER JOIN + |testData2 t2 + |ON key = a AND concat(value, b) = '12' + |""".stripMargin) + val df2 = sql( + """ + |SELECT /*+ SHUFFLE_MERGE(t1) */ * + |FROM testData t1 + |LEFT OUTER JOIN + |testData2 t2 + |ON key = a AND concat(value, b) = '12' + |""".stripMargin) + checkAnswer(df1, df2.collect()) + } + } } From aa81f424aee2db9d85f1ab6720a519b9552cc7a7 Mon Sep 17 00:00:00 2001 From: mcdull-zhang Date: Wed, 22 Nov 2023 15:30:50 +0800 Subject: [PATCH 2/2] Empty-Commit