From 762e473d3d2bd90110029006b06fb701825ecdde Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Mon, 29 Dec 2014 17:13:50 -0800
Subject: [PATCH 1/6] [SPARK-4835] Disable validateOutputSpecs for Spark Streaming jobs.

---
 .../org/apache/spark/rdd/PairRDDFunctions.scala    | 13 +++++++++++--
 docs/configuration.md                              |  5 ++++-
 .../spark/streaming/scheduler/JobScheduler.scala   | 10 +++++++++-
 3 files changed, 24 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 4469c89e6bb1c..f2e081af2c070 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -25,6 +25,7 @@ import scala.collection.{Map, mutable}
 import scala.collection.JavaConversions._
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
+import scala.util.DynamicVariable
 
 import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
 import org.apache.hadoop.conf.{Configurable, Configuration}
@@ -964,7 +965,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     val outfmt = job.getOutputFormatClass
     val jobFormat = outfmt.newInstance
 
-    if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
+    val validationDisabled = PairRDDFunctions.disableOutputSpecValidation.value
+    if (!validationDisabled && self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
       // FileOutputFormat ignores the filesystem parameter
       jobFormat.checkOutputSpecs(job)
     }
@@ -1042,7 +1044,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
       valueClass.getSimpleName + ")")
 
-    if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
+    val validationDisabled = PairRDDFunctions.disableOutputSpecValidation.value
+    if (!validationDisabled && self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
       // FileOutputFormat ignores the filesystem parameter
       val ignoredFs = FileSystem.get(hadoopConf)
       hadoopConf.getOutputFormat.checkOutputSpecs(ignoredFs, hadoopConf)
@@ -1121,4 +1124,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
 
 private[spark] object PairRDDFunctions {
   val RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES = 256
+  /**
+   * Used by Spark Streaming in order to bypass the `spark.hadoop.validateOutputSpecs` checks
+   * for save actions launched by Spark Streaming, since the validation may break Spark Streaming's
+   * ability to recover from checkpoints. See SPARK-4835 for more details.
+   */
+  val disableOutputSpecValidation: DynamicVariable[Boolean] = new DynamicVariable[Boolean](false)
 }
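The core mechanism in the PairRDDFunctions change above is scala.util.DynamicVariable: withValue installs an override for the duration of a block and restores the previous value when the block exits, so validation is skipped only for saves that run inside an explicitly wrapped scope. A minimal, self-contained sketch of that behavior (illustrative only; disableValidation and save are stand-ins, not Spark APIs):

    import scala.util.DynamicVariable

    object DynamicVariableSketch {
      // Stand-in for disableOutputSpecValidation: defaults to false, so the
      // validation path stays enabled unless a caller opts out for a block.
      val disableValidation = new DynamicVariable[Boolean](false)

      def save(path: String): Unit = {
        if (!disableValidation.value) {
          println(s"validating output spec for $path")
        }
        println(s"saving to $path")
      }

      def main(args: Array[String]): Unit = {
        save("out-1")                       // validates
        disableValidation.withValue(true) {
          save("out-2")                     // validation skipped inside the scope
        }
        save("out-3")                       // validates again: old value restored
      }
    }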
diff --git a/docs/configuration.md b/docs/configuration.md
index fa9d311f85068..19c48f354a756 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -709,7 +709,10 @@ Apart from these, the following properties are also available, and may be useful
   If set to true, validates the output specification (e.g. checking if the output directory already exists)
   used in saveAsHadoopFile and other variants. This can be disabled to silence exceptions due to pre-existing
   output directories. We recommend that users do not disable this except if trying to achieve compatibility with
-  previous versions of Spark. Simply use Hadoop's FileSystem API to delete output directories by hand.
+  previous versions of Spark. Simply use Hadoop's FileSystem API to delete output directories by hand.
+  This setting is ignored for jobs launched by the Spark Streaming scheduler, since data may need to be rewritten to
+  pre-existing output directories during checkpoint recovery; to enable these checks in streaming, use the
+  spark.streaming.hadoop.validateOutputSpecs setting instead.
 spark.hadoop.cloneConf
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index cfa3cd8925c80..8f704729bf112 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -22,6 +22,7 @@ import scala.collection.JavaConversions._
 import java.util.concurrent.{TimeUnit, ConcurrentHashMap, Executors}
 import akka.actor.{ActorRef, Actor, Props}
 import org.apache.spark.{SparkException, Logging, SparkEnv}
+import org.apache.spark.rdd.PairRDDFunctions
 import org.apache.spark.streaming._
@@ -39,6 +40,8 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
 
   private val jobSets = new ConcurrentHashMap[Time, JobSet]
   private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
+  private val validateOutputSpecs =
+    ssc.conf.getBoolean("spark.streaming.hadoop.validateOutputSpecs", false)
   private val jobExecutor = Executors.newFixedThreadPool(numConcurrentJobs)
   private val jobGenerator = new JobGenerator(this)
   val clock = jobGenerator.clock
@@ -168,7 +171,12 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
   private class JobHandler(job: Job) extends Runnable {
     def run() {
       eventActor ! JobStarted(job)
-      job.run()
+      // Disable checks for existing output directories in jobs launched by the streaming scheduler,
+      // since we may need to write output to an existing directory during checkpoint recovery;
+      // see SPARK-4835 for more details.
+      PairRDDFunctions.disableOutputSpecValidation.withValue(!validateOutputSpecs) {
+        job.run()
+      }
       eventActor ! JobCompleted(job)
     }
   }
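One detail worth calling out in the JobScheduler change: DynamicVariable stores its current value in a thread-local (Scala's implementation is backed by an InheritableThreadLocal), so the override has to be installed inside JobHandler.run(), on the pool thread that actually executes the job; setting it around job submission would not be visible to an already-running executor thread. A simplified model of that structure (hypothetical names, not Spark's API):

    import java.util.concurrent.Executors
    import scala.util.DynamicVariable

    object SchedulerSketch {
      val disableValidation = new DynamicVariable[Boolean](false)

      // Mirrors the JobHandler shape: the override wraps the body on the thread
      // that executes it, so the thread-local is set where the job actually runs.
      final class Handler(body: () => Unit) extends Runnable {
        override def run(): Unit = disableValidation.withValue(true) {
          body()
        }
      }

      def main(args: Array[String]): Unit = {
        val pool = Executors.newFixedThreadPool(1)
        pool.submit(new Handler(() =>
          println(s"inside job: disabled = ${disableValidation.value}") // prints true
        ))
        pool.shutdown()
      }
    }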
From e581d17965c00b5ed36b61e5ea2003ec4f0efeb1 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Fri, 2 Jan 2015 17:42:26 -0800
Subject: [PATCH 2/6] Deduplicate isOutputSpecValidationEnabled logic.

---
 .../org/apache/spark/rdd/PairRDDFunctions.scala | 14 ++++++++++----
 1 file changed, 10 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index f2e081af2c070..219aa111df63e 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -965,8 +965,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     val outfmt = job.getOutputFormatClass
     val jobFormat = outfmt.newInstance
 
-    val validationDisabled = PairRDDFunctions.disableOutputSpecValidation.value
-    if (!validationDisabled && self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
+    if (isOutputSpecValidationEnabled) {
       // FileOutputFormat ignores the filesystem parameter
       jobFormat.checkOutputSpecs(job)
     }
@@ -1044,8 +1043,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
       valueClass.getSimpleName + ")")
 
-    val validationDisabled = PairRDDFunctions.disableOutputSpecValidation.value
-    if (!validationDisabled && self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
+    if (isOutputSpecValidationEnabled) {
       // FileOutputFormat ignores the filesystem parameter
       val ignoredFs = FileSystem.get(hadoopConf)
       hadoopConf.getOutputFormat.checkOutputSpecs(ignoredFs, hadoopConf)
@@ -1120,6 +1118,14 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
   private[spark] def valueClass: Class[_] = vt.runtimeClass
 
   private[spark] def keyOrdering: Option[Ordering[K]] = Option(ord)
+
+  // Note: this needs to be a function instead of a 'val' so that the disableOutputSpecValidation
+  // setting can take effect:
+  private def isOutputSpecValidationEnabled: Boolean = {
+    val validationDisabled = PairRDDFunctions.disableOutputSpecValidation.value
+    val enabledInConf = self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)
+    enabledInConf && !validationDisabled
+  }
 }
 
 private[spark] object PairRDDFunctions {
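The "needs to be a function instead of a 'val'" note in the hunk above is load-bearing: a val would read the dynamic variable once, when the PairRDDFunctions wrapper is constructed, and would never observe a later withValue override, silently re-enabling validation. A small illustration of the difference (a sketch, not Spark code):

    import scala.util.DynamicVariable

    object DefVsValSketch {
      val disabled = new DynamicVariable[Boolean](false)

      class Saver {
        // Captured once at construction: never sees a later withValue override.
        val enabledVal: Boolean = !disabled.value
        // Re-evaluated on every call: observes the current dynamic scope.
        def enabledDef: Boolean = !disabled.value
      }

      def main(args: Array[String]): Unit = {
        val saver = new Saver
        disabled.withValue(true) {
          println(saver.enabledVal) // true  -- stale: captured before the override
          println(saver.enabledDef) // false -- sees the override
        }
      }
    }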
From bf9094d8f5661d82d52c36f76e4c9471f85ee01d Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Fri, 2 Jan 2015 17:46:59 -0800
Subject: [PATCH 3/6] Revise disableOutputSpecValidation() comment to not refer to Spark Streaming.

---
 .../main/scala/org/apache/spark/rdd/PairRDDFunctions.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 219aa111df63e..f8df5b2a08866 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -1130,10 +1130,10 @@
 private[spark] object PairRDDFunctions {
   val RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES = 256
+
   /**
-   * Used by Spark Streaming in order to bypass the `spark.hadoop.validateOutputSpecs` checks
-   * for save actions launched by Spark Streaming, since the validation may break Spark Streaming's
-   * ability to recover from checkpoints. See SPARK-4835 for more details.
+   * Allows for the `spark.hadoop.validateOutputSpecs` checks to be disabled on a case-by-case
+   * basis; see SPARK-4835 for more details.
    */
   val disableOutputSpecValidation: DynamicVariable[Boolean] = new DynamicVariable[Boolean](false)
 }

From 7b3e06a47004c229231c4edfc8bb3dcced5561ed Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Fri, 2 Jan 2015 17:51:13 -0800
Subject: [PATCH 4/6] Remove Streaming-specific setting to undo this change; update conf. guide

---
 docs/configuration.md                                        | 5 ++---
 .../org/apache/spark/streaming/scheduler/JobScheduler.scala  | 4 +---
 2 files changed, 3 insertions(+), 6 deletions(-)

diff --git a/docs/configuration.md b/docs/configuration.md
index 19c48f354a756..9bb6499993735 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -710,9 +710,8 @@ Apart from these, the following properties are also available, and may be useful
   used in saveAsHadoopFile and other variants. This can be disabled to silence exceptions due to pre-existing
   output directories. We recommend that users do not disable this except if trying to achieve compatibility with
   previous versions of Spark. Simply use Hadoop's FileSystem API to delete output directories by hand.
-  This setting is ignored for jobs launched by the Spark Streaming scheduler, since data may need to be rewritten to
-  pre-existing output directories during checkpoint recovery; to enable these checks in streaming, use the
-  spark.streaming.hadoop.validateOutputSpecs setting instead.
+  This setting is ignored for jobs generated through Spark Streaming's StreamingContext, since
+  data may need to be rewritten to pre-existing output directories during checkpoint recovery.
 spark.hadoop.cloneConf
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 8f704729bf112..0e0f5bd3b9db4 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -40,8 +40,6 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
 
   private val jobSets = new ConcurrentHashMap[Time, JobSet]
   private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
-  private val validateOutputSpecs =
-    ssc.conf.getBoolean("spark.streaming.hadoop.validateOutputSpecs", false)
   private val jobExecutor = Executors.newFixedThreadPool(numConcurrentJobs)
   private val jobGenerator = new JobGenerator(this)
   val clock = jobGenerator.clock
@@ -174,7 +172,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
       // Disable checks for existing output directories in jobs launched by the streaming scheduler,
       // since we may need to write output to an existing directory during checkpoint recovery;
       // see SPARK-4835 for more details.
-      PairRDDFunctions.disableOutputSpecValidation.withValue(!validateOutputSpecs) {
+      PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
         job.run()
       }
       eventActor ! JobCompleted(job)
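At this point the scheduler unconditionally disables the check around job.run(), but that only covers code executing on the job-executor thread. DStream RDDs are materialized earlier, during compute() on the job-generation path, so a save action inside a transform() body could still hit the validation; the next patch closes that gap by also wrapping compute(). A minimal model of the two phases (hypothetical structure, not Spark's API):

    import java.util.concurrent.Executors
    import scala.util.DynamicVariable

    object TwoPhaseSketch {
      val disableValidation = new DynamicVariable[Boolean](false)

      def main(args: Array[String]): Unit = {
        // Phase 1: RDD generation (think DStream.compute, including any user
        // transform() body). Nothing wraps this phase yet, so a save performed
        // here would still validate output specs.
        println(s"generation: disabled = ${disableValidation.value}") // false

        // Phase 2: the generated job, run on the scheduler's pool and wrapped
        // the way JobHandler wraps job.run().
        val pool = Executors.newFixedThreadPool(1)
        pool.submit(new Runnable {
          override def run(): Unit = disableValidation.withValue(true) {
            println(s"job: disabled = ${disableValidation.value}") // true
          }
        })
        pool.shutdown()
      }
    }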
From 6485cf880465cf7bd8e501dc861869be58029995 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Fri, 2 Jan 2015 18:36:03 -0800
Subject: [PATCH 5/6] Add test case in Streaming; fix bug for transform()

---
 .../spark/streaming/dstream/DStream.scala          | 10 ++++++--
 .../dstream/TransformedDStream.scala               |  2 +-
 .../spark/streaming/CheckpointSuite.scala          | 25 +++++++++++++++++++
 3 files changed, 34 insertions(+), 3 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 7f8651e719d84..cb862ecf72421 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -26,7 +26,7 @@ import scala.reflect.ClassTag
 import scala.util.matching.Regex
 
 import org.apache.spark.{Logging, SparkException}
-import org.apache.spark.rdd.{BlockRDD, RDD}
+import org.apache.spark.rdd.{BlockRDD, PairRDDFunctions, RDD}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.StreamingContext.rddToFileName
@@ -292,7 +292,13 @@ abstract class DStream[T: ClassTag] (
         // set this DStream's creation site, generate RDDs and then restore the previous call site.
         val prevCallSite = ssc.sparkContext.getCallSite()
         ssc.sparkContext.setCallSite(creationSite)
-        val rddOption = compute(time)
+        // Disable checks for existing output directories in jobs launched by the streaming
+        // scheduler, since we may need to write output to an existing directory during checkpoint
+        // recovery; see SPARK-4835 for more details. We need to have this call here because
+        // compute() might cause Spark jobs to be launched.
+        val rddOption = PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
+          compute(time)
+        }
         ssc.sparkContext.setCallSite(prevCallSite)
 
         rddOption.foreach { case newRDD =>
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
index 7cd4554282ca1..71b61856e23c0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.streaming.dstream
 
-import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.{PairRDDFunctions, RDD}
 import org.apache.spark.streaming.{Duration, Time}
 
 import scala.reflect.ClassTag
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 72d055eb2ea31..1f098785a50d0 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -255,6 +255,31 @@ class CheckpointSuite extends TestSuiteBase {
     }
   }
 
+  test("recovery with saveAsHadoopFile inside transform operation") {
+    // Regression test for SPARK-4835
+    val tempDir = Files.createTempDir()
+    try {
+      testCheckpointedOperation(
+        Seq(Seq("a", "a", "b"), Seq("", ""), Seq(), Seq("a", "a", "b"), Seq("", ""), Seq()),
+        (s: DStream[String]) => {
+          s.transform { (rdd, time) =>
+            val output = rdd.map(x => (x, 1)).reduceByKey(_ + _)
+            output.saveAsHadoopFile(
+              new File(tempDir, "result-" + time.milliseconds).getAbsolutePath,
+              classOf[Text],
+              classOf[IntWritable],
+              classOf[TextOutputFormat[Text, IntWritable]])
+            output
+          }
+        },
+        Seq(Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq(), Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq()),
+        3
+      )
+    } finally {
+      Utils.deleteRecursively(tempDir)
+    }
+  }
+
   // This tests whether the StateDStream's RDD checkpoints works correctly such
   // that the system can recover from a master failure. This assumes as reliable,
   // replayable input source - TestInputDStream.
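For context on what the new test exercises: with spark.hadoop.validateOutputSpecs at its default of true, repeating a save into an existing directory is exactly what fails, because Hadoop's checkOutputSpecs rejects a pre-existing output path. A sketch of that failure mode (assumes a spark-shell-style session with a local SparkContext named sc; the path is arbitrary):

    // Assumes a running local SparkContext `sc`, as in spark-shell.
    val counts = sc.parallelize(Seq("a", "a", "b"))
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    counts.saveAsTextFile("/tmp/spark-4835-demo") // first save succeeds
    // Repeating the same save -- as a recovered streaming batch would -- fails
    // inside checkOutputSpecs with a FileAlreadyExistsException, unless the
    // validation has been disabled.
    counts.saveAsTextFile("/tmp/spark-4835-demo")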
From 36eaf355a17a9e6568d33257a84b4ad35e72b67f Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Sun, 4 Jan 2015 17:27:39 -0800
Subject: [PATCH 6/6] Add comment explaining use of transform() in test.

---
 .../apache/spark/streaming/CheckpointSuite.scala | 16 +++++++++++++++-
 1 file changed, 15 insertions(+), 1 deletion(-)

diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 1f098785a50d0..5d232c6ade7a9 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -256,7 +256,21 @@ class CheckpointSuite extends TestSuiteBase {
     }
   }
 
   test("recovery with saveAsHadoopFile inside transform operation") {
-    // Regression test for SPARK-4835
+    // Regression test for SPARK-4835.
+    //
+    // In that issue, the problem was that `saveAsHadoopFile(s)` would fail when the last batch
+    // was restarted from a checkpoint since the output directory would already exist. However,
+    // the other saveAsHadoopFile* tests couldn't catch this because they only tested whether the
+    // output matched correctly and not whether the post-restart batch had successfully finished
+    // without throwing any errors. The following test reproduces the same bug with a test that
+    // actually fails because the error in saveAsHadoopFile causes transform() to fail, which
+    // prevents the expected output from being written to the output stream.
+    //
+    // This is not actually a valid use of transform, but it's being used here so that we can test
+    // the fix for SPARK-4835 independently of additional test cleanup.
+    //
+    // After SPARK-5079 is addressed, should be able to remove this test since a strengthened
+    // version of the other saveAsHadoopFile* tests would prevent regressions for this issue.
     val tempDir = Files.createTempDir()
     try {
       testCheckpointedOperation(