KAFKA-9677: Fix consumer fetch with small consume bandwidth quotas #8290
rajinisivaram merged 3 commits into apache:trunk
Conversation
ok to test
rajinisivaram left a comment:
@apovzner Thanks for the PR, looks good. Left one question. CustomQuotaTest failure in the PR builds may be related, so worth investigating that.
      val metric = throttleMetric(QuotaType.Fetch, consumerClientId)
      throttled = metric != null && metricValue(metric) > 0
-   } while (numConsumed < maxRecords && !throttled)
+   } while (numConsumed < maxRecords && !throttled && System.currentTimeMillis < startMs + longTimeoutMs)
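The bounded loop in the diff above can be sketched in Java as follows. This is an illustrative analogue, not the actual test code: the class name and the pollOnce/isThrottled suppliers are hypothetical stand-ins for the test's helpers. The point is the added wall-clock deadline, which guarantees the do-while loop exits even when the consumer never reaches maxRecords and throttling is never observed.

```java
import java.util.function.Supplier;

public class BoundedConsumeLoop {
    // Consume until we hit maxRecords, observe throttling, or exceed a
    // wall-clock deadline, so the test cannot hang forever.
    public static int consumeUntilThrottled(Supplier<Integer> pollOnce,
                                            Supplier<Boolean> isThrottled,
                                            int maxRecords,
                                            long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        int numConsumed = 0;
        boolean throttled;
        do {
            numConsumed += pollOnce.get();   // records returned by one poll
            throttled = isThrottled.get();   // has the fetch quota kicked in?
        } while (numConsumed < maxRecords && !throttled
                 && System.currentTimeMillis() < deadline);
        return numConsumed;
    }

    public static void main(String[] args) {
        // 10 records per poll, never throttled: loop exits once maxRecords
        // (25) is reached, after the third poll.
        int consumed = consumeUntilThrottled(() -> 10, () -> false, 25, 5_000);
        System.out.println(consumed); // prints 30
    }
}
```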
Does this really need 10 minutes? One minute itself seems like a long time for the tests. Should we send larger messages to get the test to complete faster?
I used such a long timeout because there was no timeout before, and I wanted to make sure it was long enough. I verified that each test runs for at most 20-30 seconds. I updated the code to use a 1-minute timeout.
Hi @rajinisivaram, thanks for the review! I fixed CustomQuotaCallbackTest and reduced the timeout in BaseQuotaTest.
retest this please
ok to test
rajinisivaram left a comment:
@apovzner Thanks for the PR, LGTM. Merging to trunk.
Merged apache-github/trunk (39 commits), including:
* MINOR: cleanup and add tests to StateDirectoryTest (apache#8304)
* HOTFIX: StateDirectoryTest should use Set instead of List (apache#8305)
* MINOR: Fix build and JavaDoc warnings (apache#8291)
* MINOR: Fix kafka.server.RequestQuotaTest missing new ApiKeys. (apache#8302)
* KAFKA-9712: Catch and handle exception thrown by reflections scanner (apache#8289)
* KAFKA-9670; Reduce allocations in Metadata Response preparation (apache#8236)
* MINOR: fix Scala 2.13 build error introduced in apache#8083 (apache#8301)
* MINOR: enforce non-negative invariant for checkpointed offsets (apache#8297)
* MINOR: comment apikey types in generated switch (apache#8201)
* MINOR: Fix typo in CreateTopicsResponse.json (apache#8300)
* KIP-546: Implement describeClientQuotas and alterClientQuotas. (apache#8083)
* KAFKA-6647: Do note delete the lock file while holding the lock (apache#8267)
* KAFKA-9677: Fix consumer fetch with small consume bandwidth quotas (apache#8290)
* KAFKA-9533: Fix JavaDocs of KStream.transformValues (apache#8298)
* MINOR: reuse pseudo-topic in FKJoin (apache#8296)
* KAFKA-6145: Pt 2. Include offset sums in subscription (apache#8246)
* KAFKA-9714; Eliminate unused reference to IBP in `TransactionStateManager` (apache#8293)
* KAFKA-9718; Don't log passwords for AlterConfigs in request logs (apache#8294)
* KAFKA-8768: DeleteRecords request/response automated protocol (apache#7957)
* KAFKA-9685: Solve Set concatenation perf issue in AclAuthorizer
* ...
@rajinisivaram shall we cherry-pick to 2.5 and 2.4 branches?
@ijuma Yes, cherry-picking to 2.5 and 2.4.
KAFKA-9677: Fix consumer fetch with small consume bandwidth quotas (#8290)

When we changed quota communication with KIP-219, fetch requests became throttled by returning an empty response with the delay in throttle_time_ms, and the Kafka consumer retries again after the delay. With default configs, the maximum fetch size can be as large as 50MB (or 10MB per partition). The default broker config (1-second window, 10 full windows of tracked bandwidth/thread utilization usage) means that a consumer quota below 5MB/s (per broker) may block consumers from fetching any data.

This PR ensures that consumers cannot be blocked by the quota by capping fetchMaxBytes in KafkaApis.handleFetchRequest() to quota window * consume bandwidth quota. With default configs (10-second quota window) and a 1MB/s consumer bandwidth quota, fetchMaxBytes is capped to 10MB.

Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
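The cap described in the commit message amounts to min(requested fetch size, quota window × bandwidth quota). A minimal Java sketch of that arithmetic follows; the class and parameter names are illustrative, not the actual KafkaApis code:

```java
public class FetchCap {
    // Cap the fetch size so a throttled consumer can still make progress:
    // never request more bytes than the quota allows across one full window.
    public static long capFetchMaxBytes(long requestedFetchMaxBytes,
                                        long quotaWindowSeconds,
                                        double quotaBytesPerSecond) {
        long maxThrottledBytes = (long) (quotaWindowSeconds * quotaBytesPerSecond);
        return Math.min(requestedFetchMaxBytes, maxThrottledBytes);
    }

    public static void main(String[] args) {
        // Default configs: 10-second quota window, 1 MB/s consume quota.
        // A 50 MB fetch request is capped to 10 MB.
        long capped = capFetchMaxBytes(50L * 1024 * 1024, 10, 1024 * 1024);
        System.out.println(capped); // prints 10485760
    }
}
```

With this cap, the worst-case throttle delay for a full fetch is bounded by one quota window, so the broker never computes a delay the consumer cannot wait out.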