Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
2031f5b
[SPARK-36579][SQL] Make spark source stagingDir can use user defined
AngersZhuuuu Aug 25, 2021
c29f55e
Update SQLHadoopMapReduceCommitProtocol.scala
AngersZhuuuu Aug 25, 2021
2604c9f
Update
AngersZhuuuu Aug 25, 2021
71f6b17
fix ut
AngersZhuuuu Aug 25, 2021
1947cbf
Merge branch 'master' into SPARK-36579
AngersZhuuuu Oct 12, 2021
30113d2
Update SaveAsHiveFile.scala
AngersZhuuuu Oct 12, 2021
6f405dc
update
AngersZhuuuu Oct 12, 2021
361263b
update
AngersZhuuuu Oct 12, 2021
9ee6ee5
update
AngersZhuuuu Oct 12, 2021
7773fb2
update
AngersZhuuuu Oct 12, 2021
a3b3c51
Update PathOutputCommitProtocol.scala
AngersZhuuuu Oct 12, 2021
b4d60e4
Update PathOutputCommitProtocol.scala
AngersZhuuuu Oct 12, 2021
2c41808
Update CommitterBindingSuite.scala
AngersZhuuuu Oct 13, 2021
5926822
update
AngersZhuuuu Oct 13, 2021
6cdee58
update
AngersZhuuuu Oct 13, 2021
824ec04
Update FileCommitProtocol.scala
AngersZhuuuu Oct 13, 2021
8c8a174
update
AngersZhuuuu Oct 13, 2021
8d7ce6e
update
AngersZhuuuu Oct 13, 2021
da6a0b9
update
AngersZhuuuu Oct 13, 2021
63e466c
update
AngersZhuuuu Oct 13, 2021
fd2ac5f
update
AngersZhuuuu Oct 13, 2021
e2951f1
fix UT
AngersZhuuuu Oct 14, 2021
c8d0c33
complicated
AngersZhuuuu Oct 14, 2021
eafb8dd
update
AngersZhuuuu Oct 14, 2021
ff2bfb8
revert API change
AngersZhuuuu Oct 14, 2021
c5a1d16
update
AngersZhuuuu Oct 14, 2021
8b93bad
Update package.scala
AngersZhuuuu Oct 19, 2021
a0e0ec9
follow comment
AngersZhuuuu Oct 20, 2021
872cd01
update
AngersZhuuuu Oct 20, 2021
d664932
update
AngersZhuuuu Oct 20, 2021
cf0cea8
Merge branch 'master' into SPARK-36579
AngersZhuuuu Oct 20, 2021
997ba25
Update InsertSuite.scala
AngersZhuuuu Oct 20, 2021
9e4a5a8
Merge branch 'SPARK-36579' of https://github.com/AngersZhuuuu/spark i…
AngersZhuuuu Oct 20, 2021
44d63de
Update SQLHadoopMapReduceCommitProtocol.scala
AngersZhuuuu Oct 20, 2021
ed58884
Update SQLConf.scala
AngersZhuuuu Oct 21, 2021
632d725
Update SQLConf.scala
AngersZhuuuu Oct 23, 2021
10e029f
Merge branch 'master' into SPARK-36579
AngersZhuuuu Jan 11, 2022
db66beb
Merge branch 'master' into SPARK-36579
AngersZhuuuu Feb 22, 2022
b07e816
Merge branch 'master' into SPARK-36579
AngersZhuuuu Mar 10, 2022
cc8a4c3
Update SQLConf.scala
AngersZhuuuu Mar 10, 2022
1c5a70a
Update FileCommitProtocol.scala
AngersZhuuuu Mar 10, 2022
1e8424e
Update SQLHadoopMapReduceCommitProtocol.scala
AngersZhuuuu Mar 10, 2022
3da83f1
Update SQLHadoopMapReduceCommitProtocol.scala
AngersZhuuuu Mar 10, 2022
0a729f0
update
AngersZhuuuu Mar 14, 2022
9392bdb
update
AngersZhuuuu Mar 14, 2022
1178d1b
update
AngersZhuuuu Mar 14, 2022
9dba590
update
AngersZhuuuu Mar 14, 2022
b998ec9
Update InsertSuite.scala
AngersZhuuuu Mar 14, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@

