Conversation

@nodece
Member

@nodece nodece commented Mar 28, 2025

Motivation

In a GEO replication scenario, if the remote cluster does not have the replicated topic and the auto-creation type differs between the local and remote clusters, message replication may fail. To ensure seamless replication, the topic metadata must be properly synchronized across clusters.

Modifications

  • When both the local and remote partitioned topic metadata indicate partitions=0, this means the topic is non-partitioned. In this case, the local cluster sends a non-partitioned topic creation request to the remote cluster.

  • If the local partitioned topic metadata has partitions>0, this means the topic is partitioned:

    • If the remote partitioned topic metadata has partitions=0, the local cluster sends a partitioned topic creation request to the remote cluster.
    • If the partition counts differ between the local and remote clusters, GEO replication is stopped (see the sketch below).
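
A minimal sketch of this decision logic, assuming the remote cluster is reachable through a PulsarAdmin client; the helper name ensureRemoteTopic and the mismatch exception are illustrative, not the PR's actual code:

```java
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;

public class RemoteTopicCreationSketch {

    // Hypothetical helper: decide how to create the topic on the remote cluster
    // before starting the replicator, based on local and remote metadata.
    static CompletableFuture<Void> ensureRemoteTopic(PulsarAdmin remoteAdmin,
                                                     String topic,
                                                     PartitionedTopicMetadata localMeta,
                                                     PartitionedTopicMetadata remoteMeta) {
        if (localMeta.partitions == 0 && remoteMeta.partitions == 0) {
            // Both sides report no partitioned metadata: treat the topic as
            // non-partitioned and create it remotely (a real implementation
            // would tolerate a "topic already exists" conflict here).
            return remoteAdmin.topics().createNonPartitionedTopicAsync(topic);
        }
        if (localMeta.partitions > 0 && remoteMeta.partitions == 0) {
            // Local topic is partitioned but the remote cluster has no metadata:
            // create the partitioned topic remotely with the local partition count.
            return remoteAdmin.topics().createPartitionedTopicAsync(topic, localMeta.partitions);
        }
        if (localMeta.partitions != remoteMeta.partitions) {
            // Partition counts differ: do not modify the remote topic; stop
            // replication and surface the mismatch to the operator instead.
            return CompletableFuture.failedFuture(new IllegalStateException(
                    "Partition count mismatch for " + topic + ": local=" + localMeta.partitions
                            + ", remote=" + remoteMeta.partitions));
        }
        // Metadata already matches; nothing to create.
        return CompletableFuture.completedFuture(null);
    }
}
```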

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Mar 28, 2025
@nodece nodece requested a review from Copilot March 28, 2025 09:01

Copilot AI left a comment

Pull Request Overview

This PR improves GEO replication resiliency by synchronizing topic metadata with PulsarAdmin so that topics are created and updated consistently across clusters. Key changes include injecting and using a new replicationAdmin parameter across the persistent and non-persistent replicator implementations, updating test mocks accordingly, and refining the topic creation/update logic in GeoPersistentReplicator.

Reviewed Changes

Copilot reviewed 11 out of 11 changed files in this pull request and generated no comments.

File Description
PersistentTopicTest.java Added mocks for PulsarAdmin and LookupService to support replication admin operations.
OneWayReplicatorUsingGlobalZKTest.java Disabled tests for cross-cluster topic type scenarios.
OneWayReplicatorTest.java Introduced new tests for topic creation and partition updates with replication admin.
AbstractReplicatorTest.java Updated test helpers to inject a mocked PulsarAdmin into replicators.
ShadowReplicator.java Modified constructor to accept a replicationAdmin parameter.
PersistentTopic.java Updated addReplicationCluster and addShadowReplicationCluster to retrieve and pass replicationAdmin.
PersistentReplicator.java Added replicationAdmin parameter and propagated it to the super constructor.
GeoPersistentReplicator.java Updated topic creation logic to use PulsarAdmin for metadata retrieval and topic creation/update.
NonPersistentTopic.java & NonPersistentReplicator.java Modified replication cluster addition to include replicationAdmin.
AbstractReplicator.java Added replicationAdmin field and updated the constructor accordingly.
Comments suppressed due to low confidence (1)

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java:146

  • The comparison using completeTopicName.getPartitionIndex() might not behave as expected if the topic reference represents the base (non-partitioned) topic rather than a specific partition. Verify that this check correctly distinguishes between partitioned and non-partitioned topics and handle the default partition index accordingly.
if (completeTopicName.getPartitionIndex() >= metadata.partitions) {
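
For reference, a small sketch of how TopicName reports partition indexes (the topic names are arbitrary examples): the base, non-partitioned name yields -1, so the check above never fires for it.

```java
import org.apache.pulsar.common.naming.TopicName;

public class PartitionIndexCheck {
    public static void main(String[] args) {
        // Base (non-partitioned) topic name: getPartitionIndex() returns -1,
        // so `getPartitionIndex() >= metadata.partitions` cannot be true for it.
        TopicName base = TopicName.get("persistent://public/default/tp");
        System.out.println(base.isPartitioned());      // false
        System.out.println(base.getPartitionIndex());  // -1

        // A concrete partition: the index is parsed from the "-partition-N" suffix.
        TopicName part = TopicName.get("persistent://public/default/tp-partition-2");
        System.out.println(part.isPartitioned());      // true
        System.out.println(part.getPartitionIndex());  // 2
    }
}
```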

Member

@lhotari lhotari left a comment

LGTM, good work @nodece

@nodece
Member Author

nodece commented Mar 28, 2025

/pulsarbot rerun-failure-checks

log.info("[{}] Updating partitioned topic {} to {} partitions", replicatorId,
baseTopicName, localMetadata.partitions);
return replicationAdmin.topics()
.updatePartitionedTopicAsync(baseTopicName.toString(),
Contributor

@poorbarcode poorbarcode Mar 28, 2025

Please do not edit users' partition count automatically; it will break their consumption ordering when they use Producer -> MessageRouter or a Key_Shared subscription.

Instead of modifying users' partition count, we'd better add some checks and print suitable errors when users enable Geo-Replication through the admin API.

Member Author

Please do not edit users' partition count automatically; it will break their consumption ordering when they use Producer -> MessageRouter or a Key_Shared subscription.

Both manual and automatic partition updates can impact producers and consumers.

For Key_Shared subscriptions, messages with the same key are always handled by the same consumer, preserving order.

Instead of modifying users' partition count, we'd better add some checks and print suitable errors when users enable Geo-Replication through the admin API.

Users typically don’t check partition consistency between local and remote clusters manually. To minimize their operational burden, Pulsar should automatically synchronize partition counts across clusters.

Pulsar cannot guarantee strict ordering; overall, it can only preserve ordering to the greatest extent possible.

/cc @poorbarcode @lhotari

Contributor

@poorbarcode poorbarcode Mar 28, 2025

Please do not edit users' partition count automatically; it will break their consumption ordering when they use Producer -> MessageRouter or a Key_Shared subscription.

Both manual and automatic partition updates can impact producers and consumers.

The system should not do this without the user's permission. If we cannot handle conflicts and guarantee full compatibility, letting users handle conflicts is better.

For Key_Shared subscriptions, messages with the same key are always handled by the same consumer, preserving order.

  • Case 1: With Key_Shared, the key ranges owned by the same consumer are not the same across different topic partitions.
  • Case 2: Producer.messageRouter guarantees that messages with the same key from the same cluster are sent to the same partition, and a Failover consumer guarantees that a partition is consumed by only one consumer.

The current PR breaks the ordering of consumption.
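
To make the routing concern concrete, here is a toy illustration of why changing the partition count can re-route keys. It uses a plain hashCode-modulo router for brevity; Pulsar's default router uses its own hash function, but the modulo effect is the same:

```java
public class KeyRoutingExample {
    // Simplified key router: partition = hash(key) mod numPartitions.
    static int route(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        String key = "order-42";
        int before = route(key, 10); // partition used while the topic had 10 partitions
        int after = route(key, 12);  // partition used after the count is raised to 12
        // If these differ, older and newer messages for the same key end up in
        // different partitions, so per-key ordering across the change is lost.
        System.out.printf("before=%d after=%d%n", before, after);
    }
}
```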

Users typically don’t check partition consistency between local and remote clusters manually. To minimize their operational burden, Pulsar should automatically synchronize partition counts across clusters.

For this goal, I think adding a new tool is better.

Pulsar cannot achieve strict order. Overall, it can only ensure the order to the greatest extent possible.

Rather than complaining, pushing a PR and fixing the issue that you found is better.

Member

The system should not do this without the user's permission. If we cannot handle conflicts and guarantee full compatibility, letting users handle conflicts is better.

@poorbarcode The logic added in this PR is inside an if (brokerService.getPulsar().getConfig().isCreateTopicToRemoteClusterForReplication()) { block. Isn't that a sufficient way for users to control whether partitions should be increased automatically or not? Do we also have namespace-level policies for this setting?

Contributor

@poorbarcode poorbarcode Mar 28, 2025

The logic added in this PR is inside an if (brokerService.getPulsar().getConfig().isCreateTopicToRemoteClusterForReplication()) { block. Isn't that a sufficient way for users to control whether partitions should be increased automatically or not?

@lhotari

See also PIP-370 support disable create topics on remote cluster through replication. The purpose of that PIP is to prevent the replicator from starting when users have not created the topics in the remote cluster; the replicator starts automatically once users create the topics on the remote cluster. It helps users who want to control topic creation more precisely, since they have their own customized tools to manage Pulsar resources such as tenants, namespaces, topics, subscriptions, and topic policies.

Pulsar has a mechanism that creates topics on the remote cluster when users create topics through the Admin API:

  • namespace-level: If you have enabled namespace-level Geo-Replication, Pulsar will create topics on the remote cluster when you create topics on the source cluster.
  • topic-level: Pulsar will create topics on the remote cluster when you enable topic-level Geo-Replication

There is also PIP-136 Sync Pulsar metadata across multiple clouds for the case where users want to synchronize all Pulsar resources, but it has not been completed so far. For users who want to minimize their operational burden, that is the more helpful approach.

Pulsar already has many mechanisms to synchronize topic creation:

  • Global ZK
  • Sync when creating topics
  • PIP-136

Instead of adding mechanisms everywhere, we'd better improve the current mechanisms if they are not perfect.

Member Author

@nodece nodece Mar 28, 2025

Could you tell the difference between enabling Geo-Replication and starting the Replicator?

@poorbarcode An example(r1):

# enable replication on the namespace level.
bin/pulsar-admin namespaces set-clusters public/geo-1 -c r1,r2

# the topic is synchronized to the r2 cluster because replication is enabled on the namespace level.
bin/pulsar-admin topics create-partitioned-topic public/geo-1/tp-1 -p 10

# disable replication on the namespace level.
bin/pulsar-admin namespaces set-clusters public/geo-1 -c r1

# the topic is not synchronized to the r2 cluster because replication is disabled on the namespace level.
bin/pulsar-admin topics create-partitioned-topic public/geo-1/tp-2 -p 10

# re-enable replication on the namespace level, but public/geo-1/tp-2 does not exist on the r2 cluster.
bin/pulsar-admin namespaces set-clusters public/geo-1 -c r1,r2

This is why I say that this approach does not guarantee full consistency, making it an unreliable solution.

I'd say that it's unexpected behavior when createTopicToRemoteClusterForReplication=true since it breaks replication if the partitions aren't created.

This depends on the topic auto-creation settings.

However, I think that there's already existing code elsewhere in the replication implementation that modifies the partitions and creates them if they don't exist.

If a single partition doesn't exist on the remote cluster, the broker will auto-create that partition on the remote cluster, but the replicator doesn't modify the number of partitions.

https://github.com/apache/pulsar/blob/v4.0.3/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java#L493-L505: when the user calls update partitions, the change is synchronized to the remote cluster.

In fact, GEO users rarely care about the topic types and partition numbers of the local and remote clusters; they only care about whether the messages are synchronized.

Contributor

@nodece

This is why I say that This approach does not guarantee full consistency, making it an unreliable solution.

# re-enable replication on the namespace level, but public/geo-1/tp-2 does not exist on the r2 cluster.
bin/pulsar-admin namespaces set-clusters public/geo-1 -c r1,r2

You can add a check when enabling namespace-level Geo Replication, right?
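
For illustration, such a check could look roughly like the sketch below, comparing metadata through two PulsarAdmin clients before enabling replication; the method name and error handling are assumptions, not existing broker code:

```java
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;

public class GeoReplicationPrecheck {

    // Hypothetical pre-check: verify that every partitioned topic in the namespace
    // has a compatible partition count on both clusters before enabling replication.
    static void checkNamespace(PulsarAdmin local, PulsarAdmin remote, String namespace)
            throws PulsarAdminException {
        for (String topic : local.topics().getPartitionedTopicList(namespace)) {
            PartitionedTopicMetadata localMeta = local.topics().getPartitionedTopicMetadata(topic);
            PartitionedTopicMetadata remoteMeta;
            try {
                remoteMeta = remote.topics().getPartitionedTopicMetadata(topic);
            } catch (PulsarAdminException.NotFoundException e) {
                // Topic does not exist on the remote cluster yet; nothing to compare.
                continue;
            }
            if (remoteMeta.partitions != 0 && remoteMeta.partitions != localMeta.partitions) {
                throw new IllegalStateException("Partition count mismatch for " + topic
                        + ": local=" + localMeta.partitions + ", remote=" + remoteMeta.partitions);
            }
        }
    }
}
```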


Both of the following threads are related to @lhotari's question, so I won't trace them here.

I'd say that it's unexpected behavior when createTopicToRemoteClusterForReplication=true since it breaks replication if the partitions aren't created.
...
However, I think that there's already existing code elsewhere in the replication implementation that modifies the partitions and creates them if they don't exist.
...

Member Author

@poorbarcode

Please do not edit users' partition count automatically; it will break their consumption ordering when they use Producer -> MessageRouter or a Key_Shared subscription.

Instead of modifying users' partition count, we'd better add some checks and print suitable errors when users enable Geo-Replication through the admin API.

That makes sense. By the way, if the partition numbers differ between the local and remote clusters, we should disable GEO replication to preserve consumption order.

The partition update logic has been removed. Could you take another look at this PR?

Contributor

The partition update logic has been removed. Could you take another look at this PR?

Since the API of PulsarClientImpl below can also trigger topic creation, how about adding a new expected partitions parameter for auto-creation? That would simplify your implementation.

CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(String topic, boolean metadataAutoCreationEnabled)
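
A rough sketch of the shape this suggestion might take; the expectedPartitions parameter is purely hypothetical and does not exist in the current client API:

```java
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;

// Hypothetical extension of the lookup call quoted above; expectedPartitions is
// NOT an existing parameter, it only illustrates the proposal.
interface PartitionedMetadataLookup {
    CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(
            String topic,
            boolean metadataAutoCreationEnabled,
            int expectedPartitions); // partition count to use if auto-creation is triggered
}
```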

Member Author

The partition update logic has been removed. Could you take another look at this PR?

Since the API of PulsarClientImpl below can also trigger topic creation, how about adding a new expected partitions parameter for auto-creation? That would simplify your implementation.

CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(String topic, boolean metadataAutoCreationEnabled)

@poorbarcode Explicit creation is different from implicit creation. With implicit (PulsarClientImpl) creation, if the broker disables automatic creation, the client should fail.

The purpose of my check is to ensure that the topic types of the local and remote clusters are consistent, which helps us avoid many potential issues.

@nodece nodece force-pushed the improve-topic-creation-and-partition-update-before-starting-geo branch from 1e6f38d to a28ede8 on April 1, 2025 09:51
@nodece nodece requested a review from poorbarcode April 1, 2025 09:57
@nodece nodece changed the title [improve][broker] Ensure topic creation and partition update before starting GEO [improve][broker] Ensure topic creation before starting GEO Apr 1, 2025
shibd
shibd previously requested changes Apr 2, 2025
Member

@shibd shibd left a comment

If the local partitioned topic metadata has partitions>0, this means the topic is partitioned:

If the remote partitioned topic metadata has partitions=0, the local cluster sends a partitioned topic creation request to the remote cluster.

In my understanding, Pulsar's geo-replication is not a full replication of the cluster; we allow users to control it at the topic level.

So, for each cluster, there might be independent topics that are not involved in replication.

For this case, I prefer the current approach: returning an error for the user to handle

I think we should avoid implementing such internal implicit logic, as it would make the behavior increasingly unclear.

if (replicationClient == null) {
.getClusterAsync(remoteCluster))
.thenAccept((clusterData) -> {
PulsarClient replicationClient = brokerService.getReplicationClient(remoteCluster, clusterData);
Contributor

Maybe checking partitions in the class org.apache.pulsar.broker.admin.v2.PersistentTopics is better

Member Author

#24118 has been implemented to check partitions.

When the cluster versions are different, this check is especially useful.

@nodece
Member Author

nodece commented Apr 3, 2025

In my understanding, Pulsar's geo-replication is not a full replication of the cluster; we allow users to control it at the topic level.

@shibd Sure, I will create the topic before starting the geo replicator, rather than setting up a namespace/topic policy.

@nodece nodece requested review from poorbarcode and shibd April 3, 2025 06:19
@shibd shibd dismissed their stale review April 7, 2025 02:02

I saw the implementation changed to create the topic when adding a replication cluster, so I will dismiss my change request, but I haven't had a chance to review the code details.

@nodece
Member Author

nodece commented Apr 7, 2025

/pulsarbot rerun-failure-checks

@nodece nodece force-pushed the improve-topic-creation-and-partition-update-before-starting-geo branch from a28ede8 to d6dc701 on April 7, 2025 06:25
Signed-off-by: Zixuan Liu <nodeces@gmail.com>
@nodece nodece force-pushed the improve-topic-creation-and-partition-update-before-starting-geo branch from d6dc701 to 4db6215 on April 9, 2025 06:42
@nodece nodece marked this pull request as draft April 9, 2025 16:31
@nodece
Member Author

nodece commented Apr 22, 2025

This is currently a black-box operation: when an error occurs, we can only check the broker logs to debug the issue, which makes Pulsar operation and maintenance difficult.

Labels

doc-not-needed Your PR changes do not impact docs ready-to-test
