diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
index c061d617fce4b..d306515b11d12 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
@@ -203,26 +203,44 @@ class HadoopMapReduceCommitProtocol(
       }
 
       if (dynamicPartitionOverwrite) {
-        val partitionPaths = allPartitionPaths.foldLeft(Set[String]())(_ ++ _)
-        logDebug(s"Clean up default partition directories for overwriting: $partitionPaths")
-        for (part <- partitionPaths) {
-          val finalPartPath = new Path(path, part)
-          if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
-            // According to the official hadoop FileSystem API spec, delete op should assume
-            // the destination is no longer present regardless of return value, thus we do not
-            // need to double check if finalPartPath exists before rename.
-            // Also in our case, based on the spec, delete returns false only when finalPartPath
-            // does not exist. When this happens, we need to take action if parent of finalPartPath
-            // also does not exist(e.g. the scenario described on SPARK-23815), because
-            // FileSystem API spec on rename op says the rename dest(finalPartPath) must have
-            // a parent that exists, otherwise we may get unexpected result on the rename.
-            fs.mkdirs(finalPartPath.getParent)
-          }
-          val stagingPartPath = new Path(stagingDir, part)
-          if (!fs.rename(stagingPartPath, finalPartPath)) {
-            throw new IOException(s"Failed to rename $stagingPartPath to $finalPartPath when " +
+        val targetPath = new Path(path)
+        val pathExisted = fs.exists(targetPath)
+        if (!pathExisted || fs.listStatus(targetPath).isEmpty) {
+          // Fast path: the output path is missing or empty, so the whole staging directory
+          // is renamed into place at once. An existing (empty) target is removed first.
+          // delete may return false when the path is already gone, so it only counts as a
+          // failure if the target is still present; in that case fail loudly instead of
+          // skipping the rename and silently dropping the staged data.
+          if (pathExisted && !fs.delete(targetPath, true) && fs.exists(targetPath)) {
+            throw new IOException(s"Failed to delete $targetPath when " +
+              s"committing files staged for overwriting dynamic partitions")
+          }
+          if (!fs.rename(stagingDir, targetPath)) {
+            throw new IOException(s"Failed to rename $stagingDir to $targetPath when " +
               s"committing files staged for overwriting dynamic partitions")
           }
+        } else {
+          val partitionPaths = allPartitionPaths.foldLeft(Set[String]())(_ ++ _)
+          logDebug(s"Clean up default partition directories for overwriting: $partitionPaths")
+          for (part <- partitionPaths) {
+            val finalPartPath = new Path(path, part)
+            if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
+              // According to the official hadoop FileSystem API spec, delete op should assume
+              // the destination is no longer present regardless of return value, thus we do not
+              // need to double check if finalPartPath exists before rename.
+              // Also in our case, based on the spec, delete returns false only when finalPartPath
+              // does not exist. When this happens, we need to take action if parent of
+              // finalPartPath also does not exist(e.g. the scenario described on SPARK-23815),
+              // because FileSystem API spec on rename op says the rename dest(finalPartPath) must
+              // have a parent that exists, otherwise we may get unexpected result on the rename.
+              fs.mkdirs(finalPartPath.getParent)
+            }
+            val stagingPartPath = new Path(stagingDir, part)
+            if (!fs.rename(stagingPartPath, finalPartPath)) {
+              throw new IOException(s"Failed to rename $stagingPartPath to $finalPartPath when " +
+                s"committing files staged for overwriting dynamic partitions")
+            }
+          }
         }
       }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index c6d9558d53eda..a6947eea57f32 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1224,6 +1224,16 @@ object SQLConf {
       .stringConf
       .createOptional
 
+  val FILE_STAGING_DIR =
+    buildConf("spark.sql.source.stagingDir")
+      .doc("The staging directory of Spark job. Spark uses it to deal with files with " +
+        "absolute output path, or writing data into partitioned directory when " +
+        "dynamic partition overwrite mode.")
+      .version("3.3.0")
+      .internal()
+      .stringConf
+      .createWithDefault(".spark-staging")
+
   val FILE_COMMIT_PROTOCOL_CLASS =
     buildConf("spark.sql.sources.commitProtocolClass")
       .version("2.1.1")
@@ -3824,6 +3834,8 @@ class SQLConf extends Serializable with Logging {
   def partitionColumnTypeInferenceEnabled: Boolean =
     getConf(SQLConf.PARTITION_COLUMN_TYPE_INFERENCE)
 
+  def fileStagingDir: String = getConf(SQLConf.FILE_STAGING_DIR)
+
   def fileCommitProtocolClass: String = getConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS)
 
   def parallelPartitionDiscoveryThreshold: Int =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala
index 144be2316f091..abfeb00b44c0e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala
@@ -36,6 +36,14 @@ class SQLHadoopMapReduceCommitProtocol(
   extends HadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite)
     with Serializable with Logging {
 
+  /**
+   * Staging directory for this job: the configured staging-dir value (see
+   * `SQLConf.FILE_STAGING_DIR`) suffixed with "-" and the job id, resolved against `path`.
+   */
+  override def stagingDir: Path = {
+    new Path(path, SQLConf.get.fileStagingDir + "-" + jobId)
+  }
+
   override protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
     var committer = super.setupCommitter(context)
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
index b9266429f81a5..49a8a272d0a4e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
@@ -189,6 +189,33 @@ class PartitionedWriteSuite extends QueryTest with SharedSparkSession {
       }
     }
   }
+
+  test("SPARK-36563: dynamic partition overwrite can rename table path when reasonable") {
+    withTempDir { stagingDir =>
+      withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key ->
+        SQLConf.PartitionOverwriteMode.DYNAMIC.toString,
+        SQLConf.FILE_STAGING_DIR.key -> stagingDir.getAbsolutePath) {
+        withTempDir { d =>
+          withTable("t") {
+            sql(
+              s"""
+                | CREATE TABLE t(c1 int, p1 int) USING PARQUET PARTITIONED BY(p1)
+                | LOCATION '${d.getAbsolutePath}'
+              """.stripMargin)
+
+            val df = Seq((1, 2), (3, 4)).toDF("c1", "p1")
+            df.write
+              .partitionBy("p1")
+              .mode("overwrite")
+              .saveAsTable("t")
+            checkAnswer(sql("SELECT * FROM t"), df)
+            checkAnswer(sql("SELECT * FROM t WHERE p1 = 2"), Row(1, 2) :: Nil)
+            checkAnswer(sql("SELECT * FROM t WHERE p1 = 4"), Row(3, 4) :: Nil)
+          }
+        }
+      }
+    }
+  }
 }
 
 /**