From 7ebbaface0473f7254ae76b914ab6a4657607bc9 Mon Sep 17 00:00:00 2001
From: ulysses-you
Date: Fri, 8 Apr 2022 20:20:40 +0800
Subject: [PATCH 1/4] Remove unnecessary distinct in aggregate expression by
 distinctKeys

---
 .../sql/catalyst/optimizer/Optimizer.scala    | 23 +++++++++++++++----
 .../optimizer/EliminateDistinctSuite.scala    | 23 +++++++++++++++++++
 2 files changed, 41 insertions(+), 5 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 66c2ad84ccee8..c465359247240 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -411,14 +411,27 @@ abstract class Optimizer(catalogManager: CatalogManager)
 }
 
 /**
- * Remove useless DISTINCT for MAX and MIN.
+ * Remove useless DISTINCT:
+ *   1. For duplicate-agnostic aggregate expressions, e.g. MAX and MIN.
+ *   2. If the distinct semantics are already guaranteed by the child plan.
+ *
  * This rule should be applied before RewriteDistinctAggregates.
 */
 object EliminateDistinct extends Rule[LogicalPlan] {
-  override def apply(plan: LogicalPlan): LogicalPlan = plan.transformAllExpressionsWithPruning(
-    _.containsPattern(AGGREGATE_EXPRESSION)) {
-    case ae: AggregateExpression if ae.isDistinct && isDuplicateAgnostic(ae.aggregateFunction) =>
-      ae.copy(isDistinct = false)
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.transformWithPruning(
+    _.containsPattern(AGGREGATE)) {
+    case agg: Aggregate =>
+      agg.transformExpressionsWithPruning(_.containsPattern(AGGREGATE_EXPRESSION)) {
+        case ae: AggregateExpression if ae.isDistinct &&
+          isDuplicateAgnostic(ae.aggregateFunction) =>
+          ae.copy(isDistinct = false)
+
+        case ae: AggregateExpression if ae.isDistinct &&
+          agg.child.distinctKeys.exists(
+            _.subsetOf(ExpressionSet(ae.aggregateFunction.children.filterNot(_.foldable)))) &&
+          NormalizeFloatingNumbers.normalize(ae).eq(ae) =>
+          ae.copy(isDistinct = false)
+      }
   }
 
   def isDuplicateAgnostic(af: AggregateFunction): Boolean = af match {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala
index 9c57ced8492b8..0a2a80a2e8e8a 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala
@@ -33,6 +33,7 @@ class EliminateDistinctSuite extends PlanTest {
   }
 
   val testRelation = LocalRelation($"a".int)
+  val testRelation2 = LocalRelation($"a".int, $"b".string, $"c".double)
 
   Seq(
     Max(_),
@@ -71,4 +72,26 @@ class EliminateDistinctSuite extends PlanTest {
       comparePlans(Optimize.execute(query), answer)
     }
   }
+
+  test("SPARK-38832: Remove unnecessary distinct in aggregate expression by distinctKeys") {
+    val q1 = testRelation2.groupBy($"a")($"a")
+      .rebalance().groupBy()(countDistinct($"a") as "x", sumDistinct($"a") as "y").analyze
+    val r1 = testRelation2.groupBy($"a")($"a")
+      .rebalance().groupBy()(count($"a") as "x", sum($"a") as "y").analyze
+    comparePlans(Optimize.execute(q1), r1)
+
+    // the child's distinct key is not a subset of the aggregate's arguments
+    val q2 = testRelation2.groupBy($"a", $"b")($"a", $"b")
+      .rebalance().groupBy()(countDistinct($"a") as "x", sumDistinct($"a") as "y").analyze
+    comparePlans(Optimize.execute(q2), q2)
+
+    // avoid removing distinct for double data type attributes
+    val q3 = testRelation2.groupBy($"c")($"c")
+      .rebalance().groupBy()(sumDistinct($"c") as "x").analyze
+    comparePlans(Optimize.execute(q3), q3)
+
+    // the child has no distinct keys
+    val q4 = testRelation2.groupBy($"a")(countDistinct($"a") as "x").analyze
+    comparePlans(Optimize.execute(q4), q4)
+  }
 }
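A minimal sketch of the condition patch 1 adds, read in isolation. The standalone helper name canDropDistinct is invented here for illustration and is not part of the patch; the APIs it uses (AggregateExpression, ExpressionSet, distinctKeys, subsetOf) are exactly those appearing in the diff above.

    import org.apache.spark.sql.catalyst.expressions.ExpressionSet
    import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

    // DISTINCT is redundant when some distinct key of the child plan is a
    // subset of the aggregate function's non-foldable (non-literal) arguments:
    // the child then already emits at most one row per argument combination.
    def canDropDistinct(ae: AggregateExpression, child: LogicalPlan): Boolean = {
      val args = ExpressionSet(ae.aggregateFunction.children.filterNot(_.foldable))
      ae.isDistinct && child.distinctKeys.exists(_.subsetOf(args))
    }

Patch 1 additionally requires NormalizeFloatingNumbers.normalize(ae).eq(ae) before rewriting, a floating-point guard that patch 2 below removes.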
From 2ffc5ee676d767e8a0c18ad79861fca468daf109 Mon Sep 17 00:00:00 2001
From: ulysses-you
Date: Mon, 11 Apr 2022 14:35:35 +0800
Subject: [PATCH 2/4] address comment

---
 .../spark/sql/catalyst/optimizer/Optimizer.scala     |  3 +--
 .../catalyst/optimizer/EliminateDistinctSuite.scala  | 11 +++-------
 2 files changed, 4 insertions(+), 10 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index c465359247240..74d9be7c573d5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -428,8 +428,7 @@ object EliminateDistinct extends Rule[LogicalPlan] {
 
         case ae: AggregateExpression if ae.isDistinct &&
           agg.child.distinctKeys.exists(
-            _.subsetOf(ExpressionSet(ae.aggregateFunction.children.filterNot(_.foldable)))) &&
-          NormalizeFloatingNumbers.normalize(ae).eq(ae) =>
+            _.subsetOf(ExpressionSet(ae.aggregateFunction.children.filterNot(_.foldable)))) =>
           ae.copy(isDistinct = false)
       }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala
index 0a2a80a2e8e8a..798cc0a42dd3e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala
@@ -33,7 +33,7 @@ class EliminateDistinctSuite extends PlanTest {
   }
 
   val testRelation = LocalRelation($"a".int)
-  val testRelation2 = LocalRelation($"a".int, $"b".string, $"c".double)
+  val testRelation2 = LocalRelation($"a".int, $"b".string)
 
   Seq(
     Max(_),
@@ -85,13 +85,8 @@ class EliminateDistinctSuite extends PlanTest {
       .rebalance().groupBy()(countDistinct($"a") as "x", sumDistinct($"a") as "y").analyze
     comparePlans(Optimize.execute(q2), q2)
 
-    // avoid removing distinct for double data type attributes
-    val q3 = testRelation2.groupBy($"c")($"c")
-      .rebalance().groupBy()(sumDistinct($"c") as "x").analyze
-    comparePlans(Optimize.execute(q3), q3)
-
     // the child has no distinct keys
-    val q4 = testRelation2.groupBy($"a")(countDistinct($"a") as "x").analyze
-    comparePlans(Optimize.execute(q4), q4)
+    val q3 = testRelation2.groupBy($"a")(countDistinct($"a") as "x").analyze
+    comparePlans(Optimize.execute(q3), q3)
   }
 }
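After patch 2 the rule fires whenever the child's distinct keys cover the distinct aggregate's arguments. A hedged usage illustration in the same Catalyst test DSL that EliminateDistinctSuite uses; the relation and column names below are invented, and distinct-key propagation (PROPAGATE_DISTINCT_KEYS_ENABLED) must be on:

    import org.apache.spark.sql.catalyst.dsl.expressions._
    import org.apache.spark.sql.catalyst.dsl.plans._
    import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

    val relation = LocalRelation($"a".int, $"b".string)

    // Roughly: SELECT count(DISTINCT a) FROM (SELECT a FROM t GROUP BY a).
    // The inner groupBy makes `a` duplicate-free, so its distinct keys
    // contain {a} and EliminateDistinct can drop the DISTINCT:
    val query = relation.groupBy($"a")($"a")
      .groupBy()(countDistinct($"a") as "x").analyze
    val expected = relation.groupBy($"a")($"a")
      .groupBy()(count($"a") as "x").analyze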
From dc67cfd1a9f168095a7a51009f17dcaa2956631f Mon Sep 17 00:00:00 2001
From: ulysses-you
Date: Tue, 12 Apr 2022 14:22:20 +0800
Subject: [PATCH 3/4] ordering

---
 .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 74d9be7c573d5..bb788336c6d77 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -146,7 +146,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
       PushDownPredicates) :: Nil
   }
 
-  val batches = (Batch("Eliminate Distinct", Once, EliminateDistinct) ::
+  val batches = (
     // Technically some of the rules in Finish Analysis are not optimizer rules and belong more
     // in the analyzer, because they are needed for correctness (e.g. ComputeCurrentTime).
     // However, because we also use the analyzer to canonicalized queries (for view definition),
@@ -166,6 +166,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
     //////////////////////////////////////////////////////////////////////////////////////////
     // Optimizer rules start here
     //////////////////////////////////////////////////////////////////////////////////////////
+    Batch("Eliminate Distinct", Once, EliminateDistinct) ::
     // - Do the first call of CombineUnions before starting the major Optimizer rules,
     //   since it can reduce the number of iteration and the other rules could add/move
     //   extra operators between two adjacent Union operators.

From c16e9957b7f45ecb4c245a89857545d2959d3e75 Mon Sep 17 00:00:00 2001
From: ulysses-you
Date: Tue, 12 Apr 2022 16:56:15 +0800
Subject: [PATCH 4/4] address comment

---
 .../catalyst/plans/logical/LogicalPlanDistinctKeys.scala | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlanDistinctKeys.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlanDistinctKeys.scala
index 1843c2da478ef..2ffa5a0e594e1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlanDistinctKeys.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlanDistinctKeys.scala
@@ -29,6 +29,12 @@ import org.apache.spark.sql.internal.SQLConf.PROPAGATE_DISTINCT_KEYS_ENABLED
 */
 trait LogicalPlanDistinctKeys { self: LogicalPlan =>
   lazy val distinctKeys: Set[ExpressionSet] = {
-    if (conf.getConf(PROPAGATE_DISTINCT_KEYS_ENABLED)) DistinctKeyVisitor.visit(self) else Set.empty
+    if (conf.getConf(PROPAGATE_DISTINCT_KEYS_ENABLED)) {
+      val keys = DistinctKeyVisitor.visit(self)
+      require(keys.forall(_.nonEmpty))
+      keys
+    } else {
+      Set.empty
+    }
   }
 }
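A closing note on the require added in patch 4: an empty ExpressionSet is a subset of every set, so if DistinctKeyVisitor ever produced an empty key, the subsetOf check in EliminateDistinct would pass for any distinct aggregate. A minimal standalone sketch of that failure mode (illustrative only):

    import org.apache.spark.sql.catalyst.expressions.{ExpressionSet, Literal}

    val emptyKey = ExpressionSet(Nil)
    // Vacuously true: the empty set is a subset of anything, which would let
    // the rule drop DISTINCT unconditionally. require(keys.forall(_.nonEmpty))
    // rejects such keys before they can be propagated.
    assert(emptyKey.subsetOf(ExpressionSet(Seq(Literal(1)))))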