From e945bf109f7e7df8683c14ae557d21d05e980efa Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Thu, 30 Aug 2018 16:25:08 +0200 Subject: [PATCH 1/2] [SPARK-25278][SQL] Avoid duplicated Exec nodes when the same logical plan appears in the query --- .../spark/sql/catalyst/planning/QueryPlanner.scala | 2 +- .../spark/sql/execution/metric/SQLMetricsSuite.scala | 11 +++++++++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala index bc41dd0465e34..6fa5203a06f7c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala @@ -81,7 +81,7 @@ abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] { childPlans.map { childPlan => // Replace the placeholder by the child plan candidateWithPlaceholders.transformUp { - case p if p == placeholder => childPlan + case p if p.eq(placeholder) => childPlan } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala index a3a3f3851e21c..e6e1a93238d46 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala @@ -497,6 +497,17 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared } } + test("SPARK-25278: output metrics are wrong for plans repeated in the query") { + val name = "demo_view" + sql(s"CREATE OR REPLACE VIEW $name AS VALUES 1,2") + val view = spark.table(name) + val union = view.union(view) + testSparkPlanMetrics(union, 1, Map( + 0L -> ("Union" -> Map()), + 1L -> ("LocalTableScan" -> Map("number of output rows" -> 2L)), + 2L -> ("LocalTableScan" -> Map("number of output rows" -> 2L)))) + } + test("writing data out metrics: parquet") { testMetricsNonDynamicPartition("parquet", "t1") } From 193d7b38cfb7ad2fad2f5d41128e0dcc4a573e42 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Fri, 31 Aug 2018 09:52:16 +0200 Subject: [PATCH 2/2] address comments --- .../spark/sql/execution/PlannerSuite.scala | 17 +++++++++++++++++ .../sql/execution/metric/SQLMetricsSuite.scala | 16 +++++++++------- 2 files changed, 26 insertions(+), 7 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index 3db89ecfad9fc..b10da6c70be16 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -704,6 +704,23 @@ class PlannerSuite extends SharedSQLContext { df.queryExecution.executedPlan.execute() } + test("SPARK-25278: physical nodes should be different instances for same logical nodes") { + val range = Range(1, 1, 1, 1) + val df = Union(range, range) + val ranges = df.queryExecution.optimizedPlan.collect { + case r: Range => r + } + assert(ranges.length == 2) + // Ensure the two Range instances are equal according to their equal method + assert(ranges.head == ranges.last) + val execRanges = df.queryExecution.sparkPlan.collect { + case r: RangeExec => r + } + assert(execRanges.length == 2) + // Ensure the two RangeExec instances are different instances + assert(!execRanges.head.eq(execRanges.last)) + } + test("SPARK-24556: always rewrite output partitioning in ReusedExchangeExec " + "and InMemoryTableScanExec") { def checkOutputPartitioningRewrite( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala index e6e1a93238d46..d45eb0c27a6b1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala @@ -499,13 +499,15 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared test("SPARK-25278: output metrics are wrong for plans repeated in the query") { val name = "demo_view" - sql(s"CREATE OR REPLACE VIEW $name AS VALUES 1,2") - val view = spark.table(name) - val union = view.union(view) - testSparkPlanMetrics(union, 1, Map( - 0L -> ("Union" -> Map()), - 1L -> ("LocalTableScan" -> Map("number of output rows" -> 2L)), - 2L -> ("LocalTableScan" -> Map("number of output rows" -> 2L)))) + withView(name) { + sql(s"CREATE OR REPLACE VIEW $name AS VALUES 1,2") + val view = spark.table(name) + val union = view.union(view) + testSparkPlanMetrics(union, 1, Map( + 0L -> ("Union" -> Map()), + 1L -> ("LocalTableScan" -> Map("number of output rows" -> 2L)), + 2L -> ("LocalTableScan" -> Map("number of output rows" -> 2L)))) + } } test("writing data out metrics: parquet") {