@@ -81,10 +81,12 @@ object CatalogStorageFormat {
*
* @param spec partition spec values indexed by column name
* @param storage storage format of the partition
* @param parameters some parameters for the partition, for example, stats.
*/
case class CatalogTablePartition(
spec: CatalogTypes.TablePartitionSpec,
storage: CatalogStorageFormat)
storage: CatalogStorageFormat,
parameters: Map[String, String] = Map.empty)
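For reference, a minimal sketch of constructing a partition with the new parameters field (the spec, location, and stats values are invented for illustration, and CatalogStorageFormat.empty is assumed to be available on the companion object):

val part = CatalogTablePartition(
  spec = Map("a" -> "1", "b" -> "5"),
  storage = CatalogStorageFormat.empty.copy(locationUri = Some("/warehouse/tab1/a=1/b=5")),
  parameters = Map("numFiles" -> "1", "totalSize" -> "512"))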


/**
156 changes: 129 additions & 27 deletions sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -17,12 +17,13 @@

package org.apache.spark.sql.execution.command

import scala.collection.GenSeq
import scala.collection.{GenMap, GenSeq}
import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool
import scala.util.control.NonFatal

import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}

import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
@@ -32,6 +33,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.execution.datasources.PartitioningUtils
import org.apache.spark.sql.types._
import org.apache.spark.util.SerializableConfiguration

// Note: The definition of these commands are based on the ones described in
// https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
@@ -422,6 +424,9 @@ case class AlterTableDropPartitionCommand(

}


case class PartitionStatistics(numFiles: Int, totalSize: Long)

/**
* Recover Partitions in ALTER TABLE: recover all the partition in the directory of a table and
* update the catalog.
@@ -435,6 +440,31 @@ case class AlterTableDropPartitionCommand(
case class AlterTableRecoverPartitionsCommand(
tableName: TableIdentifier,
cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends RunnableCommand {

// These are the statistics that can be collected quickly without requiring a scan of the data;
// see https://github.com/apache/hive/blob/master/
// common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java
val NUM_FILES = "numFiles"
val TOTAL_SIZE = "totalSize"
val DDL_TIME = "transient_lastDdlTime"
Review comment (Member): Let's add some more explanation about the hive metastore bug that requires this parameter.

Reply (Contributor, Author): There are comments on this below
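As the reply notes, the workaround appears in addPartitions further below: the command pre-populates transient_lastDdlTime so the Hive metastore never has to mutate the (possibly read-only) parameter map itself. A rough sketch of the parameter map these keys end up in, with invented values; the real construction is in addPartitions:

val nowSeconds = System.currentTimeMillis() / 1000
val exampleParams = Map(
  NUM_FILES -> "3",                 // number of data files under the partition directory
  TOTAL_SIZE -> "4096",             // sum of the file lengths, in bytes
  DDL_TIME -> nowSeconds.toString)  // pre-set so the metastore does not try to add it itself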


private def getPathFilter(hadoopConf: Configuration): PathFilter = {
// Dummy jobconf to get to the pathFilter defined in configuration
// It's very expensive to create a JobConf(ClassUtil.findContainingJar() is slow)
val jobConf = new JobConf(hadoopConf, this.getClass)
val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
new PathFilter {
override def accept(path: Path): Boolean = {
val name = path.getName
if (name != "_SUCCESS" && name != "_temporary" && !name.startsWith(".")) {
pathFilter == null || pathFilter.accept(path)
} else {
false
}
}
}
}

override def run(spark: SparkSession): Seq[Row] = {
val catalog = spark.sessionState.catalog
if (!catalog.tableExists(tableName)) {
@@ -449,10 +479,6 @@ case class AlterTableRecoverPartitionsCommand(
throw new AnalysisException(
s"Operation not allowed: $cmd on datasource tables: $tableName")
}
if (table.tableType != CatalogTableType.EXTERNAL) {
throw new AnalysisException(
s"Operation not allowed: $cmd only works on external tables: $tableName")
}
if (table.partitionColumnNames.isEmpty) {
throw new AnalysisException(
s"Operation not allowed: $cmd only works on partitioned tables: $tableName")
@@ -463,19 +489,26 @@ case class AlterTableRecoverPartitionsCommand(
}

val root = new Path(table.storage.locationUri.get)
logInfo(s"Recover all the partitions in $root")
val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
// Dummy jobconf to get to the pathFilter defined in configuration
// It's very expensive to create a JobConf(ClassUtil.findContainingJar() is slow)
val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration, this.getClass)
val pathFilter = FileInputFormat.getInputPathFilter(jobConf)

val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", "10").toInt
val hadoopConf = spark.sparkContext.hadoopConfiguration
val pathFilter = getPathFilter(hadoopConf)
val partitionSpecsAndLocs = scanPartitions(
spark, fs, pathFilter, root, Map(), table.partitionColumnNames.map(_.toLowerCase))
val parts = partitionSpecsAndLocs.map { case (spec, location) =>
// inherit table storage format (possibly except for location)
CatalogTablePartition(spec, table.storage.copy(locationUri = Some(location.toUri.toString)))
spark, fs, pathFilter, root, Map(), table.partitionColumnNames.map(_.toLowerCase), threshold)
val total = partitionSpecsAndLocs.length
logInfo(s"Found $total partitions in $root")

val partitionStats = if (spark.sqlContext.conf.gatherFastStats) {
gatherPartitionStats(spark, partitionSpecsAndLocs, fs, pathFilter, threshold)
} else {
GenMap.empty[String, PartitionStatistics]
}
spark.sessionState.catalog.createPartitions(tableName,
parts.toArray[CatalogTablePartition], ignoreIfExists = true)
logInfo(s"Finished to gather the fast stats for all $total partitions.")

addPartitions(spark, table, partitionSpecsAndLocs, partitionStats)
logInfo(s"Recovered all partitions ($total).")
Seq.empty[Row]
}

@@ -487,15 +520,16 @@ case class AlterTableRecoverPartitionsCommand(
filter: PathFilter,
path: Path,
spec: TablePartitionSpec,
partitionNames: Seq[String]): GenSeq[(TablePartitionSpec, Path)] = {
if (partitionNames.length == 0) {
partitionNames: Seq[String],
threshold: Int): GenSeq[(TablePartitionSpec, Path)] = {
if (partitionNames.isEmpty) {
return Seq(spec -> path)
}

val statuses = fs.listStatus(path)
val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", "10").toInt
val statuses = fs.listStatus(path, filter)
val statusPar: GenSeq[FileStatus] =
if (partitionNames.length > 1 && statuses.length > threshold || partitionNames.length > 2) {
// parallelize the list of partitions here, then we can have better parallelism later.
val parArray = statuses.par
parArray.tasksupport = evalTaskSupport
parArray
@@ -510,21 +544,89 @@ case class AlterTableRecoverPartitionsCommand(
// TODO: Validate the value
val value = PartitioningUtils.unescapePathName(ps(1))
// comparing with case-insensitive, but preserve the case
if (columnName == partitionNames(0)) {
scanPartitions(
spark, fs, filter, st.getPath, spec ++ Map(columnName -> value), partitionNames.drop(1))
if (columnName == partitionNames.head) {
scanPartitions(spark, fs, filter, st.getPath, spec ++ Map(columnName -> value),
partitionNames.drop(1), threshold)
} else {
logWarning(s"expect partition column ${partitionNames(0)}, but got ${ps(0)}, ignore it")
logWarning(s"expect partition column ${partitionNames.head}, but got ${ps(0)}, ignore it")
Seq()
}
} else {
if (name != "_SUCCESS" && name != "_temporary" && !name.startsWith(".")) {
logWarning(s"ignore ${new Path(path, name)}")
}
logWarning(s"ignore ${new Path(path, name)}")
Seq()
}
}
}

private def gatherPartitionStats(
spark: SparkSession,
partitionSpecsAndLocs: GenSeq[(TablePartitionSpec, Path)],
fs: FileSystem,
pathFilter: PathFilter,
threshold: Int): GenMap[String, PartitionStatistics] = {
if (partitionSpecsAndLocs.length > threshold) {
val hadoopConf = spark.sparkContext.hadoopConfiguration
val serializableConfiguration = new SerializableConfiguration(hadoopConf)
val serializedPaths = partitionSpecsAndLocs.map(_._2.toString).toArray

// Cap the degree of parallelism to prevent the following file listing from generating too many
// tasks when #defaultParallelism is large.
val numParallelism = Math.min(serializedPaths.length,
Math.min(spark.sparkContext.defaultParallelism, 10000))
Review comment (Member): It'd be nice to add a comment about why we picked 10000 here. If there is no good reason, we can make it configurable too.

Reply (Contributor, Author): Copied from HadoopFsRelation
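For concreteness, a small sketch of the capping discussed in this thread (the helper name and the sample inputs are ours, not part of the PR):

def cappedParallelism(numPaths: Int, defaultParallelism: Int): Int =
  math.min(numPaths, math.min(defaultParallelism, 10000))

cappedParallelism(50, 200)        // 50, one task per path when there are few partitions
cappedParallelism(50000, 200)     // 200, bounded by the cluster's default parallelism
cappedParallelism(50000, 40000)   // 10000, the hard cap copied from HadoopFsRelation's listing code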

// Gather the fast stats for all the partitions, otherwise the Hive metastore will list all the
// files for each new partition sequentially, which is super slow.
logInfo(s"Gather the fast stats in parallel using $numParallelism tasks.")
spark.sparkContext.parallelize(serializedPaths, numParallelism)
.mapPartitions { paths =>
val pathFilter = getPathFilter(serializableConfiguration.value)
paths.map(new Path(_)).map{ path =>
val fs = path.getFileSystem(serializableConfiguration.value)
val statuses = fs.listStatus(path, pathFilter)
(path.toString, PartitionStatistics(statuses.length, statuses.map(_.getLen).sum))
}
}.collectAsMap()
} else {
partitionSpecsAndLocs.map { case (_, location) =>
val statuses = fs.listStatus(location, pathFilter)
(location.toString, PartitionStatistics(statuses.length, statuses.map(_.getLen).sum))
}.toMap
}
}

private def addPartitions(
spark: SparkSession,
table: CatalogTable,
partitionSpecsAndLocs: GenSeq[(TablePartitionSpec, Path)],
partitionStats: GenMap[String, PartitionStatistics]): Unit = {
val total = partitionSpecsAndLocs.length
var done = 0L
// The Hive metastore may not have enough memory to handle millions of partitions in a single
// RPC, so we split them into smaller batches. Since the Hive client is not thread safe, we
// cannot do this in parallel.
val batchSize = 100
partitionSpecsAndLocs.toIterator.grouped(batchSize).foreach { batch =>
val now = System.currentTimeMillis() / 1000
val parts = batch.map { case (spec, location) =>
val params = partitionStats.get(location.toString).map {
case PartitionStatistics(numFiles, totalSize) =>
// These two fast stats prevent the Hive metastore from listing the files again.
Map(NUM_FILES -> numFiles.toString,
TOTAL_SIZE -> totalSize.toString,
// Work around a bug in the Hive metastore that tries to mutate read-only parameters;
// see metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
DDL_TIME -> now.toString)
}.getOrElse(Map.empty)
// inherit table storage format (possibly except for location)
CatalogTablePartition(
spec,
table.storage.copy(locationUri = Some(location.toUri.toString)),
params)
}
spark.sessionState.catalog.createPartitions(tableName, parts, ignoreIfExists = true)
done += parts.length
logDebug(s"Recovered ${parts.length} partitions ($done/$total so far)")
}
}
}
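As a quick illustration of the batching above (the numbers are hypothetical): with 1050 recovered partitions and a batch size of 100, the metastore receives eleven createPartitions calls instead of one huge RPC.

(1 to 1050).toIterator.grouped(100).size  // 11 batches: ten of 100 partitions plus one of 50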


@@ -310,6 +310,14 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val GATHER_FASTSTAT = SQLConfigBuilder("spark.sql.hive.gatherFastStats")
.internal()
.doc("When true, fast stats (number of files and total size of all files) will be gathered" +
" in parallel while repairing table partitions to avoid the sequential listing in Hive" +
" metastore.")
.booleanConf
.createWithDefault(true)

// This is used to control when we will split a schema's JSON string into multiple pieces
// in order to fit the JSON string in metastore's table property (by default, the value has
// a length restriction of 4000 characters). We will split the JSON string of a schema
@@ -608,6 +616,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {

def metastorePartitionPruning: Boolean = getConf(HIVE_METASTORE_PARTITION_PRUNING)

def gatherFastStats: Boolean = getConf(GATHER_FASTSTAT)

def optimizerMetadataOnly: Boolean = getConf(OPTIMIZER_METADATA_ONLY)

def wholeStageEnabled: Boolean = getConf(WHOLESTAGE_CODEGEN_ENABLED)
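A hedged usage sketch for the new flag, assuming a Hive-enabled SparkSession named spark: it defaults to true, so setting it only matters when opting out of the parallel stats pass.

spark.conf.set("spark.sql.hive.gatherFastStats", "false")
spark.sql("MSCK REPAIR TABLE tab1")  // partitions are still recovered, just without the fast-stats pass
spark.conf.set("spark.sql.hive.gatherFastStats", "true")  // back to the default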
@@ -817,13 +817,13 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}

test("alter table: recover partitions (sequential)") {
withSQLConf("spark.rdd.parallelListingThreshold" -> "1") {
withSQLConf("spark.rdd.parallelListingThreshold" -> "10") {
testRecoverPartitions()
}
}

test("alter table: recover partition (parallel)") {
withSQLConf("spark.rdd.parallelListingThreshold" -> "10") {
withSQLConf("spark.rdd.parallelListingThreshold" -> "1") {
testRecoverPartitions()
}
}
@@ -846,7 +846,14 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
// valid
fs.mkdirs(new Path(new Path(root, "a=1"), "b=5"))
fs.createNewFile(new Path(new Path(root, "a=1/b=5"), "a.csv")) // file
fs.createNewFile(new Path(new Path(root, "a=1/b=5"), "_SUCCESS")) // file
fs.mkdirs(new Path(new Path(root, "A=2"), "B=6"))
fs.createNewFile(new Path(new Path(root, "A=2/B=6"), "b.csv")) // file
fs.createNewFile(new Path(new Path(root, "A=2/B=6"), "c.csv")) // file
fs.createNewFile(new Path(new Path(root, "A=2/B=6"), ".hiddenFile")) // file
fs.mkdirs(new Path(new Path(root, "A=2/B=6"), "_temporary"))

// invalid
fs.mkdirs(new Path(new Path(root, "a"), "b")) // bad name
fs.mkdirs(new Path(new Path(root, "b=1"), "a=1")) // wrong order
@@ -860,6 +867,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
sql("ALTER TABLE tab1 RECOVER PARTITIONS")
assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
Set(part1, part2))
assert(catalog.getPartition(tableIdent, part1).parameters("numFiles") == "1")
assert(catalog.getPartition(tableIdent, part2).parameters("numFiles") == "2")
} finally {
fs.delete(root, true)
}
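The expected counts follow from the path filter in AlterTableRecoverPartitionsCommand, which skips _SUCCESS, _temporary, and hidden files; a small sketch with a helper name of our own:

def isDataFile(name: String): Boolean =
  name != "_SUCCESS" && name != "_temporary" && !name.startsWith(".")

Seq("a.csv", "_SUCCESS").count(isDataFile)              // 1, the numFiles asserted for a=1/b=5
Seq("b.csv", "c.csv", ".hiddenFile").count(isDataFile)  // 2, the numFiles asserted for A=2/B=6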
@@ -829,6 +829,8 @@ private[hive] class HiveClientImpl(
serde = Option(apiPartition.getSd.getSerdeInfo.getSerializationLib),
compressed = apiPartition.getSd.isCompressed,
properties = Option(apiPartition.getSd.getSerdeInfo.getParameters)
.map(_.asScala.toMap).orNull))
.map(_.asScala.toMap).orNull),
parameters =
if (hp.getParameters() != null) hp.getParameters().asScala.toMap else Map.empty)
}
}
@@ -267,6 +267,7 @@ private[client] class Shim_v0_12 extends Shim with Logging {
val table = hive.getTable(database, tableName)
parts.foreach { s =>
val location = s.storage.locationUri.map(new Path(table.getPath, _)).orNull
val params = if (s.parameters.nonEmpty) s.parameters.asJava else null
val spec = s.spec.asJava
if (hive.getPartition(table, spec, false) != null && ignoreIfExists) {
// Ignore this partition since it already exists and ignoreIfExists == true
@@ -280,7 +281,7 @@
table,
spec,
location,
null, // partParams
params, // partParams
null, // inputFormat
null, // outputFormat
-1: JInteger, // numBuckets
@@ -459,8 +460,11 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
parts: Seq[CatalogTablePartition],
ignoreIfExists: Boolean): Unit = {
val addPartitionDesc = new AddPartitionDesc(db, table, ignoreIfExists)
parts.foreach { s =>
parts.zipWithIndex.foreach { case (s, i) =>
addPartitionDesc.addPartition(s.spec.asJava, s.storage.locationUri.orNull)
if (s.parameters.nonEmpty) {
addPartitionDesc.getPartition(i).setPartParams(s.parameters.asJava)
}
}
hive.createPartitions(addPartitionDesc)
}
@@ -378,6 +378,44 @@ class HiveDDLSuite
expectedSerdeProps)
}

test("MSCK REPAIR RABLE") {
val catalog = spark.sessionState.catalog
val tableIdent = TableIdentifier("tab1")
sql("CREATE TABLE tab1 (height INT, length INT) PARTITIONED BY (a INT, b INT)")
val part1 = Map("a" -> "1", "b" -> "5")
val part2 = Map("a" -> "2", "b" -> "6")
val root = new Path(catalog.getTableMetadata(tableIdent).storage.locationUri.get)
val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
// valid
fs.mkdirs(new Path(new Path(root, "a=1"), "b=5"))
fs.createNewFile(new Path(new Path(root, "a=1/b=5"), "a.csv")) // file
fs.createNewFile(new Path(new Path(root, "a=1/b=5"), "_SUCCESS")) // file
fs.mkdirs(new Path(new Path(root, "A=2"), "B=6"))
fs.createNewFile(new Path(new Path(root, "A=2/B=6"), "b.csv")) // file
fs.createNewFile(new Path(new Path(root, "A=2/B=6"), "c.csv")) // file
fs.createNewFile(new Path(new Path(root, "A=2/B=6"), ".hiddenFile")) // file
fs.mkdirs(new Path(new Path(root, "A=2/B=6"), "_temporary"))

// invalid
fs.mkdirs(new Path(new Path(root, "a"), "b")) // bad name
fs.mkdirs(new Path(new Path(root, "b=1"), "a=1")) // wrong order
fs.mkdirs(new Path(root, "a=4")) // not enough columns
fs.createNewFile(new Path(new Path(root, "a=1"), "b=4")) // file
fs.createNewFile(new Path(new Path(root, "a=1"), "_SUCCESS")) // _SUCCESS
fs.mkdirs(new Path(new Path(root, "a=1"), "_temporary")) // _temporary
fs.mkdirs(new Path(new Path(root, "a=1"), ".b=4")) // start with .

try {
sql("MSCK REPAIR TABLE tab1")
assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
Set(part1, part2))
assert(catalog.getPartition(tableIdent, part1).parameters("numFiles") == "1")
assert(catalog.getPartition(tableIdent, part2).parameters("numFiles") == "2")
} finally {
fs.delete(root, true)
}
}

test("drop table using drop view") {
withTable("tab1") {
sql("CREATE TABLE tab1(c1 int)")