Skip to content

[fix][broker] Prevent missed topic changes in topic watchers and schedule periodic refresh with patternAutoDiscoveryPeriod interval#25188

Merged
lhotari merged 27 commits intoapache:masterfrom
lhotari:lh-topiclistwatcher-should-poll
Feb 6, 2026
Merged

[fix][broker] Prevent missed topic changes in topic watchers and schedule periodic refresh with patternAutoDiscoveryPeriod interval#25188
lhotari merged 27 commits intoapache:masterfrom
lhotari:lh-topiclistwatcher-should-poll

Conversation

@lhotari
Copy link
Copy Markdown
Member

@lhotari lhotari commented Jan 27, 2026

Fixes #25020
Fixes #25191
Fixes #25192

Motivation

The TopicListService implementation does not handle metadata store session events (SessionEvent), causing the client-side topic list to become out of sync when the metadata store session experiences temporary disruptions. During metadata store disconnections, topic add/delete events are missed, leading to inconsistent state between the broker and pattern topic consumers. Additionally, metadata store change event delivery is not guaranteed, making state reconciliation necessary to maintain consistency.

This PR also addresses several related issues:

Modifications

This PR introduces session event handling and hash-based verification to ensure topic list consistency:

Server-side changes:

  • Added TopicListener interface to handle both topic events and session events
  • Modified TopicResources to register session listeners and dispatch session events to topic listeners
  • Enhanced TopicListService.TopicListWatcher to:
    • Retrieve the topic list after metadata store reconnection
    • Reconstruct missed add/delete events by comparing pre-disconnection and post-reconnection states
    • Send synthetic events to clients to synchronize their topic lists
  • Changed PulsarResources to use MetadataStoreExtended instead of MetadataStore to access session event functionality
  • Modified TopicListService to accept topic list watcher commands for already existing watchers, enabling state reconciliation when the client-side topic listing hash mismatches
  • Postponed topic list watcher creation until after initial topic subscription completes to prevent race conditions
  • Fixed NamespaceService.getListOfUserTopics to consistently exclude system topics from results, ensuring hash consistency between topic listing methods
  • Added new feature flag supportsTopicWatcherReconcile for backwards compatibility
    • the broker now supports calling CommandWatchTopicList for existing watchers so that the topic list watcher state can be reconciled.

Client-side changes:

  • Schedule periodic state refresh using the configured patternAutoDiscoveryPeriod interval to detect and correct any drift
  • Implemented hash-based verification to detect inconsistencies:
    • Updated CommandWatchTopicList to include the client's current topics hash
    • Modified PatternConsumerUpdateQueue to combine add and delete operations into a single task, enabling hash comparison after applying changes to local state
    • Added hash mismatch detection in PatternMultiTopicsConsumerImpl that triggers reconciliation via recheckTopicsChange() when the broker's hash differs from the expected local state
    • Enhanced reconciliation logic to compare local state with broker state and trigger a refresh when differences are detected
  • Use the new feature flag supportsTopicWatcherReconcile for backwards compatibility
    • Issue reconciliation operation via CommandWatchTopicList when the feature flag is set
    • Older brokers don't support issuing CommandWatchTopicList for existing watchers

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

@lhotari lhotari added this to the 4.2.0 milestone Jan 27, 2026
@lhotari lhotari self-assigned this Jan 27, 2026
@github-actions github-actions Bot added the doc-not-needed Your PR changes do not impact docs label Jan 27, 2026
@lhotari lhotari changed the title [fix][broker] Handle metadata store session events in TopicListService to prevent missed topic changes [fix][broker] Prevent missed topic changes in topic watchers and schedule periodic refresh with patternAutoDiscoveryPeriod interval Jan 27, 2026
@lhotari lhotari requested a review from Technoboy- January 27, 2026 14:19
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR fixes an issue where the TopicListService implementation doesn't handle metadata store session events, causing client-side topic lists to become out of sync during metadata store disruptions. The solution introduces session event handling and hash-based verification to ensure consistency between broker and client.

Changes:

  • Added TopicListener interface to handle both topic events and session events, with session reconnection triggering topic list reconciliation
  • Implemented hash-based verification to detect state inconsistencies between client and broker, triggering reconciliation when mismatches are detected
  • Modified client-side pattern consumer to schedule periodic state refresh and postpone topic list watcher creation until after initial subscription completes

