From b889eb41a49c73066ba2cf0bebf82d0a065fec94 Mon Sep 17 00:00:00 2001 From: fuwhu Date: Mon, 13 Jan 2020 17:57:00 +0800 Subject: [PATCH 01/10] Take partitionFilters and partition num into account when calculating statistic sizeInBytes in FileScan.estimateStatistics --- .../org/apache/spark/sql/internal/SQLConf.scala | 13 +++++++++++++ .../datasources/PartitioningAwareFileIndex.scala | 8 ++++++++ .../sql/execution/datasources/v2/FileScan.scala | 11 +++++++++-- 3 files changed, 30 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 64c613611c861..37d1760bb3ab2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1401,6 +1401,16 @@ object SQLConf { .booleanConf .createWithDefault(false) + val MAX_PARTITION_NUMBER_FOR_STATS_CALCULATION_VIA_FS = + buildConf("spark.sql.statistics.statisticViaFileSystem.maxPartitionNumber") + .doc("If the number of table (can be either hive table or data source table ) partitions " + + "exceed this value, statistic calculation via file system is not allowed. This is to " + + "avoid calculating size of large number of partitions via file system, eg. HDFS, which " + + "is very time consuming. Setting this value to negative will disable statistic " + + "calculation via file system.") + .intConf + .createWithDefault(1000) + val NDV_MAX_ERROR = buildConf("spark.sql.statistics.ndv.maxError") .internal() @@ -2593,6 +2603,9 @@ class SQLConf extends Serializable with Logging { def fallBackToHdfsForStatsEnabled: Boolean = getConf(ENABLE_FALL_BACK_TO_HDFS_FOR_STATS) + def maxPartNumForStatsCalculateViaFS: Int = + getConf(MAX_PARTITION_NUMBER_FOR_STATS_CALCULATION_VIA_FS) + def defaultSizeInBytes: Long = getConf(DEFAULT_SIZE_IN_BYTES) def ndvMaxError: Double = getConf(NDV_MAX_ERROR) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala index 2e09c729529a6..b34c3d262f6e0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala @@ -66,6 +66,14 @@ abstract class PartitioningAwareFileIndex( parameters.getOrElse("recursiveFileLookup", "false").toBoolean } + def listPartitions(partitionFilters: Seq[Expression]): Option[Seq[PartitionPath]] = { + if (partitionSpec().partitionColumns.nonEmpty) { + Some(prunePartitions(partitionFilters, partitionSpec())) + } else { + None + } + } + override def listFiles( partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = { def isNonEmptyFile(f: FileStatus): Boolean = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala index 6e05aa56f4f72..a8111399eaaa2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala @@ -160,10 +160,17 @@ trait FileScan extends Scan with Batch with SupportsReportStatistics with Loggin } override def estimateStatistics(): Statistics = { + val partitions = 
fileIndex.listPartitions(partitionFilters)
+    val conf = sparkSession.sessionState.conf
     new Statistics {
       override def sizeInBytes(): OptionalLong = {
-        val compressionFactor = sparkSession.sessionState.conf.fileCompressionFactor
-        val size = (compressionFactor * fileIndex.sizeInBytes).toLong
+        val compressionFactor = conf.fileCompressionFactor
+        val size =
+          if (partitions.isDefined && partitions.get.size<=conf.maxPartNumForStatsCalculateViaFS) {
+            (compressionFactor * fileIndex.sizeInBytes).toLong
+          } else {
+            (compressionFactor * conf.defaultSizeInBytes).toLong
+          }
         OptionalLong.of(size)
       }

From 7d0cbd36e1118cf88fc6edfb11cf35487dc71559 Mon Sep 17 00:00:00 2001
From: fuwhu
Date: Tue, 14 Jan 2020 15:20:01 +0800
Subject: [PATCH 02/10] Add assert to make sure the rootPathsSpecified of InMemoryFileIndex is equal to the root paths of partitions specified by userSpecifiedPartitionSpec.

---
 .../spark/sql/execution/datasources/InMemoryFileIndex.scala | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
index cac2d6e626120..0eedcb106c7b5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -56,6 +56,10 @@ class InMemoryFileIndex(
   extends PartitioningAwareFileIndex(
     sparkSession, parameters, userSpecifiedSchema, fileStatusCache) {
 
+  assert(userSpecifiedPartitionSpec.isEmpty ||
+    userSpecifiedPartitionSpec.get.partitions.map(_.path).equals(rootPathsSpecified),
+    "")
+
   // Filter out streaming metadata dirs or files such as "/.../_spark_metadata" (the metadata dir)
   // or "/.../_spark_metadata/0" (a file in the metadata dir). `rootPathsSpecified` might contain
   // such streaming metadata dir or files, e.g. when after globbing "basePath/*" where "basePath"

From 0646d9a83e48935797eda90f87eea73dd414f58d Mon Sep 17 00:00:00 2001
From: fuwhu
Date: Wed, 15 Jan 2020 14:14:14 +0800
Subject: [PATCH 03/10] Refine FileScan.estimateStatistics to take partitionFilters and MAX_PARTITION_NUMBER_FOR_STATS_CALCULATION_VIA_FS into account.
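The refined estimation only trusts file-system listing when the number of
selected partitions is small enough. In outline (a condensed sketch of the
getSizeInBytes helper added to FileScan.scala below, not the literal hunk):

    private def getSizeInBytes(conf: SQLConf): Long = {
      if (fileIndex.partitionSpec().partitionColumns.isEmpty) {
        // Non-partitioned relation: keep the existing behaviour.
        fileIndex.sizeInBytes
      } else {
        // Prune partitions only when there are partition filters to apply.
        val pruned =
          if (partitionFilters.isEmpty) None
          else Some(fileIndex.listPartitionData(partitionFilters))
        val partitionNum =
          pruned.map(_.size).getOrElse(fileIndex.partitionSpec().partitions.size)
        if (partitionNum <= conf.maxPartNumForStatsCalculateViaFS) {
          // Few enough partitions: size them via the file system.
          pruned.map(fileIndex.sizeInBytesOfPartitions).getOrElse(fileIndex.sizeInBytes)
        } else {
          // Too many partitions: fall back to spark.sql.defaultSizeInBytes.
          conf.defaultSizeInBytes
        }
      }
    }

estimateStatistics() then multiplies the result by conf.fileCompressionFactor
before reporting it as sizeInBytes.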
--- .../sql/execution/DataSourceScanExec.scala | 4 +- .../execution/OptimizeMetadataOnlyQuery.scala | 2 +- .../datasources/CatalogFileIndex.scala | 5 +-- .../sql/execution/datasources/FileIndex.scala | 6 +-- .../datasources/InMemoryFileIndex.scala | 3 +- .../PartitioningAwareFileIndex.scala | 17 +++------ .../execution/datasources/v2/FileScan.scala | 37 ++++++++++++++----- .../spark/sql/hive/HiveMetastoreCatalog.scala | 2 +- 8 files changed, 43 insertions(+), 33 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 0d759085a7e2c..2fd3ed68bec4f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -207,8 +207,8 @@ case class FileSourceScanExec( val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L) val startTime = System.nanoTime() val ret = - relation.location.listFiles( - partitionFilters.filterNot(isDynamicPruningFilter), dataFilters) + relation.location.listPartitionData( + partitionFilters.filterNot(isDynamicPruningFilter)) if (relation.partitionSchemaOption.isDefined) { driverMetrics("numPartitions") = ret.length } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala index 45e5f415e8da1..ec50417706111 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala @@ -124,7 +124,7 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic relation match { case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, isStreaming) => val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l) - val partitionData = fsRelation.location.listFiles(normalizedFilters, Nil) + val partitionData = fsRelation.location.listPartitionData(normalizedFilters) LocalRelation(partAttrs, partitionData.map(_.values), isStreaming) case relation: HiveTableRelation => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala index 91313f33a78e0..b96ad4eeb11ab 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala @@ -54,9 +54,8 @@ class CatalogFileIndex( override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq - override def listFiles( - partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = { - filterPartitions(partitionFilters).listFiles(Nil, dataFilters) + override def listPartitionData(partitionFilters: Seq[Expression]): Seq[PartitionDirectory] = { + filterPartitions(partitionFilters).listPartitionData(Nil) } override def refresh(): Unit = fileStatusCache.invalidateAll() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala index 094a66a2820f3..d5608c6ef3cd8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala @@ -51,12 +51,8 @@ trait FileIndex { * files where these predicates are guaranteed to evaluate to `true`. * Thus, these filters will not need to be evaluated again on the * returned data. - * @param dataFilters Filters that can be applied on non-partitioned columns. The implementation - * does not need to guarantee these filters are applied, i.e. the execution - * engine will ensure these filters are still applied on the returned files. */ - def listFiles( - partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] + def listPartitionData(partitionFilters: Seq[Expression]): Seq[PartitionDirectory] /** * Returns the list of files that will be read when scanning this relation. This call may be diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala index 0eedcb106c7b5..7caa2a4ad04f6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala @@ -58,7 +58,8 @@ class InMemoryFileIndex( assert(userSpecifiedPartitionSpec.isEmpty || userSpecifiedPartitionSpec.get.partitions.map(_.path).equals(rootPathsSpecified), - "") + s"The rootPathsSpecified ($rootPathsSpecified) is inconsistent with the file paths " + + s"of userSpecifiedPartitionSpec (${userSpecifiedPartitionSpec.get.partitions.map(_.path)}).") // Filter out streaming metadata dirs or files such as "/.../_spark_metadata" (the metadata dir) // or "/.../_spark_metadata/0" (a file in the metadata dir). `rootPathsSpecified` might contain diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala index b34c3d262f6e0..40b7d3e72d9db 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.{expressions, InternalRow} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} -import org.apache.spark.sql.types.{StringType, StructType} +import org.apache.spark.sql.types.StructType /** * An abstract class that represents [[FileIndex]]s that are aware of partitioned tables. 
@@ -66,16 +66,7 @@ abstract class PartitioningAwareFileIndex( parameters.getOrElse("recursiveFileLookup", "false").toBoolean } - def listPartitions(partitionFilters: Seq[Expression]): Option[Seq[PartitionPath]] = { - if (partitionSpec().partitionColumns.nonEmpty) { - Some(prunePartitions(partitionFilters, partitionSpec())) - } else { - None - } - } - - override def listFiles( - partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = { + override def listPartitionData(partitionFilters: Seq[Expression]): Seq[PartitionDirectory] = { def isNonEmptyFile(f: FileStatus): Boolean = { isDataPath(f.getPath) && f.getLen > 0 } @@ -110,6 +101,10 @@ abstract class PartitioningAwareFileIndex( override def sizeInBytes: Long = allFiles().map(_.getLen).sum + def sizeInBytesOfPartitions(partitions: Seq[PartitionDirectory]): Long = { + partitions.flatMap(_.files).map(_.getLen).sum + } + def allFiles(): Seq[FileStatus] = { val files = if (partitionSpec().partitionColumns.isEmpty && !recursiveFileLookup) { // For each of the root input paths, get the list of files inside them diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala index a8111399eaaa2..c68fa6399c854 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjectio import org.apache.spark.sql.connector.read.{Batch, InputPartition, Scan, Statistics, SupportsReportStatistics} import org.apache.spark.sql.execution.PartitionedFileUtil import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.StructType import org.apache.spark.util.Utils @@ -111,7 +112,7 @@ trait FileScan extends Scan with Batch with SupportsReportStatistics with Loggin } protected def partitions: Seq[FilePartition] = { - val selectedPartitions = fileIndex.listFiles(partitionFilters, dataFilters) + val selectedPartitions = fileIndex.listPartitionData(partitionFilters) val maxSplitBytes = FilePartition.maxSplitBytes(sparkSession, selectedPartitions) val partitionAttributes = fileIndex.partitionSchema.toAttributes val attributeMap = partitionAttributes.map(a => normalizeName(a.name) -> a).toMap @@ -159,19 +160,37 @@ trait FileScan extends Scan with Batch with SupportsReportStatistics with Loggin partitions.toArray } + private def getSizeInBytes(conf: SQLConf): Long = { + if (fileIndex.partitionSpec().partitionColumns.isEmpty) { + fileIndex.sizeInBytes + } else { + val (partitionNum, partitionData) = + if (partitionFilters.isEmpty) { + (fileIndex.partitionSpec().partitions.size, None) + } else { + val partitions = fileIndex.listPartitionData(partitionFilters) + (partitions.size, Some(partitions)) + } + if (partitionNum <= conf.maxPartNumForStatsCalculateViaFS) { + if (partitionData.isDefined) { + fileIndex.sizeInBytesOfPartitions(partitionData.get) + } else { + fileIndex.sizeInBytes + } + } else { + conf.defaultSizeInBytes + } + } + } + override def estimateStatistics(): Statistics = { - val partitions = fileIndex.listPartitions(partitionFilters) + val conf = sparkSession.sessionState.conf new Statistics { override def sizeInBytes(): OptionalLong = { val compressionFactor = 
conf.fileCompressionFactor - val size = - if (partitions.isDefined && partitions.get.size<=conf.maxPartNumForStatsCalculateViaFS) { - (compressionFactor * fileIndex.sizeInBytes).toLong - } else { - (compressionFactor * conf.defaultSizeInBytes).toLong - } - OptionalLong.of(size) + val sizeInBytes = getSizeInBytes(conf) + OptionalLong.of((compressionFactor*sizeInBytes).toLong) } override def numRows(): OptionalLong = OptionalLong.empty() diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 2981e391c0439..69e04f377b13a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -297,7 +297,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log .inferSchema( sparkSession, options, - fileIndex.listFiles(Nil, Nil).flatMap(_.files)) + fileIndex.listPartitionData(Nil).flatMap(_.files)) .map(mergeWithMetastoreSchema(relation.tableMeta.dataSchema, _)) inferredSchema match { From c4f0c1995c961574b5576cd7f30747cd81d5b5cd Mon Sep 17 00:00:00 2001 From: fuwhu Date: Wed, 15 Jan 2020 14:56:50 +0800 Subject: [PATCH 04/10] Fix compile error. --- .../org/apache/spark/sql/execution/datasources/v2/FileScan.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala index c68fa6399c854..95596e8d25dc6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala @@ -184,7 +184,6 @@ trait FileScan extends Scan with Batch with SupportsReportStatistics with Loggin } override def estimateStatistics(): Statistics = { - val conf = sparkSession.sessionState.conf new Statistics { override def sizeInBytes(): OptionalLong = { From 7c2c3ca2e2b3cd3c09884b53c5498abebc3cc22c Mon Sep 17 00:00:00 2001 From: fuwhu Date: Thu, 30 Jan 2020 23:27:52 +0800 Subject: [PATCH 05/10] roll back on the change on method FileIndex.listFiles --- .../org/apache/spark/sql/execution/DataSourceScanExec.scala | 4 ++-- .../spark/sql/execution/OptimizeMetadataOnlyQuery.scala | 2 +- .../spark/sql/execution/datasources/CatalogFileIndex.scala | 5 +++-- .../apache/spark/sql/execution/datasources/FileIndex.scala | 6 +++++- .../execution/datasources/PartitioningAwareFileIndex.scala | 3 ++- .../spark/sql/execution/datasources/v2/FileScan.scala | 4 ++-- .../org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 2 +- 7 files changed, 16 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 2fd3ed68bec4f..0d759085a7e2c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -207,8 +207,8 @@ case class FileSourceScanExec( val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L) val startTime = System.nanoTime() val ret = - relation.location.listPartitionData( - partitionFilters.filterNot(isDynamicPruningFilter)) + relation.location.listFiles( + partitionFilters.filterNot(isDynamicPruningFilter), dataFilters) 
if (relation.partitionSchemaOption.isDefined) { driverMetrics("numPartitions") = ret.length } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala index ec50417706111..45e5f415e8da1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala @@ -124,7 +124,7 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic relation match { case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, isStreaming) => val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l) - val partitionData = fsRelation.location.listPartitionData(normalizedFilters) + val partitionData = fsRelation.location.listFiles(normalizedFilters, Nil) LocalRelation(partAttrs, partitionData.map(_.values), isStreaming) case relation: HiveTableRelation => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala index b96ad4eeb11ab..91313f33a78e0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala @@ -54,8 +54,9 @@ class CatalogFileIndex( override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq - override def listPartitionData(partitionFilters: Seq[Expression]): Seq[PartitionDirectory] = { - filterPartitions(partitionFilters).listPartitionData(Nil) + override def listFiles( + partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = { + filterPartitions(partitionFilters).listFiles(Nil, dataFilters) } override def refresh(): Unit = fileStatusCache.invalidateAll() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala index d5608c6ef3cd8..094a66a2820f3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala @@ -51,8 +51,12 @@ trait FileIndex { * files where these predicates are guaranteed to evaluate to `true`. * Thus, these filters will not need to be evaluated again on the * returned data. + * @param dataFilters Filters that can be applied on non-partitioned columns. The implementation + * does not need to guarantee these filters are applied, i.e. the execution + * engine will ensure these filters are still applied on the returned files. */ - def listPartitionData(partitionFilters: Seq[Expression]): Seq[PartitionDirectory] + def listFiles( + partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] /** * Returns the list of files that will be read when scanning this relation. 
This call may be
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index 40b7d3e72d9db..6546ec261868b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -66,7 +66,8 @@ abstract class PartitioningAwareFileIndex(
     parameters.getOrElse("recursiveFileLookup", "false").toBoolean
   }
 
-  override def listPartitionData(partitionFilters: Seq[Expression]): Seq[PartitionDirectory] = {
+  override def listFiles(
+      partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
     def isNonEmptyFile(f: FileStatus): Boolean = {
       isDataPath(f.getPath) && f.getLen > 0
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
index 95596e8d25dc6..95f252cf11654 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
@@ -112,7 +112,7 @@ trait FileScan extends Scan with Batch with SupportsReportStatistics with Loggin
   }
 
   protected def partitions: Seq[FilePartition] = {
-    val selectedPartitions = fileIndex.listPartitionData(partitionFilters)
+    val selectedPartitions = fileIndex.listFiles(partitionFilters, dataFilters)
     val maxSplitBytes = FilePartition.maxSplitBytes(sparkSession, selectedPartitions)
     val partitionAttributes = fileIndex.partitionSchema.toAttributes
     val attributeMap = partitionAttributes.map(a => normalizeName(a.name) -> a).toMap
@@ -168,7 +168,7 @@ trait FileScan extends Scan with Batch with SupportsReportStatistics with Loggin
         if (partitionFilters.isEmpty) {
           (fileIndex.partitionSpec().partitions.size, None)
         } else {
-          val partitions = fileIndex.listPartitionData(partitionFilters)
+          val partitions = fileIndex.listFiles(partitionFilters, dataFilters)
           (partitions.size, Some(partitions))
         }
       if (partitionNum <= conf.maxPartNumForStatsCalculateViaFS) {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 69e04f377b13a..2981e391c0439 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -297,7 +297,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
         .inferSchema(
           sparkSession,
           options,
-          fileIndex.listPartitionData(Nil).flatMap(_.files))
+          fileIndex.listFiles(Nil, Nil).flatMap(_.files))
         .map(mergeWithMetastoreSchema(relation.tableMeta.dataSchema, _))
 
       inferredSchema match {

From a443031897238aae50d53c7506180f21ec960dd3 Mon Sep 17 00:00:00 2001
From: fuwhu
Date: Fri, 31 Jan 2020 12:46:15 +0800
Subject: [PATCH 06/10] Remove check on partition number when estimating statistics in FileScan.
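With the partition-number guard removed, the estimation reduces to a two-way
choice. Roughly (condensed from the FileScan.scala hunk below, not the literal
code):

    private def getSizeInBytes(conf: SQLConf): Long = {
      if (fileIndex.partitionSpec().partitionColumns.isEmpty || partitionFilters.isEmpty) {
        // No partition columns, or nothing to prune: use the full index size.
        fileIndex.sizeInBytes
      } else {
        // Sum the sizes of only the files in the partitions that survive the filters.
        fileIndex.sizeInBytesOfPartitions(fileIndex.listFiles(partitionFilters, dataFilters))
      }
    }

This leaves estimateStatistics() independent of
spark.sql.statistics.statisticViaFileSystem.maxPartitionNumber, which the next
patch removes from SQLConf.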
--- .../execution/datasources/v2/FileScan.scala | 22 ++++--------------- 1 file changed, 4 insertions(+), 18 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala index 95f252cf11654..da252be55f89e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala @@ -161,25 +161,10 @@ trait FileScan extends Scan with Batch with SupportsReportStatistics with Loggin } private def getSizeInBytes(conf: SQLConf): Long = { - if (fileIndex.partitionSpec().partitionColumns.isEmpty) { + if (fileIndex.partitionSpec().partitionColumns.isEmpty || partitionFilters.isEmpty) { fileIndex.sizeInBytes } else { - val (partitionNum, partitionData) = - if (partitionFilters.isEmpty) { - (fileIndex.partitionSpec().partitions.size, None) - } else { - val partitions = fileIndex.listFiles(partitionFilters, dataFilters) - (partitions.size, Some(partitions)) - } - if (partitionNum <= conf.maxPartNumForStatsCalculateViaFS) { - if (partitionData.isDefined) { - fileIndex.sizeInBytesOfPartitions(partitionData.get) - } else { - fileIndex.sizeInBytes - } - } else { - conf.defaultSizeInBytes - } + fileIndex.sizeInBytesOfPartitions(fileIndex.listFiles(partitionFilters, dataFilters)) } } @@ -189,7 +174,8 @@ trait FileScan extends Scan with Batch with SupportsReportStatistics with Loggin override def sizeInBytes(): OptionalLong = { val compressionFactor = conf.fileCompressionFactor val sizeInBytes = getSizeInBytes(conf) - OptionalLong.of((compressionFactor*sizeInBytes).toLong) + val size = (compressionFactor * sizeInBytes).toLong + OptionalLong.of(size) } override def numRows(): OptionalLong = OptionalLong.empty() From 56f7efaee469e7ab8be990019bd881084e4c8985 Mon Sep 17 00:00:00 2001 From: fuwhu Date: Fri, 31 Jan 2020 12:50:01 +0800 Subject: [PATCH 07/10] remove conf MAX_PARTITION_NUMBER_FOR_STATS_CALCULATION_VIA_FS in SQLConf. --- .../org/apache/spark/sql/internal/SQLConf.scala | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 37d1760bb3ab2..64c613611c861 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1401,16 +1401,6 @@ object SQLConf { .booleanConf .createWithDefault(false) - val MAX_PARTITION_NUMBER_FOR_STATS_CALCULATION_VIA_FS = - buildConf("spark.sql.statistics.statisticViaFileSystem.maxPartitionNumber") - .doc("If the number of table (can be either hive table or data source table ) partitions " + - "exceed this value, statistic calculation via file system is not allowed. This is to " + - "avoid calculating size of large number of partitions via file system, eg. HDFS, which " + - "is very time consuming. 
Setting this value to negative will disable statistic " + - "calculation via file system.") - .intConf - .createWithDefault(1000) - val NDV_MAX_ERROR = buildConf("spark.sql.statistics.ndv.maxError") .internal() @@ -2603,9 +2593,6 @@ class SQLConf extends Serializable with Logging { def fallBackToHdfsForStatsEnabled: Boolean = getConf(ENABLE_FALL_BACK_TO_HDFS_FOR_STATS) - def maxPartNumForStatsCalculateViaFS: Int = - getConf(MAX_PARTITION_NUMBER_FOR_STATS_CALCULATION_VIA_FS) - def defaultSizeInBytes: Long = getConf(DEFAULT_SIZE_IN_BYTES) def ndvMaxError: Double = getConf(NDV_MAX_ERROR) From be7e6604b447a901432986ae3eba0f9b2416db23 Mon Sep 17 00:00:00 2001 From: fuwhu Date: Tue, 4 Feb 2020 10:22:05 +0800 Subject: [PATCH 08/10] refine code --- .../spark/sql/execution/datasources/v2/FileScan.scala | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala index da252be55f89e..22a37ca2e5ab2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala @@ -160,7 +160,7 @@ trait FileScan extends Scan with Batch with SupportsReportStatistics with Loggin partitions.toArray } - private def getSizeInBytes(conf: SQLConf): Long = { + private def getSizeInBytes(): Long = { if (fileIndex.partitionSpec().partitionColumns.isEmpty || partitionFilters.isEmpty) { fileIndex.sizeInBytes } else { @@ -169,11 +169,10 @@ trait FileScan extends Scan with Batch with SupportsReportStatistics with Loggin } override def estimateStatistics(): Statistics = { - val conf = sparkSession.sessionState.conf new Statistics { override def sizeInBytes(): OptionalLong = { - val compressionFactor = conf.fileCompressionFactor - val sizeInBytes = getSizeInBytes(conf) + val compressionFactor = sparkSession.sessionState.conf.fileCompressionFactor + val sizeInBytes = getSizeInBytes() val size = (compressionFactor * sizeInBytes).toLong OptionalLong.of(size) } From 94ea426c7739367f4ea54ad462d1b70dffd28ebf Mon Sep 17 00:00:00 2001 From: fuwhu Date: Tue, 4 Feb 2020 10:26:28 +0800 Subject: [PATCH 09/10] refine code --- .../org/apache/spark/sql/execution/datasources/v2/FileScan.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala index 22a37ca2e5ab2..96eedeac4da1b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala @@ -29,7 +29,6 @@ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjectio import org.apache.spark.sql.connector.read.{Batch, InputPartition, Scan, Statistics, SupportsReportStatistics} import org.apache.spark.sql.execution.PartitionedFileUtil import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.StructType import org.apache.spark.util.Utils From 2e210a092442bbc65ff92f97a2eb2c06aaf83376 Mon Sep 17 00:00:00 2001 From: fuwhu Date: Mon, 10 Feb 2020 16:55:37 +0800 Subject: [PATCH 10/10] Add test in FileBasedDataSourceSuite --- .../spark/sql/FileBasedDataSourceSuite.scala | 36 
+++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala index c870958128483..841e40ab63371 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -787,6 +787,42 @@ class FileBasedDataSourceSuite extends QueryTest } } + test("File source v2: involve partition filters in statistic estimation") { + withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "") { + allFileBasedDataSources.foreach { format => + withTempPath { dir => + Seq(("a", 1, 2), ("b", 1, 2), ("c", 2, 1)) + .toDF("value", "p1", "p2") + .write + .format(format) + .partitionBy("p1", "p2") + .option("header", true) + .save(dir.getCanonicalPath) + val df1 = spark + .read + .format(format) + .option("header", true) + .load(dir.getCanonicalPath) + .where("p1 = 1 and p2 = 2") + val df2 = spark + .read + .format(format) + .option("header", true) + .load(dir.getCanonicalPath) + .where("p1 = 2 and p2 = 1") + val fileScan1 = df1.queryExecution.executedPlan collectFirst { + case BatchScanExec(_, f: FileScan) => f + } + val fileScan2 = df2.queryExecution.executedPlan collectFirst { + case BatchScanExec(_, f: FileScan) => f + } + assert(fileScan1.get.estimateStatistics().sizeInBytes().getAsLong / 2 === + fileScan2.get.estimateStatistics().sizeInBytes().getAsLong) + } + } + } + } + test("File source v2: support passing data filters to FileScan without partitionFilters") { withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "") { allFileBasedDataSources.foreach { format =>