Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,10 @@ object FileCommitProtocol extends Logging {
/**
 * Builds the staging directory path for a job: a child of `path` named
 * `.spark-staging-<jobId>`.
 *
 * @param path  the directory under which the staging directory is placed
 * @param jobId the id appended to the staging directory name
 * @return the staging directory path
 */
def getStagingDir(path: String, jobId: String): Path = {
  val stagingDirName = s".spark-staging-$jobId"
  new Path(path, stagingDirName)
}

/**
 * Builds the staging directory path used when overwriting `path`: a hidden sibling of
 * `path` (same parent directory) named `.<name>-spark-staging-<jobId>`, where `<name>`
 * is the last component of `path`.
 *
 * @param path  the output path being overwritten
 * @param jobId the id appended to the staging directory name
 * @return the staging directory path located next to `path`
 * @throws IllegalArgumentException if `path` has no parent (it is a file system root),
 *                                  since no sibling directory can exist for it
 */
def overwriteStagingDir(path: String, jobId: String): Path = {
  // Parse the output path once instead of constructing it twice.
  val outputPath = new Path(path)
  // Hadoop's Path.getParent returns null for a root path. Fail with a descriptive error
  // rather than a NullPointerException raised from inside the Path constructor.
  val parentDir = Option(outputPath.getParent).getOrElse {
    throw new IllegalArgumentException(
      s"Cannot derive an overwrite staging directory for '$path' because it has no parent")
  }
  new Path(parentDir, s".${outputPath.getName}-spark-staging-$jobId")
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ class HadoopMapReduceCommitProtocol(
}
}

fs.delete(stagingDir, true)
fs.deleteOnExit(stagingDir)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4351,6 +4351,9 @@ class SQLConf extends Serializable with Logging {

/** Returns the value configured for `SQLConf.FILE_COMMIT_PROTOCOL_CLASS`. */
def fileCommitProtocolClass: String = getConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS)

/**
 * Whether the configured file commit protocol class is exactly
 * `SQLOverwriteHadoopMapReduceCommitProtocol`. The comparison is on the class name string,
 * so any other class name yields false.
 */
def useOverwriteFileCommitProtocol: Boolean = {
  val overwriteProtocolClassName =
    "org.apache.spark.sql.execution.datasources.SQLOverwriteHadoopMapReduceCommitProtocol"
  fileCommitProtocolClass == overwriteProtocolClassName
}

/** Returns the value configured for `SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD`. */
def parallelPartitionDiscoveryThreshold: Int =
getConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ object FileFormatWriter extends Logging {
* processing statistics.
* @return The set of all partition paths that were updated during this write job.
*/
// scalastyle:off argcount
def write(
sparkSession: SparkSession,
plan: SparkPlan,
Expand All @@ -87,7 +88,8 @@ object FileFormatWriter extends Logging {
bucketSpec: Option[BucketSpec],
statsTrackers: Seq[WriteJobStatsTracker],
options: Map[String, String],
numStaticPartitionCols: Int = 0)
numStaticPartitionCols: Int = 0,
preCommitJob: Option[() => Unit] = None)
: Set[String] = {
require(partitionColumns.size >= numStaticPartitionCols)

Expand Down Expand Up @@ -234,6 +236,7 @@ object FileFormatWriter extends Logging {

val commitMsgs = ret.map(_.commitMsg)

preCommitJob.map(_())
logInfo(s"Start to commit write Job ${description.uuid}.")
val (_, duration) = Utils.timeTakenMs { committer.commitJob(job, commitMsgs) }
logInfo(s"Write Job ${description.uuid} committed. Elapsed time: $duration ms.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.sql.execution.datasources

import java.io.IOException

import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.internal.io.FileCommitProtocol
Expand Down Expand Up @@ -84,7 +86,8 @@ case class InsertIntoHadoopFsRelationCommand(
outputColumnNames,
s"when inserting into $outputPath",
sparkSession.sessionState.conf.caseSensitiveAnalysis)

val useOverwriteFileCommitProtocol =
sparkSession.sessionState.conf.useOverwriteFileCommitProtocol
val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(options)
val fs = outputPath.getFileSystem(hadoopConf)
val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
Expand Down Expand Up @@ -125,6 +128,9 @@ case class InsertIntoHadoopFsRelationCommand(
case (SaveMode.Overwrite, true) =>
if (ifPartitionNotExists && matchingPartitions.nonEmpty) {
false
} else if (useOverwriteFileCommitProtocol) {
// For `SQLOverwriteHadoopMapReduceCommitProtocol`, do not delete directories first.
true
} else if (dynamicPartitionOverwrite) {
// For dynamic partition overwrite, do not delete partition directories ahead.
true
Expand Down Expand Up @@ -168,13 +174,27 @@ case class InsertIntoHadoopFsRelationCommand(

// For dynamic partition overwrite, FileOutputCommitter's output path is staging path, files
// will be renamed from staging path to final output path during commit job
val committerOutputPath = if (dynamicPartitionOverwrite) {
val committerOutputPath = if (useOverwriteFileCommitProtocol && mode == SaveMode.Overwrite) {
FileCommitProtocol.overwriteStagingDir(outputPath.toString, jobId)
.makeQualified(fs.getUri, fs.getWorkingDirectory)
} else if (dynamicPartitionOverwrite) {
FileCommitProtocol.getStagingDir(outputPath.toString, jobId)
.makeQualified(fs.getUri, fs.getWorkingDirectory)
} else {
qualifiedOutputPath
}

// When `dynamicPartitionOverwrite` is true, `SQLOverwriteHadoopMapReduceCommitProtocol`
// behaves the same as the default dynamic partition overwrite, so Spark doesn't need to
// delete the matching partitions here.
val preCommitJob = if (useOverwriteFileCommitProtocol &&
mode == SaveMode.Overwrite && !dynamicPartitionOverwrite) {
Some(() =>
deleteMatchingPartitions(fs, qualifiedOutputPath, customPartitionLocations, committer))
} else {
None
}

val updatedPartitionPaths =
FileFormatWriter.write(
sparkSession = sparkSession,
Expand All @@ -188,8 +208,44 @@ case class InsertIntoHadoopFsRelationCommand(
bucketSpec = bucketSpec,
statsTrackers = Seq(basicWriteJobStatsTracker(hadoopConf)),
options = options,
numStaticPartitionCols = staticPartitions.size)
numStaticPartitionCols = staticPartitions.size,
preCommitJob = preCommitJob)

if (useOverwriteFileCommitProtocol && mode == SaveMode.Overwrite) {
if (partitionColumns.isEmpty) {
// Non-partition table overwrite should rename staging dir to output path
if (!fs.rename(committerOutputPath, qualifiedOutputPath)) {
throw new IOException(s"Failed to rename $committerOutputPath to $outputPath")
}
} else if (staticPartitions.size == partitionColumns.size) {
// For static partition overwrite, if a custom partition location exists for this
// partition, the result data has already been written to it during commitJob.
if (!customPartitionLocations.contains(staticPartitions)) {
val stagingStaticPartitionPath = committerOutputPath.suffix(staticPartitionPrefix)
val targetLocation = qualifiedOutputPath.suffix(staticPartitionPrefix)
if (!fs.exists(targetLocation.getParent)) {
fs.mkdirs(targetLocation.getParent)
}
if (!fs.rename(stagingStaticPartitionPath, targetLocation)) {
throw new IOException(s"Failed to rename $stagingStaticPartitionPath to " +
s"$targetLocation")
}
}
} else if (dynamicPartitionOverwrite) {
// Same behavior as default, do nothing here.
} else {
// STATIC mode dynamic partition overwrite
val targetLocation = qualifiedOutputPath.suffix(staticPartitionPrefix)
if (!fs.exists(targetLocation.getParent)) {
fs.mkdirs(targetLocation.getParent)
}
val stagingStaticPartitionPath = committerOutputPath.suffix(staticPartitionPrefix)
if (!fs.rename(stagingStaticPartitionPath, targetLocation)) {
throw new IOException(s"Failed to rename $stagingStaticPartitionPath to " +
s"$targetLocation")
}
}
}

// update metastore partition metadata
if (updatedPartitionPaths.isEmpty && staticPartitions.nonEmpty
Expand Down Expand Up @@ -218,6 +274,17 @@ case class InsertIntoHadoopFsRelationCommand(
Seq.empty[Row]
}


/**
 * Path suffix made of the static partition key/value segments, in the order of
 * `partitionColumns`. Each segment is produced by `getPartitionPathString`, and the
 * result starts with "/". Returns the empty string when there are no static partitions.
 */
def staticPartitionPrefix: String = {
  if (staticPartitions.isEmpty) {
    ""
  } else {
    // Keep only the partition columns that have a static value, preserving column order.
    val staticSegments = partitionColumns.flatMap { column =>
      staticPartitions.get(column.name).map { value =>
        getPartitionPathString(column.name, value)
      }
    }
    staticSegments.mkString("/", "/", "")
  }
}

/**
* Deletes all partition files that match the specified static prefix. Partitions with custom
* locations are also cleared based on the custom locations map given to this class.
Expand All @@ -227,13 +294,6 @@ case class InsertIntoHadoopFsRelationCommand(
qualifiedOutputPath: Path,
customPartitionLocations: Map[TablePartitionSpec, String],
committer: FileCommitProtocol): Unit = {
val staticPartitionPrefix = if (staticPartitions.nonEmpty) {
"/" + partitionColumns.flatMap { p =>
staticPartitions.get(p.name).map(getPartitionPathString(p.name, _))
}.mkString("/")
} else {
""
}
// first clear the path determined by the static partition keys (e.g. /table/foo=1)
val staticPrefixPath = qualifiedOutputPath.suffix(staticPartitionPrefix)
if (fs.exists(staticPrefixPath) && !committer.deleteWithJob(fs, staticPrefixPath, true)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources

import org.apache.hadoop.fs.Path

import org.apache.spark.internal.io.FileCommitProtocol.overwriteStagingDir
import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol

/**
 * A variant of [[HadoopMapReduceCommitProtocol]] that is used for the overwrite save mode.
 *
 * The only difference from the parent class visible here is the staging directory, which
 * is computed by `FileCommitProtocol.overwriteStagingDir(path, jobId)` instead of the
 * parent's default.
 *
 * @param jobId the id of the write job, passed through to the parent class
 * @param path the output path, passed through to the parent class
 * @param dynamicPartitionOverwrite passed through unchanged to the parent class
 */
class SQLOverwriteHadoopMapReduceCommitProtocol(
jobId: String,
path: String,
dynamicPartitionOverwrite: Boolean)
extends HadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite) {
// Override stagingDir so that the same overwrite staging directory is used even when
// dynamicPartitionOverwrite is true.
@transient override lazy val stagingDir: Path = overwriteStagingDir(path, jobId)
}
}
Loading