From 536f2d67e9e1ef79444e466ec3484de7c9d134b5 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Thu, 22 Dec 2022 18:11:01 +0000 Subject: [PATCH] SPARK-41551. Dynamic/absolute path support in PathOutputCommitters Follow on to SPARK-40034. Dynamic partitioning though the PathOutputCommitProtocol needs to add the dirs to the superclass's partition list else the partition delete doesn't take place. Fix: * add an addPartition() method subclasses can use * add a getPartitions method to return an immutable copy of the list for testing. * add tests to verify all of this. Also fix newTaskTempFileAbsPath to return a path, irrespective of committer type. In dynamic mode, because the parent dir of an absolute path is deleted, there's a safety check to reject any requests for a file in a parent dir. This is something which could be pulled up to HadoopMapReduceCommitProtocol because it needs the same check, if the risk is considered realistic. The patch now downgrades from failing on dynamic partitioning if the committer doesn't declare it supports it to printing a warning. Why this? well, it *does* work through the s3a committers, it's just O(data). If someone does want to do INSERT OVERWRITE then they can be allowed to, just warned about it. The outcome will be correct except in the case of: "if the driver fails partway through dir rename, only some of the files will be there" Google GCS has that same non-atomic rename issue. But: even on an FS with atomic dir rename, any job which fails partway through the overwrite process is going to leave the fs in an inconsistent state, such as * some parts with new data, some parts not yet overwritten * a directory deleted and the new data not instantiated So it's not that much worse. The patch tries to update the protocol spec in HadoopMapReduceCommitProtocol to cover both newFileAbsPath() semantics/commit and failure modes of dynamic partition commit. Adds new test suite PathOutputPartitionedWriteSuite That suite doesn't validate the dynamic overwrite code, which can be demonstrated by commenting out addPartitionedDir() in PathOutputCommitProtocol.newTaskTempFile(). The updated CommitterBindingSuite will fail if that line is commented out, so it is verifying that the base committer is given the list of updated directories --- .../io/HadoopMapReduceCommitProtocol.scala | 118 ++++++- docs/cloud-integration.md | 23 +- .../io/cloud/PathOutputCommitProtocol.scala | 96 ++++-- .../io/cloud/CommitterBindingSuite.scala | 293 ++++++++++++++---- .../PathOutputPartitionedWriteSuite.scala | 42 +++ 5 files changed, 484 insertions(+), 88 deletions(-) create mode 100644 hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/PathOutputPartitionedWriteSuite.scala diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala index 3a24da98ecc24..506039d0cf9cc 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala @@ -38,6 +38,60 @@ import org.apache.spark.mapred.SparkHadoopMapRedUtil * * Unlike Hadoop's OutputCommitter, this implementation is serializable. * + * == Absolute Path Support == + * One feature of this committer is that is that tasks have + * the ability to request a temporary file in a task attempt which will + * be present in an absolute path in the destination filesystem after + * a successful job commit (and not before). + * + * This implemented as follows. + * 1. [[newTaskTempFileAbsPath()]] takes a final destination directory + * for the target file along with prefix and extension + * 2. A unique filename is generated in the staging directory. + * 3. This path as well as the ultimate path is recorded in the + * transient map [[addedAbsPathFiles]]. + * 4. In task commit, the contents of this map is returned in + * the TaskCommitMessage sent to the driver. As such messages + * are never implicitly received from failed tasks, the + * driver will build a map containing exclusively + * the files generated by successfully committed task attempts. + * Note: files written by failed task attempts may be visible + * in the staging directory, but they will not be renamed and + * are deleted in job abort and cleanup. + * 5. After the underlying FileOutputCommitter/PathOutputCommitter + * job commit, the map of absolute paths is iterated through. + * 6. If (dynamicPartitionOverwrite) is true, all parent directories + * of the destination files are deleted, then recreated + * 7. The files are renamed one by one from the staging directory + * to their absolute destination paths. + * There is an assumption that file and directory operations + * are fast, so there is no parallelization of (6) or (7). + * There is no requirement for atomic file or directory rename. + * + * If the absolute path stage of job commit fails partway through the + * operation the state of the filesystem is undefined + * -directories outside the job destination path may have been deleted, + * recreated and files may have be copied. + * Job cleanup through [[abortJob()]] does not examine or modify those locations. + * + * == Concurrent jobs to the same destination path == + * + * Non-dynamic jobs to the same output paths are unlikely to support + * any form of concurrent job execution; it depends on the underlying + * committer. + * + * Jobs with dynamic partition overwrite always initially write their + * work to a staging subdirectory. Provided the jobId used to create + * the committer is unique, different staging directories will be used + * by different jobs. Accordingly, the entire job will be concurrent + * until the final stage of deleting and recreating updated partitions + * and absolute path directories. + * If concurrent jobs update different partitions of equal depth in + * the directory tree, then, as only those partitions are updated, + * the final table should contain the independent output of both tasks. + * + * If a duplicate jobId is used then the staging directory is shared; + * the final output is highly likely to be corrupted. * @param jobId the job's or stage's id * @param path the job's output path, or null if committer acts as a noop * @param dynamicPartitionOverwrite If true, Spark will overwrite partition directories at runtime @@ -58,11 +112,46 @@ import org.apache.spark.mapred.SparkHadoopMapRedUtil * 2. When [[FileOutputCommitter]] algorithm version set to 2, * committing tasks directly move task attempt output files to * /path/to/outputPath/.spark-staging-{jobId}/a=1/b=1. + * 3. When a different `PathOutputCommitter` is used to commit + * work, it is an implicit requirement that after its + * commitJob() call succeeds, the generated file is in the + * appropriate location under .spark-staging-{jobId}. * * At the end of committing job, we move output files from * intermediate path to final path, e.g., move files from * /path/to/outputPath/.spark-staging-{jobId}/a=1/b=1 * to /path/to/outputPath/a=1/b=1 + * This done by + * 1. A delete of that destination directory. + * 2. A rename of the directory under .spark-staging to that + * location. + * There is no requirement for atomic directory operations at + * this point. + * However, fast and O(1) operations are often expected by users. + * These expectations may not be met against cloud stores, + * where they may be O(files) or O(data) -this does not affect + * the correctness of the algorithm. + * The list of partitions is calculated during the task attempts; + * each task returns their partition list in their + * TaskCommitMessage. + * + * If the job commit stage fails during any part of the commit + * process prior to the partition overwrite stage then all changes + * are exclusively limited to the .spark-staging subdirectory. + * If the job commit stage fails during the partition overwrite + * process then provided the destination + * filesystem supports atomic directory delete and rename, + * the final output directories may contain one or more + * partitions which have been updated -or even, having been + * deleted and not yet recreated, no longer exist. + * It will *not* contain any partially deleted or incompletely + * renamed partitions. Nor will any directories contain a mix + * of files from before or after the job. + * If the destination filesystem does not support atomic + * directory operations (For example, Google GCS), there + * may be partitions with incomplete "before" or "after" + * datasets. There will still be no mix of data within any + * partition. */ class HadoopMapReduceCommitProtocol( jobId: String, @@ -129,9 +218,7 @@ class HadoopMapReduceCommitProtocol( // For FileOutputCommitter it has its own staging path called "work path". case f: FileOutputCommitter => if (dynamicPartitionOverwrite) { - assert(dir.isDefined, - "The dataset to be written must be partitioned when dynamicPartitionOverwrite is true.") - partitionPaths += dir.get + addPartitionedDir(dir) } new Path(Option(f.getWorkPath).map(_.toString).getOrElse(path)) case _ => new Path(path) @@ -144,6 +231,31 @@ class HadoopMapReduceCommitProtocol( } } + /** + * Record the directory used so that dynamic partition overwrite + * knows to delete it. + * Includes the check that the directory is defined. + * @param dir directory + */ + protected def addPartitionedDir(dir: Option[String]): Unit = { + assert(dir.isDefined, + "The dataset to be written must be partitioned when dynamicPartitionOverwrite is true.") + partitionPaths += dir.get + } + + /** + * Get an immutable copy of the partition set of a task attempt. + * Will be None unless/until [[setupTask()]], including the Job instance. + * @return None if not initiated; an immutable set otherwise. + */ + protected def getPartitions: Option[Set[String]] = { + if (partitionPaths != null) { + Some(partitionPaths.toSet) + } else { + None + } + } + override def newTaskTempFileAbsPath( taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = { newTaskTempFileAbsPath(taskContext, absoluteDir, FileNameSpec("", ext)) diff --git a/docs/cloud-integration.md b/docs/cloud-integration.md index 06342645e6d9d..89c230b7deb82 100644 --- a/docs/cloud-integration.md +++ b/docs/cloud-integration.md @@ -362,16 +362,23 @@ declare to spark that they are compatible. If dynamic partition overwrite is required when writing data through a hadoop committer, Spark will always permit this when the original `FileOutputCommitter` is used. For other committers, after their instantiation, Spark -will probe for their declaration of compatibility, and -permit the operation if state that they are compatible. +will probe for their declaration of compatibility. -If the committer is not compatible, the operation will fail with -the error message -`PathOutputCommitter does not support dynamicPartitionOverwrite` +If the committer is not compatible, the job will execute, but it +will print a warning +``` +Committer (committer name) has incomplete support for dynamic partition overwrite. +``` + +The job will still execute safely, and if the spark process fails at +any point before job commit, none of the output will be visible. +During job commit, the final stage of overwriting the existing partitions +may be slow -and the larger the amount of data generated the longer it +will take. -Unless there is a compatible committer for the target filesystem, -the sole solution is to use a cloud-friendly format for data -storage. +The solution is to use a cloud-friendly format for data storage, which should +deliver fast atomic job commits on all of the object stores which +Spark is compatible with. ## Further Reading diff --git a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala index 44a521bd636c5..998ced0ef849f 100644 --- a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala +++ b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala @@ -17,14 +17,11 @@ package org.apache.spark.internal.io.cloud -import java.io.IOException - import org.apache.hadoop.fs.{Path, StreamCapabilities} import org.apache.hadoop.mapreduce.TaskAttemptContext import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, PathOutputCommitter, PathOutputCommitterFactory} -import org.apache.spark.internal.io.FileNameSpec -import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol +import org.apache.spark.internal.io.{FileNameSpec, HadoopMapReduceCommitProtocol} /** * Spark Commit protocol for Path Output Committers. @@ -41,16 +38,24 @@ import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol * Dynamic Partition support will be determined once the committer is * instantiated in the setupJob/setupTask methods. If this * class was instantiated with `dynamicPartitionOverwrite` set to true, - * then the instantiated committer must either be an instance of + * then the instantiated committer should either be an instance of * `FileOutputCommitter` or it must implement the `StreamCapabilities` * interface and declare that it has the capability * `mapreduce.job.committer.dynamic.partitioning`. * That feature is available on Hadoop releases with the Intermediate - * Manifest Committer for GCS and ABFS; it is not supported by the - * S3A committers. + * Manifest Committer for GCS and ABFS; it is not declared + * as supported by the S3A committers where file rename is O(data). + * If a committer does not declare explicit support for dynamic partition + * support then the extra set of renames which take place during job commit, + * after the PathOutputCommitter itself promotes work to the destination + * directory, may take a large amount of time. + * That is exactly the behaviour of dynamic partitioned jobs on S3 + * through the S3A committer: this will work, but job commit reverts + * to being O(data). + * * @constructor Instantiate. - * @param jobId job - * @param dest destination + * @param jobId job + * @param dest destination * @param dynamicPartitionOverwrite does the caller want support for dynamic * partition overwrite? */ @@ -123,7 +128,8 @@ class PathOutputCommitProtocol( logDebug( s"Committer $committer has declared compatibility with dynamic partition overwrite") } else { - throw new IOException(PathOutputCommitProtocol.UNSUPPORTED + ": " + committer) + logWarning(s"Committer $committer has incomplete support for" + + " dynamic partition overwrite. It may be slow.") } } } @@ -155,6 +161,11 @@ class PathOutputCommitProtocol( dir: Option[String], spec: FileNameSpec): String = { + // if there is dynamic partition overwrite, its directory must + // be validated and included in the set of partitions. + if (dynamicPartitionOverwrite) { + addPartitionedDir(dir) + } val workDir = committer.getWorkPath val parent = dir.map { d => new Path(workDir, d) @@ -165,26 +176,42 @@ class PathOutputCommitProtocol( } /** - * Reject any requests for an absolute path file on a committer which - * is not compatible with it. + * Make the getPartitions call visible for testing. + */ + override protected[cloud] def getPartitions: Option[Set[String]] = + super.getPartitions + + /** + * Create a temporary file with an absolute path. + * Note that this is dangerous as the outcome of any job commit failure + * is undefined, and potentially slow on cloud storage. * * @param taskContext task context * @param absoluteDir final directory * @param spec output filename * @return a path string - * @throws UnsupportedOperationException if incompatible */ override def newTaskTempFileAbsPath( taskContext: TaskAttemptContext, absoluteDir: String, spec: FileNameSpec): String = { - if (supportsDynamicPartitions) { - super.newTaskTempFileAbsPath(taskContext, absoluteDir, spec) - } else { - throw new UnsupportedOperationException(s"Absolute output locations not supported" + - s" by committer $committer") + // qualify the path in the same fs as the staging dir. + // this makes sure they are in the same filesystem + val fs = stagingDir.getFileSystem(taskContext.getConfiguration) + val target = fs.makeQualified(new Path(absoluteDir)) + if (dynamicPartitionOverwrite) { + // safety check to make sure that the destination path + // is not a parent of the destination -as if so it will + // be deleted and the job will fail quite dramatically. + + require(!isAncestorOf(target, stagingDir), + s"cannot not use $target as a destination of work" + + s" in dynamic partitioned overwrite query writing to $stagingDir") } + val temp = super.newTaskTempFileAbsPath(taskContext, absoluteDir, spec) + logTrace(s"Creating temporary file $temp for absolute dir $target") + temp } } @@ -206,10 +233,6 @@ object PathOutputCommitProtocol { */ val REJECT_FILE_OUTPUT_DEFVAL = false - /** Error string for tests. */ - private[cloud] val UNSUPPORTED: String = "PathOutputCommitter does not support" + - " dynamicPartitionOverwrite" - /** * Stream Capabilities probe for spark dynamic partitioning compatibility. */ @@ -220,4 +243,33 @@ object PathOutputCommitProtocol { * Scheme prefix for per-filesystem scheme committers. */ private[cloud] val OUTPUTCOMMITTER_FACTORY_SCHEME = "mapreduce.outputcommitter.factory.scheme" + + /** + * Classname of the manifest committer factory (Hadoop 3.3.5+). + * If present, the manifest committer is available; if absent it is not. + * By setting the factory for a filesystem scheme or a job to this + * committer, task commit is implemented by saving a JSON manifest of + * files to rename. + * Job commit consists of reading these files, creating the destination directories + * and then renaming the new files into their final location. + */ + private[cloud] val MANIFEST_COMMITTER_FACTORY = + "org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory" + + /** + * Is one path equal to or ancestor of another? + * + * @param parent parent path; may be root. + * @param child path which is to be tested + * @return true if the paths are the same or parent is above child + */ + private[cloud] def isAncestorOf(parent: Path, child: Path): Boolean = { + if (parent == child) { + true + } else if (child.isRoot) { + false + } else { + isAncestorOf(parent, child.getParent) + } + } } diff --git a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala index 984c7dbc2cb1b..f2d5fcdb60975 100644 --- a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala +++ b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala @@ -17,18 +17,20 @@ package org.apache.spark.internal.io.cloud -import java.io.{File, FileInputStream, FileOutputStream, IOException, ObjectInputStream, ObjectOutputStream} +import java.io.{File, FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{Path, StreamCapabilities} import org.apache.hadoop.io.IOUtils import org.apache.hadoop.mapreduce.{Job, JobStatus, MRJobConfig, TaskAttemptContext, TaskAttemptID} -import org.apache.hadoop.mapreduce.lib.output.{BindingPathOutputCommitter, FileOutputFormat} +import org.apache.hadoop.mapreduce.lib.output.{BindingPathOutputCommitter, FileOutputFormat, PathOutputCommitterFactory} import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl -import org.apache.spark.SparkFunSuite +import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.internal.io.{FileCommitProtocol, FileNameSpec} -import org.apache.spark.internal.io.cloud.PathOutputCommitProtocol.{CAPABILITY_DYNAMIC_PARTITIONING, OUTPUTCOMMITTER_FACTORY_SCHEME} +import org.apache.spark.internal.Logging +import org.apache.spark.internal.io.cloud.PathOutputCommitProtocol._ +import org.apache.spark.sql.internal.SQLConf class CommitterBindingSuite extends SparkFunSuite { @@ -55,14 +57,13 @@ class CommitterBindingSuite extends SparkFunSuite { test("BindingParquetOutputCommitter binds to the inner committer") { val path = new Path("http://example/data") - val job = newJob(path) - val conf = job.getConfiguration - conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttempt0) - conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1) + val conf = newJob(path).getConfiguration StubPathOutputCommitterBinding.bindWithDynamicPartitioning(conf, "http") - val tContext: TaskAttemptContext = new TaskAttemptContextImpl(conf, taskAttemptId0) + val tContext: TaskAttemptContext = new TaskAttemptContextImpl(conf, + taskAttemptId0) val parquet = new BindingParquetOutputCommitter(path, tContext) - val inner = parquet.boundCommitter.asInstanceOf[StubPathOutputCommitterWithDynamicPartioning] + val inner = parquet.boundCommitter + .asInstanceOf[StubPathOutputCommitterWithDynamicPartioning] parquet.setupJob(tContext) assert(inner.jobSetup, s"$inner job not setup") parquet.setupTask(tContext) @@ -80,7 +81,7 @@ class CommitterBindingSuite extends SparkFunSuite { assert(inner.jobAborted, s"$inner job not aborted") val binding = new BindingPathOutputCommitter(path, tContext) - // MAPREDUCE-7403 only arrived after hadoop 3.3.4; this test case + // MAPREDUCE-7403 only arrived in hadoop 3.3.5; this test case // is designed to work with versions with and without the feature. if (binding.isInstanceOf[StreamCapabilities]) { // this version of hadoop does support hasCapability probes @@ -101,6 +102,7 @@ class CommitterBindingSuite extends SparkFunSuite { val job = Job.getInstance(new Configuration()) val conf = job.getConfiguration conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttempt0) + conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1) conf.setBoolean(successMarker, true) FileOutputFormat.setOutputPath(job, outDir) job @@ -145,34 +147,83 @@ class CommitterBindingSuite extends SparkFunSuite { } /* - * Bind a job to a committer which doesn't support dynamic partitioning. - * Job setup must fail, and calling `newTaskTempFileAbsPath()` must - * raise `UnsupportedOperationException`. + * Bind a job to a committer which doesn't support dynamic partitioning, + * but request dynamic partitioning in the protocol instantiation. + * This works, though a warning will have appeared in the log and + * the performance of the job commit is unknown and potentially slow. */ - test("reject dynamic partitioning if not supported") { - val path = new Path("http://example/data") - val job = newJob(path) - val conf = job.getConfiguration - conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttempt0) - conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1) + test("permit dynamic partitioning even not declared as supported") { + val path = new Path("http://example/dir1/dir2/dir3") + val conf = newJob(path).getConfiguration + StubPathOutputCommitterBinding.bind(conf, "http") + val tContext = new TaskAttemptContextImpl(conf, taskAttemptId0) + val committer = instantiateCommitter(path, true) + committer.setupTask(tContext) + assert(committer.getPartitions.isDefined, + "committer partition list should be defined") + + val file1 = new Path( + committer.newTaskTempFile(tContext, Option("part=1"), ".csv")) + assert(file1.getName.endsWith(".csv"), s"wrong suffix in $file1") + assert(file1.getParent.getName === "part=1", s"wrong parent dir in $file1") + val partionSet1 = committer.getPartitions.get + assert(partionSet1 === Set("part=1")) + + val file2 = new Path( + committer.newTaskTempFile(tContext, Option("part=2"), + FileNameSpec("prefix", ".csv"))) + assert(file2.getName.endsWith(".csv"), s"wrong suffix in $file1") + assert(file2.getName.startsWith("prefix"), s"wrong prefix in $file1") + + val partionSet2 = committer.getPartitions.get + assert(partionSet2 === Set("part=1", "part=2")) + + // calls to newTaskTempFileAbsPath() will be accepted + verifyAbsTempFileWorks(tContext, committer) + } + + /* + * Bind a job to a committer which doesn't support dynamic partitioning, + * but request dynamic partitioning in the protocol instantiation. + * This works, though a warning will have appeared in the log and + * the performance of the job commit is unknown and potentially slow. + */ + test("basic committer") { + val path = new Path("http://example/dir1/dir2/dir3") + val conf = newJob(path).getConfiguration StubPathOutputCommitterBinding.bind(conf, "http") val tContext = new TaskAttemptContextImpl(conf, taskAttemptId0) - val committer = FileCommitProtocol.instantiate( + val committer = instantiateCommitter(path, false) + committer.setupTask(tContext) + assert(committer.getPartitions.isDefined, + "committer partition list should be defined") + + val file1 = new Path( + committer.newTaskTempFile(tContext, Option("part=1"), ".csv")) + assert(file1.getName.endsWith(".csv"), s"wrong suffix in $file1") + assert(file1.getParent.getName === "part=1", s"wrong parent dir in $file1") + + assert(committer.getPartitions.get.isEmpty, + "partitions are being collected in a non-dynamic job") + + // calls to newTaskTempFileAbsPath() will be accepted + verifyAbsTempFileWorks(tContext, committer) + } + + /** + * Instantiate a committer. + * + * @param path path to bind to + * @param dynamic use dynamicPartitionOverwrite + * @return the committer + */ + private def instantiateCommitter(path: Path, + dynamic: Boolean): PathOutputCommitProtocol = { + FileCommitProtocol.instantiate( pathCommitProtocolClassname, jobId, path.toUri.toString, - true) - val ioe = intercept[IOException] { - committer.setupJob(tContext) - } - if (!ioe.getMessage.contains(PathOutputCommitProtocol.UNSUPPORTED)) { - throw ioe - } - - // calls to newTaskTempFileAbsPath() will be rejected - intercept[UnsupportedOperationException] { - verifyAbsTempFileWorks(tContext, committer) - } + dynamic).asInstanceOf[PathOutputCommitProtocol] } /* @@ -182,59 +233,96 @@ class CommitterBindingSuite extends SparkFunSuite { * can be moved to an absolute path later. */ test("permit dynamic partitioning if the committer says it works") { - val path = new Path("http://example/data") + val path = new Path("http://example/dir1/dir2/dir3") val job = newJob(path) val conf = job.getConfiguration conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttempt0) conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1) StubPathOutputCommitterBinding.bindWithDynamicPartitioning(conf, "http") val tContext = new TaskAttemptContextImpl(conf, taskAttemptId0) - val committer: PathOutputCommitProtocol = FileCommitProtocol.instantiate( - pathCommitProtocolClassname, - jobId, - path.toUri.toString, - true).asInstanceOf[PathOutputCommitProtocol] + val committer: PathOutputCommitProtocol = instantiateCommitter(path, true) committer.setupJob(tContext) committer.setupTask(tContext) verifyAbsTempFileWorks(tContext, committer) + + // attempt to create files in directories above the job + // dir, which in dynamic partitioning will result in the delete + // of the parent dir, hence loss of the job. + List("/dir1", "/dir1/dir2", "/dir1/dir2/dir3", "", "/").foreach { d => + intercept[IllegalArgumentException] { + committer.newTaskTempFileAbsPath(tContext, d, ".ext") + } + } + // "adjacent" paths and child paths are valid. + List("/d", "/dir12", "/dir1/dir2/dir30", "/dir1/dir2/dir3/dir4") + .foreach { + committer.newTaskTempFileAbsPath(tContext, _, ".ext") + } } /* * Create a FileOutputCommitter through the PathOutputCommitProtocol * using the relevant factory in hadoop-mapreduce-core JAR. */ - test("FileOutputCommitter through PathOutputCommitProtocol") { + test("Dynamic FileOutputCommitter through PathOutputCommitProtocol") { // temp path; use a unique filename val jobCommitDir = File.createTempFile( "FileOutputCommitter-through-PathOutputCommitProtocol", "") try { // delete the temp file and create a temp dir. - jobCommitDir.delete(); - val jobUri = jobCommitDir.toURI + jobCommitDir.delete() // hadoop path of the job - val path = new Path(jobUri) - val job = newJob(path) - val conf = job.getConfiguration - conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttempt0) - conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1) + val path = new Path(jobCommitDir.toURI) + val conf = newJob(path).getConfiguration bindToFileOutputCommitterFactory(conf, "file") val tContext = new TaskAttemptContextImpl(conf, taskAttemptId0) - val committer: PathOutputCommitProtocol = FileCommitProtocol.instantiate( - pathCommitProtocolClassname, - jobId, - jobUri.toString, - true).asInstanceOf[PathOutputCommitProtocol] + val committer = instantiateCommitter(path, true) committer.setupJob(tContext) + // unless/until setupTask() is invoked. the partition list is not created, + // this means that the job manager instance will return None + // on a call to getPartitions. + assert(committer.getPartitions.isEmpty, + "committer partition list should be empty") committer.setupTask(tContext) verifyAbsTempFileWorks(tContext, committer) } finally { - jobCommitDir.delete(); + jobCommitDir.delete() } } /** - * Verify that a committer supports `newTaskTempFileAbsPath()`. + * When the FileOutputCommitter has been forcibly disabled, + * attempting to create it will raise an exception. + */ + test("FileOutputCommitter disabled") { + // temp path; use a unique filename + val jobCommitDir = File.createTempFile( + "FileOutputCommitter-disabled", + "") + try { + // delete the temp file and create a temp dir. + jobCommitDir.delete() + // hadoop path of the job + val path = new Path(jobCommitDir.toURI) + val conf = newJob(path).getConfiguration + bindToFileOutputCommitterFactory(conf, "file") + conf.setBoolean(REJECT_FILE_OUTPUT, true) + intercept[IllegalArgumentException] { + instantiateCommitter(path, true) + .setupJob(new TaskAttemptContextImpl(conf, taskAttemptId0)) + } + // the committer never created the destination directory + assert(!jobCommitDir.exists(), + s"job commit dir $jobCommitDir should not have been created") + } finally { + jobCommitDir.delete() + } + } + + /** + * Verify that a committer supports `newTaskTempFileAbsPath()`, + * returning a new file under /tmp. * * @param tContext task context * @param committer committer @@ -260,8 +348,103 @@ class CommitterBindingSuite extends SparkFunSuite { */ def bindToFileOutputCommitterFactory(conf: Configuration, scheme: String): Unit = { conf.set(OUTPUTCOMMITTER_FACTORY_SCHEME + "." + scheme, - "org.apache.hadoop.mapreduce.lib.output.FileOutputCommitterFactory") + CommitterBindingSuite.FILE_OUTPUT_COMMITTER_FACTORY) + } + +} + +/** + * Constants for this and related test suites. + */ +private[cloud] object CommitterBindingSuite extends Logging { + val FILE_OUTPUT_COMMITTER_FACTORY: String = "org.apache.hadoop.mapreduce.lib.output.FileOutputCommitterFactory" + + val PATH_OUTPUT_COMMITTER_NAME: String = "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol" + + val BINDING_PARQUET_OUTPUT_COMMITTER_CLASS: String = + "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter" + + /** + * Options to bind to the path committer through SQL and parquet. + */ + val BIND_TO_PATH_COMMITTER: Map[String, String] = Map( + SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key -> + BINDING_PARQUET_OUTPUT_COMMITTER_CLASS, + SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key -> PATH_OUTPUT_COMMITTER_NAME, + ) + + /** + * Prefix to use for manifest committer options. + */ + val MANIFEST_OPT_PREFIX = "spark.hadoop.mapreduce.manifest.committer." + + /** + * Directory for saving job summary reports. + * These are the `_SUCCESS` files, but are saved even on + * job failures. + */ + val OPT_SUMMARY_REPORT_DIR: String = MANIFEST_OPT_PREFIX + + "summary.report.directory" + + /** + * Directory under target/ for reports. + */ + val JOB_REPORTS_DIR = "./target/reports/" + + /** + * Subdir for manifest committer _SUMMARY files. + */ + val SUMMARY_SUBDIR = "summary" + + /** + * Enable the path committer in a spark configuration, including optionally + * the manifest committer if the test is run on a hadoop build with it. + * This committer, which works on file:// repositories + * scales better on azure and google cloud stores, and + * collects and reports IOStatistics from task commit IO as well + * as Job commit operations. + * + * @param conf the configuration to modify + * @param tryToUseManifest should the manifest be probed for and enabled if found? + * @return (is the manifest in use, report directory) + */ + def enablePathCommitter(conf: SparkConf, + tryToUseManifest: Boolean): (Boolean, File) = { + val reportsDir = new File(JOB_REPORTS_DIR).getCanonicalFile + conf.setAll(BIND_TO_PATH_COMMITTER) + + if (!tryToUseManifest) { + // no need to look for the manifest. + return (false, reportsDir) + } + // look for the manifest committer exactly once. + val loader = getClass.getClassLoader + if (manifestCommitterFound) { + // manifest committer class was found so bind to and configure it. + logInfo("Using Manifest Committer") + conf.set(PathOutputCommitterFactory.COMMITTER_FACTORY_CLASS, + MANIFEST_COMMITTER_FACTORY) + // save full _SUCCESS files for the curious; this includes timings + // of operations in task as well as job commit. + conf.set(OPT_SUMMARY_REPORT_DIR, + new File(reportsDir, SUMMARY_SUBDIR).getCanonicalFile.toURI.toString) + } + (manifestCommitterFound, reportsDir) } + /** + * Is the manifest committer present. + */ + lazy val manifestCommitterFound: Boolean = { + try { + getClass.getClassLoader.loadClass(PathOutputCommitProtocol.MANIFEST_COMMITTER_FACTORY) + true + } catch { + case _: ClassNotFoundException => + false + } + } } + + diff --git a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/PathOutputPartitionedWriteSuite.scala b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/PathOutputPartitionedWriteSuite.scala new file mode 100644 index 0000000000000..fcf733d92ea63 --- /dev/null +++ b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/PathOutputPartitionedWriteSuite.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.internal.io.cloud + +import org.apache.spark.sql.sources.PartitionedWriteSuite +import org.apache.spark.SparkConf +import org.apache.spark.internal.io.cloud.CommitterBindingSuite.enablePathCommitter + +/** + * Run the spark-sql core PartitionedWriteSuite through the + * PathOutputCommitter. + */ +class PathOutputPartitionedWriteSuite extends PartitionedWriteSuite { + + /** + * Create a job configuration using the PathOutputCommitterProtocol + * through Parquet. + * + * @return the spark configuration to use. + */ + override protected def sparkConf: SparkConf = { + + val conf = super.sparkConf + enablePathCommitter(conf, true) + conf + } +}