diff --git a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala index e2a96267082b..43cff753a44a 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala @@ -17,6 +17,11 @@ package org.apache.spark.internal.io +import java.net.URI +import java.text.SimpleDateFormat +import java.util.{Date, Locale, Random} + +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs._ import org.apache.hadoop.mapreduce._ @@ -50,6 +55,22 @@ import org.apache.spark.util.Utils abstract class FileCommitProtocol extends Logging { import FileCommitProtocol._ + /** + * Get the final directory where the result data will be placed once the job + * is committed. This may be null, in which case, there is no output + * path to write data to and won't write any data. + */ + def getOutputPath: Path = null + + /** + * Get the directory that the task should write results into. + * Warning: there's no guarantee that this work path is on the same + * FS as the final output, or that it's visible across machines. + * May be null, in which case, there is no output path to write data to + * and won't write any data. + */ + def getWorkPath: Path = null + /** * Setups up a job. Must be called on the driver before any other methods can be invoked. */ @@ -195,6 +216,9 @@ abstract class FileCommitProtocol extends Logging { object FileCommitProtocol extends Logging { + val USING_SPARK_COMMIT_METHOD = "spark" + val USING_HIVE_COMMIT_METHOD = "hive" + class TaskCommitMessage(val obj: Any) extends Serializable object EmptyTaskCommitMessage extends TaskCommitMessage(null) @@ -232,6 +256,109 @@ object FileCommitProtocol extends Logging { def getStagingDir(path: String, jobId: String): Path = { new Path(path, ".spark-staging-" + jobId) } + + def externalTempPath( + path: Path, + hadoopConf: Configuration, + stagingDir: String, + engineType: String, + jobId: String): Path = { + val extURI = path.toUri + if (extURI.getScheme == "viewfs") { + new Path(getExtTmpPathRelTo(path.getParent, hadoopConf, stagingDir, engineType, jobId), + "-ext-10000") + } else { + new Path(getExternalScratchDir(extURI, hadoopConf, stagingDir, engineType, jobId), + "-ext-10000") + } + } + + private def getExtTmpPathRelTo( + path: Path, + hadoopConf: Configuration, + stagingDir: String, + commitMethod: String, + jobId: String): Path = { + getStagingDir(path, hadoopConf, stagingDir, commitMethod, jobId) + } + + private def getExternalScratchDir( + extURI: URI, + hadoopConf: Configuration, + stagingDir: String, + commitMethod: String, + jobId: String): Path = { + getStagingDir( + new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath), + hadoopConf, + stagingDir, + commitMethod, + jobId) + } + + def getStagingDir( + inputPath: Path, + hadoopConf: Configuration, + stagingDir: String, + commitMethod: String, + jobId: String): Path = { + val inputPathName: String = inputPath.toString + val fs: FileSystem = inputPath.getFileSystem(hadoopConf) + var stagingPathName: String = + if (inputPathName.indexOf(stagingDir) == -1) { + new Path(inputPathName, stagingDir).toString + } else { + inputPathName.substring(0, inputPathName.indexOf(stagingDir) + stagingDir.length) + } + + // SPARK-20594: This is a walk-around fix to resolve a Hive bug. Hive requires that the + // staging directory needs to avoid being deleted when users set hive.exec.stagingdir + // under the table directory. + if (isSubDir(new Path(stagingPathName), inputPath, fs) && + !stagingPathName.stripPrefix(inputPathName).stripPrefix("/").startsWith(".")) { + logDebug(s"The staging dir '$stagingPathName' should be a child directory starts " + + "with '.' to avoid being deleted if we set hive.exec.stagingdir under the table " + + "directory.") + stagingPathName = new Path(inputPathName, s".$commitMethod-staging").toString + } + + if (commitMethod == USING_SPARK_COMMIT_METHOD) { + val stagingFS = new Path(stagingPathName).getFileSystem(hadoopConf) + // SPARK-36579: Current SQLHadoopMapReduceCommitProtocol's dynamic partition overwriting uses + // rename operation to move partition's directories. This operation is not supported between + // different FileSystems. + if (equalsFileSystem(fs, stagingFS)) { + logDebug(s"The staging dir '$stagingPathName' should be in a same filesystem " + + s"with table location if we set `spark.sql.exec.stagingDir` under the table " + + "directory.") + stagingPathName = new Path(inputPathName, s".$commitMethod-staging").toString + } + } + val dir = commitMethod match { + case USING_SPARK_COMMIT_METHOD => + new Path(stagingPathName + "-" + jobId) + case USING_HIVE_COMMIT_METHOD => + fs.makeQualified(new Path(stagingPathName + "_" + executionId(commitMethod) + "-" + jobId)) + } + logDebug(s"Created staging dir = $dir for path = $inputPath") + dir + } + + private def isSubDir(p1: Path, p2: Path, fs: FileSystem): Boolean = { + val path1 = fs.makeQualified(p1).toString + Path.SEPARATOR + val path2 = fs.makeQualified(p2).toString + Path.SEPARATOR + path1.startsWith(path2) + } + + def executionId(engineType: String): String = { + val rand: Random = new Random + val format = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS", Locale.US) + s"${engineType}_" + format.format(new Date) + "_" + Math.abs(rand.nextLong) + } + + def equalsFileSystem(fs1: FileSystem, fs2: FileSystem): Boolean = { + fs1.getUri == fs2.getUri + } } /** diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala index 3a24da98ecc2..9da91af77bd3 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala @@ -116,6 +116,16 @@ class HadoopMapReduceCommitProtocol( format.getOutputCommitter(context) } + override def getOutputPath: Path = { + if (dynamicPartitionOverwrite) { + stagingDir + } else { + new Path(path) + } + } + + override def getWorkPath: Path = getOutputPath + override def newTaskTempFile( taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = { newTaskTempFile(taskContext, dir, FileNameSpec("", ext)) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index a050156518c2..7f66b6986cee 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -3554,6 +3554,17 @@ object SQLConf { .booleanConf .createWithDefault(false) + val EXEC_STAGING_DIR = buildConf("spark.sql.exec.stagingDir") + .doc("The staging directory of Spark job. Spark uses it to deal with files with " + + "absolute output path, or writing data into partitioned directory " + + "when dynamic partition overwrite mode is on. " + + "Default value means staging directory is under table path.") + .version("3.3.0") + .internal() + .stringConf + .checkValue(!_.isEmpty, "Should not pass an empty string as staging directory.") + .createWithDefault(".spark-staging") + val LEGACY_USE_V1_COMMAND = buildConf("spark.sql.legacy.useV1Command") .internal() @@ -4292,6 +4303,8 @@ class SQLConf extends Serializable with Logging { def inferDictAsStruct: Boolean = getConf(SQLConf.INFER_NESTED_DICT_AS_STRUCT) + def stagingDir: String = getConf(SQLConf.EXEC_STAGING_DIR) + def parquetFieldIdReadEnabled: Boolean = getConf(SQLConf.PARQUET_FIELD_ID_READ_ENABLED) def parquetFieldIdWriteEnabled: Boolean = getConf(SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala index 267b360b474c..f2a8c2372b55 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala @@ -162,15 +162,6 @@ case class InsertIntoHadoopFsRelationCommand( } } - // For dynamic partition overwrite, FileOutputCommitter's output path is staging path, files - // will be renamed from staging path to final output path during commit job - val committerOutputPath = if (dynamicPartitionOverwrite) { - FileCommitProtocol.getStagingDir(outputPath.toString, jobId) - .makeQualified(fs.getUri, fs.getWorkingDirectory) - } else { - qualifiedOutputPath - } - val updatedPartitionPaths = FileFormatWriter.write( sparkSession = sparkSession, @@ -178,7 +169,7 @@ case class InsertIntoHadoopFsRelationCommand( fileFormat = fileFormat, committer = committer, outputSpec = FileFormatWriter.OutputSpec( - committerOutputPath.toString, customPartitionLocations, outputColumns), + committer.getOutputPath.toString, customPartitionLocations, outputColumns), hadoopConf = hadoopConf, partitionColumns = partitionColumns, bucketSpec = bucketSpec, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala index 144be2316f09..134910dcd75c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala @@ -21,8 +21,9 @@ import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce.{OutputCommitter, TaskAttemptContext} import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging -import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol +import org.apache.spark.internal.io.{FileCommitProtocol, HadoopMapReduceCommitProtocol} import org.apache.spark.sql.internal.SQLConf /** @@ -36,6 +37,10 @@ class SQLHadoopMapReduceCommitProtocol( extends HadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite) with Serializable with Logging { + @transient override lazy val stagingDir: Path = + FileCommitProtocol.externalTempPath(new Path(path), SparkHadoopUtil.get.conf, + SQLConf.get.stagingDir, FileCommitProtocol.USING_SPARK_COMMIT_METHOD, jobId) + override protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = { var committer = super.setupCommitter(context) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CommitProtocolStagingDirSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CommitProtocolStagingDirSuite.scala new file mode 100644 index 000000000000..6704bf322f45 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CommitProtocolStagingDirSuite.scala @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources + +import org.apache.spark.SparkConf +import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.util.Utils + + +abstract class CommitProtocolStagingDirBaseSuite extends QueryTest with SharedSparkSession { + import testImplicits._ + + def stagingDir: String + def cleanStagingDir(): Unit + + override def sparkConf: SparkConf = + super.sparkConf.set(SQLConf.EXEC_STAGING_DIR, stagingDir) + + override def beforeAll(): Unit = { + super.beforeAll() + cleanStagingDir() + } + + override def afterAll(): Unit = { + try { + cleanStagingDir() + } finally { + super.afterAll() + } + } + + test("SPARK-36579: dynamic partition overwrite can use user defined staging dir") { + withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key -> + SQLConf.PartitionOverwriteMode.DYNAMIC.toString) { + withTempDir { d => + withTable("t") { + sql( + s""" + |CREATE TABLE t(c1 int, p1 int) USING PARQUET PARTITIONED BY(p1) + |LOCATION '${d.getAbsolutePath}' + """.stripMargin) + + val df = Seq((1, 2), (3, 4)).toDF("c1", "p1") + df.write + .partitionBy("p1") + .mode("overwrite") + .saveAsTable("t") + checkAnswer(sql("SELECT * FROM t"), df) + checkAnswer(sql("SELECT * FROM t WHERE p1 = 2"), Row(1, 2) :: Nil) + checkAnswer(sql("SELECT * FROM t WHERE p1 = 4"), Row(3, 4) :: Nil) + } + } + } + } +} + +class LocalStagingDirSuite extends CommitProtocolStagingDirBaseSuite { + val stagingDirFile = Utils.createTempDir() + override val stagingDir = stagingDirFile.getAbsolutePath + + override def cleanStagingDir(): Unit = { + Utils.deleteRecursively(stagingDirFile) + } +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala index 7f885729bd2b..5900e03b08b8 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala @@ -19,8 +19,7 @@ package org.apache.spark.sql.hive.execution import java.io.IOException import java.net.URI -import java.text.SimpleDateFormat -import java.util.{Date, Locale, Random} +import java.util.Locale import scala.util.control.NonFatal @@ -135,7 +134,10 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand { if (hiveVersionsUsingOldExternalTempPath.contains(hiveVersion)) { oldVersionExternalTempPath(path, hadoopConf, scratchDir) } else if (hiveVersionsUsingNewExternalTempPath.contains(hiveVersion)) { - newVersionExternalTempPath(path, hadoopConf, stagingDir) + val externalTempPath = FileCommitProtocol.externalTempPath(path, hadoopConf, stagingDir, + FileCommitProtocol.USING_HIVE_COMMIT_METHOD, TaskRunner.getTaskRunnerID.toString) + createdTempDir = Some(externalTempPath.getParent) + externalTempPath } else { throw new IllegalStateException("Unsupported hive version: " + hiveVersion.fullVersion) } @@ -165,7 +167,7 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand { hadoopConf: Configuration, scratchDir: String): Path = { val extURI: URI = path.toUri - val scratchPath = new Path(scratchDir, executionId) + val scratchPath = new Path(scratchDir, FileCommitProtocol.executionId("hive")) var dirPath = new Path( extURI.getScheme, extURI.getAuthority, @@ -186,90 +188,5 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand { } dirPath } - - // Mostly copied from Context.java#getExternalTmpPath of Hive 1.2 - private def newVersionExternalTempPath( - path: Path, - hadoopConf: Configuration, - stagingDir: String): Path = { - val extURI: URI = path.toUri - if (extURI.getScheme == "viewfs") { - getExtTmpPathRelTo(path, hadoopConf, stagingDir) - } else { - new Path(getExternalScratchDir(extURI, hadoopConf, stagingDir), "-ext-10000") - } - } - - private def getExtTmpPathRelTo( - path: Path, - hadoopConf: Configuration, - stagingDir: String): Path = { - new Path(getStagingDir(path, hadoopConf, stagingDir), "-ext-10000") // Hive uses 10000 - } - - private def getExternalScratchDir( - extURI: URI, - hadoopConf: Configuration, - stagingDir: String): Path = { - getStagingDir( - new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath), - hadoopConf, - stagingDir) - } - - private[hive] def getStagingDir( - inputPath: Path, - hadoopConf: Configuration, - stagingDir: String): Path = { - val inputPathName: String = inputPath.toString - val fs: FileSystem = inputPath.getFileSystem(hadoopConf) - var stagingPathName: String = - if (inputPathName.indexOf(stagingDir) == -1) { - new Path(inputPathName, stagingDir).toString - } else { - inputPathName.substring(0, inputPathName.indexOf(stagingDir) + stagingDir.length) - } - - // SPARK-20594: This is a walk-around fix to resolve a Hive bug. Hive requires that the - // staging directory needs to avoid being deleted when users set hive.exec.stagingdir - // under the table directory. - if (isSubDir(new Path(stagingPathName), inputPath, fs) && - !stagingPathName.stripPrefix(inputPathName).stripPrefix("/").startsWith(".")) { - logDebug(s"The staging dir '$stagingPathName' should be a child directory starts " + - "with '.' to avoid being deleted if we set hive.exec.stagingdir under the table " + - "directory.") - stagingPathName = new Path(inputPathName, ".hive-staging").toString - } - - val dir: Path = - fs.makeQualified( - new Path(stagingPathName + "_" + executionId + "-" + TaskRunner.getTaskRunnerID)) - logDebug("Created staging dir = " + dir + " for path = " + inputPath) - try { - if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) { - throw new IllegalStateException("Cannot create staging directory '" + dir.toString + "'") - } - createdTempDir = Some(dir) - fs.deleteOnExit(dir) - } catch { - case e: IOException => - throw QueryExecutionErrors.cannotCreateStagingDirError( - s"'${dir.toString}': ${e.getMessage}", e) - } - dir - } - - // HIVE-14259 removed FileUtils.isSubDir(). Adapted it from Hive 1.2's FileUtils.isSubDir(). - private def isSubDir(p1: Path, p2: Path, fs: FileSystem): Boolean = { - val path1 = fs.makeQualified(p1).toString + Path.SEPARATOR - val path2 = fs.makeQualified(p2).toString + Path.SEPARATOR - path1.startsWith(path2) - } - - private def executionId: String = { - val rand: Random = new Random - val format = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS", Locale.US) - "hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong) - } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala index 9e2938647523..517bf5fcd910 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala @@ -22,12 +22,13 @@ import java.util.Locale import com.google.common.io.Files import org.apache.hadoop.fs.Path +import org.apache.hadoop.hive.ql.exec.TaskRunner import org.scalatest.{BeforeAndAfter, PrivateMethodTester} import org.apache.spark.SparkException +import org.apache.spark.internal.io.FileCommitProtocol import org.apache.spark.sql.{QueryTest, _} import org.apache.spark.sql.catalyst.parser.ParseException -import org.apache.spark.sql.hive.execution.InsertIntoHiveTable import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SQLTestUtils @@ -541,25 +542,24 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter val conf = spark.sessionState.newHadoopConf() val inputPath = new Path("/tmp/b/c") var stagingDir = "tmp/b" - val saveHiveFile = InsertIntoHiveTable(null, Map.empty, null, false, false, null) - val getStagingDir = PrivateMethod[Path](Symbol("getStagingDir")) - var path = saveHiveFile invokePrivate getStagingDir(inputPath, conf, stagingDir) + val id = TaskRunner.getTaskRunnerID + var path = FileCommitProtocol.getStagingDir(inputPath, conf, stagingDir, "hive", id.toString) assert(path.toString.indexOf("/tmp/b_hive_") != -1) stagingDir = "tmp/b/c" - path = saveHiveFile invokePrivate getStagingDir(inputPath, conf, stagingDir) + path = FileCommitProtocol.getStagingDir(inputPath, conf, stagingDir, "hive", id.toString) assert(path.toString.indexOf("/tmp/b/c/.hive-staging_hive_") != -1) stagingDir = "d/e" - path = saveHiveFile invokePrivate getStagingDir(inputPath, conf, stagingDir) + path = FileCommitProtocol.getStagingDir(inputPath, conf, stagingDir, "hive", id.toString) assert(path.toString.indexOf("/tmp/b/c/.hive-staging_hive_") != -1) stagingDir = ".d/e" - path = saveHiveFile invokePrivate getStagingDir(inputPath, conf, stagingDir) + path = FileCommitProtocol.getStagingDir(inputPath, conf, stagingDir, "hive", id.toString) assert(path.toString.indexOf("/tmp/b/c/.d/e_hive_") != -1) stagingDir = "/tmp/c/" - path = saveHiveFile invokePrivate getStagingDir(inputPath, conf, stagingDir) + path = FileCommitProtocol.getStagingDir(inputPath, conf, stagingDir, "hive", id.toString) assert(path.toString.indexOf("/tmp/c_hive_") != -1) }