Closed
Labels
type/bug: The PR fixed a bug or issue reported a bug
Description
Search before asking
- I searched in the issues and found nothing similar.
Version
3.0.1
Minimal reproduce step
- broker.conf:
systemTopicEnabled: "true"
topicLevelPoliciesEnabled: "true"
- Set a publish rate on some topics so that the system topic contains data:
String namespace = "xxx/xxx";
List<String> topicList = pulsarAdmin.topics().getPartitionedTopicList(namespace);
topicList = topicList.subList(0, 20);
for (String topic : topicList) {
    PublishRate publishRate = new PublishRate();
    publishRate.publishThrottlingRateInMsg = 100;
    pulsarAdmin.topicPolicies().setPublishRate(topic, publishRate);
}
- Run concurrent lookups:
topicList = pulsarAdmin.topics().getPartitionedTopicList(namespace);
topicList = topicList.subList(0, 20);
for (String topic : topicList) {
    // One task per topic, each looking up the partitioned topic in an endless loop.
    customThreadPool.execute(() -> {
        try {
            while (true) {
                pulsarAdmin.lookups().lookupPartitionedTopic(topic);
            }
        } catch (PulsarAdminException e) {
            throw new RuntimeException(e);
        }
    });
}
- Upgrade from 2.11.1 to 3.0.1, which restarts the broker (or unload the namespace/topic).
If version 3.0.1 is already installed, you can simply restart the broker instead:
kubectl rollout restart statefulset/pulsar-broker
Eventually it was discovered that some topic partitions had been deleted:
PartitionedTopicStats stats;
try {
    stats = pulsarAdmin.topics().getPartitionedStats(topic, true);
} catch (PulsarAdminException e) {
    throw new RuntimeException(e);
}
// Compare the partition count in the metadata with the number of partitions that report stats.
int partitions = stats.getMetadata().partitions;
int size = stats.getPartitions().size();
if (partitions != size) {
    log.info("**********topic={},partitions={},size={}", topic, partitions, size);
    // pulsarAdmin.topics().updatePartitionedTopic(topic, partitions);
}
What did you expect to see?
Data integrity.
What did you see instead?
More than 200 topics were deleted.
Anything else?
Related code:
pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
Lines 1569 to 1590 in 90a82ae
public CompletableFuture<Void> checkReplication() {
    TopicName name = TopicName.get(topic);
    if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name)) {
        return CompletableFuture.completedFuture(null);
    }
    if (log.isDebugEnabled()) {
        log.debug("[{}] Checking replication status", name);
    }
    List<String> configuredClusters = topicPolicies.getReplicationClusters().get();
    int newMessageTTLInSeconds = topicPolicies.getMessageTTLInSeconds().get();
    String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
    // if local cluster is removed from global namespace cluster-list : then delete topic forcefully
    // because pulsar doesn't serve global topic without local repl-cluster configured.
    if (TopicName.get(topic).isGlobal() && !configuredClusters.contains(localCluster)) {
        log.info("Deleting topic [{}] because local cluster is not part of "
                + " global namespace repl list {}", topic, configuredClusters);
        return deleteForcefully();
    }
Are you willing to submit a PR?
- I'm willing to submit a PR!

