From 9aa992c7f8683ec1a062fc7834633008eb290c51 Mon Sep 17 00:00:00 2001 From: Aditya Auradkar Date: Mon, 14 Sep 2015 10:43:55 -0700 Subject: [PATCH 1/6] Fixing K-2443 --- .../kafka/common/metrics/KafkaMetric.java | 5 ++++ .../kafka/common/metrics/stats/Rate.java | 8 ++++-- .../kafka/common/metrics/MetricsTest.java | 28 +++++++++++++++++++ .../kafka/server/ClientQuotaManager.scala | 13 +++++---- .../kafka/server/ClientQuotaManagerTest.scala | 11 +++++--- 5 files changed, 54 insertions(+), 11 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java index 89df1a4ec3e68..66583832f299d 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java @@ -53,6 +53,11 @@ public double value() { } } + public Measurable measurable() { + return this.measurable; + } + + double value(long timeMs) { return this.measurable.measure(config, timeMs); } diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java index fe43940b19c46..6cb9152036086 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java @@ -59,10 +59,14 @@ public void record(MetricConfig config, double value, long timeMs) { @Override public double measure(MetricConfig config, long now) { double value = stat.measure(config, now); + return value / convert(windowSize(config, now)); + } + + public long windowSize(MetricConfig config, long now) { // the elapsed time is always N-1 complete windows plus whatever fraction of the final window is complete - long elapsedCurrentWindowMs = now - stat.current(now).lastWindowMs; + long elapsedCurrentWindowMs = (now - stat.oldest(now).lastWindowMs) % config.timeWindowMs(); long elapsedPriorWindowsMs = config.timeWindowMs() * (config.samples() - 1); - return value / convert(elapsedCurrentWindowMs + elapsedPriorWindowsMs); + return elapsedCurrentWindowMs + elapsedPriorWindowsMs; } private double convert(long timeMs) { diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java index 175a036f344dc..e7142c87cac9c 100644 --- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java @@ -322,6 +322,34 @@ public void testPercentiles() { assertEquals(0.0, p75.value(), 1.0); } + @Test + public void testRateWindowing() throws Exception { + // Use the default time window. Set 3 samples + MetricConfig cfg = new MetricConfig().samples(3); + Sensor s = metrics.sensor("test.sensor", cfg); + s.add(new MetricName("test.rate", "grp1"), new Rate(TimeUnit.SECONDS)); + + int sum = 0; + int count = cfg.samples() - 1; + // Advance 1 window after every record + for (int i = 0; i < count; i++) { + s.record(100); + sum += 100; + time.sleep(cfg.timeWindowMs()); + } + + // Sleep for half the window. + time.sleep(cfg.timeWindowMs() / 2); + + // prior to any time passing + double elapsedSecs = (cfg.timeWindowMs() * (cfg.samples() - 1) + cfg.timeWindowMs() / 2) / 1000.0; + + KafkaMetric km = metrics.metrics().get(new MetricName("test.rate", "grp1")); + assertEquals("Rate(0...2) = 2.666", sum / elapsedSecs, km.value(), EPS); + assertEquals("Elapsed Time = 75 seconds", elapsedSecs, + ((Rate) km.measurable()).windowSize(cfg, time.milliseconds()) / 1000, EPS); + } + public static class ConstantMeasurable implements Measurable { public double value = 0.0; diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala index 39dd65a84e65f..0aa8c58f23c41 100644 --- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala +++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala @@ -121,7 +121,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, case qve: QuotaViolationException => // Compute the delay val clientMetric = metrics.metrics().get(clientRateMetricName(clientId)) - throttleTimeMs = throttleTime(clientMetric.value(), getQuotaMetricConfig(quota(clientId))) + throttleTimeMs = throttleTime(clientMetric, getQuotaMetricConfig(quota(clientId))) delayQueue.add(new ThrottledResponse(time, throttleTimeMs, callback)) delayQueueSensor.record() clientSensors.throttleTimeSensor.record(throttleTimeMs) @@ -139,11 +139,14 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, * we need to add a delay of X to W such that O * W / (W + X) = T. * Solving for X, we get X = (O - T)/T * W. */ - private def throttleTime(metricValue: Double, config: MetricConfig): Int = { + private def throttleTime(clientMetric: KafkaMetric, config: MetricConfig): Int = { + // Casting to Rate because we only use Rate in Quota computation + val rateMetric = clientMetric.measurable().asInstanceOf[Rate] val quota = config.quota() - val difference = metricValue - quota.bound - val time = difference / quota.bound * config.timeWindowMs() * config.samples() - time.round.toInt + val difference = clientMetric.value() - quota.bound + // Use the precise window used by the rate calculation + val throttleTimeMs = difference / quota.bound * rateMetric.windowSize(config, time.milliseconds()) + throttleTimeMs.round.toInt } /** diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala index 997928cfe5d44..75e856a0ba221 100644 --- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala @@ -71,10 +71,13 @@ class ClientQuotaManagerTest { Assert.assertEquals(0, queueSizeMetric.value().toInt) // Create a spike. - // 400*10 + 2000 = 6000/10 = 600 bytes per second. - // (600 - quota)/quota*window-size = (600-500)/500*11 seconds = 2200 - val sleepTime = clientMetrics.recordAndMaybeThrottle("unknown", 2000, callback) - Assert.assertEquals("Should be throttled", 2200, sleepTime) + // 400*10 + 2000 + 300 = 6300/10.5 = 600 bytes per second. + // (600 - quota)/quota*window-size = (600-500)/500*10.5 seconds = 2100 + // 10.5 seconds because the last window is half complete + time.sleep(500) + val sleepTime = clientMetrics.recordAndMaybeThrottle("unknown", 2300, callback) + + Assert.assertEquals("Should be throttled", 2100, sleepTime) Assert.assertEquals(1, queueSizeMetric.value().toInt) // After a request is delayed, the callback cannot be triggered immediately clientMetrics.throttledRequestReaper.doWork() From 74127346ee8c5d77af81a1ec314e69a03a21cb35 Mon Sep 17 00:00:00 2001 From: Aditya Auradkar Date: Mon, 5 Oct 2015 11:54:21 -0700 Subject: [PATCH 2/6] Addressing Ismael and Edwards comments. --- .../main/java/org/apache/kafka/common/metrics/KafkaMetric.java | 1 - .../main/java/org/apache/kafka/common/metrics/stats/Rate.java | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java index 66583832f299d..e4d3ae834fb02 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java @@ -57,7 +57,6 @@ public Measurable measurable() { return this.measurable; } - double value(long timeMs) { return this.measurable.measure(config, timeMs); } diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java index 6cb9152036086..0420223ab6c7d 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java @@ -66,7 +66,7 @@ public long windowSize(MetricConfig config, long now) { // the elapsed time is always N-1 complete windows plus whatever fraction of the final window is complete long elapsedCurrentWindowMs = (now - stat.oldest(now).lastWindowMs) % config.timeWindowMs(); long elapsedPriorWindowsMs = config.timeWindowMs() * (config.samples() - 1); - return elapsedCurrentWindowMs + elapsedPriorWindowsMs; + return elapsedCurrentWindowMs + elapsedPriorWindowsMs; } private double convert(long timeMs) { From d39f5612dd735c34dd05b6db075acdba541e7030 Mon Sep 17 00:00:00 2001 From: Aditya Auradkar Date: Tue, 6 Oct 2015 10:39:31 -0700 Subject: [PATCH 3/6] Fixing KAFKA-2567 --- .../main/scala/kafka/server/ClientQuotaManager.scala | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala index 0aa8c58f23c41..b15dbc5002ad6 100644 --- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala +++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala @@ -124,10 +124,11 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, throttleTimeMs = throttleTime(clientMetric, getQuotaMetricConfig(quota(clientId))) delayQueue.add(new ThrottledResponse(time, throttleTimeMs, callback)) delayQueueSensor.record() - clientSensors.throttleTimeSensor.record(throttleTimeMs) // If delayed, add the element to the delayQueue logger.debug("Quota violated for sensor (%s). Delay time: (%d)".format(clientSensors.quotaSensor.name(), throttleTimeMs)) } + // If the request is not throttled, a throttleTime of 0 ms is recorded + clientSensors.throttleTimeSensor.record(throttleTimeMs) throttleTimeMs } @@ -141,7 +142,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, */ private def throttleTime(clientMetric: KafkaMetric, config: MetricConfig): Int = { // Casting to Rate because we only use Rate in Quota computation - val rateMetric = clientMetric.measurable().asInstanceOf[Rate] + val rateMetric: Rate = measurableAsRate(clientMetric.metricName(), clientMetric.measurable()) val quota = config.quota() val difference = clientMetric.value() - quota.bound // Use the precise window used by the rate calculation @@ -149,6 +150,13 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, throttleTimeMs.round.toInt } + private def measurableAsRate(name: MetricName, measurable: Measurable): Rate = { + if (! measurable.isInstanceOf[Rate]) + throw new IllegalArgumentException("Metric " + name + " is not a Rate metric") + + measurable.asInstanceOf[Rate] + } + /** * Returns the consumer quota for the specified clientId * @return From 2f27026eaed2072eb104e554236b93f7e0d965e8 Mon Sep 17 00:00:00 2001 From: Aditya Auradkar Date: Tue, 6 Oct 2015 10:55:01 -0700 Subject: [PATCH 4/6] Fixing 2567 --- core/src/test/scala/integration/kafka/api/QuotasTest.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/QuotasTest.scala b/core/src/test/scala/integration/kafka/api/QuotasTest.scala index 3343c53991fe5..38b3dbd102d9f 100644 --- a/core/src/test/scala/integration/kafka/api/QuotasTest.scala +++ b/core/src/test/scala/integration/kafka/api/QuotasTest.scala @@ -156,7 +156,7 @@ class QuotasTest extends KafkaServerTestHarness { RequestKeys.nameForKey(RequestKeys.ProduceKey), "Tracking throttle-time per client", "client-id", producerId2) - Assert.assertEquals("Should not have been throttled", Double.NaN, allMetrics(producerMetricName).value()) + Assert.assertEquals("Should not have been throttled", 0.0, allMetrics(producerMetricName).value()) // The "client" consumer does not get throttled. consume(consumers(1), numRecords) @@ -167,7 +167,7 @@ class QuotasTest extends KafkaServerTestHarness { RequestKeys.nameForKey(RequestKeys.FetchKey), "Tracking throttle-time per client", "client-id", consumerId2) - Assert.assertEquals("Should not have been throttled", Double.NaN, allMetrics(consumerMetricName).value()) + Assert.assertEquals("Should not have been throttled", 0.0, allMetrics(consumerMetricName).value()) } def produce(p: KafkaProducer[Array[Byte], Array[Byte]], count: Int): Int = { From 4345f0d15833a2b4f36f167c07e221736af55b82 Mon Sep 17 00:00:00 2001 From: Aditya Auradkar Date: Wed, 7 Oct 2015 16:28:09 -0700 Subject: [PATCH 5/6] addressing ismaels comments --- .../main/scala/kafka/server/ClientQuotaManager.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala index b15dbc5002ad6..361f3221b8527 100644 --- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala +++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala @@ -141,7 +141,6 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, * Solving for X, we get X = (O - T)/T * W. */ private def throttleTime(clientMetric: KafkaMetric, config: MetricConfig): Int = { - // Casting to Rate because we only use Rate in Quota computation val rateMetric: Rate = measurableAsRate(clientMetric.metricName(), clientMetric.measurable()) val quota = config.quota() val difference = clientMetric.value() - quota.bound @@ -150,11 +149,12 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, throttleTimeMs.round.toInt } + // Casting to Rate because we only use Rate in Quota computation private def measurableAsRate(name: MetricName, measurable: Measurable): Rate = { - if (! measurable.isInstanceOf[Rate]) - throw new IllegalArgumentException("Metric " + name + " is not a Rate metric") - - measurable.asInstanceOf[Rate] + measurable match { + case r: Rate => r + case _ => throw new IllegalArgumentException(s"Metric $name is not a Rate metric, value $measurable") + } } /** From 458ab8f8b6c2fd6eaf652372c8068fa7835fc543 Mon Sep 17 00:00:00 2001 From: Aditya Auradkar Date: Fri, 9 Oct 2015 16:32:58 -0700 Subject: [PATCH 6/6] addressing comments --- .../kafka/common/metrics/stats/Rate.java | 28 ++++++++++++++++--- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java index 0420223ab6c7d..9dfc457b0912e 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java @@ -63,10 +63,30 @@ public double measure(MetricConfig config, long now) { } public long windowSize(MetricConfig config, long now) { - // the elapsed time is always N-1 complete windows plus whatever fraction of the final window is complete - long elapsedCurrentWindowMs = (now - stat.oldest(now).lastWindowMs) % config.timeWindowMs(); - long elapsedPriorWindowsMs = config.timeWindowMs() * (config.samples() - 1); - return elapsedCurrentWindowMs + elapsedPriorWindowsMs; + // purge old samples before we compute the window size + stat.purgeObsoleteSamples(config, now); + + /* + * Here we check the total amount of time elapsed since the oldest non-obsolete window. + * This give the total windowSize of the batch which is the time used for Rate computation. + * However, there is an issue if we do not have sufficient data for e.g. if only 1 second has elapsed in a 30 second + * window, the measured rate will be very high. + * Hence we assume that the elapsed time is always N-1 complete windows plus whatever fraction of the final window is complete. + * + * Note that we could simply count the amount of time elapsed in the current window and add n-1 windows to get the total time, + * but this approach does not account for sleeps. SampledStat only creates samples whenever record is called, + * if no record is called for a period of time that time is not accounted for in windowSize and produces incorrect results. + */ + long totalElapsedTimeMs = now - stat.oldest(now).lastWindowMs; + // Check how many full windows of data we have currently retained + int numFullWindows = (int) (totalElapsedTimeMs / config.timeWindowMs()); + int minFullWindows = config.samples() - 1; + + // If the available windows are less than the minimum required, add the difference to the totalElapsedTime + if (numFullWindows < minFullWindows) + totalElapsedTimeMs += (minFullWindows - numFullWindows) * config.timeWindowMs(); + + return totalElapsedTimeMs; } private double convert(long timeMs) {