From c78166de38f48b11f8e3dc328c95f80451206f7c Mon Sep 17 00:00:00 2001 From: Sameer Agarwal Date: Fri, 11 Mar 2016 16:25:35 -0800 Subject: [PATCH 1/2] Remove redundant conditions while combining filters --- .../spark/sql/catalyst/optimizer/Optimizer.scala | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 85776670e5c4e..adb1a3637b152 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -819,12 +819,15 @@ object CombineUnions extends Rule[LogicalPlan] { } /** - * Combines two adjacent [[Filter]] operators into one, merging the - * conditions into one conjunctive predicate. + * Combines two adjacent [[Filter]] operators into one, merging the non-redundant conditions into + * one conjunctive predicate. */ -object CombineFilters extends Rule[LogicalPlan] { +object CombineFilters extends Rule[LogicalPlan] with PredicateHelper { def apply(plan: LogicalPlan): LogicalPlan = plan transform { - case ff @ Filter(fc, nf @ Filter(nc, grandChild)) => Filter(And(nc, fc), grandChild) + case ff @ Filter(fc, nf @ Filter(nc, grandChild)) => + val ac = (ExpressionSet(splitConjunctivePredicates(nc)) -- + ExpressionSet(splitConjunctivePredicates(fc))).reduce(And) + Filter(And(ac, fc), grandChild) } } From f9aba06e2b133ce46f3a778e56bb7e481a35e0b8 Mon Sep 17 00:00:00 2001 From: Sameer Agarwal Date: Fri, 11 Mar 2016 16:40:56 -0800 Subject: [PATCH 2/2] unit tests --- .../spark/sql/catalyst/optimizer/Optimizer.scala | 10 +++++++--- .../catalyst/optimizer/FilterPushdownSuite.scala | 15 +++++++++++++++ 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index adb1a3637b152..f50ac107adda1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -825,9 +825,13 @@ object CombineUnions extends Rule[LogicalPlan] { object CombineFilters extends Rule[LogicalPlan] with PredicateHelper { def apply(plan: LogicalPlan): LogicalPlan = plan transform { case ff @ Filter(fc, nf @ Filter(nc, grandChild)) => - val ac = (ExpressionSet(splitConjunctivePredicates(nc)) -- - ExpressionSet(splitConjunctivePredicates(fc))).reduce(And) - Filter(And(ac, fc), grandChild) + (ExpressionSet(splitConjunctivePredicates(fc)) -- + ExpressionSet(splitConjunctivePredicates(nc))).reduceOption(And) match { + case Some(ac) => + Filter(And(ac, nc), grandChild) + case None => + nf + } } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index a636d63012454..b84ae7c5bb6ad 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -81,6 +81,21 @@ class FilterPushdownSuite extends PlanTest { comparePlans(optimized, correctAnswer) } + test("combine redundant filters") { + val originalQuery = + testRelation + .where('a === 1 && 'b === 1) + .where('a === 1 && 'c === 1) + + val optimized = Optimize.execute(originalQuery.analyze) + val correctAnswer = + testRelation + .where('a === 1 && 'b === 1 && 'c === 1) + .analyze + + comparePlans(optimized, correctAnswer) + } + test("can't push without rewrite") { val originalQuery = testRelation