From a7162cb0e04d7864368031f22e689e1489c7199b Mon Sep 17 00:00:00 2001
From: Sebastian Arzt
Date: Wed, 26 Apr 2017 15:39:46 +0200
Subject: [PATCH 1/9] no rounding of backpressure rate

---
 .../spark/streaming/kafka010/DirectKafkaInputDStream.scala | 4 ++--
 .../spark/streaming/kafka/DirectKafkaInputDStream.scala    | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
index 0fa3287f36db8..6713ec5773cfa 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
@@ -138,11 +138,11 @@ private[spark] class DirectKafkaInputDStream[K, V](
 
         lagPerPartition.map { case (tp, lag) =>
           val maxRateLimitPerPartition = ppc.maxRatePerPartition(tp)
-          val backpressureRate = Math.round(lag / totalLag.toFloat * rate)
+          val backpressureRate = lag / totalLag.toFloat * rate
           tp -> (if (maxRateLimitPerPartition > 0) {
             Math.min(backpressureRate, maxRateLimitPerPartition)} else backpressureRate)
         }
-      case None => offsets.map { case (tp, offset) => tp -> ppc.maxRatePerPartition(tp) }
+      case None => offsets.map { case (tp, offset) => tp -> ppc.maxRatePerPartition(tp).toFloat }
     }
 
     if (effectiveRateLimitPerPartition.values.sum > 0) {
diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
index d52c230eb7849..4fab5d8889ec3 100644
--- a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -104,11 +104,11 @@ class DirectKafkaInputDStream[
         val totalLag = lagPerPartition.values.sum
 
         lagPerPartition.map { case (tp, lag) =>
-          val backpressureRate = Math.round(lag / totalLag.toFloat * rate)
+          val backpressureRate = lag / totalLag.toFloat * rate
           tp -> (if (maxRateLimitPerPartition > 0) {
             Math.min(backpressureRate, maxRateLimitPerPartition)} else backpressureRate)
         }
-      case None => offsets.map { case (tp, offset) => tp -> maxRateLimitPerPartition }
+      case None => offsets.map { case (tp, offset) => tp -> maxRateLimitPerPartition.toFloat }
     }
 
     if (effectiveRateLimitPerPartition.values.sum > 0) {

From 2ec4311ccf6b187a606ee942167e2a040aa09dc7 Mon Sep 17 00:00:00 2001
From: Sebastian Arzt
Date: Thu, 27 Apr 2017 14:51:13 +0200
Subject: [PATCH 2/9] convert to double instead of float

---
 .../spark/streaming/kafka010/DirectKafkaInputDStream.scala | 4 ++--
 .../spark/streaming/kafka/DirectKafkaInputDStream.scala    | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
index 6713ec5773cfa..ae58b70766f64 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
@@ -138,11 +138,11 @@ private[spark] class DirectKafkaInputDStream[K, V](
 
         lagPerPartition.map { case (tp, lag) =>
           val maxRateLimitPerPartition = ppc.maxRatePerPartition(tp)
-          val backpressureRate = lag / totalLag.toFloat * rate
+          val backpressureRate = lag / totalLag.toDouble * rate
           tp -> (if (maxRateLimitPerPartition > 0) {
             Math.min(backpressureRate, maxRateLimitPerPartition)} else backpressureRate)
         }
-      case None => offsets.map { case (tp, offset) => tp -> ppc.maxRatePerPartition(tp).toFloat }
+      case None => offsets.map { case (tp, offset) => tp -> ppc.maxRatePerPartition(tp).toDouble }
     }
 
     if (effectiveRateLimitPerPartition.values.sum > 0) {
diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
index 4fab5d8889ec3..1db8a3ba535a5 100644
--- a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -104,11 +104,11 @@ class DirectKafkaInputDStream[
         val totalLag = lagPerPartition.values.sum
 
         lagPerPartition.map { case (tp, lag) =>
-          val backpressureRate = lag / totalLag.toFloat * rate
+          val backpressureRate = lag / totalLag.toDouble * rate
           tp -> (if (maxRateLimitPerPartition > 0) {
             Math.min(backpressureRate, maxRateLimitPerPartition)} else backpressureRate)
         }
-      case None => offsets.map { case (tp, offset) => tp -> maxRateLimitPerPartition.toFloat }
+      case None => offsets.map { case (tp, offset) => tp -> maxRateLimitPerPartition.toDouble }
     }
 
     if (effectiveRateLimitPerPartition.values.sum > 0) {

From 8263efc63671ee96d9f2e745d51429eac3477bcc Mon Sep 17 00:00:00 2001
From: Sebastian Arzt
Date: Thu, 27 Apr 2017 14:52:50 +0200
Subject: [PATCH 3/9] adding tests

---
 .../kafka010/DirectKafkaStreamSuite.scala     |  88 +++++++++++++++
 .../kafka/DirectKafkaStreamSuite.scala        | 102 +++++++++++++++++-
 2 files changed, 186 insertions(+), 4 deletions(-)

diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
index 453b5e5ab20d3..071a3e5bd1eb2 100644
--- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
@@ -617,6 +617,94 @@ class DirectKafkaStreamSuite
     ssc.stop()
   }
 
+  test("max messages per partition on small rate not none") {
+    val topic = "backpressure"
+    val kafkaParams = getKafkaParams()
+    val batchIntervalMilliseconds = 10000
+    val sparkConf = new SparkConf()
+      // Safe, even with streaming, because we're using the direct API.
+      // Using 1 core is useful to make the test more predictable.
+      .setMaster("local[1]")
+      .setAppName(this.getClass.getSimpleName)
+      .set("spark.streaming.kafka.maxRatePerPartition", "100")
+
+    // Setup the streaming context
+    ssc = new StreamingContext(sparkConf, Milliseconds(batchIntervalMilliseconds))
+    val estimatedRate = 1
+    val fromOffsets = Map(
+      new TopicPartition(topic, 0) -> 0L,
+      new TopicPartition(topic, 1) -> 0L,
+      new TopicPartition(topic, 2) -> 0L
+    )
+    val kafkaStream = withClue("Error creating direct stream") {
+      new DirectKafkaInputDStream[String, String](
+        ssc,
+        preferredHosts,
+        ConsumerStrategies.Subscribe[String, String](List(topic), kafkaParams.asScala),
+        new DefaultPerPartitionConfig(sparkConf)
+      ) {
+        currentOffsets = fromOffsets
+        override val rateController = Some(new ConstantRateController(id, null, estimatedRate))
+      }
+    }
+
+    val offsets = Map[TopicPartition, Long](
+      new TopicPartition(topic, 0) -> 100L,
+      new TopicPartition(topic, 1) -> 100L,
+      new TopicPartition(topic, 2) -> 100L
+    )
+    val result = kafkaStream.maxMessagesPerPartition(offsets)
+    assert(result.isDefined, s"Messages per partition should exist for estimated rate of 1")
+  }
+
+  test("max messages per partition can be zero") {
+    val topic = "backpressure"
+    val kafkaParams = getKafkaParams()
+    val batchIntervalMilliseconds = 60000
+    val sparkConf = new SparkConf()
+      // Safe, even with streaming, because we're using the direct API.
+      // Using 1 core is useful to make the test more predictable.
+      .setMaster("local[1]")
+      .setAppName(this.getClass.getSimpleName)
+      .set("spark.streaming.kafka.maxRatePerPartition", "100")
+
+    // Setup the streaming context
+    ssc = new StreamingContext(sparkConf, Milliseconds(batchIntervalMilliseconds))
+    val estimateRate = 1L
+    val fromOffsets = Map(
+      new TopicPartition(topic, 0) -> 0L,
+      new TopicPartition(topic, 1) -> 0L,
+      new TopicPartition(topic, 2) -> 0L,
+      new TopicPartition(topic, 3) -> 0L
+    )
+    val kafkaStream = withClue("Error creating direct stream") {
+      new DirectKafkaInputDStream[String, String](
+        ssc,
+        preferredHosts,
+        ConsumerStrategies.Subscribe[String, String](List(topic), kafkaParams.asScala),
+        new DefaultPerPartitionConfig(sparkConf)
+      ) {
+        currentOffsets = fromOffsets
+        override val rateController = Some(new ConstantRateController(id, null, estimateRate))
+      }
+    }
+
+    val offsets = Map[TopicPartition, Long](
+      new TopicPartition(topic, 0) -> 0,
+      new TopicPartition(topic, 1) -> 100L,
+      new TopicPartition(topic, 2) -> 200L,
+      new TopicPartition(topic, 3) -> 300L
+    )
+    val result = kafkaStream.maxMessagesPerPartition(offsets)
+    val expected = Map(
+      new TopicPartition(topic, 0) -> 0L,
+      new TopicPartition(topic, 1) -> 10L,
+      new TopicPartition(topic, 2) -> 20L,
+      new TopicPartition(topic, 3) -> 30L
+    )
+    assert(result.contains(expected), s"Number of messages must be zero for one partition")
+  }
+
   /** Get the generated offset ranges from the DirectKafkaStream */
   private def getOffsetRanges[K, V](
       kafkaStream: DStream[ConsumerRecord[K, V]]): Seq[(Time, Array[OffsetRange])] = {
diff --git a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
index 06ef5bc3f8bd0..2f7bc5f3e8d4b 100644
--- a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
+++ b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
@@ -441,10 +441,10 @@ class DirectKafkaStreamSuite
     // Try different rate limits.
     // Wait for arrays of data to appear matching the rate.
     Seq(100, 50, 20).foreach { rate =>
-      collectedData.clear() // Empty this buffer on each pass.
-      estimator.updateRate(rate) // Set a new rate.
-      // Expect blocks of data equal to "rate", scaled by the interval length in secs.
-      val expectedSize = Math.round(rate * batchIntervalMilliseconds * 0.001)
+        collectedData.clear() // Empty this buffer on each pass.
+        estimator.updateRate(rate) // Set a new rate.
+        // Expect blocks of data equal to "rate", scaled by the interval length in secs.
+        val expectedSize = Math.round(rate * batchIntervalMilliseconds * 0.001)
       eventually(timeout(5.seconds), interval(batchIntervalMilliseconds.milliseconds)) {
         // Assert that rate estimator values are used to determine maxMessagesPerPartition.
         // Funky "-" in message makes the complete assertion message read better.
@@ -456,6 +456,100 @@ class DirectKafkaStreamSuite
     ssc.stop()
   }
 
+  test("max messages per partition on small rate not none") {
+    val topic = "backpressure"
+    val kafkaParams = Map(
+      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
+      "auto.offset.reset" -> "smallest"
+    )
+
+    val batchIntervalMilliseconds = 10000
+    val sparkConf = new SparkConf()
+      // Safe, even with streaming, because we're using the direct API.
+      // Using 1 core is useful to make the test more predictable.
+      .setMaster("local[1]")
+      .setAppName(this.getClass.getSimpleName)
+      .set("spark.streaming.kafka.maxRatePerPartition", "100")
+
+    // Setup the streaming context
+    ssc = new StreamingContext(sparkConf, Milliseconds(batchIntervalMilliseconds))
+    val estimatedRate = 1L
+    val kafkaStream = withClue("Error creating direct stream") {
+      val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
+      val fromOffsets = Map(
+        TopicAndPartition(topic, 0) -> 0L,
+        TopicAndPartition(topic, 1) -> 0L,
+        TopicAndPartition(topic, 2) -> 0L
+      )
+      new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)](
+        ssc, kafkaParams, fromOffsets, messageHandler) {
+        override protected[streaming] val rateController =
+          Some(new DirectKafkaRateController(id, null) {
+            override def getLatestRate() = estimatedRate
+          })
+      }
+    }
+
+    val offsets = Map(
+      TopicAndPartition(topic, 0) -> 10L,
+      TopicAndPartition(topic, 1) -> 10L,
+      TopicAndPartition(topic, 2) -> 10L
+    )
+    val result = kafkaStream.maxMessagesPerPartition(offsets)
+    assert(result.isDefined, s"Messages per partition should exist for estimated rate of 1")
+  }
+
+  test("max messages per partition can be zero") {
+    val topic = "backpressure"
+    val kafkaParams = Map(
+      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
+      "auto.offset.reset" -> "smallest"
+    )
+
+    val batchIntervalMilliseconds = 60000
+    val sparkConf = new SparkConf()
+      // Safe, even with streaming, because we're using the direct API.
+      // Using 1 core is useful to make the test more predictable.
+      .setMaster("local[1]")
+      .setAppName(this.getClass.getSimpleName)
+      .set("spark.streaming.kafka.maxRatePerPartition", "100")
+
+    // Setup the streaming context
+    ssc = new StreamingContext(sparkConf, Milliseconds(batchIntervalMilliseconds))
+    val estimatedRate = 1L
+    val kafkaStream = withClue("Error creating direct stream") {
+      val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
+      val fromOffsets = Map(
+        TopicAndPartition(topic, 0) -> 0L,
+        TopicAndPartition(topic, 1) -> 0L,
+        TopicAndPartition(topic, 2) -> 0L,
+        TopicAndPartition(topic, 3) -> 0L
+      )
+      new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)](
+        ssc, kafkaParams, fromOffsets, messageHandler) {
+        override protected[streaming] val rateController =
+          Some(new DirectKafkaRateController(id, null) {
+            override def getLatestRate() = estimatedRate
+          })
+      }
+    }
+
+    val offsets = Map(
+      TopicAndPartition(topic, 0) -> 0L,
+      TopicAndPartition(topic, 1) -> 100L,
+      TopicAndPartition(topic, 2) -> 200L,
+      TopicAndPartition(topic, 3) -> 300L
+    )
+    val result = kafkaStream.maxMessagesPerPartition(offsets)
+    val expected = Map(
+      TopicAndPartition(topic, 0) -> 0L,
+      TopicAndPartition(topic, 1) -> 10L,
+      TopicAndPartition(topic, 2) -> 20L,
+      TopicAndPartition(topic, 3) -> 30L
+    )
+    assert(result.contains(expected), s"Number of messages must be zero for one partition")
+  }
+
   /** Get the generated offset ranges from the DirectKafkaStream */
   private def getOffsetRanges[K, V](
       kafkaStream: DStream[(K, V)]): Seq[(Time, Array[OffsetRange])] = {

From 1d02e3fd5a6d15144bf06e78e6902ee7ec284604 Mon Sep 17 00:00:00 2001
From: Sebastian Arzt
Date: Thu, 27 Apr 2017 14:57:01 +0200
Subject: [PATCH 4/9] unify test names

---
 .../spark/streaming/kafka010/DirectKafkaStreamSuite.scala     | 4 ++--
 .../apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
index 071a3e5bd1eb2..d9b7957ede9a5 100644
--- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
@@ -617,7 +617,7 @@ class DirectKafkaStreamSuite
     ssc.stop()
   }
 
-  test("max messages per partition on small rate not none") {
+  test("maxMessagesPerPartition with rate equal to one") {
     val topic = "backpressure"
     val kafkaParams = getKafkaParams()
     val batchIntervalMilliseconds = 10000
@@ -657,7 +657,7 @@ class DirectKafkaStreamSuite
     assert(result.isDefined, s"Messages per partition should exist for estimated rate of 1")
   }
 
-  test("max messages per partition can be zero") {
+  test("maxMessagesPerPartition with zero offset and rate equal to one") {
     val topic = "backpressure"
     val kafkaParams = getKafkaParams()
     val batchIntervalMilliseconds = 60000
diff --git a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
index 2f7bc5f3e8d4b..43efe7c649d1b 100644
--- a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
+++ b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
@@ -456,7 +456,7 @@ class DirectKafkaStreamSuite
     ssc.stop()
   }
 
-  test("max messages per partition on small rate not none") {
+  test("maxMessagesPerPartition with rate equal to one") {
     val topic = "backpressure"
     val kafkaParams = Map(
       "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
       "auto.offset.reset" -> "smallest"
@@ -499,7 +499,7 @@ class DirectKafkaStreamSuite
     assert(result.isDefined, s"Messages per partition should exist for estimated rate of 1")
   }
 
-  test("max messages per partition can be zero") {
+  test("maxMessagesPerPartition with zero offset and rate equal to one") {
     val topic = "backpressure"
     val kafkaParams = Map(
       "metadata.broker.list" -> kafkaTestUtils.brokerAddress,

From 48d07b6f61c6265052089e0434a7f255453d9c27 Mon Sep 17 00:00:00 2001
From: Sebastian Arzt
Date: Thu, 27 Apr 2017 15:06:36 +0200
Subject: [PATCH 5/9] correct wrong formatting

---
 .../spark/streaming/kafka/DirectKafkaStreamSuite.scala | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
index 43efe7c649d1b..aedf451db3a8e 100644
--- a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
+++ b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
@@ -441,10 +441,10 @@ class DirectKafkaStreamSuite
     // Try different rate limits.
     // Wait for arrays of data to appear matching the rate.
     Seq(100, 50, 20).foreach { rate =>
-        collectedData.clear() // Empty this buffer on each pass.
-        estimator.updateRate(rate) // Set a new rate.
-        // Expect blocks of data equal to "rate", scaled by the interval length in secs.
-        val expectedSize = Math.round(rate * batchIntervalMilliseconds * 0.001)
+      collectedData.clear() // Empty this buffer on each pass.
+      estimator.updateRate(rate) // Set a new rate.
+      // Expect blocks of data equal to "rate", scaled by the interval length in secs.
+      val expectedSize = Math.round(rate * batchIntervalMilliseconds * 0.001)
       eventually(timeout(5.seconds), interval(batchIntervalMilliseconds.milliseconds)) {
         // Assert that rate estimator values are used to determine maxMessagesPerPartition.
         // Funky "-" in message makes the complete assertion message read better.
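Note (editorial, not part of the patch series): patches 1 and 2 above keep the per-partition backpressure rate fractional instead of rounding it, and patch 6 below clamps the per-batch message cap to a minimum of one. The sketch below is a minimal, self-contained Scala illustration of that arithmetic, not code from the patches; the object name, the simplified signature (plain Int partition ids instead of TopicPartition), and the omission of the spark.streaming.kafka.maxRatePerPartition cap are all illustrative assumptions.

    object BackpressureSketch {
      // Share the estimated rate across partitions in proportion to lag,
      // keeping the value fractional until the end (patches 1-2), then
      // clamp the per-batch cap to at least 1 message (patch 6) so every
      // partition keeps making progress.
      def maxMessagesPerPartition(
          lagPerPartition: Map[Int, Long],
          ratePerSec: Long,
          batchIntervalMs: Long): Map[Int, Long] = {
        val totalLag = lagPerPartition.values.sum
        val secsPerBatch = batchIntervalMs.toDouble / 1000
        lagPerPartition.map { case (partition, lag) =>
          val backpressureRate = lag / totalLag.toDouble * ratePerSec
          partition -> math.max((secsPerBatch * backpressureRate).toLong, 1L)
        }
      }

      def main(args: Array[String]): Unit = {
        // The scenario of the "zero offset and rate equal to one" tests:
        // estimated rate of 1 msg/sec, 60 s batches, skewed lag.
        val caps = maxMessagesPerPartition(
          Map(0 -> 0L, 1 -> 100L, 2 -> 200L, 3 -> 300L),
          ratePerSec = 1L,
          batchIntervalMs = 60000L)
        println(caps) // Map(0 -> 1, 1 -> 10, 2 -> 20, 3 -> 30)
      }
    }

Before patch 1, Math.round forced each partition's fractional share to a whole number first, so at a rate of 1 msg/sec any partition whose share rounded below one message was capped at 0 and never consumed; the clamp in patch 6 additionally guarantees progress for a partition with no lag at all, which is why the expected value for partition 0 changes from 0 to 1 below.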
From 2da0d0a17743fa93b73426872feecc86d1f54e6e Mon Sep 17 00:00:00 2001
From: Sebastian Arzt
Date: Thu, 27 Apr 2017 19:05:32 +0200
Subject: [PATCH 6/9] max messages per partition is at least 1

---
 .../spark/streaming/kafka010/DirectKafkaInputDStream.scala     | 3 ++-
 .../spark/streaming/kafka010/DirectKafkaStreamSuite.scala      | 2 +-
 .../apache/spark/streaming/kafka/DirectKafkaInputDStream.scala | 2 +-
 .../apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala  | 2 +-
 4 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
index ae58b70766f64..6dfed345b01d3 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
@@ -148,7 +148,8 @@ private[spark] class DirectKafkaInputDStream[K, V](
     if (effectiveRateLimitPerPartition.values.sum > 0) {
       val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
       Some(effectiveRateLimitPerPartition.map {
-        case (tp, limit) => tp -> (secsPerBatch * limit).toLong
+        case (tp, limit) =>
+          tp -> Math.max((secsPerBatch * limit).toLong, 1L)
       })
     } else {
       None
diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
index d9b7957ede9a5..a1613f7dac21a 100644
--- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
@@ -697,7 +697,7 @@ class DirectKafkaStreamSuite
     )
     val result = kafkaStream.maxMessagesPerPartition(offsets)
     val expected = Map(
-      new TopicPartition(topic, 0) -> 0L,
+      new TopicPartition(topic, 0) -> 1L,
       new TopicPartition(topic, 1) -> 10L,
       new TopicPartition(topic, 2) -> 20L,
diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
index 1db8a3ba535a5..d6dd0744441e4 100644
--- a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -114,7 +114,7 @@ class DirectKafkaInputDStream[
     if (effectiveRateLimitPerPartition.values.sum > 0) {
       val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
       Some(effectiveRateLimitPerPartition.map {
-        case (tp, limit) => tp -> (secsPerBatch * limit).toLong
+        case (tp, limit) => tp -> Math.max((secsPerBatch * limit).toLong, 1L)
       })
     } else {
       None
diff --git a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
index aedf451db3a8e..d82aea1cbd839 100644
--- a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
+++ b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
@@ -542,7 +542,7 @@ class DirectKafkaStreamSuite
     )
     val result = kafkaStream.maxMessagesPerPartition(offsets)
     val expected = Map(
-      TopicAndPartition(topic, 0) -> 0L,
+      TopicAndPartition(topic, 0) -> 1L,
       TopicAndPartition(topic, 1) -> 10L,
       TopicAndPartition(topic, 2) -> 20L,
       TopicAndPartition(topic, 3) -> 30L

From e1355275cc62f3de70d4aea0a703ac13773c8233 Mon Sep 17 00:00:00 2001
From: Sebastian Arzt
Date: Thu, 27 Apr 2017 19:07:00 +0200
Subject: [PATCH 7/9] reword assertion text

---
 .../spark/streaming/kafka010/DirectKafkaStreamSuite.scala     | 2 +-
 .../apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
index a1613f7dac21a..6354afda15a45 100644
--- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
@@ -702,7 +702,7 @@ class DirectKafkaStreamSuite
       new TopicPartition(topic, 2) -> 20L,
       new TopicPartition(topic, 3) -> 30L
     )
-    assert(result.contains(expected), s"Number of messages must be zero for one partition")
+    assert(result.contains(expected), s"Number of messages per partition must be at least 1")
   }
 
   /** Get the generated offset ranges from the DirectKafkaStream */
diff --git a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
index d82aea1cbd839..9fc3cb8b92bc1 100644
--- a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
+++ b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
@@ -547,7 +547,7 @@ class DirectKafkaStreamSuite
       TopicAndPartition(topic, 2) -> 20L,
       TopicAndPartition(topic, 3) -> 30L
     )
-    assert(result.contains(expected), s"Number of messages must be zero for one partition")
+    assert(result.contains(expected), s"Number of messages per partition must be at least 1")
   }
 
   /** Get the generated offset ranges from the DirectKafkaStream */

From 72124ea59796f6faa547f9a04e02fe077bad2f83 Mon Sep 17 00:00:00 2001
From: Sebastian Arzt
Date: Thu, 27 Apr 2017 19:11:10 +0200
Subject: [PATCH 8/9] formatting

---
 .../spark/streaming/kafka010/DirectKafkaInputDStream.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
index 6dfed345b01d3..9cb2448fea0f4 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
@@ -148,8 +148,7 @@ private[spark] class DirectKafkaInputDStream[K, V](
     if (effectiveRateLimitPerPartition.values.sum > 0) {
       val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
       Some(effectiveRateLimitPerPartition.map {
-        case (tp, limit) =>
-          tp -> Math.max((secsPerBatch * limit).toLong, 1L)
+        case (tp, limit) => tp -> Math.max((secsPerBatch * limit).toLong, 1L)
       })
     } else {
       None

From 1acbe4c2a98e4d4c407fd557005d815dfd94851a Mon Sep 17 00:00:00 2001
From: Sebastian Arzt
Date: Fri, 28 Apr 2017 09:43:18 +0200
Subject: [PATCH 9/9] remove redundant tests

---
 .../kafka010/DirectKafkaStreamSuite.scala     | 40 -----------
 .../kafka/DirectKafkaStreamSuite.scala        | 43 -------------
 2 files changed, 83 deletions(-)

diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
index 6354afda15a45..8524743ee2846 100644
--- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
@@ -617,46 +617,6 @@ class DirectKafkaStreamSuite
     ssc.stop()
   }
 
-  test("maxMessagesPerPartition with rate equal to one") {
-    val topic = "backpressure"
-    val kafkaParams = getKafkaParams()
-    val batchIntervalMilliseconds = 10000
-    val sparkConf = new SparkConf()
-      // Safe, even with streaming, because we're using the direct API.
-      // Using 1 core is useful to make the test more predictable.
-      .setMaster("local[1]")
-      .setAppName(this.getClass.getSimpleName)
-      .set("spark.streaming.kafka.maxRatePerPartition", "100")
-
-    // Setup the streaming context
-    ssc = new StreamingContext(sparkConf, Milliseconds(batchIntervalMilliseconds))
-    val estimatedRate = 1
-    val fromOffsets = Map(
-      new TopicPartition(topic, 0) -> 0L,
-      new TopicPartition(topic, 1) -> 0L,
-      new TopicPartition(topic, 2) -> 0L
-    )
-    val kafkaStream = withClue("Error creating direct stream") {
-      new DirectKafkaInputDStream[String, String](
-        ssc,
-        preferredHosts,
-        ConsumerStrategies.Subscribe[String, String](List(topic), kafkaParams.asScala),
-        new DefaultPerPartitionConfig(sparkConf)
-      ) {
-        currentOffsets = fromOffsets
-        override val rateController = Some(new ConstantRateController(id, null, estimatedRate))
-      }
-    }
-
-    val offsets = Map[TopicPartition, Long](
-      new TopicPartition(topic, 0) -> 100L,
-      new TopicPartition(topic, 1) -> 100L,
-      new TopicPartition(topic, 2) -> 100L
-    )
-    val result = kafkaStream.maxMessagesPerPartition(offsets)
-    assert(result.isDefined, s"Messages per partition should exist for estimated rate of 1")
-  }
-
   test("maxMessagesPerPartition with zero offset and rate equal to one") {
     val topic = "backpressure"
     val kafkaParams = getKafkaParams()
diff --git a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
index 9fc3cb8b92bc1..3fea6cfd910bf 100644
--- a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
+++ b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
@@ -456,49 +456,6 @@ class DirectKafkaStreamSuite
     ssc.stop()
   }
 
-  test("maxMessagesPerPartition with rate equal to one") {
-    val topic = "backpressure"
-    val kafkaParams = Map(
-      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
-      "auto.offset.reset" -> "smallest"
-    )
-
-    val batchIntervalMilliseconds = 10000
-    val sparkConf = new SparkConf()
-      // Safe, even with streaming, because we're using the direct API.
-      // Using 1 core is useful to make the test more predictable.
-      .setMaster("local[1]")
-      .setAppName(this.getClass.getSimpleName)
-      .set("spark.streaming.kafka.maxRatePerPartition", "100")
-
-    // Setup the streaming context
-    ssc = new StreamingContext(sparkConf, Milliseconds(batchIntervalMilliseconds))
-    val estimatedRate = 1L
-    val kafkaStream = withClue("Error creating direct stream") {
-      val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
-      val fromOffsets = Map(
-        TopicAndPartition(topic, 0) -> 0L,
-        TopicAndPartition(topic, 1) -> 0L,
-        TopicAndPartition(topic, 2) -> 0L
-      )
-      new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)](
-        ssc, kafkaParams, fromOffsets, messageHandler) {
-        override protected[streaming] val rateController =
-          Some(new DirectKafkaRateController(id, null) {
-            override def getLatestRate() = estimatedRate
-          })
-      }
-    }
-
-    val offsets = Map(
-      TopicAndPartition(topic, 0) -> 10L,
-      TopicAndPartition(topic, 1) -> 10L,
-      TopicAndPartition(topic, 2) -> 10L
-    )
-    val result = kafkaStream.maxMessagesPerPartition(offsets)
-    assert(result.isDefined, s"Messages per partition should exist for estimated rate of 1")
-  }
-
   test("maxMessagesPerPartition with zero offset and rate equal to one") {
     val topic = "backpressure"
     val kafkaParams = Map(