From e3755c52c3f7043fb55afe7634d80012067f042d Mon Sep 17 00:00:00 2001
From: "xuewei.linxuewei"
Date: Sat, 8 Aug 2020 17:32:51 +0800
Subject: [PATCH 1/9] [SPARK-32573][SQL] Eliminate Anti Join when BuildSide is
 Empty

### What changes were proposed in this pull request?

In [SPARK-32290](https://issues.apache.org/jira/browse/SPARK-32290), we introduced several new types of HashedRelation:

* EmptyHashedRelation
* EmptyHashedRelationWithAllNullKeys

They were limited to use only in the NAAJ scenario. As an improvement, EmptyHashedRelation could also be used in a normal AntiJoin for a fast stop, and in AQE we can even eliminate the anti join when we know that the build side is empty.

This patch includes two changes:

* In non-AQE, use EmptyHashedRelation to fast-stop a common anti join as well.
* In AQE, eliminate the anti join if the build side is an EmptyHashedRelation or its shuffle write record count is 0.

### Why are the changes needed?

LeftAntiJoin can apply a `fast stop` when the build side is empty, and within AQE we can even eliminate the anti join entirely. This should be a performance improvement for AntiJoin.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

* Added a case in AdaptiveQueryExecSuite.
* Added a case in HashedRelationSuite.
* Made sure SubquerySuite, JoinSuite and SQLQueryTestSuite pass.
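As a minimal sketch of the intended behavior (the DataFrames below are
illustrative only, not taken from this patch):

```scala
// With AQE enabled, once the build side materializes as an EmptyHashedRelation,
// the LEFT ANTI JOIN degenerates into a plain scan of the left side.
val left = spark.range(1000).toDF("id")
val right = spark.range(0).toDF("id") // empty build side
val result = left.join(right, Seq("id"), "left_anti")
assert(result.count() == 1000) // every left-side row survives; no probing needed
```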
Change-Id: I718227725e319a54b99c91e6a0d1b9f022343b16
---
 .../sql/catalyst/planning/patterns.scala      |  2 +-
 .../adaptive/AdaptiveSparkPlanExec.scala      |  3 +-
 .../adaptive/EliminateAntiJoin.scala          | 44 +++++++++++++++++++
 .../exchange/BroadcastExchangeExec.scala      |  8 +++-
 .../joins/BroadcastHashJoinExec.scala         | 20 ++++-----
 .../spark/sql/execution/joins/HashJoin.scala  |  5 +++
 .../sql/execution/joins/HashedRelation.scala  | 14 +++++-
 .../adaptive/AdaptiveQueryExecSuite.scala     | 34 +++++++++++++-
 .../execution/joins/HashedRelationSuite.scala | 12 +++++
 9 files changed, 125 insertions(+), 17 deletions(-)
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateAntiJoin.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index 5a994f1ad0a39..2880e87ab1566 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -410,7 +410,7 @@ object ExtractSingleColumnNullAwareAntiJoin extends JoinSelectionHelper with Pre
    */
   def unapply(join: Join): Option[ReturnType] = join match {
     case Join(left, right, LeftAnti,
-        Some(Or(e @ EqualTo(leftAttr: AttributeReference, rightAttr: AttributeReference),
+        Some(Or(e @ EqualTo(leftAttr: Expression, rightAttr: Expression),
           IsNull(e2 @ EqualTo(_, _)))), _)
         if SQLConf.get.optimizeNullAwareAntiJoin
           && e.semanticEquals(e2) =>

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index b160b8ac2ed68..35938a7656ad1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -79,7 +79,8 @@ case class AdaptiveSparkPlanExec(
   @transient private val optimizer = new RuleExecutor[LogicalPlan] {
     // TODO add more optimization rules
     override protected def batches: Seq[Batch] = Seq(
-      Batch("Demote BroadcastHashJoin", Once, DemoteBroadcastHashJoin(conf))
+      Batch("Demote BroadcastHashJoin", Once, DemoteBroadcastHashJoin(conf)),
+      Batch("Eliminate AntiJoin", Once, EliminateAntiJoin(conf))
     )
   }

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateAntiJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateAntiJoin.scala
new file mode 100644
index 0000000000000..7a97ee570cb64
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateAntiJoin.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark.sql.catalyst.plans.LeftAnti
+import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This optimization rule detects and eliminate a LeftAntiJoin when buildSide is empty.
+ */
+case class EliminateAntiJoin(conf: SQLConf) extends Rule[LogicalPlan] {
+
+  private def shouldEliminate(plan: LogicalPlan): Boolean = plan match {
+    case LogicalQueryStage(_, stage: QueryStageExec) if stage.resultOption.get().isDefined
+      && stage.getRuntimeStatistics.rowCount.isDefined
+      && stage.getRuntimeStatistics.rowCount.get == 0L => true
+    case _ => false
+  }
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan.transformDown {
+    // If the right side is empty, LeftAntiJoin simply returns the left side.
+    // Eliminate Join with left LogicalPlan instead.
+ case Join(left, right, LeftAnti, _, _) + if shouldEliminate(right) => + left + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala index 6d8d37022ea42..2c7a805303e85 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.catalyst.plans.logical.Statistics import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, BroadcastPartitioning, Partitioning} import org.apache.spark.sql.execution.{SparkPlan, SQLExecution} -import org.apache.spark.sql.execution.joins.HashedRelation +import org.apache.spark.sql.execution.joins.{EmptyHashedRelation, HashedRelation, LongHashedRelation, UnsafeHashedRelation} import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} import org.apache.spark.unsafe.map.BytesToBytesMap @@ -82,6 +82,8 @@ case class BroadcastExchangeExec( "buildTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to build"), "broadcastTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to broadcast")) + var zeroNumRows: Boolean = false + override def outputPartitioning: Partitioning = BroadcastPartitioning(mode) override def doCanonicalize(): SparkPlan = { @@ -90,7 +92,7 @@ case class BroadcastExchangeExec( override def runtimeStatistics: Statistics = { val dataSize = metrics("dataSize").value - Statistics(dataSize) + Statistics(dataSize, rowCount = if (zeroNumRows) Some(0L) else None) } @transient @@ -135,6 +137,8 @@ case class BroadcastExchangeExec( s"type: ${relation.getClass.getName}") } + zeroNumRows = relation == EmptyHashedRelation + longMetric("dataSize") += dataSize if (dataSize >= MAX_BROADCAST_TABLE_BYTES) { throw new SparkException( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala index e4935c8c72228..f965f0bd1362d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala @@ -223,19 +223,19 @@ case class BroadcastHashJoinExec( * Handles NULL-aware anti join (NAAJ) separately here. */ protected override def codegenAnti(ctx: CodegenContext, input: Seq[ExprCode]): String = { - if (isNullAwareAntiJoin) { - val (broadcastRelation, relationTerm) = prepareBroadcast(ctx) + val (broadcastRelation, relationTerm) = prepareBroadcast(ctx) + val numOutput = metricTerm(ctx, "numOutputRows") + if (broadcastRelation.value == EmptyHashedRelation) { + s""" + |// If the right side is empty, AntiJoin simply returns the left side. + |$numOutput.add(1); + |${consume(ctx, input)} + """.stripMargin + } else if (isNullAwareAntiJoin) { val (keyEv, anyNull) = genStreamSideJoinKey(ctx, input) val (matched, _, _) = getJoinCondition(ctx, input) - val numOutput = metricTerm(ctx, "numOutputRows") - if (broadcastRelation.value == EmptyHashedRelation) { - s""" - |// If the right side is empty, NAAJ simply returns the left side. 
- |$numOutput.add(1); - |${consume(ctx, input)} - """.stripMargin - } else if (broadcastRelation.value == EmptyHashedRelationWithAllNullKeys) { + if (broadcastRelation.value == EmptyHashedRelationWithAllNullKeys) { s""" |// If the right side contains any all-null key, NAAJ simply returns Nothing. """.stripMargin diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala index 1c6504b141890..9d6095620ffc7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala @@ -270,6 +270,11 @@ trait HashJoin extends BaseJoinExec with CodegenSupport { private def antiJoin( streamIter: Iterator[InternalRow], hashedRelation: HashedRelation): Iterator[InternalRow] = { + // If the right side is empty, AntiJoin simply returns the left side. + if (hashedRelation == EmptyHashedRelation) { + return streamIter + } + val joinKeys = streamSideKeyGenerator() val joinedRow = new JoinedRow diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala index f2835c2fa6626..437df1080d300 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala @@ -108,7 +108,7 @@ private[execution] object HashedRelation { 0) } - if (isNullAware && !input.hasNext) { + if (!input.hasNext) { EmptyHashedRelation } else if (key.length == 1 && key.head.dataType == LongType) { LongHashedRelation(input, key, sizeEstimate, mm, isNullAware) @@ -125,7 +125,7 @@ private[execution] object HashedRelation { * [number of keys] * [size of key] [size of value] [key bytes] [bytes for value] */ -private[joins] class UnsafeHashedRelation( +class UnsafeHashedRelation( private var numKeys: Int, private var numFields: Int, private var binaryMap: BytesToBytesMap) @@ -950,8 +950,18 @@ trait NullAwareHashedRelation extends HashedRelation with Externalizable { /** * A special HashedRelation indicates it built from a empty input:Iterator[InternalRow]. + * get & getValue will return null just like + * Empty LongHashedRelation or Empty UnsafeHashedRelation does. 
*/ object EmptyHashedRelation extends NullAwareHashedRelation { + override def get(key: Long): Iterator[InternalRow] = null + + override def get(key: InternalRow): Iterator[InternalRow] = null + + override def getValue(key: Long): InternalRow = null + + override def getValue(key: InternalRow): InternalRow = null + override def asReadOnlyCopy(): EmptyHashedRelation.type = this } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 7fdcbd0d089cc..c9c8de72ca3e1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan} import org.apache.spark.sql.execution.{PartialReducerPartitionSpec, ReusedSubqueryExec, ShuffledRowRDD, SparkPlan} import org.apache.spark.sql.execution.command.DataWritingCommandExec import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, Exchange, ReusedExchangeExec, ShuffleExchangeExec} -import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec} +import org.apache.spark.sql.execution.joins.{BaseJoinExec, BroadcastHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf @@ -100,6 +100,12 @@ class AdaptiveQueryExecSuite } } + private def findTopLevelBaseJoin(plan: SparkPlan): Seq[BaseJoinExec] = { + collect(plan) { + case j: BaseJoinExec => j + } + } + private def findReusedExchange(plan: SparkPlan): Seq[ReusedExchangeExec] = { collectWithSubqueries(plan) { case ShuffleQueryStageExec(_, e: ReusedExchangeExec) => e @@ -1148,4 +1154,30 @@ class AdaptiveQueryExecSuite } } } + + test("SPARK-32573: Eliminate Anti Join when BuildSide is Empty") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + val (plan, adaptivePlan) = runAdaptiveAndVerifyResult( + "SELECT * FROM testData a LEFT ANTI JOIN emptyTestData b ON a.key = b.key") + val smj = findTopLevelSortMergeJoin(plan) + assert(smj.size == 1) + val join = findTopLevelBaseJoin(adaptivePlan) + assert(join.isEmpty) + checkNumLocalShuffleReaders(adaptivePlan) + } + + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> Long.MaxValue.toString) { + val (plan, adaptivePlan) = runAdaptiveAndVerifyResult( + "SELECT * FROM testData a LEFT ANTI JOIN emptyTestData b ON a.key = b.key") + val bhj = findTopLevelBroadcastHashJoin(plan) + assert(bhj.size == 1) + val join = findTopLevelBaseJoin(adaptivePlan) + assert(join.isEmpty) + checkNumLocalShuffleReaders(adaptivePlan) + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala index 21ee88f0d7426..8b270bd5a2636 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala @@ -580,4 +580,16 @@ class HashedRelationSuite extends SharedSparkSession { assert(proj(packedKeys).get(0, dt) == -i - 1) } } + + test("EmptyHashedRelation return 
null in get / getValue") { + val buildKey = Seq(BoundReference(0, LongType, false)) + val hashed = HashedRelation(Seq.empty[InternalRow].toIterator, buildKey, 1, mm) + assert(hashed == EmptyHashedRelation) + + val key = InternalRow(1L) + assert(hashed.get(0L) == null) + assert(hashed.get(key) == null) + assert(hashed.getValue(0L) == null) + assert(hashed.getValue(key) == null) + } } From 25dd44417e118ea87470c00ed11393055eb2ee52 Mon Sep 17 00:00:00 2001 From: "xuewei.linxuewei" Date: Sat, 8 Aug 2020 17:45:21 +0800 Subject: [PATCH 2/9] remove useless import. Change-Id: I47ebe0cc2e751b018a6ee02fc3018601805c4060 --- .../spark/sql/execution/exchange/BroadcastExchangeExec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala index 2c7a805303e85..f1783d1f1f2b0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.catalyst.plans.logical.Statistics import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, BroadcastPartitioning, Partitioning} import org.apache.spark.sql.execution.{SparkPlan, SQLExecution} -import org.apache.spark.sql.execution.joins.{EmptyHashedRelation, HashedRelation, LongHashedRelation, UnsafeHashedRelation} +import org.apache.spark.sql.execution.joins.{EmptyHashedRelation, HashedRelation} import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} import org.apache.spark.unsafe.map.BytesToBytesMap From 2063288e072a266218b57772ada874d35432b344 Mon Sep 17 00:00:00 2001 From: "xuewei.linxuewei" Date: Sun, 9 Aug 2020 10:19:12 +0800 Subject: [PATCH 3/9] Nit & Code Style. Change-Id: Icfe57ed02de8dd1b414f4d0c81a51353d49a6ead --- .../spark/sql/execution/adaptive/EliminateAntiJoin.scala | 4 ++-- .../sql/execution/exchange/BroadcastExchangeExec.scala | 9 +++++---- .../spark/sql/execution/joins/HashedRelation.scala | 2 +- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateAntiJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateAntiJoin.scala index 7a97ee570cb64..6b64da7a96b9b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateAntiJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateAntiJoin.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.internal.SQLConf */ case class EliminateAntiJoin(conf: SQLConf) extends Rule[LogicalPlan] { - private def shouldEliminate(plan: LogicalPlan): Boolean = plan match { + private def canEliminate(plan: LogicalPlan): Boolean = plan match { case LogicalQueryStage(_, stage: QueryStageExec) if stage.resultOption.get().isDefined && stage.getRuntimeStatistics.rowCount.isDefined && stage.getRuntimeStatistics.rowCount.get == 0L => true @@ -38,7 +38,7 @@ case class EliminateAntiJoin(conf: SQLConf) extends Rule[LogicalPlan] { // If the right side is empty, LeftAntiJoin simply returns the left side. // Eliminate Join with left LogicalPlan instead. 
case Join(left, right, LeftAnti, _, _) - if shouldEliminate(right) => + if canEliminate(right) => left } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala index f1783d1f1f2b0..5241fe9df5ca3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala @@ -82,7 +82,7 @@ case class BroadcastExchangeExec( "buildTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to build"), "broadcastTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to broadcast")) - var zeroNumRows: Boolean = false + private var knownRowCount: Option[BigInt] = None override def outputPartitioning: Partitioning = BroadcastPartitioning(mode) @@ -92,7 +92,7 @@ case class BroadcastExchangeExec( override def runtimeStatistics: Statistics = { val dataSize = metrics("dataSize").value - Statistics(dataSize, rowCount = if (zeroNumRows) Some(0L) else None) + Statistics(dataSize, rowCount = knownRowCount) } @transient @@ -128,6 +128,9 @@ case class BroadcastExchangeExec( val relation = mode.transform(input, Some(numRows)) val dataSize = relation match { + case EmptyHashedRelation => + knownRowCount = Some(0L) + 0L case map: HashedRelation => map.estimatedSize case arr: Array[InternalRow] => @@ -137,8 +140,6 @@ case class BroadcastExchangeExec( s"type: ${relation.getClass.getName}") } - zeroNumRows = relation == EmptyHashedRelation - longMetric("dataSize") += dataSize if (dataSize >= MAX_BROADCAST_TABLE_BYTES) { throw new SparkException( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala index 437df1080d300..cbf57c15a52a0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala @@ -125,7 +125,7 @@ private[execution] object HashedRelation { * [number of keys] * [size of key] [size of value] [key bytes] [bytes for value] */ -class UnsafeHashedRelation( +private[joins] class UnsafeHashedRelation( private var numKeys: Int, private var numFields: Int, private var binaryMap: BytesToBytesMap) From 257745ae314de2ea7a186916aa7b47ce66a60ca9 Mon Sep 17 00:00:00 2001 From: "xuewei.linxuewei" Date: Mon, 10 Aug 2020 09:29:09 +0800 Subject: [PATCH 4/9] nit & indentation. 
Change-Id: If5d776916fc12674068c486d605ff9a1fc28526f
---
 .../spark/sql/execution/adaptive/EliminateAntiJoin.scala      | 3 +--
 .../spark/sql/execution/joins/BroadcastHashJoinExec.scala     | 2 +-
 .../org/apache/spark/sql/execution/joins/HashedRelation.scala | 2 +-
 3 files changed, 3 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateAntiJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateAntiJoin.scala
index 6b64da7a96b9b..0c7ff9b08872f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateAntiJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateAntiJoin.scala
@@ -37,8 +37,7 @@ case class EliminateAntiJoin(conf: SQLConf) extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan.transformDown {
     // If the right side is empty, LeftAntiJoin simply returns the left side.
     // Eliminate Join with left LogicalPlan instead.
-    case Join(left, right, LeftAnti, _, _)
-      if canEliminate(right) =>
+    case Join(left, right, LeftAnti, _, _) if canEliminate(right) =>
       left
   }
 }

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
index f965f0bd1362d..9aa32d041616d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
@@ -230,7 +230,7 @@ case class BroadcastHashJoinExec(
         |// If the right side is empty, AntiJoin simply returns the left side.
         |$numOutput.add(1);
         |${consume(ctx, input)}
-      """.stripMargin
+        """.stripMargin
     } else if (isNullAwareAntiJoin) {
       val (keyEv, anyNull) = genStreamSideJoinKey(ctx, input)
       val (matched, _, _) = getJoinCondition(ctx, input)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index cbf57c15a52a0..0d40520ae71a0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -951,7 +951,7 @@ trait NullAwareHashedRelation extends HashedRelation with Externalizable {

 /**
  * A special HashedRelation indicates it built from a empty input:Iterator[InternalRow].
  * get & getValue will return null just like
- * Empty LongHashedRelation or Empty UnsafeHashedRelation does.
+ * empty LongHashedRelation or empty UnsafeHashedRelation does.
  */

From 817de1eaf6c8f868c491eb66709279140e88db28 Mon Sep 17 00:00:00 2001
From: "xuewei.linxuewei"
Date: Mon, 10 Aug 2020 14:30:53 +0800
Subject: [PATCH 5/9] Eliminating the normal anti join when EmptyHashedRelation
 is not necessary.

Eliminating a NAAJ whose buildSide is EmptyHashedRelationWithAllNullKeys while
AQE is on should gain a bigger performance improvement.
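A sketch of the query shape this rule targets (illustrative data only,
assuming `spark.implicits._` is in scope):

```scala
// A single-column NOT IN compiles to a null-aware anti join (NAAJ). If the
// build side holds nothing but NULL keys, the NAAJ can match nothing, so once
// the broadcast stage materializes as EmptyHashedRelationWithAllNullKeys,
// AQE can replace the whole join with an empty LocalRelation.
spark.range(10).createOrReplaceTempView("t1")
Seq[java.lang.Long](null, null).toDF("id").createOrReplaceTempView("t2")
val df = spark.sql("SELECT * FROM t1 WHERE id NOT IN (SELECT id FROM t2)")
assert(df.collect().isEmpty) // NOT IN over an all-NULL set returns no rows
```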
Change-Id: I13c667628220da923fc59718281642460c80f71d --- .../adaptive/EliminateAntiJoin.scala | 19 +++++++++---------- .../exchange/BroadcastExchangeExec.scala | 9 ++------- .../adaptive/AdaptiveQueryExecSuite.scala | 16 ++-------------- 3 files changed, 13 insertions(+), 31 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateAntiJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateAntiJoin.scala index 0c7ff9b08872f..588414576a391 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateAntiJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateAntiJoin.scala @@ -17,27 +17,26 @@ package org.apache.spark.sql.execution.adaptive -import org.apache.spark.sql.catalyst.plans.LeftAnti -import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan} +import org.apache.spark.sql.catalyst.planning.ExtractSingleColumnNullAwareAntiJoin +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.joins.EmptyHashedRelationWithAllNullKeys import org.apache.spark.sql.internal.SQLConf /** - * This optimization rule detects and eliminate a LeftAntiJoin when buildSide is empty. + * This optimization rule detects and convert a NAAJ to an Empty LocalRelation + * when buildSide is EmptyHashedRelationWithAllNullKeys. */ case class EliminateAntiJoin(conf: SQLConf) extends Rule[LogicalPlan] { private def canEliminate(plan: LogicalPlan): Boolean = plan match { - case LogicalQueryStage(_, stage: QueryStageExec) if stage.resultOption.get().isDefined - && stage.getRuntimeStatistics.rowCount.isDefined - && stage.getRuntimeStatistics.rowCount.get == 0L => true + case LogicalQueryStage(_, stage: BroadcastQueryStageExec) if stage.resultOption.get().isDefined + && stage.broadcast.relationFuture.get().value == EmptyHashedRelationWithAllNullKeys => true case _ => false } def apply(plan: LogicalPlan): LogicalPlan = plan.transformDown { - // If the right side is empty, LeftAntiJoin simply returns the left side. - // Eliminate Join with left LogicalPlan instead. 
- case Join(left, right, LeftAnti, _, _) if canEliminate(right) => - left + case j @ ExtractSingleColumnNullAwareAntiJoin(_, _) if canEliminate(j.right) => + LocalRelation(j.output, data = Seq.empty, isStreaming = j.isStreaming) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala index 5241fe9df5ca3..6d8d37022ea42 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.catalyst.plans.logical.Statistics import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, BroadcastPartitioning, Partitioning} import org.apache.spark.sql.execution.{SparkPlan, SQLExecution} -import org.apache.spark.sql.execution.joins.{EmptyHashedRelation, HashedRelation} +import org.apache.spark.sql.execution.joins.HashedRelation import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} import org.apache.spark.unsafe.map.BytesToBytesMap @@ -82,8 +82,6 @@ case class BroadcastExchangeExec( "buildTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to build"), "broadcastTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to broadcast")) - private var knownRowCount: Option[BigInt] = None - override def outputPartitioning: Partitioning = BroadcastPartitioning(mode) override def doCanonicalize(): SparkPlan = { @@ -92,7 +90,7 @@ case class BroadcastExchangeExec( override def runtimeStatistics: Statistics = { val dataSize = metrics("dataSize").value - Statistics(dataSize, rowCount = knownRowCount) + Statistics(dataSize) } @transient @@ -128,9 +126,6 @@ case class BroadcastExchangeExec( val relation = mode.transform(input, Some(numRows)) val dataSize = relation match { - case EmptyHashedRelation => - knownRowCount = Some(0L) - 0L case map: HashedRelation => map.estimatedSize case arr: Array[InternalRow] => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index c9c8de72ca3e1..d3fb63e407f1d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -1155,24 +1155,12 @@ class AdaptiveQueryExecSuite } } - test("SPARK-32573: Eliminate Anti Join when BuildSide is Empty") { - withSQLConf( - SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", - SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { - val (plan, adaptivePlan) = runAdaptiveAndVerifyResult( - "SELECT * FROM testData a LEFT ANTI JOIN emptyTestData b ON a.key = b.key") - val smj = findTopLevelSortMergeJoin(plan) - assert(smj.size == 1) - val join = findTopLevelBaseJoin(adaptivePlan) - assert(join.isEmpty) - checkNumLocalShuffleReaders(adaptivePlan) - } - + test("SPARK-32573: Eliminate NAAJ when BuildSide is EmptyHashedRelationWithAllNullKeys") { withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> Long.MaxValue.toString) { val (plan, adaptivePlan) = runAdaptiveAndVerifyResult( - "SELECT * FROM testData a LEFT ANTI JOIN emptyTestData b ON a.key = b.key") + 
"SELECT * FROM testData2 t1 WHERE t1.b NOT IN (SELECT b FROM testData3)") val bhj = findTopLevelBroadcastHashJoin(plan) assert(bhj.size == 1) val join = findTopLevelBaseJoin(adaptivePlan) From caec89e362c1be5143f7284ebd642591f179e6fa Mon Sep 17 00:00:00 2001 From: "xuewei.linxuewei" Date: Mon, 10 Aug 2020 14:34:22 +0800 Subject: [PATCH 6/9] file renamed to EliminateNullAwareAntiJoin since it's for NAAJ. Change-Id: I0d0a96da107145b9fc681e15f2533abd165f344a --- .../spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala | 2 +- ...EliminateAntiJoin.scala => EliminateNullAwareAntiJoin.scala} | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) rename sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/{EliminateAntiJoin.scala => EliminateNullAwareAntiJoin.scala} (95%) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala index 35938a7656ad1..cb47145a25679 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala @@ -80,7 +80,7 @@ case class AdaptiveSparkPlanExec( // TODO add more optimization rules override protected def batches: Seq[Batch] = Seq( Batch("Demote BroadcastHashJoin", Once, DemoteBroadcastHashJoin(conf)), - Batch("Eliminate AntiJoin", Once, EliminateAntiJoin(conf)) + Batch("Eliminate Null Aware Anti Join", Once, EliminateNullAwareAntiJoin(conf)) ) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateAntiJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateNullAwareAntiJoin.scala similarity index 95% rename from sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateAntiJoin.scala rename to sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateNullAwareAntiJoin.scala index 588414576a391..fe9137e80a382 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateAntiJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateNullAwareAntiJoin.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.internal.SQLConf * This optimization rule detects and convert a NAAJ to an Empty LocalRelation * when buildSide is EmptyHashedRelationWithAllNullKeys. */ -case class EliminateAntiJoin(conf: SQLConf) extends Rule[LogicalPlan] { +case class EliminateNullAwareAntiJoin(conf: SQLConf) extends Rule[LogicalPlan] { private def canEliminate(plan: LogicalPlan): Boolean = plan match { case LogicalQueryStage(_, stage: BroadcastQueryStageExec) if stage.resultOption.get().isDefined From 381db1115c1543513c77369306ca11a3ce00ad47 Mon Sep 17 00:00:00 2001 From: "xuewei.linxuewei" Date: Mon, 10 Aug 2020 15:01:02 +0800 Subject: [PATCH 7/9] remove SQLConf in EliminateNullAwareAntiJoin. 
Change-Id: I3039efd9e472431131b52bec0d0619b84e2c6ee5
---
 .../spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala  | 2 +-
 .../sql/execution/adaptive/EliminateNullAwareAntiJoin.scala   | 3 +--
 2 files changed, 2 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index cb47145a25679..6b56feceb2665 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -80,7 +80,7 @@ case class AdaptiveSparkPlanExec(
     // TODO add more optimization rules
     override protected def batches: Seq[Batch] = Seq(
       Batch("Demote BroadcastHashJoin", Once, DemoteBroadcastHashJoin(conf)),
-      Batch("Eliminate Null Aware Anti Join", Once, EliminateNullAwareAntiJoin(conf))
+      Batch("Eliminate Null Aware Anti Join", Once, EliminateNullAwareAntiJoin)
     )
   }

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateNullAwareAntiJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateNullAwareAntiJoin.scala
index fe9137e80a382..4e0247e2f4bb5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateNullAwareAntiJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateNullAwareAntiJoin.scala
@@ -21,13 +21,12 @@ import org.apache.spark.sql.catalyst.planning.ExtractSingleColumnNullAwareAntiJo
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.joins.EmptyHashedRelationWithAllNullKeys
-import org.apache.spark.sql.internal.SQLConf

 /**
  * This optimization rule detects and convert a NAAJ to an Empty LocalRelation
  * when buildSide is EmptyHashedRelationWithAllNullKeys.
  */
-case class EliminateNullAwareAntiJoin(conf: SQLConf) extends Rule[LogicalPlan] {
+object EliminateNullAwareAntiJoin extends Rule[LogicalPlan] {

   private def canEliminate(plan: LogicalPlan): Boolean = plan match {
     case LogicalQueryStage(_, stage: BroadcastQueryStageExec) if stage.resultOption.get().isDefined

From c8eae30493ba7df0c5918a00d0b1c58c681b27ca Mon Sep 17 00:00:00 2001
From: "xuewei.linxuewei"
Date: Mon, 10 Aug 2020 23:38:07 +0800
Subject: [PATCH 8/9] Refine prepareRelation to avoid calling prepareBroadcast
 twice.
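A simplified sketch of the consumer-side pattern this enables (the names
follow the patch; the fast-path body shown is illustrative):

```scala
// prepareRelation now returns a single value object, so the anti join codegen
// can learn about emptiness without a second prepareBroadcast call.
val HashedRelationInfo(relationTerm, keyIsUnique, isEmptyHashedRelation) =
  prepareRelation(ctx)
if (isEmptyHashedRelation) {
  // fast stop: every streamed row passes the anti join unchanged
}
```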
Change-Id: Ieb631235d4bc8f1446387eb22f0c1a1cc1d3ed27 --- .../joins/BroadcastHashJoinExec.scala | 26 +++++++------ .../spark/sql/execution/joins/HashJoin.scala | 37 +++++++++++++------ .../joins/ShuffledHashJoinExec.scala | 4 +- 3 files changed, 41 insertions(+), 26 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala index 9aa32d041616d..5df06f8e4d4fa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala @@ -213,9 +213,11 @@ case class BroadcastHashJoinExec( (broadcastRelation, relationTerm) } - protected override def prepareRelation(ctx: CodegenContext): (String, Boolean) = { + protected override def prepareRelation(ctx: CodegenContext): HashedRelationInfo = { val (broadcastRelation, relationTerm) = prepareBroadcast(ctx) - (relationTerm, broadcastRelation.value.keyIsUnique) + HashedRelationInfo(relationTerm, + broadcastRelation.value.keyIsUnique, + broadcastRelation.value == EmptyHashedRelation) } /** @@ -223,19 +225,19 @@ case class BroadcastHashJoinExec( * Handles NULL-aware anti join (NAAJ) separately here. */ protected override def codegenAnti(ctx: CodegenContext, input: Seq[ExprCode]): String = { - val (broadcastRelation, relationTerm) = prepareBroadcast(ctx) - val numOutput = metricTerm(ctx, "numOutputRows") - if (broadcastRelation.value == EmptyHashedRelation) { - s""" - |// If the right side is empty, AntiJoin simply returns the left side. - |$numOutput.add(1); - |${consume(ctx, input)} - """.stripMargin - } else if (isNullAwareAntiJoin) { + if (isNullAwareAntiJoin) { + val (broadcastRelation, relationTerm) = prepareBroadcast(ctx) val (keyEv, anyNull) = genStreamSideJoinKey(ctx, input) val (matched, _, _) = getJoinCondition(ctx, input) + val numOutput = metricTerm(ctx, "numOutputRows") - if (broadcastRelation.value == EmptyHashedRelationWithAllNullKeys) { + if (broadcastRelation.value == EmptyHashedRelation) { + s""" + |// If the right side is empty, NAAJ simply returns the left side. + |$numOutput.add(1); + |${consume(ctx, input)} + """.stripMargin + } else if (broadcastRelation.value == EmptyHashedRelationWithAllNullKeys) { s""" |// If the right side contains any all-null key, NAAJ simply returns Nothing. 
""".stripMargin diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala index 9d6095620ffc7..f65dec2e14a96 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala @@ -29,6 +29,16 @@ import org.apache.spark.sql.execution.{CodegenSupport, ExplainUtils, RowIterator import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.types.{BooleanType, IntegralType, LongType} +/** + * @param relationTerm variable name for HashedRelation + * @param keyIsUnique indicate whether keys of HashedRelation known to be unique in code-gen time + * @param isEmptyHashedRelation indicate whether it known to be EmptyHashedRelation in code-gen time + */ +private[joins] case class HashedRelationInfo( + relationTerm: String, + keyIsUnique: Boolean, + isEmptyHashedRelation: Boolean) + trait HashJoin extends BaseJoinExec with CodegenSupport { def buildSide: BuildSide @@ -422,7 +432,7 @@ trait HashJoin extends BaseJoinExec with CodegenSupport { * Generates the code for Inner join. */ protected def codegenInner(ctx: CodegenContext, input: Seq[ExprCode]): String = { - val (relationTerm, keyIsUnique) = prepareRelation(ctx) + val HashedRelationInfo(relationTerm, keyIsUnique, _) = prepareRelation(ctx) val (keyEv, anyNull) = genStreamSideJoinKey(ctx, input) val (matched, checkCondition, buildVars) = getJoinCondition(ctx, input) val numOutput = metricTerm(ctx, "numOutputRows") @@ -472,7 +482,7 @@ trait HashJoin extends BaseJoinExec with CodegenSupport { * Generates the code for left or right outer join. */ protected def codegenOuter(ctx: CodegenContext, input: Seq[ExprCode]): String = { - val (relationTerm, keyIsUnique) = prepareRelation(ctx) + val HashedRelationInfo(relationTerm, keyIsUnique, _) = prepareRelation(ctx) val (keyEv, anyNull) = genStreamSideJoinKey(ctx, input) val matched = ctx.freshName("matched") val buildVars = genBuildSideVars(ctx, matched) @@ -549,7 +559,7 @@ trait HashJoin extends BaseJoinExec with CodegenSupport { * Generates the code for left semi join. */ protected def codegenSemi(ctx: CodegenContext, input: Seq[ExprCode]): String = { - val (relationTerm, keyIsUnique) = prepareRelation(ctx) + val HashedRelationInfo(relationTerm, keyIsUnique, _) = prepareRelation(ctx) val (keyEv, anyNull) = genStreamSideJoinKey(ctx, input) val (matched, checkCondition, _) = getJoinCondition(ctx, input) val numOutput = metricTerm(ctx, "numOutputRows") @@ -598,10 +608,18 @@ trait HashJoin extends BaseJoinExec with CodegenSupport { * Generates the code for anti join. */ protected def codegenAnti(ctx: CodegenContext, input: Seq[ExprCode]): String = { - val (relationTerm, keyIsUnique) = prepareRelation(ctx) + val HashedRelationInfo(relationTerm, keyIsUnique, isEmptyHashedRelation) = prepareRelation(ctx) + val numOutput = metricTerm(ctx, "numOutputRows") + if (isEmptyHashedRelation) { + return s""" + |// If the right side is empty, Anti Join simply returns the left side. + |$numOutput.add(1); + |${consume(ctx, input)} + |""".stripMargin + } + val (keyEv, anyNull) = genStreamSideJoinKey(ctx, input) val (matched, checkCondition, _) = getJoinCondition(ctx, input) - val numOutput = metricTerm(ctx, "numOutputRows") if (keyIsUnique) { val found = ctx.freshName("found") @@ -659,7 +677,7 @@ trait HashJoin extends BaseJoinExec with CodegenSupport { * Generates the code for existence join. 
*/ protected def codegenExistence(ctx: CodegenContext, input: Seq[ExprCode]): String = { - val (relationTerm, keyIsUnique) = prepareRelation(ctx) + val HashedRelationInfo(relationTerm, keyIsUnique, _) = prepareRelation(ctx) val (keyEv, anyNull) = genStreamSideJoinKey(ctx, input) val numOutput = metricTerm(ctx, "numOutputRows") val existsVar = ctx.freshName("exists") @@ -720,12 +738,7 @@ trait HashJoin extends BaseJoinExec with CodegenSupport { } } - /** - * Returns a tuple of variable name for HashedRelation, - * and a boolean to indicate whether keys of HashedRelation - * known to be unique in code-gen time. - */ - protected def prepareRelation(ctx: CodegenContext): (String, Boolean) + protected def prepareRelation(ctx: CodegenContext): HashedRelationInfo } object HashJoin { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala index 9f811cddef6a7..83e053c2b4b2e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala @@ -81,13 +81,13 @@ case class ShuffledHashJoinExec( override def needCopyResult: Boolean = true - protected override def prepareRelation(ctx: CodegenContext): (String, Boolean) = { + protected override def prepareRelation(ctx: CodegenContext): HashedRelationInfo = { val thisPlan = ctx.addReferenceObj("plan", this) val clsName = classOf[HashedRelation].getName // Inline mutable state since not many join operations in a task val relationTerm = ctx.addMutableState(clsName, "relation", v => s"$v = $thisPlan.buildHashedRelation(inputs[1]);", forceInline = true) - (relationTerm, false) + HashedRelationInfo(relationTerm, false, false) } } From 5309b66b23af2ffe7999edbd8f0c11f26628805d Mon Sep 17 00:00:00 2001 From: "xuewei.linxuewei" Date: Tue, 11 Aug 2020 07:45:30 +0800 Subject: [PATCH 9/9] nit. 
Change-Id: I726afddeea0c49d5299b7dc9e8e27f0021411418 --- .../scala/org/apache/spark/sql/execution/joins/HashJoin.scala | 4 ++-- .../spark/sql/execution/joins/ShuffledHashJoinExec.scala | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala index f65dec2e14a96..2154e370a1596 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala @@ -32,12 +32,12 @@ import org.apache.spark.sql.types.{BooleanType, IntegralType, LongType} /** * @param relationTerm variable name for HashedRelation * @param keyIsUnique indicate whether keys of HashedRelation known to be unique in code-gen time - * @param isEmptyHashedRelation indicate whether it known to be EmptyHashedRelation in code-gen time + * @param isEmpty indicate whether it known to be EmptyHashedRelation in code-gen time */ private[joins] case class HashedRelationInfo( relationTerm: String, keyIsUnique: Boolean, - isEmptyHashedRelation: Boolean) + isEmpty: Boolean) trait HashJoin extends BaseJoinExec with CodegenSupport { def buildSide: BuildSide diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala index 83e053c2b4b2e..41cefd03dd931 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala @@ -88,6 +88,6 @@ case class ShuffledHashJoinExec( // Inline mutable state since not many join operations in a task val relationTerm = ctx.addMutableState(clsName, "relation", v => s"$v = $thisPlan.buildHashedRelation(inputs[1]);", forceInline = true) - HashedRelationInfo(relationTerm, false, false) + HashedRelationInfo(relationTerm, keyIsUnique = false, isEmpty = false) } }