From 368bf515979dc81bb55c837026b74de6fab6e425 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 7 Sep 2022 18:20:16 +0800 Subject: [PATCH 1/4] [fix][broker]Consumer can't consume messages because there has two sames topics in one broker --- .../pulsar/broker/service/BrokerService.java | 43 ++++++++++++++++--- .../nonpersistent/NonPersistentTopic.java | 4 +- .../service/persistent/PersistentTopic.java | 6 +-- .../service/BrokerBkEnsemblesTests.java | 4 +- 4 files changed, 45 insertions(+), 12 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index de4b70c7046c9..a89df99f9e596 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1997,7 +1997,7 @@ public void cleanUnloadedTopicFromCache(NamespaceBundle serviceUnit) { TopicName topicName = TopicName.get(topic); if (serviceUnit.includes(topicName) && getTopicReference(topic).isPresent()) { log.info("[{}][{}] Clean unloaded topic from cache.", serviceUnit.toString(), topic); - pulsar.getBrokerService().removeTopicFromCache(topicName.toString(), serviceUnit); + pulsar.getBrokerService().removeTopicFromCache(topicName.toString(), serviceUnit, null); } } } @@ -2006,15 +2006,43 @@ public AuthorizationService getAuthorizationService() { return authorizationService; } - public CompletableFuture removeTopicFromCache(String topic) { + public CompletableFuture removeTopicFromCache(String topicNameString, Topic topic) { + if (topic == null){ + return removeTopicFutureFromCache(topicNameString, null); + } + final CompletableFuture> createTopicFuture = topics.get(topicNameString); + // If not exists in cache, do nothing. + if (createTopicFuture == null){ + return CompletableFuture.completedFuture(null); + } + // If the future in cache is not yet complete, the topic instance in the cache is not the same with the topic + // in the argument. Do nothing. + if (!createTopicFuture.isDone()){ + return CompletableFuture.completedFuture(null); + } + return createTopicFuture.thenCompose(topicOptional -> { + Topic topicInCache = topicOptional.orElse(null); + // If @param topic is not equals with cached, do nothing. + if (topicInCache == null || topicInCache != topic){ + return CompletableFuture.completedFuture(null); + } else { + // Do remove. + return removeTopicFutureFromCache(topicNameString, createTopicFuture); + } + }); + } + + private CompletableFuture removeTopicFutureFromCache(String topic, + CompletableFuture> createTopicFuture) { TopicName topicName = TopicName.get(topic); return pulsar.getNamespaceService().getBundleAsync(topicName) .thenAccept(namespaceBundle -> { - removeTopicFromCache(topic, namespaceBundle); + removeTopicFromCache(topic, namespaceBundle, createTopicFuture); }); } - public void removeTopicFromCache(String topic, NamespaceBundle namespaceBundle) { + public void removeTopicFromCache(String topic, NamespaceBundle namespaceBundle, + CompletableFuture> createTopicFuture) { String bundleName = namespaceBundle.toString(); String namespaceName = TopicName.get(topic).getNamespaceObject().toString(); @@ -2041,7 +2069,12 @@ public void removeTopicFromCache(String topic, NamespaceBundle namespaceBundle) } } } - topics.remove(topic); + + if (createTopicFuture == null) { + topics.remove(topic); + } else { + topics.remove(topic, createTopicFuture); + } Compactor compactor = pulsar.getNullableCompactor(); if (compactor != null) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 3f15a36b074e8..ee1e20b23a5fb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -439,7 +439,7 @@ private CompletableFuture delete(boolean failIfHasSubscriptions, boolean c // topic GC iterates over topics map and removing from the map with the same thread creates // deadlock. so, execute it in different thread brokerService.executor().execute(() -> { - brokerService.removeTopicFromCache(topic); + brokerService.removeTopicFromCache(topic, NonPersistentTopic.this); unregisterTopicPolicyListener(); log.info("[{}] Topic deleted", topic); deleteFuture.complete(null); @@ -516,7 +516,7 @@ public CompletableFuture close(boolean closeWithoutWaitingClientDisconnect // unload topic iterates over topics map and removing from the map with the same thread creates deadlock. // so, execute it in different thread brokerService.executor().execute(() -> { - brokerService.removeTopicFromCache(topic); + brokerService.removeTopicFromCache(topic, NonPersistentTopic.this); unregisterTopicPolicyListener(); closeFuture.complete(null); }); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 484120cce245a..890e19e102e3e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1194,7 +1194,7 @@ private CompletableFuture delete(boolean failIfHasSubscriptions, ledger.asyncDelete(new AsyncCallbacks.DeleteLedgerCallback() { @Override public void deleteLedgerComplete(Object ctx) { - brokerService.removeTopicFromCache(topic); + brokerService.removeTopicFromCache(topic, PersistentTopic.this); dispatchRateLimiter.ifPresent(DispatchRateLimiter::close); @@ -1305,7 +1305,7 @@ public CompletableFuture close(boolean closeWithoutWaitingClientDisconnect @Override public void closeComplete(Object ctx) { // Everything is now closed, remove the topic from map - brokerService.removeTopicFromCache(topic) + brokerService.removeTopicFromCache(topic, PersistentTopic.this) .thenRun(() -> { replicatedSubscriptionsController.ifPresent(ReplicatedSubscriptionsController::close); @@ -1327,7 +1327,7 @@ public void closeComplete(Object ctx) { @Override public void closeFailed(ManagedLedgerException exception, Object ctx) { log.error("[{}] Failed to close managed ledger, proceeding anyway.", topic, exception); - brokerService.removeTopicFromCache(topic); + brokerService.removeTopicFromCache(topic, PersistentTopic.this); unregisterTopicPolicyListener(); closeFuture.complete(null); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java index 612d9368b8c70..056e96d74ad7d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java @@ -127,7 +127,7 @@ public void testCrashBrokerWithoutCursorLedgerLeak() throws Exception { // (3) remove topic and managed-ledger from broker which means topic is not closed gracefully consumer.close(); producer.close(); - pulsar.getBrokerService().removeTopicFromCache(topic1); + pulsar.getBrokerService().removeTopicFromCache(topic1, null); ManagedLedgerFactoryImpl factory = (ManagedLedgerFactoryImpl) pulsar.getManagedLedgerFactory(); Field field = ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers"); field.setAccessible(true); @@ -252,7 +252,7 @@ public void testSkipCorruptDataLedger() throws Exception { // clean managed-ledger and recreate topic to clean any data from the cache producer.close(); - pulsar.getBrokerService().removeTopicFromCache(topic1); + pulsar.getBrokerService().removeTopicFromCache(topic1, null); ManagedLedgerFactoryImpl factory = (ManagedLedgerFactoryImpl) pulsar.getManagedLedgerFactory(); Field field = ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers"); field.setAccessible(true); From 64e9226858a2d47567c32cb7b752c8c4ae3422c5 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 14 Sep 2022 23:56:10 +0800 Subject: [PATCH 2/4] Cover more logical branches: topic future has exceptionally complete --- .../java/org/apache/pulsar/broker/service/BrokerService.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index a89df99f9e596..c3bc350408238 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -2029,7 +2029,9 @@ public CompletableFuture removeTopicFromCache(String topicNameString, Topi // Do remove. return removeTopicFutureFromCache(topicNameString, createTopicFuture); } - }); + // If the future in cache has exception complete, + // the topic instance in the cache is not the same with the topic. + }).exceptionally(ex -> null); } private CompletableFuture removeTopicFutureFromCache(String topic, From 90e0a3bec4e298b3d21d7ce5f3ff6fa70a35186b Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 20 Sep 2022 14:35:18 +0800 Subject: [PATCH 3/4] change logic to avoid ignore exceptions --- .../pulsar/broker/service/BrokerService.java | 45 ++++++++++++------- .../nonpersistent/NonPersistentTopic.java | 4 +- .../service/persistent/PersistentTopic.java | 6 +-- .../service/BrokerBkEnsemblesTests.java | 4 +- 4 files changed, 37 insertions(+), 22 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index c3bc350408238..d8bcf9261fba4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -2006,20 +2006,15 @@ public AuthorizationService getAuthorizationService() { return authorizationService; } - public CompletableFuture removeTopicFromCache(String topicNameString, Topic topic) { - if (topic == null){ - return removeTopicFutureFromCache(topicNameString, null); - } - final CompletableFuture> createTopicFuture = topics.get(topicNameString); - // If not exists in cache, do nothing. + public CompletableFuture removeTopicFromCache(String topicName) { + return removeTopicFutureFromCache(topicName, null); + } + + public CompletableFuture removeTopicFromCache(Topic topic) { + CompletableFuture> createTopicFuture = findTopicFutureInCache(topic); if (createTopicFuture == null){ return CompletableFuture.completedFuture(null); } - // If the future in cache is not yet complete, the topic instance in the cache is not the same with the topic - // in the argument. Do nothing. - if (!createTopicFuture.isDone()){ - return CompletableFuture.completedFuture(null); - } return createTopicFuture.thenCompose(topicOptional -> { Topic topicInCache = topicOptional.orElse(null); // If @param topic is not equals with cached, do nothing. @@ -2027,11 +2022,31 @@ public CompletableFuture removeTopicFromCache(String topicNameString, Topi return CompletableFuture.completedFuture(null); } else { // Do remove. - return removeTopicFutureFromCache(topicNameString, createTopicFuture); + return removeTopicFutureFromCache(topic.getName(), createTopicFuture); } - // If the future in cache has exception complete, - // the topic instance in the cache is not the same with the topic. - }).exceptionally(ex -> null); + }); + } + + private CompletableFuture> findTopicFutureInCache(Topic topic){ + if (topic == null){ + return null; + } + final CompletableFuture> createTopicFuture = topics.get(topic.getName()); + // If not exists in cache, do nothing. + if (createTopicFuture == null){ + return null; + } + // If the future in cache is not yet complete, the topic instance in the cache is not the same with the topic + // in the argument. Do nothing. + if (!createTopicFuture.isDone()){ + return null; + } + // If the future in cache has exception complete, + // the topic instance in the cache is not the same with the topic. + if (createTopicFuture.isCompletedExceptionally()){ + return null; + } + return createTopicFuture; } private CompletableFuture removeTopicFutureFromCache(String topic, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index ee1e20b23a5fb..0400326748d59 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -439,7 +439,7 @@ private CompletableFuture delete(boolean failIfHasSubscriptions, boolean c // topic GC iterates over topics map and removing from the map with the same thread creates // deadlock. so, execute it in different thread brokerService.executor().execute(() -> { - brokerService.removeTopicFromCache(topic, NonPersistentTopic.this); + brokerService.removeTopicFromCache(NonPersistentTopic.this); unregisterTopicPolicyListener(); log.info("[{}] Topic deleted", topic); deleteFuture.complete(null); @@ -516,7 +516,7 @@ public CompletableFuture close(boolean closeWithoutWaitingClientDisconnect // unload topic iterates over topics map and removing from the map with the same thread creates deadlock. // so, execute it in different thread brokerService.executor().execute(() -> { - brokerService.removeTopicFromCache(topic, NonPersistentTopic.this); + brokerService.removeTopicFromCache(NonPersistentTopic.this); unregisterTopicPolicyListener(); closeFuture.complete(null); }); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 890e19e102e3e..6f73602996d52 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1194,7 +1194,7 @@ private CompletableFuture delete(boolean failIfHasSubscriptions, ledger.asyncDelete(new AsyncCallbacks.DeleteLedgerCallback() { @Override public void deleteLedgerComplete(Object ctx) { - brokerService.removeTopicFromCache(topic, PersistentTopic.this); + brokerService.removeTopicFromCache(PersistentTopic.this); dispatchRateLimiter.ifPresent(DispatchRateLimiter::close); @@ -1305,7 +1305,7 @@ public CompletableFuture close(boolean closeWithoutWaitingClientDisconnect @Override public void closeComplete(Object ctx) { // Everything is now closed, remove the topic from map - brokerService.removeTopicFromCache(topic, PersistentTopic.this) + brokerService.removeTopicFromCache(PersistentTopic.this) .thenRun(() -> { replicatedSubscriptionsController.ifPresent(ReplicatedSubscriptionsController::close); @@ -1327,7 +1327,7 @@ public void closeComplete(Object ctx) { @Override public void closeFailed(ManagedLedgerException exception, Object ctx) { log.error("[{}] Failed to close managed ledger, proceeding anyway.", topic, exception); - brokerService.removeTopicFromCache(topic, PersistentTopic.this); + brokerService.removeTopicFromCache(PersistentTopic.this); unregisterTopicPolicyListener(); closeFuture.complete(null); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java index 056e96d74ad7d..612d9368b8c70 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java @@ -127,7 +127,7 @@ public void testCrashBrokerWithoutCursorLedgerLeak() throws Exception { // (3) remove topic and managed-ledger from broker which means topic is not closed gracefully consumer.close(); producer.close(); - pulsar.getBrokerService().removeTopicFromCache(topic1, null); + pulsar.getBrokerService().removeTopicFromCache(topic1); ManagedLedgerFactoryImpl factory = (ManagedLedgerFactoryImpl) pulsar.getManagedLedgerFactory(); Field field = ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers"); field.setAccessible(true); @@ -252,7 +252,7 @@ public void testSkipCorruptDataLedger() throws Exception { // clean managed-ledger and recreate topic to clean any data from the cache producer.close(); - pulsar.getBrokerService().removeTopicFromCache(topic1, null); + pulsar.getBrokerService().removeTopicFromCache(topic1); ManagedLedgerFactoryImpl factory = (ManagedLedgerFactoryImpl) pulsar.getManagedLedgerFactory(); Field field = ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers"); field.setAccessible(true); From 4db04f509451dc1f17b96009b2f5e268c1ea644b Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 20 Sep 2022 18:30:49 +0800 Subject: [PATCH 4/4] instead 'null' to optional --- .../pulsar/broker/service/BrokerService.java | 38 +++++++++---------- 1 file changed, 17 insertions(+), 21 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index d8bcf9261fba4..9289b2cf141ed 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -2011,42 +2011,38 @@ public CompletableFuture removeTopicFromCache(String topicName) { } public CompletableFuture removeTopicFromCache(Topic topic) { - CompletableFuture> createTopicFuture = findTopicFutureInCache(topic); - if (createTopicFuture == null){ + Optional>> createTopicFuture = findTopicFutureInCache(topic); + if (createTopicFuture.isEmpty()){ return CompletableFuture.completedFuture(null); } - return createTopicFuture.thenCompose(topicOptional -> { - Topic topicInCache = topicOptional.orElse(null); - // If @param topic is not equals with cached, do nothing. - if (topicInCache == null || topicInCache != topic){ - return CompletableFuture.completedFuture(null); - } else { - // Do remove. - return removeTopicFutureFromCache(topic.getName(), createTopicFuture); - } - }); + return removeTopicFutureFromCache(topic.getName(), createTopicFuture.get()); } - private CompletableFuture> findTopicFutureInCache(Topic topic){ + private Optional>> findTopicFutureInCache(Topic topic){ if (topic == null){ - return null; + return Optional.empty(); } final CompletableFuture> createTopicFuture = topics.get(topic.getName()); // If not exists in cache, do nothing. if (createTopicFuture == null){ - return null; + return Optional.empty(); } - // If the future in cache is not yet complete, the topic instance in the cache is not the same with the topic - // in the argument. Do nothing. + // If the future in cache is not yet complete, the topic instance in the cache is not the same with the topic. if (!createTopicFuture.isDone()){ - return null; + return Optional.empty(); } // If the future in cache has exception complete, // the topic instance in the cache is not the same with the topic. if (createTopicFuture.isCompletedExceptionally()){ - return null; + return Optional.empty(); + } + Optional optionalTopic = createTopicFuture.join(); + Topic topicInCache = optionalTopic.orElse(null); + if (topicInCache == null || topicInCache != topic){ + return Optional.empty(); + } else { + return Optional.of(createTopicFuture); } - return createTopicFuture; } private CompletableFuture removeTopicFutureFromCache(String topic, @@ -2058,7 +2054,7 @@ private CompletableFuture removeTopicFutureFromCache(String topic, }); } - public void removeTopicFromCache(String topic, NamespaceBundle namespaceBundle, + private void removeTopicFromCache(String topic, NamespaceBundle namespaceBundle, CompletableFuture> createTopicFuture) { String bundleName = namespaceBundle.toString(); String namespaceName = TopicName.get(topic).getNamespaceObject().toString();