[fix] [broker] The broker has two identical Persitenttopics #16247
@gaoran10 @congbobo184 @Technoboy- Please take a look. ^_^
/pulsarbot run-failure-checks
/pulsarbot run-failure-checks
/pulsarbot run-failure-checks
codelipenghui
left a comment
We should also consider preventing a topic from being closed more than once.
final CompletableFuture<Optional<Topic>> createTopicFuture = getTopic(topicNameString, false);
return createTopicFuture.thenCompose(optionalTopic -> {
    if (optionalTopic.isPresent() && optionalTopic.get() == topic){
        return removeTopicFromCache(topicNameString, createTopicFuture);
    }
    // If topic is not in Cache, do nothing.
    return CompletableFuture.completedFuture(null);
});
Can we use map.compute() to simplify the logic? It also looks like we don't need to wait for the future to complete, because we already have the topic reference here.
Can we use map.compute() to simplify the logic? It also looks like we don't need to wait for the future to complete
Yes, I've rewritten the logic.
We should also consider preventing a topic from being closed more than once.
I have appended the reason why topic.close was executed twice to the Motivation, and in this PR I have rewritten topic.close to fix it. I also added a lock to the 'reset topic state to un-fenced' operation. Could you review the code?
I have appended the reason why
Unfortunately, we can't use
I have also rewritten the Motivation of this PR to make it easier to understand. Thanks.
@eolivelli @lhotari @michaeljmarshall @gaozhangmin @Jason918 @nicoloboschi Please take a look, if you have time. Thanks.
3a3e08e to 0707daa
0707daa to 482fdfe
482fdfe to 7a9a1a0
7a9a1a0 to 8ce8d11
/pulsarbot run-failure-checks
8ce8d11 to 5337613
/pulsarbot rerun-failure-checks
    return CompletableFuture.completedFuture(null);
}
// We don't need to wait for the future complete, because we already have the topic reference here.
if (!createTopicFuture.isDone()){
I think that we should wait for this future to complete, otherwise it may create another Topic
We can avoid using getNow by chaining the CompletableFuture with "thenCompose".
I think that we should wait for this future to complete, otherwise it may create another Topic
An earlier review comment suggested the opposite: "don't need to wait for the future complete"
#16247 (comment)
We can avoid using getNow by chaining the CompletableFuture with "thenCompose".
I now use "thenCompose" instead of "getNow". Thanks
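For illustration, here is a minimal sketch of the chaining approach discussed in this thread. The class and method names are hypothetical, and String stands in for the real Topic type; it is not the code from this PR. It only shows that a step chained with thenCompose runs after the creation future completes, so no getNow call is needed:

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

final class ChainInsteadOfGetNowSketch {
    // Hypothetical helper: acts on the topic only once the creation future has completed.
    static CompletableFuture<String> removeWhenCreated(CompletableFuture<Optional<String>> createTopicFuture) {
        return createTopicFuture.thenCompose(optionalTopic -> {
            if (optionalTopic.isPresent()) {
                return CompletableFuture.completedFuture("removed " + optionalTopic.get());
            }
            // Topic is not in the cache: nothing to do.
            return CompletableFuture.completedFuture("nothing to remove");
        });
    }

    public static void main(String[] args) {
        CompletableFuture<Optional<String>> pending = new CompletableFuture<>();
        CompletableFuture<String> result = removeWhenCreated(pending);
        // The chained stage stays incomplete while topic creation is still in flight.
        System.out.println("done before creation finishes: " + result.isDone());
        pending.complete(Optional.of("topic-c"));
        System.out.println(result.join());
    }
}
```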
    return removeTopicFromCache(topic, (CompletableFuture) null);
}

public CompletableFuture<Void> removeTopicFromCache(String topic, CompletableFuture createTopicFuture) {
please add generic type to CompletableFuture
Added the generic type to CompletableFuture. Thanks
    return this.fullyCloseFuture;
} else {
    // Why not return this.fullyCloseFuture ?
    // I don't know, just keep the same implementation as before.
please remove this kind of comments "I don't know"
Removed. Thanks
// Close limiters.
try {
    closeLimiters();
} catch (Throwable t){
Why are we catching "Throwable"? This is usually bad practice.
Replaced "Throwable" with "Exception". Thanks
    closePhase2Future = closeClientsFuture.thenCompose(__ -> asyncCloseLedger());
}
// Complete resultFuture. If managed ledger close failure, reset topic to resume.
closePhase2Future.thenApply(__ -> {
nit: we can use "whenComplete" instead of thenApply/exceptionally
Now using "whenComplete" instead of "thenApply". Thanks
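As a rough illustration of this nit, the sketch below handles both the success and the failure path in a single "whenComplete" callback instead of a thenApply/exceptionally pair. The class name and the finishClose helper are hypothetical, and the real reset logic is only hinted at in a comment:

```java
import java.util.concurrent.CompletableFuture;

final class WhenCompleteSketch {
    // Hypothetical helper: completes resultFuture from one callback covering both outcomes.
    static CompletableFuture<Void> finishClose(CompletableFuture<Void> closePhase2Future) {
        CompletableFuture<Void> resultFuture = new CompletableFuture<>();
        closePhase2Future.whenComplete((ignored, ex) -> {
            if (ex == null) {
                resultFuture.complete(null);
            } else {
                // In the real code this is where the topic state would be reset (assumption).
                resultFuture.completeExceptionally(ex);
            }
        });
        return resultFuture;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> ok = CompletableFuture.completedFuture(null);
        System.out.println("success path failed: " + finishClose(ok).isCompletedExceptionally());

        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("ledger close failed"));
        System.out.println("failure path failed: " + finishClose(failed).isCompletedExceptionally());
    }
}
```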
}).exceptionally(exception -> {
    log.error("[{}] Error closing topic", topic, exception);
    // Restart rate-limiter after close managed ledger failure. Success is not guaranteed.
    // TODO Guarantee rate-limiter open finish.
Please do not leave "TODOs"; open a new GH ticket and link it.
Removed the "TODO". Thanks
// TODO Guarantee rate-limiter open finish.
try {
    restartLimitersAfterCloseTopicFail();
} catch (Throwable t){
please do not catch Throwable blindly
Replaced "Throwable" with "Exception". Thanks
/pulsarbot rerun-failure-checks
60180d2 to bb68f61
/pulsarbot rerun-failure-checks
Hi @eolivelli Could you review this PR again?
}

public CompletableFuture<Void> removeTopicFromCache(String topic){
    return removeTopicFromCache(topic, (CompletableFuture) null);
- return removeTopicFromCache(topic, (CompletableFuture) null);
+ return removeTopicFromCache(topic, (CompletableFuture<Optional<Topic>>) null);
Already fixed
// Close limiters.
try {
    closeLimiters();
} catch (Exception t){
Why should we catch the Exception?
Just to keep the logic the same as before: a failure to close the limiters does not affect closing the topic.
}

// Close client components.
CompletableFuture<Void> closeClientsFuture = asyncCloseClients();
- CompletableFuture<Void> closeClientsFuture = asyncCloseClients();
+ CompletableFuture<Void> closeClientsFuture = asyncCloseClients(boolean closeWithoutWaitingClientDisconnect);
The code at lines 1294 to 1298 handles the closeWithoutWaitingClientDisconnect logic:
CompletableFuture<Void> closePhase2Future;
if (closeWithoutWaitingClientDisconnect){
    closePhase2Future = asyncCloseLedger();
} else {
    closePhase2Future = closeClientsFuture.thenCompose(__ -> asyncCloseLedger());
}

log.error("[{}] Error closing topic", topic, ex);
// Restart rate-limiter after close managed ledger failure. Success is not guaranteed.
try {
    restartLimitersAfterCloseTopicFail();
Why should we catch the Exception?
Just to keep the logic the same as before: Restart rate-limiter after close managed ledger failure. Success is not guaranteed.
To make this PR easier to review, it was split into two other PRs:
Motivation
Problem occurrence
With the transaction feature enabled, we sent and received messages while executing `admin API: unload namespace` 1000 times. Then the problem occurred: the consumer could not receive any messages, and there was no error log. After that we tried `admin API: get topic stats`, and the response showed that only producers were registered on the topic and no consumers were, yet the consumer state was `Ready` in the client. This means that the state of the consumer was inconsistent between the broker and the client.
Locating the problem
Then we found the cause: two PersistentTopic instances with the same name were registered on one broker node. The consumer was registered on one (aka `topic-c`), and the producer was registered on the other (aka `topic-p`). At this point, the messages we sent flowed through `topic-p`, where the producer was registered. But the consumer was registered on the other topic, `topic-c`, so the consumer could not receive any messages.
Reproduce
Make `transaction buffer recover`, `admin unload namespace`, `client create consumer`, and `client create producer` execute at the same time. The process flows like this (the problem occurs at step 11):
[Table: timeline of steps ordered by `Time`, with one column each for `transaction buffer recover`, `admin unload namespace`, `client create consumer`, and `client create producer`; the steps involve `topic-c`, `topic-p`, and `brokerService.topics`]
`Time` is used only to indicate the order of each step, not the actual time.
Even if the persistent topic property `isClosingOrDeleting` has already changed to `true`, close can still be executed one more time, see line 1247:
pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
Lines 1240 to 1249 in f230d15
Whether close can be executed depends on two predicates: `is closing`, or `@param closeWithoutWaitingClientDisconnect is true`. This means that the method `topic.close` can be executed reentrantly when `@param closeWithoutWaitingClientDisconnect` is true, and in the implementation of `admin API: unload namespace` the parameter `closeWithoutWaitingClientDisconnect` is exactly `true`.
pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
Lines 723 to 725 in f230d15
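One common way to make a close operation non-reentrant is to publish a single close future with a compare-and-set, so every later caller gets the same future no matter which flag it passes. The sketch below is only an illustration under that assumption: the class, the `closeRuns` counter, and `doClose` are hypothetical, and it does not cover resetting the state after a failed close.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

final class NonReentrantCloseSketch {
    // Holds the future of the first close call; null means close has not started yet.
    private final AtomicReference<CompletableFuture<Void>> closeFuture = new AtomicReference<>();
    // Counts how many times the close work actually ran (for demonstration only).
    final AtomicInteger closeRuns = new AtomicInteger();

    CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect) {
        CompletableFuture<Void> mine = new CompletableFuture<>();
        if (!closeFuture.compareAndSet(null, mine)) {
            // A close is already in flight or finished: share its result instead of re-running.
            return closeFuture.get();
        }
        closeRuns.incrementAndGet();
        doClose(closeWithoutWaitingClientDisconnect).whenComplete((ignored, ex) -> {
            if (ex != null) {
                mine.completeExceptionally(ex);
            } else {
                mine.complete(null);
            }
        });
        return mine;
    }

    // Hypothetical placeholder for the real close work.
    private CompletableFuture<Void> doClose(boolean closeWithoutWaitingClientDisconnect) {
        return CompletableFuture.completedFuture(null);
    }

    public static void main(String[] args) {
        NonReentrantCloseSketch topic = new NonReentrantCloseSketch();
        CompletableFuture<Void> first = topic.close(false);
        CompletableFuture<Void> second = topic.close(true); // e.g. unload namespace
        System.out.println("same future: " + (first == second));
        System.out.println("close runs: " + topic.closeRuns.get());
    }
}
```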
So when `transaction buffer recover fail` and `admin unload namespace` are executed at the same time, and `transaction buffer recover fail` happens before `admin unload namespace`, the topic will be removed from `brokerService.topics` twice.
Because the current implementation of `BrokerService.removeTopicFromCache` uses `map.remove(key)` rather than `map.remove(key, value)`, this call can remove any value in the map, even if it is not the desired one.
pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
Line 1956 in f230d15
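The difference between the two removal calls can be shown with a plain `ConcurrentHashMap`. This is a minimal sketch, not the broker code: `TopicCacheSketch` and its methods are hypothetical, and String stands in for the real Topic type. A stale second close of `topic-c` cannot evict the newer `topic-p` entry when the conditional `remove(key, value)` is used:

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class TopicCacheSketch {
    // Hypothetical stand-in for brokerService.topics.
    final ConcurrentMap<String, CompletableFuture<Optional<String>>> topics = new ConcurrentHashMap<>();

    // Unconditional removal: drops whatever entry is currently mapped to the name.
    void removeByKey(String name) {
        topics.remove(name);
    }

    // Conditional removal: drops the entry only if it is still the expected future.
    boolean removeIfSame(String name, CompletableFuture<Optional<String>> expected) {
        return topics.remove(name, expected);
    }

    public static void main(String[] args) {
        TopicCacheSketch cache = new TopicCacheSketch();
        CompletableFuture<Optional<String>> topicC = CompletableFuture.completedFuture(Optional.of("topic-c"));
        CompletableFuture<Optional<String>> topicP = CompletableFuture.completedFuture(Optional.of("topic-p"));

        cache.topics.put("my-topic", topicC);
        cache.removeByKey("my-topic");        // first close of topic-c
        cache.topics.put("my-topic", topicP); // a client re-creates the topic
        boolean removedStale = cache.removeIfSame("my-topic", topicC); // second close of topic-c
        System.out.println("stale close removed entry: " + removedStale);
        System.out.println("topic-p still cached: " + (cache.topics.get("my-topic") == topicP));
    }
}
```

`CompletableFuture` does not override `equals`, so the conditional removal here compares by identity.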
To sum up, we should make these two changes:
- Make `topic.close` non-reentrant. Also prevent reentrancy between `topic.close` and `topic.delete`.
- Use `map.remove(key, value)` instead of `map.remove(key)` in the implementation of `BrokerService.removeTopicFromCache`. This change applies to both scenarios: `topic.close` and `topic.delete`.
Other Modifications
In the current implementation, if closing the ledger fails, closing the topic is considered to have failed, and the topic state is then reset to `no-fenced`. But this changes two states [`isFenced`, `isClosingOrDeleting`] without locking, which cannot guarantee consistency between them. I will fix it in this PR too (this change may not be relevant to the current subject).
pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
Lines 1307 to 1312 in f230d15
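As a sketch of the locking idea, the two flags can be updated and read under the same lock so that no reader sees one flag changed and the other not. The class and method names below are hypothetical, and the use of a `ReentrantReadWriteLock` is an assumption for illustration rather than a description of the actual change:

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

final class TopicStateSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private boolean isFenced;
    private boolean isClosingOrDeleting;

    // Sets both flags together when a close or delete starts.
    void fenceToCloseOrDelete() {
        lock.writeLock().lock();
        try {
            isFenced = true;
            isClosingOrDeleting = true;
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Resets both flags together after a failed close.
    void unfenceAfterCloseFailure() {
        lock.writeLock().lock();
        try {
            isFenced = false;
            isClosingOrDeleting = false;
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Reads both flags under the read lock, so the pair is always consistent.
    boolean[] snapshot() {
        lock.readLock().lock();
        try {
            return new boolean[] {isFenced, isClosingOrDeleting};
        } finally {
            lock.readLock().unlock();
        }
    }

    public static void main(String[] args) {
        TopicStateSketch state = new TopicStateSketch();
        state.fenceToCloseOrDelete();
        state.unfenceAfterCloseFailure();
        boolean[] flags = state.snapshot();
        System.out.println("isFenced=" + flags[0] + ", isClosingOrDeleting=" + flags[1]);
    }
}
```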
PR Relations
#16240
When the same topic is created repeatedly on one broker node, the following occurs: the transaction pending ack store reuses the cached managed cursor object when its constructor initializes. The process flows like this:
[Table: timeline with columns `client create consumer 1` and `client create consumer 2`; the steps involve `topic'`, `subscription'`, `topic''`, `subscription''`, the `pending_ack` ledger, and `pending_ack_cursor`]
If the transaction pending ack store reuses the managed cursor in the cache, the task `replay pending ack log` will loop forever. This PR solves the problem of "Repeat creates topic", and also eliminates the possibility of reusing pending-ack-cursor: #16240.
Documentation
Check the box below or label this PR directly.
Need to update docs?
- `doc-required` (Your PR needs to update docs, and you will update later)
- `doc-not-needed` (Please explain why)
- `doc` (Your PR contains doc changes)
- `doc-complete` (Docs have been already added)