diff --git a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
index 5cd7397ea358f..bdbef8d63bfbc 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
@@ -17,6 +17,11 @@
 
 package org.apache.spark.internal.io
 
+import java.net.URI
+import java.text.SimpleDateFormat
+import java.util.{Date, Locale, Random}
+
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
 import org.apache.hadoop.mapreduce._
 
@@ -50,6 +55,22 @@ import org.apache.spark.util.Utils
 abstract class FileCommitProtocol extends Logging {
   import FileCommitProtocol._
 
+  /**
+   * Get the final directory where the result data will be placed once the job
+   * is committed. This may be null, in which case, there is no output
+   * path to write data to and won't write any data.
+   */
+  def getOutputPath: Path = null
+
+  /**
+   * Get the directory that the task should write results into.
+   * Warning: there's no guarantee that this work path is on the same
+   * FS as the final output, or that it's visible across machines.
+   * May be null, in which case, there is no output path to write data to
+   * and won't write any data.
+   */
+  def getWorkPath: Path = null
+
   /**
    * Setups up a job. Must be called on the driver before any other methods can be invoked.
    */
@@ -230,6 +251,94 @@ object FileCommitProtocol extends Logging {
   def getStagingDir(path: String, jobId: String): Path = {
     new Path(path, ".spark-staging-" + jobId)
   }
+
+  def externalTempPath(
+      path: Path,
+      hadoopConf: Configuration,
+      stagingDir: String,
+      engineType: String,
+      jobId: String): Path = {
+    val extURI = path.toUri
+    if (extURI.getScheme == "viewfs") {
+      getExtTmpPathRelTo(path.getParent, hadoopConf, stagingDir, engineType, jobId)
+    } else {
+      new Path(getExternalScratchDir(extURI, hadoopConf, stagingDir, engineType, jobId),
+        "-ext-10000")
+    }
+  }
+
+  private def getExtTmpPathRelTo(
+      path: Path,
+      hadoopConf: Configuration,
+      stagingDir: String,
+      engineType: String,
+      jobId: String): Path = {
+    // Hive uses 10000
+    new Path(getStagingDir(path, hadoopConf, stagingDir, engineType, jobId), "-ext-10000")
+  }
+
+  private def getExternalScratchDir(
+      extURI: URI,
+      hadoopConf: Configuration,
+      stagingDir: String,
+      engineType: String,
+      jobId: String): Path = {
+    getStagingDir(
+      new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath),
+      hadoopConf,
+      stagingDir,
+      engineType,
+      jobId)
+  }
+
+  def getStagingDir(
+      inputPath: Path,
+      hadoopConf: Configuration,
+      stagingDir: String,
+      engineType: String,
+      jobId: String): Path = {
+    val inputPathName: String = inputPath.toString
+    val fs: FileSystem = inputPath.getFileSystem(hadoopConf)
+    var stagingPathName: String =
+      if (inputPathName.indexOf(stagingDir) == -1) {
+        new Path(inputPathName, stagingDir).toString
+      } else {
+        inputPathName.substring(0, inputPathName.indexOf(stagingDir) + stagingDir.length)
+      }
+
+    // SPARK-20594: This is a walk-around fix to resolve a Hive bug. Hive requires that the
+    // staging directory needs to avoid being deleted when users set hive.exec.stagingdir
+    // under the table directory.
+    if (isSubDir(new Path(stagingPathName), inputPath, fs) &&
+      !stagingPathName.stripPrefix(inputPathName).stripPrefix("/").startsWith(".")) {
+      logDebug(s"The staging dir '$stagingPathName' should be a child directory starts " +
+        "with '.' to avoid being deleted if we set hive.exec.stagingdir under the table " +
+        "directory.")
+      stagingPathName = new Path(inputPathName, ".hive-staging").toString
+    }
+
+    val dir = fs.makeQualified(
+      new Path(stagingPathName + "_" + executionId(engineType) + "-" + jobId))
+    logDebug("Created staging dir = " + dir + " for path = " + inputPath)
+    dir
+  }
+
+  private def isSubDir(p1: Path, p2: Path, fs: FileSystem): Boolean = {
+    val path1 = fs.makeQualified(p1).toString + Path.SEPARATOR
+    val path2 = fs.makeQualified(p2).toString + Path.SEPARATOR
+    path1.startsWith(path2)
+  }
+
+  /**
+   * Builds a suffix for a staging directory name of the form
+   * `<engineType>_<yyyy-MM-dd_HH-mm-ss_SSS>_<non-negative random long>`.
+   */
+  def executionId(engineType: String): String = {
+    val rand: Random = new Random
+    val format = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS", Locale.US)
+    // Mask the sign bit rather than Math.abs: Math.abs(Long.MinValue) is still negative.
+    s"${engineType}_" + format.format(new Date) + "_" + (rand.nextLong & Long.MaxValue)
+  }
 }
 
 /**
diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
index a39e9abd9bdc4..2f356d5231730 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
@@ -73,7 +73,7 @@ class HadoopMapReduceCommitProtocol(
   import FileCommitProtocol._
 
   /** OutputCommitter from Hadoop is not serializable so marking it transient. */
-  @transient private var committer: OutputCommitter = _
+  @transient protected var committer: OutputCommitter = _
 
   /**
    * Checks whether there are files to be committed to a valid output location.
@@ -106,6 +106,20 @@ class HadoopMapReduceCommitProtocol(
    */
   protected def stagingDir = getStagingDir(path, jobId)
 
