From c3b870e70dc1fb3d806c25c41102de1eaff508c3 Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Sat, 13 Jul 2019 12:11:06 -0500 Subject: [PATCH] [SPARK-28247][SS] Fix flaky test "query without test harness" on ContinuousSuite This patch fixes the flaky test "query without test harness" on ContinuousSuite, via adding some more gaps on waiting query to commit the epoch which writes output rows. The observation of this issue is below (injected some debug logs to get them): ``` reader creation time 1562225320210 epoch 1 launched 1562225320593 (+380ms from reader creation time) epoch 13 launched 1562225321702 (+1.5s from reader creation time) partition reader creation time 1562225321715 (+1.5s from reader creation time) next read time for first next call 1562225321210 (+1s from reader creation time) first next called in partition reader 1562225321746 (immediately after creation of partition reader) wait finished in next called in partition reader 1562225321746 (no wait) second next called in partition reader 1562225321747 (immediately after first next()) epoch 0 commit started 1562225321861 writing rows (0, 1) (belong to epoch 13) 1562225321866 (+100ms after first next()) wait start in waitForRateSourceTriggers(2) 1562225322059 next read time for second next call 1562225322210 (+1s from previous "next read time") wait finished in next called in partition reader 1562225322211 (+450ms wait) writing rows (2, 3) (belong to epoch 13) 1562225322211 (immediately after next()) epoch 14 launched 1562225322246 desired wait time in waitForRateSourceTriggers(2) 1562225322510 (+2.3s from reader creation time) epoch 12 committed 1562225323034 ``` These rows were written within desired wait time, but the epoch 13 couldn't be committed within it. Interestingly, epoch 12 was lucky to be committed within a gap between finished waiting in waitForRateSourceTriggers and query.stop() - but even suppose the rows were written in epoch 12, it would be just in luck and epoch should be committed within desired wait time. This patch modifies Rate continuous stream to track the highest committed value, so that test can wait until desired value is reported to the stream as committed. This patch also modifies Rate continuous stream to track the timestamp at stream gets the first committed offset, and let `waitForRateSourceTriggers` use the timestamp. This also relies on waiting for specific period, but safer approach compared to current based on the observation above. Based on the change, this patch saves couple of seconds in test time. 10 sequential test runs succeeded locally. Closes #25048 from HeartSaVioR/SPARK-28247. Authored-by: Jungtaek Lim (HeartSaVioR) Signed-off-by: Sean Owen --- .../continuous/ContinuousSuite.scala | 63 ++++++++++++++----- 1 file changed, 47 insertions(+), 16 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala index 4980b0cd41f81..bcca821755950 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala @@ -36,18 +36,43 @@ class ContinuousSuiteBase extends StreamTest { "continuous-stream-test-sql-context", sparkConf.set("spark.sql.testkey", "true"))) - protected def waitForRateSourceTriggers(query: StreamExecution, numTriggers: Int): Unit = { - query match { - case s: ContinuousExecution => - assert(numTriggers >= 2, "must wait for at least 2 triggers to ensure query is initialized") - val reader = s.lastExecution.executedPlan.collectFirst { - case DataSourceV2ScanExec(_, _, _, _, r: RateStreamContinuousReader) => r - }.get - - val deltaMs = numTriggers * 1000 + 300 - while (System.currentTimeMillis < reader.creationTime + deltaMs) { - Thread.sleep(reader.creationTime + deltaMs - System.currentTimeMillis) + protected def waitForRateSourceTriggers(query: ContinuousExecution, numTriggers: Int): Unit = { + query.awaitEpoch(0) + + // This is called after waiting first epoch to be committed, so we can just treat + // it as partition readers for rate source are already initialized. + val firstCommittedTime = System.nanoTime() + val deltaNs = (numTriggers * 1000 + 300) * 1000000L + var toWaitNs = firstCommittedTime + deltaNs - System.nanoTime() + while (toWaitNs > 0) { + Thread.sleep(toWaitNs / 1000000) + toWaitNs = firstCommittedTime + deltaNs - System.nanoTime() + } + } + + protected def waitForRateSourceCommittedValue( + query: ContinuousExecution, + desiredValue: Long, + maxWaitTimeMs: Long): Unit = { + def readHighestCommittedValue(c: ContinuousExecution): Option[Long] = { + c.committedOffsets.lastOption.map { case (_, offset) => + offset match { + case o: RateStreamOffset => + o.partitionToValueAndRunTimeMs.map { + case (_, ValueRunTimeMsPair(value, _)) => value + }.max } + } + } + + val maxWait = System.currentTimeMillis() + maxWaitTimeMs + while (System.currentTimeMillis() < maxWait && + readHighestCommittedValue(query).getOrElse(Long.MinValue) < desiredValue) { + Thread.sleep(100) + } + if (System.currentTimeMillis() > maxWait) { + logWarning(s"Couldn't reach desired value in $maxWaitTimeMs milliseconds!" + + s"Current highest committed value is ${readHighestCommittedValue(query)}") } } @@ -216,14 +241,16 @@ class ContinuousSuite extends ContinuousSuiteBase { .queryName("noharness") .trigger(Trigger.Continuous(100)) .start() + + val expected = Set(0, 1, 2, 3) val continuousExecution = query.asInstanceOf[StreamingQueryWrapper].streamingQuery.asInstanceOf[ContinuousExecution] - continuousExecution.awaitEpoch(0) - waitForRateSourceTriggers(continuousExecution, 2) + waitForRateSourceCommittedValue(continuousExecution, expected.max, 20 * 1000) query.stop() val results = spark.read.table("noharness").collect() - assert(Set(0, 1, 2, 3).map(Row(_)).subsetOf(results.toSet)) + assert(expected.map(Row(_)).subsetOf(results.toSet), + s"Result set ${results.toSet} are not a superset of $expected!") } } @@ -241,7 +268,9 @@ class ContinuousStressSuite extends ContinuousSuiteBase { testStream(df, useV2Sink = true)( StartStream(longContinuousTrigger), AwaitEpoch(0), - Execute(waitForRateSourceTriggers(_, 201)), + Execute { exec => + waitForRateSourceTriggers(exec.asInstanceOf[ContinuousExecution], 50) + }, IncrementEpoch(), StopStream, CheckAnswerRowsContains(scala.Range(0, 25000).map(Row(_))) @@ -259,7 +288,9 @@ class ContinuousStressSuite extends ContinuousSuiteBase { testStream(df, useV2Sink = true)( StartStream(Trigger.Continuous(2012)), AwaitEpoch(0), - Execute(waitForRateSourceTriggers(_, 201)), + Execute { exec => + waitForRateSourceTriggers(exec.asInstanceOf[ContinuousExecution], 50) + }, IncrementEpoch(), StopStream, CheckAnswerRowsContains(scala.Range(0, 25000).map(Row(_))))