From 3635b3aa9bc5228a159c5c98406359153e3a2af7 Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Wed, 19 Dec 2018 15:25:47 -0800 Subject: [PATCH 1/2] Refactoring --- .../catalyst/optimizer/ColumnPruning.scala | 139 ++++++++++++++++++ .../sql/catalyst/optimizer/Optimizer.scala | 116 --------------- 2 files changed, 139 insertions(+), 116 deletions(-) create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruning.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruning.scala new file mode 100644 index 0000000000000..d070ae9390faa --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruning.scala @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.expressions.AttributeSet +import org.apache.spark.sql.catalyst.plans.LeftExistence +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.rules.Rule + +/** + * Attempts to eliminate the reading of unneeded columns from the query plan. + * + * Since adding Project before Filter conflicts with PushPredicatesThroughProject, this rule will + * remove the Project p2 in the following pattern: + * + * p1 @ Project(_, Filter(_, p2 @ Project(_, child))) if p2.outputSet.subsetOf(p2.inputSet) + * + * p2 is usually inserted by this rule and useless, p1 could prune the columns anyway. + */ +object ColumnPruning extends Rule[LogicalPlan] { + + def apply(plan: LogicalPlan): LogicalPlan = removeProjectBeforeFilter(plan transform { + // Prunes the unused columns from project list of Project/Aggregate/Expand + case p @ Project(_, p2: Project) if !p2.outputSet.subsetOf(p.references) => + p.copy(child = p2.copy(projectList = p2.projectList.filter(p.references.contains))) + case p @ Project(_, a: Aggregate) if !a.outputSet.subsetOf(p.references) => + p.copy( + child = a.copy(aggregateExpressions = a.aggregateExpressions.filter(p.references.contains))) + case a @ Project(_, e @ Expand(_, _, grandChild)) if !e.outputSet.subsetOf(a.references) => + val newOutput = e.output.filter(a.references.contains(_)) + val newProjects = e.projections.map { proj => + proj.zip(e.output).filter { case (_, a) => + newOutput.contains(a) + }.unzip._1 + } + a.copy(child = Expand(newProjects, newOutput, grandChild)) + + // Prunes the unused columns from child of `DeserializeToObject` + case d @ DeserializeToObject(_, _, child) if !child.outputSet.subsetOf(d.references) => + d.copy(child = prunedChild(child, d.references)) + + // Prunes the unused columns from child of Aggregate/Expand/Generate/ScriptTransformation + case a @ Aggregate(_, _, child) if 
!child.outputSet.subsetOf(a.references) => + a.copy(child = prunedChild(child, a.references)) + case f @ FlatMapGroupsInPandas(_, _, _, child) if !child.outputSet.subsetOf(f.references) => + f.copy(child = prunedChild(child, f.references)) + case e @ Expand(_, _, child) if !child.outputSet.subsetOf(e.references) => + e.copy(child = prunedChild(child, e.references)) + case s @ ScriptTransformation(_, _, _, child, _) + if !child.outputSet.subsetOf(s.references) => + s.copy(child = prunedChild(child, s.references)) + + // prune unrequired references + case p @ Project(_, g: Generate) if p.references != g.outputSet => + val requiredAttrs = p.references -- g.producedAttributes ++ g.generator.references + val newChild = prunedChild(g.child, requiredAttrs) + val unrequired = g.generator.references -- p.references + val unrequiredIndices = newChild.output.zipWithIndex.filter(t => unrequired.contains(t._1)) + .map(_._2) + p.copy(child = g.copy(child = newChild, unrequiredChildIndex = unrequiredIndices)) + + // Eliminate unneeded attributes from right side of a Left Existence Join. + case j @ Join(_, right, LeftExistence(_), _) => + j.copy(right = prunedChild(right, j.references)) + + // all the columns will be used to compare, so we can't prune them + case p @ Project(_, _: SetOperation) => p + case p @ Project(_, _: Distinct) => p + // Eliminate unneeded attributes from children of Union. + case p @ Project(_, u: Union) => + if (!u.outputSet.subsetOf(p.references)) { + val firstChild = u.children.head + val newOutput = prunedChild(firstChild, p.references).output + // pruning the columns of all children based on the pruned first child. 
+ val newChildren = u.children.map { p => + val selected = p.output.zipWithIndex.filter { case (a, i) => + newOutput.contains(firstChild.output(i)) + }.map(_._1) + Project(selected, p) + } + p.copy(child = u.withNewChildren(newChildren)) + } else { + p + } + + // Prune unnecessary window expressions + case p @ Project(_, w: Window) if !w.windowOutputSet.subsetOf(p.references) => + p.copy(child = w.copy( + windowExpressions = w.windowExpressions.filter(p.references.contains))) + + // Can't prune the columns on LeafNode + case p @ Project(_, _: LeafNode) => p + + // for all other logical plans that inherit the output from its children + // Project over project is handled by the first case, skip it here. + case p @ Project(_, child) if !child.isInstanceOf[Project] => + val required = child.references ++ p.references + if (!child.inputSet.subsetOf(required)) { + val newChildren = child.children.map(c => prunedChild(c, required)) + p.copy(child = child.withNewChildren(newChildren)) + } else { + p + } + }) + + /** Applies a projection only when the child is producing unnecessary attributes */ + private def prunedChild(c: LogicalPlan, allReferences: AttributeSet) = + if (!c.outputSet.subsetOf(allReferences)) { + Project(c.output.filter(allReferences.contains), c) + } else { + c + } + + /** + * The Project before Filter is not necessary but conflicts with PushPredicatesThroughProject, + * so remove it. Since the Projects have been added top-down, we need to remove in bottom-up + * order, otherwise lower Projects can be missed. 
+ */ + private def removeProjectBeforeFilter(plan: LogicalPlan): LogicalPlan = plan transformUp { + case p1 @ Project(_, f @ Filter(_, p2 @ Project(_, child))) + if p2.outputSet.subsetOf(child.outputSet) => + p1.copy(child = f.copy(child = child)) + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 44d5543114902..2ae8ef7e6c648 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -525,122 +525,6 @@ object PushProjectionThroughUnion extends Rule[LogicalPlan] with PredicateHelper } } -/** - * Attempts to eliminate the reading of unneeded columns from the query plan. - * - * Since adding Project before Filter conflicts with PushPredicatesThroughProject, this rule will - * remove the Project p2 in the following pattern: - * - * p1 @ Project(_, Filter(_, p2 @ Project(_, child))) if p2.outputSet.subsetOf(p2.inputSet) - * - * p2 is usually inserted by this rule and useless, p1 could prune the columns anyway. 
- */ -object ColumnPruning extends Rule[LogicalPlan] { - - def apply(plan: LogicalPlan): LogicalPlan = removeProjectBeforeFilter(plan transform { - // Prunes the unused columns from project list of Project/Aggregate/Expand - case p @ Project(_, p2: Project) if !p2.outputSet.subsetOf(p.references) => - p.copy(child = p2.copy(projectList = p2.projectList.filter(p.references.contains))) - case p @ Project(_, a: Aggregate) if !a.outputSet.subsetOf(p.references) => - p.copy( - child = a.copy(aggregateExpressions = a.aggregateExpressions.filter(p.references.contains))) - case a @ Project(_, e @ Expand(_, _, grandChild)) if !e.outputSet.subsetOf(a.references) => - val newOutput = e.output.filter(a.references.contains(_)) - val newProjects = e.projections.map { proj => - proj.zip(e.output).filter { case (_, a) => - newOutput.contains(a) - }.unzip._1 - } - a.copy(child = Expand(newProjects, newOutput, grandChild)) - - // Prunes the unused columns from child of `DeserializeToObject` - case d @ DeserializeToObject(_, _, child) if !child.outputSet.subsetOf(d.references) => - d.copy(child = prunedChild(child, d.references)) - - // Prunes the unused columns from child of Aggregate/Expand/Generate/ScriptTransformation - case a @ Aggregate(_, _, child) if !child.outputSet.subsetOf(a.references) => - a.copy(child = prunedChild(child, a.references)) - case f @ FlatMapGroupsInPandas(_, _, _, child) if !child.outputSet.subsetOf(f.references) => - f.copy(child = prunedChild(child, f.references)) - case e @ Expand(_, _, child) if !child.outputSet.subsetOf(e.references) => - e.copy(child = prunedChild(child, e.references)) - case s @ ScriptTransformation(_, _, _, child, _) - if !child.outputSet.subsetOf(s.references) => - s.copy(child = prunedChild(child, s.references)) - - // prune unrequired references - case p @ Project(_, g: Generate) if p.references != g.outputSet => - val requiredAttrs = p.references -- g.producedAttributes ++ g.generator.references - val newChild = 
prunedChild(g.child, requiredAttrs) - val unrequired = g.generator.references -- p.references - val unrequiredIndices = newChild.output.zipWithIndex.filter(t => unrequired.contains(t._1)) - .map(_._2) - p.copy(child = g.copy(child = newChild, unrequiredChildIndex = unrequiredIndices)) - - // Eliminate unneeded attributes from right side of a Left Existence Join. - case j @ Join(_, right, LeftExistence(_), _) => - j.copy(right = prunedChild(right, j.references)) - - // all the columns will be used to compare, so we can't prune them - case p @ Project(_, _: SetOperation) => p - case p @ Project(_, _: Distinct) => p - // Eliminate unneeded attributes from children of Union. - case p @ Project(_, u: Union) => - if (!u.outputSet.subsetOf(p.references)) { - val firstChild = u.children.head - val newOutput = prunedChild(firstChild, p.references).output - // pruning the columns of all children based on the pruned first child. - val newChildren = u.children.map { p => - val selected = p.output.zipWithIndex.filter { case (a, i) => - newOutput.contains(firstChild.output(i)) - }.map(_._1) - Project(selected, p) - } - p.copy(child = u.withNewChildren(newChildren)) - } else { - p - } - - // Prune unnecessary window expressions - case p @ Project(_, w: Window) if !w.windowOutputSet.subsetOf(p.references) => - p.copy(child = w.copy( - windowExpressions = w.windowExpressions.filter(p.references.contains))) - - // Can't prune the columns on LeafNode - case p @ Project(_, _: LeafNode) => p - - // for all other logical plans that inherits the output from it's children - // Project over project is handled by the first case, skip it here. 
- case p @ Project(_, child) if !child.isInstanceOf[Project] => - val required = child.references ++ p.references - if (!child.inputSet.subsetOf(required)) { - val newChildren = child.children.map(c => prunedChild(c, required)) - p.copy(child = child.withNewChildren(newChildren)) - } else { - p - } - }) - - /** Applies a projection only when the child is producing unnecessary attributes */ - private def prunedChild(c: LogicalPlan, allReferences: AttributeSet) = - if (!c.outputSet.subsetOf(allReferences)) { - Project(c.output.filter(allReferences.contains), c) - } else { - c - } - - /** - * The Project before Filter is not necessary but conflict with PushPredicatesThroughProject, - * so remove it. Since the Projects have been added top-down, we need to remove in bottom-up - * order, otherwise lower Projects can be missed. - */ - private def removeProjectBeforeFilter(plan: LogicalPlan): LogicalPlan = plan transformUp { - case p1 @ Project(_, f @ Filter(_, p2 @ Project(_, child))) - if p2.outputSet.subsetOf(child.outputSet) => - p1.copy(child = f.copy(child = child)) - } -} - /** * Combines two adjacent [[Project]] operators into one and perform alias substitution, * merging the expressions into one single expression. 
From bf8f1b9a03d48f1feaec1f3ece33b616d29ce074 Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Wed, 19 Dec 2018 15:54:21 -0800 Subject: [PATCH 2/2] styling --- .../apache/spark/sql/catalyst/optimizer/ColumnPruning.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruning.scala index d070ae9390faa..e4aac5ab411ea 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruning.scala @@ -62,7 +62,7 @@ object ColumnPruning extends Rule[LogicalPlan] { case e @ Expand(_, _, child) if !child.outputSet.subsetOf(e.references) => e.copy(child = prunedChild(child, e.references)) case s @ ScriptTransformation(_, _, _, child, _) - if !child.outputSet.subsetOf(s.references) => + if !child.outputSet.subsetOf(s.references) => s.copy(child = prunedChild(child, s.references)) // prune unrequired references @@ -133,7 +133,7 @@ object ColumnPruning extends Rule[LogicalPlan] { */ private def removeProjectBeforeFilter(plan: LogicalPlan): LogicalPlan = plan transformUp { case p1 @ Project(_, f @ Filter(_, p2 @ Project(_, child))) - if p2.outputSet.subsetOf(child.outputSet) => + if p2.outputSet.subsetOf(child.outputSet) => p1.copy(child = f.copy(child = child)) } }