From f2f31509beb5f4221a4abf3ca3c26a30966fbcbd Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Thu, 11 Aug 2016 13:55:03 -0700
Subject: [PATCH 01/10] repair table in batch

---
 .../apache/spark/sql/execution/command/ddl.scala   | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 8fa7615b97b18..6e17d0ca7d890 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -478,12 +478,16 @@ case class AlterTableRecoverPartitionsCommand(
     val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
     val partitionSpecsAndLocs = scanPartitions(
       spark, fs, pathFilter, root, Map(), table.partitionColumnNames.map(_.toLowerCase))
-    val parts = partitionSpecsAndLocs.map { case (spec, location) =>
-      // inherit table storage format (possibly except for location)
-      CatalogTablePartition(spec, table.storage.copy(locationUri = Some(location.toUri.toString)))
+    // Hive metastore may not have enough memory to handle millions of partitions in single RPC,
+    // we should split them into smaller batches.
+    partitionSpecsAndLocs.iterator.grouped(1024).foreach { batch =>
+      val parts = batch.map { case (spec, location) =>
+        // inherit table storage format (possibly except for location)
+        CatalogTablePartition(spec, table.storage.copy(locationUri = Some(location.toUri.toString)))
+      }
+      spark.sessionState.catalog.createPartitions(tableName,
+        parts.toArray[CatalogTablePartition], ignoreIfExists = true)
     }
-    spark.sessionState.catalog.createPartitions(tableName,
-      parts.toArray[CatalogTablePartition], ignoreIfExists = true)
     Seq.empty[Row]
   }
 

From ec2d8dad9554a07d712f28f206598c9524e3ee50 Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Thu, 11 Aug 2016 14:08:13 -0700
Subject: [PATCH 02/10] add logging

---
 .../org/apache/spark/sql/execution/command/ddl.scala | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 6e17d0ca7d890..5f2875ce3a45a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -471,6 +471,7 @@ case class AlterTableRecoverPartitionsCommand(
     }
 
     val root = new Path(table.storage.locationUri.get)
+    logInfo(s"Recover all the partitions in $root")
     val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
     // Dummy jobconf to get to the pathFilter defined in configuration
     // It's very expensive to create a JobConf(ClassUtil.findContainingJar() is slow)
@@ -478,16 +479,21 @@ case class AlterTableRecoverPartitionsCommand(
     val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
     val partitionSpecsAndLocs = scanPartitions(
       spark, fs, pathFilter, root, Map(), table.partitionColumnNames.map(_.toLowerCase))
+    val total = partitionSpecsAndLocs.length
+    logInfo(s"Found $total partitions in $root")
+    var done = 0L
     // Hive metastore may not have enough memory to handle millions of partitions in single RPC,
     // we should split them into smaller batches.
     partitionSpecsAndLocs.iterator.grouped(1024).foreach { batch =>
       val parts = batch.map { case (spec, location) =>
         // inherit table storage format (possibly except for location)
         CatalogTablePartition(spec, table.storage.copy(locationUri = Some(location.toUri.toString)))
-      }
-      spark.sessionState.catalog.createPartitions(tableName,
-        parts.toArray[CatalogTablePartition], ignoreIfExists = true)
+      }.toArray[CatalogTablePartition]
+      spark.sessionState.catalog.createPartitions(tableName, parts, ignoreIfExists = true)
+      done += parts.length
+      logDebug(s"Recovered ${parts.length} partitions ($done/$total so far)")
     }
+    logInfo(s"Recovered all partitions ($total).")
     Seq.empty[Row]
   }
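Patches 01 and 02 replace a single unbounded createPartitions call with fixed-size batches plus progress logging. The following is a minimal standalone sketch of that batching pattern, not part of the patch itself; createPartitionsRpc is a hypothetical stand-in for the metastore call:

    // Sketch only: createPartitionsRpc is a made-up placeholder, not a Spark API.
    object BatchingSketch {
      def createPartitionsRpc(batch: Seq[String]): Unit =
        println(s"one RPC carrying ${batch.size} partitions")

      def main(args: Array[String]): Unit = {
        val partitions = (1 to 3000).map(i => s"p=$i")
        // grouped(1024) turns one huge call, which the metastore may not have
        // enough memory to handle, into ceil(3000 / 1024) = 3 bounded RPCs.
        partitions.iterator.grouped(1024).foreach(createPartitionsRpc)
      }
    }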
From c442b758e8bf0fc1affd1daa08381d458c7a71a4 Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Thu, 11 Aug 2016 16:15:31 -0700
Subject: [PATCH 03/10] support managed table

---
 .../scala/org/apache/spark/sql/execution/command/ddl.scala | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 5f2875ce3a45a..9e8d754dd13a7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -457,10 +457,6 @@ case class AlterTableRecoverPartitionsCommand(
       throw new AnalysisException(
         s"Operation not allowed: $cmd on datasource tables: $tableName")
     }
-    if (table.tableType != CatalogTableType.EXTERNAL) {
-      throw new AnalysisException(
-        s"Operation not allowed: $cmd only works on external tables: $tableName")
-    }
     if (!DDLUtils.isTablePartitioned(table)) {
       throw new AnalysisException(
         s"Operation not allowed: $cmd only works on partitioned tables: $tableName")

From b3797c960348dffa88a578ff2380e1a5d5ad2c85 Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Fri, 12 Aug 2016 16:29:01 -0700
Subject: [PATCH 04/10] speed up addPartition by gathering the fast stats in parallel

---
 .../sql/catalyst/catalog/interface.scala           |  4 +-
 .../spark/sql/execution/command/ddl.scala          | 94 +++++++++++++++----
 .../sql/execution/command/DDLSuite.scala           | 11 +++
 .../sql/hive/client/HiveClientImpl.scala           |  4 +-
 .../spark/sql/hive/client/HiveShim.scala           |  6 +-
 5 files changed, 98 insertions(+), 21 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index f7762e0f8acd3..50e21848d1a38 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -81,10 +81,12 @@ object CatalogStorageFormat {
  *
  * @param spec partition spec values indexed by column name
  * @param storage storage format of the partition
+ * @param parameters some parameters for the partition, for example, stats.
  */
 case class CatalogTablePartition(
     spec: CatalogTypes.TablePartitionSpec,
-    storage: CatalogStorageFormat)
+    storage: CatalogStorageFormat,
+    parameters: Map[String, String] = Map.empty)
 
 
 /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 9e8d754dd13a7..e138ab2214110 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -17,22 +17,25 @@
 
 package org.apache.spark.sql.execution.command
 
-import scala.collection.GenSeq
+import scala.collection.{GenMap, GenSeq}
 import scala.collection.parallel.ForkJoinTaskSupport
 import scala.concurrent.forkjoin.ForkJoinPool
 import scala.util.control.NonFatal
 
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
 import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, CatalogTable, CatalogTablePartition, CatalogTableType, SessionCatalog}
+import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes._
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
+import org.apache.spark.sql.execution.datasources.HadoopFsRelation.{FakeBlockLocation, FakeFileStatus}
 import org.apache.spark.sql.execution.datasources.PartitioningUtils
 import org.apache.spark.sql.types._
+import org.apache.spark.util.SerializableConfiguration
 
 // Note: The definition of these commands are based on the ones described in
 // https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
@@ -443,6 +446,30 @@ case class AlterTableDropPartitionCommand(
 case class AlterTableRecoverPartitionsCommand(
     tableName: TableIdentifier,
     cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends RunnableCommand {
+
+  // These are two fast stats in Hive Metastore
+  // see https://github.com/apache/hive/blob/master/
+  //   common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java#L88
+  val NUM_FILES = "numFiles"
+  val TOTAL_SIZE = "totalSize"
+
+  private def getPathFilter(hadoopConf: Configuration): PathFilter = {
+    // Dummy jobconf to get to the pathFilter defined in configuration
+    // It's very expensive to create a JobConf(ClassUtil.findContainingJar() is slow)
+    val jobConf = new JobConf(hadoopConf, this.getClass)
+    val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
+    new PathFilter {
+      override def accept(path: Path): Boolean = {
+        val name = path.getName
+        if (name != "_SUCCESS" && name != "_temporary" && !name.startsWith(".")) {
+          pathFilter == null || pathFilter.accept(path)
+        } else {
+          false
+        }
+      }
+    }
+  }
+
   override def run(spark: SparkSession): Seq[Row] = {
     val catalog = spark.sessionState.catalog
     if (!catalog.tableExists(tableName)) {
@@ -469,21 +496,55 @@ case class AlterTableRecoverPartitionsCommand(
     val root = new Path(table.storage.locationUri.get)
     logInfo(s"Recover all the partitions in $root")
     val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
-    // Dummy jobconf to get to the pathFilter defined in configuration
-    // It's very expensive to create a JobConf(ClassUtil.findContainingJar() is slow)
-    val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration, this.getClass)
-    val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
+
+    val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", "10").toInt
+    val hadoopConf = spark.sparkContext.hadoopConfiguration
+    val pathFilter = getPathFilter(hadoopConf)
     val partitionSpecsAndLocs = scanPartitions(
-      spark, fs, pathFilter, root, Map(), table.partitionColumnNames.map(_.toLowerCase))
+      spark, fs, pathFilter, root, Map(), table.partitionColumnNames.map(_.toLowerCase), threshold)
     val total = partitionSpecsAndLocs.length
     logInfo(s"Found $total partitions in $root")
+
+    val partitionStats: GenMap[String, (Int, Long)] = if (total > threshold) {
+      val serializableConfiguration = new SerializableConfiguration(hadoopConf)
+      val serializedPaths = partitionSpecsAndLocs.map(_._2.toString).toArray
+
+      // Set the degree of parallelism to prevent the following file listing from generating too many
+      // tasks in case of a large #defaultParallelism.
+      val numParallelism = Math.min(serializedPaths.size, 10000)
+      // gather the fast stats for all the partitions, otherwise the Hive metastore will list all
+      // the files for all the new partitions sequentially, which is super slow.
+      logInfo(s"Gathering the fast stats in parallel using $numParallelism tasks.")
+      spark.sparkContext.parallelize(serializedPaths, numParallelism)
+        .mapPartitions { paths =>
+          val pathFilter = getPathFilter(serializableConfiguration.value)
+          paths.map(new Path(_)).map { path =>
+            val fs = path.getFileSystem(serializableConfiguration.value)
+            val statuses = fs.listStatus(path, pathFilter)
+            (path.toString, (statuses.length, statuses.map(_.getLen).sum))
+          }
+        }.collectAsMap()
+    } else {
+      partitionSpecsAndLocs.map { case (_, location) =>
+        val statuses = fs.listStatus(location, pathFilter)
+        (location.toString, (statuses.length, statuses.map(_.getLen).sum))
+      }.toMap
+    }
+    logInfo(s"Finished gathering the fast stats for all $total partitions.")
+
     var done = 0L
     // Hive metastore may not have enough memory to handle millions of partitions in single RPC,
     // we should split them into smaller batches.
     partitionSpecsAndLocs.iterator.grouped(1024).foreach { batch =>
       val parts = batch.map { case (spec, location) =>
         // inherit table storage format (possibly except for location)
-        CatalogTablePartition(spec, table.storage.copy(locationUri = Some(location.toUri.toString)))
+        val (numFiles, totalSize) = partitionStats(location.toString)
+        // These two fast stats prevent the Hive metastore from listing the files again.
+        val params = Map(NUM_FILES -> numFiles.toString, TOTAL_SIZE -> totalSize.toString)
+        CatalogTablePartition(
+          spec,
+          table.storage.copy(locationUri = Some(location.toUri.toString)),
+          params)
       }.toArray[CatalogTablePartition]
       spark.sessionState.catalog.createPartitions(tableName, parts, ignoreIfExists = true)
       done += parts.length
@@ -501,15 +562,16 @@ case class AlterTableRecoverPartitionsCommand(
       filter: PathFilter,
       path: Path,
       spec: TablePartitionSpec,
-      partitionNames: Seq[String]): GenSeq[(TablePartitionSpec, Path)] = {
+      partitionNames: Seq[String],
+      threshold: Int): GenSeq[(TablePartitionSpec, Path)] = {
     if (partitionNames.length == 0) {
       return Seq(spec -> path)
     }
 
-    val statuses = fs.listStatus(path)
-    val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", "10").toInt
+    val statuses = fs.listStatus(path, filter)
     val statusPar: GenSeq[FileStatus] =
       if (partitionNames.length > 1 && statuses.length > threshold || partitionNames.length > 2) {
+        // parallelize the list of partitions here, then we can have better parallelism later.
        val parArray = statuses.par
         parArray.tasksupport = evalTaskSupport
         parArray
@@ -525,16 +587,14 @@ case class AlterTableRecoverPartitionsCommand(
         // TODO: Validate the value
         val value = PartitioningUtils.unescapePathName(ps(1))
         // comparing with case-insensitive, but preserve the case
         if (columnName == partitionNames(0)) {
-          scanPartitions(
-            spark, fs, filter, st.getPath, spec ++ Map(columnName -> value), partitionNames.drop(1))
+          scanPartitions(spark, fs, filter, st.getPath, spec ++ Map(columnName -> value),
+            partitionNames.drop(1), threshold)
         } else {
           logWarning(s"expect partition column ${partitionNames(0)}, but got ${ps(0)}, ignore it")
           Seq()
         }
       } else {
-        if (name != "_SUCCESS" && name != "_temporary" && !name.startsWith(".")) {
-          logWarning(s"ignore ${new Path(path, name)}")
-        }
+        logWarning(s"ignore ${new Path(path, name)}")
         Seq()
       }
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index ce1f7c5082ca1..a8d6a384ae0a0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -900,7 +900,14 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
     // valid
     fs.mkdirs(new Path(new Path(root, "a=1"), "b=5"))
+    fs.createNewFile(new Path(new Path(root, "a=1/b=5"), "a.csv"))  // file
+    fs.createNewFile(new Path(new Path(root, "a=1/b=5"), "_SUCCESS"))  // file
     fs.mkdirs(new Path(new Path(root, "A=2"), "B=6"))
+    fs.createNewFile(new Path(new Path(root, "A=2/B=6"), "b.csv"))  // file
+    fs.createNewFile(new Path(new Path(root, "A=2/B=6"), "c.csv"))  // file
+    fs.createNewFile(new Path(new Path(root, "A=2/B=6"), ".hiddenFile"))  // file
+    fs.mkdirs(new Path(new Path(root, "A=2/B=6"), "_temporary"))
+
     // invalid
     fs.mkdirs(new Path(new Path(root, "a"), "b"))  // bad name
     fs.mkdirs(new Path(new Path(root, "b=1"), "a=1"))  // wrong order
@@ -914,6 +921,10 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       sql("ALTER TABLE tab1 RECOVER PARTITIONS")
       assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
         Set(part1, part2))
+      assert(catalog.getPartition(tableIdent, part1).parameters ==
+        Map("numFiles" -> "1", "totalSize" -> "0"))
+      assert(catalog.getPartition(tableIdent, part2).parameters ==
+        Map("numFiles" -> "2", "totalSize" -> "0"))
     } finally {
       fs.delete(root, true)
     }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index f8204e183f03a..5a78a33624c71 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -829,6 +829,7 @@ private[hive] class HiveClientImpl(
     tpart.setTableName(ht.getTableName)
     tpart.setValues(partValues.asJava)
     tpart.setSd(storageDesc)
+    tpart.setParameters(p.parameters.asJava)
     new HivePartition(ht, tpart)
   }
 
@@ -843,6 +844,7 @@ private[hive] class HiveClientImpl(
       serde = Option(apiPartition.getSd.getSerdeInfo.getSerializationLib),
       compressed = apiPartition.getSd.isCompressed,
       properties = Option(apiPartition.getSd.getSerdeInfo.getParameters)
-        .map(_.asScala.toMap).orNull))
+        .map(_.asScala.toMap).orNull),
+      parameters = hp.getParameters().asScala.toMap)
   }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index 41527fcd05154..2ef0e17b60823 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -267,6 +267,7 @@ private[client] class Shim_v0_12 extends Shim with Logging {
     val table = hive.getTable(database, tableName)
     parts.foreach { s =>
       val location = s.storage.locationUri.map(new Path(table.getPath, _)).orNull
+      val params = s.parameters.asJava
       val spec = s.spec.asJava
       if (hive.getPartition(table, spec, false) != null && ignoreIfExists) {
         // Ignore this partition since it already exists and ignoreIfExists == true
@@ -280,7 +281,7 @@ private[client] class Shim_v0_12 extends Shim with Logging {
         table,
         spec,
         location,
-        null, // partParams
+        params, // partParams
         null, // inputFormat
         null, // outputFormat
         -1: JInteger, // numBuckets
@@ -459,8 +460,9 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
       parts: Seq[CatalogTablePartition],
       ignoreIfExists: Boolean): Unit = {
     val addPartitionDesc = new AddPartitionDesc(db, table, ignoreIfExists)
-    parts.foreach { s =>
+    parts.zipWithIndex.foreach { case (s, i) =>
       addPartitionDesc.addPartition(s.spec.asJava, s.storage.locationUri.orNull)
+      addPartitionDesc.getPartition(i).setPartParams(s.parameters.asJava)
     }
     hive.createPartitions(addPartitionDesc)
   }
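Patch 04's key idea is that numFiles and totalSize can be computed with a single listStatus call per partition directory, so the metastore does not have to list the files itself when the partition is added. A hedged sketch of that per-directory computation, using only standard Hadoop FileSystem APIs (the local file system and the /tmp path are placeholders):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}

    object FastStatsSketch {
      // One listStatus RPC yields both fast stats for a partition directory.
      def fastStats(fs: FileSystem, dir: Path, filter: PathFilter): (Int, Long) = {
        val statuses = fs.listStatus(dir, filter)
        (statuses.length, statuses.map(_.getLen).sum) // (numFiles, totalSize)
      }

      def main(args: Array[String]): Unit = {
        val fs = FileSystem.getLocal(new Configuration())
        val hideDotFiles = new PathFilter {
          override def accept(p: Path): Boolean = !p.getName.startsWith(".")
        }
        val (numFiles, totalSize) = fastStats(fs, new Path("/tmp"), hideDotFiles)
        println(s"numFiles=$numFiles totalSize=$totalSize")
      }
    }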
"totalSize" -> "0")) } finally { fs.delete(root, true) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index f8204e183f03a..5a78a33624c71 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -829,6 +829,7 @@ private[hive] class HiveClientImpl( tpart.setTableName(ht.getTableName) tpart.setValues(partValues.asJava) tpart.setSd(storageDesc) + tpart.setParameters(p.parameters.asJava) new HivePartition(ht, tpart) } @@ -843,6 +844,7 @@ private[hive] class HiveClientImpl( serde = Option(apiPartition.getSd.getSerdeInfo.getSerializationLib), compressed = apiPartition.getSd.isCompressed, properties = Option(apiPartition.getSd.getSerdeInfo.getParameters) - .map(_.asScala.toMap).orNull)) + .map(_.asScala.toMap).orNull), + parameters = hp.getParameters().asScala.toMap) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala index 41527fcd05154..2ef0e17b60823 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala @@ -267,6 +267,7 @@ private[client] class Shim_v0_12 extends Shim with Logging { val table = hive.getTable(database, tableName) parts.foreach { s => val location = s.storage.locationUri.map(new Path(table.getPath, _)).orNull + val params = s.parameters.asJava val spec = s.spec.asJava if (hive.getPartition(table, spec, false) != null && ignoreIfExists) { // Ignore this partition since it already exists and ignoreIfExists == true @@ -280,7 +281,7 @@ private[client] class Shim_v0_12 extends Shim with Logging { table, spec, location, - null, // partParams + params, // partParams null, // inputFormat null, // outputFormat -1: JInteger, // numBuckets @@ -459,8 +460,9 @@ private[client] class Shim_v0_13 extends Shim_v0_12 { parts: Seq[CatalogTablePartition], ignoreIfExists: Boolean): Unit = { val addPartitionDesc = new AddPartitionDesc(db, table, ignoreIfExists) - parts.foreach { s => + parts.zipWithIndex.foreach { case (s, i) => addPartitionDesc.addPartition(s.spec.asJava, s.storage.locationUri.orNull) + addPartitionDesc.getPartition(i).setPartParams(s.parameters.asJava) } hive.createPartitions(addPartitionDesc) } From 1c490ef31f741b657802414692ad4f0d49685708 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Fri, 12 Aug 2016 16:46:19 -0700 Subject: [PATCH 05/10] refactor --- .../spark/sql/execution/command/ddl.scala | 114 +++++++++++------- 1 file changed, 68 insertions(+), 46 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index e138ab2214110..20fee8f95059a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -505,51 +505,11 @@ case class AlterTableRecoverPartitionsCommand( val total = partitionSpecsAndLocs.length logInfo(s"Found $total partitions in $root") - val partitionStats: GenMap[String, (Int, Long)] = if (total > threshold) { - val serializableConfiguration = new SerializableConfiguration(hadoopConf) - val serializedPaths = partitionSpecsAndLocs.map(_._2.toString).toArray - - // Set the number of 
parallelism to prevent following file listing from generating many tasks - // in case of large #defaultParallelism. - val numParallelism = Math.min(serializedPaths.size, 10000) - // gather the fast stats for all the partitions otherwise Hive metastore will list all the - // files for all the new partitions in sequential way, which is super slow. - logInfo(s"Gather the fast stats in parallel using $numParallelism tasks.") - spark.sparkContext.parallelize(serializedPaths, numParallelism) - .mapPartitions { paths => - val pathFilter = getPathFilter(serializableConfiguration.value) - paths.map(new Path(_)).map{ path => - val fs = path.getFileSystem(serializableConfiguration.value) - val statuses = fs.listStatus(path, pathFilter) - (path.toString, (statuses.length, statuses.map(_.getLen).sum)) - } - }.collectAsMap() - } else { - partitionSpecsAndLocs.map { case (_, location) => - val statuses = fs.listStatus(location, pathFilter) - (location.toString, (statuses.length, statuses.map(_.getLen).sum)) - }.toMap - } + val partitionStats = gatherPartitionStats( + spark, partitionSpecsAndLocs, fs, pathFilter, threshold) logInfo(s"Finished to gather the fast stats for all $total partitions.") - var done = 0L - // Hive metastore may not have enough memory to handle millions of partitions in single RPC, - // we should split them into smaller batches. - partitionSpecsAndLocs.iterator.grouped(1024).foreach { batch => - val parts = batch.map { case (spec, location) => - // inherit table storage format (possibly except for location) - val (numFiles, totalSize) = partitionStats(location.toString) - // This two fast stat could prevent Hive metastore to list the files again. - val params = Map(NUM_FILES -> numFiles.toString, TOTAL_SIZE -> totalSize.toString) - CatalogTablePartition( - spec, - table.storage.copy(locationUri = Some(location.toUri.toString)), - params) - }.toArray[CatalogTablePartition] - spark.sessionState.catalog.createPartitions(tableName, parts, ignoreIfExists = true) - done += parts.length - logDebug(s"Recovered ${parts.length} partitions ($done/$total so far)") - } + addPartitions(spark, table, partitionSpecsAndLocs, partitionStats) logInfo(s"Recovered all partitions ($total).") Seq.empty[Row] } @@ -564,7 +524,7 @@ case class AlterTableRecoverPartitionsCommand( spec: TablePartitionSpec, partitionNames: Seq[String], threshold: Int): GenSeq[(TablePartitionSpec, Path)] = { - if (partitionNames.length == 0) { + if (partitionNames.isEmpty) { return Seq(spec -> path) } @@ -586,11 +546,11 @@ case class AlterTableRecoverPartitionsCommand( // TODO: Validate the value val value = PartitioningUtils.unescapePathName(ps(1)) // comparing with case-insensitive, but preserve the case - if (columnName == partitionNames(0)) { + if (columnName == partitionNames.head) { scanPartitions(spark, fs, filter, st.getPath, spec ++ Map(columnName -> value), partitionNames.drop(1), threshold) } else { - logWarning(s"expect partition column ${partitionNames(0)}, but got ${ps(0)}, ignore it") + logWarning(s"expect partition column ${partitionNames.head}, but got ${ps(0)}, ignore it") Seq() } } else { @@ -599,6 +559,68 @@ case class AlterTableRecoverPartitionsCommand( } } } + + private def gatherPartitionStats( + spark: SparkSession, + partitionSpecsAndLocs: GenSeq[(TablePartitionSpec, Path)], + fs: FileSystem, + pathFilter: PathFilter, + threshold: Int): GenMap[String, (Int, Long)] = { + if (partitionSpecsAndLocs.length > threshold) { + val hadoopConf = spark.sparkContext.hadoopConfiguration + val serializableConfiguration 
= new SerializableConfiguration(hadoopConf) + val serializedPaths = partitionSpecsAndLocs.map(_._2.toString).toArray + + // Set the number of parallelism to prevent following file listing from generating many tasks + // in case of large #defaultParallelism. + val numParallelism = Math.min(serializedPaths.length, 10000) + // gather the fast stats for all the partitions otherwise Hive metastore will list all the + // files for all the new partitions in sequential way, which is super slow. + logInfo(s"Gather the fast stats in parallel using $numParallelism tasks.") + spark.sparkContext.parallelize(serializedPaths, numParallelism) + .mapPartitions { paths => + val pathFilter = getPathFilter(serializableConfiguration.value) + paths.map(new Path(_)).map{ path => + val fs = path.getFileSystem(serializableConfiguration.value) + val statuses = fs.listStatus(path, pathFilter) + (path.toString, (statuses.length, statuses.map(_.getLen).sum)) + } + }.collectAsMap() + } else { + partitionSpecsAndLocs.map { case (_, location) => + val statuses = fs.listStatus(location, pathFilter) + (location.toString, (statuses.length, statuses.map(_.getLen).sum)) + }.toMap + } + } + + private def addPartitions( + spark: SparkSession, + table: CatalogTable, + partitionSpecsAndLocs: GenSeq[(TablePartitionSpec, Path)], + partitionStats: GenMap[String, (Int, Long)]): Unit = { + val total = partitionSpecsAndLocs.length + var done = 0L + // Hive metastore may not have enough memory to handle millions of partitions in single RPC, + // we should split them into smaller batches. + val parArray = partitionSpecsAndLocs.toArray.grouped(100).toArray.par + parArray.tasksupport = evalTaskSupport + parArray.foreach { batch => + val parts = batch.map { case (spec, location) => + // inherit table storage format (possibly except for location) + val (numFiles, totalSize) = partitionStats(location.toString) + // This two fast stat could prevent Hive metastore to list the files again. 
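Patch 05 also moves the metastore writes onto a parallel collection driven by evalTaskSupport (a ForkJoinTaskSupport already defined in this file); patch 08 later reverts that part to a sequential loop because the Hive client is not thread safe. A self-contained sketch of the parallel-collection pattern itself, with a made-up pool size of 8:

    import scala.collection.parallel.ForkJoinTaskSupport
    import scala.concurrent.forkjoin.ForkJoinPool

    object ParBatchSketch {
      def main(args: Array[String]): Unit = {
        // Group items into batches of 100, then process the batches in parallel
        // on a bounded ForkJoinPool instead of the default global pool.
        val batches = (1 to 1000).toArray.grouped(100).toArray.par
        batches.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
        batches.foreach(batch => println(s"processed ${batch.length} items"))
      }
    }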
From f30e3875d9d397b8025c92b13f8079b3701464b7 Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Mon, 15 Aug 2016 11:36:59 -0700
Subject: [PATCH 06/10] fix tests

---
 .../spark/sql/execution/command/ddl.scala          |  8 +++-
 .../sql/execution/command/DDLSuite.scala           | 10 ++---
 .../sql/hive/client/HiveClientImpl.scala           |  7 +++-
 .../spark/sql/hive/client/HiveShim.scala           |  6 ++-
 .../sql/hive/execution/HiveDDLSuite.scala          | 38 +++++++++++++++++++
 5 files changed, 58 insertions(+), 11 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 20fee8f95059a..28960c2da105b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -452,6 +452,7 @@ case class AlterTableRecoverPartitionsCommand(
   //   common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java#L88
   val NUM_FILES = "numFiles"
   val TOTAL_SIZE = "totalSize"
+  val DDL_TIME = "transient_lastDdlTime"
 
   private def getPathFilter(hadoopConf: Configuration): PathFilter = {
     // Dummy jobconf to get to the pathFilter defined in configuration
@@ -606,11 +607,16 @@ case class AlterTableRecoverPartitionsCommand(
     val parArray = partitionSpecsAndLocs.toArray.grouped(100).toArray.par
     parArray.tasksupport = evalTaskSupport
     parArray.foreach { batch =>
+      val now = System.currentTimeMillis() / 1000
       val parts = batch.map { case (spec, location) =>
         // inherit table storage format (possibly except for location)
         val (numFiles, totalSize) = partitionStats(location.toString)
         // These two fast stats prevent the Hive metastore from listing the files again.
-        val params = Map(NUM_FILES -> numFiles.toString, TOTAL_SIZE -> totalSize.toString)
+        val params = Map(NUM_FILES -> numFiles.toString,
+          TOTAL_SIZE -> totalSize.toString,
+          // Work around a bug in the Hive metastore that tries to mutate read-only parameters.
+          // see metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java:L2394
+          DDL_TIME -> now.toString)
         CatalogTablePartition(
           spec,
           table.storage.copy(locationUri = Some(location.toUri.toString)),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index a8d6a384ae0a0..ee06f11e05262 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -871,13 +871,13 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
   }
 
   test("alter table: recover partitions (sequential)") {
-    withSQLConf("spark.rdd.parallelListingThreshold" -> "1") {
+    withSQLConf("spark.rdd.parallelListingThreshold" -> "10") {
       testRecoverPartitions()
     }
   }
 
   test("alter table: recover partition (parallel)") {
-    withSQLConf("spark.rdd.parallelListingThreshold" -> "10") {
+    withSQLConf("spark.rdd.parallelListingThreshold" -> "1") {
       testRecoverPartitions()
     }
   }
@@ -921,10 +921,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       sql("ALTER TABLE tab1 RECOVER PARTITIONS")
       assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
         Set(part1, part2))
-      assert(catalog.getPartition(tableIdent, part1).parameters ==
-        Map("numFiles" -> "1", "totalSize" -> "0"))
-      assert(catalog.getPartition(tableIdent, part2).parameters ==
-        Map("numFiles" -> "2", "totalSize" -> "0"))
+      assert(catalog.getPartition(tableIdent, part1).parameters("numFiles") == "1")
+      assert(catalog.getPartition(tableIdent, part2).parameters("numFiles") == "2")
     } finally {
       fs.delete(root, true)
     }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 5a78a33624c71..49b1d11b6a3e9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -829,7 +829,9 @@ private[hive] class HiveClientImpl(
     tpart.setTableName(ht.getTableName)
     tpart.setValues(partValues.asJava)
     tpart.setSd(storageDesc)
-    tpart.setParameters(p.parameters.asJava)
+    if (p.parameters.nonEmpty) {
+      tpart.setParameters(p.parameters.asJava)
+    }
     new HivePartition(ht, tpart)
   }
 
@@ -845,6 +847,7 @@ private[hive] class HiveClientImpl(
       compressed = apiPartition.getSd.isCompressed,
       properties = Option(apiPartition.getSd.getSerdeInfo.getParameters)
         .map(_.asScala.toMap).orNull),
-      parameters = hp.getParameters().asScala.toMap)
+      parameters =
+        if (hp.getParameters() != null) hp.getParameters().asScala.toMap else Map.empty)
   }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index 2ef0e17b60823..32387707612f4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -267,7 +267,7 @@ private[client] class Shim_v0_12 extends Shim with Logging {
     val table = hive.getTable(database, tableName)
     parts.foreach { s =>
       val location = s.storage.locationUri.map(new Path(table.getPath, _)).orNull
-      val params = s.parameters.asJava
+      val params = if (s.parameters.nonEmpty) s.parameters.asJava else null
       val spec = s.spec.asJava
       if (hive.getPartition(table, spec, false) != null && ignoreIfExists) {
         // Ignore this partition since it already exists and ignoreIfExists == true
@@ -462,7 +462,9 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
     val addPartitionDesc = new AddPartitionDesc(db, table, ignoreIfExists)
     parts.zipWithIndex.foreach { case (s, i) =>
       addPartitionDesc.addPartition(s.spec.asJava, s.storage.locationUri.orNull)
-      addPartitionDesc.getPartition(i).setPartParams(s.parameters.asJava)
+      if (s.parameters.nonEmpty) {
+        addPartitionDesc.getPartition(i).setPartParams(s.parameters.asJava)
+      }
     }
     hive.createPartitions(addPartitionDesc)
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 970b6885f6254..7f90988838496 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -378,6 +378,44 @@ class HiveDDLSuite
       expectedSerdeProps)
   }
 
+  test("MSCK REPAIR TABLE") {
+    val catalog = spark.sessionState.catalog
+    val tableIdent = TableIdentifier("tab1")
+    sql("CREATE TABLE tab1 (height INT, length INT) PARTITIONED BY (a INT, b INT)")
+    val part1 = Map("a" -> "1", "b" -> "5")
+    val part2 = Map("a" -> "2", "b" -> "6")
+    val root = new Path(catalog.getTableMetadata(tableIdent).storage.locationUri.get)
+    val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
+    // valid
+    fs.mkdirs(new Path(new Path(root, "a=1"), "b=5"))
+    fs.createNewFile(new Path(new Path(root, "a=1/b=5"), "a.csv"))  // file
+    fs.createNewFile(new Path(new Path(root, "a=1/b=5"), "_SUCCESS"))  // file
+    fs.mkdirs(new Path(new Path(root, "A=2"), "B=6"))
+    fs.createNewFile(new Path(new Path(root, "A=2/B=6"), "b.csv"))  // file
+    fs.createNewFile(new Path(new Path(root, "A=2/B=6"), "c.csv"))  // file
+    fs.createNewFile(new Path(new Path(root, "A=2/B=6"), ".hiddenFile"))  // file
+    fs.mkdirs(new Path(new Path(root, "A=2/B=6"), "_temporary"))
+
+    // invalid
+    fs.mkdirs(new Path(new Path(root, "a"), "b"))  // bad name
+    fs.mkdirs(new Path(new Path(root, "b=1"), "a=1"))  // wrong order
+    fs.mkdirs(new Path(root, "a=4"))  // not enough columns
+    fs.createNewFile(new Path(new Path(root, "a=1"), "b=4"))  // file
+    fs.createNewFile(new Path(new Path(root, "a=1"), "_SUCCESS"))  // _SUCCESS
+    fs.mkdirs(new Path(new Path(root, "a=1"), "_temporary"))  // _temporary
+    fs.mkdirs(new Path(new Path(root, "a=1"), ".b=4"))  // start with .
+
+    try {
+      sql("MSCK REPAIR TABLE tab1")
+      assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+        Set(part1, part2))
+      assert(catalog.getPartition(tableIdent, part1).parameters("numFiles") == "1")
+      assert(catalog.getPartition(tableIdent, part2).parameters("numFiles") == "2")
+    } finally {
+      fs.delete(root, true)
+    }
+  }
+
   test("drop table using drop view") {
     withTable("tab1") {
       sql("CREATE TABLE tab1(c1 int)")
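The valid/invalid directories in these tests mirror what scanPartitions accepts: each directory level must be a name=value pair matching the next partition column, and entries such as _SUCCESS, _temporary, or dot-prefixed names are skipped. A simplified sketch of that name=value parsing (the real code also unescapes values via PartitioningUtils and compares column names case-insensitively):

    object PartitionPathSketch {
      def parseSpec(relativePath: String): Option[Map[String, String]] = {
        val pairs = relativePath.split("/").map(_.split("=", 2))
        if (pairs.forall(_.length == 2)) {
          Some(pairs.map(p => p(0).toLowerCase -> p(1)).toMap)
        } else {
          None // e.g. "a/b" is a bad name and is ignored with a warning
        }
      }

      def main(args: Array[String]): Unit = {
        println(parseSpec("a=1/b=5")) // Some(Map(a -> 1, b -> 5))
        println(parseSpec("a/b"))     // None
      }
    }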
From a4d07db6b9d43184d9622587add6abebc256923d Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Mon, 15 Aug 2016 13:55:59 -0700
Subject: [PATCH 07/10] fix tests

---
 .../org/apache/spark/sql/hive/client/HiveClientImpl.scala | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 49b1d11b6a3e9..689ad5a01a32e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -829,9 +829,6 @@ private[hive] class HiveClientImpl(
     tpart.setTableName(ht.getTableName)
     tpart.setValues(partValues.asJava)
     tpart.setSd(storageDesc)
-    if (p.parameters.nonEmpty) {
-      tpart.setParameters(p.parameters.asJava)
-    }
     new HivePartition(ht, tpart)
   }

From 8a18bf7e66b78bf21148844863fd1f62af1dcacf Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Thu, 18 Aug 2016 14:15:28 -0700
Subject: [PATCH 08/10] add config, add partitions sequentially

---
 .../spark/sql/execution/command/ddl.scala          | 34 +++++++++++--------
 .../apache/spark/sql/internal/SQLConf.scala        |  9 +++++
 2 files changed, 28 insertions(+), 15 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 28960c2da105b..37ab9d46ebd08 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -449,7 +449,7 @@ case class AlterTableRecoverPartitionsCommand(
 
   // These are two fast stats in Hive Metastore
   // see https://github.com/apache/hive/blob/master/
-  //   common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java#L88
+  //   common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java
   val NUM_FILES = "numFiles"
   val TOTAL_SIZE = "totalSize"
   val DDL_TIME = "transient_lastDdlTime"
@@ -506,8 +506,11 @@ case class AlterTableRecoverPartitionsCommand(
     val total = partitionSpecsAndLocs.length
     logInfo(s"Found $total partitions in $root")
 
-    val partitionStats = gatherPartitionStats(
-      spark, partitionSpecsAndLocs, fs, pathFilter, threshold)
+    val partitionStats = if (spark.sqlContext.conf.gatherFastStats) {
+      gatherPartitionStats(spark, partitionSpecsAndLocs, fs, pathFilter, threshold)
+    } else {
+      GenMap.empty[String, (Int, Long)]
+    }
     logInfo(s"Finished gathering the fast stats for all $total partitions.")
 
     addPartitions(spark, table, partitionSpecsAndLocs, partitionStats)
@@ -574,7 +577,8 @@ case class AlterTableRecoverPartitionsCommand(
 
       // Set the degree of parallelism to prevent the following file listing from generating too many
      // tasks in case of a large #defaultParallelism.
-      val numParallelism = Math.min(serializedPaths.length, 10000)
+      val numParallelism = Math.min(serializedPaths.length,
+        Math.min(spark.sparkContext.defaultParallelism, 10000))
       // gather the fast stats for all the partitions, otherwise the Hive metastore will list all
       // the files for all the new partitions sequentially, which is super slow.
       logInfo(s"Gathering the fast stats in parallel using $numParallelism tasks.")
@@ -603,20 +607,20 @@ case class AlterTableRecoverPartitionsCommand(
     val total = partitionSpecsAndLocs.length
     var done = 0L
     // Hive metastore may not have enough memory to handle millions of partitions in single RPC,
-    // we should split them into smaller batches.
-    val parArray = partitionSpecsAndLocs.toArray.grouped(100).toArray.par
-    parArray.tasksupport = evalTaskSupport
-    parArray.foreach { batch =>
+    // we should split them into smaller batches. Since Hive client is not thread safe, we cannot
+    // do this in parallel.
+    partitionSpecsAndLocs.toIterator.grouped(1000).foreach { batch =>
       val now = System.currentTimeMillis() / 1000
       val parts = batch.map { case (spec, location) =>
+        val params = partitionStats.get(location.toString).map { case (numFiles, totalSize) =>
+          // These two fast stats prevent the Hive metastore from listing the files again.
+          Map(NUM_FILES -> numFiles.toString,
+            TOTAL_SIZE -> totalSize.toString,
+            // Work around a bug in the Hive metastore that tries to mutate read-only parameters.
+            // see metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+            DDL_TIME -> now.toString)
+        }.getOrElse(Map.empty)
         // inherit table storage format (possibly except for location)
-        val (numFiles, totalSize) = partitionStats(location.toString)
-        // These two fast stats prevent the Hive metastore from listing the files again.
-        val params = Map(NUM_FILES -> numFiles.toString,
-          TOTAL_SIZE -> totalSize.toString,
-          // Work around a bug in the Hive metastore that tries to mutate read-only parameters.
-          // see metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java:L2394
-          DDL_TIME -> now.toString)
         CatalogTablePartition(
           spec,
           table.storage.copy(locationUri = Some(location.toUri.toString)),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index b867a6551feb9..287e57b26aa51 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -310,6 +310,13 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val GATHER_FASTSTAT = SQLConfigBuilder("spark.sql.hive.gatherFastStats")
+    .internal()
+    .doc("When true, fast stats (number of files and total size of all files) will be gathered" +
+      " while repairing table partitions.")
+    .booleanConf
+    .createWithDefault(false)
+
   // This is used to control when we will split a schema's JSON string to multiple pieces
   // in order to fit the JSON string in metastore's table property (by default, the value has
   // a length restriction of 4000 characters). We will split the JSON string of a schema
@@ -605,6 +612,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def metastorePartitionPruning: Boolean = getConf(HIVE_METASTORE_PARTITION_PRUNING)
 
+  def gatherFastStats: Boolean = getConf(GATHER_FASTSTAT)
+
   def optimizerMetadataOnly: Boolean = getConf(OPTIMIZER_METADATA_ONLY)
 
   def wholeStageEnabled: Boolean = getConf(WHOLESTAGE_CODEGEN_ENABLED)
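With patch 08 the fast-stats gathering becomes optional. Assuming the new flag behaves like other runtime SQL confs, toggling it around a repair would look roughly like this (a usage sketch, not taken from the patch; spark is an existing SparkSession with Hive support and tab1 a partitioned table):

    spark.conf.set("spark.sql.hive.gatherFastStats", "false")
    spark.sql("MSCK REPAIR TABLE tab1") // partitions added without numFiles/totalSize
    spark.conf.set("spark.sql.hive.gatherFastStats", "true")
    spark.sql("MSCK REPAIR TABLE tab1") // fast stats gathered in parallel first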
From 0672c89ed3fe173a9fcf0c9a08e1ff39536c30b4 Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Thu, 18 Aug 2016 15:54:33 -0700
Subject: [PATCH 09/10] Update SQLConf.scala

---
 .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 287e57b26aa51..dc271da6febe5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -315,7 +315,7 @@ object SQLConf {
     .doc("When true, fast stats (number of files and total size of all files) will be gathered" +
       " while repairing table partitions.")
     .booleanConf
-    .createWithDefault(false)
+    .createWithDefault(true)
 
   // This is used to control when we will split a schema's JSON string to multiple pieces
   // in order to fit the JSON string in metastore's table property (by default, the value has

From b58ce2ad8bb65f142ccd7f43b1fe5a1a46a7f58f Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Tue, 23 Aug 2016 13:57:43 -0700
Subject: [PATCH 10/10] address comments

---
 .../spark/sql/execution/command/ddl.scala          | 33 +++++++++++--------
 .../apache/spark/sql/internal/SQLConf.scala        |  3 +-
 2 files changed, 21 insertions(+), 15 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 37ab9d46ebd08..676c181579428 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -433,6 +433,9 @@ case class AlterTableDropPartitionCommand(
 }
 
+
+case class PartitionStatistics(numFiles: Int, totalSize: Long)
+
 /**
  * Recover Partitions in ALTER TABLE: recover all the partitions in the directory of a table and
  * update the catalog.
@@ -447,7 +450,7 @@ case class AlterTableRecoverPartitionsCommand(
     tableName: TableIdentifier,
     cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends RunnableCommand {
 
-  // These are two fast stats in Hive Metastore
+  // This is a list of statistics that can be collected quickly without requiring a scan of the data
   // see https://github.com/apache/hive/blob/master/
   //   common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java
   val NUM_FILES = "numFiles"
@@ -509,7 +512,7 @@ case class AlterTableRecoverPartitionsCommand(
     val partitionStats = if (spark.sqlContext.conf.gatherFastStats) {
       gatherPartitionStats(spark, partitionSpecsAndLocs, fs, pathFilter, threshold)
     } else {
-      GenMap.empty[String, (Int, Long)]
+      GenMap.empty[String, PartitionStatistics]
     }
     logInfo(s"Finished gathering the fast stats for all $total partitions.")
 
     addPartitions(spark, table, partitionSpecsAndLocs, partitionStats)
@@ -569,7 +572,7 @@ case class AlterTableRecoverPartitionsCommand(
       partitionSpecsAndLocs: GenSeq[(TablePartitionSpec, Path)],
       fs: FileSystem,
       pathFilter: PathFilter,
-      threshold: Int): GenMap[String, (Int, Long)] = {
+      threshold: Int): GenMap[String, PartitionStatistics] = {
     if (partitionSpecsAndLocs.length > threshold) {
       val hadoopConf = spark.sparkContext.hadoopConfiguration
       val serializableConfiguration = new SerializableConfiguration(hadoopConf)
@@ -588,13 +591,13 @@ case class AlterTableRecoverPartitionsCommand(
           paths.map(new Path(_)).map { path =>
             val fs = path.getFileSystem(serializableConfiguration.value)
             val statuses = fs.listStatus(path, pathFilter)
-            (path.toString, (statuses.length, statuses.map(_.getLen).sum))
+            (path.toString, PartitionStatistics(statuses.length, statuses.map(_.getLen).sum))
           }
         }.collectAsMap()
     } else {
       partitionSpecsAndLocs.map { case (_, location) =>
         val statuses = fs.listStatus(location, pathFilter)
-        (location.toString, (statuses.length, statuses.map(_.getLen).sum))
+        (location.toString, PartitionStatistics(statuses.length, statuses.map(_.getLen).sum))
       }.toMap
     }
   }
@@ -603,22 +606,24 @@ case class AlterTableRecoverPartitionsCommand(
   private def addPartitions(
       spark: SparkSession,
       table: CatalogTable,
       partitionSpecsAndLocs: GenSeq[(TablePartitionSpec, Path)],
-      partitionStats: GenMap[String, (Int, Long)]): Unit = {
+      partitionStats: GenMap[String, PartitionStatistics]): Unit = {
     val total = partitionSpecsAndLocs.length
     var done = 0L
     // Hive metastore may not have enough memory to handle millions of partitions in single RPC,
     // we should split them into smaller batches. Since Hive client is not thread safe, we cannot
     // do this in parallel.
-    partitionSpecsAndLocs.toIterator.grouped(1000).foreach { batch =>
+    val batchSize = 100
+    partitionSpecsAndLocs.toIterator.grouped(batchSize).foreach { batch =>
       val now = System.currentTimeMillis() / 1000
       val parts = batch.map { case (spec, location) =>
-        val params = partitionStats.get(location.toString).map { case (numFiles, totalSize) =>
-          // These two fast stats prevent the Hive metastore from listing the files again.
-          Map(NUM_FILES -> numFiles.toString,
-            TOTAL_SIZE -> totalSize.toString,
-            // Work around a bug in the Hive metastore that tries to mutate read-only parameters.
-            // see metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
-            DDL_TIME -> now.toString)
+        val params = partitionStats.get(location.toString).map {
+          case PartitionStatistics(numFiles, totalSize) =>
+            // These two fast stats prevent the Hive metastore from listing the files again.
+            Map(NUM_FILES -> numFiles.toString,
+              TOTAL_SIZE -> totalSize.toString,
+              // Work around a bug in the Hive metastore that tries to mutate read-only parameters.
+              // see metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+              DDL_TIME -> now.toString)
        }.getOrElse(Map.empty)
         // inherit table storage format (possibly except for location)
         CatalogTablePartition(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 287e57b26aa51..2ad8ce403c68b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -313,7 +313,8 @@ object SQLConf {
   val GATHER_FASTSTAT = SQLConfigBuilder("spark.sql.hive.gatherFastStats")
     .internal()
     .doc("When true, fast stats (number of files and total size of all files) will be gathered" +
-      " while repairing table partitions.")
+      " in parallel while repairing table partitions to avoid sequential listing in the Hive" +
+      " metastore.")
     .booleanConf
     .createWithDefault(false)
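Taken together, the final shape is: scan the partition directories, gather PartitionStatistics in parallel when the partition count exceeds the listing threshold, then add partitions sequentially in batches of 100 that carry numFiles/totalSize. A small worked sketch of the resulting batching arithmetic (all values hypothetical):

    case class PartitionStatistics(numFiles: Int, totalSize: Long)

    object FinalShapeSketch {
      def main(args: Array[String]): Unit = {
        val specsAndLocs = (1 to 1000).map(i => (Map("a" -> i.toString), s"/warehouse/tab1/a=$i"))
        val stats = specsAndLocs.map { case (_, loc) => loc -> PartitionStatistics(1, 128L) }.toMap
        val batchSize = 100 // 1000 partitions => 10 metastore RPCs
        specsAndLocs.iterator.grouped(batchSize).zipWithIndex.foreach { case (batch, i) =>
          val files = batch.map { case (_, loc) => stats(loc).numFiles }.sum
          println(s"batch $i: ${batch.size} partitions, $files files")
        }
      }
    }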