From 28208cbb64fb1871965da79c74a81cf3ecdabcb3 Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Mon, 15 Jul 2019 16:43:17 +0200 Subject: [PATCH] [SPARK-28404][SS][TESTS] Fix negative timeout value in RateStreamContinuousPartitionReader --- .../streaming/continuous/ContinuousRateStreamSource.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala index d55f71c7be830..e1b7a8fc283d3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala @@ -134,8 +134,10 @@ class RateStreamContinuousPartitionReader( nextReadTime += readTimeIncrement try { - while (System.currentTimeMillis < nextReadTime) { - Thread.sleep(nextReadTime - System.currentTimeMillis) + var toWaitMs = nextReadTime - System.currentTimeMillis + while (toWaitMs > 0) { + Thread.sleep(toWaitMs) + toWaitMs = nextReadTime - System.currentTimeMillis } } catch { case _: InterruptedException =>