-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[fix] [client] Messages lost when consumer reconnect #20695
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[fix] [client] Messages lost when consumer reconnect #20695
Conversation
| // responsible for reconnection. And the variable "duringConnect" will prevent the concurrent execution. | ||
| if (getState() == State.Ready) { | ||
| return CompletableFuture.completedFuture(null); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
By duringConnect, did you mean duringSeek in the current method at line 785? I am assuming this because I couldn't find the word duringConnect within the method or the class itself.
If duringConnect is outside, maybe providing JavaDoc with {@link } tag will help readers to follow.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed, thanks
tisonkun
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A clear patch! LGTM.
pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
Outdated
Show resolved
Hide resolved
|
Good catch! |
(cherry picked from commit 09c89cd)
Reopen #20591
Motivation
Background of consumer reconnects
CMD-subscribeto the brokerflow permitsto broker to incrementavailablePermitsBackground of scenarios that could trigger reconnection:
cmd-close_consumer, such asunload topic,reset clusters, and so on.Background of the response of broker received subscribe request
The broker only response
successif it receives a secondsubscriberequest of the same consumerhttps://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L1197
I wanted to prevent this by adding a validation below to prevent the messages which in memory is cleared if the method
grabCnxexecute after the subscribe is finished, which result in messages being lost.But I notice that there is a check
consumer.cnx == nullin the methodgrabCnx, because if thestateof the consumer isReady, the variablecnxof the consumer must not be null, so this test can be effective as the checkconsumer.state != Ready.And I notice another issue below:
Issue-1
If the method
grab connectionis executed multi times, it will lose some messages due to a race condition, for example:grab connection 1grab connection 2cnxof the consumer is nullcnxof the consumer is nullduringConnecttotrueconsumer.connectionOpened()consumer.cnxavailablePermitsduringConnecttofalseduringConnecttotrueconsumer.connectionOpened()consumer.cnxavailablePermitsduringConnecttofalseWe should make the check
consumer.cnx == nullexecute after the checkcompare and set duringConnect.Issue-2
After we fixed the
issue-1, the checkconsumer.cnx == nulland the checkduringConnect == falseswitch the order, the new issue occurs:Since we use the variableduringConnectto prevent multigrabCnxrunning at the same time, we should makeset consumer.cnx to nullbeing executed beforeset duringConnect to true` when the subscribe request fails. This can avoid the issue below:reconnect latergrab connectionduringConnecttofalseduringConnecttotruecnxof the consumer is nullset consumer.cnx to nullModifications
set consumer.cnx to nullbeing executed beforeset duringConnect to truewhen the subscribe request failsDocumentation
docdoc-requireddoc-not-neededdoc-completeMatching PR in forked repository
PR in forked repository: x