-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[feat][broker] PIP-145: Notifications for faster topic discovery #16062
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
This commit introduces topic list watchers. By using these objects clients can observe the creation or deletion of topics closer to real-time. This reduces latency in consuming the first messages published to a topic when using a pattern-based subscription. Modifications: - New commands were added to the binary protocol to enable registering and deregistering watchers. - Pattern-based consumers create TopicListWatcher objects if the broker supports this feature. Otherwise, they fall back to polling only. - The watchers use ConnectionHandler to obtain a connection to a broker. - Once connected, watchers register and wait for updates. - ServerCnx uses the newly created TopicListService to manage watchers. - TopicListService listens to metadata notifications and sends updates.
codelipenghui
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall looks pretty good. Just left some minor comments.
| // These sets are updated from multiple threads, so they require a threadsafe data structure | ||
| private final Set<ProducerBase<?>> producers = Collections.newSetFromMap(new ConcurrentHashMap<>()); | ||
| private final Set<ConsumerBase<?>> consumers = Collections.newSetFromMap(new ConcurrentHashMap<>()); | ||
| private final Set<TopicListWatcher> topicListWatchers = Collections.newSetFromMap(new ConcurrentHashMap<>()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like no one add watchers to this Set?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right. This set is redundant to ClientCnx.topicListWatchers, so I'm going to remove this.
| watcherFuture.completeExceptionally( | ||
| PulsarClientException.wrap(e, String.format("Failed to create topic list watcher %s" | ||
| + "when connecting to the broker", getHandlerName()))); | ||
| client.cleanupTopicListWatcher(this); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should also cleanup the watcher from the ClientCnx.topicListWatchers since line:123 added the watcher to the ClientCnx.topicListWatchers
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
deregisterFromClientCnx(), this exceptionally block's first call does call ClientCnx.removeTopicListWatcher(long watcherId), so I think it should be fine.
| public CompletableFuture<CommandWatchTopicListSuccess> newUnwatchTopicList( | ||
| BaseCommand commandUnwatchTopicList, long requestId) { | ||
| if (!supportsTopicWatchers) { | ||
| return FutureUtil.failedFuture( | ||
| new PulsarClientException.NotAllowedException( | ||
| "Broker does not allow broker side pattern evaluation.")); | ||
| } | ||
| return sendRequestAndHandleTimeout(Commands.serializeWithSize(commandUnwatchTopicList), requestId, | ||
| RequestType.Command, true); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks this method is unused. The TopicListWatcher uses Commands.newUnwatchTopicList directly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice catch. I am removing it.
| BaseCommand cmd = localCmd(Type.WATCH_TOPIC_LIST); | ||
| cmd.setWatchTopicList() | ||
| .setRequestId(requestId) | ||
| .setNamespace(namespace) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we make the namespace optional? Since we might support pattern subscribe without namespace/tenant. A related PR #7855
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or maybe better to have tenant and namespace, just keep them optional.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, that feature would make sense to me. However, the linked pull request has seen no progress for almost two years, so if you don't mind I would like to keep the namespace parameter, for now, to simplify detecting when a metadata store notification is related to topics that match. Otherwise, it is more difficult to distinguish between topics in v1 namespaces and subscriptions in v2 namespaces. Or would you have a simple solution to that?
| // create operation will complete, the new watcher will be discarded. | ||
| log.info("[{}] Closed watcher before its creation was completed. watcherId={}", | ||
| connection.getRemoteAddress(), watcherId); | ||
| return; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any reason that doesn't remove from the watchers map if reach here?
| if (watcherFuture.isCompletedExceptionally()) { | ||
| log.info("[{}] Closed watcher that already failed to be created. watcherId={}", | ||
| connection.getRemoteAddress(), watcherId); | ||
| return; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as the above comment.
| CompletableFuture<CommandWatchTopicListSuccess> result = | ||
| cnx.newWatchTopicList(Commands.newWatchTopicList(7, 5, "tenant/ns", | ||
| ".*", null), 7); | ||
| assertTrue(result.isCompletedExceptionally()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please also check the watcher is cleaned up if server does not support.
merlimat
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good! Left just few comments
| } | ||
| } | ||
|
|
||
| Pattern namespaceNameToTopicNamePattern(NamespaceName namespaceName) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could have this as part of NamespaceName where we can store the compiled pattern.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That would improve performance indeed. The only problem I'd have with it is that we'd be referring to a relatively low-level detail (the managed ledger path) in a very general and high-level class. To me, it feels like breaking encapsulation/abstraction.
What do you think about caching the patterns in this class instead? Something along the lines of NamespaceName.cache.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In TopicName we already have getPersistenceName() which is kind of similar. On one hand I agree that it's not directly related to the logical name, though it was a way to ensure all the naming related ops are in a single place with no duplicated approaches.
| WATCH_TOPIC_LIST = 64; | ||
| WATCH_TOPIC_LIST_SUCCESS = 65; | ||
| WATCH_TOPIC_UPDATE = 66; | ||
| UNWATCH_TOPIC_LIST = 67; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: maybe we could rename to WATCH_TOPIC_CLOSE to have the same prefix
| } | ||
| isNamespaceOperationAllowed(namespaceName, NamespaceOperation.GET_TOPICS).thenApply(isAuthorized -> { | ||
| if (isAuthorized) { | ||
| topicListService.handleWatchTopicList(commandWatchTopicList, lookupSemaphore); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method will potentially get called from a different thread (if the authz is not already cached), therefore we cannot pass on the commandWatchTopicList object because it will get reused when the io-thread is processing the next call.
We need to extract the fields that we are interested in into local variables.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for pointing this out. I haven't thought of it.
| } | ||
|
|
||
| protected void handleCommandUnwatchTopicList(CommandUnwatchTopicList commandUnwatchTopicList) { | ||
| topicListService.handleUnwatchTopicList(commandUnwatchTopicList); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we check authorization here or it will be not necessary?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need to check authorization because executing this command only alters the state of TopicListService, which is in a 1:1 relation with ServerCnx. So it belongs to a single client, which was already checked when it created the watcher. (And if it did not have the authz to create one, it will be unable to delete anything)
Similar to handleCloseConsumer.
merlimat
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
…che#16062) * PIP-145: Notifications for faster topic discovery This commit introduces topic list watchers. By using these objects clients can observe the creation or deletion of topics closer to real-time. This reduces latency in consuming the first messages published to a topic when using a pattern-based subscription. Modifications: - New commands were added to the binary protocol to enable registering and deregistering watchers. - Pattern-based consumers create TopicListWatcher objects if the broker supports this feature. Otherwise, they fall back to polling only. - The watchers use ConnectionHandler to obtain a connection to a broker. - Once connected, watchers register and wait for updates. - ServerCnx uses the newly created TopicListService to manage watchers. - TopicListService listens to metadata notifications and sends updates. * Fix checkstyle violation * Fix cpp client compile error * Remove unused code, remove failed watchers * Rename command Unwatch, extract fields from command to avoid concurrent modification * Fix cpp client compile error * Fix cpp client compile error Co-authored-by: Matteo Merli <mmerli@apache.org>
…ery (apache#16062)" This reverts commit d9f5640.
…che#16062) * PIP-145: Notifications for faster topic discovery This commit introduces topic list watchers. By using these objects clients can observe the creation or deletion of topics closer to real-time. This reduces latency in consuming the first messages published to a topic when using a pattern-based subscription. Modifications: - New commands were added to the binary protocol to enable registering and deregistering watchers. - Pattern-based consumers create TopicListWatcher objects if the broker supports this feature. Otherwise, they fall back to polling only. - The watchers use ConnectionHandler to obtain a connection to a broker. - Once connected, watchers register and wait for updates. - ServerCnx uses the newly created TopicListService to manage watchers. - TopicListService listens to metadata notifications and sends updates. * Fix checkstyle violation * Fix cpp client compile error * Remove unused code, remove failed watchers * Rename command Unwatch, extract fields from command to avoid concurrent modification * Fix cpp client compile error * Fix cpp client compile error Co-authored-by: Matteo Merli <mmerli@apache.org>
Fixes #14505
Motivation
When using a pattern-based subscription, clients poll for new topics that
match the pattern. It can happen frequently that the first messages produced
to the topic will be consumed with a significant delay because the client
has not got information about the topic for a while.
This commit introduces topic list watchers. By using these objects
clients act as observers of the creation or deletion of topics closer to
real-time. This reduces latency in consuming the first messages
published to a topic when using a pattern-based subscription.
Modifications
and deregistering watchers.
supports this feature. Otherwise, they fall back to polling only.
Verifying this change
This change added tests
Does this pull request potentially affect one of the following parts:
Documentation
Check the box below or label this PR directly.
Need to update docs?
doc-required(Your PR needs to update docs and you will update later)
doc-not-needed(Please explain why)
doc(Your PR contains doc changes)
doc-complete(Docs have been already added)