diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java index 7926677e5d29d..712846b6c9499 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java @@ -50,6 +50,7 @@ public class NamespaceResources extends BaseResources { private final IsolationPolicyResources isolationPolicies; private final PartitionedTopicResources partitionedTopicResources; private final MetadataStore configurationStore; + private final MetadataStore localStore; private final MetadataCache localPoliciesCache; @@ -60,6 +61,7 @@ public class NamespaceResources extends BaseResources { public NamespaceResources(MetadataStore localStore, MetadataStore configurationStore, int operationTimeoutSec) { super(configurationStore, Policies.class, operationTimeoutSec); this.configurationStore = configurationStore; + this.localStore = localStore; isolationPolicies = new IsolationPolicyResources(configurationStore, operationTimeoutSec); partitionedTopicResources = new PartitionedTopicResources(configurationStore, operationTimeoutSec); @@ -317,13 +319,13 @@ public CompletableFuture clearPartitionedTopicTenantAsync(String tenant) { // clear resource of `/loadbalance/bundle-data/{tenant}/{namespace}/` in metadata-store public CompletableFuture deleteBundleDataAsync(NamespaceName ns) { final String namespaceBundlePath = joinPath(BUNDLE_DATA_BASE_PATH, ns.toString()); - return getStore().deleteRecursive(namespaceBundlePath); + return getLocalStore().deleteRecursive(namespaceBundlePath); } // clear resource of `/loadbalance/bundle-data/{tenant}/` in metadata-store public CompletableFuture deleteBundleDataTenantAsync(String tenant) { final String tenantBundlePath = joinPath(BUNDLE_DATA_BASE_PATH, tenant); - return getStore().deleteRecursive(tenantBundlePath); + return getLocalStore().deleteRecursive(tenantBundlePath); } } \ No newline at end of file diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/resources/NamespaceResourcesTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/resources/NamespaceResourcesTest.java index ba755572c214e..77e80a92026d0 100644 --- a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/resources/NamespaceResourcesTest.java +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/resources/NamespaceResourcesTest.java @@ -19,12 +19,34 @@ package org.apache.pulsar.broker.resources; +import static org.apache.pulsar.broker.resources.BaseResources.joinPath; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertThrows; import static org.testng.Assert.assertTrue; + + +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.metadata.api.MetadataStore; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; public class NamespaceResourcesTest { + private MetadataStore localStore; + private MetadataStore configurationStore; + private NamespaceResources namespaceResources; + + private static final String BUNDLE_DATA_BASE_PATH = "/loadbalance/bundle-data"; + + @BeforeMethod + public void setup() { + localStore = mock(MetadataStore.class); + configurationStore = mock(MetadataStore.class); + namespaceResources = new NamespaceResources(localStore, configurationStore, 30); + } + @Test public void test_pathIsFromNamespace() { assertFalse(NamespaceResources.pathIsFromNamespace("/admin/clusters")); @@ -32,4 +54,25 @@ public void test_pathIsFromNamespace() { assertFalse(NamespaceResources.pathIsFromNamespace("/admin/policies/my-tenant")); assertTrue(NamespaceResources.pathIsFromNamespace("/admin/policies/my-tenant/my-ns")); } + + /** + * Test that the bundle-data node is deleted from the local stores. + */ + @Test + public void testDeleteBundleDataAsync() { + NamespaceName ns = NamespaceName.get("my-tenant/my-ns"); + String namespaceBundlePath = joinPath(BUNDLE_DATA_BASE_PATH, ns.toString()); + namespaceResources.deleteBundleDataAsync(ns); + + String tenant="my-tenant"; + String tenantBundlePath = joinPath(BUNDLE_DATA_BASE_PATH, tenant); + namespaceResources.deleteBundleDataTenantAsync(tenant); + + verify(localStore).deleteRecursive(namespaceBundlePath); + verify(localStore).deleteRecursive(tenantBundlePath); + + assertThrows(()-> verify(configurationStore).deleteRecursive(namespaceBundlePath)); + assertThrows(()-> verify(configurationStore).deleteRecursive(tenantBundlePath)); + } + } \ No newline at end of file diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java index 45b5134de6bb6..ab076b3e2fe85 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java @@ -60,6 +60,7 @@ import org.apache.pulsar.broker.admin.AdminApiTest.MockedPulsarService; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl; +import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper; import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; @@ -1424,6 +1425,8 @@ public void testDeleteTenant() throws Exception { @Test public void testDeleteNamespace() throws Exception { pulsar.getConfiguration().setForceDeleteNamespaceAllowed(false); + pulsar.getConfiguration().setMetadataStoreUrl("127.0.0.1:2181"); + pulsar.getConfiguration().setConfigurationMetadataStoreUrl("127.0.0.1:2182"); String tenant = "test-tenant"; assertFalse(admin.tenants().getTenants().contains(tenant)); @@ -1443,6 +1446,29 @@ public void testDeleteNamespace() throws Exception { admin.topics().createPartitionedTopic(topic, 10); assertFalse(admin.topics().getList(namespace).isEmpty()); + final String managedLedgersPath = "/managed-ledgers/" + namespace; + final String bundleDataPath = "/loadbalance/bundle-data/" + namespace; + // Trigger bundle owned by brokers. + pulsarClient.newProducer().topic(topic).create().close(); + // Trigger bundle data write to ZK. + Awaitility.await().untilAsserted(() -> { + boolean bundleDataWereWriten = false; + for (PulsarService ps : new PulsarService[]{pulsar, mockPulsarSetup.getPulsar()}) { + ModularLoadManagerWrapper loadManager = (ModularLoadManagerWrapper) ps.getLoadManager().get(); + ModularLoadManagerImpl loadManagerImpl = (ModularLoadManagerImpl) loadManager.getLoadManager(); + ps.getBrokerService().updateRates(); + loadManagerImpl.updateLocalBrokerData(); + loadManagerImpl.writeBundleDataOnZooKeeper(); + bundleDataWereWriten = bundleDataWereWriten || ps.getLocalMetadataStore().exists(bundleDataPath).join(); + } + assertTrue(bundleDataWereWriten); + }); + + // assert znode exists in metadata store + assertTrue(pulsar.getLocalMetadataStore().exists(bundleDataPath).join()); + assertTrue(pulsar.getLocalMetadataStore().exists(managedLedgersPath).join()); + + try { deleteNamespaceGraceFully(namespace, false); fail("should have failed due to namespace not empty"); @@ -1459,12 +1485,9 @@ public void testDeleteNamespace() throws Exception { assertFalse(admin.namespaces().getNamespaces(tenant).contains(namespace)); assertTrue(admin.namespaces().getNamespaces(tenant).isEmpty()); - - final String managedLedgersPath = "/managed-ledgers/" + namespace; + // assert znode deleted in metadata store assertFalse(pulsar.getLocalMetadataStore().exists(managedLedgersPath).join()); - - final String bundleDataPath = "/loadbalance/bundle-data/" + namespace; assertFalse(pulsar.getLocalMetadataStore().exists(bundleDataPath).join()); }