From 1b6af5a8a8419ed6383d9f47dc7b200bd72c3775 Mon Sep 17 00:00:00 2001 From: mattisonchao Date: Wed, 1 Feb 2023 17:35:19 +0800 Subject: [PATCH 01/15] Using retry to fix problem --- .../broker/admin/impl/NamespacesBase.java | 71 ++++++++++++------- .../pulsar/broker/service/AbstractTopic.java | 3 - .../service/persistent/PersistentTopic.java | 2 +- .../pulsar/broker/admin/AdminApi2Test.java | 8 ++- 4 files changed, 53 insertions(+), 31 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 324c84048751f..b6a8489c9d3c5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -44,6 +44,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.stream.Collectors; +import javax.annotation.Nonnull; import javax.ws.rs.WebApplicationException; import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.core.Response; @@ -69,6 +70,8 @@ import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.impl.BackoffBuilder; +import org.apache.pulsar.client.util.RetryUtil; import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace; import org.apache.pulsar.common.naming.NamedEntity; import org.apache.pulsar.common.naming.NamespaceBundle; @@ -106,6 +109,7 @@ import org.apache.pulsar.common.policies.data.SubscriptionAuthMode; import org.apache.pulsar.common.policies.data.TenantOperation; import org.apache.pulsar.common.policies.data.TopicHashPositions; +import org.apache.pulsar.common.policies.data.TopicPolicies; import org.apache.pulsar.common.policies.data.TopicType; import org.apache.pulsar.common.policies.data.ValidateResult; import org.apache.pulsar.common.policies.data.impl.AutoTopicCreationOverrideImpl; @@ -204,23 +208,48 @@ protected CompletableFuture> internalGetNonPersistentTopics(Policie }); } - @SuppressWarnings("unchecked") - protected CompletableFuture internalDeleteNamespaceAsync(boolean force) { + /** + * Delete the namespace and retry to resolve some topics that were not created successfully(in metadata) + * during the deletion. + */ + protected @Nonnull CompletableFuture internalDeleteNamespaceAsync(boolean force) { + final CompletableFuture future = new CompletableFuture<>(); + RetryUtil.retryAsynchronously(() -> internalDeleteNamespaceAsync0(force), + new BackoffBuilder() + .setInitialTime(200, TimeUnit.MILLISECONDS) + .setMandatoryStop(15, TimeUnit.SECONDS) + .setMax(15, TimeUnit.SECONDS) + .create(), + pulsar().getExecutor(), future); + return future; + } + private @Nonnull CompletableFuture internalDeleteNamespaceAsync0(boolean force) { CompletableFuture preconditionCheck = precheckWhenDeleteNamespace(namespaceName, force); return preconditionCheck .thenCompose(policies -> { + final CompletableFuture markDeleteFuture; + if (policies != null && policies.deleted) { + markDeleteFuture = CompletableFuture.completedFuture(null); + } else { + markDeleteFuture = namespaceResources().setPoliciesAsync(namespaceName, old -> { + old.deleted = true; + return old; + }); + } if (policies == null || CollectionUtils.isEmpty(policies.replication_clusters)){ - return pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName); + return markDeleteFuture.thenCompose(__ -> + pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName)); } - return pulsar().getNamespaceService().getFullListOfTopics(namespaceName); + return markDeleteFuture.thenCompose(__ -> + pulsar().getNamespaceService().getFullListOfTopics(namespaceName)); }) .thenCompose(allTopics -> pulsar().getNamespaceService().getFullListOfPartitionedTopic(namespaceName) - .thenCompose(allPartitionedTopics -> { - List> topicsSum = new ArrayList<>(2); - topicsSum.add(allTopics); - topicsSum.add(allPartitionedTopics); - return CompletableFuture.completedFuture(topicsSum); - })) + .thenCompose(allPartitionedTopics -> { + List> topicsSum = new ArrayList<>(2); + topicsSum.add(allTopics); + topicsSum.add(allPartitionedTopics); + return CompletableFuture.completedFuture(topicsSum); + })) .thenCompose(topics -> { List allTopics = topics.get(0); ArrayList allUserCreatedTopics = new ArrayList<>(); @@ -260,22 +289,12 @@ protected CompletableFuture internalDeleteNamespaceAsync(boolean force) { throw new RestException(Status.CONFLICT, "Cannot delete non empty namespace"); } } - return namespaceResources().setPoliciesAsync(namespaceName, old -> { - old.deleted = true; - return old; - }).thenCompose(ignore -> { - return internalDeleteTopicsAsync(allUserCreatedTopics); - }).thenCompose(ignore -> { - return internalDeletePartitionedTopicsAsync(allUserCreatedPartitionTopics); - }).thenCompose(ignore -> { - return internalDeleteTopicsAsync(allSystemTopics); - }).thenCompose(ignore__ -> { - return internalDeletePartitionedTopicsAsync(allPartitionedSystemTopics); - }).thenCompose(ignore -> { - return internalDeleteTopicsAsync(topicPolicy); - }).thenCompose(ignore__ -> { - return internalDeletePartitionedTopicsAsync(partitionedTopicPolicy); - }); + return internalDeleteTopicsAsync(allUserCreatedTopics) + .thenCompose(ignore -> internalDeletePartitionedTopicsAsync(allUserCreatedPartitionTopics)) + .thenCompose(ignore -> internalDeleteTopicsAsync(allSystemTopics)) + .thenCompose(ignore -> internalDeletePartitionedTopicsAsync(allPartitionedSystemTopics)) + .thenCompose(ignore -> internalDeleteTopicsAsync(topicPolicy)) + .thenCompose(ignore -> internalDeletePartitionedTopicsAsync(partitionedTopicPolicy)); }) .thenCompose(ignore -> pulsar().getNamespaceService() .getNamespaceBundleFactory().getBundlesAsync(namespaceName)) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index c9f95ab524f55..5b1ee454c7eef 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -246,9 +246,6 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) { if (log.isDebugEnabled()) { log.debug("[{}]updateTopicPolicyByNamespacePolicy,data={}", topic, namespacePolicies); } - if (namespacePolicies.deleted) { - return; - } topicPolicies.getRetentionPolicies().updateNamespaceValue(namespacePolicies.retention_policies); topicPolicies.getCompactionThreshold().updateNamespaceValue(namespacePolicies.compaction_threshold); topicPolicies.getReplicationClusters().updateNamespaceValue( diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index d009d3778f2d1..af1210a0e5f7b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -346,7 +346,7 @@ public CompletableFuture initialize() { Policies policies = optPolicies.get(); - this.updateTopicPolicyByNamespacePolicy(policies); + updateTopicPolicyByNamespacePolicy(policies); initializeDispatchRateLimiterIfNeeded(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java index 282b26488f9a6..48ecaffaf1c27 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java @@ -46,6 +46,7 @@ import java.util.Set; import java.util.TreeSet; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import javax.ws.rs.NotAcceptableException; import javax.ws.rs.core.Response.Status; @@ -1637,7 +1638,7 @@ public void testDeleteNamespace(NamespaceAttr namespaceAttr) throws Exception { setup(); } - @Test + @Test(invocationCount = 200) public void testDeleteNamespaceWithTopicPolicies() throws Exception { String tenant = "test-tenant"; assertFalse(admin.tenants().getTenants().contains(tenant)); @@ -1654,6 +1655,11 @@ public void testDeleteNamespaceWithTopicPolicies() throws Exception { // verify namespace can be deleted even without topic policy events admin.namespaces().deleteNamespace(namespace, true); + Awaitility.await().untilAsserted(() -> { + final CompletableFuture> eventTopicFuture = + pulsar.getBrokerService().getTopics().get("persistent://test-tenant/test-ns2/__change_events"); + assertNull(eventTopicFuture); + }); admin.namespaces().createNamespace(namespace, Set.of("test")); // create topic String topic = namespace + "/test-topic2"; From 2ab44b969751a7d247de7ca48c7437cef29690cf Mon Sep 17 00:00:00 2001 From: mattisonchao Date: Wed, 1 Feb 2023 17:39:14 +0800 Subject: [PATCH 02/15] Remove useless test code --- .../test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java index 48ecaffaf1c27..34ff5463e24b7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java @@ -1638,7 +1638,7 @@ public void testDeleteNamespace(NamespaceAttr namespaceAttr) throws Exception { setup(); } - @Test(invocationCount = 200) + @Test public void testDeleteNamespaceWithTopicPolicies() throws Exception { String tenant = "test-tenant"; assertFalse(admin.tenants().getTenants().contains(tenant)); From 67758ca8d92e6fb53ad47d01d78d80c4881f585a Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Thu, 2 Feb 2023 06:05:42 +0800 Subject: [PATCH 03/15] Update pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java --- .../pulsar/broker/service/persistent/PersistentTopic.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index af1210a0e5f7b..d009d3778f2d1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -346,7 +346,7 @@ public CompletableFuture initialize() { Policies policies = optPolicies.get(); - updateTopicPolicyByNamespacePolicy(policies); + this.updateTopicPolicyByNamespacePolicy(policies); initializeDispatchRateLimiterIfNeeded(); From 1d0eb649c4113a9b5997fb9a664075af5b71a2b4 Mon Sep 17 00:00:00 2001 From: mattisonchao Date: Thu, 2 Feb 2023 06:17:16 +0800 Subject: [PATCH 04/15] Fix checkstyle --- .../java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index b6a8489c9d3c5..aea9767bfbb58 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -109,7 +109,6 @@ import org.apache.pulsar.common.policies.data.SubscriptionAuthMode; import org.apache.pulsar.common.policies.data.TenantOperation; import org.apache.pulsar.common.policies.data.TopicHashPositions; -import org.apache.pulsar.common.policies.data.TopicPolicies; import org.apache.pulsar.common.policies.data.TopicType; import org.apache.pulsar.common.policies.data.ValidateResult; import org.apache.pulsar.common.policies.data.impl.AutoTopicCreationOverrideImpl; From ef7b10f5899c69c05b8c838b88348c15d25eb3fa Mon Sep 17 00:00:00 2001 From: mattisonchao Date: Wed, 15 Feb 2023 15:49:37 +0800 Subject: [PATCH 05/15] Using retry to instead backoff. --- .../broker/admin/impl/NamespacesBase.java | 35 +++++++++++++------ .../pulsar/broker/admin/AdminApi2Test.java | 2 +- 2 files changed, 25 insertions(+), 12 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index aea9767bfbb58..57f2951f263fe 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -118,6 +118,7 @@ import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException; import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException; +import org.apache.zookeeper.KeeperException; @Slf4j public abstract class NamespacesBase extends AdminResource { @@ -213,18 +214,16 @@ protected CompletableFuture> internalGetNonPersistentTopics(Policie */ protected @Nonnull CompletableFuture internalDeleteNamespaceAsync(boolean force) { final CompletableFuture future = new CompletableFuture<>(); - RetryUtil.retryAsynchronously(() -> internalDeleteNamespaceAsync0(force), - new BackoffBuilder() - .setInitialTime(200, TimeUnit.MILLISECONDS) - .setMandatoryStop(15, TimeUnit.SECONDS) - .setMax(15, TimeUnit.SECONDS) - .create(), - pulsar().getExecutor(), future); + internalRetryableDeleteNamespaceAsync0(force, 5, future); return future; } - private @Nonnull CompletableFuture internalDeleteNamespaceAsync0(boolean force) { - CompletableFuture preconditionCheck = precheckWhenDeleteNamespace(namespaceName, force); - return preconditionCheck + private void internalRetryableDeleteNamespaceAsync0(boolean force, int retryTimes, + @Nonnull CompletableFuture callback) { + if (retryTimes == 0) { + // drop out recursive + return; + } + precheckWhenDeleteNamespace(namespaceName, force) .thenCompose(policies -> { final CompletableFuture markDeleteFuture; if (policies != null && policies.deleted) { @@ -317,7 +316,21 @@ protected CompletableFuture> internalGetNonPersistentTopics(Policie return CompletableFuture.completedFuture(null); }) ).collect(Collectors.toList()))) - .thenCompose(ignore -> internalClearZkSources()); + .thenCompose(ignore -> internalClearZkSources()) + .whenComplete((result, error) -> { + if (error != null) { + final Throwable rc = FutureUtil.unwrapCompletionException(error); + if (rc instanceof MetadataStoreException) { + if (rc.getCause() != null && rc.getCause() instanceof KeeperException.NotEmptyException) { + internalRetryableDeleteNamespaceAsync0(force, retryTimes, callback); + return; + } + } + callback.completeExceptionally(error); + return; + } + callback.complete(result); + }); } private CompletableFuture internalDeletePartitionedTopicsAsync(List topicNames) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java index 34ff5463e24b7..e980fd46c9323 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java @@ -1638,7 +1638,7 @@ public void testDeleteNamespace(NamespaceAttr namespaceAttr) throws Exception { setup(); } - @Test + @Test(invocationCount = 500) public void testDeleteNamespaceWithTopicPolicies() throws Exception { String tenant = "test-tenant"; assertFalse(admin.tenants().getTenants().contains(tenant)); From 24ed229a3c6ae75f41826b3e735dfd94df600f31 Mon Sep 17 00:00:00 2001 From: mattisonchao Date: Wed, 15 Feb 2023 16:00:25 +0800 Subject: [PATCH 06/15] Fix checkstyle --- .../org/apache/pulsar/broker/admin/impl/NamespacesBase.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 57f2951f263fe..e7db7dd184834 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -70,8 +70,6 @@ import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.SubscriptionType; -import org.apache.pulsar.client.impl.BackoffBuilder; -import org.apache.pulsar.client.util.RetryUtil; import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace; import org.apache.pulsar.common.naming.NamedEntity; import org.apache.pulsar.common.naming.NamespaceBundle; From 2b810fed23fe4620f074706b451ae5cfcf6905e3 Mon Sep 17 00:00:00 2001 From: mattisonchao Date: Wed, 15 Feb 2023 16:21:41 +0800 Subject: [PATCH 07/15] Remove reduntant logic --- .../java/org/apache/pulsar/broker/admin/v1/Namespaces.java | 6 ------ .../java/org/apache/pulsar/broker/admin/v2/Namespaces.java | 6 ------ 2 files changed, 12 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java index c13441db3dfdb..bba59ce7c22af 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java @@ -250,12 +250,6 @@ public void deleteNamespace(@Suspended final AsyncResponse asyncResponse, @PathP asyncResponse.resume(Response.noContent().build()); }) .exceptionally(ex -> { - Throwable cause = FutureUtil.unwrapCompletionException(ex); - if (cause instanceof PulsarAdminException.ConflictException) { - log.info("[{}] There are new topics created during the namespace deletion, " - + "retry to delete the namespace again.", namespaceName); - pulsar().getExecutor().execute(() -> internalDeleteNamespaceAsync(force)); - } if (!isRedirectException(ex)) { log.error("[{}] Failed to delete namespace {}", clientAppId(), namespaceName, ex); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index efaf038d6326c..fe6a8ed7dbba2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -199,12 +199,6 @@ public void deleteNamespace(@Suspended final AsyncResponse asyncResponse, @PathP asyncResponse.resume(Response.noContent().build()); }) .exceptionally(ex -> { - Throwable cause = FutureUtil.unwrapCompletionException(ex); - if (cause instanceof PulsarAdminException.ConflictException) { - log.info("[{}] There are new topics created during the namespace deletion, " - + "retry to delete the namespace again.", namespaceName); - pulsar().getExecutor().execute(() -> internalDeleteNamespaceAsync(force)); - } if (!isRedirectException(ex)) { log.error("[{}] Failed to delete namespace {}", clientAppId(), namespaceName, ex); } From 3124e588533d5ef158ab0d0106cecfbddeab02d1 Mon Sep 17 00:00:00 2001 From: mattisonchao Date: Wed, 15 Feb 2023 16:23:47 +0800 Subject: [PATCH 08/15] Fix checkstyle --- .../main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java | 1 - .../main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java | 1 - 2 files changed, 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java index bba59ce7c22af..86741ffb0cd82 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java @@ -48,7 +48,6 @@ import org.apache.pulsar.broker.admin.impl.NamespacesBase; import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.web.RestException; -import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode; import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm; import org.apache.pulsar.common.naming.NamespaceName; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index fe6a8ed7dbba2..79ed59bc98127 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -51,7 +51,6 @@ import org.apache.pulsar.broker.admin.impl.OffloaderObjectsScannerUtils; import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.web.RestException; -import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode; import org.apache.pulsar.common.naming.NamespaceName; From 4681d5beab3705beaa5d6f0a3992fbc3a0e5ad7f Mon Sep 17 00:00:00 2001 From: mattisonchao Date: Wed, 15 Feb 2023 16:27:00 +0800 Subject: [PATCH 09/15] Remove test invocations --- .../test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java index e980fd46c9323..34ff5463e24b7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java @@ -1638,7 +1638,7 @@ public void testDeleteNamespace(NamespaceAttr namespaceAttr) throws Exception { setup(); } - @Test(invocationCount = 500) + @Test public void testDeleteNamespaceWithTopicPolicies() throws Exception { String tenant = "test-tenant"; assertFalse(admin.tenants().getTenants().contains(tenant)); From 522aa6e143499f3fddd179e7d90272437146ba27 Mon Sep 17 00:00:00 2001 From: mattisonchao Date: Wed, 15 Feb 2023 22:41:46 +0800 Subject: [PATCH 10/15] Add the log --- .../org/apache/pulsar/broker/admin/impl/NamespacesBase.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index e7db7dd184834..bcebc3ed77266 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -320,6 +320,8 @@ private void internalRetryableDeleteNamespaceAsync0(boolean force, int retryTime final Throwable rc = FutureUtil.unwrapCompletionException(error); if (rc instanceof MetadataStoreException) { if (rc.getCause() != null && rc.getCause() instanceof KeeperException.NotEmptyException) { + log.info("[{}] There are in-flight topics created during the namespace deletion, " + + "retry to delete the namespace again.", namespaceName); internalRetryableDeleteNamespaceAsync0(force, retryTimes, callback); return; } From f8db021264c11fa44ea0af17c863db77889ed941 Mon Sep 17 00:00:00 2001 From: mattisonchao Date: Wed, 15 Feb 2023 22:44:35 +0800 Subject: [PATCH 11/15] Add forgotten -1 --- .../org/apache/pulsar/broker/admin/impl/NamespacesBase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index bcebc3ed77266..4d8460a849123 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -322,7 +322,7 @@ private void internalRetryableDeleteNamespaceAsync0(boolean force, int retryTime if (rc.getCause() != null && rc.getCause() instanceof KeeperException.NotEmptyException) { log.info("[{}] There are in-flight topics created during the namespace deletion, " + "retry to delete the namespace again.", namespaceName); - internalRetryableDeleteNamespaceAsync0(force, retryTimes, callback); + internalRetryableDeleteNamespaceAsync0(force, retryTimes - 1, callback); return; } } From 288374f0b94109863ef2d46c39a79bd15620419e Mon Sep 17 00:00:00 2001 From: mattisonchao Date: Thu, 16 Feb 2023 00:15:36 +0800 Subject: [PATCH 12/15] Apply comments --- .../pulsar/broker/admin/impl/NamespacesBase.java | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 4d8460a849123..c2b348b2cd86a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -217,10 +217,6 @@ protected CompletableFuture> internalGetNonPersistentTopics(Policie } private void internalRetryableDeleteNamespaceAsync0(boolean force, int retryTimes, @Nonnull CompletableFuture callback) { - if (retryTimes == 0) { - // drop out recursive - return; - } precheckWhenDeleteNamespace(namespaceName, force) .thenCompose(policies -> { final CompletableFuture markDeleteFuture; @@ -322,7 +318,14 @@ private void internalRetryableDeleteNamespaceAsync0(boolean force, int retryTime if (rc.getCause() != null && rc.getCause() instanceof KeeperException.NotEmptyException) { log.info("[{}] There are in-flight topics created during the namespace deletion, " + "retry to delete the namespace again.", namespaceName); - internalRetryableDeleteNamespaceAsync0(force, retryTimes - 1, callback); + if (retryTimes != 0) { + internalRetryableDeleteNamespaceAsync0(force, retryTimes - 1, callback); + } else { + callback.completeExceptionally( + new RestException(Status.CONFLICT, "The broker still have in-flight topics" + + " created during namespace deletion, please try again.")); + // drop out recursive + } return; } } From 4d55e1e11a33db5fbb6ab331c888ae1f4d4d6f8b Mon Sep 17 00:00:00 2001 From: mattisonchao Date: Thu, 16 Feb 2023 00:17:55 +0800 Subject: [PATCH 13/15] Apply comment --- .../org/apache/pulsar/broker/admin/impl/NamespacesBase.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index c2b348b2cd86a..2a8f6d5b43f35 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -318,8 +318,10 @@ private void internalRetryableDeleteNamespaceAsync0(boolean force, int retryTime if (rc.getCause() != null && rc.getCause() instanceof KeeperException.NotEmptyException) { log.info("[{}] There are in-flight topics created during the namespace deletion, " + "retry to delete the namespace again.", namespaceName); - if (retryTimes != 0) { - internalRetryableDeleteNamespaceAsync0(force, retryTimes - 1, callback); + final int next = retryTimes - 1; + if (next > 0) { + // async recursive + internalRetryableDeleteNamespaceAsync0(force, next, callback); } else { callback.completeExceptionally( new RestException(Status.CONFLICT, "The broker still have in-flight topics" From 3fa7af411fdc93ad5278f8c4541743a48f1229f5 Mon Sep 17 00:00:00 2001 From: mattisonchao Date: Fri, 17 Feb 2023 13:32:21 +0800 Subject: [PATCH 14/15] Change the logic to fix the test --- .../broker/admin/impl/NamespacesBase.java | 139 +++++++++--------- .../pulsar/broker/admin/AdminApi2Test.java | 2 +- 2 files changed, 74 insertions(+), 67 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 2a8f6d5b43f35..76eb6518db19d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -219,74 +219,81 @@ private void internalRetryableDeleteNamespaceAsync0(boolean force, int retryTime @Nonnull CompletableFuture callback) { precheckWhenDeleteNamespace(namespaceName, force) .thenCompose(policies -> { - final CompletableFuture markDeleteFuture; - if (policies != null && policies.deleted) { - markDeleteFuture = CompletableFuture.completedFuture(null); - } else { - markDeleteFuture = namespaceResources().setPoliciesAsync(namespaceName, old -> { - old.deleted = true; - return old; - }); - } + final CompletableFuture> topicsFuture; if (policies == null || CollectionUtils.isEmpty(policies.replication_clusters)){ - return markDeleteFuture.thenCompose(__ -> - pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName)); - } - return markDeleteFuture.thenCompose(__ -> - pulsar().getNamespaceService().getFullListOfTopics(namespaceName)); - }) - .thenCompose(allTopics -> pulsar().getNamespaceService().getFullListOfPartitionedTopic(namespaceName) - .thenCompose(allPartitionedTopics -> { - List> topicsSum = new ArrayList<>(2); - topicsSum.add(allTopics); - topicsSum.add(allPartitionedTopics); - return CompletableFuture.completedFuture(topicsSum); - })) - .thenCompose(topics -> { - List allTopics = topics.get(0); - ArrayList allUserCreatedTopics = new ArrayList<>(); - List allPartitionedTopics = topics.get(1); - ArrayList allUserCreatedPartitionTopics = new ArrayList<>(); - boolean hasNonSystemTopic = false; - List allSystemTopics = new ArrayList<>(); - List allPartitionedSystemTopics = new ArrayList<>(); - List topicPolicy = new ArrayList<>(); - List partitionedTopicPolicy = new ArrayList<>(); - for (String topic : allTopics) { - if (!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic))) { - hasNonSystemTopic = true; - allUserCreatedTopics.add(topic); - } else { - if (SystemTopicNames.isTopicPoliciesSystemTopic(topic)) { - topicPolicy.add(topic); - } else { - allSystemTopics.add(topic); - } - } - } - for (String topic : allPartitionedTopics) { - if (!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic))) { - hasNonSystemTopic = true; - allUserCreatedPartitionTopics.add(topic); - } else { - if (SystemTopicNames.isTopicPoliciesSystemTopic(topic)) { - partitionedTopicPolicy.add(topic); - } else { - allPartitionedSystemTopics.add(topic); - } - } - } - if (!force) { - if (hasNonSystemTopic) { - throw new RestException(Status.CONFLICT, "Cannot delete non empty namespace"); - } + topicsFuture = pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName); + } else { + topicsFuture = pulsar().getNamespaceService().getFullListOfTopics(namespaceName); } - return internalDeleteTopicsAsync(allUserCreatedTopics) - .thenCompose(ignore -> internalDeletePartitionedTopicsAsync(allUserCreatedPartitionTopics)) - .thenCompose(ignore -> internalDeleteTopicsAsync(allSystemTopics)) - .thenCompose(ignore -> internalDeletePartitionedTopicsAsync(allPartitionedSystemTopics)) - .thenCompose(ignore -> internalDeleteTopicsAsync(topicPolicy)) - .thenCompose(ignore -> internalDeletePartitionedTopicsAsync(partitionedTopicPolicy)); + return topicsFuture.thenCompose(allTopics -> + pulsar().getNamespaceService().getFullListOfPartitionedTopic(namespaceName) + .thenCompose(allPartitionedTopics -> { + List> topicsSum = new ArrayList<>(2); + topicsSum.add(allTopics); + topicsSum.add(allPartitionedTopics); + return CompletableFuture.completedFuture(topicsSum); + })) + .thenCompose(topics -> { + List allTopics = topics.get(0); + ArrayList allUserCreatedTopics = new ArrayList<>(); + List allPartitionedTopics = topics.get(1); + ArrayList allUserCreatedPartitionTopics = new ArrayList<>(); + boolean hasNonSystemTopic = false; + List allSystemTopics = new ArrayList<>(); + List allPartitionedSystemTopics = new ArrayList<>(); + List topicPolicy = new ArrayList<>(); + List partitionedTopicPolicy = new ArrayList<>(); + for (String topic : allTopics) { + if (!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic))) { + hasNonSystemTopic = true; + allUserCreatedTopics.add(topic); + } else { + if (SystemTopicNames.isTopicPoliciesSystemTopic(topic)) { + topicPolicy.add(topic); + } else { + allSystemTopics.add(topic); + } + } + } + for (String topic : allPartitionedTopics) { + if (!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic))) { + hasNonSystemTopic = true; + allUserCreatedPartitionTopics.add(topic); + } else { + if (SystemTopicNames.isTopicPoliciesSystemTopic(topic)) { + partitionedTopicPolicy.add(topic); + } else { + allPartitionedSystemTopics.add(topic); + } + } + } + if (!force) { + if (hasNonSystemTopic) { + throw new RestException(Status.CONFLICT, "Cannot delete non empty namespace"); + } + } + final CompletableFuture markDeleteFuture; + if (policies != null && policies.deleted) { + markDeleteFuture = CompletableFuture.completedFuture(null); + } else { + markDeleteFuture = namespaceResources().setPoliciesAsync(namespaceName, old -> { + old.deleted = true; + return old; + }); + } + return markDeleteFuture.thenCompose(__ -> + internalDeleteTopicsAsync(allUserCreatedTopics)) + .thenCompose(ignore -> + internalDeletePartitionedTopicsAsync(allUserCreatedPartitionTopics)) + .thenCompose(ignore -> + internalDeleteTopicsAsync(allSystemTopics)) + .thenCompose(ignore -> + internalDeletePartitionedTopicsAsync(allPartitionedSystemTopics)) + .thenCompose(ignore -> + internalDeleteTopicsAsync(topicPolicy)) + .thenCompose(ignore -> + internalDeletePartitionedTopicsAsync(partitionedTopicPolicy)); + }); }) .thenCompose(ignore -> pulsar().getNamespaceService() .getNamespaceBundleFactory().getBundlesAsync(namespaceName)) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java index 34ff5463e24b7..e980fd46c9323 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java @@ -1638,7 +1638,7 @@ public void testDeleteNamespace(NamespaceAttr namespaceAttr) throws Exception { setup(); } - @Test + @Test(invocationCount = 500) public void testDeleteNamespaceWithTopicPolicies() throws Exception { String tenant = "test-tenant"; assertFalse(admin.tenants().getTenants().contains(tenant)); From 9fff22f444145df261cd603663f0ae22756b77a2 Mon Sep 17 00:00:00 2001 From: mattisonchao Date: Fri, 17 Feb 2023 13:33:25 +0800 Subject: [PATCH 15/15] Revert useless code --- .../test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java index e980fd46c9323..34ff5463e24b7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java @@ -1638,7 +1638,7 @@ public void testDeleteNamespace(NamespaceAttr namespaceAttr) throws Exception { setup(); } - @Test(invocationCount = 500) + @Test public void testDeleteNamespaceWithTopicPolicies() throws Exception { String tenant = "test-tenant"; assertFalse(admin.tenants().getTenants().contains(tenant));