From 0700d84aba13ac5aae4ba5ad847c002e97b3885e Mon Sep 17 00:00:00 2001
From: jiangxingbo
Date: Fri, 21 Oct 2016 23:56:52 +0800
Subject: [PATCH 1/4] fix replace alias on constraints rule.

---
 .../catalyst/plans/logical/LogicalPlan.scala    | 14 +++++++-----
 .../InferFiltersFromConstraintsSuite.scala      | 22 ++++++++++++++++---
 2 files changed, 27 insertions(+), 9 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 09725473a384d..47ea11195d852 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -293,15 +293,17 @@ abstract class UnaryNode extends LogicalPlan {
    * expressions with the corresponding alias
    */
   protected def getAliasedConstraints(projectList: Seq[NamedExpression]): Set[Expression] = {
-    projectList.flatMap {
+    var aliasedConstraints = child.constraints
+    projectList.foreach {
       case a @ Alias(e, _) =>
-        child.constraints.map(_ transform {
+        aliasedConstraints = ExpressionSet(aliasedConstraints.map(_ transform {
           case expr: Expression if expr.semanticEquals(e) =>
             a.toAttribute
-        }).union(Set(EqualNullSafe(e, a.toAttribute)))
-      case _ =>
-        Set.empty[Expression]
-    }.toSet
+        }).union(Set(EqualNullSafe(e, a.toAttribute))))
+      case _ => // Don't change.
+    }
+
+    aliasedConstraints
   }
 
   override protected def validConstraints: Set[Expression] = child.constraints
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
index e7fdd5a6202b6..d97750eb2cbbf 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
@@ -27,9 +27,11 @@ import org.apache.spark.sql.catalyst.rules._
 class InferFiltersFromConstraintsSuite extends PlanTest {
 
   object Optimize extends RuleExecutor[LogicalPlan] {
-    val batches = Batch("InferFilters", FixedPoint(5), InferFiltersFromConstraints) ::
-      Batch("PredicatePushdown", FixedPoint(5), PushPredicateThroughJoin) ::
-      Batch("CombineFilters", FixedPoint(5), CombineFilters) :: Nil
+    val batches = Batch("InferAndPushDownFilters", FixedPoint(100),
+      PushPredicateThroughJoin,
+      PushDownPredicate,
+      InferFiltersFromConstraints,
+      CombineFilters) :: Nil
   }
 
   val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
@@ -120,4 +122,18 @@ class InferFiltersFromConstraintsSuite extends PlanTest {
     val optimized = Optimize.execute(originalQuery)
     comparePlans(optimized, correctAnswer)
   }
+
+  test("inner join: infer filters on multiple alias output") {
+    val x = testRelation.subquery('x)
+    val y = testRelation.subquery('y)
+
+    val originalQuery = x.select('a.as('a1), 'b.as('b1)).as("x")
+      .join(y, Inner, Some("x.a1".attr === "y.b".attr && "x.b1".attr === "y.b".attr)).analyze
+    val correctAnswer = x.where('b === 'a && IsNotNull('b) && IsNotNull('a))
+      .select('a.as('a1), 'b.as('b1)).as("x")
+      .join(y.where(IsNotNull('b)), Inner,
+        Some("x.a1".attr === "y.b".attr && "x.b1".attr === "y.b".attr)).analyze
+    val optimized = Optimize.execute(originalQuery)
+    comparePlans(optimized, correctAnswer)
+  }
 }
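
The core change in PATCH 1/4 is to thread one accumulating constraint set through every alias in projectList instead of rewriting child.constraints separately per alias, so a constraint that references several aliased expressions picks up every substitution. Below is a minimal, self-contained sketch of that idea with expressions modeled as plain strings; the object and method names are illustrative only and are not Spark code.

    // Standalone illustration (not Spark code): rewrite constraints through a list of
    // aliases by folding them into one growing set, as the patched loop does.
    object AliasedConstraintsSketch {
      def rewriteThroughAliases(
          constraints: Set[String],
          aliases: Seq[(String, String)]): Set[String] = {
        var all = constraints
        aliases.foreach { case (expr, attr) =>
          // Map over everything accumulated so far, so a constraint produced by an
          // earlier alias can still be rewritten by a later one.
          all ++= all.map(_.replace(expr, attr))
          all += s"$expr <=> $attr"
        }
        all
      }

      def main(args: Array[String]): Unit = {
        val childConstraints = Set("a = b", "isnotnull(a)", "isnotnull(b)")
        val aliases = Seq("a" -> "a1", "b" -> "b1")
        rewriteThroughAliases(childConstraints, aliases).foreach(println)
        // The output contains "a1 = b1", which a per-alias flatMap over the original
        // constraints (the pre-patch code) could never derive.
      }
    }

This is the behavior exercised by the new test "inner join: infer filters on multiple alias output" above and, in PATCH 2/4, by the multiAlias case added to ConstraintPropagationSuite.
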
From 2fe728fe155c0e70d31ef03675a331ac9337c2f0 Mon Sep 17 00:00:00 2001
From: jiangxingbo
Date: Sat, 22 Oct 2016 12:02:01 +0800
Subject: [PATCH 2/4] modify test cases.

---
 .../catalyst/plans/logical/LogicalPlan.scala    |  8 +++----
 .../InferFiltersFromConstraintsSuite.scala      | 22 +++-------------------
 .../plans/ConstraintPropagationSuite.scala      |  8 +++++++
 3 files changed, 15 insertions(+), 23 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 47ea11195d852..40edc0bc9afc0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -293,17 +293,17 @@ abstract class UnaryNode extends LogicalPlan {
    * expressions with the corresponding alias
    */
   protected def getAliasedConstraints(projectList: Seq[NamedExpression]): Set[Expression] = {
-    var aliasedConstraints = child.constraints
+    var aliasedConstraints = child.constraints.toSeq
     projectList.foreach {
       case a @ Alias(e, _) =>
-        aliasedConstraints = ExpressionSet(aliasedConstraints.map(_ transform {
+        aliasedConstraints ++= aliasedConstraints.map(_ transform {
           case expr: Expression if expr.semanticEquals(e) =>
             a.toAttribute
-        }).union(Set(EqualNullSafe(e, a.toAttribute))))
+        }) :+ EqualNullSafe(e, a.toAttribute)
       case _ => // Don't change.
     }
 
-    aliasedConstraints
+    aliasedConstraints.toSet
   }
 
   override protected def validConstraints: Set[Expression] = child.constraints
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
index d97750eb2cbbf..e7fdd5a6202b6 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
@@ -27,11 +27,9 @@ import org.apache.spark.sql.catalyst.rules._
 class InferFiltersFromConstraintsSuite extends PlanTest {
 
   object Optimize extends RuleExecutor[LogicalPlan] {
-    val batches = Batch("InferAndPushDownFilters", FixedPoint(100),
-      PushPredicateThroughJoin,
-      PushDownPredicate,
-      InferFiltersFromConstraints,
-      CombineFilters) :: Nil
+    val batches = Batch("InferFilters", FixedPoint(5), InferFiltersFromConstraints) ::
+      Batch("PredicatePushdown", FixedPoint(5), PushPredicateThroughJoin) ::
+      Batch("CombineFilters", FixedPoint(5), CombineFilters) :: Nil
   }
 
   val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
@@ -122,18 +120,4 @@ class InferFiltersFromConstraintsSuite extends PlanTest {
     val optimized = Optimize.execute(originalQuery)
     comparePlans(optimized, correctAnswer)
   }
-
-  test("inner join: infer filters on multiple alias output") {
-    val x = testRelation.subquery('x)
-    val y = testRelation.subquery('y)
-
-    val originalQuery = x.select('a.as('a1), 'b.as('b1)).as("x")
-      .join(y, Inner, Some("x.a1".attr === "y.b".attr && "x.b1".attr === "y.b".attr)).analyze
-    val correctAnswer = x.where('b === 'a && IsNotNull('b) && IsNotNull('a))
-      .select('a.as('a1), 'b.as('b1)).as("x")
-      .join(y.where(IsNotNull('b)), Inner,
-        Some("x.a1".attr === "y.b".attr && "x.b1".attr === "y.b".attr)).analyze
-    val optimized = Optimize.execute(originalQuery)
-    comparePlans(optimized, correctAnswer)
-  }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
index 8d6a49a8a37b4..8068ce922e636 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
@@ -128,8 +128,16 @@ class ConstraintPropagationSuite extends SparkFunSuite {
       ExpressionSet(Seq(resolveColumn(aliasedRelation.analyze, "x") > 10,
         IsNotNull(resolveColumn(aliasedRelation.analyze, "x")),
         resolveColumn(aliasedRelation.analyze, "b") <=> resolveColumn(aliasedRelation.analyze, "y"),
+        resolveColumn(aliasedRelation.analyze, "z") <=> resolveColumn(aliasedRelation.analyze, "x"),
         resolveColumn(aliasedRelation.analyze, "z") > 10,
         IsNotNull(resolveColumn(aliasedRelation.analyze, "z")))))
+
+    val multiAlias = tr.where('a === 'c + 10).select('a.as('x), 'c.as('y))
+    verifyConstraints(multiAlias.analyze.constraints,
+      ExpressionSet(Seq(IsNotNull(resolveColumn(multiAlias.analyze, "x")),
+        IsNotNull(resolveColumn(multiAlias.analyze, "y")),
+        resolveColumn(multiAlias.analyze, "x") === resolveColumn(multiAlias.analyze, "y") + 10))
+    )
   }
 
   test("propagating constraints in union") {

From 32e0b36eb3c2441ecf6418fed1c0944d1841c13c Mon Sep 17 00:00:00 2001
From: jiangxingbo
Date: Mon, 24 Oct 2016 12:46:39 +0800
Subject: [PATCH 3/4] bugfix

---
 .../spark/sql/catalyst/plans/logical/LogicalPlan.scala | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 40edc0bc9afc0..a0830402b8cae 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -293,17 +293,19 @@ abstract class UnaryNode extends LogicalPlan {
    * expressions with the corresponding alias
    */
   protected def getAliasedConstraints(projectList: Seq[NamedExpression]): Set[Expression] = {
-    var aliasedConstraints = child.constraints.toSeq
+    var aliasedConstraints = child.constraints.toSet
     projectList.foreach {
       case a @ Alias(e, _) =>
+        // For every alias in `projectList`, replace the reference in constraints by its attribute.
         aliasedConstraints ++= aliasedConstraints.map(_ transform {
           case expr: Expression if expr.semanticEquals(e) =>
             a.toAttribute
-        }) :+ EqualNullSafe(e, a.toAttribute)
+        })
+        aliasedConstraints += EqualNullSafe(e, a.toAttribute)
       case _ => // Don't change.
     }
 
-    aliasedConstraints.toSet
+    aliasedConstraints
   }
 
   override protected def validConstraints: Set[Expression] = child.constraints

From 3f0749338c917301c24330d95a839ab1e7151a1d Mon Sep 17 00:00:00 2001
From: jiangxingbo
Date: Tue, 25 Oct 2016 16:57:39 +0800
Subject: [PATCH 4/4] improve generate of constraints set.

---
 .../spark/sql/catalyst/plans/logical/LogicalPlan.scala | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index a0830402b8cae..b0a4145f37767 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -293,19 +293,19 @@ abstract class UnaryNode extends LogicalPlan {
    * expressions with the corresponding alias
    */
   protected def getAliasedConstraints(projectList: Seq[NamedExpression]): Set[Expression] = {
-    var aliasedConstraints = child.constraints.toSet
+    var allConstraints = child.constraints.asInstanceOf[Set[Expression]]
     projectList.foreach {
       case a @ Alias(e, _) =>
         // For every alias in `projectList`, replace the reference in constraints by its attribute.
-        aliasedConstraints ++= aliasedConstraints.map(_ transform {
+        allConstraints ++= allConstraints.map(_ transform {
           case expr: Expression if expr.semanticEquals(e) =>
             a.toAttribute
         })
-        aliasedConstraints += EqualNullSafe(e, a.toAttribute)
+        allConstraints += EqualNullSafe(e, a.toAttribute)
       case _ => // Don't change.
     }
 
-    aliasedConstraints
+    allConstraints -- child.constraints
   }
 
   override protected def validConstraints: Set[Expression] = child.constraints
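
PATCH 4/4 renames the accumulator to allConstraints and returns allConstraints -- child.constraints, so the helper hands back only the constraints it derived; callers such as Project union the result with the child's constraints, so nothing is lost and the child's own constraints are not duplicated. Continuing the string-based sketch above (again a toy model with illustrative names, not the Spark API):

    // Same toy model as before, now returning only the newly derived constraints.
    object AliasedOnlySketch {
      def aliasedOnly(
          childConstraints: Set[String],
          aliases: Seq[(String, String)]): Set[String] = {
        var all = childConstraints
        aliases.foreach { case (expr, attr) =>
          all ++= all.map(_.replace(expr, attr))
          all += s"$expr <=> $attr"
        }
        // Subtract the input so a caller that already holds childConstraints only sees
        // what this helper added, mirroring `allConstraints -- child.constraints`.
        all -- childConstraints
      }

      def main(args: Array[String]): Unit = {
        val child = Set("a = b", "isnotnull(a)")
        aliasedOnly(child, Seq("a" -> "a1")).foreach(println)
        // Prints only the derived constraints, e.g. "a1 = b", "isnotnull(a1)", "a <=> a1",
        // but not the original "a = b" or "isnotnull(a)".
      }
    }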