-
Notifications
You must be signed in to change notification settings - Fork 3.7k
support broker level dispatch rate limiter #11325
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
support broker level dispatch rate limiter #11325
Conversation
|
@codelipenghui @315157973 please have a look |
...er/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
Outdated
Show resolved
Hide resolved
...er/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
Outdated
Show resolved
Hide resolved
|
Thanks for your contribution. The PR template contains info about doc, which helps others know more about the changes. Can you provide doc info in future PR descriptions? Thanks |
|
/pulsarbot run-failure-checks |
1 similar comment
|
/pulsarbot run-failure-checks |
|
@315157973 @codelipenghui @hangc0276 PTAL when you have time |
|
The current reading process is like this:
There is a time difference between 1 and 3. When the Broker just restarts, if it is read concurrently, the problem in that PR will still exists |
The problem you mentioned may be solved in this PR #8611 |
This PR will only prevent the message from being sent to the consumer for a long time later, but it cannot solve the memory surge during concurrent reading when broker restart. |
|
We can support broker level dispatcher rate limiter, but this PR can not fix #7720 |
As current reading process, it's not easy to completely prevent the sudden increase entry read traffic when mutliply comsumers starting at same time. |
...ar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
Outdated
Show resolved
Hide resolved
...ar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
Show resolved
Hide resolved
...ar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
Outdated
Show resolved
Hide resolved
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
Outdated
Show resolved
Hide resolved
.../java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
Outdated
Show resolved
Hide resolved
...er/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
Outdated
Show resolved
Hide resolved
|
|
||
| long start = System.currentTimeMillis(); | ||
| // Asynchronously produce messages | ||
| for (int i = 0; i < numProducedMessagesEachTopic; i++) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suggest to stop the thread pool that refreshes the quota in rateLimiter at first, otherwise this unit test will often fail because we cannot predict the running of the thread
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you point out the thread which need to stop?
| consumer2.close(); | ||
| producer1.close(); | ||
| producer2.close(); | ||
| log.info("-- Exiting {} test --", methodName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add a unit test that test the priority of policies at different levels
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For topic dispatch rate limiter, we can configure from namespace level policies and topic level policies, and topic level policies will have a high priority. That's because the enforcement is done at the topic level.
For broker level, topic level and subscription level dispatch rate limiter, the enforcement is done at different level, so they have same priority and take effect together.
I have add a test, when all these dispatch rate limiter are configured, the minimum dispatch rate limiter should take effect in case of one consumer for one topic.
|
@315157973 could you have a look again |
|
/pulsarbot run-failure-checks |
1 similar comment
|
/pulsarbot run-failure-checks |
|
Some unit tests seem to fail |
…riptionMessageDispatchThrottlingTest.java Co-authored-by: Anonymitaet <50226895+Anonymitaet@users.noreply.github.com>
…riptionMessageDispatchThrottlingTest.java Co-authored-by: Anonymitaet <50226895+Anonymitaet@users.noreply.github.com>
544c69b to
9d51e98
Compare
…riptionMessageDispatchThrottlingTest.java Co-authored-by: Anonymitaet <50226895+Anonymitaet@users.noreply.github.com>
…riptionMessageDispatchThrottlingTest.java Co-authored-by: Anonymitaet <50226895+Anonymitaet@users.noreply.github.com>
5f758c9 to
b7be33e
Compare
…jialing218/pulsar into branch-broker-dispatch-rate-limiter
|
/pulsarbot run-failure-checks |
conf/broker.conf
Outdated
|
|
||
| # Max Rate(in 1 seconds) of Message allowed to dispatch from a broker if broker dispatch rate limiting enabled | ||
| # (Disable message rate limit with value 0) | ||
| brokerDispatchThrottlingMaxMessageRate=0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| brokerDispatchThrottlingMaxMessageRate=0 | |
| dispatchThrottlingRateInMsg=0 |
|
/pulsarbot run-failure-checks |
|
As discussed w/ @wangjialing218, he will add the two parameters and corresponding descriptions to the configuration - standalone page. |
Motivation
Fixes #7720
Current we support broker level publish rate limiter.
This PR will add broker level dispatch rate limiter.
Modifications
Add broker level dispatch rate limiter for msgs and bytes.
Refact calculateToRead() to reduce duplicate code.
Verifying this change
Add testBrokerBytesRateLimitingReceiveAllMessagesAfterThrottling()