+  /**
+   * Returns the staging directory when dynamic partition overwrite is enabled,
+   * otherwise the destination `path` itself (not qualified against a file system).
+   */
+  override def getOutputPath: Path = {
+    if (dynamicPartitionOverwrite) {
+      stagingDir
+    } else {
+      new Path(path)
+    }
+  }
+
+  override def getWorkPath: Path = getOutputPath
+
   protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
     val format = context.getOutputFormatClass.getConstructor().newInstance()
     // If OutputFormat is Configurable, we should set conf to it.
diff --git a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala
index fc5d0a0b3a7f5..467de4509e7fe 100644
--- a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala
+++ b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala
@@ -60,9 +60,6 @@ class PathOutputCommitProtocol(
     throw new IOException(PathOutputCommitProtocol.UNSUPPORTED)
   }
 
-  /** The committer created. */
-  @transient private var committer: PathOutputCommitter = _
-
   require(dest != null, "Null destination specified")
   private[cloud] val destination: String = dest
 
@@ -115,7 +112,7 @@ class PathOutputCommitProtocol(
         logTrace(s"Committer $committer may not be tolerant of task commit failures")
       }
     }
-    committer
+    committer.asInstanceOf[PathOutputCommitter]
   }
 
   /**
@@ -131,7 +128,7 @@ class PathOutputCommitProtocol(
       dir: Option[String],
       spec: FileNameSpec): String = {
 
-    val workDir = committer.getWorkPath
+    val workDir = committer.asInstanceOf[PathOutputCommitter].getWorkPath
     val parent = dir.map {
       d => new Path(workDir, d)
     }.getOrElse(workDir)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 42979a68d8578..b946907d1b31c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1282,6 +1282,17 @@ object SQLConf {
       .createWithDefault(
         "org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol")
 
+  val EXEC_STAGING_DIR = buildConf("spark.sql.exec.stagingDir")
+    .doc("The staging directory of Spark job. Spark uses it to deal with files with " +
+      "absolute output path, or writing data into partitioned directory " +
+      "when dynamic partition overwrite mode is on. " +
+      "Default value means staging directory is under table path.")
+    .version("3.3.0")
+    .internal()
+    .stringConf
+    .checkValue(!_.isEmpty, "Should not pass an empty string as staging directory.")
+    .createWithDefault(".spark-staging")
+
   val PARALLEL_PARTITION_DISCOVERY_THRESHOLD =
     buildConf("spark.sql.sources.parallelPartitionDiscovery.threshold")
       .doc("The maximum number of paths allowed for listing files at driver side. If the number " +
@@ -3966,6 +3977,8 @@ class SQLConf extends Serializable with Logging {
 
   def fileCommitProtocolClass: String = getConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS)
 
+  def stagingDir: String = getConf(SQLConf.EXEC_STAGING_DIR)
+
   def parallelPartitionDiscoveryThreshold: Int =
     getConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index 267b360b474ca..f2a8c2372b551 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -162,15 +162,6 @@ case class InsertIntoHadoopFsRelationCommand(
         }
       }
 
-      // For dynamic partition overwrite, FileOutputCommitter's output path is staging path, files
-      // will be renamed from staging path to final output path during commit job
-      val committerOutputPath = if (dynamicPartitionOverwrite) {
-        FileCommitProtocol.getStagingDir(outputPath.toString, jobId)
-          .makeQualified(fs.getUri, fs.getWorkingDirectory)
-      } else {
-        qualifiedOutputPath
-      }
-
       val updatedPartitionPaths =
         FileFormatWriter.write(
           sparkSession = sparkSession,
@@ -178,7 +169,7 @@ case class InsertIntoHadoopFsRelationCommand(
           fileFormat = fileFormat,
           committer = committer,
           outputSpec = FileFormatWriter.OutputSpec(
-            committerOutputPath.toString, customPartitionLocations, outputColumns),
+            committer.getOutputPath.toString, customPartitionLocations, outputColumns),
           hadoopConf = hadoopConf,
           partitionColumns = partitionColumns,
           bucketSpec = bucketSpec,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala
index 144be2316f091..9e0b2a1a00552 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala
@@ -21,8 +21,9 @@ import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.{OutputCommitter, TaskAttemptContext}
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
 
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol
+import org.apache.spark.internal.io.{FileCommitProtocol, HadoopMapReduceCommitProtocol}
 import org.apache.spark.sql.internal.SQLConf
 
 /**
@@ -36,6 +37,10 @@ class SQLHadoopMapReduceCommitProtocol(
   extends HadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite)
     with Serializable with Logging {
 
+  override val stagingDir: Path =
+    FileCommitProtocol.externalTempPath(new Path(path), SparkHadoopUtil.get.conf,
+      SQLConf.get.stagingDir, "spark", jobId)
+
   override protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
     var committer = super.setupCommitter(context)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLPathHadoopMapReduceCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLPathHadoopMapReduceCommitProtocol.scala
new file mode 100644
index 0000000000000..51ad8ae5b0ac2
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLPathHadoopMapReduceCommitProtocol.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.{OutputCommitter, TaskAttemptContext}
+
+import org.apache.spark.internal.io.FileNameSpec
+
+/**
+ * A variant of [[SQLHadoopMapReduceCommitProtocol]] whose setupCommitter always builds a
+ * [[SQLPathOutputCommitter]] from `stagingDir` and the destination `path`.
+ */
+class SQLPathHadoopMapReduceCommitProtocol(
+    jobId: String,
+    path: String,
+    dynamicPartitionOverwrite: Boolean = false)
+  extends SQLHadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite) {
+
+  // Only usable after setupCommitter has run; casts the inherited `committer` field.
+  private lazy val sqlPathOutputCommitter: SQLPathOutputCommitter =
+    committer.asInstanceOf[SQLPathOutputCommitter]
+
+  override protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
+    val committer = new SQLPathOutputCommitter(stagingDir, new Path(path), context)
+    logInfo(s"Using output committer class ${committer.getClass.getCanonicalName}")
+    committer
+  }
+
+  override def newTaskTempFile(
+      taskContext: TaskAttemptContext,
+      dir: Option[String],
+      spec: FileNameSpec): String = {
+    val filename = getFilename(taskContext, spec)
+    dir.map { d =>
+      new Path(new Path(
+        sqlPathOutputCommitter.getTaskAttemptPath(taskContext), d), filename).toString
+    }.getOrElse {
+      new Path(sqlPathOutputCommitter.getTaskAttemptPath(taskContext), filename).toString
+    }
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLPathOutputCommitter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLPathOutputCommitter.scala
new file mode 100644
index 0000000000000..22cd72e353f96
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLPathOutputCommitter.scala
@@ -0,0 +1,501 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.spark.sql.execution.datasources + +import java.io.{FileNotFoundException, IOException} + +import com.google.common.base.Preconditions +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter} +import org.apache.hadoop.mapreduce.{JobContext, JobStatus, TaskAttemptContext, TaskAttemptID} +import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter +import org.apache.hadoop.util.{DurationInfo, Progressable} + +import org.apache.spark.internal.Logging + +class SQLPathOutputCommitter( + stagingDir: Path, + outputPath: Path, + context: TaskAttemptContext) + extends PathOutputCommitter(outputPath, context) with Logging { + + val PENDING_DIR_NAME = "_temporary" + val SUCCEEDED_FILE_NAME = "_SUCCESS" + val SUCCESSFUL_JOB_OUTPUT_DIR_MARKER = "mapreduce.fileoutputcommitter.marksuccessfuljobs" + val FILEOUTPUTCOMMITTER_ALGORITHM_VERSION = "mapreduce.fileoutputcommitter.algorithm.version" + val FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_DEFAULT = 2 + val FILEOUTPUTCOMMITTER_CLEANUP_SKIPPED = "mapreduce.fileoutputcommitter.cleanup.skipped" + val FILEOUTPUTCOMMITTER_CLEANUP_SKIPPED_DEFAULT = false + val FILEOUTPUTCOMMITTER_CLEANUP_FAILURES_IGNORED = + "mapreduce.fileoutputcommitter.cleanup-failures.ignored" + val FILEOUTPUTCOMMITTER_CLEANUP_FAILURES_IGNORED_DEFAULT = false + val FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS = "mapreduce.fileoutputcommitter.failures.attempts" + val FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS_DEFAULT = 1 + val FILEOUTPUTCOMMITTER_TASK_CLEANUP_ENABLED = + "mapreduce.fileoutputcommitter.task.cleanup.enabled" + val FILEOUTPUTCOMMITTER_TASK_CLEANUP_ENABLED_DEFAULT = false + private val workPath: Path = if (getOutputPath != null) { + Preconditions.checkNotNull(getTaskAttemptPath(context, getOutputPath), + "Null task attempt path in %s and output path %s", context, outputPath) + } else { + null + } + private val algorithmVersion = context.getConfiguration.getInt( + FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, 
FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_DEFAULT) + private val skipCleanup = context.getConfiguration.getBoolean( + FILEOUTPUTCOMMITTER_CLEANUP_SKIPPED, FILEOUTPUTCOMMITTER_CLEANUP_SKIPPED_DEFAULT) + private val ignoreCleanupFailures = context.getConfiguration.getBoolean( + FILEOUTPUTCOMMITTER_CLEANUP_FAILURES_IGNORED, + FILEOUTPUTCOMMITTER_CLEANUP_FAILURES_IGNORED_DEFAULT) + + private class CommittedTaskFilter extends PathFilter { + def accept(path: Path): Boolean = !(PENDING_DIR_NAME == path.getName) + } + + override def getOutputPath: Path = + outputPath.getFileSystem(context.getConfiguration).makeQualified(outputPath) + + def getPendingJobAttemptsPath(): Path = { + getPendingJobAttemptsPath(stagingDir) + } + + def getPendingJobAttemptsPath(out: Path): Path = { + new Path(out, PENDING_DIR_NAME) + } + + private def getAppAttemptId(context: JobContext) = { + context.getConfiguration.getInt("mapreduce.job.application.attempt.id", 0) + } + + def getJobAttemptPath(context: JobContext): Path = { + getJobAttemptPath(context, this.stagingDir) + } + + def getJobAttemptPath(context: JobContext, out: Path): Path = { + getJobAttemptPath(getAppAttemptId(context), out) + } + + protected def getJobAttemptPath(appAttemptId: Int): Path = { + getJobAttemptPath(appAttemptId, this.stagingDir) + } + + private def getJobAttemptPath(appAttemptId: Int, out: Path) = { + new Path(getPendingJobAttemptsPath(out), String.valueOf(appAttemptId)) + } + + private def getPendingTaskAttemptsPath(context: JobContext): Path = { + getPendingTaskAttemptsPath(context, this.stagingDir) + } + + private def getPendingTaskAttemptsPath(context: JobContext, out: Path) = { + new Path(getJobAttemptPath(context, out), "_temporary") + } + + def getTaskAttemptPath(context: TaskAttemptContext): Path = { + new Path(this.getPendingTaskAttemptsPath(context), String.valueOf(context.getTaskAttemptID)) + } + + def getTaskAttemptPath(context: TaskAttemptContext, out: Path): Path = { + new 
Path(getPendingTaskAttemptsPath(context, out), String.valueOf(context.getTaskAttemptID)) + } + + def getCommittedTaskPath(context: TaskAttemptContext): Path = { + this.getCommittedTaskPath(getAppAttemptId(context), context) + } + + def getCommittedTaskPath(context: TaskAttemptContext, out: Path): Path = { + getCommittedTaskPath(getAppAttemptId(context), context, out) + } + + protected def getCommittedTaskPath(appAttemptId: Int, context: TaskAttemptContext) = { + new Path(this.getJobAttemptPath(appAttemptId), + String.valueOf(context.getTaskAttemptID.getTaskID)) + } + + private def getCommittedTaskPath( + appAttemptId: Int, + context: TaskAttemptContext, + out: Path): Path = { + new Path(getJobAttemptPath(appAttemptId, out), + String.valueOf(context.getTaskAttemptID.getTaskID.toString)) + } + + @throws[IOException] + private def getAllCommittedTaskPaths(context: JobContext) = { + val jobAttemptPath = this.getJobAttemptPath(context) + val fs = jobAttemptPath.getFileSystem(context.getConfiguration) + fs.listStatus(jobAttemptPath, new CommittedTaskFilter) + } + + @throws[IOException] + override def setupJob(context: JobContext): Unit = { + if (hasOutputPath) { + val jobAttemptPath = getJobAttemptPath(context) + val fs = jobAttemptPath.getFileSystem(context.getConfiguration) + if (!fs.mkdirs(jobAttemptPath)) { + logError("Mkdirs failed to create " + jobAttemptPath) + } + } else { + logWarning("Output Path is null in setupJob()") + } + } + + + @throws[IOException] + override def commitJob(context: JobContext): Unit = { + val maxAttemptsOnFailure = if (this.isCommitJobRepeatable(context)) { + context.getConfiguration.getInt( + FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS, FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS_DEFAULT) + } else { + 1 + } + var attempt = 0 + var jobCommitNotFinished = true + + while (jobCommitNotFinished) { + try { + this.commitJobInternal(context) + jobCommitNotFinished = false + } catch { + case var6: Exception => + attempt += 1 + if (attempt >= 
maxAttemptsOnFailure) throw var6
+          logWarning("Exception get thrown in job commit, retry (" + attempt + ") time.", var6)
+      }
+    }
+  }
+
+  @throws[IOException]
+  protected def commitJobInternal(context: JobContext): Unit = {
+    if (!this.hasOutputPath) {
+      logWarning("Output Path is null in commitJob()")
+    } else {
+      val finalOutput = this.getOutputPath
+      val fs = finalOutput.getFileSystem(context.getConfiguration)
+      if (this.algorithmVersion == 1) {
+        val committedTaskPaths = this.getAllCommittedTaskPaths(context)
+        committedTaskPaths.foreach { path =>
+          this.mergePaths(fs, path, finalOutput, context)
+        }
+      }
+      if (this.skipCleanup) {
+        logInfo("Skip cleanup the _temporary folders under job's output directory in commitJob.")
+      } else {
+        try {
+          this.cleanupJob(context)
+        } catch {
+          case var8: IOException =>
+            if (!this.ignoreCleanupFailures) {
+              throw var8
+            }
+            logError("Error in cleanup job, manually cleanup is needed.", var8)
+        }
+      }
+      if (context.getConfiguration.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)) {
+        val markerPath = new Path(this.outputPath, "_SUCCESS")
+        if (this.isCommitJobRepeatable(context)) {
+          fs.create(markerPath, true).close()
+        } else {
+          fs.create(markerPath).close()
+        }
+      }
+    }
+  }
+
+  @throws[IOException]
+  private def mergePaths(fs: FileSystem, from: FileStatus, to: Path, context: JobContext): Unit = {
+    val d: DurationInfo =
+      new DurationInfo(log, false, "Merging data from %s to %s", from, to) // direct varargs
+    var var6: Throwable = null
+    try {
+      this.reportProgress(context)
+      var toStat: FileStatus = null
+      try {
+        toStat = fs.getFileStatus(to)
+      } catch {
+        case _: FileNotFoundException =>
+          toStat = null
+      }
+      if (from.isFile) {
+        if (toStat != null && !fs.delete(to, true)) {
+          throw new IOException("Failed to delete " + to)
+        }
+        if (!fs.rename(from.getPath, to)) {
+          throw new IOException("Failed to rename " + from + " to " + to)
+        }
+      } else {
+        if (from.isDirectory) {
+          if (toStat != null) {
+            if (!toStat.isDirectory) {
+ 
if (!fs.delete(to, true)) { + throw new IOException("Failed to delete " + to) + } + this.renameOrMerge(fs, from, to, context) + } else { + val var8: Array[FileStatus] = fs.listStatus(from.getPath) + val var9: Int = var8.length + for (var10 <- 0 until var9) { + val subFrom: FileStatus = var8(var10) + val subTo: Path = new Path(to, subFrom.getPath.getName) + this.mergePaths(fs, subFrom, subTo, context) + } + } + } else { + this.renameOrMerge(fs, from, to, context) + } + } + } + } catch { + case var22: Throwable => + var6 = var22 + throw var22 + } finally { + if (d != null) { + if (var6 != null) { + try { + d.close() + } catch { + case var20: Throwable => + var6.addSuppressed(var20) + } + } else { + d.close() + } + } + } + } + + private def reportProgress(context: JobContext): Unit = { + if (context.isInstanceOf[Progressable]) { + (context.asInstanceOf[Progressable]).progress() + } + } + + @throws[IOException] + private def renameOrMerge( + fs: FileSystem, + from: FileStatus, + to: Path, + context: JobContext): Unit = { + if (this.algorithmVersion == 1) { + if (!(fs.rename(from.getPath, to))) { + throw new IOException("Failed to rename " + from + " to " + to) + } + } else { + fs.mkdirs(to) + val var5: Array[FileStatus] = fs.listStatus(from.getPath) + val var6: Int = var5.length + for (var7 <- 0 until var6) { + val subFrom: FileStatus = var5(var7) + val subTo: Path = new Path(to, subFrom.getPath.getName) + this.mergePaths(fs, subFrom, subTo, context) + } + } + } + + /** @deprecated */ + @deprecated + @throws[IOException] + override def cleanupJob(context: JobContext): Unit = { + if (this.hasOutputPath) { + val pendingJobAttemptsPath: Path = this.getPendingJobAttemptsPath + val fs: FileSystem = pendingJobAttemptsPath.getFileSystem(context.getConfiguration) + try { + fs.delete(pendingJobAttemptsPath, true) + } catch { + case var5: FileNotFoundException => + if (!(this.isCommitJobRepeatable(context))) { + throw var5 + } + } + } else { + logWarning("Output Path is null in 
cleanupJob()") + } + } + + @throws[IOException] + override def abortJob(context: JobContext, state: JobStatus.State): Unit = { + this.cleanupJob(context) + } + + @throws[IOException] + override def setupTask(context: TaskAttemptContext): Unit = { + } + + @throws[IOException] + override def commitTask(context: TaskAttemptContext): Unit = { + this.commitTask(context, null.asInstanceOf[Path]) + } + + @throws[IOException] + def commitTask(context: TaskAttemptContext, taskAttemptPath: Path): Unit = { + val attemptId: TaskAttemptID = context.getTaskAttemptID + var currentTaskAttemptPath = taskAttemptPath + if (this.hasOutputPath) { + context.progress() + if (currentTaskAttemptPath == null) { + currentTaskAttemptPath = this.getTaskAttemptPath(context) + } + val fs: FileSystem = currentTaskAttemptPath.getFileSystem(context.getConfiguration) + var taskAttemptDirStatus: FileStatus = null + try { + taskAttemptDirStatus = fs.getFileStatus(currentTaskAttemptPath) + } catch { + case _: FileNotFoundException => + taskAttemptDirStatus = null + } + if (taskAttemptDirStatus != null) { + if (this.algorithmVersion == 1) { + val committedTaskPath = this.getCommittedTaskPath(context) + if (fs.exists(committedTaskPath) && !fs.delete(committedTaskPath, true)) { + throw new IOException("Could not delete " + committedTaskPath) + } + if (!fs.rename(currentTaskAttemptPath, committedTaskPath)) { + throw new IOException( + "Could not rename " + currentTaskAttemptPath + " to " + committedTaskPath) + } + logInfo("Saved output of task '" + attemptId + "' to " + committedTaskPath) + } else { + this.mergePaths(fs, taskAttemptDirStatus, this.outputPath, context) + logInfo("Saved output of task '" + attemptId + "' to " + this.outputPath) + if (context.getConfiguration.getBoolean( + "mapreduce.fileoutputcommitter.task.cleanup.enabled", false)) { + logDebug(String.format( + "Deleting the temporary directory of '%s': '%s'", attemptId, currentTaskAttemptPath)) + if (!fs.delete(currentTaskAttemptPath, 
true)) { + logWarning("Could not delete " + currentTaskAttemptPath) + } + } + } + } else { + logWarning("No Output found for " + attemptId) + } + } else { + logWarning("Output Path is null in commitTask()") + } + } + + @throws[IOException] + override def abortTask(context: TaskAttemptContext): Unit = { + this.abortTask(context, null.asInstanceOf[Path]) + } + + @throws[IOException] + def abortTask(context: TaskAttemptContext, taskAttemptPath: Path): Unit = { + var currentTaskAttemptPath = taskAttemptPath + if (this.hasOutputPath) { + context.progress() + if (currentTaskAttemptPath == null) { + currentTaskAttemptPath = this.getTaskAttemptPath(context) + } + val fs: FileSystem = currentTaskAttemptPath.getFileSystem(context.getConfiguration) + if (!fs.delete(currentTaskAttemptPath, true)) { + logWarning("Could not delete " + currentTaskAttemptPath) + } + } else { + logWarning("Output Path is null in abortTask()") + } + } + + @throws[IOException] + override def needsTaskCommit(context: TaskAttemptContext): Boolean = { + this.needsTaskCommit(context, null.asInstanceOf[Path]) + } + + @throws[IOException] + def needsTaskCommit(context: TaskAttemptContext, taskAttemptPath: Path): Boolean = { + var currentTaskAttemptPath = taskAttemptPath + if (this.hasOutputPath) { + if (currentTaskAttemptPath == null) { + currentTaskAttemptPath = this.getTaskAttemptPath(context) + } + val fs: FileSystem = currentTaskAttemptPath.getFileSystem(context.getConfiguration) + fs.exists(currentTaskAttemptPath) + } else { + false + } + } + + @deprecated override def isRecoverySupported: Boolean = { + true + } + + @throws[IOException] + override def isCommitJobRepeatable(context: JobContext): Boolean = { + this.algorithmVersion == 2 + } + + @throws[IOException] + override def recoverTask(context: TaskAttemptContext): Unit = { + if (this.hasOutputPath) { + context.progress() + val attemptId: TaskAttemptID = context.getTaskAttemptID + val previousAttempt: Int = getAppAttemptId(context) - 1 + if 
(previousAttempt < 0) { + throw new IOException("Cannot recover task output for first attempt...") + } + val previousCommittedTaskPath: Path = this.getCommittedTaskPath(previousAttempt, context) + val fs: FileSystem = previousCommittedTaskPath.getFileSystem(context.getConfiguration) + if (log.isDebugEnabled) { + logDebug("Trying to recover task from " + previousCommittedTaskPath) + } + if (this.algorithmVersion == 1) { + if (fs.exists(previousCommittedTaskPath)) { + val committedTaskPath: Path = this.getCommittedTaskPath(context) + if (!fs.delete(committedTaskPath, true) && fs.exists(committedTaskPath)) { + throw new IOException("Could not delete " + committedTaskPath) + } + val committedParent: Path = committedTaskPath.getParent + fs.mkdirs(committedParent) + if (!fs.rename(previousCommittedTaskPath, committedTaskPath)) { + throw new IOException( + "Could not rename " + previousCommittedTaskPath + " to " + committedTaskPath) + } + } else { + logWarning(attemptId + " had no output to recover.") + } + } else { + try { + val from: FileStatus = fs.getFileStatus(previousCommittedTaskPath) + logInfo("Recovering task for upgrading scenario, moving files from " + + previousCommittedTaskPath + " to " + this.outputPath) + this.mergePaths(fs, from, this.outputPath, context) + } catch { + case var8: FileNotFoundException => + + } + logInfo("Done recovering task " + attemptId) + } + } else { + logWarning("Output Path is null in recoverTask()") + } + } + + override def getWorkPath: Path = workPath + + override def toString: String = { + val sb: StringBuilder = new StringBuilder("FileOutputCommitter{") + sb.append(super.toString).append("; ") + sb.append("outputPath=").append(this.outputPath) + sb.append(", workPath=").append(this.workPath) + sb.append(", algorithmVersion=").append(this.algorithmVersion) + sb.append(", skipCleanup=").append(this.skipCleanup) + sb.append(", ignoreCleanupFailures=").append(this.ignoreCleanupFailures) + sb.append('}') + sb.toString + } +} diff 
--git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala index b553e6ed566b5..d7c2f44b79506 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala @@ -1116,6 +1116,6 @@ class RenameFromSparkStagingToFinalDirAlwaysTurnsFalseFilesystem extends RawLoca } private def isSparkStagingDir(path: Path): Boolean = { - path.toString.contains(".spark-staging-") + path.toString.contains(".spark-staging_") } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/StagingInsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/StagingInsertSuite.scala new file mode 100644 index 0000000000000..42b047229bb46 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/StagingInsertSuite.scala @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.sources + +import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.execution.datasources.{SQLHadoopMapReduceCommitProtocol, SQLPathHadoopMapReduceCommitProtocol} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.util.Utils + +class StagingInsertSuite extends QueryTest with SharedSparkSession { + + import testImplicits._ + + val stagingParentDir = Utils.createTempDir() + + val stagingDir = stagingParentDir.getCanonicalPath + "/.spark-stagingDir" + + override def beforeAll(): Unit = { + super.beforeAll() + stagingParentDir.delete() + } + + override def afterAll(): Unit = { + try { + Utils.deleteRecursively(stagingParentDir) + } finally { + super.afterAll() + } + } + + test("SPDI-25701: Assert staging dir work") { + withSQLConf(SQLConf.EXEC_STAGING_DIR.key -> stagingDir) { + val commitProtocol = new SQLHadoopMapReduceCommitProtocol("job-id", "dummy-path", true) + assert(commitProtocol.stagingDir.toString.contains(stagingDir)) + } + } + + test("SPARK-36579: dynamic partition overwrite can use user defined staging dir") { + // Partition Insert + Seq("static", "dynamic").foreach { mode => + withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key -> mode, + SQLConf.EXEC_STAGING_DIR.key -> stagingDir) { + withTempDir { d => + withTable("t") { + sql( + s""" + |CREATE TABLE t(c1 int, p1 int) USING PARQUET PARTITIONED BY(p1) + |LOCATION '${d.getAbsolutePath}' + """.stripMargin) + + val df = Seq((1, 2), (3, 4), (5, 6)).toDF("c1", "p1") + df.write + .partitionBy("p1") + .mode("overwrite") + .saveAsTable("t") + checkAnswer(sql("SELECT * FROM t"), df) + checkAnswer(sql("SELECT * FROM t WHERE p1 = 2"), Row(1, 2) :: Nil) + checkAnswer(sql("SELECT * FROM t WHERE p1 = 4"), Row(3, 4) :: Nil) + } + } + } + } + } + + test("SPARK-36571: Add a new commit protocol - None-partitioned insert") { + Seq("1", "2").foreach { version => + 
withSQLConf(SQLConf.EXEC_STAGING_DIR.key -> stagingDir, + "spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version" -> version, + SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key -> + classOf[SQLPathHadoopMapReduceCommitProtocol].getName) { + withTable("t") { + sql( + s""" + | CREATE TABLE t(c1 int, p1 int) using PARQUET + """.stripMargin) + + val df = + Seq((1, 2), (1, 2), (1, 2), (1, 2), (1, 2), (1, 2), (1, 2)) + .toDF("c1", "p1").repartition(1) + df.write + .mode("overwrite") + .format("parquet") + .saveAsTable("t") + checkAnswer(sql("SELECT * FROM t"), df) + } + } + } + } + + test("SPARK-36571: Add a new commit protocol - Static partition insert") { + Seq("1", "2").foreach { version => + withSQLConf(SQLConf.EXEC_STAGING_DIR.key -> stagingDir, + "spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version" -> version, + SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key -> + classOf[SQLPathHadoopMapReduceCommitProtocol].getName) { + withTempDir { d => + withTable("t") { + sql( + s""" + | CREATE TABLE t(c1 int, p1 int) USING PARQUET PARTITIONED BY(p1) + | LOCATION '${d.getAbsolutePath}' + """.stripMargin) + + val df = + Seq((1, 2), (1, 2), (1, 2), (1, 2), (1, 2), (1, 2), (1, 2)) + .toDF("c1", "p1").repartition(1) + df.createOrReplaceTempView("view1") + sql("INSERT OVERWRITE t PARTITION(p1=2) SELECT c1 FROM view1") + checkAnswer(sql("SELECT * FROM t WHERE p1=2"), df) + } + } + } + } + } + + test("SPARK-36571: Add a new commit protocol - Dynamic partition insert") { + Seq("1", "2").foreach { version => + withSQLConf(SQLConf.EXEC_STAGING_DIR.key -> stagingDir, + "spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version" -> version, + SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key -> + classOf[SQLPathHadoopMapReduceCommitProtocol].getName) { + withTable("t") { + sql( + s""" + | CREATE TABLE t(c1 int, p1 int) USING PARQUET PARTITIONED BY(p1) + """.stripMargin) + + val df = Seq((1, 2), (3, 4), (5, 6), (7, 8)).toDF("c1", "p1") + df.write + .partitionBy("p1") + .mode("overwrite") + 
.saveAsTable("t") + checkAnswer(sql("SELECT * FROM t"), df) + checkAnswer(sql("SELECT * FROM t WHERE p1 > 5"), Row(5, 6) :: Row(7, 8) :: Nil) + checkAnswer(sql("SELECT * FROM t WHERE p1 = 4"), Row(3, 4) :: Nil) + } + } + } + } +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala index 7f885729bd2be..c3e9deacd7e80 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala @@ -19,8 +19,7 @@ package org.apache.spark.sql.hive.execution import java.io.IOException import java.net.URI -import java.text.SimpleDateFormat -import java.util.{Date, Locale, Random} +import java.util.Locale import scala.util.control.NonFatal @@ -135,7 +134,10 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand { if (hiveVersionsUsingOldExternalTempPath.contains(hiveVersion)) { oldVersionExternalTempPath(path, hadoopConf, scratchDir) } else if (hiveVersionsUsingNewExternalTempPath.contains(hiveVersion)) { - newVersionExternalTempPath(path, hadoopConf, stagingDir) + val externalTempPath = FileCommitProtocol.externalTempPath( + path, hadoopConf, stagingDir, "hive", TaskRunner.getTaskRunnerID.toString) + createdTempDir = Some(externalTempPath.getParent) + externalTempPath } else { throw new IllegalStateException("Unsupported hive version: " + hiveVersion.fullVersion) } @@ -165,7 +167,7 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand { hadoopConf: Configuration, scratchDir: String): Path = { val extURI: URI = path.toUri - val scratchPath = new Path(scratchDir, executionId) + val scratchPath = new Path(scratchDir, FileCommitProtocol.executionId("hive")) var dirPath = new Path( extURI.getScheme, extURI.getAuthority, @@ -186,90 +188,5 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand { } dirPath } - - // Mostly 
copied from Context.java#getExternalTmpPath of Hive 1.2 - private def newVersionExternalTempPath( - path: Path, - hadoopConf: Configuration, - stagingDir: String): Path = { - val extURI: URI = path.toUri - if (extURI.getScheme == "viewfs") { - getExtTmpPathRelTo(path, hadoopConf, stagingDir) - } else { - new Path(getExternalScratchDir(extURI, hadoopConf, stagingDir), "-ext-10000") - } - } - - private def getExtTmpPathRelTo( - path: Path, - hadoopConf: Configuration, - stagingDir: String): Path = { - new Path(getStagingDir(path, hadoopConf, stagingDir), "-ext-10000") // Hive uses 10000 - } - - private def getExternalScratchDir( - extURI: URI, - hadoopConf: Configuration, - stagingDir: String): Path = { - getStagingDir( - new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath), - hadoopConf, - stagingDir) - } - - private[hive] def getStagingDir( - inputPath: Path, - hadoopConf: Configuration, - stagingDir: String): Path = { - val inputPathName: String = inputPath.toString - val fs: FileSystem = inputPath.getFileSystem(hadoopConf) - var stagingPathName: String = - if (inputPathName.indexOf(stagingDir) == -1) { - new Path(inputPathName, stagingDir).toString - } else { - inputPathName.substring(0, inputPathName.indexOf(stagingDir) + stagingDir.length) - } - - // SPARK-20594: This is a walk-around fix to resolve a Hive bug. Hive requires that the - // staging directory needs to avoid being deleted when users set hive.exec.stagingdir - // under the table directory. - if (isSubDir(new Path(stagingPathName), inputPath, fs) && - !stagingPathName.stripPrefix(inputPathName).stripPrefix("/").startsWith(".")) { - logDebug(s"The staging dir '$stagingPathName' should be a child directory starts " + - "with '.' 
to avoid being deleted if we set hive.exec.stagingdir under the table " + - "directory.") - stagingPathName = new Path(inputPathName, ".hive-staging").toString - } - - val dir: Path = - fs.makeQualified( - new Path(stagingPathName + "_" + executionId + "-" + TaskRunner.getTaskRunnerID)) - logDebug("Created staging dir = " + dir + " for path = " + inputPath) - try { - if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) { - throw new IllegalStateException("Cannot create staging directory '" + dir.toString + "'") - } - createdTempDir = Some(dir) - fs.deleteOnExit(dir) - } catch { - case e: IOException => - throw QueryExecutionErrors.cannotCreateStagingDirError( - s"'${dir.toString}': ${e.getMessage}", e) - } - dir - } - - // HIVE-14259 removed FileUtils.isSubDir(). Adapted it from Hive 1.2's FileUtils.isSubDir(). - private def isSubDir(p1: Path, p2: Path, fs: FileSystem): Boolean = { - val path1 = fs.makeQualified(p1).toString + Path.SEPARATOR - val path2 = fs.makeQualified(p2).toString + Path.SEPARATOR - path1.startsWith(path2) - } - - private def executionId: String = { - val rand: Random = new Random - val format = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS", Locale.US) - "hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong) - } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala index 177c227595162..1dfeedcd9a18b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala @@ -22,12 +22,13 @@ import java.util.Locale import com.google.common.io.Files import org.apache.hadoop.fs.Path +import org.apache.hadoop.hive.ql.exec.TaskRunner import org.scalatest.{BeforeAndAfter, PrivateMethodTester} import org.apache.spark.SparkException +import org.apache.spark.internal.io.FileCommitProtocol import org.apache.spark.sql.{QueryTest, _} import 
org.apache.spark.sql.catalyst.parser.ParseException -import org.apache.spark.sql.hive.execution.InsertIntoHiveTable import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SQLTestUtils @@ -541,25 +542,29 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter val conf = spark.sessionState.newHadoopConf() val inputPath = new Path("/tmp/b/c") var stagingDir = "tmp/b" - val saveHiveFile = InsertIntoHiveTable(null, Map.empty, null, false, false, null) - val getStagingDir = PrivateMethod[Path](Symbol("getStagingDir")) - var path = saveHiveFile invokePrivate getStagingDir(inputPath, conf, stagingDir) + val id = TaskRunner.getTaskRunnerID + var path = FileCommitProtocol. + getStagingDir(inputPath, conf, stagingDir, "hive", id.toString) assert(path.toString.indexOf("/tmp/b_hive_") != -1) stagingDir = "tmp/b/c" - path = saveHiveFile invokePrivate getStagingDir(inputPath, conf, stagingDir) + path = FileCommitProtocol.getStagingDir( + inputPath, conf, stagingDir, "hive", id.toString) assert(path.toString.indexOf("/tmp/b/c/.hive-staging_hive_") != -1) stagingDir = "d/e" - path = saveHiveFile invokePrivate getStagingDir(inputPath, conf, stagingDir) + path = FileCommitProtocol.getStagingDir( + inputPath, conf, stagingDir, "hive", id.toString) assert(path.toString.indexOf("/tmp/b/c/.hive-staging_hive_") != -1) stagingDir = ".d/e" - path = saveHiveFile invokePrivate getStagingDir(inputPath, conf, stagingDir) + path = FileCommitProtocol.getStagingDir( + inputPath, conf, stagingDir, "hive", id.toString) assert(path.toString.indexOf("/tmp/b/c/.d/e_hive_") != -1) stagingDir = "/tmp/c/" - path = saveHiveFile invokePrivate getStagingDir(inputPath, conf, stagingDir) + path = FileCommitProtocol.getStagingDir( + inputPath, conf, stagingDir, "hive", id.toString) assert(path.toString.indexOf("/tmp/c_hive_") != -1) }