From eeca939bc597e3b53b87c449b3f25359ef0a710e Mon Sep 17 00:00:00 2001 From: Guy Khazma Date: Fri, 10 Jan 2020 00:38:46 +0200 Subject: [PATCH 1/9] add support for pushing data filters to file listing --- .../apache/spark/sql/v2/avro/AvroScan.scala | 6 ++++- .../PruneFileSourcePartitions.scala | 23 ++++++++++++------- .../execution/datasources/v2/FileScan.scala | 16 +++++++++++-- .../datasources/v2/csv/CSVScan.scala | 6 ++++- .../datasources/v2/json/JsonScan.scala | 6 ++++- .../datasources/v2/orc/OrcScan.scala | 6 ++++- .../datasources/v2/parquet/ParquetScan.scala | 6 ++++- .../datasources/v2/text/TextScan.scala | 6 ++++- 8 files changed, 59 insertions(+), 16 deletions(-) diff --git a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroScan.scala b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroScan.scala index bb840e69d99a3..5516c736e94ed 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroScan.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroScan.scala @@ -36,7 +36,8 @@ case class AvroScan( readDataSchema: StructType, readPartitionSchema: StructType, options: CaseInsensitiveStringMap, - partitionFilters: Seq[Expression] = Seq.empty) extends FileScan { + partitionFilters: Seq[Expression] = Seq.empty, + dataFilters: Seq[Expression] = Seq.empty) extends FileScan { override def isSplitable(path: Path): Boolean = true override def createReaderFactory(): PartitionReaderFactory = { @@ -54,6 +55,9 @@ case class AvroScan( override def withPartitionFilters(partitionFilters: Seq[Expression]): FileScan = this.copy(partitionFilters = partitionFilters) + override def withDataFilters(dataFilters: Seq[Expression]): FileScan = + this.copy(dataFilters = dataFilters) + override def equals(obj: Any): Boolean = obj match { case a: AvroScan => super.equals(a) && dataSchema == a.dataSchema && options == a.options diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala index 7fd154ccac445..e43662f37ccbb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala @@ -28,20 +28,25 @@ import org.apache.spark.sql.types.StructType private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] { - private def getPartitionKeyFilters( + private def getPartitionKeyFiltersAndDataFilters( sparkSession: SparkSession, relation: LeafNode, partitionSchema: StructType, filters: Seq[Expression], - output: Seq[AttributeReference]): ExpressionSet = { + output: Seq[AttributeReference]): (ExpressionSet, Seq[Expression]) = { val normalizedFilters = DataSourceStrategy.normalizeExprs( filters.filter(f => f.deterministic && !SubqueryExpression.hasSubquery(f)), output) val partitionColumns = relation.resolve(partitionSchema, sparkSession.sessionState.analyzer.resolver) val partitionSet = AttributeSet(partitionColumns) - ExpressionSet(normalizedFilters.filter { f => + val partitionKeyFilters = ExpressionSet(normalizedFilters.filter { f => f.references.subsetOf(partitionSet) }) + + val dataFilters = + normalizedFilters.filter(_.references.intersect(partitionSet).isEmpty) + + (partitionKeyFilters, dataFilters) } private def rebuildPhysicalOperation( @@ -72,7 +77,7 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] { _, _)) if filters.nonEmpty && fsRelation.partitionSchemaOption.isDefined => - val partitionKeyFilters = getPartitionKeyFilters( + val (partitionKeyFilters, _) = getPartitionKeyFiltersAndDataFilters( fsRelation.sparkSession, logicalRelation, partitionSchema, filters, logicalRelation.output) if (partitionKeyFilters.nonEmpty) { val prunedFileIndex = 
catalogFileIndex.filterPartitions(partitionKeyFilters.toSeq) @@ -92,11 +97,13 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] { case op @ PhysicalOperation(projects, filters, v2Relation @ DataSourceV2ScanRelation(_, scan: FileScan, output)) if filters.nonEmpty && scan.readDataSchema.nonEmpty => - val partitionKeyFilters = getPartitionKeyFilters(scan.sparkSession, - v2Relation, scan.readPartitionSchema, filters, output) - if (partitionKeyFilters.nonEmpty) { + val (partitionKeyFilters, dataFilters) = + getPartitionKeyFiltersAndDataFilters(scan.sparkSession, v2Relation, + scan.readPartitionSchema, filters, output) + if (partitionKeyFilters.nonEmpty || dataFilters.nonEmpty) { val prunedV2Relation = - v2Relation.copy(scan = scan.withPartitionFilters(partitionKeyFilters.toSeq)) + v2Relation.copy(scan = scan.withPartitionFilters(partitionKeyFilters.toSeq) + .withDataFilters(dataFilters)) // The pushed down partition filters don't need to be reevaluated. val afterScanFilters = ExpressionSet(filters) -- partitionKeyFilters.filter(_.references.nonEmpty) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala index a22e1ccfe4515..01458d63cd463 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala @@ -65,6 +65,16 @@ trait FileScan extends Scan with Batch with SupportsReportStatistics with Loggin */ def withPartitionFilters(partitionFilters: Seq[Expression]): FileScan + /** + * Returns the filters that can be use for file listing + */ + def dataFilters: Seq[Expression] + + /** + * Create a new `FileScan` instance from the current one with different `dataFilters`. 
+ */ + def withDataFilters(dataFilters: Seq[Expression]): FileScan + /** * If a file with `path` is unsplittable, return the unsplittable reason, * otherwise return `None`. @@ -79,7 +89,8 @@ trait FileScan extends Scan with Batch with SupportsReportStatistics with Loggin override def equals(obj: Any): Boolean = obj match { case f: FileScan => fileIndex == f.fileIndex && readSchema == f.readSchema - ExpressionSet(partitionFilters) == ExpressionSet(f.partitionFilters) + ExpressionSet(partitionFilters) == ExpressionSet(f.partitionFilters) && + ExpressionSet(dataFilters) == ExpressionSet(f.dataFilters) case _ => false } @@ -92,6 +103,7 @@ trait FileScan extends Scan with Batch with SupportsReportStatistics with Loggin val metadata: Map[String, String] = Map( "ReadSchema" -> readDataSchema.catalogString, "PartitionFilters" -> seqToString(partitionFilters), + "DataFilters" -> seqToString(dataFilters), "Location" -> locationDesc) val metadataStr = metadata.toSeq.sorted.map { case (key, value) => @@ -103,7 +115,7 @@ trait FileScan extends Scan with Batch with SupportsReportStatistics with Loggin } protected def partitions: Seq[FilePartition] = { - val selectedPartitions = fileIndex.listFiles(partitionFilters, Seq.empty) + val selectedPartitions = fileIndex.listFiles(partitionFilters, dataFilters) val maxSplitBytes = FilePartition.maxSplitBytes(sparkSession, selectedPartitions) val partitionAttributes = fileIndex.partitionSchema.toAttributes val attributeMap = partitionAttributes.map(a => normalizeName(a.name) -> a).toMap diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala index 78b04aa811e09..cc22036bb7a45 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala @@ -38,7 +38,8 @@ case class CSVScan( 
readDataSchema: StructType, readPartitionSchema: StructType, options: CaseInsensitiveStringMap, - partitionFilters: Seq[Expression] = Seq.empty) + partitionFilters: Seq[Expression] = Seq.empty, + dataFilters: Seq[Expression] = Seq.empty) extends TextBasedFileScan(sparkSession, options) { private lazy val parsedOptions: CSVOptions = new CSVOptions( @@ -92,6 +93,9 @@ case class CSVScan( override def withPartitionFilters(partitionFilters: Seq[Expression]): FileScan = this.copy(partitionFilters = partitionFilters) + override def withDataFilters(dataFilters: Seq[Expression]): FileScan = + this.copy(dataFilters = dataFilters) + override def equals(obj: Any): Boolean = obj match { case c: CSVScan => super.equals(c) && dataSchema == c.dataSchema && options == c.options diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScan.scala index 153b402476c40..42b4c6a63b6d2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScan.scala @@ -39,7 +39,8 @@ case class JsonScan( readDataSchema: StructType, readPartitionSchema: StructType, options: CaseInsensitiveStringMap, - partitionFilters: Seq[Expression] = Seq.empty) + partitionFilters: Seq[Expression] = Seq.empty, + dataFilters: Seq[Expression] = Seq.empty) extends TextBasedFileScan(sparkSession, options) { private val parsedOptions = new JSONOptionsInRead( @@ -91,6 +92,9 @@ case class JsonScan( override def withPartitionFilters(partitionFilters: Seq[Expression]): FileScan = this.copy(partitionFilters = partitionFilters) + override def withDataFilters(dataFilters: Seq[Expression]): FileScan = + this.copy(dataFilters = dataFilters) + override def equals(obj: Any): Boolean = obj match { case j: JsonScan => super.equals(j) && dataSchema == j.dataSchema && options == 
j.options diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala index f0595cb6d09c3..9f582c62624e7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala @@ -38,7 +38,8 @@ case class OrcScan( readPartitionSchema: StructType, options: CaseInsensitiveStringMap, pushedFilters: Array[Filter], - partitionFilters: Seq[Expression] = Seq.empty) extends FileScan { + partitionFilters: Seq[Expression] = Seq.empty, + dataFilters: Seq[Expression] = Seq.empty) extends FileScan { override def isSplitable(path: Path): Boolean = true override def createReaderFactory(): PartitionReaderFactory = { @@ -66,4 +67,7 @@ case class OrcScan( override def withPartitionFilters(partitionFilters: Seq[Expression]): FileScan = this.copy(partitionFilters = partitionFilters) + + override def withDataFilters(dataFilters: Seq[Expression]): FileScan = + this.copy(dataFilters = dataFilters) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala index 44179e2e42a4c..d2db3813db2df 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala @@ -41,7 +41,8 @@ case class ParquetScan( readPartitionSchema: StructType, pushedFilters: Array[Filter], options: CaseInsensitiveStringMap, - partitionFilters: Seq[Expression] = Seq.empty) extends FileScan { + partitionFilters: Seq[Expression] = Seq.empty, + dataFilters: Seq[Expression] = Seq.empty) extends FileScan { override def isSplitable(path: Path): Boolean = true override def 
createReaderFactory(): PartitionReaderFactory = { @@ -94,4 +95,7 @@ case class ParquetScan( override def withPartitionFilters(partitionFilters: Seq[Expression]): FileScan = this.copy(partitionFilters = partitionFilters) + + override def withDataFilters(dataFilters: Seq[Expression]): FileScan = + this.copy(dataFilters = dataFilters) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextScan.scala index cf6595e5c126c..bb0d480867b70 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextScan.scala @@ -36,7 +36,8 @@ case class TextScan( readDataSchema: StructType, readPartitionSchema: StructType, options: CaseInsensitiveStringMap, - partitionFilters: Seq[Expression] = Seq.empty) + partitionFilters: Seq[Expression] = Seq.empty, + dataFilters: Seq[Expression] = Seq.empty) extends TextBasedFileScan(sparkSession, options) { private val optionsAsScala = options.asScala.toMap @@ -73,6 +74,9 @@ case class TextScan( override def withPartitionFilters(partitionFilters: Seq[Expression]): FileScan = this.copy(partitionFilters = partitionFilters) + override def withDataFilters(dataFilters: Seq[Expression]): FileScan = + this.copy(dataFilters = dataFilters) + override def equals(obj: Any): Boolean = obj match { case t: TextScan => super.equals(t) && options == t.options From 1a65933530c7816ec55fe1cb3a595a5d1e902c54 Mon Sep 17 00:00:00 2001 From: Guy Khazma Date: Fri, 10 Jan 2020 01:00:28 +0200 Subject: [PATCH 2/9] modify tests --- .../src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala | 1 + .../scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala | 1 + 2 files changed, 2 insertions(+) diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala 
b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala index 3f2744014c199..207816ec30031 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala @@ -1544,6 +1544,7 @@ class AvroV2Suite extends AvroSuite { } assert(fileScan.nonEmpty) assert(fileScan.get.partitionFilters.nonEmpty) + assert(fileScan.get.dataFilters.nonEmpty) assert(fileScan.get.planInputPartitions().forall { partition => partition.asInstanceOf[FilePartition].files.forall { file => file.filePath.contains("p1=1") && file.filePath.contains("p2=2") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala index a66ba0958bc14..bce83464fd07e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -772,6 +772,7 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSparkSession { } assert(fileScan.nonEmpty) assert(fileScan.get.partitionFilters.nonEmpty) + assert(fileScan.get.dataFilters.nonEmpty) assert(fileScan.get.planInputPartitions().forall { partition => partition.asInstanceOf[FilePartition].files.forall { file => file.filePath.contains("p1=1") && file.filePath.contains("p2=2") From 67d501a0ec7b4360a41bf5916c9a8837401e39bf Mon Sep 17 00:00:00 2001 From: Guy Khazma Date: Fri, 10 Jan 2020 02:22:07 +0200 Subject: [PATCH 3/9] fix per review --- .../scala/org/apache/spark/sql/v2/avro/AvroScan.scala | 8 +++----- .../datasources/PruneFileSourcePartitions.scala | 3 +-- .../spark/sql/execution/datasources/v2/FileScan.scala | 10 +++------- .../sql/execution/datasources/v2/csv/CSVScan.scala | 8 +++----- .../sql/execution/datasources/v2/json/JsonScan.scala | 8 +++----- .../sql/execution/datasources/v2/orc/OrcScan.scala | 8 +++----- 
.../execution/datasources/v2/parquet/ParquetScan.scala | 8 +++----- .../sql/execution/datasources/v2/text/TextScan.scala | 8 +++----- 8 files changed, 22 insertions(+), 39 deletions(-) diff --git a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroScan.scala b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroScan.scala index 5516c736e94ed..4308d25e5d93d 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroScan.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroScan.scala @@ -52,11 +52,9 @@ case class AvroScan( dataSchema, readDataSchema, readPartitionSchema, caseSensitiveMap) } - override def withPartitionFilters(partitionFilters: Seq[Expression]): FileScan = - this.copy(partitionFilters = partitionFilters) - - override def withDataFilters(dataFilters: Seq[Expression]): FileScan = - this.copy(dataFilters = dataFilters) + override def withFilters( + partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): FileScan = + this.copy(partitionFilters = partitionFilters, dataFilters = dataFilters) override def equals(obj: Any): Boolean = obj match { case a: AvroScan => super.equals(a) && dataSchema == a.dataSchema && options == a.options diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala index e43662f37ccbb..53ede39f57d36 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala @@ -102,8 +102,7 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] { scan.readPartitionSchema, filters, output) if (partitionKeyFilters.nonEmpty || dataFilters.nonEmpty) { val prunedV2Relation = - v2Relation.copy(scan = scan.withPartitionFilters(partitionKeyFilters.toSeq) - 
.withDataFilters(dataFilters)) + v2Relation.copy(scan = scan.withFilters(partitionKeyFilters.toSeq, dataFilters)) // The pushed down partition filters don't need to be reevaluated. val afterScanFilters = ExpressionSet(filters) -- partitionKeyFilters.filter(_.references.nonEmpty) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala index 01458d63cd463..9a959dc9d6824 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala @@ -60,20 +60,16 @@ trait FileScan extends Scan with Batch with SupportsReportStatistics with Loggin */ def partitionFilters: Seq[Expression] - /** - * Create a new `FileScan` instance from the current one with different `partitionFilters`. - */ - def withPartitionFilters(partitionFilters: Seq[Expression]): FileScan - /** * Returns the filters that can be use for file listing */ def dataFilters: Seq[Expression] /** - * Create a new `FileScan` instance from the current one with different `dataFilters`. 
+ * Create a new `FileScan` instance from the current one + * with different `partitionFilters` and `dataFilters` */ - def withDataFilters(dataFilters: Seq[Expression]): FileScan + def withFilters(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): FileScan /** * If a file with `path` is unsplittable, return the unsplittable reason, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala index cc22036bb7a45..2b8f03d323340 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala @@ -90,11 +90,9 @@ case class CSVScan( dataSchema, readDataSchema, readPartitionSchema, parsedOptions) } - override def withPartitionFilters(partitionFilters: Seq[Expression]): FileScan = - this.copy(partitionFilters = partitionFilters) - - override def withDataFilters(dataFilters: Seq[Expression]): FileScan = - this.copy(dataFilters = dataFilters) + override def withFilters( + partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): FileScan = + this.copy(partitionFilters = partitionFilters, dataFilters = dataFilters) override def equals(obj: Any): Boolean = obj match { case c: CSVScan => super.equals(c) && dataSchema == c.dataSchema && options == c.options diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScan.scala index 42b4c6a63b6d2..75231625676ff 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScan.scala @@ -89,11 +89,9 @@ case class JsonScan( dataSchema, readDataSchema, readPartitionSchema, parsedOptions) } - override def 
withPartitionFilters(partitionFilters: Seq[Expression]): FileScan = - this.copy(partitionFilters = partitionFilters) - - override def withDataFilters(dataFilters: Seq[Expression]): FileScan = - this.copy(dataFilters = dataFilters) + override def withFilters( + partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): FileScan = + this.copy(partitionFilters = partitionFilters, dataFilters = dataFilters) override def equals(obj: Any): Boolean = obj match { case j: JsonScan => super.equals(j) && dataSchema == j.dataSchema && options == j.options diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala index 9f582c62624e7..62894fa7a2538 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala @@ -65,9 +65,7 @@ case class OrcScan( super.description() + ", PushedFilters: " + seqToString(pushedFilters) } - override def withPartitionFilters(partitionFilters: Seq[Expression]): FileScan = - this.copy(partitionFilters = partitionFilters) - - override def withDataFilters(dataFilters: Seq[Expression]): FileScan = - this.copy(dataFilters = dataFilters) + override def withFilters( + partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): FileScan = + this.copy(partitionFilters = partitionFilters, dataFilters = dataFilters) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala index d2db3813db2df..bb315262a8211 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala @@ -93,9 +93,7 @@ case 
class ParquetScan( super.description() + ", PushedFilters: " + seqToString(pushedFilters) } - override def withPartitionFilters(partitionFilters: Seq[Expression]): FileScan = - this.copy(partitionFilters = partitionFilters) - - override def withDataFilters(dataFilters: Seq[Expression]): FileScan = - this.copy(dataFilters = dataFilters) + override def withFilters( + partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): FileScan = + this.copy(partitionFilters = partitionFilters, dataFilters = dataFilters) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextScan.scala index bb0d480867b70..e75de2c4a4079 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextScan.scala @@ -71,11 +71,9 @@ case class TextScan( readPartitionSchema, textOptions) } - override def withPartitionFilters(partitionFilters: Seq[Expression]): FileScan = - this.copy(partitionFilters = partitionFilters) - - override def withDataFilters(dataFilters: Seq[Expression]): FileScan = - this.copy(dataFilters = dataFilters) + override def withFilters( + partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): FileScan = + this.copy(partitionFilters = partitionFilters, dataFilters = dataFilters) override def equals(obj: Any): Boolean = obj match { case t: TextScan => super.equals(t) && options == t.options From d056350b250eb04a933938b7eaa6ca18210a4901 Mon Sep 17 00:00:00 2001 From: Guy Khazma Date: Fri, 10 Jan 2020 02:23:56 +0200 Subject: [PATCH 4/9] add test --- .../spark/sql/FileBasedDataSourceSuite.scala | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala 
index bce83464fd07e..518a1beaa1955 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -784,6 +784,40 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSparkSession { } } + test("File source v2: support passing data filters to FileScan") { + withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "") { + allFileBasedDataSources.foreach { format => + withTempPath { dir => + Seq(("a", 1, 2), ("b", 1, 2), ("c", 2, 1)) + .toDF("value", "col1", "col2") + .write + .format(format) + .option("header", true) + .save(dir.getCanonicalPath) + val df = spark + .read + .format(format) + .option("header", true) + .load(dir.getCanonicalPath) + .where("col1 = 1 and col2 = 2 and value != \"a\"") + + val filterCondition = df.queryExecution.optimizedPlan.collectFirst { + case f: Filter => f.condition + } + assert(filterCondition.isDefined) + + val fileScan = df.queryExecution.executedPlan collectFirst { + case BatchScanExec(_, f: FileScan) => f + } + assert(fileScan.nonEmpty) + assert(fileScan.get.partitionFilters.isEmpty) + assert(fileScan.get.dataFilters.nonEmpty) + checkAnswer(df, Row("b", 1, 2)) + } + } + } + } + test("File table location should include both values of option `path` and `paths`") { withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "") { withTempPaths(3) { paths => From 689199bfbd3991a8a2921d8f2743c0efad8fd9fb Mon Sep 17 00:00:00 2001 From: Guy Khazma Date: Fri, 10 Jan 2020 02:26:50 +0200 Subject: [PATCH 5/9] minor comment update --- .../apache/spark/sql/execution/datasources/v2/FileScan.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala index 9a959dc9d6824..6e05aa56f4f72 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala @@ -61,7 +61,7 @@ trait FileScan extends Scan with Batch with SupportsReportStatistics with Loggin def partitionFilters: Seq[Expression] /** - * Returns the filters that can be use for file listing + * Returns the data filters that can be used for file listing */ def dataFilters: Seq[Expression] From 0915b54e1b59f6bcbc9fde1d0e211b76bd5b5646 Mon Sep 17 00:00:00 2001 From: Guy Khazma Date: Fri, 10 Jan 2020 11:20:29 +0200 Subject: [PATCH 6/9] minor fix + fix failing tests and adding avro test --- .../org/apache/spark/sql/avro/AvroSuite.scala | 30 +++++++++++++++++++ .../PruneFileSourcePartitions.scala | 3 +- .../spark/sql/FileBasedDataSourceSuite.scala | 9 +++--- 3 files changed, 37 insertions(+), 5 deletions(-) diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala index 207816ec30031..0938dd0d82ea8 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala @@ -1554,6 +1554,36 @@ class AvroV2Suite extends AvroSuite { } } + test("Avro source v2: support passing data filters to FileScan without partitionFilters") { + withTempPath { dir => + Seq(("a", 1, 2), ("b", 1, 2), ("c", 2, 1)) + .toDF("value", "p1", "p2") + .write + .format("avro") + .option("header", true) + .save(dir.getCanonicalPath) + val df = spark + .read + .format("avro") + .option("header", true) + .load(dir.getCanonicalPath) + .where("value = 'a'") + + val filterCondition = df.queryExecution.optimizedPlan.collectFirst { + case f: Filter => f.condition + } + assert(filterCondition.isDefined) + + val fileScan = df.queryExecution.executedPlan collectFirst { + case BatchScanExec(_, f: AvroScan) => f + } + assert(fileScan.nonEmpty) + 
assert(fileScan.get.partitionFilters.isEmpty) + assert(fileScan.get.dataFilters.nonEmpty) + checkAnswer(df, Row("a", 1, 2)) + } + } + private def getBatchScanExec(plan: SparkPlan): BatchScanExec = { plan.find(_.isInstanceOf[BatchScanExec]).get.asInstanceOf[BatchScanExec] } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala index 53ede39f57d36..007ad233796cd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala @@ -100,7 +100,8 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] { val (partitionKeyFilters, dataFilters) = getPartitionKeyFiltersAndDataFilters(scan.sparkSession, v2Relation, scan.readPartitionSchema, filters, output) - if (partitionKeyFilters.nonEmpty || dataFilters.nonEmpty) { + // The dataFilters are pushed down only once + if ((partitionKeyFilters.nonEmpty || dataFilters.nonEmpty) && scan.dataFilters.isEmpty) { val prunedV2Relation = v2Relation.copy(scan = scan.withFilters(partitionKeyFilters.toSeq, dataFilters)) // The pushed down partition filters don't need to be reevaluated. 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala index 518a1beaa1955..9a96a37d35cea 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -784,14 +784,15 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSparkSession { } } - test("File source v2: support passing data filters to FileScan") { + test("File source v2: support passing data filters to FileScan without partitionFilters") { withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "") { allFileBasedDataSources.foreach { format => withTempPath { dir => Seq(("a", 1, 2), ("b", 1, 2), ("c", 2, 1)) - .toDF("value", "col1", "col2") + .toDF("value", "p1", "p2") .write .format(format) + .partitionBy("p1", "p2") .option("header", true) .save(dir.getCanonicalPath) val df = spark @@ -799,7 +800,7 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSparkSession { .format(format) .option("header", true) .load(dir.getCanonicalPath) - .where("col1 = 1 and col2 = 2 and value != \"a\"") + .where("value = 'a'") val filterCondition = df.queryExecution.optimizedPlan.collectFirst { case f: Filter => f.condition @@ -812,7 +813,7 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSparkSession { assert(fileScan.nonEmpty) assert(fileScan.get.partitionFilters.isEmpty) assert(fileScan.get.dataFilters.nonEmpty) - checkAnswer(df, Row("b", 1, 2)) + checkAnswer(df, Row("a", 1, 2)) } } } From 8ab97db63e3935ef20943c942cb85832eccacfb0 Mon Sep 17 00:00:00 2001 From: Guy Khazma Date: Fri, 10 Jan 2020 11:34:54 +0200 Subject: [PATCH 7/9] minor fix --- .../sql/execution/datasources/PruneFileSourcePartitions.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala index 007ad233796cd..996076b327467 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala @@ -101,7 +101,7 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] { getPartitionKeyFiltersAndDataFilters(scan.sparkSession, v2Relation, scan.readPartitionSchema, filters, output) // The dataFilters are pushed down only once - if ((partitionKeyFilters.nonEmpty || dataFilters.nonEmpty) && scan.dataFilters.isEmpty) { + if (partitionKeyFilters.nonEmpty || (dataFilters.nonEmpty && scan.dataFilters.isEmpty)) { val prunedV2Relation = v2Relation.copy(scan = scan.withFilters(partitionKeyFilters.toSeq, dataFilters)) // The pushed down partition filters don't need to be reevaluated. From 3fe4dc47acb3c68522c1f163aa96ae10ec86aa5a Mon Sep 17 00:00:00 2001 From: Guy Khazma Date: Mon, 13 Jan 2020 08:16:49 +0200 Subject: [PATCH 8/9] fix per comment --- .../datasources/PruneFileSourcePartitions.scala | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala index 996076b327467..59c55c161bc89 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala @@ -39,14 +39,11 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] { val partitionColumns = relation.resolve(partitionSchema, sparkSession.sessionState.analyzer.resolver) val partitionSet = AttributeSet(partitionColumns) - val partitionKeyFilters = 
ExpressionSet(normalizedFilters.filter { f => + val (partitionFilters, dataFilters) = normalizedFilters.partition(f => f.references.subsetOf(partitionSet) - }) + ) - val dataFilters = - normalizedFilters.filter(_.references.intersect(partitionSet).isEmpty) - - (partitionKeyFilters, dataFilters) + (ExpressionSet(partitionFilters), dataFilters) } private def rebuildPhysicalOperation( From d181e38a4440c7673b1c14ead96d9d9be2720ea3 Mon Sep 17 00:00:00 2001 From: Guy Khazma Date: Sat, 18 Jan 2020 07:55:42 +0200 Subject: [PATCH 9/9] fix avro reader --- .../src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala index 8927945f414c3..5a23db7428a97 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala @@ -1582,12 +1582,10 @@ class AvroV2Suite extends AvroSuite { .toDF("value", "p1", "p2") .write .format("avro") - .option("header", true) .save(dir.getCanonicalPath) val df = spark .read .format("avro") - .option("header", true) .load(dir.getCanonicalPath) .where("value = 'a'")