Reviewed changes

Copilot reviewed 18 out of 18 changed files in this pull request and generated 1 comment.

Show a summary per file
File Description
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicListener.java New interface defining contract for topic and session event handling
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java Updated to use MetadataStoreExtended and register session listeners, dispatch events to TopicListener implementations
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java Changed constructor parameters to accept MetadataStoreExtended instead of MetadataStore for session event support
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java Enhanced TopicListWatcher to handle session events, reconcile state after reconnection, and support hash-based refresh commands
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java Added reconcile() method and changed to use supplier for local state hash, updated to pass hash in watch commands
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java Postponed watcher creation, added periodic reconciliation scheduling, implemented hash calculation and watcher-based reconciliation
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternConsumerUpdateQueue.java Combined separate add/remove operations into single TOPICS_CHANGED task with hash verification and mismatch detection
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java Added convenience method for creating WatchTopicList requests with topics hash parameter
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java Added String deduplication for filtered topic results using TopicName cache
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListServiceTest.java Added comprehensive tests for session event handling, hash matching, and reconciliation scenarios
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java Added test for periodic reconciliation and improved existing test methods
pulsar-broker-common/src/test/java/org/apache/pulsar/broker/resources/TopicResourcesTest.java Updated tests for new TopicListener interface
pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java Added test for hash mismatch reconciliation
pulsar-client/src/test/java/org/apache/pulsar/client/impl/PatternConsumerUpdateQueueTest.java Updated tests for combined topics changed operations
pulsar-client/src/test/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImplTest.java Simplified tests after removing topicsHashSetter parameter
pulsar-proxy/src/test/java/org/apache/pulsar/client/impl/AutoCloseUselessClientConProxyTest.java Added helper method to wait for topic list watcher startup
pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java Updated test resources to use MetadataStoreExtended
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListWatcherTest.java Updated tests for renamed method onTopicEvent
Comments suppressed due to low confidence (1)

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java:132

  • The onTopicEvent method accesses and modifies matchingTopics without synchronization, which can lead to race conditions. The method is not synchronized, but it:
  1. Calls matchingTopics.remove(topicName) on line 121
  2. Calls matchingTopics.add(topicName) on line 124
  3. Passes matchingTopics to TopicList.calculateHash() on line 128

Meanwhile, other synchronized methods like updateTopics() (line 185) and prepareUpdateTopics() (line 178) also access matchingTopics. This creates a data race where unsynchronized access in onTopicEvent can interleave with synchronized access in other methods, potentially causing:

  • Lost updates
  • Inconsistent hash calculations
  • ConcurrentModificationException if the Set implementation isn't thread-safe

The method should be declared synchronized to ensure thread-safe access to the shared matchingTopics field.

        public void onTopicEvent(String topicName, NotificationType notificationType) {
            if (closed) {
                return;
            }
            String partitionedTopicName = TopicName.get(topicName).getPartitionedTopicName();
            String domainLessTopicName = TopicList.removeTopicDomainScheme(partitionedTopicName);

            if (topicsPattern.matches(domainLessTopicName)) {
                List<String> newTopics = Collections.emptyList();
                List<String> deletedTopics = Collections.emptyList();
                if (notificationType == NotificationType.Deleted) {
                    if (matchingTopics.remove(topicName)) {
                        deletedTopics = Collections.singletonList(topicName);
                    }
                } else if (notificationType == NotificationType.Created && matchingTopics.add(topicName)) {
                    newTopics = Collections.singletonList(topicName);
                }
                if (!newTopics.isEmpty() || !deletedTopics.isEmpty()) {
                    String hash = TopicList.calculateHash(matchingTopics);
                    sendTopicListUpdate(hash, deletedTopics, newTopics);
                }
            }
        }

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 24 out of 24 changed files in this pull request and generated 7 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread pulsar-common/src/main/proto/PulsarApi.proto
@lhotari lhotari requested a review from Denovo1998 January 29, 2026 13:09
@nodece
Copy link
Copy Markdown
Member

nodece commented Feb 5, 2026

The change from MetadataStore to MetadataStoreExtended in PulsarResources and TopicResources is acceptable.

