From 02e50c2df8ff01dd4366f1583205ac28999dd88e Mon Sep 17 00:00:00 2001
From: jhu-chang
Date: Tue, 17 Nov 2015 17:17:28 +0800
Subject: [PATCH 1/5] Add a flag to control restore in DStream to avoid
 duplicate work

---
 .../spark/streaming/dstream/DStream.scala     | 15 ++--
 .../spark/streaming/CheckpointSuite.scala     | 72 ++++++++++++++++++-
 2 files changed, 81 insertions(+), 6 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 1a6edf9473d8..d1f0ae29542a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -97,6 +97,8 @@ abstract class DStream[T: ClassTag] (
   private[streaming] val mustCheckpoint = false
   private[streaming] var checkpointDuration: Duration = null
   private[streaming] val checkpointData = new DStreamCheckpointData(this)
+  @transient
+  private[streaming] var restoredFromCheckpointData = false
 
   // Reference to whole DStream graph
   private[streaming] var graph: DStreamGraph = null
@@ -507,11 +509,14 @@ abstract class DStream[T: ClassTag] (
    * override the updateCheckpointData() method would also need to override this method.
    */
   private[streaming] def restoreCheckpointData() {
-    // Create RDDs from the checkpoint data
-    logInfo("Restoring checkpoint data")
-    checkpointData.restore()
-    dependencies.foreach(_.restoreCheckpointData())
-    logInfo("Restored checkpoint data")
+    if (!restoredFromCheckpointData) {
+      // Create RDDs from the checkpoint data
+      logInfo("Restoring checkpoint data")
+      checkpointData.restore()
+      dependencies.foreach(_.restoreCheckpointData())
+      restoredFromCheckpointData = true
+      logInfo("Restored checkpoint data")
+    }
   }
 
   @throws(classOf[IOException])
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 84f5294aa39c..e9db38832ece 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -30,13 +30,33 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.io.{IntWritable, Text}
 import org.apache.hadoop.mapred.TextOutputFormat
 import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
+import org.apache.spark.rdd.RDD
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.time.SpanSugar._
 
-import org.apache.spark.streaming.dstream.{DStream, FileInputDStream}
+import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.scheduler.{ConstantEstimator, RateTestInputDStream, RateTestReceiver}
 import org.apache.spark.util.{MutableURLClassLoader, Clock, ManualClock, Utils}
 
+/**
+ * An input stream that records how many times restore() is invoked
+ */
+private[streaming] class CheckpointInputDStream(ssc_ : StreamingContext) extends InputDStream[Int](ssc_) {
+  protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData
+  override def start(): Unit = { }
+  override def stop(): Unit = { }
+  override def compute(time: Time): Option[RDD[Int]] = Some(ssc.sc.makeRDD(Seq(1)))
+  private[streaming]
+  class FileInputDStreamCheckpointData extends DStreamCheckpointData(this) {
+    @transient
+    var restoredTimes = 0
+    override def restore() {
+      restoredTimes += 1
+      super.restore()
+    }
+  }
+}
+
 /**
  * This test suite tests the checkpointing functionality of DStreams -
  * the checkpointing of a DStream's RDDs as well as the checkpointing of
@@ -580,6 +600,56 @@ class CheckpointSuite extends TestSuiteBase {
     }
   }
 
+  test("DStream checkpoint invoke times") {
+    var clock: ManualClock = null
+    val outputBuffer = new ArrayBuffer[Seq[Int]] with SynchronizedBuffer[Seq[Int]]
+    withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
+      ssc.checkpoint(checkpointDir)
+      clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+      val inputDStream = new CheckpointInputDStream(ssc)
+      val checkpointData = inputDStream.checkpointData
+      val mappedDStream = inputDStream.map(_ + 100)
+      val outputStream = new TestOutputStream(mappedDStream, outputBuffer)
+      outputStream.register()
+      /// do more two times output
+      mappedDStream.foreachRDD(rdd => rdd.count())
+      mappedDStream.foreachRDD(rdd => rdd.count())
+      assert(checkpointData.restoredTimes === 0)
+      val startTime = System.currentTimeMillis()
+      ssc.start()
+      clock.advance(batchDuration.milliseconds)
+      clock.advance(batchDuration.milliseconds)
+      clock.advance(batchDuration.milliseconds)
+      while (outputStream.output.size < 3 &&
+        System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
+        logInfo("output.size = " + outputStream.output.size + ", numExpectedOutput = 3")
+        ssc.awaitTerminationOrTimeout(50)
+      }
+      ssc.stop()
+      assert(outputStream.output.flatten === Seq(101, 101, 101))
+      assert(checkpointData.restoredTimes === 0)
+    }
+    logInfo("*********** RESTARTING ************")
+    withStreamingContext(new StreamingContext(checkpointDir)) { ssc =>
+      clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+      val outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[Int]]
+      val checkpointData = ssc.graph.getInputStreams().head.asInstanceOf[CheckpointInputDStream].checkpointData
+      assert(checkpointData.restoredTimes === 1)
+      val startTime = System.currentTimeMillis()
+      ssc.start()
+      clock.advance(batchDuration.milliseconds)
+      clock.advance(batchDuration.milliseconds)
+      while (outputStream.output.size < 3 &&
+        System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
+        logInfo("output.size = " + outputStream.output.size + ", numExpectedOutput = 3")
+        ssc.awaitTerminationOrTimeout(50)
+      }
+      ssc.stop()
+      assert(outputStream.output.flatten === Seq(101, 101, 101))
+      assert(checkpointData.restoredTimes === 1)
+    }
+  }
+
   // This tests whether spark can deserialize array object
   // refer to SPARK-5569
   test("recovery from checkpoint contains array object") {
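A note on what the patch above fixes: DStreamGraph restores checkpoint data by recursing from each output stream into its dependencies, so a stream shared by several outputs had restore() invoked once per dependent path. The following is a minimal, self-contained sketch of that effect and of the flag-based guard; Node and its methods are illustrative stand-ins, not Spark classes.

    // restoreAll() models the old unguarded restoreCheckpointData();
    // restoreOnce() models the guarded version added by the patch.
    class Node(deps: Node*) {
      var restoreCalls = 0
      private var restored = false

      def restoreAll(): Unit = {
        restoreCalls += 1
        deps.foreach(_.restoreAll())
      }

      def restoreOnce(): Unit = {
        if (!restored) {
          restoreCalls += 1
          deps.foreach(_.restoreOnce())
          restored = true
        }
      }
    }

    object RestoreDemo extends App {
      def build(): (Node, Seq[Node]) = {
        val input = new Node()
        val mapped = new Node(input)
        // Three outputs over one mapped stream, mirroring the new test:
        // a TestOutputStream plus two foreachRDD registrations.
        (input, Seq(new Node(mapped), new Node(mapped), new Node(mapped)))
      }

      val (in1, outs1) = build()
      outs1.foreach(_.restoreAll())
      println(in1.restoreCalls) // 3: restored once per output path

      val (in2, outs2) = build()
      outs2.foreach(_.restoreOnce())
      println(in2.restoreCalls) // 1: the guard makes restore idempotent
    }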
From ac8795197f6fa552b4215c63c93b6112652972d9 Mon Sep 17 00:00:00 2001
From: jhu-chang
Date: Mon, 14 Dec 2015 17:03:51 +0800
Subject: [PATCH 2/5] Make restoredFromCheckpointData private to DStream

---
 .../main/scala/org/apache/spark/streaming/dstream/DStream.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index d1f0ae29542a..91a43e14a8b1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -98,7 +98,7 @@ abstract class DStream[T: ClassTag] (
   private[streaming] var checkpointDuration: Duration = null
   private[streaming] val checkpointData = new DStreamCheckpointData(this)
   @transient
-  private[streaming] var restoredFromCheckpointData = false
+  private var restoredFromCheckpointData = false
 
   // Reference to whole DStream graph
   private[streaming] var graph: DStreamGraph = null
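Why the flag is @transient is worth spelling out (my reading, not stated in the patch): the DStream graph is serialized into the checkpoint itself, and Java serialization skips transient fields, so a graph read back from a checkpoint starts with the flag at its default value of false and restore still runs exactly once after each recovery. A small sketch of that behavior, with Holder as a stand-in for a DStream:

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

    class Holder extends Serializable {
      @transient var restoredFromCheckpointData = true // set during the previous run
    }

    object TransientDemo extends App {
      val buf = new ByteArrayOutputStream()
      val out = new ObjectOutputStream(buf)
      out.writeObject(new Holder)
      out.close()

      val in = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray))
      val recovered = in.readObject().asInstanceOf[Holder]
      // The transient var was not serialized, so it comes back as false.
      println(recovered.restoredFromCheckpointData) // false
    }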
From dc38d0f025b48d1e5ab3658c0d1fd573224374f7 Mon Sep 17 00:00:00 2001
From: jhu-chang
Date: Tue, 15 Dec 2015 12:19:04 +0800
Subject: [PATCH 3/5] Fix style and test only the restore invocation count

---
 .../spark/streaming/CheckpointSuite.scala     | 37 +++++--------------
 1 file changed, 9 insertions(+), 28 deletions(-)

diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 8b1136c6c7ac..4561f454b83b 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -42,7 +42,8 @@ import org.apache.spark.util.{Clock, ManualClock, MutableURLClassLoader, Utils}
 /**
  * An input stream that records how many times restore() is invoked
  */
-private[streaming] class CheckpointInputDStream(ssc_ : StreamingContext) extends InputDStream[Int](ssc_) {
+private[streaming]
+class CheckpointInputDStream(ssc_ : StreamingContext) extends InputDStream[Int](ssc_) {
   protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData
   override def start(): Unit = { }
   override def stop(): Unit = { }
@@ -130,7 +131,7 @@ trait DStreamCheckpointTester { self: SparkFunSuite =>
     new StreamingContext(SparkContext.getOrCreate(conf), batchDuration)
   }
 
-  private def generateOutput[V: ClassTag](
+  protected def generateOutput[V: ClassTag](
       ssc: StreamingContext,
       targetBatchTime: Time,
       checkpointDir: String,
@@ -735,7 +736,7 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester {
     }
   }
 
-  test("DStream checkpoint invoke times") {
+  test("DStreamCheckpointData.restore invoking times") {
     var clock: ManualClock = null
     val outputBuffer = new ArrayBuffer[Seq[Int]] with SynchronizedBuffer[Seq[Int]]
     withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
@@ -746,41 +747,21 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester {
       val mappedDStream = inputDStream.map(_ + 100)
       val outputStream = new TestOutputStream(mappedDStream, outputBuffer)
       outputStream.register()
-      /// do more two times output
+      // register two more output operations
       mappedDStream.foreachRDD(rdd => rdd.count())
       mappedDStream.foreachRDD(rdd => rdd.count())
       assert(checkpointData.restoredTimes === 0)
-      val startTime = System.currentTimeMillis()
-      ssc.start()
-      clock.advance(batchDuration.milliseconds)
-      clock.advance(batchDuration.milliseconds)
-      clock.advance(batchDuration.milliseconds)
-      while (outputStream.output.size < 3 &&
-        System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
-        logInfo("output.size = " + outputStream.output.size + ", numExpectedOutput = 3")
-        ssc.awaitTerminationOrTimeout(50)
-      }
-      ssc.stop()
-      assert(outputStream.output.flatten === Seq(101, 101, 101))
+      val batchDurationMillis = ssc.progressListener.batchDuration
+      generateOutput(ssc, Time(batchDurationMillis * 3), checkpointDir, stopSparkContext = true)
       assert(checkpointData.restoredTimes === 0)
     }
     logInfo("*********** RESTARTING ************")
     withStreamingContext(new StreamingContext(checkpointDir)) { ssc =>
-      clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
-      val outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[Int]]
-      val checkpointData = ssc.graph.getInputStreams().head.asInstanceOf[CheckpointInputDStream].checkpointData
+      val checkpointData =
+        ssc.graph.getInputStreams().head.asInstanceOf[CheckpointInputDStream].checkpointData
       assert(checkpointData.restoredTimes === 1)
-      val startTime = System.currentTimeMillis()
       ssc.start()
-      clock.advance(batchDuration.milliseconds)
-      clock.advance(batchDuration.milliseconds)
-      while (outputStream.output.size < 3 &&
-        System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
-        logInfo("output.size = " + outputStream.output.size + ", numExpectedOutput = 3")
-        ssc.awaitTerminationOrTimeout(50)
-      }
       ssc.stop()
-      assert(outputStream.output.flatten === Seq(101, 101, 101))
       assert(checkpointData.restoredTimes === 1)
     }
   }
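The wait loops deleted above follow a recurring idiom in these tests: advance a ManualClock one batch at a time, then poll until the expected number of batches has been emitted or a deadline passes; patch 3 delegates this to the suite's generateOutput helper instead. A generic distillation of the idiom, for reference (the helper name and signature are mine, not Spark's):

    object BatchWaiter {
      // The callbacks stand in for clock.advance(batchDuration.milliseconds)
      // and outputStream.output.size in the original test.
      def awaitBatches(
          advanceOneBatch: () => Unit,
          completedBatches: () => Int,
          expected: Int,
          pollIntervalMs: Long = 50,
          timeoutMs: Long = 10000): Boolean = {
        val deadline = System.currentTimeMillis() + timeoutMs
        (1 to expected).foreach(_ => advanceOneBatch())
        while (completedBatches() < expected && System.currentTimeMillis() < deadline) {
          Thread.sleep(pollIntervalMs)
        }
        completedBatches() >= expected
      }
    }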
From 625e20aaccab8bb0102e5d465a326a289efef8bb Mon Sep 17 00:00:00 2001
From: jhu-chang
Date: Wed, 16 Dec 2015 11:47:31 +0800
Subject: [PATCH 4/5] Use TestOutputStreamWithPartitions instead of
 TestOutputStream

---
 .../scala/org/apache/spark/streaming/CheckpointSuite.scala | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 4561f454b83b..076eb08ffe2f 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -738,14 +738,13 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester {
 
   test("DStreamCheckpointData.restore invoking times") {
     var clock: ManualClock = null
-    val outputBuffer = new ArrayBuffer[Seq[Int]] with SynchronizedBuffer[Seq[Int]]
+    val outputBuffer = new ArrayBuffer[Seq[Seq[Int]]] with SynchronizedBuffer[Seq[Seq[Int]]]
     withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
       ssc.checkpoint(checkpointDir)
-      clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
       val inputDStream = new CheckpointInputDStream(ssc)
       val checkpointData = inputDStream.checkpointData
       val mappedDStream = inputDStream.map(_ + 100)
-      val outputStream = new TestOutputStream(mappedDStream, outputBuffer)
+      val outputStream = new TestOutputStreamWithPartitions(mappedDStream, outputBuffer)
       outputStream.register()
       // register two more output operations
       mappedDStream.foreachRDD(rdd => rdd.count())
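On the buffer type change above: as I understand the test helpers, TestOutputStream records one Seq[V] per batch, while TestOutputStreamWithPartitions preserves partition boundaries and records one Seq[V] per partition per batch, hence the Seq[Seq[Int]] elements. A tiny shape illustration under the assumption of three batches, each with a single one-element partition:

    object OutputShapeDemo extends App {
      val withPartitions: Seq[Seq[Seq[Int]]] =
        Seq(Seq(Seq(101)), Seq(Seq(101)), Seq(Seq(101)))

      // Dropping the partition level recovers the per-batch view that
      // TestOutputStream would give.
      val perBatch: Seq[Seq[Int]] = withPartitions.map(_.flatten)
      println(perBatch.flatten) // List(101, 101, 101)
    }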
From 81310332a12a269e79cf5828433a807f753dfb41 Mon Sep 17 00:00:00 2001
From: jhu-chang
Date: Thu, 17 Dec 2015 22:35:32 +0800
Subject: [PATCH 5/5] Remove unused variables in test case

---
 .../scala/org/apache/spark/streaming/CheckpointSuite.scala | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 076eb08ffe2f..f5f446f14a0d 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -737,14 +737,12 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester {
   }
 
   test("DStreamCheckpointData.restore invoking times") {
-    var clock: ManualClock = null
-    val outputBuffer = new ArrayBuffer[Seq[Seq[Int]]] with SynchronizedBuffer[Seq[Seq[Int]]]
     withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
       ssc.checkpoint(checkpointDir)
       val inputDStream = new CheckpointInputDStream(ssc)
       val checkpointData = inputDStream.checkpointData
       val mappedDStream = inputDStream.map(_ + 100)
-      val outputStream = new TestOutputStreamWithPartitions(mappedDStream, outputBuffer)
+      val outputStream = new TestOutputStreamWithPartitions(mappedDStream)
       outputStream.register()
       // register two more output operations
       mappedDStream.foreachRDD(rdd => rdd.count())