From 89664b4f657811ef3c30eb77450e8d636742faa0 Mon Sep 17 00:00:00 2001 From: Mayur Bhosale Date: Mon, 16 Mar 2020 10:52:05 +0530 Subject: [PATCH 1/7] Additional checks on deciding the pruning side --- .../dynamicpruning/PartitionPruning.scala | 36 +++++++++++++++++-- 1 file changed, 33 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala index 43c6581632687..2f74bafc166f2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala @@ -196,14 +196,34 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper { case _ => false } + def sameOutput(planA: LogicalPlan, planB: LogicalPlan): Boolean = { + val planAOutput = planA.output + val planBOutput = planB.output + planAOutput.length == planBOutput.length && planAOutput.zip(planBOutput).forall { + case (a1, a2) => a1.semanticEquals(a2) + } + } + + private def prune(plan: LogicalPlan): LogicalPlan = { plan transformUp { // skip this rule if there's already a DPP subquery on the LHS of a join case j @ Join(Filter(_: DynamicPruningSubquery, _), _, _, _, _) => j case j @ Join(_, Filter(_: DynamicPruningSubquery, _), _, _, _) => j case j @ Join(left, right, joinType, Some(condition), hint) => - var newLeft = left - var newRight = right + + // If left side is smaller that the right side, pruningHasBenefit will always return false. + // Even if this join turns out to be a BHJ, build side would be left side and because we + // would have inserted DPP predicate with filteringPlan as right side, DPP node will become + // a pass-through. This re-ordering will help in optimizing cases when both the sides are + // partitioned and the Left side is smaller than right side. + // This will also help in avoiding the overhead for the case when only left side is + // partitioned and left is smaller than the right side. + var (newLeft, newRight) = if (left.stats.sizeInBytes > right.stats.sizeInBytes) { + (left, right) + } else { + (right, left) + } // extract the left and right keys of the join condition val (leftKeys, rightKeys) = j match { @@ -246,7 +266,16 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper { } case _ => } - Join(newLeft, newRight, joinType, Some(condition), hint) + val reOrderedJoin = Join(newLeft, newRight, joinType, Some(condition), hint) + if (sameOutput(reOrderedJoin, j)) { + reOrderedJoin + } else { + // Reordering the joins has changed the order of the columns. + // Inject a projection to make sure we restore to the expected ordering. + // Deserialization will also break otherwise if the join expression + // column names are identical + Project(j.output, reOrderedJoin) + } } } @@ -257,3 +286,4 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper { case _ => prune(plan) } } + From a3019bdb44d9c80ad3a511b2b69e34107623bda6 Mon Sep 17 00:00:00 2001 From: Mayur Bhosale Date: Thu, 9 Jul 2020 17:17:19 +0530 Subject: [PATCH 2/7] Revert "Additional checks on deciding the pruning side" This reverts commit 89664b4f657811ef3c30eb77450e8d636742faa0. --- .../dynamicpruning/PartitionPruning.scala | 36 ++----------------- 1 file changed, 3 insertions(+), 33 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala index 2f74bafc166f2..43c6581632687 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala @@ -196,34 +196,14 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper { case _ => false } - def sameOutput(planA: LogicalPlan, planB: LogicalPlan): Boolean = { - val planAOutput = planA.output - val planBOutput = planB.output - planAOutput.length == planBOutput.length && planAOutput.zip(planBOutput).forall { - case (a1, a2) => a1.semanticEquals(a2) - } - } - - private def prune(plan: LogicalPlan): LogicalPlan = { plan transformUp { // skip this rule if there's already a DPP subquery on the LHS of a join case j @ Join(Filter(_: DynamicPruningSubquery, _), _, _, _, _) => j case j @ Join(_, Filter(_: DynamicPruningSubquery, _), _, _, _) => j case j @ Join(left, right, joinType, Some(condition), hint) => - - // If left side is smaller that the right side, pruningHasBenefit will always return false. - // Even if this join turns out to be a BHJ, build side would be left side and because we - // would have inserted DPP predicate with filteringPlan as right side, DPP node will become - // a pass-through. This re-ordering will help in optimizing cases when both the sides are - // partitioned and the Left side is smaller than right side. - // This will also help in avoiding the overhead for the case when only left side is - // partitioned and left is smaller than the right side. - var (newLeft, newRight) = if (left.stats.sizeInBytes > right.stats.sizeInBytes) { - (left, right) - } else { - (right, left) - } + var newLeft = left + var newRight = right // extract the left and right keys of the join condition val (leftKeys, rightKeys) = j match { @@ -266,16 +246,7 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper { } case _ => } - val reOrderedJoin = Join(newLeft, newRight, joinType, Some(condition), hint) - if (sameOutput(reOrderedJoin, j)) { - reOrderedJoin - } else { - // Reordering the joins has changed the order of the columns. - // Inject a projection to make sure we restore to the expected ordering. - // Deserialization will also break otherwise if the join expression - // column names are identical - Project(j.output, reOrderedJoin) - } + Join(newLeft, newRight, joinType, Some(condition), hint) } } @@ -286,4 +257,3 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper { case _ => prune(plan) } } - From bcb6c098109fc102a809f677ee71ebfa3ca78830 Mon Sep 17 00:00:00 2001 From: Mayur Bhosale Date: Thu, 9 Jul 2020 19:04:20 +0530 Subject: [PATCH 3/7] Orient SMJ based on the adaptive stats --- .../adaptive/AdaptiveJoinOrientation.scala | 38 +++++++++++++++ .../adaptive/AdaptiveSparkPlanExec.scala | 3 +- .../adaptive/AdaptiveQueryExecSuite.scala | 48 +++++++++++++++++-- 3 files changed, 85 insertions(+), 4 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveJoinOrientation.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveJoinOrientation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveJoinOrientation.scala new file mode 100644 index 0000000000000..31d96c4e65d65 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveJoinOrientation.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.adaptive + +import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule + +object AdaptiveJoinOrientation extends Rule[LogicalPlan] { + + private def isMaterializedShuffleStage(plan: LogicalPlan): Boolean = plan match { + case LogicalQueryStage(_, shuffleExec: ShuffleQueryStageExec) + if shuffleExec.resultOption.get.isDefined => true + case _ => + false + } + + def apply(plan: LogicalPlan): LogicalPlan = plan transform { + case j @ Join(left, right, _, _, _) + if isMaterializedShuffleStage(left) && isMaterializedShuffleStage(right) && + (left.stats.sizeInBytes < right.stats.sizeInBytes) => + Project(j.output, j.copy(left = right, right = left)) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala index bc924e6978ddc..23f9c032571b2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala @@ -79,7 +79,8 @@ case class AdaptiveSparkPlanExec( @transient private val optimizer = new RuleExecutor[LogicalPlan] { // TODO add more optimization rules override protected def batches: Seq[Batch] = Seq( - Batch("Demote BroadcastHashJoin", Once, DemoteBroadcastHashJoin(conf)) + Batch("Demote BroadcastHashJoin", Once, DemoteBroadcastHashJoin(conf)), + Batch("Orient Sort Merge Join", Once, AdaptiveJoinOrientation) ) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index c696d3f648ed1..8e5d4f5f22fbd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -21,20 +21,19 @@ import java.io.File import java.net.URI import org.apache.log4j.Level - import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerJobStart} -import org.apache.spark.sql.{Dataset, QueryTest, Row, SparkSession, Strategy} import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan} -import org.apache.spark.sql.execution.{PartialReducerPartitionSpec, ReusedSubqueryExec, ShuffledRowRDD, SparkPlan} import org.apache.spark.sql.execution.command.DataWritingCommandExec import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, Exchange, ReusedExchangeExec, ShuffleExchangeExec} import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate +import org.apache.spark.sql.execution._ import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.{IntegerType, StructType} +import org.apache.spark.sql._ import org.apache.spark.util.Utils class AdaptiveQueryExecSuite @@ -1147,4 +1146,47 @@ class AdaptiveQueryExecSuite } } } + + test("SPARK-SMJ_ORIENTATION: reorient SMJ using adaptive stats") { + Seq(true, false).foreach { enableAQE => + withTable("testTbl1", "testTbl2") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> enableAQE.toString, + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + + def getLeftTableName(plan: SparkPlan): String = { + val node = plan.collectLeaves()(0) match { + case x: ShuffleQueryStageExec => + x.plan.collectLeaves()(0) + case x: SparkPlan => + x + } + node.asInstanceOf[FileSourceScanExec].tableIdentifier.get.table + } + + val df1 = (0 until 10).toDF("col1").as("df1") + df1.write.format("parquet").saveAsTable("testTbl1") + + val df2 = (0 until 100).toDF("col1").as("df2") + df2.write.format("parquet").saveAsTable("testTbl2") + + val df = + spark.sql("SELECT * from testTbl1 JOIN testTbl2 ON testTbl1.col1 = testTbl2.col1") + + if (enableAQE) { + val initialPlan = df.queryExecution.executedPlan + df.collect + val adaptivePlan = df.queryExecution.executedPlan + val executedPlan = adaptivePlan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan + assert(getLeftTableName(executedPlan) == "testTbl2") + assert(initialPlan.output == adaptivePlan.output) + } else { + df.collect + val executedPlan = df.queryExecution.executedPlan + assert(getLeftTableName(executedPlan) == "testTbl1") + } + } + } + } + } } From 011052733e3f80922922d886bd611a3e5ac77ae2 Mon Sep 17 00:00:00 2001 From: Mayur Bhosale Date: Thu, 9 Jul 2020 19:08:53 +0530 Subject: [PATCH 4/7] Nit --- .../spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala index 23f9c032571b2..83cd28f7f3238 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala @@ -80,7 +80,7 @@ case class AdaptiveSparkPlanExec( // TODO add more optimization rules override protected def batches: Seq[Batch] = Seq( Batch("Demote BroadcastHashJoin", Once, DemoteBroadcastHashJoin(conf)), - Batch("Orient Sort Merge Join", Once, AdaptiveJoinOrientation) + Batch("Orient SortMergeJoin", Once, AdaptiveJoinOrientation) ) } From c80d8d8fb7fd43396cd8d3194d94b6408017fca2 Mon Sep 17 00:00:00 2001 From: Mayur Bhosale Date: Thu, 9 Jul 2020 19:24:03 +0530 Subject: [PATCH 5/7] Changed formatting --- .../spark/sql/execution/adaptive/AdaptiveJoinOrientation.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveJoinOrientation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveJoinOrientation.scala index 31d96c4e65d65..2882c7faccd43 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveJoinOrientation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveJoinOrientation.scala @@ -31,7 +31,7 @@ object AdaptiveJoinOrientation extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { case j @ Join(left, right, _, _, _) - if isMaterializedShuffleStage(left) && isMaterializedShuffleStage(right) && + if Seq(left, right).forall(isMaterializedShuffleStage) && (left.stats.sizeInBytes < right.stats.sizeInBytes) => Project(j.output, j.copy(left = right, right = left)) } From dd5fb36e06d8253a7b191af5fcaa100b1769fa8e Mon Sep 17 00:00:00 2001 From: Mayur Bhosale Date: Mon, 13 Jul 2020 12:38:39 +0530 Subject: [PATCH 6/7] Do not change join order if SMJ will be converted to a BHJ --- .../adaptive/AdaptiveJoinOrientation.scala | 16 +++- .../adaptive/AdaptiveSparkPlanExec.scala | 2 +- .../adaptive/AdaptiveQueryExecSuite.scala | 82 +++++++++++-------- 3 files changed, 62 insertions(+), 38 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveJoinOrientation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveJoinOrientation.scala index 2882c7faccd43..8131f6d58f504 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveJoinOrientation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveJoinOrientation.scala @@ -17,10 +17,17 @@ package org.apache.spark.sql.execution.adaptive +import org.apache.spark.sql.catalyst.plans.InnerLike import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, Project} import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.internal.SQLConf -object AdaptiveJoinOrientation extends Rule[LogicalPlan] { +/** + * This optimization rule detects if the probe side of the SortMerge join is smaller + * than build side. While joining, probe side is streamed and build side is buffered, + * so having a larger build side can cause memory issues. + */ +case class AdaptiveJoinOrientation(conf: SQLConf) extends Rule[LogicalPlan] { private def isMaterializedShuffleStage(plan: LogicalPlan): Boolean = plan match { case LogicalQueryStage(_, shuffleExec: ShuffleQueryStageExec) @@ -30,9 +37,10 @@ object AdaptiveJoinOrientation extends Rule[LogicalPlan] { } def apply(plan: LogicalPlan): LogicalPlan = plan transform { - case j @ Join(left, right, _, _, _) - if Seq(left, right).forall(isMaterializedShuffleStage) && - (left.stats.sizeInBytes < right.stats.sizeInBytes) => + case j @ Join(left, right, _: InnerLike, _, _) + if Seq(left, right).forall(plan => isMaterializedShuffleStage(plan) && + plan.stats.sizeInBytes > conf.autoBroadcastJoinThreshold) && + left.stats.sizeInBytes < right.stats.sizeInBytes => Project(j.output, j.copy(left = right, right = left)) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala index 83cd28f7f3238..1f7a22705b972 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala @@ -80,7 +80,7 @@ case class AdaptiveSparkPlanExec( // TODO add more optimization rules override protected def batches: Seq[Batch] = Seq( Batch("Demote BroadcastHashJoin", Once, DemoteBroadcastHashJoin(conf)), - Batch("Orient SortMergeJoin", Once, AdaptiveJoinOrientation) + Batch("Orient SortMergeJoin", Once, AdaptiveJoinOrientation(conf)) ) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 8e5d4f5f22fbd..09b4e683820b5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -23,6 +23,7 @@ import java.net.URI import org.apache.log4j.Level import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerJobStart} import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} +import org.apache.spark.sql.catalyst.plans.InnerLike import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan} import org.apache.spark.sql.execution.command.DataWritingCommandExec import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, Exchange, ReusedExchangeExec, ShuffleExchangeExec} @@ -1148,44 +1149,59 @@ class AdaptiveQueryExecSuite } test("SPARK-SMJ_ORIENTATION: reorient SMJ using adaptive stats") { - Seq(true, false).foreach { enableAQE => - withTable("testTbl1", "testTbl2") { - withSQLConf( - SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> enableAQE.toString, - SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { - - def getLeftTableName(plan: SparkPlan): String = { - val node = plan.collectLeaves()(0) match { - case x: ShuffleQueryStageExec => - x.plan.collectLeaves()(0) - case x: SparkPlan => - x - } - node.asInstanceOf[FileSourceScanExec].tableIdentifier.get.table - } - - val df1 = (0 until 10).toDF("col1").as("df1") - df1.write.format("parquet").saveAsTable("testTbl1") - - val df2 = (0 until 100).toDF("col1").as("df2") - df2.write.format("parquet").saveAsTable("testTbl2") + withTable("testTbl1", "testTbl2") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { - val df = - spark.sql("SELECT * from testTbl1 JOIN testTbl2 ON testTbl1.col1 = testTbl2.col1") + def getProbeBuildSides(plan: SparkPlan): Option[(SparkPlan, SparkPlan)] = { + plan collectFirst { + case SortMergeJoinExec(_, _, _: InnerLike, _, left, right, _) => + (left, right) + } + } - if (enableAQE) { - val initialPlan = df.queryExecution.executedPlan - df.collect - val adaptivePlan = df.queryExecution.executedPlan - val executedPlan = adaptivePlan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan - assert(getLeftTableName(executedPlan) == "testTbl2") - assert(initialPlan.output == adaptivePlan.output) + def validateOrientation(initialPlan: SparkPlan, updatedPlan: SparkPlan, + orderShouldBeChanged: Boolean): Unit = { + val (initialProbePlan, initialBuildPlan) = getProbeBuildSides(initialPlan).get + val (updatedProbePlan, updatedBuildPlan) = getProbeBuildSides(updatedPlan).get + if (orderShouldBeChanged) { + assert(updatedBuildPlan.output == initialProbePlan.output) + assert(updatedProbePlan.output == initialBuildPlan.output) } else { - df.collect - val executedPlan = df.queryExecution.executedPlan - assert(getLeftTableName(executedPlan) == "testTbl1") + assert(updatedBuildPlan.output == initialBuildPlan.output) + assert(updatedProbePlan.output == initialProbePlan.output) } } + + val df1 = (0 until 10).toDF("col1").as("df1") + df1.write.format("parquet").saveAsTable("testTbl1") + + val df2 = (0 until 100).toDF("col1").as("df2") + df2.write.format("parquet").saveAsTable("testTbl2") + + val dfWrongOrder = + spark.sql("SELECT * from testTbl1 JOIN testTbl2 ON testTbl1.col1 = testTbl2.col1") + val dfCorrectOrder = + spark.sql("SELECT * from testTbl2 JOIN testTbl1 ON testTbl1.col1 = testTbl2.col1") + + val initialAdaptivePlan1 = dfWrongOrder.queryExecution.executedPlan + val initialPlan1 = initialAdaptivePlan1.asInstanceOf[AdaptiveSparkPlanExec].executedPlan + dfWrongOrder.collect + val adaptivePlan1 = dfWrongOrder.queryExecution.executedPlan + val updatedPlan1 = adaptivePlan1.asInstanceOf[AdaptiveSparkPlanExec].executedPlan + // Join orientation should have changed + validateOrientation(initialPlan1, updatedPlan1, true) + // Project should correct the result order + assert(initialPlan1.output == adaptivePlan1.output) + + val initialAdaptivePlan2 = dfCorrectOrder.queryExecution.executedPlan + val initialPlan2 = initialAdaptivePlan2.asInstanceOf[AdaptiveSparkPlanExec].executedPlan + dfCorrectOrder.collect + val adaptivePlan2 = dfCorrectOrder.queryExecution.executedPlan + val updatedPlan2 = adaptivePlan2.asInstanceOf[AdaptiveSparkPlanExec].executedPlan + // Join orientation should not have changed + validateOrientation(initialPlan2, updatedPlan2, false) } } } From e5c7db32ea688c06a955393c96651c3534b8efae Mon Sep 17 00:00:00 2001 From: Mayur Bhosale Date: Mon, 13 Jul 2020 18:06:16 +0530 Subject: [PATCH 7/7] Fixed imports --- .../sql/execution/adaptive/AdaptiveQueryExecSuite.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 09b4e683820b5..a616882f2b51d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -21,20 +21,21 @@ import java.io.File import java.net.URI import org.apache.log4j.Level + import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerJobStart} +import org.apache.spark.sql.{Dataset, QueryTest, Row, SparkSession, Strategy} import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} import org.apache.spark.sql.catalyst.plans.InnerLike import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan} +import org.apache.spark.sql.execution.{PartialReducerPartitionSpec, ReusedSubqueryExec, ShuffledRowRDD, SparkPlan} import org.apache.spark.sql.execution.command.DataWritingCommandExec import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, Exchange, ReusedExchangeExec, ShuffleExchangeExec} import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate -import org.apache.spark.sql.execution._ import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.{IntegerType, StructType} -import org.apache.spark.sql._ import org.apache.spark.util.Utils class AdaptiveQueryExecSuite