From 8772f70bb3e11321a163c241d83da9ad1d49a428 Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Wed, 25 Sep 2019 10:59:08 -0700 Subject: [PATCH] [SPARK-23197][STREAMING][TESTS] Fix ReceiverSuite."receiver_life_cycle" to not rely on timing This patch changes ReceiverSuite."receiver_life_cycle" to record actual calls with timestamp in FakeReceiver/FakeReceiverSupervisor, which doesn't rely on timing of stopping and starting receiver in restarting receiver. It enables us to give enough huge timeout on verification of restart as we can verify both stopping and starting together. The test is flaky without this patch. We increased timeout to fix flakyness of this test (https://github.com/apache/spark/commit/15adcc8273e73352e5e1c3fc9915c0b004ec4836) but even with longer timeout it has been still failing intermittently. No I've reproduced test failure artificially via below diff: ``` diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala index faf6db82d5..d8977543c0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala -191,9 +191,11 private[streaming] abstract class ReceiverSupervisor( // thread pool. logWarning("Restarting receiver with delay " + delay + " ms: " + message, error.getOrElse(null)) + Thread.sleep(1000) stopReceiver("Restarting receiver with delay " + delay + "ms: " + message, error) logDebug("Sleeping for " + delay) Thread.sleep(delay) + Thread.sleep(1000) logInfo("Starting receiver again") startReceiver() logInfo("Receiver started again") ``` and confirmed this patch doesn't fail with the change. Closes #25862 from HeartSaVioR/SPARK-23197-v2. Authored-by: Jungtaek Lim (HeartSaVioR) Signed-off-by: Dongjoon Hyun --- .../spark/streaming/ReceiverSuite.scala | 87 +++++++++++++------ 1 file changed, 60 insertions(+), 27 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala index 33f93daac26e1..ff214041ebb19 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala @@ -72,7 +72,7 @@ class ReceiverSuite extends TestSuiteBase with TimeLimits with Serializable { executorStarted.acquire() // Verify that receiver was started - assert(receiver.onStartCalled) + assert(receiver.callsRecorder.calls === Seq("onStart")) assert(executor.isReceiverStarted) assert(receiver.isStarted) assert(!receiver.isStopped()) @@ -105,19 +105,22 @@ class ReceiverSuite extends TestSuiteBase with TimeLimits with Serializable { assert(executor.errors.head.eq(exception)) // Verify restarting actually stops and starts the receiver - receiver.restart("restarting", null, 600) - eventually(timeout(300 millis), interval(10 millis)) { - // receiver will be stopped async - assert(receiver.isStopped) - assert(receiver.onStopCalled) - } - eventually(timeout(1000 millis), interval(10 millis)) { - // receiver will be started async - assert(receiver.onStartCalled) - assert(executor.isReceiverStarted) + executor.callsRecorder.reset() + receiver.callsRecorder.reset() + receiver.restart("restarting", null, 100) + eventually(timeout(10.seconds), interval(10.milliseconds)) { + // below verification ensures for now receiver is already restarted assert(receiver.isStarted) assert(!receiver.isStopped) assert(receiver.receiving) + + // both receiver supervisor and receiver should be stopped first, and started + assert(executor.callsRecorder.calls === Seq("onReceiverStop", "onReceiverStart")) + assert(receiver.callsRecorder.calls === Seq("onStop", "onStart")) + + // check whether the delay between stop and start is respected + assert(executor.callsRecorder.timestamps.reverse.reduceLeft { _ - _ } >= 100) + assert(receiver.callsRecorder.timestamps.reverse.reduceLeft { _ - _ } >= 100) } // Verify that stopping actually stops the thread @@ -289,6 +292,9 @@ class ReceiverSuite extends TestSuiteBase with TimeLimits with Serializable { val arrayBuffers = new ArrayBuffer[ArrayBuffer[_]] val errors = new ArrayBuffer[Throwable] + // tracks calls of "onReceiverStart", "onReceiverStop" + val callsRecorder = new MethodsCallRecorder() + /** Check if all data structures are clean */ def isAllEmpty: Boolean = { singles.isEmpty && byteBuffers.isEmpty && iterators.isEmpty && @@ -324,7 +330,15 @@ class ReceiverSuite extends TestSuiteBase with TimeLimits with Serializable { errors += throwable } - override protected def onReceiverStart(): Boolean = true + override protected def onReceiverStart(): Boolean = { + callsRecorder.record() + true + } + + override protected def onReceiverStop(message: String, error: Option[Throwable]): Unit = { + callsRecorder.record() + super.onReceiverStop(message, error) + } override def createBlockGenerator( blockGeneratorListener: BlockGeneratorListener): BlockGenerator = { @@ -362,36 +376,55 @@ class ReceiverSuite extends TestSuiteBase with TimeLimits with Serializable { class FakeReceiver(sendData: Boolean = false) extends Receiver[Int](StorageLevel.MEMORY_ONLY) { @volatile var otherThread: Thread = null @volatile var receiving = false - @volatile var onStartCalled = false - @volatile var onStopCalled = false + + // tracks calls of "onStart", "onStop" + @transient lazy val callsRecorder = new MethodsCallRecorder() def onStart() { otherThread = new Thread() { override def run() { receiving = true - var count = 0 - while(!isStopped()) { - if (sendData) { - store(count) - count += 1 + try { + var count = 0 + while(!isStopped()) { + if (sendData) { + store(count) + count += 1 + } + Thread.sleep(10) } - Thread.sleep(10) + } finally { + receiving = false } } } - onStartCalled = true + callsRecorder.record() otherThread.start() } def onStop() { - onStopCalled = true + callsRecorder.record() otherThread.join() } +} + +class MethodsCallRecorder { + // tracks calling methods as (timestamp, methodName) + private val records = new ArrayBuffer[(Long, String)] + + def record(): Unit = records.append((System.currentTimeMillis(), callerMethodName)) + + def reset(): Unit = records.clear() - def reset() { - receiving = false - onStartCalled = false - onStopCalled = false + def callsWithTimestamp: scala.collection.immutable.Seq[(Long, String)] = records.toList + + def calls: scala.collection.immutable.Seq[String] = records.map(_._2).toList + + def timestamps: scala.collection.immutable.Seq[Long] = records.map(_._1).toList + + private def callerMethodName: String = { + val stackTrace = new Throwable().getStackTrace + // it should return method name of two levels deeper + stackTrace(2).getMethodName } } -