Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -203,26 +203,36 @@ class HadoopMapReduceCommitProtocol(
}

if (dynamicPartitionOverwrite) {
val partitionPaths = allPartitionPaths.foldLeft(Set[String]())(_ ++ _)
logDebug(s"Clean up default partition directories for overwriting: $partitionPaths")
for (part <- partitionPaths) {
val finalPartPath = new Path(path, part)
if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
// According to the official hadoop FileSystem API spec, delete op should assume
// the destination is no longer present regardless of return value, thus we do not
// need to double check if finalPartPath exists before rename.
// Also in our case, based on the spec, delete returns false only when finalPartPath
// does not exist. When this happens, we need to take action if parent of finalPartPath
// also does not exist(e.g. the scenario described on SPARK-23815), because
// FileSystem API spec on rename op says the rename dest(finalPartPath) must have
// a parent that exists, otherwise we may get unexpected result on the rename.
fs.mkdirs(finalPartPath.getParent)
}
val stagingPartPath = new Path(stagingDir, part)
if (!fs.rename(stagingPartPath, finalPartPath)) {
throw new IOException(s"Failed to rename $stagingPartPath to $finalPartPath when " +
val targetPath = new Path(path)
val pathExisted = fs.exists(targetPath)
if (!pathExisted || fs.listStatus(targetPath).isEmpty) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have a test coverage for both fs.exist and fs.listStatus?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should have

if ((!pathExisted || (pathExisted && fs.delete(targetPath, true))) &&
!fs.rename(stagingDir, targetPath)) {
throw new IOException(s"Failed to rename $stagingDir to $targetPath when " +
s"committing files staged for overwriting dynamic partitions")
}
} else {
val partitionPaths = allPartitionPaths.foldLeft(Set[String]())(_ ++ _)
logDebug(s"Clean up default partition directories for overwriting: $partitionPaths")
for (part <- partitionPaths) {
val finalPartPath = new Path(path, part)
if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
// According to the official hadoop FileSystem API spec, delete op should assume
// the destination is no longer present regardless of return value, thus we do not
// need to double check if finalPartPath exists before rename.
// Also in our case, based on the spec, delete returns false only when finalPartPath
// does not exist. When this happens, we need to take action if parent of
// finalPartPath also does not exist(e.g. the scenario described on SPARK-23815),
// because FileSystem API spec on rename op says the rename dest(finalPartPath) must
// have a parent that exists, otherwise we may get unexpected result on the rename.
fs.mkdirs(finalPartPath.getParent)
}
val stagingPartPath = new Path(stagingDir, part)
if (!fs.rename(stagingPartPath, finalPartPath)) {
throw new IOException(s"Failed to rename $stagingPartPath to $finalPartPath when " +
s"committing files staged for overwriting dynamic partitions")
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1224,6 +1224,16 @@ object SQLConf {
.stringConf
.createOptional

val FILE_STAGING_DIR =
buildConf("spark.sql.source.stagingDir")
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks orthogonal what your PR aims. Could you spin off this new configuration as a new PR?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me do this first.

.doc("The staging directory of Spark job. Spark uses it to deal with files with " +
"absolute output path, or writing data into partitioned directory when " +
"dynamic partition overwrite mode.")
.version("3.3.0")
.internal()
.stringConf
.createWithDefault(".spark-staging")

val FILE_COMMIT_PROTOCOL_CLASS =
buildConf("spark.sql.sources.commitProtocolClass")
.version("2.1.1")
Expand Down Expand Up @@ -3824,6 +3834,8 @@ class SQLConf extends Serializable with Logging {
def partitionColumnTypeInferenceEnabled: Boolean =
getConf(SQLConf.PARTITION_COLUMN_TYPE_INFERENCE)

def fileStagingDir: String = getConf(SQLConf.FILE_STAGING_DIR)

def fileCommitProtocolClass: String = getConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS)

def parallelPartitionDiscoveryThreshold: Int =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ class SQLHadoopMapReduceCommitProtocol(
extends HadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite)
with Serializable with Logging {

override def stagingDir: Path = {
new Path(path, SQLConf.get.fileStagingDir + "-" + jobId)
}

override protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
var committer = super.setupCommitter(context)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,33 @@ class PartitionedWriteSuite extends QueryTest with SharedSparkSession {
}
}
}

test("SPARK-36563: dynamic partition overwrite can rename table path when reasonable") {
withTempDir { stagingDir =>
withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key ->
SQLConf.PartitionOverwriteMode.DYNAMIC.toString,
SQLConf.FILE_STAGING_DIR.key -> stagingDir.getAbsolutePath) {
withTempDir { d =>
withTable("t") {
sql(
s"""
| CREATE TABLE t(c1 int, p1 int) USING PARQUET PARTITIONED BY(p1)
| LOCATION '${d.getAbsolutePath}'
""".stripMargin)

val df = Seq((1, 2), (3, 4)).toDF("c1", "p1")
df.write
.partitionBy("p1")
.mode("overwrite")
.saveAsTable("t")
checkAnswer(sql("SELECT * FROM t"), df)
checkAnswer(sql("SELECT * FROM t WHERE p1 = 2"), Row(1, 2) :: Nil)
checkAnswer(sql("SELECT * FROM t WHERE p1 = 4"), Row(3, 4) :: Nil)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this test case fail before your PR? IIUC, this doesn't provide a test coverage.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this test case fail before your PR? IIUC, this doesn't provide a test coverage.

Just to show in new way, it can write data as before?

}
}
}
}
}
}

/**
Expand Down