Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class NamespaceResources extends BaseResources<Policies> {
private final IsolationPolicyResources isolationPolicies;
private final PartitionedTopicResources partitionedTopicResources;
private final MetadataStore configurationStore;
private final MetadataStore localStore;

private final MetadataCache<LocalPolicies> localPoliciesCache;

Expand All @@ -60,6 +61,7 @@ public class NamespaceResources extends BaseResources<Policies> {
public NamespaceResources(MetadataStore localStore, MetadataStore configurationStore, int operationTimeoutSec) {
super(configurationStore, Policies.class, operationTimeoutSec);
this.configurationStore = configurationStore;
this.localStore = localStore;
isolationPolicies = new IsolationPolicyResources(configurationStore, operationTimeoutSec);
partitionedTopicResources = new PartitionedTopicResources(configurationStore, operationTimeoutSec);

Expand Down Expand Up @@ -317,13 +319,13 @@ public CompletableFuture<Void> clearPartitionedTopicTenantAsync(String tenant) {
// clear resource of `/loadbalance/bundle-data/{tenant}/{namespace}/` in metadata-store
public CompletableFuture<Void> deleteBundleDataAsync(NamespaceName ns) {
final String namespaceBundlePath = joinPath(BUNDLE_DATA_BASE_PATH, ns.toString());
return getStore().deleteRecursive(namespaceBundlePath);
return getLocalStore().deleteRecursive(namespaceBundlePath);
}

// clear resource of `/loadbalance/bundle-data/{tenant}/` in metadata-store
public CompletableFuture<Void> deleteBundleDataTenantAsync(String tenant) {
final String tenantBundlePath = joinPath(BUNDLE_DATA_BASE_PATH, tenant);
return getStore().deleteRecursive(tenantBundlePath);
return getLocalStore().deleteRecursive(tenantBundlePath);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,60 @@

package org.apache.pulsar.broker.resources;

import static org.apache.pulsar.broker.resources.BaseResources.joinPath;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;


import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;


public class NamespaceResourcesTest {
private MetadataStore localStore;
private MetadataStore configurationStore;
private NamespaceResources namespaceResources;

private static final String BUNDLE_DATA_BASE_PATH = "/loadbalance/bundle-data";

@BeforeMethod
public void setup() {
localStore = mock(MetadataStore.class);
configurationStore = mock(MetadataStore.class);
namespaceResources = new NamespaceResources(localStore, configurationStore, 30);
}

@Test
public void test_pathIsFromNamespace() {
assertFalse(NamespaceResources.pathIsFromNamespace("/admin/clusters"));
assertFalse(NamespaceResources.pathIsFromNamespace("/admin/policies"));
assertFalse(NamespaceResources.pathIsFromNamespace("/admin/policies/my-tenant"));
assertTrue(NamespaceResources.pathIsFromNamespace("/admin/policies/my-tenant/my-ns"));
}

/**
* Test that the bundle-data node is deleted from the local stores.
*/
@Test
public void testDeleteBundleDataAsync() {
NamespaceName ns = NamespaceName.get("my-tenant/my-ns");
String namespaceBundlePath = joinPath(BUNDLE_DATA_BASE_PATH, ns.toString());
namespaceResources.deleteBundleDataAsync(ns);

String tenant="my-tenant";
String tenantBundlePath = joinPath(BUNDLE_DATA_BASE_PATH, tenant);
namespaceResources.deleteBundleDataTenantAsync(tenant);

verify(localStore).deleteRecursive(namespaceBundlePath);
verify(localStore).deleteRecursive(tenantBundlePath);

assertThrows(()-> verify(configurationStore).deleteRecursive(namespaceBundlePath));
assertThrows(()-> verify(configurationStore).deleteRecursive(tenantBundlePath));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.apache.pulsar.broker.admin.AdminApiTest.MockedPulsarService;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
Expand Down Expand Up @@ -1424,6 +1425,8 @@ public void testDeleteTenant() throws Exception {
@Test
public void testDeleteNamespace() throws Exception {
pulsar.getConfiguration().setForceDeleteNamespaceAllowed(false);
pulsar.getConfiguration().setMetadataStoreUrl("127.0.0.1:2181");
pulsar.getConfiguration().setConfigurationMetadataStoreUrl("127.0.0.1:2182");

String tenant = "test-tenant";
assertFalse(admin.tenants().getTenants().contains(tenant));
Expand All @@ -1443,6 +1446,29 @@ public void testDeleteNamespace() throws Exception {
admin.topics().createPartitionedTopic(topic, 10);
assertFalse(admin.topics().getList(namespace).isEmpty());

final String managedLedgersPath = "/managed-ledgers/" + namespace;
final String bundleDataPath = "/loadbalance/bundle-data/" + namespace;
// Trigger bundle owned by brokers.
pulsarClient.newProducer().topic(topic).create().close();
// Trigger bundle data write to ZK.
Awaitility.await().untilAsserted(() -> {
boolean bundleDataWereWriten = false;
for (PulsarService ps : new PulsarService[]{pulsar, mockPulsarSetup.getPulsar()}) {
ModularLoadManagerWrapper loadManager = (ModularLoadManagerWrapper) ps.getLoadManager().get();
ModularLoadManagerImpl loadManagerImpl = (ModularLoadManagerImpl) loadManager.getLoadManager();
ps.getBrokerService().updateRates();
loadManagerImpl.updateLocalBrokerData();
loadManagerImpl.writeBundleDataOnZooKeeper();
bundleDataWereWriten = bundleDataWereWriten || ps.getLocalMetadataStore().exists(bundleDataPath).join();
}
assertTrue(bundleDataWereWriten);
});

// assert znode exists in metadata store
assertTrue(pulsar.getLocalMetadataStore().exists(bundleDataPath).join());
assertTrue(pulsar.getLocalMetadataStore().exists(managedLedgersPath).join());


try {
deleteNamespaceGraceFully(namespace, false);
fail("should have failed due to namespace not empty");
Expand All @@ -1459,12 +1485,9 @@ public void testDeleteNamespace() throws Exception {
assertFalse(admin.namespaces().getNamespaces(tenant).contains(namespace));
assertTrue(admin.namespaces().getNamespaces(tenant).isEmpty());


final String managedLedgersPath = "/managed-ledgers/" + namespace;
// assert znode deleted in metadata store
assertFalse(pulsar.getLocalMetadataStore().exists(managedLedgersPath).join());


final String bundleDataPath = "/loadbalance/bundle-data/" + namespace;
assertFalse(pulsar.getLocalMetadataStore().exists(bundleDataPath).join());
}

Expand Down