From 965969958a6264b3ce7633dbc6620cba7131f9a0 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 21 Apr 2020 21:25:26 -0700 Subject: [PATCH 1/7] Add NESTED_PREDICATE_PUSHDOWN_V1_SOURCE_LIST. --- .../apache/spark/sql/internal/SQLConf.scala | 22 ++--- .../sql/execution/DataSourceScanExec.scala | 5 +- .../datasources/DataSourceStrategy.scala | 90 ++++++++++++------- .../datasources/DataSourceUtils.scala | 21 +++++ .../datasources/FileSourceStrategy.scala | 7 +- .../datasources/v2/DataSourceV2Strategy.scala | 19 ++-- .../datasources/v2/PushDownUtils.scala | 3 +- .../v2/V2ScanRelationPushDown.scala | 2 +- .../datasources/DataSourceStrategySuite.scala | 8 +- .../parquet/ParquetFilterSuite.scala | 2 +- 10 files changed, 121 insertions(+), 58 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index e6a39669fb80e..fa02af2f47ba8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2062,16 +2062,17 @@ object SQLConf { .booleanConf .createWithDefault(true) - val NESTED_PREDICATE_PUSHDOWN_ENABLED = - buildConf("spark.sql.optimizer.nestedPredicatePushdown.enabled") - .internal() - .doc("When true, Spark tries to push down predicates for nested columns and or names " + - "containing `dots` to data sources. Currently, Parquet implements both optimizations " + - "while ORC only supports predicates for names containing `dots`. The other data sources" + - "don't support this feature yet.") + val NESTED_PREDICATE_PUSHDOWN_V1_SOURCE_LIST = + buildConf("spark.sql.optimizer.nestedPredicatePushdown.v1sourceList") + .internal() + .doc("A comma-separated list of data source short names or fully qualified data source " + + "implementation class names for which Spark tries to push down predicates for nested " + + "columns and or names containing `dots` to data sources. Currently, Parquet implements " + + "both optimizations while ORC only supports predicates for names containing `dots`. 
The " + + "other data sources don't support this feature yet.") .version("3.0.0") - .booleanConf - .createWithDefault(true) + .stringConf + .createWithDefault("parquet,orc") val SERIALIZER_NESTED_SCHEMA_PRUNING_ENABLED = buildConf("spark.sql.optimizer.serializer.nestedSchemaPruning.enabled") @@ -3097,7 +3098,8 @@ class SQLConf extends Serializable with Logging { def nestedSchemaPruningEnabled: Boolean = getConf(NESTED_SCHEMA_PRUNING_ENABLED) - def nestedPredicatePushdownEnabled: Boolean = getConf(NESTED_PREDICATE_PUSHDOWN_ENABLED) + def nestedPredicatePushdownv1SourceList: String = + getConf(NESTED_PREDICATE_PUSHDOWN_V1_SOURCE_LIST) def serializerNestedSchemaPruningEnabled: Boolean = getConf(SERIALIZER_NESTED_SCHEMA_PRUNING_ENABLED) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 90b08ea558044..fdca7785313a9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -326,7 +326,10 @@ case class FileSourceScanExec( } @transient - private lazy val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter) + private lazy val pushedDownFilters = { + val supportNestedPredicatePushdown = DataSourceUtils.supportNestedPredicatePushdown(relation) + dataFilters.flatMap(DataSourceStrategy.translateFilter(_, supportNestedPredicatePushdown)) + } override lazy val metadata: Map[String, String] = { def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index a58038d127818..741b889de1d32 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -448,60 +448,62 @@ object DataSourceStrategy { } } - private def translateLeafNodeFilter(predicate: Expression): Option[Filter] = predicate match { - case expressions.EqualTo(PushableColumn(name), Literal(v, t)) => + private def translateLeafNodeFilter( + predicate: Expression, + pushableColumn: PushableColumnBase): Option[Filter] = predicate match { + case expressions.EqualTo(pushableColumn(name), Literal(v, t)) => Some(sources.EqualTo(name, convertToScala(v, t))) - case expressions.EqualTo(Literal(v, t), PushableColumn(name)) => + case expressions.EqualTo(Literal(v, t), pushableColumn(name)) => Some(sources.EqualTo(name, convertToScala(v, t))) - case expressions.EqualNullSafe(PushableColumn(name), Literal(v, t)) => + case expressions.EqualNullSafe(pushableColumn(name), Literal(v, t)) => Some(sources.EqualNullSafe(name, convertToScala(v, t))) - case expressions.EqualNullSafe(Literal(v, t), PushableColumn(name)) => + case expressions.EqualNullSafe(Literal(v, t), pushableColumn(name)) => Some(sources.EqualNullSafe(name, convertToScala(v, t))) - case expressions.GreaterThan(PushableColumn(name), Literal(v, t)) => + case expressions.GreaterThan(pushableColumn(name), Literal(v, t)) => Some(sources.GreaterThan(name, convertToScala(v, t))) - case expressions.GreaterThan(Literal(v, t), PushableColumn(name)) => + case expressions.GreaterThan(Literal(v, t), pushableColumn(name)) => Some(sources.LessThan(name, convertToScala(v, t))) - case 
expressions.LessThan(PushableColumn(name), Literal(v, t)) => + case expressions.LessThan(pushableColumn(name), Literal(v, t)) => Some(sources.LessThan(name, convertToScala(v, t))) - case expressions.LessThan(Literal(v, t), PushableColumn(name)) => + case expressions.LessThan(Literal(v, t), pushableColumn(name)) => Some(sources.GreaterThan(name, convertToScala(v, t))) - case expressions.GreaterThanOrEqual(PushableColumn(name), Literal(v, t)) => + case expressions.GreaterThanOrEqual(pushableColumn(name), Literal(v, t)) => Some(sources.GreaterThanOrEqual(name, convertToScala(v, t))) - case expressions.GreaterThanOrEqual(Literal(v, t), PushableColumn(name)) => + case expressions.GreaterThanOrEqual(Literal(v, t), pushableColumn(name)) => Some(sources.LessThanOrEqual(name, convertToScala(v, t))) - case expressions.LessThanOrEqual(PushableColumn(name), Literal(v, t)) => + case expressions.LessThanOrEqual(pushableColumn(name), Literal(v, t)) => Some(sources.LessThanOrEqual(name, convertToScala(v, t))) - case expressions.LessThanOrEqual(Literal(v, t), PushableColumn(name)) => + case expressions.LessThanOrEqual(Literal(v, t), pushableColumn(name)) => Some(sources.GreaterThanOrEqual(name, convertToScala(v, t))) - case expressions.InSet(e @ PushableColumn(name), set) => + case expressions.InSet(e @ pushableColumn(name), set) => val toScala = CatalystTypeConverters.createToScalaConverter(e.dataType) Some(sources.In(name, set.toArray.map(toScala))) // Because we only convert In to InSet in Optimizer when there are more than certain // items. So it is possible we still get an In expression here that needs to be pushed // down. - case expressions.In(e @ PushableColumn(name), list) if list.forall(_.isInstanceOf[Literal]) => + case expressions.In(e @ pushableColumn(name), list) if list.forall(_.isInstanceOf[Literal]) => val hSet = list.map(_.eval(EmptyRow)) val toScala = CatalystTypeConverters.createToScalaConverter(e.dataType) Some(sources.In(name, hSet.toArray.map(toScala))) - case expressions.IsNull(PushableColumn(name)) => + case expressions.IsNull(pushableColumn(name)) => Some(sources.IsNull(name)) - case expressions.IsNotNull(PushableColumn(name)) => + case expressions.IsNotNull(pushableColumn(name)) => Some(sources.IsNotNull(name)) - case expressions.StartsWith(PushableColumn(name), Literal(v: UTF8String, StringType)) => + case expressions.StartsWith(pushableColumn(name), Literal(v: UTF8String, StringType)) => Some(sources.StringStartsWith(name, v.toString)) - case expressions.EndsWith(PushableColumn(name), Literal(v: UTF8String, StringType)) => + case expressions.EndsWith(pushableColumn(name), Literal(v: UTF8String, StringType)) => Some(sources.StringEndsWith(name, v.toString)) - case expressions.Contains(PushableColumn(name), Literal(v: UTF8String, StringType)) => + case expressions.Contains(pushableColumn(name), Literal(v: UTF8String, StringType)) => Some(sources.StringContains(name, v.toString)) case expressions.Literal(true, BooleanType) => @@ -518,8 +520,9 @@ object DataSourceStrategy { * * @return a `Some[Filter]` if the input [[Expression]] is convertible, otherwise a `None`. 
*/ - protected[sql] def translateFilter(predicate: Expression): Option[Filter] = { - translateFilterWithMapping(predicate, None) + protected[sql] def translateFilter( + predicate: Expression, supportNestedPredicatePushdown: Boolean): Option[Filter] = { + translateFilterWithMapping(predicate, None, supportNestedPredicatePushdown) } /** @@ -529,11 +532,14 @@ object DataSourceStrategy { * @param translatedFilterToExpr An optional map from leaf node filter expressions to its * translated [[Filter]]. The map is used for rebuilding * [[Expression]] from [[Filter]]. + * @param nestedPredicatePushdownEnabled Whether nested predicate pushdown is enabled. Default is + * disabled. * @return a `Some[Filter]` if the input [[Expression]] is convertible, otherwise a `None`. */ protected[sql] def translateFilterWithMapping( predicate: Expression, - translatedFilterToExpr: Option[mutable.HashMap[sources.Filter, Expression]]) + translatedFilterToExpr: Option[mutable.HashMap[sources.Filter, Expression]], + nestedPredicatePushdownEnabled: Boolean) : Option[Filter] = { predicate match { case expressions.And(left, right) => @@ -547,21 +553,31 @@ object DataSourceStrategy { // Pushing one leg of AND down is only safe to do at the top level. // You can see ParquetFilters' createFilter for more details. for { - leftFilter <- translateFilterWithMapping(left, translatedFilterToExpr) - rightFilter <- translateFilterWithMapping(right, translatedFilterToExpr) + leftFilter <- translateFilterWithMapping( + left, translatedFilterToExpr, nestedPredicatePushdownEnabled) + rightFilter <- translateFilterWithMapping( + right, translatedFilterToExpr, nestedPredicatePushdownEnabled) } yield sources.And(leftFilter, rightFilter) case expressions.Or(left, right) => for { - leftFilter <- translateFilterWithMapping(left, translatedFilterToExpr) - rightFilter <- translateFilterWithMapping(right, translatedFilterToExpr) + leftFilter <- translateFilterWithMapping( + left, translatedFilterToExpr, nestedPredicatePushdownEnabled) + rightFilter <- translateFilterWithMapping( + right, translatedFilterToExpr, nestedPredicatePushdownEnabled) } yield sources.Or(leftFilter, rightFilter) case expressions.Not(child) => - translateFilterWithMapping(child, translatedFilterToExpr).map(sources.Not) + translateFilterWithMapping(child, translatedFilterToExpr, nestedPredicatePushdownEnabled) + .map(sources.Not) case other => - val filter = translateLeafNodeFilter(other) + val pushableColumn = if (nestedPredicatePushdownEnabled) { + PushableColumnAndNestedColumn + } else { + PushableColumnWithoutNestedColumn + } + val filter = translateLeafNodeFilter(other, pushableColumn) if (filter.isDefined && translatedFilterToExpr.isDefined) { translatedFilterToExpr.get(filter.get) = predicate } @@ -608,8 +624,9 @@ object DataSourceStrategy { // A map from original Catalyst expressions to corresponding translated data source filters. // If a predicate is not in this map, it means it cannot be pushed down. + val supportNestedPredicatePushdown = DataSourceUtils.supportNestedPredicatePushdown(relation) val translatedMap: Map[Expression, Filter] = predicates.flatMap { p => - translateFilter(p).map(f => p -> f) + translateFilter(p, supportNestedPredicatePushdown).map(f => p -> f) }.toMap val pushedFilters: Seq[Filter] = translatedMap.values.toSeq @@ -650,9 +667,10 @@ object DataSourceStrategy { /** * Find the column name of an expression that can be pushed down. 
*/ -object PushableColumn { +abstract class PushableColumnBase { + val nestedPredicatePushdownEnabled: Boolean + def unapply(e: Expression): Option[String] = { - val nestedPredicatePushdownEnabled = SQLConf.get.nestedPredicatePushdownEnabled import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper def helper(e: Expression): Option[Seq[String]] = e match { case a: Attribute => @@ -668,3 +686,11 @@ object PushableColumn { helper(e).map(_.quoted) } } + +object PushableColumnAndNestedColumn extends PushableColumnBase { + override val nestedPredicatePushdownEnabled = true +} + +object PushableColumnWithoutNestedColumn extends PushableColumnBase { + override val nestedPredicatePushdownEnabled = false +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala index bd56635084c34..4d53f598ea443 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala @@ -17,11 +17,17 @@ package org.apache.spark.sql.execution.datasources +import java.util.Locale + import org.apache.hadoop.fs.Path import org.json4s.NoTypeHints import org.json4s.jackson.Serialization import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources.BaseRelation import org.apache.spark.sql.types._ @@ -64,4 +70,19 @@ object DataSourceUtils { private[sql] def isDataFile(fileName: String) = !(fileName.startsWith("_") || fileName.startsWith(".")) + + /** + * Returns if the given relation's V1 datasource provider supports nested predicate pushdown. + */ + private[sql] def supportNestedPredicatePushdown(relation: BaseRelation): Boolean = + relation match { + case hs: HadoopFsRelation => + val supportedDatasources = + SQLConf.get.getConf(SQLConf.NESTED_PREDICATE_PUSHDOWN_V1_SOURCE_LIST) + .toLowerCase(Locale.ROOT) + .split(",").map(_.trim) + supportedDatasources.contains(hs.toString) + case _ => false + } + } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala index f45495121a980..ac856fa452d05 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala @@ -178,8 +178,11 @@ object FileSourceStrategy extends Strategy with Logging { // Partition keys are not available in the statistics of the files. val dataFilters = normalizedFiltersWithoutSubqueries.filter(_.references.intersect(partitionSet).isEmpty) - logInfo(s"Pushed Filters: " + - s"${dataFilters.flatMap(DataSourceStrategy.translateFilter).mkString(",")}") + val supportNestedPredicatePushdown = + DataSourceUtils.supportNestedPredicatePushdown(fsRelation) + val pushedFilters = dataFilters + .flatMap(DataSourceStrategy.translateFilter(_, supportNestedPredicatePushdown)) + logInfo(s"Pushed Filters: " + s"${pushedFilters.mkString(",")}") // Predicates with both partition keys and attributes need to be evaluated after the scan. 
val afterScanFilters = filterSet -- partitionKeyFilters.filter(_.references.nonEmpty) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 8f4e2d256c714..48723cfc3a718 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -179,15 +179,22 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat case OverwriteByExpression(r: DataSourceV2Relation, deleteExpr, query, writeOptions, _) => // fail if any filter cannot be converted. correctness depends on removing all matching data. - val filters = splitConjunctivePredicates(deleteExpr).map { - filter => DataSourceStrategy.translateFilter(deleteExpr).getOrElse( - throw new AnalysisException(s"Cannot translate expression to source filter: $filter")) - }.toArray + val filters = splitConjunctivePredicates(deleteExpr) + def transferFilters = + (filters: Seq[Expression], supportNestedPredicatePushdown: Boolean) => { + filters.map { filter => + DataSourceStrategy.translateFilter(deleteExpr, supportNestedPredicatePushdown) + .getOrElse(throw new AnalysisException( + s"Cannot translate expression to source filter: $filter")) + }.toArray + } r.table.asWritable match { case v1 if v1.supports(TableCapability.V1_BATCH_WRITE) => - OverwriteByExpressionExecV1(v1, filters, writeOptions.asOptions, query) :: Nil + OverwriteByExpressionExecV1( + v1, transferFilters(filters, false), writeOptions.asOptions, query) :: Nil case v2 => - OverwriteByExpressionExec(v2, filters, writeOptions.asOptions, planLater(query)) :: Nil + OverwriteByExpressionExec( + v2, transferFilters(filters, true), writeOptions.asOptions, planLater(query)) :: Nil } case OverwritePartitionsDynamic(r: DataSourceV2Relation, query, writeOptions, _) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala index 33338b06565c9..1a6f03f54f2e9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala @@ -48,7 +48,8 @@ object PushDownUtils extends PredicateHelper { for (filterExpr <- filters) { val translated = - DataSourceStrategy.translateFilterWithMapping(filterExpr, Some(translatedFilterToExpr)) + DataSourceStrategy.translateFilterWithMapping(filterExpr, Some(translatedFilterToExpr), + nestedPredicatePushdownEnabled = true) if (translated.isEmpty) { untranslatableExprs += filterExpr } else { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala index 59089fa6b77e9..b168e848f0b6f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala @@ -59,7 +59,7 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] { val wrappedScan = scan match { case v1: V1Scan => - val translated = filters.flatMap(DataSourceStrategy.translateFilter) + val translated = 
filters.flatMap(DataSourceStrategy.translateFilter(_, true)) V1ScanWrapper(v1, translated, pushedFilters) case _ => scan } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategySuite.scala index a775a97895cfc..07e4840c05c6d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategySuite.scala @@ -289,14 +289,14 @@ class DataSourceStrategySuite extends PlanTest with SharedSparkSession { test("SPARK-31027 test `PushableColumn.unapply` that finds the column name of " + "an expression that can be pushed down") { attrInts.foreach { case (attrInt, colName) => - assert(PushableColumn.unapply(attrInt) === Some(colName)) + assert(PushableColumnAndNestedColumn.unapply(attrInt) === Some(colName)) } attrStrs.foreach { case (attrStr, colName) => - assert(PushableColumn.unapply(attrStr) === Some(colName)) + assert(PushableColumnAndNestedColumn.unapply(attrStr) === Some(colName)) } // `Abs(col)` can not be pushed down, so it returns `None` - assert(PushableColumn.unapply(Abs('col.int)) === None) + assert(PushableColumnAndNestedColumn.unapply(Abs('col.int)) === None) } /** @@ -305,7 +305,7 @@ class DataSourceStrategySuite extends PlanTest with SharedSparkSession { */ def testTranslateFilter(catalystFilter: Expression, result: Option[sources.Filter]): Unit = { assertResult(result) { - DataSourceStrategy.translateFilter(catalystFilter) + DataSourceStrategy.translateFilter(catalystFilter, true) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index d1161e33b0941..01d9c436aecf3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -1656,7 +1656,7 @@ class ParquetV2FilterSuite extends ParquetFilterSuite { case PhysicalOperation(_, filters, DataSourceV2ScanRelation(_, scan: ParquetScan, _)) => assert(filters.nonEmpty, "No filter is analyzed from the given query") - val sourceFilters = filters.flatMap(DataSourceStrategy.translateFilter).toArray + val sourceFilters = filters.flatMap(DataSourceStrategy.translateFilter(_, true)).toArray val pushedFilters = scan.pushedFilters assert(pushedFilters.nonEmpty, "No filter is pushed down") val schema = new SparkToParquetSchemaConverter(conf).convert(df.schema) From e555a1c94d6ec7b1a338015a686af63eaec3c8a9 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 28 Apr 2020 01:01:08 -0700 Subject: [PATCH 2/7] Add one test. 
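
The new assertions exercise the non-nested extractor next to the existing
nested one. As a rough sketch of the behaviour being asserted, illustrative
only, written in the style of DataSourceStrategySuite with made-up column
names and not part of the change itself:

    import org.apache.spark.sql.catalyst.dsl.expressions._
    import org.apache.spark.sql.catalyst.expressions.GetStructField
    import org.apache.spark.sql.execution.datasources.{
      PushableColumnAndNestedColumn, PushableColumnWithoutNestedColumn}

    // A plain top-level column is pushable under both extractors.
    val topLevel = 'cint.int
    assert(PushableColumnAndNestedColumn.unapply(topLevel) == Some("cint"))
    assert(PushableColumnWithoutNestedColumn.unapply(topLevel) == Some("cint"))

    // A nested field reference (a.cint) should only be pushable when nested
    // predicate pushdown is allowed; otherwise the extractor yields None,
    // mirroring the dotted-name assertions added below.
    val nested = GetStructField('a.struct('cint.int), 0, Some("cint"))
    assert(PushableColumnAndNestedColumn.unapply(nested) == Some("a.cint"))
    assert(PushableColumnWithoutNestedColumn.unapply(nested) == None)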
--- .../datasources/DataSourceStrategySuite.scala | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategySuite.scala index 07e4840c05c6d..b94918eccd46e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategySuite.scala @@ -290,9 +290,21 @@ class DataSourceStrategySuite extends PlanTest with SharedSparkSession { "an expression that can be pushed down") { attrInts.foreach { case (attrInt, colName) => assert(PushableColumnAndNestedColumn.unapply(attrInt) === Some(colName)) + + if (colName.contains(".")) { + assert(PushableColumnWithoutNestedColumn.unapply(attrInt) === None) + } else { + assert(PushableColumnWithoutNestedColumn.unapply(attrInt) === Some(colName)) + } } attrStrs.foreach { case (attrStr, colName) => assert(PushableColumnAndNestedColumn.unapply(attrStr) === Some(colName)) + + if (colName.contains(".")) { + assert(PushableColumnWithoutNestedColumn.unapply(attrStr) === None) + } else { + assert(PushableColumnWithoutNestedColumn.unapply(attrStr) === Some(colName)) + } } // `Abs(col)` can not be pushed down, so it returns `None` From 84bc8dd4f7ec1d49ad8631435f8cf212f92efd9b Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 29 Apr 2020 01:16:35 -0700 Subject: [PATCH 3/7] Add test. --- .../parquet/ParquetFilterSuite.scala | 101 +++++++++++------- 1 file changed, 61 insertions(+), 40 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index ec2d15688074d..5b145fd58276d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -33,7 +33,8 @@ import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints import org.apache.spark.sql.catalyst.planning.PhysicalOperation -import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.parseColumnPath +import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, HadoopFsRelation, LogicalRelation, PushableColumnAndNestedColumn} import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan import org.apache.spark.sql.functions._ @@ -1588,47 +1589,67 @@ class ParquetV1FilterSuite extends ParquetFilterSuite { expected: Seq[Row]): Unit = { val output = predicate.collect { case a: Attribute => a }.distinct - withSQLConf( - SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true", - SQLConf.PARQUET_FILTER_PUSHDOWN_DATE_ENABLED.key -> "true", - SQLConf.PARQUET_FILTER_PUSHDOWN_TIMESTAMP_ENABLED.key -> "true", - SQLConf.PARQUET_FILTER_PUSHDOWN_DECIMAL_ENABLED.key -> "true", - SQLConf.PARQUET_FILTER_PUSHDOWN_STRING_STARTSWITH_ENABLED.key -> "true", - // Disable adding filters from constraints because it adds, for instance, - // is-not-null to pushed filters, which 
makes it hard to test if the pushed - // filter is expected or not (this had to be fixed with SPARK-13495). - SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> InferFiltersFromConstraints.ruleName, - SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") { - val query = df - .select(output.map(e => Column(e)): _*) - .where(Column(predicate)) + Seq(("parquet", true), ("", false)).map { case (pushdownDsList, nestedPredicatePushdown) => + withSQLConf( + SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true", + SQLConf.PARQUET_FILTER_PUSHDOWN_DATE_ENABLED.key -> "true", + SQLConf.PARQUET_FILTER_PUSHDOWN_TIMESTAMP_ENABLED.key -> "true", + SQLConf.PARQUET_FILTER_PUSHDOWN_DECIMAL_ENABLED.key -> "true", + SQLConf.PARQUET_FILTER_PUSHDOWN_STRING_STARTSWITH_ENABLED.key -> "true", + // Disable adding filters from constraints because it adds, for instance, + // is-not-null to pushed filters, which makes it hard to test if the pushed + // filter is expected or not (this had to be fixed with SPARK-13495). + SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> InferFiltersFromConstraints.ruleName, + SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false", + SQLConf.NESTED_PREDICATE_PUSHDOWN_V1_SOURCE_LIST.key -> pushdownDsList) { + val query = df + .select(output.map(e => Column(e)): _*) + .where(Column(predicate)) + + val nestedOrAttributes = predicate.collectFirst { + case g: GetStructField => g + case a: Attribute => a + } + assert(nestedOrAttributes.isDefined, "No GetStructField nor Attribute is detected.") + + val parsed = parseColumnPath( + PushableColumnAndNestedColumn.unapply(nestedOrAttributes.get).get) + + val containsNestedColumnOrDot = parsed.length > 1 || parsed(0).contains(".") + + var maybeRelation: Option[HadoopFsRelation] = None + val maybeAnalyzedPredicate = query.queryExecution.optimizedPlan.collect { + case PhysicalOperation(_, filters, + LogicalRelation(relation: HadoopFsRelation, _, _, _)) => + maybeRelation = Some(relation) + filters + }.flatten.reduceLeftOption(_ && _) + assert(maybeAnalyzedPredicate.isDefined, "No filter is analyzed from the given query") + + val (_, selectedFilters, _) = + DataSourceStrategy.selectFilters(maybeRelation.get, maybeAnalyzedPredicate.toSeq) + // If predicates contains nested column or dot, we push down the predicates only if + // "parquet" is in `NESTED_PREDICATE_PUSHDOWN_V1_SOURCE_LIST`. + if (nestedPredicatePushdown || !containsNestedColumnOrDot) { + assert(selectedFilters.nonEmpty, "No filter is pushed down") + val schema = new SparkToParquetSchemaConverter(conf).convert(df.schema) + val parquetFilters = createParquetFilters(schema) + // In this test suite, all the simple predicates are convertible here. + assert(parquetFilters.convertibleFilters(selectedFilters) === selectedFilters) + val pushedParquetFilters = selectedFilters.map { pred => + val maybeFilter = parquetFilters.createFilter(pred) + assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $pred") + maybeFilter.get + } + // Doesn't bother checking type parameters here (e.g. 
`Eq[Integer]`) + assert(pushedParquetFilters.exists(_.getClass === filterClass), + s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.") - var maybeRelation: Option[HadoopFsRelation] = None - val maybeAnalyzedPredicate = query.queryExecution.optimizedPlan.collect { - case PhysicalOperation(_, filters, - LogicalRelation(relation: HadoopFsRelation, _, _, _)) => - maybeRelation = Some(relation) - filters - }.flatten.reduceLeftOption(_ && _) - assert(maybeAnalyzedPredicate.isDefined, "No filter is analyzed from the given query") - - val (_, selectedFilters, _) = - DataSourceStrategy.selectFilters(maybeRelation.get, maybeAnalyzedPredicate.toSeq) - assert(selectedFilters.nonEmpty, "No filter is pushed down") - val schema = new SparkToParquetSchemaConverter(conf).convert(df.schema) - val parquetFilters = createParquetFilters(schema) - // In this test suite, all the simple predicates are convertible here. - assert(parquetFilters.convertibleFilters(selectedFilters) === selectedFilters) - val pushedParquetFilters = selectedFilters.map { pred => - val maybeFilter = parquetFilters.createFilter(pred) - assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $pred") - maybeFilter.get + checker(stripSparkFilter(query), expected) + } else { + assert(selectedFilters.isEmpty, "There is filter pushed down") + } } - // Doesn't bother checking type parameters here (e.g. `Eq[Integer]`) - assert(pushedParquetFilters.exists(_.getClass === filterClass), - s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.") - - checker(stripSparkFilter(query), expected) } } } From a49b73cb3e7646f8f6c74b64171396eabb1db5a0 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 30 Apr 2020 23:22:18 -0700 Subject: [PATCH 4/7] Address comments. --- .../apache/spark/sql/internal/SQLConf.scala | 9 +++------ .../datasources/DataSourceStrategy.scala | 20 +++++++++++-------- .../datasources/DataSourceUtils.scala | 5 ++--- .../datasources/FileSourceStrategy.scala | 2 +- 4 files changed, 18 insertions(+), 18 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index eabf104d7e8f4..851e388ecfff9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2064,13 +2064,13 @@ object SQLConf { .createWithDefault(true) val NESTED_PREDICATE_PUSHDOWN_V1_SOURCE_LIST = - buildConf("spark.sql.optimizer.nestedPredicatePushdown.v1sourceList") + buildConf("spark.sql.optimizer.nestedPredicatePushdown.supportedV1Sources") .internal() .doc("A comma-separated list of data source short names or fully qualified data source " + "implementation class names for which Spark tries to push down predicates for nested " + - "columns and or names containing `dots` to data sources. Currently, Parquet implements " + + "columns and/or names containing `dots` to data sources. Currently, Parquet implements " + "both optimizations while ORC only supports predicates for names containing `dots`. The " + - "other data sources don't support this feature yet.") + "other data sources don't support this feature yet. 
So the default value is 'parquet,orc'.") .version("3.0.0") .stringConf .createWithDefault("parquet,orc") @@ -3099,9 +3099,6 @@ class SQLConf extends Serializable with Logging { def nestedSchemaPruningEnabled: Boolean = getConf(NESTED_SCHEMA_PRUNING_ENABLED) - def nestedPredicatePushdownv1SourceList: String = - getConf(NESTED_PREDICATE_PUSHDOWN_V1_SOURCE_LIST) - def serializerNestedSchemaPruningEnabled: Boolean = getConf(SERIALIZER_NESTED_SCHEMA_PRUNING_ENABLED) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 741b889de1d32..23454d7d5e7f3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -532,8 +532,7 @@ object DataSourceStrategy { * @param translatedFilterToExpr An optional map from leaf node filter expressions to its * translated [[Filter]]. The map is used for rebuilding * [[Expression]] from [[Filter]]. - * @param nestedPredicatePushdownEnabled Whether nested predicate pushdown is enabled. Default is - * disabled. + * @param nestedPredicatePushdownEnabled Whether nested predicate pushdown is enabled. * @return a `Some[Filter]` if the input [[Expression]] is convertible, otherwise a `None`. */ protected[sql] def translateFilterWithMapping( @@ -572,12 +571,7 @@ object DataSourceStrategy { .map(sources.Not) case other => - val pushableColumn = if (nestedPredicatePushdownEnabled) { - PushableColumnAndNestedColumn - } else { - PushableColumnWithoutNestedColumn - } - val filter = translateLeafNodeFilter(other, pushableColumn) + val filter = translateLeafNodeFilter(other, PushableColumn(nestedPredicatePushdownEnabled)) if (filter.isDefined && translatedFilterToExpr.isDefined) { translatedFilterToExpr.get(filter.get) = predicate } @@ -687,6 +681,16 @@ abstract class PushableColumnBase { } } +object PushableColumn { + def apply(nestedPredicatePushdownEnabled: Boolean): PushableColumnBase = { + if (nestedPredicatePushdownEnabled) { + PushableColumnAndNestedColumn + } else { + PushableColumnWithoutNestedColumn + } + } +} + object PushableColumnAndNestedColumn extends PushableColumnBase { override val nestedPredicatePushdownEnabled = true } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala index aa85533fb4efb..05d913d280bd6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala @@ -78,9 +78,8 @@ object DataSourceUtils { relation match { case hs: HadoopFsRelation => val supportedDatasources = - SQLConf.get.getConf(SQLConf.NESTED_PREDICATE_PUSHDOWN_V1_SOURCE_LIST) - .toLowerCase(Locale.ROOT) - .split(",").map(_.trim) + Utils.stringToSeq(SQLConf.get.getConf(SQLConf.NESTED_PREDICATE_PUSHDOWN_V1_SOURCE_LIST) + .toLowerCase(Locale.ROOT)) supportedDatasources.contains(hs.toString) case _ => false } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala index ac856fa452d05..477937d66ad9b 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala @@ -182,7 +182,7 @@ object FileSourceStrategy extends Strategy with Logging { DataSourceUtils.supportNestedPredicatePushdown(fsRelation) val pushedFilters = dataFilters .flatMap(DataSourceStrategy.translateFilter(_, supportNestedPredicatePushdown)) - logInfo(s"Pushed Filters: " + s"${pushedFilters.mkString(",")}") + logInfo(s"Pushed Filters: ${pushedFilters.mkString(",")}") // Predicates with both partition keys and attributes need to be evaluated after the scan. val afterScanFilters = filterSet -- partitionKeyFilters.filter(_.references.nonEmpty) From 17d1094e0725cc9f6363eb0f19f8457fb333b80e Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 1 May 2020 09:00:54 -0700 Subject: [PATCH 5/7] v1 fallback API can support nested predicate pushdown. --- .../sql/execution/datasources/v2/DataSourceV2Strategy.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 31d8bb32fe4c7..95c940202e20f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -191,7 +191,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat r.table.asWritable match { case v1 if v1.supports(TableCapability.V1_BATCH_WRITE) => OverwriteByExpressionExecV1( - v1, transferFilters(filters, false), writeOptions.asOptions, query) :: Nil + v1, transferFilters(filters, true), writeOptions.asOptions, query) :: Nil case v2 => OverwriteByExpressionExec( v2, transferFilters(filters, true), writeOptions.asOptions, planLater(query)) :: Nil From aa32dcc303877caa6de070143c222cc4b9eeb4db Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 4 May 2020 22:57:24 -0700 Subject: [PATCH 6/7] Address comments. 
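
This also renames the config to NESTED_PREDICATE_PUSHDOWN_FILE_SOURCE_LIST
(spark.sql.optimizer.nestedPredicatePushdown.supportedFileSources). A hedged
usage sketch only: the path, schema and column names below are made up, and
`spark` is assumed to be a SparkSession as in spark-shell.

    import org.apache.spark.sql.internal.SQLConf

    // Default is "parquet,orc", so a filter on a struct field is expected to
    // show up under PushedFilters in the Parquet scan node.
    spark.conf.set(SQLConf.NESTED_PREDICATE_PUSHDOWN_FILE_SOURCE_LIST.key, "parquet,orc")
    val df = spark.read.parquet("/tmp/people")  // hypothetical data with a struct column `name`
    df.filter("name.first = 'Jane'").explain()  // expected: PushedFilters contains EqualTo(name.first,Jane)

    // Clearing the list is expected to keep the nested predicate in Spark
    // instead of pushing it down to the file source.
    spark.conf.set(SQLConf.NESTED_PREDICATE_PUSHDOWN_FILE_SOURCE_LIST.key, "")
    df.filter("name.first = 'Jane'").explain()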
--- .../scala/org/apache/spark/sql/internal/SQLConf.scala | 4 ++-- .../spark/sql/execution/datasources/DataSourceUtils.scala | 2 +- .../execution/datasources/v2/DataSourceV2Strategy.scala | 8 ++++---- .../ParquetNestedPredicatePushDownBenchmark.scala | 2 +- .../datasources/parquet/ParquetFilterSuite.scala | 2 +- 5 files changed, 9 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 851e388ecfff9..d824f8640d0fd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2063,8 +2063,8 @@ object SQLConf { .booleanConf .createWithDefault(true) - val NESTED_PREDICATE_PUSHDOWN_V1_SOURCE_LIST = - buildConf("spark.sql.optimizer.nestedPredicatePushdown.supportedV1Sources") + val NESTED_PREDICATE_PUSHDOWN_FILE_SOURCE_LIST = + buildConf("spark.sql.optimizer.nestedPredicatePushdown.supportedFileSources") .internal() .doc("A comma-separated list of data source short names or fully qualified data source " + "implementation class names for which Spark tries to push down predicates for nested " + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala index 05d913d280bd6..45a9b1a808cf3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala @@ -78,7 +78,7 @@ object DataSourceUtils { relation match { case hs: HadoopFsRelation => val supportedDatasources = - Utils.stringToSeq(SQLConf.get.getConf(SQLConf.NESTED_PREDICATE_PUSHDOWN_V1_SOURCE_LIST) + Utils.stringToSeq(SQLConf.get.getConf(SQLConf.NESTED_PREDICATE_PUSHDOWN_FILE_SOURCE_LIST) .toLowerCase(Locale.ROOT)) supportedDatasources.contains(hs.toString) case _ => false diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 95c940202e20f..036b749644b08 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -181,9 +181,9 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat // fail if any filter cannot be converted. correctness depends on removing all matching data. 
val filters = splitConjunctivePredicates(deleteExpr) def transferFilters = - (filters: Seq[Expression], supportNestedPredicatePushdown: Boolean) => { + (filters: Seq[Expression]) => { filters.map { filter => - DataSourceStrategy.translateFilter(deleteExpr, supportNestedPredicatePushdown) + DataSourceStrategy.translateFilter(deleteExpr, supportNestedPredicatePushdown = true) .getOrElse(throw new AnalysisException( s"Cannot translate expression to source filter: $filter")) }.toArray @@ -191,10 +191,10 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat r.table.asWritable match { case v1 if v1.supports(TableCapability.V1_BATCH_WRITE) => OverwriteByExpressionExecV1( - v1, transferFilters(filters, true), writeOptions.asOptions, query) :: Nil + v1, transferFilters(filters), writeOptions.asOptions, query) :: Nil case v2 => OverwriteByExpressionExec( - v2, transferFilters(filters, true), writeOptions.asOptions, planLater(query)) :: Nil + v2, transferFilters(filters), writeOptions.asOptions, planLater(query)) :: Nil } case OverwritePartitionsDynamic(r: DataSourceV2Relation, query, writeOptions, _) => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ParquetNestedPredicatePushDownBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ParquetNestedPredicatePushDownBenchmark.scala index ab692f42d2397..d2bd962b50654 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ParquetNestedPredicatePushDownBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ParquetNestedPredicatePushDownBenchmark.scala @@ -53,7 +53,7 @@ object ParquetNestedPredicatePushDownBenchmark extends SqlBasedBenchmark { withFilter: DataFrame => DataFrame): Unit = { val loadDF = spark.read.parquet(inputPath) benchmark.addCase(name) { _ => - withSQLConf((SQLConf.NESTED_PREDICATE_PUSHDOWN_V1_SOURCE_LIST.key, enableNestedPD)) { + withSQLConf((SQLConf.NESTED_PREDICATE_PUSHDOWN_FILE_SOURCE_LIST.key, enableNestedPD)) { withFilter(loadDF).noop() } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index 5b145fd58276d..5cf21293fd07f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -1601,7 +1601,7 @@ class ParquetV1FilterSuite extends ParquetFilterSuite { // filter is expected or not (this had to be fixed with SPARK-13495). SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> InferFiltersFromConstraints.ruleName, SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false", - SQLConf.NESTED_PREDICATE_PUSHDOWN_V1_SOURCE_LIST.key -> pushdownDsList) { + SQLConf.NESTED_PREDICATE_PUSHDOWN_FILE_SOURCE_LIST.key -> pushdownDsList) { val query = df .select(output.map(e => Column(e)): _*) .where(Column(predicate)) From 00b9d47702ae76fca3c7246155175cb42f75136f Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 4 May 2020 23:07:04 -0700 Subject: [PATCH 7/7] Restore previous style. 
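
Both the helper-based and the restored inline style go through the same
two-argument translateFilter. For context, a rough sketch of what that call
does with and without nested-column support; illustrative only, and since
translateFilter is protected[sql] it would have to live under the
org.apache.spark.sql package (e.g. next to DataSourceStrategySuite):

    import org.apache.spark.sql.catalyst.dsl.expressions._
    import org.apache.spark.sql.catalyst.expressions.GetStructField
    import org.apache.spark.sql.execution.datasources.DataSourceStrategy
    import org.apache.spark.sql.sources

    // Catalyst predicate on the nested field a.b.
    val nestedCol = GetStructField('a.struct('b.int), 0, Some("b"))

    // With nested pushdown allowed, the predicate maps to a source filter on "a.b".
    assert(DataSourceStrategy.translateFilter(nestedCol > 0,
      supportNestedPredicatePushdown = true) == Some(sources.GreaterThan("a.b", 0)))

    // Without it, the nested reference is not pushable and translation gives up.
    assert(DataSourceStrategy.translateFilter(nestedCol > 0,
      supportNestedPredicatePushdown = false).isEmpty)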
--- .../datasources/v2/DataSourceV2Strategy.scala | 20 +++++++------------ 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 036b749644b08..cca80c0cb6d57 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -179,22 +179,16 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat case OverwriteByExpression(r: DataSourceV2Relation, deleteExpr, query, writeOptions, _) => // fail if any filter cannot be converted. correctness depends on removing all matching data. - val filters = splitConjunctivePredicates(deleteExpr) - def transferFilters = - (filters: Seq[Expression]) => { - filters.map { filter => - DataSourceStrategy.translateFilter(deleteExpr, supportNestedPredicatePushdown = true) - .getOrElse(throw new AnalysisException( - s"Cannot translate expression to source filter: $filter")) - }.toArray - } + val filters = splitConjunctivePredicates(deleteExpr).map { + filter => DataSourceStrategy.translateFilter(deleteExpr, + supportNestedPredicatePushdown = true).getOrElse( + throw new AnalysisException(s"Cannot translate expression to source filter: $filter")) + }.toArray r.table.asWritable match { case v1 if v1.supports(TableCapability.V1_BATCH_WRITE) => - OverwriteByExpressionExecV1( - v1, transferFilters(filters), writeOptions.asOptions, query) :: Nil + OverwriteByExpressionExecV1(v1, filters, writeOptions.asOptions, query) :: Nil case v2 => - OverwriteByExpressionExec( - v2, transferFilters(filters), writeOptions.asOptions, planLater(query)) :: Nil + OverwriteByExpressionExec(v2, filters, writeOptions.asOptions, planLater(query)) :: Nil } case OverwritePartitionsDynamic(r: DataSourceV2Relation, query, writeOptions, _) =>