-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[improve][broker] PIP-307: Implement broker and client consumer changes when topic is unloading #21682
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
heesung-sohn
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, I would like to confirm the following.
When topic.isTransferring() == true, we would like to implement the following consumer logic to make this topic transfer graceful.
- ignore readEntry error (when the ledger is closed)
- stop msg delivery to consumers (close subscription and dispatcher)
- ignore in-flight and completed readEntry (do not send to the consumers)
- ignore ack msg
Could you discuss if we further handle the above cases?
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
Outdated
Show resolved
Hide resolved
...test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
Show resolved
Hide resolved
heesung-sohn
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi, I think we covered 2 and 3 logic so far. Can we confirm if we covered 1 and 4 logics in this list?
- ignore readEntry error (when the ledger is closed)
- stop msg delivery to consumers (close subscription and dispatcher)
- ignore in-flight and completed readEntry (do not send to the consumers)
- ignore ack msg
...org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
Show resolved
Hide resolved
.../src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
Show resolved
Hide resolved
...broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
Outdated
Show resolved
Hide resolved
...broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
Outdated
Show resolved
Hide resolved
...r/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
Show resolved
Hide resolved
...va/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
Outdated
Show resolved
Hide resolved
.../java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
Outdated
Show resolved
Hide resolved
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
Outdated
Show resolved
Hide resolved
heesung-sohn
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Thanks.
gaoran10
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great work! Left some comments.
.../java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
Show resolved
Hide resolved
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
Outdated
Show resolved
Hide resolved
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
Show resolved
Hide resolved
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
Show resolved
Hide resolved
Demogorgon314
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great work!
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
Outdated
Show resolved
Hide resolved
| if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) { | ||
| Consumer consumer = consumerFuture.getNow(null); | ||
| Subscription subscription = consumer.getSubscription(); | ||
| if (subscription.getTopic().isTransferring()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we have a test to cover in transferring acks?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not specifically, but the modified test covers ack behavior: https://github.com/apache/pulsar/pull/21682/files#diff-744119c61c9f6a1b786c3966acd1e0f63748985cf68a6a1a15418c8d9900a9a8R543-R547. The test does not succeed unless all messages sent during the transfer surface out on the consumers. Since there are 200 messages involved, at least one of them likely has the acknowledgement initially ignored.
…ersistent/PersistentTopic.java Co-authored-by: Kai Wang <kwang@streamnative.io>
.../java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
Outdated
Show resolved
Hide resolved
…ersistent/PersistentDispatcherMultipleConsumers.java Co-authored-by: Penghui Li <penghui@apache.org>
|
It looks like this PR broke the test SubscriptionMessageDispatchThrottlingTest.testClosingRateLimiter (#21756). The fix is in #21736 . @dragosvictor @heesung-sn @gaoran10 Please review. |
PIP: PIP-307
Motivation
Topic unloading can be sped up, as described by PIP-307, by forwarding the target broker lookup data to clients (both producers and consumers). PR #21408 added support for the producers, this PR introduces similar functionality for the consumers.
Modifications
When the load balancer reassigns a bundle from an old 'source' broker to a new 'target' broker, the consumers are currently forcefully disconnected. The expectation is that they will issue topic lookup calls to locate the new broker.
This PR forwards this information to the consumers as part of the disconnect workflow, allowing them to connect directly to the target broker and skip the lookups in the process.
Specifically:
assignedBrokerLookupDatato theCloseConsumercommand. The consumer uses this information to attempt to connect to the broker once. If the connection fails, it falls back to topic lookups.Verifying this change
This change added tests and can be verified as follows:
testTransferClientReconnectionWithoutLookupandtestUnloadClientReconnectionWithLookupto cover both producer and consumer behavior, as well as all subscription types. Ran the suite 100 times to make sure it was stable.pulsar-perfin a k8s environment. Asserted that all messages pass through during a load balancing run, while no lookups are performed by the clients.Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
Documentation
docdoc-requireddoc-not-neededdoc-completeMatching PR in forked repository
PR in forked repository: (dragosvictor#1)