Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ public double value() {
}
}

/**
 * Exposes the measurable that backs this metric.
 *
 * @return the {@code measurable} field held by this instance
 */
public Measurable measurable() {
    return measurable;
}

/**
 * Evaluates the underlying measurable at the supplied time.
 *
 * @param timeMs the time value forwarded to {@code measure}
 * @return whatever the measurable reports for this metric's config at {@code timeMs}
 */
double value(long timeMs) {
    final double measured = measurable.measure(config, timeMs);
    return measured;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,34 @@ public void record(MetricConfig config, double value, long timeMs) {
/**
 * Computes the rate: the underlying stat's measurement divided by the converted window size.
 *
 * @param config the metric configuration, forwarded to the stat and to {@code windowSize}
 * @param now    the current time, forwarded to the stat and to {@code windowSize}
 * @return {@code stat.measure(config, now) / convert(windowSize(config, now))}
 */
@Override
public double measure(MetricConfig config, long now) {
    double value = stat.measure(config, now);
    // The divisor comes solely from windowSize(). The superseded inline computation
    // ("N-1 complete windows plus the elapsed part of the current window") returned first
    // and left this statement unreachable, so it has been removed.
    return value / convert(windowSize(config, now));
}

/**
 * Returns the length of time, in the same units as {@code now}, over which the rate is computed.
 *
 * The starting point is the time elapsed since the oldest retained (non-obsolete) window, which
 * gives the total span of the current batch of samples. Measured on its own, that span can be
 * far too short when little data is available, e.g. if only 1 second has elapsed in a 30 second
 * window the resulting rate would be very high. The span is therefore padded so that it always
 * covers at least N-1 complete windows plus whatever fraction of the final window has elapsed.
 *
 * Simply taking the elapsed part of the current window and adding N-1 windows would not account
 * for sleeps: SampledStat only creates samples when record is called, so a period with no
 * records would be missing from the window size and produce incorrect results.
 *
 * @param config supplies the per-window duration and the number of samples
 * @param now    the current time
 * @return the elapsed time since the oldest retained window, padded up to N-1 full windows
 */
public long windowSize(MetricConfig config, long now) {
    // Obsolete samples must be purged before asking for the oldest one
    stat.purgeObsoleteSamples(config, now);

    long elapsedMs = now - stat.oldest(now).lastWindowMs;

    // Number of complete windows currently retained versus the minimum we insist on
    int retainedFullWindows = (int) (elapsedMs / config.timeWindowMs());
    int requiredFullWindows = config.samples() - 1;

    // Enough history already: the measured span is used as-is
    if (retainedFullWindows >= requiredFullWindows)
        return elapsedMs;

    // Otherwise pad the span with the missing full windows
    return elapsedMs + (requiredFullWindows - retainedFullWindows) * config.timeWindowMs();
}

private double convert(long timeMs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,34 @@ public void testPercentiles() {
assertEquals(0.0, p75.value(), 1.0);
}

@Test
public void testRateWindowing() throws Exception {
// Use the default time window. Set 3 samples
MetricConfig cfg = new MetricConfig().samples(3);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also minor comment - third sample will be unused.

Sensor s = metrics.sensor("test.sensor", cfg);
s.add(new MetricName("test.rate", "grp1"), new Rate(TimeUnit.SECONDS));

int sum = 0;
int count = cfg.samples() - 1;
// Advance 1 window after every record
for (int i = 0; i < count; i++) {
s.record(100);
sum += 100;
time.sleep(cfg.timeWindowMs());
}

// Sleep for half the window.
time.sleep(cfg.timeWindowMs() / 2);

// prior to any time passing
double elapsedSecs = (cfg.timeWindowMs() * (cfg.samples() - 1) + cfg.timeWindowMs() / 2) / 1000.0;

KafkaMetric km = metrics.metrics().get(new MetricName("test.rate", "grp1"));
assertEquals("Rate(0...2) = 2.666", sum / elapsedSecs, km.value(), EPS);
assertEquals("Elapsed Time = 75 seconds", elapsedSecs,
((Rate) km.measurable()).windowSize(cfg, time.milliseconds()) / 1000, EPS);
}

public static class ConstantMeasurable implements Measurable {
public double value = 0.0;

Expand Down
23 changes: 17 additions & 6 deletions core/src/main/scala/kafka/server/ClientQuotaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,14 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
case qve: QuotaViolationException =>
// Compute the delay
val clientMetric = metrics.metrics().get(clientRateMetricName(clientId))
throttleTimeMs = throttleTime(clientMetric.value(), getQuotaMetricConfig(quota(clientId)))
throttleTimeMs = throttleTime(clientMetric, getQuotaMetricConfig(quota(clientId)))
delayQueue.add(new ThrottledResponse(time, throttleTimeMs, callback))
delayQueueSensor.record()
clientSensors.throttleTimeSensor.record(throttleTimeMs)
// If delayed, add the element to the delayQueue
logger.debug("Quota violated for sensor (%s). Delay time: (%d)".format(clientSensors.quotaSensor.name(), throttleTimeMs))
}
// If the request is not throttled, a throttleTime of 0 ms is recorded
clientSensors.throttleTimeSensor.record(throttleTimeMs)
throttleTimeMs
}

Expand All @@ -139,11 +140,21 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
* we need to add a delay of X to W such that O * W / (W + X) = T.
* Solving for X, we get X = (O - T)/T * W.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per the comment in the RB - can we document the discussion on the right vs conservative formula here?

*/
private def throttleTime(clientMetric: KafkaMetric, config: MetricConfig): Int = {
  // Only Rate metrics are accepted here; measurableAsRate throws for anything else
  val rateMetric: Rate = measurableAsRate(clientMetric.metricName(), clientMetric.measurable())
  val quota = config.quota()
  // Amount by which the observed value exceeds the quota bound
  val difference = clientMetric.value() - quota.bound
  // Use the precise window used by the rate calculation
  val throttleTimeMs = difference / quota.bound * rateMetric.windowSize(config, time.milliseconds())
  throttleTimeMs.round.toInt
}

// Quota computation only works with Rate, so narrow the measurable to Rate here and
// fail with an IllegalArgumentException naming the metric when it is anything else.
private def measurableAsRate(name: MetricName, measurable: Measurable): Rate =
  measurable match {
    case rate: Rate => rate
    case _ =>
      throw new IllegalArgumentException(s"Metric $name is not a Rate metric, value $measurable")
  }

/**
Expand Down
4 changes: 2 additions & 2 deletions core/src/test/scala/integration/kafka/api/QuotasTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ class QuotasTest extends KafkaServerTestHarness {
RequestKeys.nameForKey(RequestKeys.ProduceKey),
"Tracking throttle-time per client",
"client-id", producerId2)
Assert.assertEquals("Should not have been throttled", Double.NaN, allMetrics(producerMetricName).value())
Assert.assertEquals("Should not have been throttled", 0.0, allMetrics(producerMetricName).value())

// The "client" consumer does not get throttled.
consume(consumers(1), numRecords)
Expand All @@ -167,7 +167,7 @@ class QuotasTest extends KafkaServerTestHarness {
RequestKeys.nameForKey(RequestKeys.FetchKey),
"Tracking throttle-time per client",
"client-id", consumerId2)
Assert.assertEquals("Should not have been throttled", Double.NaN, allMetrics(consumerMetricName).value())
Assert.assertEquals("Should not have been throttled", 0.0, allMetrics(consumerMetricName).value())
}

def produce(p: KafkaProducer[Array[Byte], Array[Byte]], count: Int): Int = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,13 @@ class ClientQuotaManagerTest {
Assert.assertEquals(0, queueSizeMetric.value().toInt)

// Create a spike.
// 400*10 + 2000 = 6000/10 = 600 bytes per second.
// (600 - quota)/quota*window-size = (600-500)/500*11 seconds = 2200
val sleepTime = clientMetrics.recordAndMaybeThrottle("unknown", 2000, callback)
Assert.assertEquals("Should be throttled", 2200, sleepTime)
// 400*10 + 2000 + 300 = 6300/10.5 = 600 bytes per second.
// (600 - quota)/quota*window-size = (600-500)/500*10.5 seconds = 2100
// 10.5 seconds because the last window is half complete
time.sleep(500)
val sleepTime = clientMetrics.recordAndMaybeThrottle("unknown", 2300, callback)

Assert.assertEquals("Should be throttled", 2100, sleepTime)
Assert.assertEquals(1, queueSizeMetric.value().toInt)
// After a request is delayed, the callback cannot be triggered immediately
clientMetrics.throttledRequestReaper.doWork()
Expand Down