From fffd0fdeab7272f8d9a7c67a968a489000c0a358 Mon Sep 17 00:00:00 2001
From: Takeshi Yamamuro
Date: Thu, 18 Jan 2018 18:42:46 +0900
Subject: [PATCH 1/3] Fix

---
 .../spark/sql/catalyst/optimizer/joins.scala  |  30 +++++-
 .../sql/catalyst/planning/patterns.scala      |  25 +++--
 .../optimizer/JoinOptimizationSuite.scala     | 100 ++++++++++++++++--
 .../optimizer/StarJoinReorderSuite.scala      |   9 +-
 .../StatsEstimationTestBase.scala             |   3 +-
 5 files changed, 141 insertions(+), 26 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
index b65221c236bfe..9366c39201cde 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
@@ -88,6 +88,21 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper {
     }
   }
 
+  // Extract a list of logical plans to be joined for join-order comparisons.
+  // Since `ExtractFiltersAndInnerJoins` handles left-deep trees only, this function uses
+  // the same strategy to extract the plan list.
+  private[optimizer] def extractLeftDeepInnerJoins(plan: LogicalPlan)
+    : Seq[LogicalPlan] = plan match {
+    case Join(left, right, _: InnerLike, _, _) => right +: extractLeftDeepInnerJoins(left)
+    case Filter(_, child) => extractLeftDeepInnerJoins(child)
+    case Project(_, child) => extractLeftDeepInnerJoins(child)
+    case _ => Seq(plan)
+  }
+
+  private def sameJoinOrder(plan1: LogicalPlan, plan2: LogicalPlan): Boolean = {
+    extractLeftDeepInnerJoins(plan1) == extractLeftDeepInnerJoins(plan2)
+  }
+
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case p @ ExtractFiltersAndInnerJoins(input, conditions)
         if input.size > 2 && conditions.nonEmpty =>
@@ -103,12 +118,17 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper {
         createOrderedJoin(input, conditions)
       }
 
