From 47bc2299ac08824e68a166426347d6e629f0078a Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Tue, 9 Aug 2022 18:25:31 +0100 Subject: [PATCH 1/7] SPARK-40034. PathOutputCommitters to support dynamic partition overwrite. Uses the StreamCapabilities probe in MAPREDUCE-7403 to identify when a PathOutputCommitter is compatible with dynamic partition overwrite. This patch has unit tests but not integration tests; really needs to test the SQL commands through the manifest committer into gcs/abfs, or at least local fs. That would be possible once hadoop 3.3.5 is out... Change-Id: I5cbc391bc021b4dd177374e82de9fc33137ac319 Change-Id: I772caf861d6c92f0da6d9a02d9f899236ddaddf9 --- .../cloud/BindingParquetOutputCommitter.scala | 8 ++- .../io/cloud/PathOutputCommitProtocol.scala | 44 ++++++++++---- .../io/cloud/CommitterBindingSuite.scala | 51 ++++++++++++---- .../io/cloud/StubPathOutputCommitter.scala | 60 ++++++++++++++++--- 4 files changed, 131 insertions(+), 32 deletions(-) diff --git a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/BindingParquetOutputCommitter.scala b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/BindingParquetOutputCommitter.scala index 81a57385dd971..1e740a6e7786d 100644 --- a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/BindingParquetOutputCommitter.scala +++ b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/BindingParquetOutputCommitter.scala @@ -19,7 +19,7 @@ package org.apache.spark.internal.io.cloud import java.io.IOException -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{Path, StreamCapabilities} import org.apache.hadoop.mapreduce.{JobContext, JobStatus, TaskAttemptContext} import org.apache.hadoop.mapreduce.lib.output.{BindingPathOutputCommitter, PathOutputCommitter} import org.apache.parquet.hadoop.ParquetOutputCommitter @@ -37,7 +37,7 @@ import org.apache.spark.internal.Logging class BindingParquetOutputCommitter( path: Path, context: TaskAttemptContext) - extends ParquetOutputCommitter(path, context) with Logging { + extends ParquetOutputCommitter(path, context) with Logging with StreamCapabilities { logTrace(s"${this.getClass.getName} binding to configured PathOutputCommitter and dest $path") @@ -119,4 +119,8 @@ class BindingParquetOutputCommitter( } override def toString: String = s"BindingParquetOutputCommitter($committer)" + + override def hasCapability(capability: String): Boolean = + committer.isInstanceOf[StreamCapabilities] && + committer.asInstanceOf[StreamCapabilities].hasCapability(capability) } diff --git a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala index fc5d0a0b3a7f5..b1b4359b7b48c 100644 --- a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala +++ b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala @@ -19,7 +19,7 @@ package org.apache.spark.internal.io.cloud import java.io.IOException -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{Path, StreamCapabilities} import org.apache.hadoop.mapreduce.TaskAttemptContext import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, PathOutputCommitter, PathOutputCommitterFactory} @@ -51,14 +51,7 @@ class PathOutputCommitProtocol( jobId: String, dest: String, dynamicPartitionOverwrite: Boolean = false) - 
extends HadoopMapReduceCommitProtocol(jobId, dest, false) with Serializable { - - if (dynamicPartitionOverwrite) { - // until there's explicit extensions to the PathOutputCommitProtocols - // to support the spark mechanism, it's left to the individual committer - // choice to handle partitioning. - throw new IOException(PathOutputCommitProtocol.UNSUPPORTED) - } + extends HadoopMapReduceCommitProtocol(jobId, dest, dynamicPartitionOverwrite) with Serializable { /** The committer created. */ @transient private var committer: PathOutputCommitter = _ @@ -88,6 +81,7 @@ class PathOutputCommitProtocol( logTrace(s"Setting up committer for path $destination") committer = PathOutputCommitterFactory.createCommitter(destPath, context) + // Special feature to force out the FileOutputCommitter, so as to guarantee // that the binding is working properly. val rejectFileOutput = context.getConfiguration @@ -114,10 +108,29 @@ class PathOutputCommitProtocol( // failures. Warn logTrace(s"Committer $committer may not be tolerant of task commit failures") } + } else { + + // if required other committers need to be checked for dynamic partition + // compatibility through a StreamCapabilities probe. + if (dynamicPartitionOverwrite) { + logDebug(s"Checking dynamic partition overwrite support in committer $committer") + if (committer.isInstanceOf[StreamCapabilities] + && committer.asInstanceOf[StreamCapabilities] + .hasCapability(CAPABILITY_DYNAMIC_PARTITIONING)) { + logDebug( + s"Committer $committer has declared compatibility with dynamic partition overwrite") + } else { + throw new IOException(PathOutputCommitProtocol.UNSUPPORTED + ": " + committer) + } + } } + + + committer } + /** * Create a temporary file for a task. * @@ -161,7 +174,18 @@ object PathOutputCommitProtocol { val REJECT_FILE_OUTPUT_DEFVAL = false /** Error string for tests. */ - private[cloud] val UNSUPPORTED: String = "PathOutputCommitProtocol does not support" + + private[cloud] val UNSUPPORTED: String = "PathOutputCommitter does not support" + " dynamicPartitionOverwrite" + + /** + * Stream Capabilities probe for spark dynamic partitioning compatibility. + */ + private[cloud] val CAPABILITY_DYNAMIC_PARTITIONING = "mapreduce.job.committer.dynamic.partitioning" + + + /** + * Scheme prefix for per-filesystem scheme committers. 
+ */ + private[cloud] val OUTPUTCOMMITTER_FACTORY_SCHEME = "mapreduce.outputcommitter.factory.scheme" } diff --git a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala index 546f54229ea59..2fc8d477f0547 100644 --- a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala +++ b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala @@ -18,7 +18,6 @@ package org.apache.spark.internal.io.cloud import java.io.{File, FileInputStream, FileOutputStream, IOException, ObjectInputStream, ObjectOutputStream} -import java.lang.reflect.InvocationTargetException import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path @@ -29,6 +28,7 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl import org.apache.spark.SparkFunSuite import org.apache.spark.internal.io.FileCommitProtocol +import org.apache.spark.internal.io.cloud.PathOutputCommitProtocol.CAPABILITY_DYNAMIC_PARTITIONING class CommitterBindingSuite extends SparkFunSuite { @@ -57,10 +57,12 @@ class CommitterBindingSuite extends SparkFunSuite { conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttempt0) conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1) - StubPathOutputCommitterFactory.bind(conf, "http") + StubPathOutputCommitterBinding.bindWithDynamicPartitioning(conf, "http") val tContext = new TaskAttemptContextImpl(conf, taskAttemptId0) val parquet = new BindingParquetOutputCommitter(path, tContext) - val inner = parquet.boundCommitter.asInstanceOf[StubPathOutputCommitter] + val inner = parquet.boundCommitter.asInstanceOf[StubPathOutputCommitterWithDynamicPartioning] + assert(parquet.hasCapability(CAPABILITY_DYNAMIC_PARTITIONING), + s"committer $parquet does not declare dynamic partition support") parquet.setupJob(tContext) assert(inner.jobSetup, s"$inner job not setup") parquet.setupTask(tContext) @@ -130,16 +132,41 @@ class CommitterBindingSuite extends SparkFunSuite { assert("file:///tmp" === protocol.destination) } - test("reject dynamic partitioning") { - val cause = intercept[InvocationTargetException] { - FileCommitProtocol.instantiate( - pathCommitProtocolClassname, - jobId, "file:///tmp", true) - }.getCause - if (cause == null || !cause.isInstanceOf[IOException] - || !cause.getMessage.contains(PathOutputCommitProtocol.UNSUPPORTED)) { - throw cause + test("reject dynamic partitioning if not supported") { + val path = new Path("http://example/data") + val job = newJob(path) + val conf = job.getConfiguration + conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttempt0) + conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1) + + StubPathOutputCommitterBinding.bind(conf, "http") + val tContext = new TaskAttemptContextImpl(conf, taskAttemptId0) + val committer = FileCommitProtocol.instantiate( + pathCommitProtocolClassname, + jobId, path.toUri.toString, true) + + val ioe = intercept[IOException] { + committer.setupJob(tContext) } + if (!ioe.getMessage.contains(PathOutputCommitProtocol.UNSUPPORTED)) { + throw ioe + } + } + + test("permit dynamic partitioning if the committer says it works") { + val path = new Path("http://example/data") + val job = newJob(path) + val conf = job.getConfiguration + conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttempt0) + conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1) + + StubPathOutputCommitterBinding.bindWithDynamicPartitioning(conf, "http") + val tContext = new 
TaskAttemptContextImpl(conf, taskAttemptId0) + val committer = FileCommitProtocol.instantiate( + pathCommitProtocolClassname, + jobId, path.toUri.toString, true) + committer.setupJob(tContext) + } } diff --git a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/StubPathOutputCommitter.scala b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/StubPathOutputCommitter.scala index 88a36d227b11a..5a0dba45ba87a 100644 --- a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/StubPathOutputCommitter.scala +++ b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/StubPathOutputCommitter.scala @@ -18,10 +18,12 @@ package org.apache.spark.internal.io.cloud import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{Path, StreamCapabilities} import org.apache.hadoop.mapreduce.{JobContext, JobStatus, TaskAttemptContext} import org.apache.hadoop.mapreduce.lib.output.{PathOutputCommitter, PathOutputCommitterFactory} +import org.apache.spark.internal.io.cloud.PathOutputCommitProtocol.{CAPABILITY_DYNAMIC_PARTITIONING, OUTPUTCOMMITTER_FACTORY_SCHEME} + /** * A local path output committer which tracks its state, for use in tests. * @param outputPath final destination. @@ -91,10 +93,45 @@ class StubPathOutputCommitterFactory extends PathOutputCommitterFactory { } private def workPath(out: Path): Path = new Path(out, - StubPathOutputCommitterFactory.TEMP_DIR_NAME) + StubPathOutputCommitterBinding.TEMP_DIR_NAME) +} + +/** + * An extension which declares that it supports dynamic partitioning. + * @param outputPath final destination. + * @param workPath work path + * @param context task/job attempt. + */ +class StubPathOutputCommitterWithDynamicPartioning( + outputPath: Path, + workPath: Path, + context: TaskAttemptContext) extends StubPathOutputCommitter(outputPath, workPath, context) + with StreamCapabilities { + + override def hasCapability(capability: String): Boolean = + CAPABILITY_DYNAMIC_PARTITIONING == capability + } -object StubPathOutputCommitterFactory { + +class StubPathOutputCommitterWithDynamicPartioningFactory extends PathOutputCommitterFactory { + + override def createOutputCommitter( + outputPath: Path, + context: TaskAttemptContext): PathOutputCommitter = { + new StubPathOutputCommitterWithDynamicPartioning(outputPath, workPath(outputPath), context) + } + + private def workPath(out: Path): Path = new Path(out, + StubPathOutputCommitterBinding.TEMP_DIR_NAME) +} + + +/** + * Class to help binding job configurations to the different + * stub committers available. + */ +object StubPathOutputCommitterBinding { /** * This is the "Pending" directory of the FileOutputCommitter; @@ -102,11 +139,6 @@ object StubPathOutputCommitterFactory { */ val TEMP_DIR_NAME = "_temporary" - /** - * Scheme prefix for per-filesystem scheme committers. - */ - val OUTPUTCOMMITTER_FACTORY_SCHEME = "mapreduce.outputcommitter.factory.scheme" - /** * Given a hadoop configuration, set up the factory binding for the scheme. * @param conf config to patch @@ -117,4 +149,16 @@ object StubPathOutputCommitterFactory { conf.set(key, classOf[StubPathOutputCommitterFactory].getName()) } + /** + * Bind the configuration/scheme to the stub committer which + * declares support for dynamic partitioning. + * + * @param conf config to patch + * @param scheme filesystem scheme. + */ + def bindWithDynamicPartitioning(conf: Configuration, scheme: String): Unit = { + val key = OUTPUTCOMMITTER_FACTORY_SCHEME + "." 
+ scheme + conf.set(key, + classOf[StubPathOutputCommitterWithDynamicPartioningFactory].getName()) + } } From 545f294f57ebeebe52b39b16a6b9a99271700469 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Mon, 15 Aug 2022 19:57:29 +0100 Subject: [PATCH 2/7] SPARK-40034. reject newTaskTempFileAbsPath() when dynamic partition unsupported I believe this was always implicit; only committers with dynamic partition overwrite would be asked for absolute path temp files(*). With this change it is explicit, with tests. (*) certainly nobody has ever complained about it not working with the s3a committers Change-Id: I57c2a02ad799f7ab5d9d0a3053da24f960bad289 --- .../io/cloud/PathOutputCommitProtocol.scala | 37 +++++++++++++++++-- .../io/cloud/CommitterBindingSuite.scala | 24 ++++++++++-- 2 files changed, 54 insertions(+), 7 deletions(-) diff --git a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala index b1b4359b7b48c..f40f9a1447670 100644 --- a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala +++ b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala @@ -114,9 +114,7 @@ class PathOutputCommitProtocol( // compatibility through a StreamCapabilities probe. if (dynamicPartitionOverwrite) { logDebug(s"Checking dynamic partition overwrite support in committer $committer") - if (committer.isInstanceOf[StreamCapabilities] - && committer.asInstanceOf[StreamCapabilities] - .hasCapability(CAPABILITY_DYNAMIC_PARTITIONING)) { + if (supportsDynamicPartitions) { logDebug( s"Committer $committer has declared compatibility with dynamic partition overwrite") } else { @@ -131,6 +129,17 @@ class PathOutputCommitProtocol( } + /** + * Does the instantiated committer support dynamic partitions? + * @return true if the committer declares itself compatible. + */ + private def supportsDynamicPartitions = { + committer.isInstanceOf[FileOutputCommitter] || + (committer.isInstanceOf[StreamCapabilities] && + committer.asInstanceOf[StreamCapabilities] + .hasCapability(CAPABILITY_DYNAMIC_PARTITIONING)) + } + /** * Create a temporary file for a task. * @@ -153,6 +162,28 @@ class PathOutputCommitProtocol( file.toString } + /** + * Reject any requests for an absolute path file on a committer which + * is not compatible with it. 
+ * + * @param taskContext task context + * @param absoluteDir final directory + * @param spec output filename + * @return a path string + * @throws UnsupportedOperationException if incompatible + */ + override def newTaskTempFileAbsPath( + taskContext: TaskAttemptContext, + absoluteDir: String, + spec: FileNameSpec): String = { + + if (supportsDynamicPartitions) { + super.newTaskTempFileAbsPath(taskContext, absoluteDir, spec) + } else { + throw new UnsupportedOperationException(s"Absolute output locations not supported" + + s" by committer $committer") + } + } } object PathOutputCommitProtocol { diff --git a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala index 2fc8d477f0547..52d78eee04151 100644 --- a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala +++ b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala @@ -27,7 +27,7 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl import org.apache.spark.SparkFunSuite -import org.apache.spark.internal.io.FileCommitProtocol +import org.apache.spark.internal.io.{FileCommitProtocol, FileNameSpec} import org.apache.spark.internal.io.cloud.PathOutputCommitProtocol.CAPABILITY_DYNAMIC_PARTITIONING class CommitterBindingSuite extends SparkFunSuite { @@ -151,6 +151,12 @@ class CommitterBindingSuite extends SparkFunSuite { if (!ioe.getMessage.contains(PathOutputCommitProtocol.UNSUPPORTED)) { throw ioe } + intercept[UnsupportedOperationException] { + committer.newTaskTempFileAbsPath( + tContext, + "/tmp", + FileNameSpec("lotus", ".123")) + } } test("permit dynamic partitioning if the committer says it works") { @@ -162,11 +168,21 @@ class CommitterBindingSuite extends SparkFunSuite { StubPathOutputCommitterBinding.bindWithDynamicPartitioning(conf, "http") val tContext = new TaskAttemptContextImpl(conf, taskAttemptId0) - val committer = FileCommitProtocol.instantiate( + val committer: PathOutputCommitProtocol = FileCommitProtocol.instantiate( pathCommitProtocolClassname, - jobId, path.toUri.toString, true) + jobId, + path.toUri.toString, + true).asInstanceOf[PathOutputCommitProtocol] committer.setupJob(tContext) - + committer.setupTask(tContext) + + val spec = FileNameSpec(".lotus.", ".123") + val absPath = committer.newTaskTempFileAbsPath( + tContext, + "/tmp", + spec) + assert(absPath.endsWith(".123"), s"wrong suffix in $absPath") + assert(absPath.contains("lotus"), s"wrong prefix in $absPath") } } From daae6937f363c57b496aed1170627da211c4c703 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Tue, 16 Aug 2022 15:03:14 +0100 Subject: [PATCH 3/7] SPARK-40034. skip parquet dynamic binding tests on incompatible hadoop versions If mapreduce-core BindingPathOutputCommitter doesn't implement StreamCapabilities the probes for dynamic commit support through the parquet committer don't work. 
So skip that bit of the test case Change-Id: I5225c70a54c63adf858a9f429fddad251b79783e --- .../io/cloud/CommitterBindingSuite.scala | 29 +++++++++++++++---- 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala index 52d78eee04151..221e715418e32 100644 --- a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala +++ b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala @@ -20,10 +20,10 @@ package org.apache.spark.internal.io.cloud import java.io.{File, FileInputStream, FileOutputStream, IOException, ObjectInputStream, ObjectOutputStream} import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{Path, StreamCapabilities} import org.apache.hadoop.io.IOUtils -import org.apache.hadoop.mapreduce.{Job, JobStatus, MRJobConfig, TaskAttemptID} -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat +import org.apache.hadoop.mapreduce.{Job, JobStatus, MRJobConfig, TaskAttemptContext, TaskAttemptID} +import org.apache.hadoop.mapreduce.lib.output.{BindingPathOutputCommitter, FileOutputFormat} import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl import org.apache.spark.SparkFunSuite @@ -49,20 +49,27 @@ class CommitterBindingSuite extends SparkFunSuite { * [[BindingParquetOutputCommitter]] committer bind to the schema-specific * committer declared for the destination path? And that lifecycle events * are correctly propagated? + * This only works with a hadoop build where BindingPathOutputCommitter + * does passthrough of stream capabilities, so check that first. */ test("BindingParquetOutputCommitter binds to the inner committer") { + + val path = new Path("http://example/data") val job = newJob(path) val conf = job.getConfiguration conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttempt0) conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1) + + StubPathOutputCommitterBinding.bindWithDynamicPartitioning(conf, "http") - val tContext = new TaskAttemptContextImpl(conf, taskAttemptId0) + val tContext: TaskAttemptContext = new TaskAttemptContextImpl(conf, taskAttemptId0) + + val parquet = new BindingParquetOutputCommitter(path, tContext) val inner = parquet.boundCommitter.asInstanceOf[StubPathOutputCommitterWithDynamicPartioning] - assert(parquet.hasCapability(CAPABILITY_DYNAMIC_PARTITIONING), - s"committer $parquet does not declare dynamic partition support") + parquet.setupJob(tContext) assert(inner.jobSetup, s"$inner job not setup") parquet.setupTask(tContext) @@ -78,6 +85,16 @@ class CommitterBindingSuite extends SparkFunSuite { assert(inner.jobCommitted, s"$inner job not committed") parquet.abortJob(tContext, JobStatus.State.RUNNING) assert(inner.jobAborted, s"$inner job not aborted") + + val binding = new BindingPathOutputCommitter(path, tContext) + if (binding.isInstanceOf[StreamCapabilities]) { + // this version of hadoop does support hasCapability probes + // through the BindingPathOutputCommitter used by the + // parquet committer, so verify that it goes through + // to the stub committer. 
+ assert(parquet.hasCapability(CAPABILITY_DYNAMIC_PARTITIONING), + s"committer $parquet does not declare dynamic partition support") + } } /** From fdf4cf46c365d32bdf905189dde6738353140358 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Fri, 19 Aug 2022 13:55:17 +0100 Subject: [PATCH 4/7] SPARK-40034. address review feedback Change-Id: I6ddc92f56d8762cebb76857a30a5b9dd4fe4948d --- .../io/cloud/PathOutputCommitProtocol.scala | 21 ++++++++----------- .../io/cloud/CommitterBindingSuite.scala | 17 --------------- 2 files changed, 9 insertions(+), 29 deletions(-) diff --git a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala index f40f9a1447670..fefdaa715dad3 100644 --- a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala +++ b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala @@ -38,14 +38,18 @@ import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol * In `setupCommitter` the factory is identified and instantiated; * this factory then creates the actual committer implementation. * - * @constructor Instantiate. dynamic partition overwrite is not supported, - * so that committers for stores which do not support rename - * will not get confused. + * Dynamic Partition support will be determined once the committer is + * instantiated in the setupJob/setupTask methods. If this + * class was instantiated with `dynamicPartitionOverwrite` set to true, + * then the instantiated committer must either be an instance of + * `FileOutputCommitter` or it must implement the `StreamCapabilities` + * interface and declare that it has the capability + * `mapreduce.job.committer.dynamic.partitioning`. + * @constructor Instantiate. * @param jobId job * @param dest destination * @param dynamicPartitionOverwrite does the caller want support for dynamic - * partition overwrite. If so, it will be - * refused. + * partition overwrite? */ class PathOutputCommitProtocol( jobId: String, @@ -109,11 +113,9 @@ class PathOutputCommitProtocol( logTrace(s"Committer $committer may not be tolerant of task commit failures") } } else { - // if required other committers need to be checked for dynamic partition // compatibility through a StreamCapabilities probe. if (dynamicPartitionOverwrite) { - logDebug(s"Checking dynamic partition overwrite support in committer $committer") if (supportsDynamicPartitions) { logDebug( s"Committer $committer has declared compatibility with dynamic partition overwrite") @@ -122,9 +124,6 @@ class PathOutputCommitProtocol( } } } - - - committer } @@ -208,13 +207,11 @@ object PathOutputCommitProtocol { private[cloud] val UNSUPPORTED: String = "PathOutputCommitter does not support" + " dynamicPartitionOverwrite" - /** * Stream Capabilities probe for spark dynamic partitioning compatibility. */ private[cloud] val CAPABILITY_DYNAMIC_PARTITIONING = "mapreduce.job.committer.dynamic.partitioning" - /** * Scheme prefix for per-filesystem scheme committers. 
*/ diff --git a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala index 221e715418e32..10c515b7d20c4 100644 --- a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala +++ b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala @@ -54,22 +54,15 @@ class CommitterBindingSuite extends SparkFunSuite { */ test("BindingParquetOutputCommitter binds to the inner committer") { - val path = new Path("http://example/data") val job = newJob(path) val conf = job.getConfiguration conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttempt0) conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1) - - - StubPathOutputCommitterBinding.bindWithDynamicPartitioning(conf, "http") val tContext: TaskAttemptContext = new TaskAttemptContextImpl(conf, taskAttemptId0) - - val parquet = new BindingParquetOutputCommitter(path, tContext) val inner = parquet.boundCommitter.asInstanceOf[StubPathOutputCommitterWithDynamicPartioning] - parquet.setupJob(tContext) assert(inner.jobSetup, s"$inner job not setup") parquet.setupTask(tContext) @@ -113,23 +106,18 @@ class CommitterBindingSuite extends SparkFunSuite { test("committer protocol can be serialized and deserialized") { val tempDir = File.createTempFile("ser", ".bin") - tempDir.delete() val committer = new PathOutputCommitProtocol(jobId, tempDir.toURI.toString, false) - val serData = File.createTempFile("ser", ".bin") var out: ObjectOutputStream = null var in: ObjectInputStream = null - try { out = new ObjectOutputStream(new FileOutputStream(serData)) out.writeObject(committer) out.close in = new ObjectInputStream(new FileInputStream(serData)) val result = in.readObject() - val committer2 = result.asInstanceOf[PathOutputCommitProtocol] - assert(committer.destination === committer2.destination, "destination mismatch on round trip") assert(committer.destPath === committer2.destPath, @@ -144,7 +132,6 @@ class CommitterBindingSuite extends SparkFunSuite { val instance = FileCommitProtocol.instantiate( pathCommitProtocolClassname, jobId, "file:///tmp", false) - val protocol = instance.asInstanceOf[PathOutputCommitProtocol] assert("file:///tmp" === protocol.destination) } @@ -155,13 +142,11 @@ class CommitterBindingSuite extends SparkFunSuite { val conf = job.getConfiguration conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttempt0) conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1) - StubPathOutputCommitterBinding.bind(conf, "http") val tContext = new TaskAttemptContextImpl(conf, taskAttemptId0) val committer = FileCommitProtocol.instantiate( pathCommitProtocolClassname, jobId, path.toUri.toString, true) - val ioe = intercept[IOException] { committer.setupJob(tContext) } @@ -182,7 +167,6 @@ class CommitterBindingSuite extends SparkFunSuite { val conf = job.getConfiguration conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttempt0) conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1) - StubPathOutputCommitterBinding.bindWithDynamicPartitioning(conf, "http") val tContext = new TaskAttemptContextImpl(conf, taskAttemptId0) val committer: PathOutputCommitProtocol = FileCommitProtocol.instantiate( @@ -192,7 +176,6 @@ class CommitterBindingSuite extends SparkFunSuite { true).asInstanceOf[PathOutputCommitProtocol] committer.setupJob(tContext) committer.setupTask(tContext) - val spec = FileNameSpec(".lotus.", ".123") val absPath = committer.newTaskTempFileAbsPath( tContext, 
From c776fd5df48bcadff713e9f78f5f49cab2a872c1 Mon Sep 17 00:00:00 2001
From: Steve Loughran
Date: Wed, 24 Aug 2022 12:13:20 +0100
Subject: [PATCH 5/7] SPARK-40034. Documentation changes

* Section in cloud-integration docs
* Add references to committers and papers related to them.
* Remove hadoop openstack reference (it's going to be cut soon)

No mention of the Intermediate Manifest Committer until it is shipped
in an ASF release. It is in Cloudera CDH and has been trouble free,
unlike FileOutputCommitter with abfs (scale) and gcs (correctness).

Change-Id: I97bf56336f6fd6cbd6d56e87c911e62a6deff9c8
---
 docs/cloud-integration.md | 49 +++++++++++++++++++++++++++++++++++++--
 1 file changed, 47 insertions(+), 2 deletions(-)

diff --git a/docs/cloud-integration.md b/docs/cloud-integration.md
index d65616ed0b8d1..de32442f5d566 100644
--- a/docs/cloud-integration.md
+++ b/docs/cloud-integration.md
@@ -271,17 +271,55 @@ More details on these committers can be found in the latest Hadoop documentation
 Note: depending upon the committer used, in-progress statistics may be
 under-reported with Hadoop versions before 3.3.1.
 
+## Cloud Committers and `INSERT OVERWRITE TABLE`
+
+Spark has a feature called "dynamic partition overwrite"; a table can be updated and only those
+partitions into which new data is added will have their contents replaced.
+
+This is used in SQL statements of the form `INSERT OVERWRITE TABLE`,
+and when Datasets are written in mode "overwrite":
+
+{% highlight scala %}
+eventDataset.write
+  .mode("overwrite")
+  .partitionBy("year", "month")
+  .format("parquet")
+  .save(tablePath)
+{% endhighlight %}
+
+This feature uses file renaming and has specific requirements of
+both the committer and the filesystem:
+
+1. The committer's working directory must be in the destination filesystem.
+2. The target filesystem must support file rename efficiently.
+
+These conditions are _not_ met by the S3A committers and AWS S3 storage.
+
+Committers for other cloud stores _may_ support this feature, and
+declare to Spark that they are compatible. If dynamic partition overwrite
+is required when writing data through a Hadoop committer, Spark
+will always permit this when the original `FileOutputCommitter`
+is used. For other committers, after their instantiation, Spark
+will probe for their declaration of compatibility, and
+permit the operation only if they state that they are compatible.
+
+If the committer is not compatible, the operation will fail with
+the error message
+`PathOutputCommitter does not support dynamicPartitionOverwrite`.
+
+Unless there is a compatible committer for the target filesystem,
+the sole solution is to use a cloud-friendly format for data
+storage.
+
 ## Further Reading
 
 Here is the documentation on the standard connectors both from Apache and the cloud providers.
 
-* [OpenStack Swift](https://hadoop.apache.org/docs/current/hadoop-openstack/index.html).
 * [Azure Blob Storage](https://hadoop.apache.org/docs/current/hadoop-azure/index.html).
 * [Azure Blob Filesystem (ABFS) and Azure Datalake Gen 2](https://hadoop.apache.org/docs/current/hadoop-azure/abfs.html).
 * [Azure Data Lake Gen 1](https://hadoop.apache.org/docs/current/hadoop-azure-datalake/index.html).
 * [Amazon S3 Strong Consistency](https://aws.amazon.com/s3/consistency/)
 * [Hadoop-AWS module (Hadoop 3.x)](https://hadoop.apache.org/docs/current3/hadoop-aws/tools/hadoop-aws/index.html).
-* [Amazon S3 via S3A and S3N (Hadoop 2.x)](https://hadoop.apache.org/docs/current2/hadoop-aws/tools/hadoop-aws/index.html).
* [Amazon EMR File System (EMRFS)](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-fs.html). From Amazon. * [Using the EMRFS S3-optimized Committer](https://docs.amazonaws.cn/en_us/emr/latest/ReleaseGuide/emr-spark-s3-optimized-committer.html) * [Google Cloud Storage Connector for Spark and Hadoop](https://cloud.google.com/dataproc/docs/concepts/connectors/cloud-storage). From Google. @@ -289,3 +327,10 @@ Here is the documentation on the standard connectors both from Apache and the cl * IBM Cloud Object Storage connector for Apache Spark: [Stocator](https://github.com/CODAIT/stocator), [IBM Object Storage](https://www.ibm.com/cloud/object-storage). From IBM. * [Using JindoFS SDK to access Alibaba Cloud OSS](https://github.com/aliyun/alibabacloud-jindofs). + +The Cloud Committer problem and hive-compatible solutions +* [Committing work to S3 with the S3A Committers](https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/committers.html) +* [Improve Apache Spark write performance on Apache Parquet formats with the EMRFS S3-optimized committer](https://aws.amazon.com/blogs/big-data/improve-apache-spark-write-performance-on-apache-parquet-formats-with-the-emrfs-s3-optimized-committer/) +* [A Zero-rename committer](https://github.com/steveloughran/zero-rename-committer/releases/). +* [Stocator: A High Performance Object Store Connector for Spark](http://arxiv.org/abs/1709.01812) + From 2d501834db6317461b8f57f42abec357f6cc1dfb Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Wed, 7 Sep 2022 15:39:38 +0100 Subject: [PATCH 6/7] SPARK-40034. review feedback 1. docs 2. tests Change-Id: Ia31cf91999157057f1a85061826da74db7f1713e --- docs/cloud-integration.md | 63 +++++++++++++- .../io/cloud/CommitterBindingSuite.scala | 82 +++++++++++++++++-- 2 files changed, 136 insertions(+), 9 deletions(-) diff --git a/docs/cloud-integration.md b/docs/cloud-integration.md index de32442f5d566..fbcef7db12d02 100644 --- a/docs/cloud-integration.md +++ b/docs/cloud-integration.md @@ -242,8 +242,13 @@ exhibits eventual consistency (example: S3), and often slower than classic filesystem renames. Some object store connectors provide custom committers to commit tasks and -jobs without using rename. In versions of Spark built with Hadoop 3.1 or later, -the S3A connector for AWS S3 is such a committer. +jobs without using rename. + +### Hadoop S3A committers + +In versions of Spark built with Hadoop 3.1 or later, +the hadoop-aws JAR contains committers safe to use for S3 storage +accessed via the s3a connector. Instead of writing data to a temporary directory on the store for renaming, these committers write the files to the final destination, but do not issue @@ -266,11 +271,62 @@ It has been tested with the most common formats supported by Spark. mydataframe.write.format("parquet").save("s3a://bucket/destination") ``` -More details on these committers can be found in the latest Hadoop documentation. +More details on these committers can be found in +[the latest Hadoop documentation](https://hadoop.apache.org/docs/current/) +with S3A committer detail covered in +[Committing work to S3 with the S3A Committers](https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/committers.html). Note: depending upon the committer used, in-progress statistics may be under-reported with Hadoop versions before 3.3.1. +### Amazon EMR: the EMRFS S3-optimized committer + +Amazon EMR has its own S3-aware committers for parquet data. 
+For instructions on use, see
+[the EMRFS S3-optimized committer](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-s3-optimized-committer.html)
+
+For implementation and performance details, see
+["Improve Apache Spark write performance on Apache Parquet formats with the EMRFS S3-optimized committer"](https://aws.amazon.com/blogs/big-data/improve-apache-spark-write-performance-on-apache-parquet-formats-with-the-emrfs-s3-optimized-committer/)
+
+
+### Azure and Google Cloud Storage: MapReduce Intermediate Manifest Committer
+
+Versions of the hadoop-mapreduce-core JAR shipped after September 2022 (3.3.5 and later)
+contain a committer optimized for performance and resilience on
+Azure ADLS Generation 2 and Google Cloud Storage.
+
+This committer, the "manifest committer", uses a manifest file to propagate
+directory listing information from the task committers to the job committer.
+These manifests can be written atomically, without relying on atomic directory rename,
+something GCS lacks.
+
+The job committer reads these manifests and will rename files from the task output
+directories directly into the destination directory, in parallel, with optional
+rate limiting to avoid throttling IO.
+This delivers performance and scalability on the object stores.
+
+It is not critical for job correctness to use this with Azure storage; the
+classic FileOutputCommitter is safe there -however this new committer scales
+better for large jobs with deep and wide directory trees.
+
+Because google GCS does not support atomic directory renaming,
+the manifest committer should be used where available.
+
+This committer does support "dynamic partition overwrite" (see below).
+
+For details on availability and use of this committer, consult
+the Hadoop documentation for the Hadoop release used.
+
+It is not available on Hadoop 3.3.4 or earlier.
+
+### IBM Cloud Object Storage: Stocator
+
+IBM provides the Stocator output committer for IBM Cloud Object Storage and OpenStack Swift.
+
+Source, documentation and releases can be found at
+[Stocator - Storage Connector for Apache Spark](https://github.com/CODAIT/stocator).
+
+
 ## Cloud Committers and `INSERT OVERWRITE TABLE`
 
 Spark has a feature called "dynamic partition overwrite"; a table can be updated and only those
@@ -331,6 +387,7 @@ Here is the documentation on the standard connectors both from Apache and the cl
 The Cloud Committer problem and hive-compatible solutions
 * [Committing work to S3 with the S3A Committers](https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/committers.html)
 * [Improve Apache Spark write performance on Apache Parquet formats with the EMRFS S3-optimized committer](https://aws.amazon.com/blogs/big-data/improve-apache-spark-write-performance-on-apache-parquet-formats-with-the-emrfs-s3-optimized-committer/)
+* [The Manifest Committer for Azure and Google Cloud Storage](https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/manifest_committer.md)
 * [A Zero-rename committer](https://github.com/steveloughran/zero-rename-committer/releases/).
* [Stocator: A High Performance Object Store Connector for Spark](http://arxiv.org/abs/1709.01812) diff --git a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala index 10c515b7d20c4..51caa4597fd2c 100644 --- a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala +++ b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala @@ -28,7 +28,7 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl import org.apache.spark.SparkFunSuite import org.apache.spark.internal.io.{FileCommitProtocol, FileNameSpec} -import org.apache.spark.internal.io.cloud.PathOutputCommitProtocol.CAPABILITY_DYNAMIC_PARTITIONING +import org.apache.spark.internal.io.cloud.PathOutputCommitProtocol.{CAPABILITY_DYNAMIC_PARTITIONING, OUTPUTCOMMITTER_FACTORY_SCHEME} class CommitterBindingSuite extends SparkFunSuite { @@ -80,6 +80,8 @@ class CommitterBindingSuite extends SparkFunSuite { assert(inner.jobAborted, s"$inner job not aborted") val binding = new BindingPathOutputCommitter(path, tContext) + // MAPREDUCE-7403 only arrived after hadoop 3.3.4; this test case + // is designed to work with versions with and without the feature. if (binding.isInstanceOf[StreamCapabilities]) { // this version of hadoop does support hasCapability probes // through the BindingPathOutputCommitter used by the @@ -136,6 +138,11 @@ class CommitterBindingSuite extends SparkFunSuite { assert("file:///tmp" === protocol.destination) } + /* + * Bind a job to a committer which doesn't support dynamic partitioning. + * Job setup must fail, and calling `newTaskTempFileAbsPath()` must + * raise `UnsupportedOperationException`. + */ test("reject dynamic partitioning if not supported") { val path = new Path("http://example/data") val job = newJob(path) @@ -146,21 +153,28 @@ class CommitterBindingSuite extends SparkFunSuite { val tContext = new TaskAttemptContextImpl(conf, taskAttemptId0) val committer = FileCommitProtocol.instantiate( pathCommitProtocolClassname, - jobId, path.toUri.toString, true) + jobId, + path.toUri.toString, + true) val ioe = intercept[IOException] { committer.setupJob(tContext) } if (!ioe.getMessage.contains(PathOutputCommitProtocol.UNSUPPORTED)) { throw ioe } + + // calls to newTaskTempFileAbsPath() will be rejected intercept[UnsupportedOperationException] { - committer.newTaskTempFileAbsPath( - tContext, - "/tmp", - FileNameSpec("lotus", ".123")) + verifyAbsTempFileWorks(tContext, committer) } } + /* + * Bind to a committer with dynamic partitioning support, + * verify that job and task setup works, and that + * `newTaskTempFileAbsPath()` creates a temp file which + * can be moved to an absolute path later. + */ test("permit dynamic partitioning if the committer says it works") { val path = new Path("http://example/data") val job = newJob(path) @@ -176,6 +190,50 @@ class CommitterBindingSuite extends SparkFunSuite { true).asInstanceOf[PathOutputCommitProtocol] committer.setupJob(tContext) committer.setupTask(tContext) + verifyAbsTempFileWorks(tContext, committer) + } + + /* + * Create a FileOutputCommitter through the PathOutputCommitProtocol + * using the relevant factory in hadoop-mapreduce-core JAR. 
+ */ + test("FileOutputCommitter through PathOutputCommitProtocol") { + // temp path; use a unique filename + val jobCommitDir = File.createTempFile("FileOutputCommitter-through-PathOutputCommitProtocol", "") + try { + // delete the temp file and create a temp dir. + jobCommitDir.delete(); + val jobUri = jobCommitDir.toURI + // hadoop path of the job + val path = new Path(jobUri) + val job = newJob(path) + val conf = job.getConfiguration + conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttempt0) + conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1) + bindToFileOutputCommitterFactory(conf, "file") + val tContext = new TaskAttemptContextImpl(conf, taskAttemptId0) + val committer: PathOutputCommitProtocol = FileCommitProtocol.instantiate( + pathCommitProtocolClassname, + jobId, + jobUri.toString, + true).asInstanceOf[PathOutputCommitProtocol] + committer.setupJob(tContext) + committer.setupTask(tContext) + verifyAbsTempFileWorks(tContext, committer) + } finally { + jobCommitDir.delete(); + } + } + + /** + * Verify that a committer supports `newTaskTempFileAbsPath()`. + * + * @param tContext task context + * @param committer committer + */ + private def verifyAbsTempFileWorks( + tContext: TaskAttemptContextImpl, + committer: FileCommitProtocol): Unit = { val spec = FileNameSpec(".lotus.", ".123") val absPath = committer.newTaskTempFileAbsPath( tContext, @@ -185,5 +243,17 @@ class CommitterBindingSuite extends SparkFunSuite { assert(absPath.contains("lotus"), s"wrong prefix in $absPath") } + /** + * Given a hadoop configuration, explicitly set up the factory binding for the scheme + * to a committer factory which always creates FileOutputCommitters. + * + * @param conf config to patch + * @param scheme filesystem scheme. + */ + def bindToFileOutputCommitterFactory(conf: Configuration, scheme: String): Unit = { + conf.set(OUTPUTCOMMITTER_FACTORY_SCHEME + "." + scheme, + "org.apache.hadoop.mapreduce.lib.output.FileOutputCommitterFactory") + } + } From 42b3fb099ce63e1692aca379b00a59344cbcbd67 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Thu, 8 Sep 2022 15:32:53 +0100 Subject: [PATCH 7/7] SPARK-40034. review feedback Change-Id: I71a15ba3909a8912351987ad2dfbba8dca83b5b8 --- docs/cloud-integration.md | 2 +- .../io/cloud/PathOutputCommitProtocol.scala | 16 ++++++++++------ .../io/cloud/CommitterBindingSuite.scala | 10 +++++++++- 3 files changed, 20 insertions(+), 8 deletions(-) diff --git a/docs/cloud-integration.md b/docs/cloud-integration.md index fbcef7db12d02..86dd380a91755 100644 --- a/docs/cloud-integration.md +++ b/docs/cloud-integration.md @@ -309,7 +309,7 @@ It is not critical for job correctness to use this with Azure storage; the classic FileOutputCommitter is safe there -however this new committer scales better for large jobs with deep and wide directory trees. -Because google GCS does not support atomic directory renaming, +Because Google GCS does not support atomic directory renaming, the manifest committer should be used where available. This committer does support "dynamic partition overwrite" (see below). 
diff --git a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala index fefdaa715dad3..44a521bd636c5 100644 --- a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala +++ b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala @@ -45,6 +45,9 @@ import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol * `FileOutputCommitter` or it must implement the `StreamCapabilities` * interface and declare that it has the capability * `mapreduce.job.committer.dynamic.partitioning`. + * That feature is available on Hadoop releases with the Intermediate + * Manifest Committer for GCS and ABFS; it is not supported by the + * S3A committers. * @constructor Instantiate. * @param jobId job * @param dest destination @@ -55,7 +58,8 @@ class PathOutputCommitProtocol( jobId: String, dest: String, dynamicPartitionOverwrite: Boolean = false) - extends HadoopMapReduceCommitProtocol(jobId, dest, dynamicPartitionOverwrite) with Serializable { + extends HadoopMapReduceCommitProtocol(jobId, dest, dynamicPartitionOverwrite) + with Serializable { /** The committer created. */ @transient private var committer: PathOutputCommitter = _ @@ -85,7 +89,6 @@ class PathOutputCommitProtocol( logTrace(s"Setting up committer for path $destination") committer = PathOutputCommitterFactory.createCommitter(destPath, context) - // Special feature to force out the FileOutputCommitter, so as to guarantee // that the binding is working properly. val rejectFileOutput = context.getConfiguration @@ -174,7 +177,7 @@ class PathOutputCommitProtocol( override def newTaskTempFileAbsPath( taskContext: TaskAttemptContext, absoluteDir: String, - spec: FileNameSpec): String = { + spec: FileNameSpec): String = { if (supportsDynamicPartitions) { super.newTaskTempFileAbsPath(taskContext, absoluteDir, spec) @@ -208,9 +211,10 @@ object PathOutputCommitProtocol { " dynamicPartitionOverwrite" /** - * Stream Capabilities probe for spark dynamic partitioning compatibility. - */ - private[cloud] val CAPABILITY_DYNAMIC_PARTITIONING = "mapreduce.job.committer.dynamic.partitioning" + * Stream Capabilities probe for spark dynamic partitioning compatibility. + */ + private[cloud] val CAPABILITY_DYNAMIC_PARTITIONING = + "mapreduce.job.committer.dynamic.partitioning" /** * Scheme prefix for per-filesystem scheme committers. 
diff --git a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala index 51caa4597fd2c..984c7dbc2cb1b 100644 --- a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala +++ b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala @@ -108,18 +108,23 @@ class CommitterBindingSuite extends SparkFunSuite { test("committer protocol can be serialized and deserialized") { val tempDir = File.createTempFile("ser", ".bin") + tempDir.delete() val committer = new PathOutputCommitProtocol(jobId, tempDir.toURI.toString, false) + val serData = File.createTempFile("ser", ".bin") var out: ObjectOutputStream = null var in: ObjectInputStream = null + try { out = new ObjectOutputStream(new FileOutputStream(serData)) out.writeObject(committer) out.close in = new ObjectInputStream(new FileInputStream(serData)) val result = in.readObject() + val committer2 = result.asInstanceOf[PathOutputCommitProtocol] + assert(committer.destination === committer2.destination, "destination mismatch on round trip") assert(committer.destPath === committer2.destPath, @@ -134,6 +139,7 @@ class CommitterBindingSuite extends SparkFunSuite { val instance = FileCommitProtocol.instantiate( pathCommitProtocolClassname, jobId, "file:///tmp", false) + val protocol = instance.asInstanceOf[PathOutputCommitProtocol] assert("file:///tmp" === protocol.destination) } @@ -199,7 +205,9 @@ class CommitterBindingSuite extends SparkFunSuite { */ test("FileOutputCommitter through PathOutputCommitProtocol") { // temp path; use a unique filename - val jobCommitDir = File.createTempFile("FileOutputCommitter-through-PathOutputCommitProtocol", "") + val jobCommitDir = File.createTempFile( + "FileOutputCommitter-through-PathOutputCommitProtocol", + "") try { // delete the temp file and create a temp dir. jobCommitDir.delete();