diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java index c982630438309..888106ead3114 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java @@ -19,8 +19,6 @@ package org.apache.pulsar.broker.admin; import static org.apache.commons.lang3.StringUtils.isBlank; -import static org.apache.pulsar.common.naming.SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME; -import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -47,7 +45,6 @@ import java.util.Set; import java.util.TreeSet; import java.util.UUID; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import javax.ws.rs.NotAcceptableException; import javax.ws.rs.core.Response.Status; @@ -109,7 +106,6 @@ import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.policies.data.RetentionPolicies; -import org.apache.pulsar.common.policies.data.SubscribeRate; import org.apache.pulsar.common.policies.data.SubscriptionStats; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.policies.data.TopicStats; @@ -1474,7 +1470,7 @@ public void testDeleteTenant() throws Exception { assertFalse(admin.topics().getList(namespace).isEmpty()); try { - admin.namespaces().deleteNamespace(namespace, false); + deleteNamespaceGraceFully(namespace, false); fail("should have failed due to namespace not empty"); } catch (PulsarAdminException e) { // Expected: cannot delete non-empty tenant @@ -1485,7 +1481,7 @@ public void testDeleteTenant() throws Exception { assertTrue(admin.topics().getList(namespace).isEmpty()); // delete namespace - admin.namespaces().deleteNamespace(namespace, false); + deleteNamespaceGraceFully(namespace, false); assertFalse(admin.namespaces().getNamespaces(tenant).contains(namespace)); assertTrue(admin.namespaces().getNamespaces(tenant).isEmpty()); @@ -1565,11 +1561,8 @@ public void testDeleteNamespace(NamespaceAttr namespaceAttr) throws Exception { admin.topics().createPartitionedTopic(topic, 10); assertFalse(admin.topics().getList(namespace).isEmpty()); - // Wait for change event topic and compaction create finish. - awaitChangeEventTopicAndCompactionCreateFinish(namespace, String.format("persistent://%s", topic)); - try { - admin.namespaces().deleteNamespace(namespace, false); + deleteNamespaceGraceFully(namespace, false); fail("should have failed due to namespace not empty"); } catch (PulsarAdminException e) { // Expected: cannot delete non-empty tenant @@ -1580,7 +1573,7 @@ public void testDeleteNamespace(NamespaceAttr namespaceAttr) throws Exception { assertTrue(admin.topics().getList(namespace).isEmpty()); // delete namespace - admin.namespaces().deleteNamespace(namespace, false); + deleteNamespaceGraceFully(namespace, false); assertFalse(admin.namespaces().getNamespaces(tenant).contains(namespace)); assertTrue(admin.namespaces().getNamespaces(tenant).isEmpty()); @@ -1598,49 +1591,6 @@ public void testDeleteNamespace(NamespaceAttr namespaceAttr) throws Exception { setup(); } - private void awaitChangeEventTopicAndCompactionCreateFinish(String ns, String topic) throws Exception { - if (!pulsar.getConfiguration().isSystemTopicEnabled()){ - return; - } - // Trigger change event topic create. - SubscribeRate subscribeRate = new SubscribeRate(-1, 60); - admin.topicPolicies().setSubscribeRate(topic, subscribeRate); - // Wait for change event topic and compaction create finish. - String allowAutoTopicCreationType = pulsar.getConfiguration().getAllowAutoTopicCreationType(); - int defaultNumPartitions = pulsar.getConfiguration().getDefaultNumPartitions(); - ArrayList expectChangeEventTopics = new ArrayList<>(); - if ("non-partitioned".equals(allowAutoTopicCreationType)){ - String t = String.format("persistent://%s/%s", ns, NAMESPACE_EVENTS_LOCAL_NAME); - expectChangeEventTopics.add(t); - } else { - for (int i = 0; i < defaultNumPartitions; i++){ - String t = String.format("persistent://%s/%s-partition-%s", ns, NAMESPACE_EVENTS_LOCAL_NAME, i); - expectChangeEventTopics.add(t); - } - } - Awaitility.await().until(() -> { - boolean finished = true; - for (String changeEventTopicName : expectChangeEventTopics){ - CompletableFuture> completableFuture = pulsar.getBrokerService().getTopic(changeEventTopicName, false); - if (completableFuture == null){ - finished = false; - } - Optional optionalTopic = completableFuture.get(); - if (!optionalTopic.isPresent()){ - finished = false; - } - PersistentTopic changeEventTopic = (PersistentTopic) optionalTopic.get(); - if (!changeEventTopic.isCompactionEnabled()){ - continue; - } - if (!changeEventTopic.getSubscriptions().containsKey(COMPACTION_SUBSCRIPTION)){ - finished = false; - } - } - return finished; - }); - } - @Test public void testDeleteNamespaceWithTopicPolicies() throws Exception { cleanup(); @@ -1675,7 +1625,7 @@ public void testDeleteNamespaceWithTopicPolicies() throws Exception { }); producer.close(); admin.topics().delete(topic); - admin.namespaces().deleteNamespace(namespace); + deleteNamespaceGraceFully(namespace, false); Awaitility.await().untilAsserted(() -> { assertTrue(admin.namespaces().getNamespaces(tenant).isEmpty()); }); @@ -1886,7 +1836,7 @@ public void testForceDeleteNamespace() throws Exception { final String topic = "persistent://" + namespaceName + "/test" + UUID.randomUUID(); pulsarClient.newProducer(Schema.DOUBLE).topic(topic).create().close(); Awaitility.await().untilAsserted(() -> assertNotNull(admin.schemas().getSchemaInfo(topic))); - admin.namespaces().deleteNamespace(namespaceName, true); + deleteNamespaceGraceFully(namespaceName, true); try { admin.schemas().getSchemaInfo(topic); } catch (PulsarAdminException e) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index fe7c13057ba44..2246e184384e3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -271,7 +271,7 @@ public void clusters() throws Exception { Awaitility.await() .untilAsserted(() -> assertEquals(admin.clusters().getClusters(), Lists.newArrayList("test"))); - admin.namespaces().deleteNamespace("prop-xyz/ns1"); + deleteNamespaceGraceFully("prop-xyz/ns1", false); admin.clusters().deleteCluster("test"); assertEquals(admin.clusters().getClusters(), new ArrayList<>()); @@ -500,7 +500,7 @@ public void brokers() throws Exception { String.format("%s:%d", parts[0], pulsar.getListenPortHTTPS().get())); Assert.assertEquals(nsMap2.size(), 2); - admin.namespaces().deleteNamespace("prop-xyz/ns1"); + deleteNamespaceGraceFully("prop-xyz/ns1", false); admin.clusters().deleteCluster("test"); assertEquals(admin.clusters().getClusters(), new ArrayList<>()); } @@ -702,7 +702,7 @@ public void testGetDynamicLocalConfiguration() throws Exception { } @Test - public void properties() throws PulsarAdminException { + public void properties() throws Exception { try { admin.tenants().getTenantInfo("does-not-exist"); fail("should have failed"); @@ -731,7 +731,7 @@ public void properties() throws PulsarAdminException { assertEquals(e.getStatusCode(), 409); assertEquals(e.getMessage(), "The tenant still has active namespaces"); } - admin.namespaces().deleteNamespace("prop-xyz/ns1"); + deleteNamespaceGraceFully("prop-xyz/ns1", false); admin.tenants().deleteTenant("prop-xyz"); assertEquals(admin.tenants().getTenants(), new ArrayList<>()); @@ -760,7 +760,7 @@ public void namespaces() throws Exception { assertEquals(admin.namespaces().getPolicies("prop-xyz/ns3").bundles.getNumBundles(), 4); assertEquals(admin.namespaces().getPolicies("prop-xyz/ns3").bundles.getBoundaries().size(), 5); - admin.namespaces().deleteNamespace("prop-xyz/ns3"); + deleteNamespaceGraceFully("prop-xyz/ns3", false); try { admin.namespaces().createNamespace("non-existing/ns1"); @@ -834,7 +834,7 @@ public void namespaces() throws Exception { } assertTrue(i < 10); - admin.namespaces().deleteNamespace("prop-xyz/ns1"); + deleteNamespaceGraceFully("prop-xyz/ns1", false); assertEquals(admin.namespaces().getNamespaces("prop-xyz"), Lists.newArrayList("prop-xyz/ns2")); try { @@ -1260,7 +1260,7 @@ public void testGetPartitionedStatsInternal() throws Exception { @Test(dataProvider = "numBundles") public void testDeleteNamespaceBundle(Integer numBundles) throws Exception { - admin.namespaces().deleteNamespace("prop-xyz/ns1"); + deleteNamespaceGraceFully("prop-xyz/ns1", false); admin.namespaces().createNamespace("prop-xyz/ns1-bundles", numBundles); admin.namespaces().setNamespaceReplicationClusters("prop-xyz/ns1-bundles", Set.of("test")); @@ -1272,7 +1272,7 @@ public void testDeleteNamespaceBundle(Integer numBundles) throws Exception { assertEquals(admin.namespaces().getTopics("prop-xyz/ns1-bundles"), new ArrayList<>()); - admin.namespaces().deleteNamespace("prop-xyz/ns1-bundles"); + deleteNamespaceGraceFully("prop-xyz/ns1-bundles", false); assertEquals(admin.namespaces().getNamespaces("prop-xyz", "test"), new ArrayList<>()); } @@ -1359,14 +1359,14 @@ public void testDeleteNamespaceForcefully() throws Exception { assertFalse(admin.topics().getList(namespace).isEmpty()); try { - admin.namespaces().deleteNamespace(namespace, false); + deleteNamespaceGraceFully(namespace, false); fail("should have failed due to namespace not empty"); } catch (PulsarAdminException e) { // Expected: cannot delete non-empty tenant } // delete namespace forcefully - admin.namespaces().deleteNamespace(namespace, true); + deleteNamespaceGraceFully(namespace, true); assertFalse(admin.namespaces().getNamespaces(tenant).contains(namespace)); assertTrue(admin.namespaces().getNamespaces(tenant).isEmpty()); @@ -2233,7 +2233,7 @@ public void testBackwardCompatibility() throws Exception { assertEquals(result.someNewIntField, 0); assertNull(result.someNewString); - admin.namespaces().deleteNamespace("prop-xyz/ns1"); + deleteNamespaceGraceFully("prop-xyz/ns1", false); admin.tenants().deleteTenant("prop-xyz"); assertEquals(admin.tenants().getTenants(), new ArrayList<>()); } @@ -3144,9 +3144,9 @@ public void testSubscriptionExpiry() throws Exception { admin.topics().delete(topic1); admin.topics().delete(topic2); admin.topics().delete(topic3); - admin.namespaces().deleteNamespace(namespace1); - admin.namespaces().deleteNamespace(namespace2); - admin.namespaces().deleteNamespace(namespace3); + deleteNamespaceGraceFully(namespace1, false); + deleteNamespaceGraceFully(namespace2, false); + deleteNamespaceGraceFully(namespace3, false); } @Test @@ -3159,11 +3159,11 @@ public void testCreateAndDeleteNamespaceWithBundles() throws Exception { String ns = BrokerTestUtil.newUniqueName("prop-xyz/ns"); admin.namespaces().createNamespace(ns, 24); - admin.namespaces().deleteNamespace(ns); + deleteNamespaceGraceFully(ns, false); // Re-create and re-delete admin.namespaces().createNamespace(ns, 32); - admin.namespaces().deleteNamespace(ns); + deleteNamespaceGraceFully(ns, false); } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTlsAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTlsAuthTest.java index 15fbcb6d3a3f8..26dc6163f9f72 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTlsAuthTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTlsAuthTest.java @@ -459,7 +459,7 @@ public void testDeleteNamespace() throws Exception { admin.topics().delete("tenant1/ns1/foobar", true); log.info("Deleting namespace"); - admin.namespaces().deleteNamespace("tenant1/ns1"); + deleteNamespaceGraceFully("tenant1/ns1", false, admin); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminRestTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminRestTest.java index 154c3a9227781..e35e96970bbc8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminRestTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminRestTest.java @@ -117,7 +117,7 @@ protected void setup() throws Exception { protected void cleanup() throws Exception { // cleanup. admin.topics().delete(topicName); - admin.namespaces().deleteNamespace(namespaceName); + deleteNamespaceGraceFully(namespaceName, false); admin.tenants().deleteTenant(tenantName); admin.clusters().deleteCluster(clusterName); // super cleanup. diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java index b4d7b2e5a32b7..d3eea7996a187 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java @@ -59,6 +59,7 @@ import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.intercept.CounterBrokerInterceptor; import org.apache.pulsar.broker.namespace.NamespaceService; +import org.apache.pulsar.broker.service.BrokerTestBase; import org.apache.pulsar.broker.service.PulsarMetadataEventSynchronizer; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminBuilder; @@ -652,5 +653,21 @@ public Map, Collection>> register(Object callback, Object... c } + /** + * see {@link BrokerTestBase#deleteNamespaceGraceFully(String, boolean, PulsarService, PulsarAdmin)} + */ + protected void deleteNamespaceGraceFully(String ns, boolean force) + throws Exception { + BrokerTestBase.deleteNamespaceGraceFully(ns, force, pulsar, admin); + } + + /** + * see {@link BrokerTestBase#deleteNamespaceGraceFully(String, boolean, PulsarService, PulsarAdmin)} + */ + protected void deleteNamespaceGraceFully(String ns, boolean force, PulsarAdmin admin) + throws Exception { + BrokerTestBase.deleteNamespaceGraceFully(ns, force, pulsar, admin); + } + private static final Logger log = LoggerFactory.getLogger(MockedPulsarServiceBaseTest.class); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceOwnershipListenerTests.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceOwnershipListenerTests.java index c0a468816281d..1ba87d3e4302a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceOwnershipListenerTests.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceOwnershipListenerTests.java @@ -21,7 +21,6 @@ import com.google.common.collect.Sets; import java.util.concurrent.atomic.AtomicInteger; import org.apache.pulsar.broker.service.BrokerTestBase; -import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.common.naming.NamespaceBundle; @@ -34,7 +33,6 @@ import java.util.List; import java.util.UUID; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import static org.testng.Assert.assertTrue; @@ -55,7 +53,7 @@ protected void cleanup() throws Exception { } @Test - public void testNamespaceBundleOwnershipListener() throws PulsarAdminException, InterruptedException, PulsarClientException { + public void testNamespaceBundleOwnershipListener() throws Exception { final CountDownLatch countDownLatch = new CountDownLatch(2); final AtomicBoolean onLoad = new AtomicBoolean(false); @@ -101,11 +99,11 @@ public void unLoad(NamespaceBundle bundle) { Assert.assertTrue(onLoad.get()); Assert.assertTrue(unLoad.get()); admin.topics().delete(topic); - admin.namespaces().deleteNamespace(namespace); + deleteNamespaceGraceFully(namespace, false); } @Test - public void testGetAllPartitions() throws PulsarAdminException, ExecutionException, InterruptedException { + public void testGetAllPartitions() throws Exception { final String namespace = "prop/" + UUID.randomUUID().toString(); admin.namespaces().createNamespace(namespace, Sets.newHashSet("test")); assertTrue(admin.namespaces().getNamespaces("prop").contains(namespace)); @@ -122,11 +120,11 @@ public void testGetAllPartitions() throws PulsarAdminException, ExecutionExcepti } admin.topics().deletePartitionedTopic(topicName); - admin.namespaces().deleteNamespace(namespace); + deleteNamespaceGraceFully(namespace, false); } @Test - public void testNamespaceBundleLookupOnwershipListener() throws PulsarAdminException, InterruptedException, + public void testNamespaceBundleLookupOnwershipListener() throws Exception, PulsarClientException { final CountDownLatch countDownLatch = new CountDownLatch(2); final AtomicInteger onLoad = new AtomicInteger(0); @@ -172,6 +170,6 @@ public boolean test(NamespaceBundle namespaceBundle) { Assert.assertEquals(onLoad.get(), 1); Assert.assertEquals(unLoad.get(), 1); admin.topics().delete(topic); - admin.namespaces().deleteNamespace(namespace); + deleteNamespaceGraceFully(namespace, false); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupConfigListenerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupConfigListenerTest.java index 38cae3253fe33..6d0ec4f3e03b5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupConfigListenerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupConfigListenerTest.java @@ -197,7 +197,7 @@ public void testResourceGroupAttachToNamespace() throws Exception { assertNull(pulsar.getResourceGroupServiceManager() .getNamespaceResourceGroup(NamespaceName.get(namespaceName)))); - admin.namespaces().deleteNamespace(namespaceName); + deleteNamespaceGraceFully(namespaceName, false); deleteResourceGroup(rgName); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java index bcdaecaaeb4bc..b100c3a977ab2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java @@ -80,6 +80,14 @@ public class BacklogQuotaManagerTest { private static final int TIME_TO_CHECK_BACKLOG_QUOTA = 2; private static final int MAX_ENTRIES_PER_LEDGER = 5; + /** + * see {@link BrokerTestBase#deleteNamespaceGraceFully(String, boolean, PulsarService, PulsarAdmin)} + */ + protected void deleteNamespaceGraceFully(String ns, boolean force) + throws Exception { + BrokerTestBase.deleteNamespaceGraceFully(ns, force, pulsar, admin); + } + @DataProvider(name = "backlogQuotaSizeGB") public Object[][] backlogQuotaSizeGB() { return new Object[][] { { true }, { false } }; @@ -159,10 +167,10 @@ void createNamespaces() throws PulsarAdminException { } @AfterMethod(alwaysRun = true) - void clearNamespaces() throws PulsarAdminException { - admin.namespaces().deleteNamespace("prop/ns-quota", true); - admin.namespaces().deleteNamespace("prop/quotahold", true); - admin.namespaces().deleteNamespace("prop/quotaholdasync", true); + void clearNamespaces() throws Exception { + deleteNamespaceGraceFully("prop/ns-quota", true); + deleteNamespaceGraceFully("prop/quotahold", true); + deleteNamespaceGraceFully("prop/quotaholdasync", true); } private void rolloverStats() { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceBundlesCacheInvalidationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceBundlesCacheInvalidationTest.java index 573d473ed587a..b640d2fbe3956 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceBundlesCacheInvalidationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceBundlesCacheInvalidationTest.java @@ -59,7 +59,7 @@ public void testRecreateNamespace() throws Exception { // Delete and recreate with 32 bundles admin.topics().delete(topic); - admin.namespaces().deleteNamespace(namespace, false); + deleteNamespaceGraceFully(namespace, false); admin.namespaces().createNamespace(namespace, 32); BundlesData bundlesData = admin.namespaces().getBundles(namespace); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java index 03068133a84b4..efdfe0fe188c4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java @@ -18,13 +18,32 @@ */ package org.apache.pulsar.broker.service; +import static org.apache.pulsar.common.naming.SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME; +import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.namespace.NamespaceEphemeralData; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.common.naming.NamespaceBundle; +import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.SystemTopicNames; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.metadata.api.MetadataStoreException; +import org.apache.pulsar.metadata.api.coordination.LockManager; +import org.awaitility.Awaitility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -108,5 +127,103 @@ protected String newTopicName() { return "prop/ns-abc/topic-" + Long.toHexString(random.nextLong()); } + /** + * see {@link #deleteNamespaceGraceFully} + */ + protected void deleteNamespaceGraceFully(String ns, boolean force) + throws Exception { + deleteNamespaceGraceFully(ns, force, pulsar, admin); + } + + /** + * Wait until system topic "__change_event" and subscription "__compaction" are created, and then delete the namespace. + */ + public static void deleteNamespaceGraceFully(String ns, boolean force, PulsarService pulsar, PulsarAdmin admin) + throws Exception { + // namespace v1 should not wait system topic create. + if (ns.split("/").length > 2){ + admin.namespaces().deleteNamespace(ns, force); + return; + } + if (!pulsar.getConfiguration().isSystemTopicEnabled()){ + admin.namespaces().deleteNamespace(ns, force); + return; + } + // If no bundle has been loaded, then the System Topic will not trigger creation. + LockManager lockManager = pulsar.getCoordinationService().getLockManager(NamespaceEphemeralData.class); + List lockedBundles = (List) lockManager.listLocks("/namespace" + "/" + ns).join(); + if (CollectionUtils.isEmpty(lockedBundles)){ + admin.namespaces().deleteNamespace(ns, force); + return; + } + // Trigger change event topic create. + NamespaceName namespace = NamespaceName.get(ns); + NamespaceBundle namespaceBundle = mock(NamespaceBundle.class); + when(namespaceBundle.getNamespaceObject()).thenReturn(namespace); + pulsar.getTopicPoliciesService().addOwnedNamespaceBundleAsync(namespaceBundle); + // Wait for change event topic and compaction create finish. + String allowAutoTopicCreationType = pulsar.getConfiguration().getAllowAutoTopicCreationType(); + int defaultNumPartitions = pulsar.getConfiguration().getDefaultNumPartitions(); + ArrayList expectChangeEventTopics = new ArrayList<>(); + if ("non-partitioned".equals(allowAutoTopicCreationType)){ + String t = String.format("persistent://%s/%s", ns, NAMESPACE_EVENTS_LOCAL_NAME); + expectChangeEventTopics.add(t); + } else { + for (int i = 0; i < defaultNumPartitions; i++){ + String t = String.format("persistent://%s/%s-partition-%s", ns, NAMESPACE_EVENTS_LOCAL_NAME, i); + expectChangeEventTopics.add(t); + } + } + Awaitility.await().until(() -> { + boolean finished = true; + for (String changeEventTopicName : expectChangeEventTopics){ + boolean bundleExists = pulsar.getNamespaceService() + .checkTopicOwnership(TopicName.get(changeEventTopicName)) + .exceptionally(ex -> false).join(); + if (!bundleExists){ + finished = false; + break; + } + CompletableFuture> completableFuture = + pulsar.getBrokerService().getTopic(changeEventTopicName, false); + if (completableFuture == null){ + finished = false; + break; + } + Optional optionalTopic = completableFuture.get(); + if (!optionalTopic.isPresent()){ + finished = false; + break; + } + PersistentTopic changeEventTopic = (PersistentTopic) optionalTopic.get(); + if (!changeEventTopic.isCompactionEnabled()){ + break; + } + if (!changeEventTopic.getSubscriptions().containsKey(COMPACTION_SUBSCRIPTION)){ + finished = false; + break; + } + } + return finished; + }); + int retryTimes = 3; + while (true) { + try { + admin.namespaces().deleteNamespace(ns, force); + break; + } catch (PulsarAdminException ex) { + // Do retry only if topic fenced. + if (ex.getStatusCode() == 500 && ex.getMessage().contains("TopicFencedException")){ + if (--retryTimes > 0){ + continue; + } else { + throw ex; + } + } + throw ex; + } + } + } + private static final Logger LOG = LoggerFactory.getLogger(BrokerTestBase.class); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java index 5e27d9f647b93..8566a43f7f73d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java @@ -998,7 +998,7 @@ public void testMessageExpiry() throws Exception { consumer.close(); admin.topics().deleteSubscription(topicName, subName); admin.topics().delete(topicName); - admin.namespaces().deleteNamespace(namespaceName); + deleteNamespaceGraceFully(namespaceName, false); } @Test @@ -1090,7 +1090,7 @@ public void testMessageExpiryWithTopicMessageTTL() throws Exception { consumer.close(); admin.topics().deleteSubscription(topicName, subName); admin.topics().delete(topicName); - admin.namespaces().deleteNamespace(namespaceName, true); + deleteNamespaceGraceFully(namespaceName, true); } catch (PulsarAdminException e) { Assert.assertEquals(e.getStatusCode(), 500); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java index 48015c50cc67c..d43221a64e275 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java @@ -113,7 +113,7 @@ public void testDeleteNamespaceBeforeCommit() throws Exception { outProducer.newMessage(tnx).value(content.getBytes(UTF_8)).send(); try { - admin.namespaces().deleteNamespace(NAMESPACE1, true); + deleteNamespaceGraceFully(NAMESPACE1, true); } catch (Exception ignore) {} tnx.commit().get(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java index d7a828b1b9f6c..dd5c0f29237be 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java @@ -47,6 +47,7 @@ import org.apache.pulsar.broker.auth.SameThreadOrderedSafeExecutor; import org.apache.pulsar.broker.intercept.CounterBrokerInterceptor; import org.apache.pulsar.broker.namespace.NamespaceService; +import org.apache.pulsar.broker.service.BrokerTestBase; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.common.naming.NamespaceName; @@ -326,4 +327,12 @@ protected final void internalCleanup() { log.warn("Failed to clean up mocked pulsar service:", e); } } + + /** + * see {@link BrokerTestBase#deleteNamespaceGraceFully(String, boolean, PulsarService, PulsarAdmin)} + */ + protected void deleteNamespaceGraceFully(String ns, boolean force) + throws Exception { + BrokerTestBase.deleteNamespaceGraceFully(ns, force, pulsarServiceList.get(0), admin); + } }