diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index a2576684d6899..d824f8640d0fd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2063,16 +2063,17 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)

-  val NESTED_PREDICATE_PUSHDOWN_ENABLED =
-    buildConf("spark.sql.optimizer.nestedPredicatePushdown.enabled")
-      .internal()
-      .doc("When true, Spark tries to push down predicates for nested columns and or names " +
-        "containing `dots` to data sources. Currently, Parquet implements both optimizations " +
-        "while ORC only supports predicates for names containing `dots`. The other data sources" +
-        "don't support this feature yet.")
+  val NESTED_PREDICATE_PUSHDOWN_FILE_SOURCE_LIST =
+    buildConf("spark.sql.optimizer.nestedPredicatePushdown.supportedFileSources")
+      .internal()
+      .doc("A comma-separated list of data source short names or fully qualified data source " +
+        "implementation class names for which Spark tries to push down predicates for nested " +
+        "columns and/or names containing `dots` to data sources. Currently, Parquet implements " +
+        "both optimizations while ORC only supports predicates for names containing `dots`. The " +
+        "other data sources don't support this feature yet. So the default value is 'parquet,orc'.")
       .version("3.0.0")
-      .booleanConf
-      .createWithDefault(true)
+      .stringConf
+      .createWithDefault("parquet,orc")

   val SERIALIZER_NESTED_SCHEMA_PRUNING_ENABLED =
     buildConf("spark.sql.optimizer.serializer.nestedSchemaPruning.enabled")
@@ -3098,8 +3099,6 @@ class SQLConf extends Serializable with Logging {

   def nestedSchemaPruningEnabled: Boolean = getConf(NESTED_SCHEMA_PRUNING_ENABLED)

-  def nestedPredicatePushdownEnabled: Boolean = getConf(NESTED_PREDICATE_PUSHDOWN_ENABLED)
-
   def serializerNestedSchemaPruningEnabled: Boolean =
     getConf(SERIALIZER_NESTED_SCHEMA_PRUNING_ENABLED)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 48fab99db9b76..66996498ffd3b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -326,7 +326,10 @@ case class FileSourceScanExec(
   }

   @transient
-  private lazy val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter)
+  private lazy val pushedDownFilters = {
+    val supportNestedPredicatePushdown = DataSourceUtils.supportNestedPredicatePushdown(relation)
+    dataFilters.flatMap(DataSourceStrategy.translateFilter(_, supportNestedPredicatePushdown))
+  }

   override lazy val metadata: Map[String, String] = {
     def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]")
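
For reviewers trying the change out, a minimal sketch of how the new list-valued conf is toggled. The spark-shell session is illustrative and not part of the patch; only the key and its default come from the change above.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Default keeps nested/dotted predicate pushdown on for Parquet and ORC.
spark.conf.get("spark.sql.optimizer.nestedPredicatePushdown.supportedFileSources")
// expected: "parquet,orc"

// Restrict it to Parquet only, or disable it for all file sources with an empty string.
spark.conf.set("spark.sql.optimizer.nestedPredicatePushdown.supportedFileSources", "parquet")
spark.conf.set("spark.sql.optimizer.nestedPredicatePushdown.supportedFileSources", "")
```
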
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index a58038d127818..23454d7d5e7f3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -448,60 +448,62 @@ object DataSourceStrategy {
     }
   }

-  private def translateLeafNodeFilter(predicate: Expression): Option[Filter] = predicate match {
-    case expressions.EqualTo(PushableColumn(name), Literal(v, t)) =>
+  private def translateLeafNodeFilter(
+      predicate: Expression,
+      pushableColumn: PushableColumnBase): Option[Filter] = predicate match {
+    case expressions.EqualTo(pushableColumn(name), Literal(v, t)) =>
       Some(sources.EqualTo(name, convertToScala(v, t)))
-    case expressions.EqualTo(Literal(v, t), PushableColumn(name)) =>
+    case expressions.EqualTo(Literal(v, t), pushableColumn(name)) =>
       Some(sources.EqualTo(name, convertToScala(v, t)))

-    case expressions.EqualNullSafe(PushableColumn(name), Literal(v, t)) =>
+    case expressions.EqualNullSafe(pushableColumn(name), Literal(v, t)) =>
       Some(sources.EqualNullSafe(name, convertToScala(v, t)))
-    case expressions.EqualNullSafe(Literal(v, t), PushableColumn(name)) =>
+    case expressions.EqualNullSafe(Literal(v, t), pushableColumn(name)) =>
       Some(sources.EqualNullSafe(name, convertToScala(v, t)))

-    case expressions.GreaterThan(PushableColumn(name), Literal(v, t)) =>
+    case expressions.GreaterThan(pushableColumn(name), Literal(v, t)) =>
       Some(sources.GreaterThan(name, convertToScala(v, t)))
-    case expressions.GreaterThan(Literal(v, t), PushableColumn(name)) =>
+    case expressions.GreaterThan(Literal(v, t), pushableColumn(name)) =>
       Some(sources.LessThan(name, convertToScala(v, t)))

-    case expressions.LessThan(PushableColumn(name), Literal(v, t)) =>
+    case expressions.LessThan(pushableColumn(name), Literal(v, t)) =>
       Some(sources.LessThan(name, convertToScala(v, t)))
-    case expressions.LessThan(Literal(v, t), PushableColumn(name)) =>
+    case expressions.LessThan(Literal(v, t), pushableColumn(name)) =>
       Some(sources.GreaterThan(name, convertToScala(v, t)))

-    case expressions.GreaterThanOrEqual(PushableColumn(name), Literal(v, t)) =>
+    case expressions.GreaterThanOrEqual(pushableColumn(name), Literal(v, t)) =>
       Some(sources.GreaterThanOrEqual(name, convertToScala(v, t)))
-    case expressions.GreaterThanOrEqual(Literal(v, t), PushableColumn(name)) =>
+    case expressions.GreaterThanOrEqual(Literal(v, t), pushableColumn(name)) =>
       Some(sources.LessThanOrEqual(name, convertToScala(v, t)))

-    case expressions.LessThanOrEqual(PushableColumn(name), Literal(v, t)) =>
+    case expressions.LessThanOrEqual(pushableColumn(name), Literal(v, t)) =>
       Some(sources.LessThanOrEqual(name, convertToScala(v, t)))
-    case expressions.LessThanOrEqual(Literal(v, t), PushableColumn(name)) =>
+    case expressions.LessThanOrEqual(Literal(v, t), pushableColumn(name)) =>
       Some(sources.GreaterThanOrEqual(name, convertToScala(v, t)))

-    case expressions.InSet(e @ PushableColumn(name), set) =>
+    case expressions.InSet(e @ pushableColumn(name), set) =>
       val toScala = CatalystTypeConverters.createToScalaConverter(e.dataType)
       Some(sources.In(name, set.toArray.map(toScala)))

     // Because we only convert In to InSet in Optimizer when there are more than certain
     // items. So it is possible we still get an In expression here that needs to be pushed
     // down.
-    case expressions.In(e @ PushableColumn(name), list) if list.forall(_.isInstanceOf[Literal]) =>
+    case expressions.In(e @ pushableColumn(name), list) if list.forall(_.isInstanceOf[Literal]) =>
       val hSet = list.map(_.eval(EmptyRow))
       val toScala = CatalystTypeConverters.createToScalaConverter(e.dataType)
       Some(sources.In(name, hSet.toArray.map(toScala)))

-    case expressions.IsNull(PushableColumn(name)) =>
+    case expressions.IsNull(pushableColumn(name)) =>
       Some(sources.IsNull(name))
-    case expressions.IsNotNull(PushableColumn(name)) =>
+    case expressions.IsNotNull(pushableColumn(name)) =>
       Some(sources.IsNotNull(name))

-    case expressions.StartsWith(PushableColumn(name), Literal(v: UTF8String, StringType)) =>
+    case expressions.StartsWith(pushableColumn(name), Literal(v: UTF8String, StringType)) =>
       Some(sources.StringStartsWith(name, v.toString))

-    case expressions.EndsWith(PushableColumn(name), Literal(v: UTF8String, StringType)) =>
+    case expressions.EndsWith(pushableColumn(name), Literal(v: UTF8String, StringType)) =>
       Some(sources.StringEndsWith(name, v.toString))

-    case expressions.Contains(PushableColumn(name), Literal(v: UTF8String, StringType)) =>
+    case expressions.Contains(pushableColumn(name), Literal(v: UTF8String, StringType)) =>
       Some(sources.StringContains(name, v.toString))

     case expressions.Literal(true, BooleanType) =>
@@ -518,8 +520,9 @@ object DataSourceStrategy {
    *
    * @return a `Some[Filter]` if the input [[Expression]] is convertible, otherwise a `None`.
    */
-  protected[sql] def translateFilter(predicate: Expression): Option[Filter] = {
-    translateFilterWithMapping(predicate, None)
+  protected[sql] def translateFilter(
+      predicate: Expression, supportNestedPredicatePushdown: Boolean): Option[Filter] = {
+    translateFilterWithMapping(predicate, None, supportNestedPredicatePushdown)
   }

   /**
@@ -529,11 +532,13 @@ object DataSourceStrategy {
    * @param translatedFilterToExpr An optional map from leaf node filter expressions to its
    *                               translated [[Filter]]. The map is used for rebuilding
    *                               [[Expression]] from [[Filter]].
+   * @param nestedPredicatePushdownEnabled Whether nested predicate pushdown is enabled.
    * @return a `Some[Filter]` if the input [[Expression]] is convertible, otherwise a `None`.
    */
   protected[sql] def translateFilterWithMapping(
       predicate: Expression,
-      translatedFilterToExpr: Option[mutable.HashMap[sources.Filter, Expression]])
+      translatedFilterToExpr: Option[mutable.HashMap[sources.Filter, Expression]],
+      nestedPredicatePushdownEnabled: Boolean)
     : Option[Filter] = {
     predicate match {
       case expressions.And(left, right) =>
@@ -547,21 +552,26 @@ object DataSourceStrategy {
         // Pushing one leg of AND down is only safe to do at the top level.
         // You can see ParquetFilters' createFilter for more details.
         for {
-          leftFilter <- translateFilterWithMapping(left, translatedFilterToExpr)
-          rightFilter <- translateFilterWithMapping(right, translatedFilterToExpr)
+          leftFilter <- translateFilterWithMapping(
+            left, translatedFilterToExpr, nestedPredicatePushdownEnabled)
+          rightFilter <- translateFilterWithMapping(
+            right, translatedFilterToExpr, nestedPredicatePushdownEnabled)
         } yield sources.And(leftFilter, rightFilter)

       case expressions.Or(left, right) =>
         for {
-          leftFilter <- translateFilterWithMapping(left, translatedFilterToExpr)
-          rightFilter <- translateFilterWithMapping(right, translatedFilterToExpr)
+          leftFilter <- translateFilterWithMapping(
+            left, translatedFilterToExpr, nestedPredicatePushdownEnabled)
+          rightFilter <- translateFilterWithMapping(
+            right, translatedFilterToExpr, nestedPredicatePushdownEnabled)
         } yield sources.Or(leftFilter, rightFilter)

       case expressions.Not(child) =>
-        translateFilterWithMapping(child, translatedFilterToExpr).map(sources.Not)
+        translateFilterWithMapping(child, translatedFilterToExpr, nestedPredicatePushdownEnabled)
+          .map(sources.Not)

       case other =>
-        val filter = translateLeafNodeFilter(other)
+        val filter = translateLeafNodeFilter(other, PushableColumn(nestedPredicatePushdownEnabled))
         if (filter.isDefined && translatedFilterToExpr.isDefined) {
           translatedFilterToExpr.get(filter.get) = predicate
         }
@@ -608,8 +618,9 @@ object DataSourceStrategy {

     // A map from original Catalyst expressions to corresponding translated data source filters.
     // If a predicate is not in this map, it means it cannot be pushed down.
+    val supportNestedPredicatePushdown = DataSourceUtils.supportNestedPredicatePushdown(relation)
     val translatedMap: Map[Expression, Filter] = predicates.flatMap { p =>
-      translateFilter(p).map(f => p -> f)
+      translateFilter(p, supportNestedPredicatePushdown).map(f => p -> f)
     }.toMap

     val pushedFilters: Seq[Filter] = translatedMap.values.toSeq
@@ -650,9 +661,10 @@ object DataSourceStrategy {
 /**
  * Find the column name of an expression that can be pushed down.
  */
-object PushableColumn {
+abstract class PushableColumnBase {
+  val nestedPredicatePushdownEnabled: Boolean
+
   def unapply(e: Expression): Option[String] = {
-    val nestedPredicatePushdownEnabled = SQLConf.get.nestedPredicatePushdownEnabled
     import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
     def helper(e: Expression): Option[Seq[String]] = e match {
       case a: Attribute =>
@@ -668,3 +680,21 @@ object PushableColumn {
     helper(e).map(_.quoted)
   }
 }
+
+object PushableColumn {
+  def apply(nestedPredicatePushdownEnabled: Boolean): PushableColumnBase = {
+    if (nestedPredicatePushdownEnabled) {
+      PushableColumnAndNestedColumn
+    } else {
+      PushableColumnWithoutNestedColumn
+    }
+  }
+}
+
+object PushableColumnAndNestedColumn extends PushableColumnBase {
+  override val nestedPredicatePushdownEnabled = true
+}
+
+object PushableColumnWithoutNestedColumn extends PushableColumnBase {
+  override val nestedPredicatePushdownEnabled = false
+}
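
A quick illustration of what the two extractor objects introduced above do with a nested field. This is illustrative only; the attribute and field names are made up, and the expected values mirror what DataSourceStrategySuite asserts further down.

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.GetStructField
import org.apache.spark.sql.execution.datasources.{PushableColumnAndNestedColumn, PushableColumnWithoutNestedColumn}
import org.apache.spark.sql.types.{IntegerType, StructField}

// Hypothetical attribute a: struct<b: int> and the Catalyst expression for a.b
val a = Symbol("a").struct(StructField("b", IntegerType))
val aDotB = GetStructField(a, 0, Some("b"))

PushableColumnAndNestedColumn.unapply(aDotB)     // expected: Some("a.b")
PushableColumnWithoutNestedColumn.unapply(aDotB) // expected: None, i.e. a.b is not pushable
```
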
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
index b19de6d3deb89..45a9b1a808cf3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
@@ -17,6 +17,8 @@

 package org.apache.spark.sql.execution.datasources

+import java.util.Locale
+
 import org.apache.hadoop.fs.Path
 import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
@@ -24,6 +26,7 @@ import org.json4s.jackson.Serialization
 import org.apache.spark.sql.{SPARK_LEGACY_DATETIME, SPARK_VERSION_METADATA_KEY}
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils

@@ -68,6 +71,19 @@ object DataSourceUtils {
   private[sql] def isDataFile(fileName: String) =
     !(fileName.startsWith("_") || fileName.startsWith("."))

+  /**
+   * Returns if the given relation's V1 datasource provider supports nested predicate pushdown.
+   */
+  private[sql] def supportNestedPredicatePushdown(relation: BaseRelation): Boolean =
+    relation match {
+      case hs: HadoopFsRelation =>
+        val supportedDatasources =
+          Utils.stringToSeq(SQLConf.get.getConf(SQLConf.NESTED_PREDICATE_PUSHDOWN_FILE_SOURCE_LIST)
+            .toLowerCase(Locale.ROOT))
+        supportedDatasources.contains(hs.toString)
+      case _ => false
+    }
+
   def needRebaseDateTime(lookupFileMeta: String => String): Option[Boolean] = {
     if (Utils.isTesting && SQLConf.get.getConfString("spark.test.forceNoRebase", "") == "true") {
       return Some(false)
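
Roughly, the gate above reduces to a short-name lookup. A sketch of the comparison it performs, with illustrative values; `Utils.stringToSeq` additionally trims entries and drops empty ones.

```scala
import java.util.Locale

// With spark.sql.optimizer.nestedPredicatePushdown.supportedFileSources = "parquet,orc":
val supported = "parquet,orc".toLowerCase(Locale.ROOT).split(",").map(_.trim).toSeq

// HadoopFsRelation.toString is the file format's DataSourceRegister short name,
// so for a Parquet relation the check is effectively:
supported.contains("parquet") // true  -> nested and dotted predicates are translated
supported.contains("csv")     // false -> PushableColumnWithoutNestedColumn is used instead
```
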
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index f45495121a980..477937d66ad9b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -178,8 +178,11 @@ object FileSourceStrategy extends Strategy with Logging {
       // Partition keys are not available in the statistics of the files.
       val dataFilters =
         normalizedFiltersWithoutSubqueries.filter(_.references.intersect(partitionSet).isEmpty)
-      logInfo(s"Pushed Filters: " +
-        s"${dataFilters.flatMap(DataSourceStrategy.translateFilter).mkString(",")}")
+      val supportNestedPredicatePushdown =
+        DataSourceUtils.supportNestedPredicatePushdown(fsRelation)
+      val pushedFilters = dataFilters
+        .flatMap(DataSourceStrategy.translateFilter(_, supportNestedPredicatePushdown))
+      logInfo(s"Pushed Filters: ${pushedFilters.mkString(",")}")

       // Predicates with both partition keys and attributes need to be evaluated after the scan.
       val afterScanFilters = filterSet -- partitionKeyFilters.filter(_.references.nonEmpty)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 8f4e2d256c714..cca80c0cb6d57 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -180,8 +180,9 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
     case OverwriteByExpression(r: DataSourceV2Relation, deleteExpr, query, writeOptions, _) =>
       // fail if any filter cannot be converted. correctness depends on removing all matching data.
       val filters = splitConjunctivePredicates(deleteExpr).map {
-        filter => DataSourceStrategy.translateFilter(deleteExpr).getOrElse(
-          throw new AnalysisException(s"Cannot translate expression to source filter: $filter"))
+        filter => DataSourceStrategy.translateFilter(filter,
+          supportNestedPredicatePushdown = true).getOrElse(
+          throw new AnalysisException(s"Cannot translate expression to source filter: $filter"))
       }.toArray
       r.table.asWritable match {
         case v1 if v1.supports(TableCapability.V1_BATCH_WRITE) =>
@@ -205,7 +206,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       // correctness depends on removing all matching data.
       val filters = DataSourceStrategy.normalizeExprs(condition.toSeq, output)
         .flatMap(splitConjunctivePredicates(_).map {
-          f => DataSourceStrategy.translateFilter(f).getOrElse(
+          f => DataSourceStrategy.translateFilter(f, true).getOrElse(
             throw new AnalysisException(s"Exec update failed:" +
               s" cannot translate expression to source filter: $f"))
         }).toArray
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala
index 33338b06565c9..1a6f03f54f2e9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala
@@ -48,7 +48,8 @@ object PushDownUtils extends PredicateHelper {

     for (filterExpr <- filters) {
       val translated =
-        DataSourceStrategy.translateFilterWithMapping(filterExpr, Some(translatedFilterToExpr))
+        DataSourceStrategy.translateFilterWithMapping(filterExpr, Some(translatedFilterToExpr),
+          nestedPredicatePushdownEnabled = true)
       if (translated.isEmpty) {
         untranslatableExprs += filterExpr
       } else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
index 59089fa6b77e9..b168e848f0b6f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
@@ -59,7 +59,7 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] {

     val wrappedScan = scan match {
       case v1: V1Scan =>
-        val translated = filters.flatMap(DataSourceStrategy.translateFilter)
+        val translated = filters.flatMap(DataSourceStrategy.translateFilter(_, true))
         V1ScanWrapper(v1, translated, pushedFilters)
       case _ => scan
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ParquetNestedPredicatePushDownBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ParquetNestedPredicatePushDownBenchmark.scala
index 11bc91a4b1551..d2bd962b50654 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ParquetNestedPredicatePushDownBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ParquetNestedPredicatePushDownBenchmark.scala
@@ -48,12 +48,12 @@ object ParquetNestedPredicatePushDownBenchmark extends SqlBasedBenchmark {
   private def addCase(
       benchmark: Benchmark,
       inputPath: String,
-      enableNestedPD: Boolean,
+      enableNestedPD: String,
       name: String,
       withFilter: DataFrame => DataFrame): Unit = {
     val loadDF = spark.read.parquet(inputPath)
     benchmark.addCase(name) { _ =>
-      withSQLConf((SQLConf.NESTED_PREDICATE_PUSHDOWN_ENABLED.key, enableNestedPD.toString)) {
+      withSQLConf((SQLConf.NESTED_PREDICATE_PUSHDOWN_FILE_SOURCE_LIST.key, enableNestedPD)) {
         withFilter(loadDF).noop()
       }
     }
@@ -67,13 +67,13 @@ object ParquetNestedPredicatePushDownBenchmark extends SqlBasedBenchmark {
     addCase(
       benchmark,
       outputPath,
-      enableNestedPD = false,
+      enableNestedPD = "",
       "Without nested predicate Pushdown",
       withFilter)
     addCase(
       benchmark,
       outputPath,
-      enableNestedPD = true,
+      enableNestedPD = "parquet",
       "With nested predicate Pushdown",
       withFilter)
     benchmark.run()
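
To make the new boolean's effect concrete, a sketch of `translateFilter` on a predicate over a column literally named `col.dot`. Illustrative only; the expression is built with the Catalyst test DSL, and since `translateFilter` is `protected[sql]` this would run from inside the `org.apache.spark.sql` package, as the suites below do.

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.execution.datasources.DataSourceStrategy

val dotted = Symbol("col.dot").int   // attribute whose name contains a dot
val pred = dotted === 1

// The V2 paths above always pass true, so the name is backquoted and translated:
DataSourceStrategy.translateFilter(pred, supportNestedPredicatePushdown = true)
// expected: Some(EqualTo(`col.dot`, 1))

// A file source that is not in the supportedFileSources list passes false,
// and the predicate is simply not translated (hence not pushed):
DataSourceStrategy.translateFilter(pred, supportNestedPredicatePushdown = false)
// expected: None
```
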
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategySuite.scala
index a775a97895cfc..b94918eccd46e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategySuite.scala
@@ -289,14 +289,26 @@ class DataSourceStrategySuite extends PlanTest with SharedSparkSession {
   test("SPARK-31027 test `PushableColumn.unapply` that finds the column name of " +
     "an expression that can be pushed down") {
     attrInts.foreach { case (attrInt, colName) =>
-      assert(PushableColumn.unapply(attrInt) === Some(colName))
+      assert(PushableColumnAndNestedColumn.unapply(attrInt) === Some(colName))
+
+      if (colName.contains(".")) {
+        assert(PushableColumnWithoutNestedColumn.unapply(attrInt) === None)
+      } else {
+        assert(PushableColumnWithoutNestedColumn.unapply(attrInt) === Some(colName))
+      }
     }

     attrStrs.foreach { case (attrStr, colName) =>
-      assert(PushableColumn.unapply(attrStr) === Some(colName))
+      assert(PushableColumnAndNestedColumn.unapply(attrStr) === Some(colName))
+
+      if (colName.contains(".")) {
+        assert(PushableColumnWithoutNestedColumn.unapply(attrStr) === None)
+      } else {
+        assert(PushableColumnWithoutNestedColumn.unapply(attrStr) === Some(colName))
+      }
     }
     // `Abs(col)` can not be pushed down, so it returns `None`
-    assert(PushableColumn.unapply(Abs('col.int)) === None)
+    assert(PushableColumnAndNestedColumn.unapply(Abs('col.int)) === None)
   }

   /**
@@ -305,7 +317,7 @@ class DataSourceStrategySuite extends PlanTest with SharedSparkSession {
    */
   def testTranslateFilter(catalystFilter: Expression, result: Option[sources.Filter]): Unit = {
     assertResult(result) {
-      DataSourceStrategy.translateFilter(catalystFilter)
+      DataSourceStrategy.translateFilter(catalystFilter, true)
     }
   }
 }
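
The distinction the suite above keeps asserting, shown on two hypothetical DataFrames (assuming an active `spark` session; only the naming convention is the point here):

```scala
import org.apache.spark.sql.functions.{col, struct}

val nestedDF = spark.range(3).select(struct(col("id").as("b")).as("a")) // nested field a.b
val dottedDF = spark.range(3).toDF("a.b")                               // top-level column named "a.b"

// After this patch a pushed Filter refers to them differently:
//   nested field  a.b -> column name "a.b"
//   dotted column a.b -> column name "`a.b`"
// which lets a file source tell the two cases apart.
```
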
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 20bfb32e2c0d4..5cf21293fd07f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -33,7 +33,8 @@ import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
-import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.parseColumnPath
+import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, HadoopFsRelation, LogicalRelation, PushableColumnAndNestedColumn}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
 import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
 import org.apache.spark.sql.functions._
@@ -1588,47 +1589,67 @@ class ParquetV1FilterSuite extends ParquetFilterSuite {
       expected: Seq[Row]): Unit = {
     val output = predicate.collect { case a: Attribute => a }.distinct

-    withSQLConf(
-      SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
-      SQLConf.PARQUET_FILTER_PUSHDOWN_DATE_ENABLED.key -> "true",
-      SQLConf.PARQUET_FILTER_PUSHDOWN_TIMESTAMP_ENABLED.key -> "true",
-      SQLConf.PARQUET_FILTER_PUSHDOWN_DECIMAL_ENABLED.key -> "true",
-      SQLConf.PARQUET_FILTER_PUSHDOWN_STRING_STARTSWITH_ENABLED.key -> "true",
-      // Disable adding filters from constraints because it adds, for instance,
-      // is-not-null to pushed filters, which makes it hard to test if the pushed
-      // filter is expected or not (this had to be fixed with SPARK-13495).
-      SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> InferFiltersFromConstraints.ruleName,
-      SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
-      val query = df
-        .select(output.map(e => Column(e)): _*)
-        .where(Column(predicate))
+    Seq(("parquet", true), ("", false)).map { case (pushdownDsList, nestedPredicatePushdown) =>
+      withSQLConf(
+        SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
+        SQLConf.PARQUET_FILTER_PUSHDOWN_DATE_ENABLED.key -> "true",
+        SQLConf.PARQUET_FILTER_PUSHDOWN_TIMESTAMP_ENABLED.key -> "true",
+        SQLConf.PARQUET_FILTER_PUSHDOWN_DECIMAL_ENABLED.key -> "true",
+        SQLConf.PARQUET_FILTER_PUSHDOWN_STRING_STARTSWITH_ENABLED.key -> "true",
+        // Disable adding filters from constraints because it adds, for instance,
+        // is-not-null to pushed filters, which makes it hard to test if the pushed
+        // filter is expected or not (this had to be fixed with SPARK-13495).
+        SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> InferFiltersFromConstraints.ruleName,
+        SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false",
+        SQLConf.NESTED_PREDICATE_PUSHDOWN_FILE_SOURCE_LIST.key -> pushdownDsList) {
+        val query = df
+          .select(output.map(e => Column(e)): _*)
+          .where(Column(predicate))
+
+        val nestedOrAttributes = predicate.collectFirst {
+          case g: GetStructField => g
+          case a: Attribute => a
+        }
+        assert(nestedOrAttributes.isDefined, "Neither GetStructField nor Attribute is detected.")
+
+        val parsed = parseColumnPath(
+          PushableColumnAndNestedColumn.unapply(nestedOrAttributes.get).get)
+
+        val containsNestedColumnOrDot = parsed.length > 1 || parsed(0).contains(".")
+
+        var maybeRelation: Option[HadoopFsRelation] = None
+        val maybeAnalyzedPredicate = query.queryExecution.optimizedPlan.collect {
+          case PhysicalOperation(_, filters,
+              LogicalRelation(relation: HadoopFsRelation, _, _, _)) =>
+            maybeRelation = Some(relation)
+            filters
+        }.flatten.reduceLeftOption(_ && _)
+        assert(maybeAnalyzedPredicate.isDefined, "No filter is analyzed from the given query")
+
+        val (_, selectedFilters, _) =
+          DataSourceStrategy.selectFilters(maybeRelation.get, maybeAnalyzedPredicate.toSeq)
+        // If the predicate contains a nested column or a dot, we push it down only when
+        // "parquet" is in `NESTED_PREDICATE_PUSHDOWN_FILE_SOURCE_LIST`.
+        if (nestedPredicatePushdown || !containsNestedColumnOrDot) {
+          assert(selectedFilters.nonEmpty, "No filter is pushed down")
+          val schema = new SparkToParquetSchemaConverter(conf).convert(df.schema)
+          val parquetFilters = createParquetFilters(schema)
+          // In this test suite, all the simple predicates are convertible here.
+          assert(parquetFilters.convertibleFilters(selectedFilters) === selectedFilters)
+          val pushedParquetFilters = selectedFilters.map { pred =>
+            val maybeFilter = parquetFilters.createFilter(pred)
+            assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $pred")
+            maybeFilter.get
+          }
+          // Doesn't bother checking type parameters here (e.g. `Eq[Integer]`)
+          assert(pushedParquetFilters.exists(_.getClass === filterClass),
+            s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.")

-      var maybeRelation: Option[HadoopFsRelation] = None
-      val maybeAnalyzedPredicate = query.queryExecution.optimizedPlan.collect {
-        case PhysicalOperation(_, filters,
-            LogicalRelation(relation: HadoopFsRelation, _, _, _)) =>
-          maybeRelation = Some(relation)
-          filters
-      }.flatten.reduceLeftOption(_ && _)
-      assert(maybeAnalyzedPredicate.isDefined, "No filter is analyzed from the given query")
-
-      val (_, selectedFilters, _) =
-        DataSourceStrategy.selectFilters(maybeRelation.get, maybeAnalyzedPredicate.toSeq)
-      assert(selectedFilters.nonEmpty, "No filter is pushed down")
-      val schema = new SparkToParquetSchemaConverter(conf).convert(df.schema)
-      val parquetFilters = createParquetFilters(schema)
-      // In this test suite, all the simple predicates are convertible here.
-      assert(parquetFilters.convertibleFilters(selectedFilters) === selectedFilters)
-      val pushedParquetFilters = selectedFilters.map { pred =>
-        val maybeFilter = parquetFilters.createFilter(pred)
-        assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $pred")
-        maybeFilter.get
+          checker(stripSparkFilter(query), expected)
+        } else {
+          assert(selectedFilters.isEmpty, "Filter should not be pushed down")
+        }
       }
-      // Doesn't bother checking type parameters here (e.g. `Eq[Integer]`)
-      assert(pushedParquetFilters.exists(_.getClass === filterClass),
-        s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.")
-
-      checker(stripSparkFilter(query), expected)
     }
   }
 }
@@ -1667,7 +1688,7 @@ class ParquetV2FilterSuite extends ParquetFilterSuite {
       case PhysicalOperation(_, filters,
           DataSourceV2ScanRelation(_, scan: ParquetScan, _)) =>
         assert(filters.nonEmpty, "No filter is analyzed from the given query")
-        val sourceFilters = filters.flatMap(DataSourceStrategy.translateFilter).toArray
+        val sourceFilters = filters.flatMap(DataSourceStrategy.translateFilter(_, true)).toArray
         val pushedFilters = scan.pushedFilters
         assert(pushedFilters.nonEmpty, "No filter is pushed down")
         val schema = new SparkToParquetSchemaConverter(conf).convert(df.schema)
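
Finally, an end-to-end sketch of what the change affects at the query level (not part of the patch; the path and data are made up):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.struct

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val path = "/tmp/nested-pushdown-demo"
Seq((1, "x"), (2, "y")).toDF("b", "c")
  .select(struct($"b", $"c").as("a"))
  .write.mode("overwrite").parquet(path)

// With the default "parquet,orc" the nested predicate appears under PushedFilters
// in the Parquet scan node; with the conf set to "" it is evaluated by Spark only.
spark.read.parquet(path).filter($"a.b" > 1).explain()
```
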