KAFKA-12319: Change calculation of window size used to calculate Rate #12045

Closed
divijvaidya wants to merge 9 commits into apache:trunk from divijvaidya:KAFKA-12319-PR

Conversation

@divijvaidya
Member

@divijvaidya divijvaidya commented Apr 13, 2022

Why does the test fail?

ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit() sends 600 connection creation requests at a rate of 40/s with listenerQuotaConnectionRateLimit set to 30/s. The test asserts that, even though the request rate is higher than the threshold, correct throttling keeps the measured rate at the completion of the 600 requests within 30 ± epsilon. Epsilon is set to 7, which is exceeded from time to time, leading to flaky test failures.

The problem

Currently, the rate calculation function (used for rate limiting) relies on the following assumptions:

  1. The start time of a new sample window is the time at which the first event in that window is recorded.
  2. If we don't have quota.window.num retained windows, assume prior windows with zero records exist when calculating the rate.

These assumptions lead to an incorrect rate in certain scenarios, as described below.

Consider a scenario when we have some initial requests, followed by a small gap without any requests and then another bunch of requests. More specifically:

Configuration: quota.window.size.seconds = 1s, quota.window.num = 2, listenerName.max.connection.creation.rate = 30/s

Record events (E) at timestamps:
E1 | CurrentTimeStamp (T1) | Window#1 (start time = T1)
E2 | T2 = T1 + 30ms | Window#1
E3 | T3 = T1 + 995ms | Window#1
< No events from T3 to T4 >
E4 | T4 = T1 + 1020ms | Window#2 (start time = T1 + 1020ms)
E5 | T5 = T1 + 2010ms | Window#2

Rate calculated as per current implementation:
Rate at T1 = 1 / (length of hypothetical prior samples + time elapsed for current sample) = 1 / (1 + 0) = 1 event per second
Rate at T2 = 2 / (1 + 0.030) = 1.94 events per second
Rate at T3 = 3 / (1 + 0.995) = 1.50 events per second
Rate at T4 = 4 / (now - start time of oldest window) = 4 / 1.02 = 3.92 events per second

When calculating the rate at T5, "obsolete" windows are first purged, i.e. any window with start time < T5 - (quota.window.size.seconds * quota.window.num). Thus Window#1 is purged (because T1 < T5 - 2s).

Rate at T5 = 2 / (length of hypothetical prior samples + time elapsed for current sample) = 2 / 1.99 = 1.005 events per second

Note how the rate calculation at T5 has fallen back to assuming prior windows with zero events (due to the purge), even though we actually had a previous window with more than zero events in it. Hence, the rate calculated at T5 is incorrect. In the worst case, Window#1 could contain a large number of events, yet the rate calculated towards the end of Window#2 would ignore all of them, yielding an artificially low current rate. For throttling use cases, this admits more events (since the observed rate is low) and thus violates the contract to maintain a sustained max.connection.creation.rate.
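The arithmetic above can be reproduced with a small standalone model of the current windowing logic. This is a sketch written for this description only; the class and method names are made up, and Kafka's actual SampledStat/Rate code differs in detail:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Illustrative model only; not Kafka's real implementation.
public class OldRateModel {
    static final long WINDOW_MS = 1000;  // quota.window.size.seconds = 1s
    static final int NUM_WINDOWS = 2;    // quota.window.num = 2

    static class Window {
        final long startMs;
        long count;
        Window(long startMs) { this.startMs = startMs; }
    }

    private final Deque<Window> windows = new ArrayDeque<>();

    /** Records one event at nowMs and returns the measured rate in events/sec. */
    double recordAndMeasure(long nowMs) {
        // Assumption 1 (the buggy one): a new window starts at its first event's timestamp.
        if (windows.isEmpty() || nowMs - windows.peekLast().startMs >= WINDOW_MS)
            windows.addLast(new Window(nowMs));
        windows.peekLast().count++;
        // Purge windows that started before the retention horizon.
        while (windows.peekFirst().startMs < nowMs - NUM_WINDOWS * WINDOW_MS)
            windows.removeFirst();
        long events = windows.stream().mapToLong(w -> w.count).sum();
        double elapsedSec = (nowMs - windows.peekFirst().startMs) / 1000.0;
        // Assumption 2: pad the elapsed time with hypothetical prior windows of zero events.
        elapsedSec += (NUM_WINDOWS - windows.size()) * (WINDOW_MS / 1000.0);
        return events / elapsedSec;
    }

    public static void main(String[] args) {
        OldRateModel model = new OldRateModel();
        for (long t : new long[] {0, 30, 995, 1020, 2010})  // E1..E5 relative to T1 = 0
            System.out.printf("rate at T1+%dms = %.3f%n", t, model.recordAndMeasure(t));
        // The final call yields ~1.005 events/sec at T5, reproducing the incorrect drop.
    }
}
```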

The flaky test ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit suffers from this problem from time to time, leading to a higher rate of connection creation than expected.

The solution

The solution is to replace assumption 1 stated earlier with:

Start time of a new sample window is the nearest time at which the window should have started assuming no gaps.

The nearest time is calculated as

currentWindowStartTimeMs = recordingTimeMs - ((recordingTimeMs - previousWindowEndtime) % config.timeWindowMs())

where
  • recordingTimeMs is the time of the first record in the new window
  • previousWindowEndtime is the end time of the previous window, i.e. previousWindowStartTime + quota.window.size.seconds
  • config.timeWindowMs() is quota.window.size.seconds

With the solution, Window#2 snaps back to start at T1 + 1000ms, window rollover occurs at T1 + 2000ms, and E5 at T5 moves into a 3rd window. The rate at T5 becomes:
Rate at T5 = 2 / (now - start time of oldest window) = 2 / 1.010 = 1.98 events per second

This scenario has also been added as a unit test in MetricsTest.java.
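The proposed window-start calculation can be checked against the timestamps above with a short sketch. Names here follow the formula in this description; this is illustrative, not the final code:

```java
// Sketch of the proposed window-start snapping; class and method names are made up.
public class WindowStartSketch {
    static long newWindowStart(long recordingTimeMs, long previousWindowStartMs, long windowMs) {
        long previousWindowEndMs = previousWindowStartMs + windowMs;
        return recordingTimeMs - ((recordingTimeMs - previousWindowEndMs) % windowMs);
    }

    public static void main(String[] args) {
        long windowMs = 1000;
        // E4 arrives 20ms after Window#1 [0, 1000) ends: Window#2 snaps back to 1000, not 1020.
        long w2Start = newWindowStart(1020, 0, windowMs);
        // E5 arrives 10ms after Window#2 [1000, 2000) ends: Window#3 starts exactly at 2000.
        long w3Start = newWindowStart(2010, w2Start, windowMs);
        System.out.println(w2Start + " " + w3Start);  // prints: 1000 2000
        // Rate at T5 over the retained windows holding E4 and E5 (no zero-event padding needed):
        double rate = 2 / ((2010 - w2Start) / 1000.0);
        System.out.printf("%.2f%n", rate);  // ~1.98 events/sec, matching the walkthrough
    }
}
```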

Code changes

  1. Changes in SampledStat#record() to implement the revised assumption 1 described above; the change takes effect when rollover into a new window occurs.
  2. Add new tests in MetricsTest.java
  3. Cosmetic syntax changes across files.

Testing

  • New test added to validate the change in assumption.
  • ./gradlew unitTest passes.
  • ./gradlew integrationTest passes.

Longer term solutions

  1. Longer term, I think we should move to a sliding-window approach to calculate the rate instead of the fixed-window approach used today.
  2. The current rate limiting approach also allows short bursts of traffic. There should be a configurable option for users to choose between an approach that allows short bursts and one where the system tries to maintain a smooth rate over time, never exceeding the allocated threshold.
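For illustration only (not part of this PR or of Kafka), a sliding-window rate could keep the exact event timestamps for the trailing window, so there are no window boundaries to roll over or purge; the trade-off is O(events per window) memory:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Illustrative sliding-window rate meter; hypothetical class, not part of this PR.
public class SlidingWindowRate {
    private final long windowMs;
    private final Deque<Long> timestamps = new ArrayDeque<>();

    public SlidingWindowRate(long windowMs) { this.windowMs = windowMs; }

    /** Records an event at nowMs and returns events/sec over the trailing window. */
    public double recordAndMeasure(long nowMs) {
        timestamps.addLast(nowMs);
        // Drop events that have slid out of the trailing window.
        while (timestamps.peekFirst() < nowMs - windowMs)
            timestamps.removeFirst();
        return timestamps.size() / (windowMs / 1000.0);
    }
}
```

With the timestamps from the walkthrough, the rate at T5 would count exactly the events in [T5 - 1s, T5], so a burst in an earlier window can never be silently forgotten by a purge.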

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@divijvaidya
Member Author

Requesting review from @mjsax since you commented on the associated JIRA: https://issues.apache.org/jira/browse/KAFKA-12319

Requesting review from @ijuma @jjkoshy since you folks reviewed the last set of changes in Rate.java file.

Please take a look when you get a chance 🙏

@ijuma
Member

ijuma commented Apr 19, 2022

cc @apovzner @dajac

@divijvaidya
Member Author

Hey @apovzner @dajac, did you get a chance to take a look at this? Please let me know if I can make explanation simpler or if you have any questions.

Comment thread clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java Outdated
Member

@mimaison mimaison left a comment


Thanks for the PR!
Unfortunately we can't change public APIs without a KIP. I wonder if we could fix the flaky test while keeping the current sampling logic by adjusting the assertions slightly.

You make a few good points in the description for tweaking how quotas are computed. It's maybe worth starting a discussion on the dev mailing list and if we get consensus then follow up with a KIP. WDYT?

Comment thread clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java Outdated
@divijvaidya
Member Author

@mimaison Thinking about it, I can actually reduce the code changes so that no modifications to any public interface are made. Do you still think a KIP is required for this change in that case? (I am new to Kafka, so I am not fully sure what qualifies for a KIP and what doesn't.)

@divijvaidya
Member Author

CC'ing a couple of folks who may be interested to review this.
@mimaison @showuon @dengziming @apovzner @wyuka @satishd

Member

@tombentley tombentley left a comment


Thanks @divijvaidya, I left a few comments.

I think this would be an improvement, but it would be great to have more eyes on this (cc @guozhangwang @dajac).

Comment thread clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java Outdated
Comment thread clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java Outdated
Comment thread clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java Outdated
-     oldest = curr;
- }
- return oldest;
+ return samples.stream().min(Comparator.comparingLong(s -> s.lastWindowMs)).orElse(samples.get(0));
Member


I wonder whether this is worth doing. I know it's shorter, but I think it will be slower, at least until the JIT optimizes it.

Member Author


I find the new code more readable, since we can immediately see that a min is being calculated, whereas in the previous version we had to understand the assignments and logic in the for loop to work out what was going on.

Nevertheless, I don't have a strong opinion on this one. If you still think we should revert it, I will do it. Let me know.

Comment thread clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java Outdated
if (sample.isComplete(recordingTimeMs, config)) {
final long previousWindowStartTime = sample.lastWindowMs;
final long previousWindowEndtime = previousWindowStartTime + config.timeWindowMs();
final long startTimeOfNewWindow = recordingTimeMs - ((recordingTimeMs - previousWindowEndtime) % config.timeWindowMs());
Member


recordingTimeMs seems to usually come from Time.milliseconds, thus from System.currentTimeMillis:

  • It's therefore not guaranteed to be monotonic.
  • This could be exacerbated by the synchronized blocks in Sensor.recordInternal, because synchronized provides no fairness guarantee for blocked threads.

sample.isComplete(recordingTimeMs, config) could return true based on the number of samples, not the time.

So I think it's possible that recordingTimeMs < previousWindowEndtime, so that startTimeOfNewWindow ends up ahead of recordingTimeMs. I don't think that is intended; if it is, it's definitely worthy of a comment.
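A quick numeric check (made-up timestamps, not from the PR) shows the hazard: Java's % operator keeps the sign of the dividend, so a recordingTimeMs behind previousWindowEndtime yields a window start in the future:

```java
// Made-up timestamps demonstrating the non-monotonic clock hazard described above.
public class NonMonotonicCheck {
    public static void main(String[] args) {
        long windowMs = 1000;
        long previousWindowEndMs = 2000;
        long recordingTimeMs = 1990;  // clock stepped back, or the thread was delayed
        // (1990 - 2000) % 1000 == -10 in Java, so the "start" lands after recordingTimeMs.
        long startTimeOfNewWindow =
            recordingTimeMs - ((recordingTimeMs - previousWindowEndMs) % windowMs);
        System.out.println(startTimeOfNewWindow);  // prints: 2000
    }
}
```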

Member Author


That is a great observation Tom! Ideally the code should be written to ensure that recording a metric should not block because the operation is wall clock time sensitive. But as you observed, we have synchronized at multiple places which may lead to sample being recorded in a window which has already completed in the past.

For cases when the sensor is used for calculating the ConnectionQuota, this problem wouldn't occur, because Time.milliseconds is called inside a synchronized block [1], which ensures that only one thread, holding the latest timestamp, accesses sensor.record at a time.

But I don't know about code paths other than ConnectionQuota that use the sensor, so your observation is valid.

Since this problem is independent of this code change, and breaks existing logic if/when recordingTimeMs < endTimeOfPreviousWindow, I have created a JIRA to address this in a separate PR: https://issues.apache.org/jira/browse/KAFKA-13994

[1] https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/network/SocketServer.scala#L1541-L1542

@divijvaidya
Member Author

@machi1990 since you opened a PR to fix this flaky test, you might be familiar with this part of code. May I request you to review this PR please.

@machi1990
Contributor

@machi1990 since you opened a PR to fix this flaky test, you might be familiar with this part of code. May I request you to review this PR please.

Hey @divijvaidya, I am new to Kafka and to this part of the code. It'll be good to get another round of reviews from committers, since some of them have already started looking at this PR. My attempt to fix the flaky test in #13702 was to slightly modify the assertions, more of a quick win to stabilize the test, while this PR attempts to sort out the underlying issue with the quota computation. I think it'll be good to get more eyes on the PR as suggested in #12045 (review) and #12045 (review). What do you think?

@machi1990
Contributor

machi1990 commented Jun 20, 2023

One of my PRs [1] was bitten again by the test failure that this change is attempting to fix.
Would people be open to getting a tactical fix [2] merged to stabilize CI while this PR is being reviewed? @mimaison @tombentley @divijvaidya any thoughts?

  1. https://ci-builds.apache.org/blue/rest/organizations/jenkins/pipelines/Kafka/pipelines/kafka-pr/branches/PR-13700/runs/5
  2. KAFKA-14985: attempt to fix ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit() test by bumping episilon to 8 from 7 #13702

@github-actions

This PR is being marked as stale since it has not had any activity in 90 days. If you
would like to keep this PR alive, please leave a comment asking for a review. If the PR has
merge conflicts, update it with the latest from the base branch.

If you are having difficulty finding a reviewer, please reach out on the [mailing list](https://kafka.apache.org/contact).

If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.

@github-actions github-actions Bot added the stale Stale PRs label Dec 26, 2024
@github-actions

This PR has been closed since it has not had any activity in 120 days. If you feel like this
was a mistake, or you would like to continue working on it, please feel free to re-open the
PR and ask for a review.

@github-actions github-actions Bot added the closed-stale PRs that were closed due to inactivity label Jan 26, 2025
@github-actions github-actions Bot closed this Jan 26, 2025