package org.apache.spark.internal.io

import java.net.URI
import java.text.SimpleDateFormat
import java.util.{Date, Locale, Random}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.hadoop.mapreduce._

Expand Down Expand Up @@ -50,6 +55,22 @@ import org.apache.spark.util.Utils
abstract class FileCommitProtocol extends Logging {
import FileCommitProtocol._

/**
* Get the final directory where the result data will be placed once the job
* is committed. This may be null, in which case there is no output path to
* write data to and no data will be written.
*
* The default implementation returns null.
*/
def getOutputPath: Path = null

/**
* Get the directory that the task should write results into.
* Warning: there's no guarantee that this work path is on the same
* FS as the final output, or that it's visible across machines.
* This may be null, in which case there is no output path to write data to
* and no data will be written.
*
* The default implementation returns null.
*/
def getWorkPath: Path = null

/**
* Sets up a job. Must be called on the driver before any other methods can be invoked.
*/
Expand Down Expand Up @@ -195,6 +216,9 @@ abstract class FileCommitProtocol extends Logging {


object FileCommitProtocol extends Logging {
// Commit-method tags compared against the `commitMethod` argument of the five-argument
// `getStagingDir` in this object.
// "spark": the resolved staging directory name is suffixed with "-<jobId>".
val USING_SPARK_COMMIT_METHOD = "spark"
// "hive": the resolved staging directory name is suffixed with "_<executionId>-<jobId>" and
// qualified against the input path's FileSystem.
val USING_HIVE_COMMIT_METHOD = "hive"

// Serializable wrapper around an arbitrary payload `obj`.
class TaskCommitMessage(val obj: Any) extends Serializable

// Singleton commit message whose payload is null.
object EmptyTaskCommitMessage extends TaskCommitMessage(null)
Expand Down Expand Up @@ -232,6 +256,109 @@ object FileCommitProtocol extends Logging {
def getStagingDir(path: String, jobId: String): Path = {
  // The staging directory is a hidden child of `path` named after the job.
  val stagingDirName = s".spark-staging-$jobId"
  new Path(path, stagingDirName)
}

def externalTempPath(
path: Path,
hadoopConf: Configuration,
stagingDir: String,
engineType: String,
jobId: String): Path = {
val extURI = path.toUri
if (extURI.getScheme == "viewfs") {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

always a bit brittle using the uri scheme. any way to avoid?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

always a bit brittle using the uri scheme. any way to avoid?

To be honest, I don't know why we need to handle viewfs separately. I searched the original pull request for this part and didn't find any discussion, so I am not sure about it. This part was just moved here.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably there to ensure you don't create staging dirs in a different fs

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably there to ensure you don't create staging dirs in a different fs

Yes, I remember that Hive checks the FS and EZ of tempLocation and targetOutputPath when calling HiveMetastore loadTable/loadPartition. We should move that logic here.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably there to ensure you don't create staging dirs in a different fs

How about current?

new Path(getExtTmpPathRelTo(path.getParent, hadoopConf, stagingDir, engineType, jobId),
"-ext-10000")
} else {
new Path(getExternalScratchDir(extURI, hadoopConf, stagingDir, engineType, jobId),
"-ext-10000")
}
}

// Resolves the staging directory relative to `path` by delegating to the five-argument
// `getStagingDir`, passing every argument through unchanged.
private def getExtTmpPathRelTo(
    path: Path,
    hadoopConf: Configuration,
    stagingDir: String,
    commitMethod: String,
    jobId: String): Path = {
  getStagingDir(
    inputPath = path,
    hadoopConf = hadoopConf,
    stagingDir = stagingDir,
    commitMethod = commitMethod,
    jobId = jobId)
}

// Rebuilds a Path from the scheme, authority and path components of `extURI`, then resolves
// the staging directory for it through the five-argument `getStagingDir`.
private def getExternalScratchDir(
    extURI: URI,
    hadoopConf: Configuration,
    stagingDir: String,
    commitMethod: String,
    jobId: String): Path = {
  val scratchRoot = new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath)
  getStagingDir(scratchRoot, hadoopConf, stagingDir, commitMethod, jobId)
}

/**
* Resolves the job-specific staging directory used for writes targeting `inputPath`.
*
* The staging root is `stagingDir` resolved against `inputPath`, unless the string form of
* `inputPath` already contains `stagingDir`, in which case the prefix of `inputPath` ending at
* the first occurrence of `stagingDir` is reused. The root is then replaced by
* `<inputPath>/.<commitMethod>-staging` when either:
*  - it is a sub-directory of `inputPath` whose relative name does not start with '.'
*    (SPARK-20594), or
*  - `commitMethod` is "spark" and the root is on a different FileSystem than `inputPath`
*    (SPARK-36579).
*
* @param inputPath output location the staging directory is derived from
* @param hadoopConf configuration used to look up the FileSystems involved
* @param stagingDir configured staging directory name or path
* @param commitMethod USING_SPARK_COMMIT_METHOD or USING_HIVE_COMMIT_METHOD; any other value
*                     fails with a MatchError
* @param jobId job identifier appended to the staging directory name
* @return `<root>-<jobId>` for "spark"; the FS-qualified `<root>_<executionId>-<jobId>` for "hive"
*/
def getStagingDir(
    inputPath: Path,
    hadoopConf: Configuration,
    stagingDir: String,
    commitMethod: String,
    jobId: String): Path = {
  val inputPathName: String = inputPath.toString
  val fs: FileSystem = inputPath.getFileSystem(hadoopConf)

  // Fallback staging root: a hidden directory directly under the input path.
  def defaultStagingPathName: String =
    new Path(inputPathName, s".$commitMethod-staging").toString

  val stagingDirIndex = inputPathName.indexOf(stagingDir)
  var stagingPathName: String =
    if (stagingDirIndex == -1) {
      new Path(inputPathName, stagingDir).toString
    } else {
      inputPathName.substring(0, stagingDirIndex + stagingDir.length)
    }

  // SPARK-20594: This is a walk-around fix to resolve a Hive bug. Hive requires that the
  // staging directory needs to avoid being deleted when users set hive.exec.stagingdir
  // under the table directory.
  if (isSubDir(new Path(stagingPathName), inputPath, fs) &&
    !stagingPathName.stripPrefix(inputPathName).stripPrefix("/").startsWith(".")) {
    logDebug(s"The staging dir '$stagingPathName' should be a child directory starts " +
      "with '.' to avoid being deleted if we set hive.exec.stagingdir under the table " +
      "directory.")
    stagingPathName = defaultStagingPathName
  }

  if (commitMethod == USING_SPARK_COMMIT_METHOD) {
    val stagingFS = new Path(stagingPathName).getFileSystem(hadoopConf)
    // SPARK-36579: Current SQLHadoopMapReduceCommitProtocol's dynamic partition overwriting uses
    // rename operation to move partition's directories. This operation is not supported between
    // different FileSystems, so fall back to the table directory only when the staging dir is
    // on a DIFFERENT FileSystem than the table location.
    if (!equalsFileSystem(fs, stagingFS)) {
      logDebug(s"The staging dir '$stagingPathName' should be in a same filesystem " +
        s"with table location if we set `spark.sql.exec.stagingDir` under the table " +
        "directory.")
      stagingPathName = defaultStagingPathName
    }
  }

  val dir = commitMethod match {
    case USING_SPARK_COMMIT_METHOD =>
      new Path(stagingPathName + "-" + jobId)
    case USING_HIVE_COMMIT_METHOD =>
      fs.makeQualified(new Path(stagingPathName + "_" + executionId(commitMethod) + "-" + jobId))
  }
  logDebug(s"Created staging dir = $dir for path = $inputPath")
  dir
}

// Returns true when the qualified form of `p1` lies at or below the qualified form of `p2`.
private def isSubDir(p1: Path, p2: Path, fs: FileSystem): Boolean = {
  // A trailing separator is appended to both sides so that ".../ab" is not treated as a
  // child of ".../a".
  def qualifiedWithSeparator(p: Path): String = fs.makeQualified(p).toString + Path.SEPARATOR

  val candidate = qualifiedWithSeparator(p1)
  val parent = qualifiedWithSeparator(p2)
  candidate.startsWith(parent)
}

/**
* Builds an identifier of the form `<engineType>_<yyyy-MM-dd_HH-mm-ss_SSS>_<random>`, where
* the timestamp is the current time formatted with the US locale and `<random>` is a
* non-negative random long.
*
* @param engineType prefix placed at the start of the identifier
* @return the generated identifier
*/
def executionId(engineType: String): String = {
  val rand: Random = new Random
  val format = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS", Locale.US)
  // Clear the sign bit instead of calling Math.abs: Math.abs(Long.MinValue) is still negative,
  // which would put a '-' into the identifier.
  val randomSuffix = rand.nextLong() & Long.MaxValue
  s"${engineType}_${format.format(new Date)}_$randomSuffix"
}

// Two FileSystems are considered equal when their URIs are equal.
def equalsFileSystem(fs1: FileSystem, fs2: FileSystem): Boolean = {
  val firstUri = fs1.getUri
  val secondUri = fs2.getUri
  firstUri == secondUri
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,16 @@ class HadoopMapReduceCommitProtocol(
format.getOutputCommitter(context)
}

override def getOutputPath: Path = {
  // With dynamic partition overwrite the output path is the staging dir; otherwise it is
  // `path` itself.
  if (!dynamicPartitionOverwrite) new Path(path) else stagingDir
}

// The work path is the same as the output path for this protocol.
override def getWorkPath: Path = getOutputPath

override def newTaskTempFile(
taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
newTaskTempFile(taskContext, dir, FileNameSpec("", ext))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3554,6 +3554,17 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

// Internal string conf `spark.sql.exec.stagingDir`: rejects the empty string and defaults to
// ".spark-staging".
val EXEC_STAGING_DIR = buildConf("spark.sql.exec.stagingDir")
.doc("The staging directory of Spark job. Spark uses it to deal with files with " +
"absolute output path, or writing data into partitioned directory " +
"when dynamic partition overwrite mode is on. " +
"Default value means staging directory is under table path.")
.version("3.3.0")
.internal()
.stringConf
.checkValue(!_.isEmpty, "Should not pass an empty string as staging directory.")
.createWithDefault(".spark-staging")
Comment thread
AngersZhuuuu marked this conversation as resolved.

val LEGACY_USE_V1_COMMAND =
buildConf("spark.sql.legacy.useV1Command")
.internal()
Expand Down Expand Up @@ -4292,6 +4303,8 @@ class SQLConf extends Serializable with Logging {

def inferDictAsStruct: Boolean = getConf(SQLConf.INFER_NESTED_DICT_AS_STRUCT)

// Current value of SQLConf.EXEC_STAGING_DIR.
def stagingDir: String = getConf(SQLConf.EXEC_STAGING_DIR)

def parquetFieldIdReadEnabled: Boolean = getConf(SQLConf.PARQUET_FIELD_ID_READ_ENABLED)

def parquetFieldIdWriteEnabled: Boolean = getConf(SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,23 +162,14 @@ case class InsertIntoHadoopFsRelationCommand(
}
}

// For dynamic partition overwrite, FileOutputCommitter's output path is staging path, files
// will be renamed from staging path to final output path during commit job
val committerOutputPath = if (dynamicPartitionOverwrite) {
FileCommitProtocol.getStagingDir(outputPath.toString, jobId)
.makeQualified(fs.getUri, fs.getWorkingDirectory)
} else {
qualifiedOutputPath
}

val updatedPartitionPaths =
FileFormatWriter.write(
sparkSession = sparkSession,
plan = child,
fileFormat = fileFormat,
committer = committer,
outputSpec = FileFormatWriter.OutputSpec(
committerOutputPath.toString, customPartitionLocations, outputColumns),
committer.getOutputPath.toString, customPartitionLocations, outputColumns),
hadoopConf = hadoopConf,
partitionColumns = partitionColumns,
bucketSpec = bucketSpec,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{OutputCommitter, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol
import org.apache.spark.internal.io.{FileCommitProtocol, HadoopMapReduceCommitProtocol}
import org.apache.spark.sql.internal.SQLConf

/**
Expand All @@ -36,6 +37,10 @@ class SQLHadoopMapReduceCommitProtocol(
extends HadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite)
with Serializable with Logging {

// Staging directory computed by FileCommitProtocol.externalTempPath from the output `path`,
// the Hadoop conf held by SparkHadoopUtil, the SQLConf staging dir setting and `jobId`, using
// the "spark" commit method.
// NOTE(review): this is a @transient lazy val, so it is presumably re-evaluated wherever it is
// first accessed after deserialization — confirm SQLConf.get yields the same staging dir there.
@transient override lazy val stagingDir: Path =
FileCommitProtocol.externalTempPath(new Path(path), SparkHadoopUtil.get.conf,
SQLConf.get.stagingDir, FileCommitProtocol.USING_SPARK_COMMIT_METHOD, jobId)

override protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
var committer = super.setupCommitter(context)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.sources

import org.apache.spark.SparkConf
import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.util.Utils


/**
* Base suite that runs a dynamic-partition-overwrite write with SQLConf.EXEC_STAGING_DIR set
* to a value supplied by the concrete subclass.
*
* Subclasses provide the staging directory to configure and a way to clean it up; the cleanup
* runs once before and once after all tests.
*/
abstract class CommitProtocolStagingDirBaseSuite extends QueryTest with SharedSparkSession {
import testImplicits._

// Staging directory value written into SQLConf.EXEC_STAGING_DIR for the test session.
def stagingDir: String
// Removes whatever the subclass created for `stagingDir`.
def cleanStagingDir(): Unit

override def sparkConf: SparkConf =
super.sparkConf.set(SQLConf.EXEC_STAGING_DIR, stagingDir)

override def beforeAll(): Unit = {
super.beforeAll()
cleanStagingDir()
}

override def afterAll(): Unit = {
// Clean up first, but always run the parent teardown even if cleanup throws.
try {
cleanStagingDir()
} finally {
super.afterAll()
}
}

test("SPARK-36579: dynamic partition overwrite can use user defined staging dir") {
withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key ->
SQLConf.PartitionOverwriteMode.DYNAMIC.toString) {
withTempDir { d =>
withTable("t") {
// Partitioned parquet table located in the temp dir.
sql(
s"""
|CREATE TABLE t(c1 int, p1 int) USING PARQUET PARTITIONED BY(p1)
|LOCATION '${d.getAbsolutePath}'
""".stripMargin)

val df = Seq((1, 2), (3, 4)).toDF("c1", "p1")
df.write
.partitionBy("p1")
.mode("overwrite")
.saveAsTable("t")
// The table must contain exactly the written rows, both overall and per partition.
checkAnswer(sql("SELECT * FROM t"), df)
checkAnswer(sql("SELECT * FROM t WHERE p1 = 2"), Row(1, 2) :: Nil)
checkAnswer(sql("SELECT * FROM t WHERE p1 = 4"), Row(3, 4) :: Nil)
}
}
}
}
}

/**
* Runs the staging-dir suite with a temp directory created through Utils.createTempDir as the
* configured staging directory.
*/
class LocalStagingDirSuite extends CommitProtocolStagingDirBaseSuite {
  val stagingDirFile = Utils.createTempDir()
  override val stagingDir: String = stagingDirFile.getAbsolutePath

  // Deletes the temp directory and everything beneath it.
  override def cleanStagingDir(): Unit = Utils.deleteRecursively(stagingDirFile)
}
Loading