-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[WIP][Issue 6437][broker] Prevent marker messages from accumulating in backlog of replicated subscription #6592
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
@jiazhai @codelipenghui @merlimat can you review this pull request? |
| try { | ||
| ReplicatedSubscriptionsSnapshot snapshot = Markers.instantiateReplicatedSubscriptionsSnapshot(snapshotId, | ||
| controller.localCluster(), p.getLedgerId(), p.getEntryId(), responses); | ||
| controller.topic().getSubscriptions().forEach((subName, sub) -> { | ||
| if (sub != null) { | ||
| sub.processReplicatedSubscriptionSnapshot(snapshot); | ||
| } | ||
| }); | ||
| } catch (Throwable t) { | ||
| log.warn("[{}] Failed to process replicated subscription snapshot {} -- {}", controller.topic().getName(), | ||
| snapshotId, t.getMessage()); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don’t understand a bit here. If we don't store snapshots into the topic, how can we recover snapshots after broker restarts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think that snapshots need to be persisted because they are constantly being updated with new ones, is that wrong? What issues do you think would arise if we don't persist ReplicatedSubscriptionsSnapshot messages?
|
move to 2.7.0, feel free to move it back if needed. |
b402e99 to
6815fad
Compare
|
Close this PR once. |
Fixes #6437
Motivation
In a replicated subscription with no consumers connected, the number of marker messages in the backlog will continue to increase. If at least one consumer is connected, the marker messages will be acknowledged and deleted by the dispatcher.
pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
Lines 80 to 92 in 5fc4a90
However, if no consumers are connected, the dispatcher does not exist or has stopped reading entries. As a result, the marker messages accumulate in the backlog without being acknoledged by anyone.
Modifications
There are four types of marker messages:
Of these, three messages, except
ReplicatedSubscriptionsSnapshot, are not used in the local cluster. They are published to local topics for sending to remote clusters. So, modified theReplicatedSubscriptionsControllerclass to acknowledge these marker messages on all subscriptions immediately after publishing the messages to a local topic.On the other hand,
ReplicatedSubscriptionsSnapshotis used only in the local cluster and does not need to be sent to remote clusters. So stopped publishingReplicatedSubscriptionsSnapshotto topics.In addition, marker messages sent from remote clusters are now acknowledged by the replicator on all subscriptions.
With these changes, marker messages no longer continue to accumulate in the replicated subscription backlog.