From 96469c814963ac0d1f31691c7b001c046e0457d3 Mon Sep 17 00:00:00 2001 From: fuwhu Date: Wed, 22 Jan 2020 12:45:28 +0800 Subject: [PATCH 01/10] rebase code. --- .../apache/spark/sql/internal/SQLConf.scala | 13 +++++ .../PruneFileSourcePartitions.scala | 28 +++++++-- .../execution/datasources/v2/FileScan.scala | 10 +++- .../PruneFileSourcePartitionsSuite.scala | 58 +++++++++++++++++++ 4 files changed, 103 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index e13d65bf81821..3741251004352 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1403,6 +1403,16 @@ object SQLConf { .booleanConf .createWithDefault(false) + val MAX_PARTITION_NUMBER_FOR_STATS_CALCULATION_VIA_FS = + buildConf("spark.sql.statistics.statisticViaFileSystem.maxPartitionNumber") + .doc("If the number of table (can be either hive table or data source table ) partitions " + + "exceed this value, statistic calculation via file system is not allowed. This is to " + + "avoid calculating size of large number of partitions via file system, eg. HDFS, which " + + "is very time consuming. Setting this value to negative will disable statistic " + + "calculation via file system.") + .intConf + .createWithDefault(1000) + val NDV_MAX_ERROR = buildConf("spark.sql.statistics.ndv.maxError") .internal() @@ -2564,6 +2574,9 @@ class SQLConf extends Serializable with Logging { def fallBackToHdfsForStatsEnabled: Boolean = getConf(ENABLE_FALL_BACK_TO_HDFS_FOR_STATS) + def maxPartNumForStatsCalculateViaFS: Int = + getConf(MAX_PARTITION_NUMBER_FOR_STATS_CALCULATION_VIA_FS) + def defaultSizeInBytes: Long = getConf(DEFAULT_SIZE_IN_BYTES) def ndvMaxError: Double = getConf(NDV_MAX_ERROR) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala index 1ea19c187e51a..0c1087189c932 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala @@ -18,12 +18,13 @@ package org.apache.spark.sql.execution.datasources import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.catalog.CatalogStatistics +import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.{Filter, LeafNode, LogicalPlan, Project} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanRelation, FileScan} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.StructType private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] { @@ -59,6 +60,22 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] { Project(projects, withFilter) } + private def getNewStatistics( + fileIndex: InMemoryFileIndex, + conf: SQLConf, + tableMeta: Option[CatalogTable]): CatalogStatistics = { + val partNum = fileIndex.partitionSpec().partitions.size + if (partNum <= conf.maxPartNumForStatsCalculateViaFS) { + CatalogStatistics(sizeInBytes = 
BigInt(fileIndex.sizeInBytes)) + } else { + if (tableMeta.isDefined && tableMeta.get.stats.isDefined) { + tableMeta.get.stats.get + } else { + CatalogStatistics(sizeInBytes = BigInt(conf.defaultSizeInBytes)) + } + } + } + override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown { case op @ PhysicalOperation(projects, filters, logicalRelation @ @@ -73,18 +90,19 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] { _, _, _)) - if filters.nonEmpty && fsRelation.partitionSchemaOption.isDefined => + if filters.nonEmpty && fsRelation.partitionSchemaOption.isDefined => val (partitionKeyFilters, _) = getPartitionKeyFiltersAndDataFilters( fsRelation.sparkSession, logicalRelation, partitionSchema, filters, logicalRelation.output) + val tableMeta = logicalRelation.catalogTable if (partitionKeyFilters.nonEmpty) { val prunedFileIndex = catalogFileIndex.filterPartitions(partitionKeyFilters.toSeq) val prunedFsRelation = fsRelation.copy(location = prunedFileIndex)(fsRelation.sparkSession) // Change table stats based on the sizeInBytes of pruned files - val withStats = logicalRelation.catalogTable.map(_.copy( - stats = Some(CatalogStatistics(sizeInBytes = BigInt(prunedFileIndex.sizeInBytes))))) + val newStats = getNewStatistics(prunedFileIndex, fsRelation.sparkSession.sessionState.conf, tableMeta) + val newTableMeta = tableMeta.map(_.copy(stats = Some(newStats))) val prunedLogicalRelation = logicalRelation.copy( - relation = prunedFsRelation, catalogTable = withStats) + relation = prunedFsRelation, catalogTable = newTableMeta) // Keep partition-pruning predicates so that they are visible in physical planning rebuildPhysicalOperation(projects, filters, prunedLogicalRelation) } else { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala index 6e05aa56f4f72..a162c9113998e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala @@ -162,7 +162,15 @@ trait FileScan extends Scan with Batch with SupportsReportStatistics with Loggin override def estimateStatistics(): Statistics = { new Statistics { override def sizeInBytes(): OptionalLong = { - val compressionFactor = sparkSession.sessionState.conf.fileCompressionFactor + val conf = sparkSession.sessionState.conf + val compressionFactor = conf.fileCompressionFactor + val partNum = fileIndex.partitionSpec().partitions.size + if (conf.maxPartNumForStatsCalculateViaFS > 0 + && partNum <= conf.maxPartNumForStatsCalculateViaFS) { + fileIndex.sizeInBytes + } else { + conf.defaultSizeInBytes + } val size = (compressionFactor * fileIndex.sizeInBytes).toLong OptionalLong.of(size) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala index c9c36992906a8..7b5d8eaba5ebc 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala @@ -21,6 +21,7 @@ import org.scalatest.Matchers._ import org.apache.spark.sql.QueryTest import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.CatalogStatistics import 
org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} @@ -74,6 +75,63 @@ class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with Te } } + test("SPARK-30427 statistics of pruned partitions can be controlled by " + + "spark.sql.statistics.fallBackToFs.maxPartitionNumber") { + withTable("test", "temp") { + withTempDir { dir => + sql( + s""" + |CREATE EXTERNAL TABLE test(i int) + |PARTITIONED BY (p int) + |STORED AS parquet + |LOCATION '${dir.toURI}'""".stripMargin) + + spark.range(0, 1000, 1).selectExpr("id as col") + .createOrReplaceTempView("temp") + + for (part <- Seq(1, 2, 3, 4)) { + sql(s""" + |INSERT OVERWRITE TABLE test PARTITION (p='$part') + |select col from temp""".stripMargin) + } + val singlePartitionSizeInBytes = 4425 + val catalogTable = spark.sharedState.externalCatalog.getTable("default", "test") + val tableMeta = catalogTable.copy(stats = + Some(CatalogStatistics(sizeInBytes = singlePartitionSizeInBytes*4))) + val catalogFileIndex = new CatalogFileIndex(spark, tableMeta, 0) + + val dataSchema = StructType(tableMeta.schema.filterNot { f => + tableMeta.partitionColumnNames.contains(f.name) + }) + val relation = HadoopFsRelation( + location = catalogFileIndex, + partitionSchema = tableMeta.partitionSchema, + dataSchema = dataSchema, + bucketSpec = None, + fileFormat = new ParquetFileFormat(), + options = Map.empty)(sparkSession = spark) + + val logicalRelation = LogicalRelation(relation, tableMeta) + val query = Project(Seq(Symbol("i"), Symbol("p")), + Filter(Symbol("p") <= 2, logicalRelation)).analyze + val prunedPartNum = 2 + Seq(-1, 1, 2, 3).foreach{ maxPartNum => + withSQLConf( + SQLConf.MAX_PARTITION_NUMBER_FOR_STATS_CALCULATION_VIA_FS.key -> s"$maxPartNum") { + val optimized = Optimize.execute(query) + val sizeInBytes = + if (maxPartNum>0 && prunedPartNum<=maxPartNum) { + singlePartitionSizeInBytes*2 + } else { + singlePartitionSizeInBytes*4 + } + assert(optimized.stats.sizeInBytes === sizeInBytes) + } + } + } + } + } + test("SPARK-20986 Reset table's statistics after PruneFileSourcePartitions rule") { withTable("tbl") { spark.range(10).selectExpr("id", "id % 3 as p").write.partitionBy("p").saveAsTable("tbl") From 228763121202c91be81169d7a2482d272d8ce403 Mon Sep 17 00:00:00 2001 From: fuwhu Date: Fri, 10 Jan 2020 15:31:10 +0800 Subject: [PATCH 02/10] Fix compile error. 
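
The getNewStatistics(...) call added in the previous patch sits on a single line of more
than 100 characters; presumably that is the build failure referred to in the subject, since
Spark's scalastyle check rejects lines over 100 characters. The only change here is to wrap
the call:

    val newStats =
      getNewStatistics(prunedFileIndex, fsRelation.sparkSession.sessionState.conf, tableMeta)
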
--- .../sql/execution/datasources/PruneFileSourcePartitions.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala index 0c1087189c932..d5489a6d03d9c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala @@ -99,7 +99,8 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] { val prunedFsRelation = fsRelation.copy(location = prunedFileIndex)(fsRelation.sparkSession) // Change table stats based on the sizeInBytes of pruned files - val newStats = getNewStatistics(prunedFileIndex, fsRelation.sparkSession.sessionState.conf, tableMeta) + val newStats = + getNewStatistics(prunedFileIndex, fsRelation.sparkSession.sessionState.conf, tableMeta) val newTableMeta = tableMeta.map(_.copy(stats = Some(newStats))) val prunedLogicalRelation = logicalRelation.copy( relation = prunedFsRelation, catalogTable = newTableMeta) From 2c2da949b0ea2f3514a19eadcbd35c22356dadeb Mon Sep 17 00:00:00 2001 From: fuwhu Date: Tue, 14 Jan 2020 22:43:15 +0800 Subject: [PATCH 03/10] update default value of MAX_PARTITION_NUMBER_FOR_STATS_CALCULATION_VIA_FS to 100. --- .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 3741251004352..4c9b9f0156eee 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1411,7 +1411,7 @@ object SQLConf { "is very time consuming. Setting this value to negative will disable statistic " + "calculation via file system.") .intConf - .createWithDefault(1000) + .createWithDefault(100) val NDV_MAX_ERROR = buildConf("spark.sql.statistics.ndv.maxError") From 6f315480e6c41e9f4268b0329d55228fea6b6dbc Mon Sep 17 00:00:00 2001 From: fuwhu Date: Thu, 16 Jan 2020 17:26:38 +0800 Subject: [PATCH 04/10] Refine code. --- .../datasources/PruneFileSourcePartitions.scala | 4 ++++ .../spark/sql/execution/datasources/v2/FileScan.scala | 10 +--------- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala index d5489a6d03d9c..f207d5806628b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala @@ -64,8 +64,12 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] { fileIndex: InMemoryFileIndex, conf: SQLConf, tableMeta: Option[CatalogTable]): CatalogStatistics = { +<<<<<<< HEAD val partNum = fileIndex.partitionSpec().partitions.size if (partNum <= conf.maxPartNumForStatsCalculateViaFS) { +======= + if (fileIndex.partitionSpec().partitions.size <= conf.maxPartNumForStatsCalculateViaFS) { +>>>>>>> Refine code. 
CatalogStatistics(sizeInBytes = BigInt(fileIndex.sizeInBytes)) } else { if (tableMeta.isDefined && tableMeta.get.stats.isDefined) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala index a162c9113998e..6e05aa56f4f72 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala @@ -162,15 +162,7 @@ trait FileScan extends Scan with Batch with SupportsReportStatistics with Loggin override def estimateStatistics(): Statistics = { new Statistics { override def sizeInBytes(): OptionalLong = { - val conf = sparkSession.sessionState.conf - val compressionFactor = conf.fileCompressionFactor - val partNum = fileIndex.partitionSpec().partitions.size - if (conf.maxPartNumForStatsCalculateViaFS > 0 - && partNum <= conf.maxPartNumForStatsCalculateViaFS) { - fileIndex.sizeInBytes - } else { - conf.defaultSizeInBytes - } + val compressionFactor = sparkSession.sessionState.conf.fileCompressionFactor val size = (compressionFactor * fileIndex.sizeInBytes).toLong OptionalLong.of(size) } From bccdc3e848199e050c9ac39523357bfb12010c7d Mon Sep 17 00:00:00 2001 From: fuwhu Date: Wed, 22 Jan 2020 12:49:49 +0800 Subject: [PATCH 05/10] refine code --- .../execution/datasources/PruneFileSourcePartitions.scala | 5 ----- 1 file changed, 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala index f207d5806628b..4003c083a59b2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala @@ -64,12 +64,7 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] { fileIndex: InMemoryFileIndex, conf: SQLConf, tableMeta: Option[CatalogTable]): CatalogStatistics = { -<<<<<<< HEAD - val partNum = fileIndex.partitionSpec().partitions.size - if (partNum <= conf.maxPartNumForStatsCalculateViaFS) { -======= if (fileIndex.partitionSpec().partitions.size <= conf.maxPartNumForStatsCalculateViaFS) { ->>>>>>> Refine code. CatalogStatistics(sizeInBytes = BigInt(fileIndex.sizeInBytes)) } else { if (tableMeta.isDefined && tableMeta.get.stats.isDefined) { From d62171d2e50895a9511d2a90efa5bf9ddcc59736 Mon Sep 17 00:00:00 2001 From: fuwhu Date: Wed, 22 Jan 2020 22:30:38 +0800 Subject: [PATCH 06/10] Check whether the partition number to calculate sizeInBytes is less than SQLConf.maxPartNumForStatsCalculateViaFS in rule PriveHiveTablePartitions. 
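
In PruneHiveTablePartitions the size of each pruned partition comes from the metastore
parameters rawDataSize/totalSize and may be missing. This patch lets the rule fall back to
the file system for the missing partitions, but only when the number of missing partitions
does not exceed spark.sql.statistics.statisticViaFileSystem.maxPartitionNumber. A condensed,
illustrative sketch of that decision (hypothetical helper; the real change below works on
CatalogTablePartition and uses CommandUtils.calculateLocationSize to list a partition's
location):

    // sizes: metastore-reported size per pruned partition, 0 if unknown
    // sizeFromFs(i): size of the i-th partition obtained by listing its location
    def estimatedSizeInBytes(
        sizes: Seq[Long],
        maxPartNumViaFs: Int,
        sizeFromFs: Int => Long): Option[BigInt] = {
      val missing = sizes.zipWithIndex.filter(_._1 <= 0)
      if (missing.isEmpty) {
        Some(BigInt(sizes.sum))                     // metastore stats suffice
      } else if (missing.size <= maxPartNumViaFs) {
        Some(BigInt(sizes.zipWithIndex.map {        // fill the gaps via the FS
          case (s, i) if s <= 0 => sizeFromFs(i)
          case (s, _) => s
        }.sum))
      } else {
        None                                        // too many: keep existing stats
      }
    }

Because the guard only applies when at least one size is missing, a negative
maxPartitionNumber disables the file-system fall back, and tables whose partitions all carry
metastore sizes are unaffected (the PruneHiveTablePartitionsSuite test added later in this
series asserts exactly that).
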
--- .../execution/PruneHiveTablePartitions.scala | 25 +++++++++++++------ 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala index a0349f627d107..b2c1f0736c332 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.hive.execution import org.apache.hadoop.hive.common.StatsSetupConst - import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.analysis.CastSupport import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, ExternalCatalogUtils, HiveTableRelation} @@ -26,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, Expression, import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.command.CommandUtils import org.apache.spark.sql.execution.datasources.DataSourceStrategy import org.apache.spark.sql.internal.SQLConf @@ -73,19 +73,30 @@ private[sql] class PruneHiveTablePartitions(session: SparkSession) private def updateTableMeta( tableMeta: CatalogTable, prunedPartitions: Seq[CatalogTablePartition]): CatalogTable = { - val sizeOfPartitions = prunedPartitions.map { partition => + val partitionsWithSize = prunedPartitions.map { partition => val rawDataSize = partition.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong) val totalSize = partition.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong) if (rawDataSize.isDefined && rawDataSize.get > 0) { - rawDataSize.get + (partition, rawDataSize.get) } else if (totalSize.isDefined && totalSize.get > 0L) { - totalSize.get + (partition, totalSize.get) } else { - 0L + (partition, 0L) } } - if (sizeOfPartitions.forall(_ > 0)) { - val sizeInBytes = sizeOfPartitions.sum + if (partitionsWithSize.forall(_._2 > 0)) { + val sizeInBytes = partitionsWithSize.map(_._2).sum + tableMeta.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes)))) + } else if (partitionsWithSize.count(_._2 == 0) <= conf.maxPartNumForStatsCalculateViaFS) { + val sizeInBytes = + partitionsWithSize.map(pair => { + if (pair._2 == 0) { + CommandUtils.calculateLocationSize( + session.sessionState, tableMeta.identifier, pair._1.storage.locationUri) + } else { + pair._2 + } + }).sum tableMeta.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes)))) } else { tableMeta From faffdf81dc8f55e8c402c37f096b259c7a44570d Mon Sep 17 00:00:00 2001 From: fuwhu Date: Wed, 22 Jan 2020 22:34:00 +0800 Subject: [PATCH 07/10] fix scala code style. 
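
Restores the blank line that the previous patch dropped between the org.apache.hadoop import
and the org.apache.spark import group; Spark's import-ordering convention keeps those groups
separated:

    import org.apache.hadoop.hive.common.StatsSetupConst

    import org.apache.spark.sql.SparkSession
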
--- .../spark/sql/hive/execution/PruneHiveTablePartitions.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala index b2c1f0736c332..2bf590dca7e60 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.hive.execution import org.apache.hadoop.hive.common.StatsSetupConst + import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.analysis.CastSupport import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, ExternalCatalogUtils, HiveTableRelation} From a06288c958de041ace1add08e3fc3a8e6e130949 Mon Sep 17 00:00:00 2001 From: fuwhu Date: Tue, 28 Jan 2020 09:22:58 +0800 Subject: [PATCH 08/10] refine tests. --- .../PruneFileSourcePartitionsSuite.scala | 85 +++++++++---------- .../PruneHiveTablePartitionsSuite.scala | 60 ++++++++++++- 2 files changed, 96 insertions(+), 49 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala index 7b5d8eaba5ebc..7b0115edf64b5 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala @@ -43,56 +43,50 @@ class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with Te test("PruneFileSourcePartitions should not change the output of LogicalRelation") { withTable("test") { - withTempDir { dir => - sql( - s""" - |CREATE EXTERNAL TABLE test(i int) - |PARTITIONED BY (p int) - |STORED AS parquet - |LOCATION '${dir.toURI}'""".stripMargin) - - val tableMeta = spark.sharedState.externalCatalog.getTable("default", "test") - val catalogFileIndex = new CatalogFileIndex(spark, tableMeta, 0) - - val dataSchema = StructType(tableMeta.schema.filterNot { f => - tableMeta.partitionColumnNames.contains(f.name) - }) - val relation = HadoopFsRelation( - location = catalogFileIndex, - partitionSchema = tableMeta.partitionSchema, - dataSchema = dataSchema, - bucketSpec = None, - fileFormat = new ParquetFileFormat(), - options = Map.empty)(sparkSession = spark) - - val logicalRelation = LogicalRelation(relation, tableMeta) - val query = Project(Seq(Symbol("i"), Symbol("p")), - Filter(Symbol("p") === 1, logicalRelation)).analyze - - val optimized = Optimize.execute(query) - assert(optimized.missingInput.isEmpty) - } + sql( + s""" + |CREATE TABLE test(i int) + |PARTITIONED BY (p int) + |STORED AS parquet""".stripMargin) + + val tableMeta = spark.sharedState.externalCatalog.getTable("default", "test") + val catalogFileIndex = new CatalogFileIndex(spark, tableMeta, 0) + val dataSchema = StructType(tableMeta.schema.filterNot { f => + tableMeta.partitionColumnNames.contains(f.name) + }) + val relation = HadoopFsRelation( + location = catalogFileIndex, + partitionSchema = tableMeta.partitionSchema, + dataSchema = dataSchema, + bucketSpec = None, + fileFormat = new ParquetFileFormat(), + options = Map.empty)(sparkSession = spark) + + val logicalRelation = LogicalRelation(relation, tableMeta) + val query = Project(Seq(Symbol("i"), 
Symbol("p")), + Filter(Symbol("p") === 1, logicalRelation)).analyze + + val optimized = Optimize.execute(query) + assert(optimized.missingInput.isEmpty) } } - test("SPARK-30427 statistics of pruned partitions can be controlled by " + + test("SPARK-30427 statistics of pruned partitions on file source table can be controlled by " + "spark.sql.statistics.fallBackToFs.maxPartitionNumber") { withTable("test", "temp") { - withTempDir { dir => - sql( - s""" - |CREATE EXTERNAL TABLE test(i int) - |PARTITIONED BY (p int) - |STORED AS parquet - |LOCATION '${dir.toURI}'""".stripMargin) + sql( + s""" + |CREATE TABLE test(i int) + |PARTITIONED BY (p int) + |STORED AS parquet""".stripMargin) spark.range(0, 1000, 1).selectExpr("id as col") .createOrReplaceTempView("temp") for (part <- Seq(1, 2, 3, 4)) { sql(s""" - |INSERT OVERWRITE TABLE test PARTITION (p='$part') - |select col from temp""".stripMargin) + |INSERT OVERWRITE TABLE test PARTITION (p='$part') + |select col from temp""".stripMargin) } val singlePartitionSizeInBytes = 4425 val catalogTable = spark.sharedState.externalCatalog.getTable("default", "test") @@ -119,16 +113,13 @@ class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with Te withSQLConf( SQLConf.MAX_PARTITION_NUMBER_FOR_STATS_CALCULATION_VIA_FS.key -> s"$maxPartNum") { val optimized = Optimize.execute(query) - val sizeInBytes = - if (maxPartNum>0 && prunedPartNum<=maxPartNum) { - singlePartitionSizeInBytes*2 - } else { - singlePartitionSizeInBytes*4 - } - assert(optimized.stats.sizeInBytes === sizeInBytes) + if (prunedPartNum <= maxPartNum) { + assert(optimized.stats.sizeInBytes / 2 === singlePartitionSizeInBytes) + } else { + assert(optimized.stats.sizeInBytes / 4 === singlePartitionSizeInBytes) + } } } - } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala index e41709841a736..7b3a1e18682fd 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala @@ -18,10 +18,15 @@ package org.apache.spark.sql.hive.execution import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, HiveTableRelation} +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SQLTestUtils class PruneHiveTablePartitionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { @@ -32,7 +37,7 @@ class PruneHiveTablePartitionsSuite extends QueryTest with SQLTestUtils with Tes EliminateSubqueryAliases, new PruneHiveTablePartitions(spark)) :: Nil } - test("SPARK-15616 statistics pruned after going throuhg PruneHiveTablePartitions") { + test("SPARK-15616 statistics pruned after going through PruneHiveTablePartitions") { withTable("test", "temp") { sql( s""" @@ -54,4 +59,55 @@ class PruneHiveTablePartitionsSuite extends QueryTest with 
SQLTestUtils with Tes Optimize.execute(analyzed2).stats.sizeInBytes) } } + + test("SPARK-30427 spark.sql.statistics.fallBackToFs.maxPartitionNumber can not control " + + "the action of rule PruneHiveTablePartitions when sizeInBytes of all partitions are " + + "available in meta data.") { + withTable("test", "temp") { + sql( + s""" + |CREATE TABLE test(i int) + |PARTITIONED BY (p int) + |STORED AS textfile""".stripMargin) + + spark.range(0, 1000, 1).selectExpr("id as col") + .createOrReplaceTempView("temp") + for (part <- Seq(1, 2, 3, 4)) { + sql(s""" + |INSERT OVERWRITE TABLE test PARTITION (p='$part') + |select col from temp""".stripMargin) + } + + val tableMeta = spark.sharedState.externalCatalog.getTable("default", "test") + val relation = + HiveTableRelation(tableMeta, + tableMeta.dataSchema.asNullable.toAttributes, + tableMeta.partitionSchema.asNullable.toAttributes) + val query = Project(Seq(Symbol("i"), Symbol("p")), + Filter(Symbol("p") === 1, relation)).analyze + var plan1: LogicalPlan = null + var plan2: LogicalPlan = null + var plan3: LogicalPlan = null + var plan4: LogicalPlan = null + withSQLConf( + SQLConf.MAX_PARTITION_NUMBER_FOR_STATS_CALCULATION_VIA_FS.key -> s"-1") { + plan1 = Optimize.execute(query) + } + withSQLConf( + SQLConf.MAX_PARTITION_NUMBER_FOR_STATS_CALCULATION_VIA_FS.key -> s"1") { + plan2 = Optimize.execute(query) + } + withSQLConf( + SQLConf.MAX_PARTITION_NUMBER_FOR_STATS_CALCULATION_VIA_FS.key -> s"2") { + plan3 = Optimize.execute(query) + } + withSQLConf( + SQLConf.MAX_PARTITION_NUMBER_FOR_STATS_CALCULATION_VIA_FS.key -> s"3") { + plan4 = Optimize.execute(query) + } + assert(plan1.stats.sizeInBytes === plan2.stats.sizeInBytes) + assert(plan2.stats.sizeInBytes === plan3.stats.sizeInBytes) + assert(plan3.stats.sizeInBytes === plan4.stats.sizeInBytes) + } + } } From 3c27224777362f53baf20d79fd38ab9500621977 Mon Sep 17 00:00:00 2001 From: fuwhu Date: Fri, 31 Jan 2020 12:11:39 +0800 Subject: [PATCH 09/10] roll back the check on partition number when calculating statistics in PruneFileSourcePartitions. 
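
Presumably the guard is not needed on the file-source path: CatalogFileIndex.filterPartitions
already returns an in-memory file index whose files for the selected partitions have been
listed, so prunedFileIndex.sizeInBytes sums cached file statuses rather than issuing fresh
file-system calls. The stats update therefore goes back to its original form, and the
partition-number guard now only affects PruneHiveTablePartitions:

    // restored in the hunk below
    val withStats = logicalRelation.catalogTable.map(_.copy(
      stats = Some(CatalogStatistics(sizeInBytes = BigInt(prunedFileIndex.sizeInBytes)))))
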
--- .../PruneFileSourcePartitions.scala | 30 ++++--------------- 1 file changed, 6 insertions(+), 24 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala index 4003c083a59b2..59c55c161bc89 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala @@ -18,13 +18,12 @@ package org.apache.spark.sql.execution.datasources import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable} +import org.apache.spark.sql.catalyst.catalog.CatalogStatistics import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.{Filter, LeafNode, LogicalPlan, Project} import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanRelation, FileScan} -import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanRelation, FileScan, FileTable} import org.apache.spark.sql.types.StructType private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] { @@ -60,21 +59,6 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] { Project(projects, withFilter) } - private def getNewStatistics( - fileIndex: InMemoryFileIndex, - conf: SQLConf, - tableMeta: Option[CatalogTable]): CatalogStatistics = { - if (fileIndex.partitionSpec().partitions.size <= conf.maxPartNumForStatsCalculateViaFS) { - CatalogStatistics(sizeInBytes = BigInt(fileIndex.sizeInBytes)) - } else { - if (tableMeta.isDefined && tableMeta.get.stats.isDefined) { - tableMeta.get.stats.get - } else { - CatalogStatistics(sizeInBytes = BigInt(conf.defaultSizeInBytes)) - } - } - } - override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown { case op @ PhysicalOperation(projects, filters, logicalRelation @ @@ -89,20 +73,18 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] { _, _, _)) - if filters.nonEmpty && fsRelation.partitionSchemaOption.isDefined => + if filters.nonEmpty && fsRelation.partitionSchemaOption.isDefined => val (partitionKeyFilters, _) = getPartitionKeyFiltersAndDataFilters( fsRelation.sparkSession, logicalRelation, partitionSchema, filters, logicalRelation.output) - val tableMeta = logicalRelation.catalogTable if (partitionKeyFilters.nonEmpty) { val prunedFileIndex = catalogFileIndex.filterPartitions(partitionKeyFilters.toSeq) val prunedFsRelation = fsRelation.copy(location = prunedFileIndex)(fsRelation.sparkSession) // Change table stats based on the sizeInBytes of pruned files - val newStats = - getNewStatistics(prunedFileIndex, fsRelation.sparkSession.sessionState.conf, tableMeta) - val newTableMeta = tableMeta.map(_.copy(stats = Some(newStats))) + val withStats = logicalRelation.catalogTable.map(_.copy( + stats = Some(CatalogStatistics(sizeInBytes = BigInt(prunedFileIndex.sizeInBytes))))) val prunedLogicalRelation = logicalRelation.copy( - relation = prunedFsRelation, catalogTable = newTableMeta) + relation = prunedFsRelation, catalogTable = withStats) // Keep partition-pruning predicates so that they are visible in physical planning rebuildPhysicalOperation(projects, filters, 
prunedLogicalRelation) } else { From 8b14ce4435dfa6fb5114385bb122ec9a86ca7213 Mon Sep 17 00:00:00 2001 From: fuwhu Date: Fri, 31 Jan 2020 23:56:39 +0800 Subject: [PATCH 10/10] fix test failure --- .../PruneFileSourcePartitionsSuite.scala | 53 ------------------- 1 file changed, 53 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala index 7b0115edf64b5..b5b3a08f251f9 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala @@ -21,7 +21,6 @@ import org.scalatest.Matchers._ import org.apache.spark.sql.QueryTest import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.catalog.CatalogStatistics import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} @@ -71,58 +70,6 @@ class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with Te } } - test("SPARK-30427 statistics of pruned partitions on file source table can be controlled by " + - "spark.sql.statistics.fallBackToFs.maxPartitionNumber") { - withTable("test", "temp") { - sql( - s""" - |CREATE TABLE test(i int) - |PARTITIONED BY (p int) - |STORED AS parquet""".stripMargin) - - spark.range(0, 1000, 1).selectExpr("id as col") - .createOrReplaceTempView("temp") - - for (part <- Seq(1, 2, 3, 4)) { - sql(s""" - |INSERT OVERWRITE TABLE test PARTITION (p='$part') - |select col from temp""".stripMargin) - } - val singlePartitionSizeInBytes = 4425 - val catalogTable = spark.sharedState.externalCatalog.getTable("default", "test") - val tableMeta = catalogTable.copy(stats = - Some(CatalogStatistics(sizeInBytes = singlePartitionSizeInBytes*4))) - val catalogFileIndex = new CatalogFileIndex(spark, tableMeta, 0) - - val dataSchema = StructType(tableMeta.schema.filterNot { f => - tableMeta.partitionColumnNames.contains(f.name) - }) - val relation = HadoopFsRelation( - location = catalogFileIndex, - partitionSchema = tableMeta.partitionSchema, - dataSchema = dataSchema, - bucketSpec = None, - fileFormat = new ParquetFileFormat(), - options = Map.empty)(sparkSession = spark) - - val logicalRelation = LogicalRelation(relation, tableMeta) - val query = Project(Seq(Symbol("i"), Symbol("p")), - Filter(Symbol("p") <= 2, logicalRelation)).analyze - val prunedPartNum = 2 - Seq(-1, 1, 2, 3).foreach{ maxPartNum => - withSQLConf( - SQLConf.MAX_PARTITION_NUMBER_FOR_STATS_CALCULATION_VIA_FS.key -> s"$maxPartNum") { - val optimized = Optimize.execute(query) - if (prunedPartNum <= maxPartNum) { - assert(optimized.stats.sizeInBytes / 2 === singlePartitionSizeInBytes) - } else { - assert(optimized.stats.sizeInBytes / 4 === singlePartitionSizeInBytes) - } - } - } - } - } - test("SPARK-20986 Reset table's statistics after PruneFileSourcePartitions rule") { withTable("tbl") { spark.range(10).selectExpr("id", "id % 3 as p").write.partitionBy("p").saveAsTable("tbl")
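
A minimal usage sketch of the configuration this series introduces (key from PATCH 01,
default 100 after PATCH 03; spark below is an active SparkSession):

    // Let PruneHiveTablePartitions list at most 100 partition locations when their
    // sizes are missing from the metastore; a negative value disables the
    // file-system fall back entirely.
    spark.conf.set("spark.sql.statistics.statisticViaFileSystem.maxPartitionNumber", "100")
    // Equivalent SQL form:
    spark.sql("SET spark.sql.statistics.statisticViaFileSystem.maxPartitionNumber=100")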