-      if (p.sameOutput(reordered)) {
-        reordered
+      // Checks if joins were reordered. If not, returns the original plan.
+      if (!sameJoinOrder(reordered, p)) {
+        if (p.sameOutput(reordered)) {
+          reordered
+        } else {
+          // Reordering the joins has changed the order of the columns.
+          // Inject a projection to make sure we restore to the expected ordering.
+          Project(p.output, reordered)
+        }
       } else {
-        // Reordering the joins have changed the order of the columns.
-        // Inject a projection to make sure we restore to the expected ordering.
-        Project(p.output, reordered)
+        reordered
       }
     }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index 415ce46788119..6af07d4a4cd98 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -222,7 +222,7 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper {
 }
 /**
- * A pattern that collects the filter and inner joins.
+ * A pattern that collects the filter and inner joins and skips projections with attributes only.
  *
  *          Filter
  *            |
  *        inner join
@@ -230,6 +232,8 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper {
  *          /    \            ---->      (Seq(plan0, plan1, plan2), conditions)
  *      Filter   plan2
  *        |
+ *      Project
+ *        |
  *  inner join
  *      /    \
  *   plan0    plan1
@@ -253,19 +255,20 @@ object ExtractFiltersAndInnerJoins extends PredicateHelper {
     case Filter(filterCondition, j @ Join(_, _, _: InnerLike, _, hint)) if hint == JoinHint.NONE =>
       val (plans, conditions) = flattenJoin(j)
       (plans, conditions ++ splitConjunctivePredicates(filterCondition))
-
+    case p @ Project(_, child)
+        // Keep flattening joins when the project has attributes only
+        if p.projectList.forall(_.isInstanceOf[Attribute]) =>
+      flattenJoin(child)
     case _ => (Seq((plan, parentJoinType)), Seq.empty)
   }
 
-  def unapply(plan: LogicalPlan)
-    : Option[(Seq[(LogicalPlan, InnerLike)], Seq[Expression])]
-    = plan match {
-    case f @ Filter(filterCondition, j @ Join(_, _, joinType: InnerLike, _, hint))
-        if hint == JoinHint.NONE =>
-      Some(flattenJoin(f))
-    case j @ Join(_, _, joinType, _, hint) if hint == JoinHint.NONE =>
-      Some(flattenJoin(j))
-    case _ => None
+  def unapply(plan: LogicalPlan): Option[(Seq[(LogicalPlan, InnerLike)], Seq[Expression])] = {
+    val (plans, conditions) = flattenJoin(plan)
+    if (plans.size > 1) {
+      Some((plans, conditions))
+    } else {
+      None
+    }
   }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
index 3d81c567eff11..b3d2a2338277a 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
@@ -20,11 +20,13 @@ package org.apache.spark.sql.catalyst.optimizer
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.{AttributeMap, Expression}
 import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins
 import org.apache.spark.sql.catalyst.plans.{Cross, Inner, InnerLike, PlanTest}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.catalyst.statsEstimation.StatsTestPlan
+import org.apache.spark.sql.internal.SQLConf
 
 class JoinOptimizationSuite extends PlanTest {
 
@@ -47,6 +49,20 @@ class JoinOptimizationSuite extends PlanTest {
   val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
   val testRelation1 = LocalRelation('d.int)
 
+  private def testExtractCheckCross(
+      plan: LogicalPlan,
+      expected: Option[(Seq[(LogicalPlan, InnerLike)], Seq[Expression])]): Unit = {
+    ExtractFiltersAndInnerJoins.unapply(plan) match {
+      case Some((input, conditions)) =>
+        expected.map { case (expectedPlans, expectedConditions) =>
+          assert(expectedPlans === input)
+          assert(expectedConditions.toSet === conditions.toSet)
+        }
+      case None =>
+        assert(expected.isEmpty)
+    }
+  }
+
   test("extract filters and joins") {
     val x = testRelation.subquery('x)
     val y = testRelation1.subquery('y)
@@ -64,12 +80,6 @@ class JoinOptimizationSuite extends PlanTest {
       testExtractCheckCross(plan, expectedNoCross)
     }
 
-    def testExtractCheckCross(plan: LogicalPlan,
-        expected: Option[(Seq[(LogicalPlan, InnerLike)], Seq[Expression])]): Unit = {
-      assert(
-        ExtractFiltersAndInnerJoins.unapply(plan) === expected.map(e => (e._1, e._2)))
-    }
-
     testExtract(x, None)
     testExtract(x.where("x.b".attr === 1), None)
     testExtract(x.join(y), Some((Seq(x, y), Seq())))
@@ -126,4 +136,80 @@ class JoinOptimizationSuite extends PlanTest {
       comparePlans(optimized, queryAnswerPair._2.analyze)
     }
   }
+
+  test("SPARK-23172 skip projections when flattening joins") {
+    def checkExtractInnerJoins(plan: LogicalPlan): Unit = {
+      val expectedTables = plan.collectLeaves().map { case p => (p, Inner) }
+      val expectedConditions = plan.collect {
+        case Join(_, _, _, Some(cond), _) => cond
+        case Filter(cond, _) => cond
+      }
+      testExtractCheckCross(plan, Some((expectedTables, expectedConditions)))
+    }
+
+    val x = testRelation.subquery('x)
+    val y = testRelation1.subquery('y)
+    val z = testRelation.subquery('z)
+    var joined = x.join(z, Inner, Some($"x.b" === $"z.b"))
+      .select($"x.a", $"z.a", $"z.c")
+      .join(y, Inner, Some($"y.d" === $"z.a")).analyze
+    checkExtractInnerJoins(joined)
+
+    // test case for filter-over-project
+    joined = x.join(z, Inner, Some($"x.b" === $"z.b"))
+      .select($"x.a", $"z.a", $"z.c")
+      .where($"z.a" === 3)
+      .join(y, Inner, Some($"y.d" === $"z.a")).analyze
+    checkExtractInnerJoins(joined)
+
+    // test case for project-over-filter
+    joined = x.join(z, Inner, Some($"x.b" === $"z.b"))
+      .where($"z.a" === 1)
+      .select($"x.a", $"z.a", $"z.c")
+      .join(y, Inner, Some($"y.d" === $"z.a")).analyze
+    checkExtractInnerJoins(joined)
+  }
+
+  test("SPARK-23172 reorder joins with projections") {
+    withSQLConf(
+      SQLConf.STARSCHEMA_DETECTION.key -> "true",
+      SQLConf.CBO_ENABLED.key -> "false") {
+      val r0output = Seq('a.int, 'b.int, 'c.int)
+      val r0colStat = ColumnStat(distinctCount = Some(100000000), nullCount = Some(0))
+      val r0colStats = AttributeMap(r0output.map(_ -> r0colStat))
+      val r0 = StatsTestPlan(r0output, 100000000, r0colStats, name = Some("r0")).subquery('r0)
+
+      val r1output = Seq('a.int, 'd.int)
+      val r1colStat = ColumnStat(distinctCount = Some(10), nullCount = Some(0))
+      val r1colStats = AttributeMap(r1output.map(_ -> r1colStat))
+      val r1 = StatsTestPlan(r1output, 10, r1colStats, name = Some("r1")).subquery('r1)
+
+      val r2output = Seq('b.int, 'e.int)
+      val r2colStat = ColumnStat(distinctCount = Some(100), nullCount = Some(0))
+      val r2colStats = AttributeMap(r2output.map(_ -> r2colStat))
+      val r2 = StatsTestPlan(r2output, 100, r2colStats, name = Some("r2")).subquery('r2)
+
+      val r3output = Seq('c.int, 'f.int)
+      val r3colStat = ColumnStat(distinctCount = Some(1), nullCount = Some(0))
+      val r3colStats = AttributeMap(r3output.map(_ -> r3colStat))
+      val r3 = StatsTestPlan(r3output, 1, r3colStats, name = Some("r3")).subquery('r3)
+
+      val joined = r0.join(r1, Inner, Some($"r0.a" === $"r1.a"))
+        .select($"r0.b", $"r0.c", $"r1.d")
+        .where($"r1.d" >= 3)
+        .join(r2, Inner, Some($"r0.b" === $"r2.b"))
+        .where($"r2.e" >= 5)
+        .select($"r0.c", $"r1.d", $"r2.e")
+        .join(r3, Inner, Some($"r0.c" === $"r3.c"))
+        .select($"r1.d", $"r2.e", $"r3.f")
+        .where($"r3.f" <= 100)
+        .analyze
+
+      val optimized = Optimize.execute(joined)
+      val optJoins = ReorderJoin.extractLeftDeepInnerJoins(optimized)
+      val joinOrder = optJoins.flatMap(_.collect{ case p: StatsTestPlan => p }.headOption)
+        .flatMap(_.name)
+      assert(joinOrder === Seq("r2", "r1", "r3", "r0"))
+    }
+  }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala
index 10e970d534c49..cd9087c647fe1 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
 import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest}
-import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LocalRelation, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.catalyst.statsEstimation.{StatsEstimationTestBase, StatsTestPlan}
 import org.apache.spark.sql.internal.SQLConf._
@@ -580,7 +580,12 @@ class StarJoinReorderSuite extends PlanTest with StatsEstimationTestBase {
   private def assertEqualPlans(plan1: LogicalPlan, plan2: LogicalPlan): Unit = {
     val analyzed = plan1.analyze
-    val optimized = Optimize.execute(analyzed)
+    val optimized = Optimize.execute(analyzed) match {
+      // `ReorderJoin` adds `Project` to keep the same order of output attributes.
+      // So, we drop the top `Project` for tests.
+      case project: Project => project.child
+      case p => p
+    }
     val expected = plan2.analyze
     assert(equivalentOutput(analyzed, expected))  // if this fails, the expected itself is incorrect
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/StatsEstimationTestBase.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/StatsEstimationTestBase.scala
index 9dceca59f5b87..7f4618787ace0 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/StatsEstimationTestBase.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/StatsEstimationTestBase.scala
@@ -69,7 +69,8 @@ case class StatsTestPlan(
     outputList: Seq[Attribute],
     rowCount: BigInt,
     attributeStats: AttributeMap[ColumnStat],
-    size: Option[BigInt] = None) extends LeafNode {
+    size: Option[BigInt] = None,
+    name: Option[String] = None) extends LeafNode {
   override def output: Seq[Attribute] = outputList
   override def computeStats(): Statistics = Statistics(
     // If sizeInBytes is useless in testing, we just use a fake value

From 85d2435a0a83c6e52c35e40ca4e11d8d0933718c Mon Sep 17 00:00:00 2001
From: Takeshi Yamamuro
Date: Fri, 13 Dec 2019 21:33:35 +0900
Subject: [PATCH 2/3] Fix

---
 .../apache/spark/sql/catalyst/optimizer/joins.scala    |  3 ++-
 .../sql/catalyst/optimizer/JoinOptimizationSuite.scala | 10 +++++-----
 .../statsEstimation/StatsEstimationTestBase.scala      |  2 +-
 3 files changed, 8 insertions(+), 7 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
index 9366c39201cde..2e5a18a036394 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
@@ -93,7 +93,8 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper {
   // the same strategy to extract the plan list.
   private[optimizer] def extractLeftDeepInnerJoins(plan: LogicalPlan)
     : Seq[LogicalPlan] = plan match {
-    case Join(left, right, _: InnerLike, _, _) => right +: extractLeftDeepInnerJoins(left)
+    case Join(left, right, _: InnerLike, _, hint) if hint == JoinHint.NONE =>
+      right +: extractLeftDeepInnerJoins(left)
     case Filter(_, child) => extractLeftDeepInnerJoins(child)
     case Project(_, child) => extractLeftDeepInnerJoins(child)
     case _ => Seq(plan)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
index b3d2a2338277a..5a62a6c2e1eb8 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
@@ -177,22 +177,22 @@ class JoinOptimizationSuite extends PlanTest {
       val r0output = Seq('a.int, 'b.int, 'c.int)
       val r0colStat = ColumnStat(distinctCount = Some(100000000), nullCount = Some(0))
       val r0colStats = AttributeMap(r0output.map(_ -> r0colStat))
-      val r0 = StatsTestPlan(r0output, 100000000, r0colStats, name = Some("r0")).subquery('r0)
+      val r0 = StatsTestPlan(r0output, 100000000, r0colStats, identifier = Some("r0")).subquery('r0)
 
       val r1output = Seq('a.int, 'd.int)
       val r1colStat = ColumnStat(distinctCount = Some(10), nullCount = Some(0))
       val r1colStats = AttributeMap(r1output.map(_ -> r1colStat))
-      val r1 = StatsTestPlan(r1output, 10, r1colStats, name = Some("r1")).subquery('r1)
+      val r1 = StatsTestPlan(r1output, 10, r1colStats, identifier = Some("r1")).subquery('r1)
 
       val r2output = Seq('b.int, 'e.int)
       val r2colStat = ColumnStat(distinctCount = Some(100), nullCount = Some(0))
       val r2colStats = AttributeMap(r2output.map(_ -> r2colStat))
-      val r2 = StatsTestPlan(r2output, 100, r2colStats, name = Some("r2")).subquery('r2)
+      val r2 = StatsTestPlan(r2output, 100, r2colStats, identifier = Some("r2")).subquery('r2)
 
       val r3output = Seq('c.int, 'f.int)
       val r3colStat = ColumnStat(distinctCount = Some(1), nullCount = Some(0))
       val r3colStats = AttributeMap(r3output.map(_ -> r3colStat))
-      val r3 = StatsTestPlan(r3output, 1, r3colStats, name = Some("r3")).subquery('r3)
+      val r3 = StatsTestPlan(r3output, 1, r3colStats, identifier = Some("r3")).subquery('r3)
 
       val joined = r0.join(r1, Inner, Some($"r0.a" === $"r1.a"))
         .select($"r0.b", $"r0.c", $"r1.d")
@@ -208,7 +208,7 @@ class JoinOptimizationSuite extends PlanTest {
       val optimized = Optimize.execute(joined)
       val optJoins = ReorderJoin.extractLeftDeepInnerJoins(optimized)
       val joinOrder = optJoins.flatMap(_.collect{ case p: StatsTestPlan => p }.headOption)
-        .flatMap(_.name)
+        .flatMap(_.identifier)
       assert(joinOrder === Seq("r2", "r1", "r3", "r0"))
     }
   }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/StatsEstimationTestBase.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/StatsEstimationTestBase.scala
index 7f4618787ace0..de743147c9dd1 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/StatsEstimationTestBase.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/StatsEstimationTestBase.scala
@@ -70,7 +70,7 @@ case class StatsTestPlan(
     rowCount: BigInt,
     attributeStats: AttributeMap[ColumnStat],
     size: Option[BigInt] = None,
-    name: Option[String] = None) extends LeafNode {
+    identifier: Option[String] = None) extends LeafNode {
   override def output: Seq[Attribute] = outputList
   override def computeStats(): Statistics = Statistics(
     // If sizeInBytes is useless in testing, we just use a fake value

From 37e5fe284d6d35eda7d61ba01bf43bcba8af844d Mon Sep 17 00:00:00 2001
From: Takeshi Yamamuro
Date: Sun, 15 Dec 2019 15:55:25 +0900
Subject: [PATCH 3/3] Fix

---
 .../org/apache/spark/sql/catalyst/optimizer/joins.scala | 5 +++--
 .../apache/spark/sql/catalyst/planning/patterns.scala   | 4 ++--
 .../sql/catalyst/optimizer/JoinOptimizationSuite.scala  | 4 ++--
 .../sql/catalyst/optimizer/StarJoinReorderSuite.scala   | 9 ++-------
 4 files changed, 9 insertions(+), 13 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
index 2e5a18a036394..a2b31278326d6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
@@ -119,7 +119,8 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper {
         createOrderedJoin(input, conditions)
       }
 
-      // Checks if joins were reordered. If not, returns the original plan.
+      // To avoid applying this rule repeatedly, we keep the original plan when
+      // `p` and `reordered` have the same join order.
       if (!sameJoinOrder(reordered, p)) {
         if (p.sameOutput(reordered)) {
           reordered
@@ -129,7 +130,7 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper {
           Project(p.output, reordered)
         }
       } else {
-        reordered
+        p
       }
     }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index 6af07d4a4cd98..e566938f28938 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -252,8 +252,8 @@ object ExtractFiltersAndInnerJoins extends PredicateHelper {
       val (plans, conditions) = flattenJoin(left, joinType)
       (plans ++ Seq((right, joinType)),
         conditions ++ cond.toSeq.flatMap(splitConjunctivePredicates))
-    case Filter(filterCondition, j @ Join(_, _, _: InnerLike, _, hint)) if hint == JoinHint.NONE =>
-      val (plans, conditions) = flattenJoin(j)
+    case Filter(filterCondition, child) =>
+      val (plans, conditions) = flattenJoin(child)
       (plans, conditions ++ splitConjunctivePredicates(filterCondition))
     case p @ Project(_, child)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
index 5a62a6c2e1eb8..d085d8dee204e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
@@ -137,7 +137,7 @@ class JoinOptimizationSuite extends PlanTest {
     }
   }
 
-  test("SPARK-23172 skip projections when flattening joins") {
+  test("Skip projections when flattening joins") {
     def checkExtractInnerJoins(plan: LogicalPlan): Unit = {
       val expectedTables = plan.collectLeaves().map { case p => (p, Inner) }
       val expectedConditions = plan.collect {
@@ -170,7 +170,7 @@ class JoinOptimizationSuite extends PlanTest {
     checkExtractInnerJoins(joined)
   }
 
-  test("SPARK-23172 reorder joins with projections") {
projections") { + test("Reorder joins with projections") { withSQLConf( SQLConf.STARSCHEMA_DETECTION.key -> "true", SQLConf.CBO_ENABLED.key -> "false") { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala index cd9087c647fe1..10e970d534c49 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala @@ -21,7 +21,7 @@ import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap} import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest} -import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LocalRelation, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LocalRelation, LogicalPlan} import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.catalyst.statsEstimation.{StatsEstimationTestBase, StatsTestPlan} import org.apache.spark.sql.internal.SQLConf._ @@ -580,12 +580,7 @@ class StarJoinReorderSuite extends PlanTest with StatsEstimationTestBase { private def assertEqualPlans(plan1: LogicalPlan, plan2: LogicalPlan): Unit = { val analyzed = plan1.analyze - val optimized = Optimize.execute(analyzed) match { - // `ReorderJoin` adds `Project` to keep the same order of output attributes. - // So, we drop a top `Project` for tests. - case project: Project => project.child - case p => p - } + val optimized = Optimize.execute(analyzed) val expected = plan2.analyze assert(equivalentOutput(analyzed, expected)) // if this fails, the expected itself is incorrect