From ae60ab38e729c977cac83e956c553e53254dd01f Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Fri, 23 May 2014 18:57:29 +0800 Subject: [PATCH 1/3] [SPARK-1913] Attributes referenced only in predicates pushed down should remain in ParquetTableScan operator --- .../org/apache/spark/sql/SQLContext.scala | 9 ++++- .../spark/sql/execution/SparkStrategies.scala | 36 +++++++++---------- .../spark/sql/parquet/ParquetQuerySuite.scala | 6 +++- .../spark/sql/hive/HiveStrategies.scala | 1 + 4 files changed, 30 insertions(+), 22 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index bfebfa0c28c52..cb4f109552597 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -206,17 +206,24 @@ class SQLContext(@transient val sparkContext: SparkContext) * final desired output requires complex expressions to be evaluated or when columns can be * further eliminated out after filtering has been done. * + * The `prunePushedDownFilter` is used to remove those filters that can be removed by the filter + * pushdown optimization. + * * The required attributes for both filtering and expression evaluation are passed to the * provided `scanBuilder` function so that it can avoid unnecessary column materialization. */ def pruneFilterProject( projectList: Seq[NamedExpression], filterPredicates: Seq[Expression], + prunePushedDownFilter: Option[Expression => Boolean], scanBuilder: Seq[Attribute] => SparkPlan): SparkPlan = { val projectSet = projectList.flatMap(_.references).toSet val filterSet = filterPredicates.flatMap(_.references).toSet - val filterCondition = filterPredicates.reduceLeftOption(And) + val filterCondition = prunePushedDownFilter + .map(filterPredicates.filter) + .getOrElse(filterPredicates) + .reduceLeftOption(And) // Right now we still use a projection even if the only evaluation is applying an alias // to a column. Since this is a no-op, it could be avoided. However, using this diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 394a59700dbaf..4444b821c491e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -141,31 +141,27 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case logical.InsertIntoTable(table: ParquetRelation, partition, child, overwrite) => InsertIntoParquetTable(table, planLater(child), overwrite)(sparkContext) :: Nil case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) => { - val remainingFilters = + val prunePushedDownFilter = if (sparkContext.conf.getBoolean(ParquetFilters.PARQUET_FILTER_PUSHDOWN_ENABLED, true)) { - filters.filter { - // Note: filters cannot be pushed down to Parquet if they contain more complex - // expressions than simple "Attribute cmp Literal" comparisons. Here we remove - // all filters that have been pushed down. Note that a predicate such as - // "(A AND B) OR C" can result in "A OR C" being pushed down. - filter => - val recordFilter = ParquetFilters.createFilter(filter) - if (!recordFilter.isDefined) { - // First case: the pushdown did not result in any record filter. - true - } else { - // Second case: a record filter was created; here we are conservative in - // the sense that even if "A" was pushed and we check for "A AND B" we - // still want to keep "A AND B" in the higher-level filter, not just "B". - !ParquetFilters.findExpression(recordFilter.get, filter).isDefined - } - } + Some((filter: Expression) => { + val recordFilter = ParquetFilters.createFilter(filter) + if (!recordFilter.isDefined) { + // First case: the pushdown did not result in any record filter. + true + } else { + // Second case: a record filter was created; here we are conservative in + // the sense that even if "A" was pushed and we check for "A AND B" we + // still want to keep "A AND B" in the higher-level filter, not just "B". + !ParquetFilters.findExpression(recordFilter.get, filter).isDefined + } + }) } else { - filters + None } pruneFilterProject( projectList, - remainingFilters, + filters, + prunePushedDownFilter, ParquetTableScan(_, relation, filters)(sparkContext)) :: Nil } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala index 65f4c17aeee3a..f9731e82e4924 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala @@ -358,5 +358,9 @@ class ParquetQuerySuite extends QueryTest with FunSuite with BeforeAndAfterAll { assert(stringResult(0).getString(2) == "100", "stringvalue incorrect") assert(stringResult(0).getInt(1) === 100) } -} + test("SPARK-1913 regression: columns only referenced by pushed down filters should remain") { + val query = sql(s"SELECT mystring FROM testfiltersource WHERE myint < 10") + assert(query.collect().size === 10) + } +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index b2157074a41bf..ae8f8e728c54b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -69,6 +69,7 @@ private[hive] trait HiveStrategies { pruneFilterProject( projectList, otherPredicates, + None, HiveTableScan(_, relation, pruningPredicates.reduceLeftOption(And))(hiveContext)) :: Nil case _ => Nil From f5b257dc7830b663f1a52c37371e0821da0a684b Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Fri, 23 May 2014 19:25:26 +0800 Subject: [PATCH 2/3] Added back comments deleted by mistake --- .../org/apache/spark/sql/execution/SparkStrategies.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 4444b821c491e..4113a4c64c267 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -143,6 +143,10 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) => { val prunePushedDownFilter = if (sparkContext.conf.getBoolean(ParquetFilters.PARQUET_FILTER_PUSHDOWN_ENABLED, true)) { + // Note: filters cannot be pushed down to Parquet if they contain more complex + // expressions than simple "Attribute cmp Literal" comparisons. Here we remove + // all filters that have been pushed down. Note that a predicate such as + // "(A AND B) OR C" can result in "A OR C" being pushed down. Some((filter: Expression) => { val recordFilter = ParquetFilters.createFilter(filter) if (!recordFilter.isDefined) { From f976b73daaa63f692d224174dcd8a9f8ccf8c0f4 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Sat, 24 May 2014 18:43:56 +0800 Subject: [PATCH 3/3] Addessed the readability issue commented by @rxin --- .../org/apache/spark/sql/SQLContext.scala | 11 ++---- .../spark/sql/execution/SparkStrategies.scala | 38 ++++++++++--------- .../spark/sql/hive/HiveStrategies.scala | 2 +- 3 files changed, 25 insertions(+), 26 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index cb4f109552597..043be58edc91b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -206,8 +206,8 @@ class SQLContext(@transient val sparkContext: SparkContext) * final desired output requires complex expressions to be evaluated or when columns can be * further eliminated out after filtering has been done. * - * The `prunePushedDownFilter` is used to remove those filters that can be removed by the filter - * pushdown optimization. + * The `prunePushedDownFilters` parameter is used to remove those filters that can be optimized + * away by the filter pushdown optimization. * * The required attributes for both filtering and expression evaluation are passed to the * provided `scanBuilder` function so that it can avoid unnecessary column materialization. @@ -215,15 +215,12 @@ class SQLContext(@transient val sparkContext: SparkContext) def pruneFilterProject( projectList: Seq[NamedExpression], filterPredicates: Seq[Expression], - prunePushedDownFilter: Option[Expression => Boolean], + prunePushedDownFilters: Seq[Expression] => Seq[Expression], scanBuilder: Seq[Attribute] => SparkPlan): SparkPlan = { val projectSet = projectList.flatMap(_.references).toSet val filterSet = filterPredicates.flatMap(_.references).toSet - val filterCondition = prunePushedDownFilter - .map(filterPredicates.filter) - .getOrElse(filterPredicates) - .reduceLeftOption(And) + val filterCondition = prunePushedDownFilters(filterPredicates).reduceLeftOption(And) // Right now we still use a projection even if the only evaluation is applying an alias // to a column. Since this is a no-op, it could be avoided. However, using this diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 4113a4c64c267..cfa8bdae58b11 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -141,31 +141,33 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case logical.InsertIntoTable(table: ParquetRelation, partition, child, overwrite) => InsertIntoParquetTable(table, planLater(child), overwrite)(sparkContext) :: Nil case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) => { - val prunePushedDownFilter = + val prunePushedDownFilters = if (sparkContext.conf.getBoolean(ParquetFilters.PARQUET_FILTER_PUSHDOWN_ENABLED, true)) { - // Note: filters cannot be pushed down to Parquet if they contain more complex - // expressions than simple "Attribute cmp Literal" comparisons. Here we remove - // all filters that have been pushed down. Note that a predicate such as - // "(A AND B) OR C" can result in "A OR C" being pushed down. - Some((filter: Expression) => { - val recordFilter = ParquetFilters.createFilter(filter) - if (!recordFilter.isDefined) { - // First case: the pushdown did not result in any record filter. - true - } else { - // Second case: a record filter was created; here we are conservative in - // the sense that even if "A" was pushed and we check for "A AND B" we - // still want to keep "A AND B" in the higher-level filter, not just "B". - !ParquetFilters.findExpression(recordFilter.get, filter).isDefined + (filters: Seq[Expression]) => { + filters.filter { filter => + // Note: filters cannot be pushed down to Parquet if they contain more complex + // expressions than simple "Attribute cmp Literal" comparisons. Here we remove + // all filters that have been pushed down. Note that a predicate such as + // "(A AND B) OR C" can result in "A OR C" being pushed down. + val recordFilter = ParquetFilters.createFilter(filter) + if (!recordFilter.isDefined) { + // First case: the pushdown did not result in any record filter. + true + } else { + // Second case: a record filter was created; here we are conservative in + // the sense that even if "A" was pushed and we check for "A AND B" we + // still want to keep "A AND B" in the higher-level filter, not just "B". + !ParquetFilters.findExpression(recordFilter.get, filter).isDefined + } } - }) + } } else { - None + identity[Seq[Expression]] _ } pruneFilterProject( projectList, filters, - prunePushedDownFilter, + prunePushedDownFilters, ParquetTableScan(_, relation, filters)(sparkContext)) :: Nil } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index ae8f8e728c54b..8b51957162e04 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -69,7 +69,7 @@ private[hive] trait HiveStrategies { pruneFilterProject( projectList, otherPredicates, - None, + identity[Seq[Expression]], HiveTableScan(_, relation, pruningPredicates.reduceLeftOption(And))(hiveContext)) :: Nil case _ => Nil