From d6e107977e791ca70d7b118739e7160f4dec1867 Mon Sep 17 00:00:00 2001
From: David McGuire
Date: Fri, 17 Apr 2015 09:43:57 -0700
Subject: [PATCH 01/12] Stack overflow error in RateLimiter on rates over 1000/s

---
 .../org/apache/spark/streaming/ReceiverSuite.scala | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
index 91261a9db7360..239f83d1c8efc 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
@@ -158,7 +158,7 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
   test("block generator throttling") {
     val blockGeneratorListener = new FakeBlockGeneratorListener
     val blockIntervalMs = 100
-    val maxRate = 100
+    val maxRate = 1001
     val conf = new SparkConf().set("spark.streaming.blockInterval", s"${blockIntervalMs}ms").
       set("spark.streaming.receiver.maxRate", maxRate.toString)
     val blockGenerator = new BlockGenerator(blockGeneratorListener, 1, conf)
@@ -176,7 +176,7 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
       blockGenerator.addData(count)
       generatedData += count
       count += 1
-      Thread.sleep(1)
+      Thread.sleep(0)
     }
     blockGenerator.stop()

@@ -186,16 +186,17 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
     assert(recordedData.toSet === generatedData.toSet, "Received data not same")

     // recordedData size should be close to the expected rate
-    val minExpectedMessages = expectedMessages - 3
-    val maxExpectedMessages = expectedMessages + 1
+    // use an error margin proportional to the value, so that rate changes don't cause a brittle test
+    val minExpectedMessages = expectedMessages - 0.3 * expectedMessages
+    val maxExpectedMessages = expectedMessages + 0.1 * expectedMessages
     val numMessages = recordedData.size
     assert(
       numMessages >= minExpectedMessages && numMessages <= maxExpectedMessages,
       s"#records received = $numMessages, not between $minExpectedMessages and $maxExpectedMessages"
     )

-    val minExpectedMessagesPerBlock = expectedMessagesPerBlock - 3
-    val maxExpectedMessagesPerBlock = expectedMessagesPerBlock + 1
+    val minExpectedMessagesPerBlock = expectedMessagesPerBlock - 0.3 * expectedMessagesPerBlock
+    val maxExpectedMessagesPerBlock = expectedMessagesPerBlock + 0.1 * expectedMessagesPerBlock
     val receivedBlockSizes = recordedBlocks.map { _.size }.mkString(",")
     assert(
       // the first and last block may be incomplete, so we slice them out
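
A standalone sketch of the failure mode the next patch addresses (illustrative scratch code, not part of any patch; the names mirror RateLimiter.scala and the values are assumed):

    // Why rates over 1000/s overflowed the stack in the old RateLimiter:
    // suppose one message has been pushed and under a millisecond has elapsed.
    val messagesWrittenSinceSync = 1L
    val desiredRate = 1001
    // Long arithmetic truncates: 1 * 1000 / 1001 == 0 target milliseconds
    val targetTimeInMillis = messagesWrittenSinceSync * 1000 / desiredRate
    val elapsedTimeInMillis = 0L  // sub-millisecond elapsed time rounds down to 0
    val sleepTimeInMillis = targetTimeInMillis - elapsedTimeInMillis
    assert(sleepTimeInMillis == 0)  // never positive, so the limiter never sleeps

With a zero sleep time, waitToPush() retries immediately; the method is overridable, so scalac cannot emit a tail call, and each retry adds a stack frame until the measured rate drops below desiredRate or the stack overflows.
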
From 24b1bc02ecd458d5b34f32fe51a221cad621b26d Mon Sep 17 00:00:00 2001
From: David McGuire
Date: Fri, 17 Apr 2015 10:00:07 -0700
Subject: [PATCH 02/12] Fix truncation in integer division causing infinite recursion

---
 .../org/apache/spark/streaming/receiver/RateLimiter.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
index e4f6ba626ebbf..219892dd02648 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
@@ -55,13 +55,13 @@ private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging {
       }
     } else {
       // Calculate how much time we should sleep to bring ourselves to the desired rate.
-      val targetTimeInMillis = messagesWrittenSinceSync * 1000 / desiredRate
+      val targetTimeInMillis = messagesWrittenSinceSync.toDouble * 1000 / desiredRate
       val elapsedTimeInMillis = elapsedNanosecs / 1000000
       val sleepTimeInMillis = targetTimeInMillis - elapsedTimeInMillis
       if (sleepTimeInMillis > 0) {
         logTrace("Natural rate is " + rate + " per second but desired rate is " +
           desiredRate + ", sleeping for " + sleepTimeInMillis + " ms to compensate.")
-        Thread.sleep(sleepTimeInMillis)
+        Thread.sleep(sleepTimeInMillis.toInt)
       }
       waitToPush()
     }

From 38f3ca8a4fb7845aebd855ccaa73339fba5ca091 Mon Sep 17 00:00:00 2001
From: David McGuire
Date: Fri, 17 Apr 2015 10:05:30 -0700
Subject: [PATCH 03/12] Ratchet down the error rate to +/- 5%; tests fail

---
 .../scala/org/apache/spark/streaming/ReceiverSuite.scala | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
index 239f83d1c8efc..f2dfd834d456c 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
@@ -187,16 +187,16 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {

     // recordedData size should be close to the expected rate
     // use an error margin proportional to the value, so that rate changes don't cause a brittle test
-    val minExpectedMessages = expectedMessages - 0.3 * expectedMessages
-    val maxExpectedMessages = expectedMessages + 0.1 * expectedMessages
+    val minExpectedMessages = expectedMessages - 0.05 * expectedMessages
+    val maxExpectedMessages = expectedMessages + 0.05 * expectedMessages
     val numMessages = recordedData.size
     assert(
       numMessages >= minExpectedMessages && numMessages <= maxExpectedMessages,
       s"#records received = $numMessages, not between $minExpectedMessages and $maxExpectedMessages"
     )

-    val minExpectedMessagesPerBlock = expectedMessagesPerBlock - 0.3 * expectedMessagesPerBlock
-    val maxExpectedMessagesPerBlock = expectedMessagesPerBlock + 0.1 * expectedMessagesPerBlock
+    val minExpectedMessagesPerBlock = expectedMessagesPerBlock - 0.05 * expectedMessagesPerBlock
+    val maxExpectedMessagesPerBlock = expectedMessagesPerBlock + 0.05 * expectedMessagesPerBlock
     val receivedBlockSizes = recordedBlocks.map { _.size }.mkString(",")
     assert(
       // the first and last block may be incomplete, so we slice them out
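
Patch 04 below delegates the throttling to Guava's RateLimiter, which schedules permits smoothly even at sub-millisecond intervals, so rates above 1000/s need no hand-rolled sleep arithmetic. A minimal standalone sketch of that API (assumed scratch code, not Spark code; requires Guava on the classpath):

    import com.google.common.util.concurrent.{RateLimiter => GuavaRateLimiter}

    object ThrottleSketch {
      def main(args: Array[String]): Unit = {
        val desiredRate = 1001.0  // permits per second; above 1000/s is fine here
        val limiter = GuavaRateLimiter.create(desiredRate)
        val start = System.nanoTime
        var pushed = 0
        while (pushed < 5000) {
          limiter.acquire()  // blocks roughly 1/desiredRate seconds per permit
          pushed += 1
        }
        val elapsedSecs = (System.nanoTime - start) / 1e9
        println(f"pushed $pushed messages in $elapsedSecs%.2f s (~${pushed / elapsedSecs}%.0f msg/s)")
      }
    }
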
From 27947177802c340e6e465f23006b592b6deb43b5 Mon Sep 17 00:00:00 2001
From: David McGuire
Date: Fri, 17 Apr 2015 10:11:02 -0700
Subject: [PATCH 04/12] Replace the RateLimiter with the Guava implementation

---
 .../streaming/receiver/RateLimiter.scala | 30 ++-----------------
 1 file changed, 3 insertions(+), 27 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
index 219892dd02648..0cf28c8262523 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.streaming.receiver

 import org.apache.spark.{Logging, SparkConf}
-import java.util.concurrent.TimeUnit._
+import com.google.common.util.concurrent.{RateLimiter=>GuavaRateLimiter}

 /** Provides waitToPush() method to limit the rate at which receivers consume data.
  *
@@ -33,37 +33,13 @@ import java.util.concurrent.TimeUnit._
  */
 private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging {

-  private var lastSyncTime = System.nanoTime
-  private var messagesWrittenSinceSync = 0L
   private val desiredRate = conf.getInt("spark.streaming.receiver.maxRate", 0)
-  private val SYNC_INTERVAL = NANOSECONDS.convert(10, SECONDS)
+  private lazy val rateLimiter = GuavaRateLimiter.create(desiredRate)

   def waitToPush() {
     if( desiredRate <= 0 ) {
       return
     }
-    val now = System.nanoTime
-    val elapsedNanosecs = math.max(now - lastSyncTime, 1)
-    val rate = messagesWrittenSinceSync.toDouble * 1000000000 / elapsedNanosecs
-    if (rate < desiredRate) {
-      // It's okay to write; just update some variables and return
-      messagesWrittenSinceSync += 1
-      if (now > lastSyncTime + SYNC_INTERVAL) {
-        // Sync interval has passed; let's resync
-        lastSyncTime = now
-        messagesWrittenSinceSync = 1
-      }
-    } else {
-      // Calculate how much time we should sleep to bring ourselves to the desired rate.
-      val targetTimeInMillis = messagesWrittenSinceSync.toDouble * 1000 / desiredRate
-      val elapsedTimeInMillis = elapsedNanosecs / 1000000
-      val sleepTimeInMillis = targetTimeInMillis - elapsedTimeInMillis
-      if (sleepTimeInMillis > 0) {
-        logTrace("Natural rate is " + rate + " per second but desired rate is " +
-          desiredRate + ", sleeping for " + sleepTimeInMillis + " ms to compensate.")
-        Thread.sleep(sleepTimeInMillis.toInt)
-      }
-      waitToPush()
-    }
+    rateLimiter.acquire()
   }
 }

From 82ee46ddeea52c99ab63ed0198d7b978cc2cc413 Mon Sep 17 00:00:00 2001
From: David McGuire
Date: Fri, 17 Apr 2015 11:45:15 -0700
Subject: [PATCH 05/12] Replace guard clause with nested conditional

---
 .../org/apache/spark/streaming/receiver/RateLimiter.scala | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
index 0cf28c8262523..9be69ba572700 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
@@ -37,9 +37,8 @@ private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging {
   private lazy val rateLimiter = GuavaRateLimiter.create(desiredRate)

   def waitToPush() {
-    if( desiredRate <= 0 ) {
-      return
+    if( desiredRate > 0 ) {
+      rateLimiter.acquire()
     }
-    rateLimiter.acquire()
   }
 }

From 70ee3103b5740caf677aac666b154670994e77e5 Mon Sep 17 00:00:00 2001
From: David McGuire
Date: Fri, 17 Apr 2015 12:37:12 -0700
Subject: [PATCH 06/12] Use Thread.yield(), since Thread.sleep(0) is system-dependent

---
 .../test/scala/org/apache/spark/streaming/ReceiverSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
index f2dfd834d456c..28a8a9154866a 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
@@ -176,7 +176,7 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
       blockGenerator.addData(count)
       generatedData += count
       count += 1
-      Thread.sleep(0)
+      Thread.`yield`()
     }
     blockGenerator.stop()

From 8f2934b034dd2576e897497f46fa58db118088d2 Mon Sep 17 00:00:00 2001
From: David McGuire
Date: Fri, 17 Apr 2015 23:49:11 -0700
Subject: [PATCH 07/12] Remove arbitrary thread timing / cooperation code

---
 .../test/scala/org/apache/spark/streaming/ReceiverSuite.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
index 28a8a9154866a..ac36dcb3f4466 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
@@ -176,7 +176,6 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
       blockGenerator.addData(count)
       generatedData += count
       count += 1
-      Thread.`yield`()
     }
     blockGenerator.stop()

From b33b796995f4f57d1d5f8a852b274065eec2c04c Mon Sep 17 00:00:00 2001
From: David McGuire
Date: Fri, 17 Apr 2015 23:50:21 -0700
Subject: [PATCH 08/12] Eliminate dependency on even distribution by BlockGenerator

---
 .../org/apache/spark/streaming/ReceiverSuite.scala | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
index ac36dcb3f4466..1eb5f7fd6e833 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
@@ -194,16 +194,20 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
       s"#records received = $numMessages, not between $minExpectedMessages and $maxExpectedMessages"
     )

+    // XXX Checking every block would require an even distribution of messages across blocks,
+    // which throttling code does not control. Therefore, test against the average.
     val minExpectedMessagesPerBlock = expectedMessagesPerBlock - 0.05 * expectedMessagesPerBlock
     val maxExpectedMessagesPerBlock = expectedMessagesPerBlock + 0.05 * expectedMessagesPerBlock
     val receivedBlockSizes = recordedBlocks.map { _.size }.mkString(",")
+
+    // the first and last block may be incomplete, so we slice them out
+    val validBlocks = recordedBlocks.drop(1).dropRight(1)
+    val averageBlockSize = validBlocks.map(block => block.size).sum / validBlocks.size
+
     assert(
-      // the first and last block may be incomplete, so we slice them out
-      recordedBlocks.drop(1).dropRight(1).forall { block =>
-        block.size >= minExpectedMessagesPerBlock && block.size <= maxExpectedMessagesPerBlock
-      },
+      averageBlockSize >= minExpectedMessagesPerBlock && averageBlockSize <= maxExpectedMessagesPerBlock,
       s"# records in received blocks = [$receivedBlockSizes], not between " +
-      s"$minExpectedMessagesPerBlock and $maxExpectedMessagesPerBlock"
+      s"$minExpectedMessagesPerBlock and $maxExpectedMessagesPerBlock, on average"
     )
   }

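
The block-size check that patch 08 introduces, reduced to a standalone sketch (assumed values; .toDouble is added here to sidestep the integer truncation this series is otherwise fixing):

    // Proportional error margins, asserted against the average block size
    // rather than every block, since the throttle does not guarantee an even
    // distribution of messages across block intervals.
    val expectedMessagesPerBlock = 100  // hypothetical expectation
    val margin = 0.05                   // +/- 5% of the expected value
    val recordedBlockSizes = Seq(42, 103, 99, 97, 104, 51)  // made-up sizes
    // the first and last block may be incomplete, so slice them out
    val validBlocks = recordedBlockSizes.drop(1).dropRight(1)
    val averageBlockSize = validBlocks.sum.toDouble / validBlocks.size
    assert(
      averageBlockSize >= (1 - margin) * expectedMessagesPerBlock &&
        averageBlockSize <= (1 + margin) * expectedMessagesPerBlock
    )
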
From 29011bd3e0190c2f6da68c503f659bf31e11b8ab Mon Sep 17 00:00:00 2001
From: David McGuire
Date: Fri, 17 Apr 2015 23:52:20 -0700
Subject: [PATCH 09/12] Further ratchet down the error margins

---
 .../scala/org/apache/spark/streaming/ReceiverSuite.scala | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
index 1eb5f7fd6e833..38db789c92abb 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
@@ -186,8 +186,8 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {

     // recordedData size should be close to the expected rate
     // use an error margin proportional to the value, so that rate changes don't cause a brittle test
-    val minExpectedMessages = expectedMessages - 0.05 * expectedMessages
-    val maxExpectedMessages = expectedMessages + 0.05 * expectedMessages
+    val minExpectedMessages = expectedMessages - 0.01 * expectedMessages
+    val maxExpectedMessages = expectedMessages + 0.01 * expectedMessages
     val numMessages = recordedData.size
     assert(
       numMessages >= minExpectedMessages && numMessages <= maxExpectedMessages,
@@ -196,8 +196,8 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {

     // XXX Checking every block would require an even distribution of messages across blocks,
     // which throttling code does not control. Therefore, test against the average.
-    val minExpectedMessagesPerBlock = expectedMessagesPerBlock - 0.05 * expectedMessagesPerBlock
-    val maxExpectedMessagesPerBlock = expectedMessagesPerBlock + 0.05 * expectedMessagesPerBlock
+    val minExpectedMessagesPerBlock = expectedMessagesPerBlock - 0.01 * expectedMessagesPerBlock
+    val maxExpectedMessagesPerBlock = expectedMessagesPerBlock + 0.01 * expectedMessagesPerBlock
     val receivedBlockSizes = recordedBlocks.map { _.size }.mkString(",")

     // the first and last block may be incomplete, so we slice them out

From 90e98b907e120d6322686f34944aaf9b6f5fd009 Mon Sep 17 00:00:00 2001
From: David McGuire
Date: Mon, 20 Apr 2015 09:34:44 -0700
Subject: [PATCH 10/12] Address scalastyle errors

---
 .../scala/org/apache/spark/streaming/ReceiverSuite.scala | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
index 38db789c92abb..fecfd2f339934 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
@@ -184,8 +184,8 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
     assert(blockGeneratorListener.arrayBuffers.size > 0, "No blocks received")
     assert(recordedData.toSet === generatedData.toSet, "Received data not same")

-    // recordedData size should be close to the expected rate
-    // use an error margin proportional to the value, so that rate changes don't cause a brittle test
+    // recordedData size should be close to the expected rate; use an error margin proportional to
+    // the value, so that rate changes don't cause a brittle test
     val minExpectedMessages = expectedMessages - 0.01 * expectedMessages
     val maxExpectedMessages = expectedMessages + 0.01 * expectedMessages
     val numMessages = recordedData.size
@@ -205,7 +205,8 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
     val averageBlockSize = validBlocks.map(block => block.size).sum / validBlocks.size

     assert(
-      averageBlockSize >= minExpectedMessagesPerBlock && averageBlockSize <= maxExpectedMessagesPerBlock,
+      averageBlockSize >= minExpectedMessagesPerBlock &&
+        averageBlockSize <= maxExpectedMessagesPerBlock,
       s"# records in received blocks = [$receivedBlockSizes], not between " +
       s"$minExpectedMessagesPerBlock and $maxExpectedMessagesPerBlock, on average"
     )

From 8be6934030ac2a0ce6b05de1670a5e5c0e771c8a Mon Sep 17 00:00:00 2001
From: David McGuire
Date: Mon, 20 Apr 2015 09:36:38 -0700
Subject: [PATCH 11/12] Fix spacing per code review

---
 .../scala/org/apache/spark/streaming/receiver/RateLimiter.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
index 9be69ba572700..97db9ded83367 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
@@ -37,7 +37,7 @@ private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging {
   private lazy val rateLimiter = GuavaRateLimiter.create(desiredRate)

   def waitToPush() {
-    if( desiredRate > 0 ) {
+    if (desiredRate > 0) {
       rateLimiter.acquire()
     }
   }
From d29d2e060fe48e8a3f1e506bf2bf2cc13d99d751 Mon Sep 17 00:00:00 2001
From: David McGuire
Date: Mon, 20 Apr 2015 11:34:33 -0700
Subject: [PATCH 12/12] Back out to +/-5% error margins, for flexibility in timing

---
 .../scala/org/apache/spark/streaming/ReceiverSuite.scala | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
index fecfd2f339934..e7aee6eadbfc7 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
@@ -186,8 +186,8 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {

     // recordedData size should be close to the expected rate; use an error margin proportional to
     // the value, so that rate changes don't cause a brittle test
-    val minExpectedMessages = expectedMessages - 0.01 * expectedMessages
-    val maxExpectedMessages = expectedMessages + 0.01 * expectedMessages
+    val minExpectedMessages = expectedMessages - 0.05 * expectedMessages
+    val maxExpectedMessages = expectedMessages + 0.05 * expectedMessages
     val numMessages = recordedData.size
     assert(
       numMessages >= minExpectedMessages && numMessages <= maxExpectedMessages,
@@ -196,8 +196,8 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {

     // XXX Checking every block would require an even distribution of messages across blocks,
     // which throttling code does not control. Therefore, test against the average.
-    val minExpectedMessagesPerBlock = expectedMessagesPerBlock - 0.01 * expectedMessagesPerBlock
-    val maxExpectedMessagesPerBlock = expectedMessagesPerBlock + 0.01 * expectedMessagesPerBlock
+    val minExpectedMessagesPerBlock = expectedMessagesPerBlock - 0.05 * expectedMessagesPerBlock
+    val maxExpectedMessagesPerBlock = expectedMessagesPerBlock + 0.05 * expectedMessagesPerBlock
     val receivedBlockSizes = recordedBlocks.map { _.size }.mkString(",")

     // the first and last block may be incomplete, so we slice them out