From e1ec69f4023781731577e268f287c409abc39e29 Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Mon, 23 Aug 2021 20:48:33 +0800 Subject: [PATCH 1/4] [SPARK-36563][SQL] dynamicPartitionOverwrite can direct rename to targetPath instead of partition path one by one when targetPath is empty --- .../io/HadoopMapReduceCommitProtocol.scala | 46 +++++++++++-------- .../apache/spark/sql/internal/SQLConf.scala | 9 ++++ .../SQLHadoopMapReduceCommitProtocol.scala | 4 ++ .../sql/sources/PartitionedWriteSuite.scala | 27 +++++++++++ 4 files changed, 68 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala index c061d617fce4b..99b9fa7582540 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala @@ -203,26 +203,36 @@ class HadoopMapReduceCommitProtocol( } if (dynamicPartitionOverwrite) { - val partitionPaths = allPartitionPaths.foldLeft(Set[String]())(_ ++ _) - logDebug(s"Clean up default partition directories for overwriting: $partitionPaths") - for (part <- partitionPaths) { - val finalPartPath = new Path(path, part) - if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) { - // According to the official hadoop FileSystem API spec, delete op should assume - // the destination is no longer present regardless of return value, thus we do not - // need to double check if finalPartPath exists before rename. - // Also in our case, based on the spec, delete returns false only when finalPartPath - // does not exist. When this happens, we need to take action if parent of finalPartPath - // also does not exist(e.g. the scenario described on SPARK-23815), because - // FileSystem API spec on rename op says the rename dest(finalPartPath) must have - // a parent that exists, otherwise we may get unexpected result on the rename. - fs.mkdirs(finalPartPath.getParent) - } - val stagingPartPath = new Path(stagingDir, part) - if (!fs.rename(stagingPartPath, finalPartPath)) { - throw new IOException(s"Failed to rename $stagingPartPath to $finalPartPath when " + + val targetPath = new Path(path) + val pathExisted = fs.exists(targetPath) + if (!pathExisted || fs.listStatus(targetPath).isEmpty) { + if ((!pathExisted || (pathExisted && fs.delete(targetPath, true))) && + !fs.rename(stagingDir, targetPath)) { + throw new IOException(s"Failed to rename $stagingDir to $targetPath when " + s"committing files staged for overwriting dynamic partitions") } + } else { + val partitionPaths = allPartitionPaths.foldLeft(Set[String]())(_ ++ _) + logDebug(s"Clean up default partition directories for overwriting: $partitionPaths") + for (part <- partitionPaths) { + val finalPartPath = new Path(path, part) + if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) { + // According to the official hadoop FileSystem API spec, delete op should assume + // the destination is no longer present regardless of return value, thus we do not + // need to double check if finalPartPath exists before rename. + // Also in our case, based on the spec, delete returns false only when finalPartPath + // does not exist. When this happens, we need to take action if parent of + // finalPartPath also does not exist(e.g. the scenario described on SPARK-23815), + // because FileSystem API spec on rename op says the rename dest(finalPartPath) must + // have a parent that exists, otherwise we may get unexpected result on the rename. + fs.mkdirs(finalPartPath.getParent) + } + val stagingPartPath = new Path(stagingDir, part) + if (!fs.rename(stagingPartPath, finalPartPath)) { + throw new IOException(s"Failed to rename $stagingPartPath to $finalPartPath when " + + s"committing files staged for overwriting dynamic partitions") + } + } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index c6d9558d53eda..2230833e39aba 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1224,6 +1224,13 @@ object SQLConf { .stringConf .createOptional + val FILE_STAGING_DIR = + buildConf("spark.sql.source.stagingDir") + .version("3.3.0") + .internal() + .stringConf + .createWithDefault(".spark-staging") + val FILE_COMMIT_PROTOCOL_CLASS = buildConf("spark.sql.sources.commitProtocolClass") .version("2.1.1") @@ -3824,6 +3831,8 @@ class SQLConf extends Serializable with Logging { def partitionColumnTypeInferenceEnabled: Boolean = getConf(SQLConf.PARTITION_COLUMN_TYPE_INFERENCE) + def fileStagingDir: String = getConf(SQLConf.FILE_STAGING_DIR) + def fileCommitProtocolClass: String = getConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS) def parallelPartitionDiscoveryThreshold: Int = diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala index 144be2316f091..abfeb00b44c0e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala @@ -36,6 +36,10 @@ class SQLHadoopMapReduceCommitProtocol( extends HadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite) with Serializable with Logging { + override def stagingDir: Path = { + new Path(path, SQLConf.get.fileStagingDir + "-" + jobId) + } + override protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = { var committer = super.setupCommitter(context) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala index b9266429f81a5..51868f5c07a6b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala @@ -189,6 +189,33 @@ class PartitionedWriteSuite extends QueryTest with SharedSparkSession { } } } + + test("dynamic partition overwrite can rename table path when reasonable") { + withTempDir { stagingDir => + withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key -> + SQLConf.PartitionOverwriteMode.DYNAMIC.toString, + SQLConf.FILE_STAGING_DIR.key -> stagingDir.getAbsolutePath) { + withTempDir { d => + withTable("t") { + sql( + s""" + | create table t(c1 int, p1 int) using parquet partitioned by (p1) + | location '${d.getAbsolutePath}' + """.stripMargin) + + val df = Seq((1, 2), (3, 4)).toDF("c1", "p1") + df.write + .partitionBy("p1") + .mode("overwrite") + .saveAsTable("t") + checkAnswer(sql("SELECT * FROM t"), df) + checkAnswer(sql("SELECT * FROM t WHERE p1 = 2"), Row(1, 2) :: Nil) + checkAnswer(sql("SELECT * FROM t WHERE p1 = 4"), Row(3, 4) :: Nil) + } + } + } + } + } } /** From 2f36b1f55305b0c15b67697db6fbfdf8cc331b40 Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Mon, 23 Aug 2021 20:49:23 +0800 Subject: [PATCH 2/4] Update PartitionedWriteSuite.scala --- .../org/apache/spark/sql/sources/PartitionedWriteSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala index 51868f5c07a6b..8ccc129880633 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala @@ -190,7 +190,7 @@ class PartitionedWriteSuite extends QueryTest with SharedSparkSession { } } - test("dynamic partition overwrite can rename table path when reasonable") { + test("SPARK-36563: dynamic partition overwrite can rename table path when reasonable") { withTempDir { stagingDir => withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key -> SQLConf.PartitionOverwriteMode.DYNAMIC.toString, From 1ed15ae63a480c2aea8f80bcedfe06257229b6d6 Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Mon, 23 Aug 2021 21:02:56 +0800 Subject: [PATCH 3/4] Update SQLConf.scala --- .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 2230833e39aba..a6947eea57f32 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1226,6 +1226,9 @@ object SQLConf { val FILE_STAGING_DIR = buildConf("spark.sql.source.stagingDir") + .doc("The staging directory of Spark job. Spark uses it to deal with files with " + + "absolute output path, or writing data into partitioned directory when " + + "dynamic partition overwrite mode.") .version("3.3.0") .internal() .stringConf From 892b557875158c6e127de081cff98d06b18e14bf Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Wed, 25 Aug 2021 10:25:50 +0800 Subject: [PATCH 4/4] update --- .../spark/internal/io/HadoopMapReduceCommitProtocol.scala | 2 +- .../apache/spark/sql/sources/PartitionedWriteSuite.scala | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala index 99b9fa7582540..d306515b11d12 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala @@ -207,7 +207,7 @@ class HadoopMapReduceCommitProtocol( val pathExisted = fs.exists(targetPath) if (!pathExisted || fs.listStatus(targetPath).isEmpty) { if ((!pathExisted || (pathExisted && fs.delete(targetPath, true))) && - !fs.rename(stagingDir, targetPath)) { + !fs.rename(stagingDir, targetPath)) { throw new IOException(s"Failed to rename $stagingDir to $targetPath when " + s"committing files staged for overwriting dynamic partitions") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala index 8ccc129880633..49a8a272d0a4e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala @@ -199,9 +199,9 @@ class PartitionedWriteSuite extends QueryTest with SharedSparkSession { withTable("t") { sql( s""" - | create table t(c1 int, p1 int) using parquet partitioned by (p1) - | location '${d.getAbsolutePath}' - """.stripMargin) + | CREATE TABLE t(c1 int, p1 int) USING PARQUET PARTITIONED BY(p1) + | LOCATION '${d.getAbsolutePath}' + """.stripMargin) val df = Seq((1, 2), (3, 4)).toDF("c1", "p1") df.write