diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 4048b4ac77627..eb2752187cc6b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -415,14 +415,42 @@ protected void internalDeleteNamespaceForcefully(AsyncResponse asyncResponse, bo try { // firstly remove all topics including system topics if (!topics.isEmpty()) { + Set partitionedTopics = new HashSet<>(); + Set nonPartitionedTopics = new HashSet<>(); + for (String topic : topics) { try { - futures.add(pulsar().getAdminClient().topics().deleteAsync(topic, true, true)); + TopicName topicName = TopicName.get(topic); + if (topicName.isPartitioned()) { + String partitionedTopic = topicName.getPartitionedTopicName(); + if (!partitionedTopics.contains(partitionedTopic)) { + // Distinguish partitioned topic to avoid duplicate deletion of the same schema + futures.add(pulsar().getAdminClient().topics().deletePartitionedTopicAsync( + partitionedTopic, true, true)); + partitionedTopics.add(partitionedTopic); + } + } else { + futures.add(pulsar().getAdminClient().topics().deleteAsync( + topic, true, true)); + nonPartitionedTopics.add(topic); + } } catch (Exception e) { - log.error("[{}] Failed to force delete topic {}", clientAppId(), topic, e); - asyncResponse.resume(new RestException(e)); + String errorMessage = String.format("Failed to force delete topic %s, " + + "but the previous deletion command of partitioned-topics:%s " + + "and non-partitioned-topics:%s have been sent out asynchronously. " + + "Reason: %s", + topic, partitionedTopics, nonPartitionedTopics, e.getCause()); + log.error("[{}] {}", clientAppId(), errorMessage, e); + asyncResponse.resume(new RestException(Status.INTERNAL_SERVER_ERROR, errorMessage)); + return; } } + + if (log.isDebugEnabled()) { + log.debug("Successfully send deletion command of partitioned-topics:{} " + + "and non-partitioned-topics:{} in namespace:{}.", + partitionedTopics, nonPartitionedTopics, namespaceName); + } } // forcefully delete namespace bundles NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory() diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java index ce6814cea3672..556305df71ca2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java @@ -36,6 +36,7 @@ import java.net.URL; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -44,6 +45,7 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.TimeUnit; import javax.ws.rs.core.Response.Status; import lombok.Cleanup; @@ -1472,6 +1474,60 @@ public void testForceDeleteNamespace() throws Exception { } } + @Test + public void testDistinguishTopicTypeWhenForceDeleteNamespace() throws Exception { + conf.setForceDeleteNamespaceAllowed(true); + final String ns = "prop-xyz/distinguish-topic-type-ns"; + final String exNs = "prop-xyz/ex-distinguish-topic-type-ns"; + admin.namespaces().createNamespace(ns, 2); + admin.namespaces().createNamespace(exNs, 2); + + final String p1 = "persistent://" + ns + "/p1"; + final String p5 = "persistent://" + ns + "/p5"; + final String np = "persistent://" + ns + "/np"; + + admin.topics().createPartitionedTopic(p1, 1); + admin.topics().createPartitionedTopic(p5, 5); + admin.topics().createNonPartitionedTopic(np); + + final String exNp = "persistent://" + exNs + "/np"; + admin.topics().createNonPartitionedTopic(exNp); + // insert an invalid topic name + pulsar.getLocalMetadataStore().put( + "/managed-ledgers/" + exNs + "/persistent/", "".getBytes(), Optional.empty()).join(); + + List topics = pulsar.getNamespaceService().getFullListOfTopics(NamespaceName.get(ns)).get(); + List exTopics = pulsar.getNamespaceService().getFullListOfTopics(NamespaceName.get(exNs)).get(); + + // ensure that the topic list contains all the topics + List allTopics = new ArrayList<>(Arrays.asList(np, TopicName.get(p1).getPartition(0).toString())); + for (int i = 0; i < 5; i++) { + allTopics.add(TopicName.get(p5).getPartition(i).toString()); + } + Assert.assertEquals(allTopics.stream().filter(t -> !topics.contains(t)).count(), 0); + Assert.assertTrue(exTopics.contains("persistent://" + exNs + "/")); + // partition num = p1 + p5 + np + Assert.assertEquals(topics.size(), 1 + 5 + 1); + Assert.assertEquals(exTopics.size(), 1 + 1); + + admin.namespaces().deleteNamespace(ns, true); + Arrays.asList(p1, p5, np).forEach(t -> { + try { + admin.schemas().getSchemaInfo(t); + } catch (PulsarAdminException e) { + // all the normal topics' schemas have been deleted + Assert.assertEquals(e.getStatusCode(), 404); + } + }); + + try { + admin.namespaces().deleteNamespace(exNs, true); + fail("Should fail due to invalid topic"); + } catch (Exception e) { + //ok + } + } + @Test public void testUpdateClusterWithProxyUrl() throws Exception { ClusterData cluster = ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build();