Third-party plugins usually do not directly depend on the concrete MetadataStore type, so this change alone is unlikely to cause real compatibility issues.

The main concern is the change of public methods that changing the TopicResources listener APIs from
BiConsumer<String, NotificationType> to the new TopicListener interface will break existing third-party plugins that register listeners.

@lhotari
Copy link
Copy Markdown
Member Author

lhotari commented Feb 5, 2026

The change from MetadataStore to MetadataStoreExtended in PulsarResources and TopicResources is acceptable.

Third-party plugins usually do not directly depend on the concrete MetadataStore type, so this change alone is unlikely to cause real compatibility issues.

The main concern is the change of public methods that changing the TopicResources listener APIs from BiConsumer<String, NotificationType> to the new TopicListener interface will break existing third-party plugins that register listeners.

@nodece I've added the old API back as deprecated methods. PTAL

@lhotari lhotari merged commit 2e06cc0 into apache:master Feb 6, 2026
97 of 102 checks passed
lhotari added a commit that referenced this pull request Feb 6, 2026
…dule periodic refresh with patternAutoDiscoveryPeriod interval (#25188)

(cherry picked from commit 2e06cc0)
lhotari added a commit that referenced this pull request Feb 6, 2026
…dule periodic refresh with patternAutoDiscoveryPeriod interval (#25188)

(cherry picked from commit 2e06cc0)
hshankar31 pushed a commit to datastax/pulsar that referenced this pull request Feb 11, 2026
…dule periodic refresh with patternAutoDiscoveryPeriod interval (apache#25188)

(cherry picked from commit 2e06cc0)
(cherry picked from commit ba2a230)
hshankar31 pushed a commit to datastax/pulsar that referenced this pull request Feb 16, 2026
…dule periodic refresh with patternAutoDiscoveryPeriod interval (apache#25188)

(cherry picked from commit 2e06cc0)
(cherry picked from commit ba2a230)
priyanshu-ctds pushed a commit to datastax/pulsar that referenced this pull request Feb 18, 2026
…datastax 4 0 ds 16 feb (#589)

* [improve][broker] Ensure metadata session state visibility and improve Unstable observability for ServiceUnitStateChannelImpl (apache#25132)

(cherry picked from commit 2a29be0)
(cherry picked from commit 85dc758)

* [improve][broker] Upgrade bookkeeper to 4.17.3 (apache#25166)

(cherry picked from commit 45def39)
(cherry picked from commit 333110a)

* fix license and pom file

* [fix][ml] Fix NoSuchElementException in EntryCountEstimator caused by a race condition (apache#25177)

(cherry picked from commit 9b70ba3)
(cherry picked from commit 9261869)

* [fix][test] Bump org.assertj:assertj-core from 3.27.5 to 3.27.7 (apache#25186)

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
(cherry picked from commit ce4ebea)
(cherry picked from commit 2c3402e)

* [improve][misc] Upgrade snappy version to 1.1.10.8 (apache#25182)

(cherry picked from commit b15f53b)
(cherry picked from commit 304fea1)

* [fix][proxy] Close client connection immediately when credentials expire and forwardAuthorizationCredentials is disabled (apache#25179)

(cherry picked from commit 3348470)
(cherry picked from commit c06f8ba)

* [fix][client] ControlledClusterFailover avoid unnecessary reconnection. (apache#25178)

Co-authored-by: fengwenzhi <fengwenzhi.max@bigo.sg>
(cherry picked from commit f0ec07b)
(cherry picked from commit b41488d)

* [fix][sec] Bump org.apache.solr:solr-core from 9.8.0 to 9.10.1 in /pulsar-io/solr (apache#25175)

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
(cherry picked from commit a2f888a)
(cherry picked from commit b532068)

* [improve][pip] PIP-453: Improve the metadata store threading model (apache#25173)

(cherry picked from commit c51346f)
(cherry picked from commit d81d6b3)

* [improve][client]Reduce unnecessary getPartitionedTopicMetadata requests when using retry and DLQ topics. (apache#25172)

(cherry picked from commit 52a4d5e)
(cherry picked from commit 71a3994)

* [fix][misc] Allow JWT tokens in OpenID auth without nbf claim (apache#25197)

(cherry picked from commit d630394)
(cherry picked from commit 2760ee9)

* [fix][sec] Exclude org.lz4:lz4-java and standardize on at.yawk.lz4-java to remediate CVE-2025-12183 and CVE-2025-66566 (apache#25198)

(cherry picked from commit c07f2ad)
(cherry picked from commit 2ac6d03)

* fix checkstyle failure and license issues

* [fix] [test] Upgrade docker-java to 3.7.0 (apache#25209)

(cherry picked from commit 4add84c)
(cherry picked from commit 92b5d55)

* [fix][client] Fix race condition between isDuplicate() and flushAsync() method in PersistentAcknowledgmentsGroupingTracker due to incorrect use Netty Recycler (apache#25208)

(cherry picked from commit 5aab2f0)
(cherry picked from commit 2206949)

* [improve][monitor] Upgrade OpenTelemetry to 1.56.0, Otel instrumentation to 2.21.0 and Otel semconv to 1.37.0 (apache#24994)

(cherry picked from commit 53162ff)
(cherry picked from commit a1d5b6c)

* [improve][proxy] Add regression tests for package upload with 'Expect: 100-continue' (apache#25211)

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
(cherry picked from commit e8fedb1)
(cherry picked from commit 0947639)

* fix license issues

* [fix][test]Fix flaky ExtensibleLoadManagerImplTest_testGetMetrics (apache#25216)

(cherry picked from commit 257d42f)
(cherry picked from commit a8eac91)

* [fix][broker] Fix ManagedCursorImpl.asyncDelete() method may lose previous async mark delete properties in race condition (apache#25165)

(cherry picked from commit bea6f8a)
(cherry picked from commit 4332a44)

* [fix][broker]Fix ledgerHandle failed to read by using new BK API (apache#25199)

(cherry picked from commit 6d51f88)
(cherry picked from commit 1631fed)

* [fix][client] Fix producer synchronous retry handling in failPendingMessages method (apache#25207)

(cherry picked from commit 611efe4)
(cherry picked from commit 30ae8fb)

* [fix][broker] Prevent missed topic changes in topic watchers and schedule periodic refresh with patternAutoDiscoveryPeriod interval (apache#25188)

(cherry picked from commit 2e06cc0)
(cherry picked from commit ba2a230)

* fix for complilation error

* [feat][io] implement pip-297 for jdbc sinks (apache#25195)

(cherry picked from commit 6f4ac21)
(cherry picked from commit 998a4b1)

* [fix][broker] Fix httpProxyTimeout config (apache#25223)

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
(cherry picked from commit 2d6ef6f)
(cherry picked from commit 3b39c7b)

* [improve][broker] Add strictAuthMethod to require explicit authentication method (apache#25185)

Co-authored-by: Ómar K. Yasin <oyasin@apple.com>
(cherry picked from commit bae9173)
(cherry picked from commit 27e34f6)

* [feat][client] oauth2 trustcerts file and timeouts (apache#24944)

(cherry picked from commit b789d82)
(cherry picked from commit f8827bd)

* [improve][client] Make authorization server metadata path configurable in AuthenticationOAuth2 (apache#25052)

Co-authored-by: hoguni <hoguni@lycorp.co.jp>
(cherry picked from commit 3cb7a7b)
(cherry picked from commit 705a99d)

* Revert "[improve][broker] Add strictAuthMethod to require explicit authentication method (apache#25185)"

This reverts commit 531eb91.

* [improve][broker] Add idle timeout support for http (apache#25224)

(cherry picked from commit 63220ea)
(cherry picked from commit 144e064)

* [fix][broker] Fix incomplete futures in topic property update/delete methods (apache#25228)

(cherry picked from commit c2ae180)
(cherry picked from commit ab05ca2)

* [fix][test] Fix Mockito stubbing race in TopicListServiceTest (apache#25227)

(cherry picked from commit c93dd7a)
(cherry picked from commit 38a126b)

* [improve][broker] Give the detail error msg when authenticate failed with AuthenticationException (apache#25221)

(cherry picked from commit 0a0ce6d)
(cherry picked from commit 2a46c70)

* [fix][client] Send all chunkMessageIds to broker for redelivery (apache#25229)

(cherry picked from commit 0a0ce6d)
(cherry picked from commit f49c7b2)

* [fix][broker] Fix transactionMetadataFuture completeExceptionally with null value (apache#25231)

Co-authored-by: 张浩 <zhanghao60@100.me>
(cherry picked from commit 0e5d424)
(cherry picked from commit 42283f4)

* uncomment distribution management in pom

* Reapply "[improve][meta] PIP-453: Improve the metadata store threading model (apache#25187)"

This reverts commit a6aab86.

(cherry picked from commit 4f9b2ca)

* [improve] Upgrade Netty to 4.1.131.Final (apache#25232)

(cherry picked from commit db91b93)
(cherry picked from commit a6c602a)

* [fix][test] fix testBatchMetadataStoreMetrics. (apache#25241)

(cherry picked from commit 9db31cc)
(cherry picked from commit abbd478)

* [fix][test] Fix ResourceQuotaCalculatorImplTest#testNeedToReportLocalUsage (apache#25247)

(cherry picked from commit 48774de)
(cherry picked from commit 9343837)

* [fix][meta] Metadata cache refresh might not take effect (apache#25246)

(cherry picked from commit 24eba10)
(cherry picked from commit 6d81292)

* fix pulsar-proxy unit test case failure

* fix safe delete URLRegexLookupProxyHandler which is not used

* Revert "fix safe delete URLRegexLookupProxyHandler which is not used"

This reverts commit 158fc14.

* Revert "fix pulsar-proxy unit test case failure"

This reverts commit 4efcf70.

* updated hardcoded newLookupProxyHandler in ProxyService for failing URLRegexLookupProxyHandlerTest

* Revert "[improve][monitor] Upgrade OpenTelemetry to 1.56.0, Otel instrumentation to 2.21.0 and Otel semconv to 1.37.0 (apache#24994)"

This reverts commit 5e5328e

* reverted lincense for opentelemetry upgrade changes

* Revert "updated hardcoded newLookupProxyHandler in ProxyService for failing URLRegexLookupProxyHandlerTest"

This reverts commit a4f07dc.

* reverted mismatch commits changes in ProxyConnection.java

* fix code-style issue

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: Kai Wang <kwang@apache.org>
Co-authored-by: Yong Zhang <zhangyong1025.zy@gmail.com>
Co-authored-by: Lari Hotari <lhotari@users.noreply.github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Zixuan Liu <nodeces@gmail.com>
Co-authored-by: Wenzhi Feng <thetumbled@apache.org>
Co-authored-by: fengwenzhi <fengwenzhi.max@bigo.sg>
Co-authored-by: Yunze Xu <xyzinfernity@163.com>
Co-authored-by: zhenJiangWang <zhenjiang427@gmail.com>
Co-authored-by: guptas6est <sanaya.gupta@est.tech>
Co-authored-by: Matteo Merli <mmerli@apache.org>
Co-authored-by: Oneby Wang <44369297+oneby-wang@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: fengyubiao <yubiao.feng@streamnative.io>
Co-authored-by: Malla Sandeep <sandeep.malla78@gmail.com>
Co-authored-by: Bäm <dev@sandchaschte.ch>
Co-authored-by: Omar Yasin <omarkj@icloud.com>
Co-authored-by: Ómar K. Yasin <oyasin@apple.com>
Co-authored-by: gulecroc <gu.lecroc@gmail.com>
Co-authored-by: Hideaki Oguni <22386882+izumo27@users.noreply.github.com>
Co-authored-by: hoguni <hoguni@lycorp.co.jp>
Co-authored-by: Cong Zhao <zhaocong@apache.org>
Co-authored-by: sinan liu <liusinan1998@gmail.com>
Co-authored-by: Jiwei Guo <technoboy@apache.org>
Co-authored-by: cai minjian <905767378@qq.com>
Co-authored-by: Hao Zhang <zhanghao1@cmss.chinamobile.com>
Co-authored-by: 张浩 <zhanghao60@100.me>
Co-authored-by: Lari Hotari <lhotari@apache.org>
Co-authored-by: zzb <48124861+zhaizhibo@users.noreply.github.com>
priyanshu-ctds pushed a commit to datastax/pulsar that referenced this pull request Feb 23, 2026
…dule periodic refresh with patternAutoDiscoveryPeriod interval (apache#25188)

(cherry picked from commit 2e06cc0)
(cherry picked from commit ba2a230)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

4 participants