Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
package org.apache.pulsar.broker.admin;

import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.pulsar.common.naming.SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME;
import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand All @@ -47,7 +45,6 @@
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.NotAcceptableException;
import javax.ws.rs.core.Response.Status;
Expand Down Expand Up @@ -109,7 +106,6 @@
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
Expand Down Expand Up @@ -1474,7 +1470,7 @@ public void testDeleteTenant() throws Exception {
assertFalse(admin.topics().getList(namespace).isEmpty());

try {
admin.namespaces().deleteNamespace(namespace, false);
deleteNamespaceGraceFully(namespace, false);
fail("should have failed due to namespace not empty");
} catch (PulsarAdminException e) {
// Expected: cannot delete non-empty tenant
Expand All @@ -1485,7 +1481,7 @@ public void testDeleteTenant() throws Exception {
assertTrue(admin.topics().getList(namespace).isEmpty());

// delete namespace
admin.namespaces().deleteNamespace(namespace, false);
deleteNamespaceGraceFully(namespace, false);
assertFalse(admin.namespaces().getNamespaces(tenant).contains(namespace));
assertTrue(admin.namespaces().getNamespaces(tenant).isEmpty());

Expand Down Expand Up @@ -1565,11 +1561,8 @@ public void testDeleteNamespace(NamespaceAttr namespaceAttr) throws Exception {
admin.topics().createPartitionedTopic(topic, 10);
assertFalse(admin.topics().getList(namespace).isEmpty());

// Wait for change event topic and compaction create finish.
awaitChangeEventTopicAndCompactionCreateFinish(namespace, String.format("persistent://%s", topic));

try {
admin.namespaces().deleteNamespace(namespace, false);
deleteNamespaceGraceFully(namespace, false);
fail("should have failed due to namespace not empty");
} catch (PulsarAdminException e) {
// Expected: cannot delete non-empty tenant
Expand All @@ -1580,7 +1573,7 @@ public void testDeleteNamespace(NamespaceAttr namespaceAttr) throws Exception {
assertTrue(admin.topics().getList(namespace).isEmpty());

// delete namespace
admin.namespaces().deleteNamespace(namespace, false);
deleteNamespaceGraceFully(namespace, false);
assertFalse(admin.namespaces().getNamespaces(tenant).contains(namespace));
assertTrue(admin.namespaces().getNamespaces(tenant).isEmpty());

Expand All @@ -1598,49 +1591,6 @@ public void testDeleteNamespace(NamespaceAttr namespaceAttr) throws Exception {
setup();
}

private void awaitChangeEventTopicAndCompactionCreateFinish(String ns, String topic) throws Exception {
if (!pulsar.getConfiguration().isSystemTopicEnabled()){
return;
}
// Trigger change event topic create.
SubscribeRate subscribeRate = new SubscribeRate(-1, 60);
admin.topicPolicies().setSubscribeRate(topic, subscribeRate);
// Wait for change event topic and compaction create finish.
String allowAutoTopicCreationType = pulsar.getConfiguration().getAllowAutoTopicCreationType();
int defaultNumPartitions = pulsar.getConfiguration().getDefaultNumPartitions();
ArrayList<String> expectChangeEventTopics = new ArrayList<>();
if ("non-partitioned".equals(allowAutoTopicCreationType)){
String t = String.format("persistent://%s/%s", ns, NAMESPACE_EVENTS_LOCAL_NAME);
expectChangeEventTopics.add(t);
} else {
for (int i = 0; i < defaultNumPartitions; i++){
String t = String.format("persistent://%s/%s-partition-%s", ns, NAMESPACE_EVENTS_LOCAL_NAME, i);
expectChangeEventTopics.add(t);
}
}
Awaitility.await().until(() -> {
boolean finished = true;
for (String changeEventTopicName : expectChangeEventTopics){
CompletableFuture<Optional<Topic>> completableFuture = pulsar.getBrokerService().getTopic(changeEventTopicName, false);
if (completableFuture == null){
finished = false;
}
Optional<Topic> optionalTopic = completableFuture.get();
if (!optionalTopic.isPresent()){
finished = false;
}
PersistentTopic changeEventTopic = (PersistentTopic) optionalTopic.get();
if (!changeEventTopic.isCompactionEnabled()){
continue;
}
if (!changeEventTopic.getSubscriptions().containsKey(COMPACTION_SUBSCRIPTION)){
finished = false;
}
}
return finished;
});
}

@Test
public void testDeleteNamespaceWithTopicPolicies() throws Exception {
cleanup();
Expand Down Expand Up @@ -1675,7 +1625,7 @@ public void testDeleteNamespaceWithTopicPolicies() throws Exception {
});
producer.close();
admin.topics().delete(topic);
admin.namespaces().deleteNamespace(namespace);
deleteNamespaceGraceFully(namespace, false);
Awaitility.await().untilAsserted(() -> {
assertTrue(admin.namespaces().getNamespaces(tenant).isEmpty());
});
Expand Down Expand Up @@ -1886,7 +1836,7 @@ public void testForceDeleteNamespace() throws Exception {
final String topic = "persistent://" + namespaceName + "/test" + UUID.randomUUID();
pulsarClient.newProducer(Schema.DOUBLE).topic(topic).create().close();
Awaitility.await().untilAsserted(() -> assertNotNull(admin.schemas().getSchemaInfo(topic)));
admin.namespaces().deleteNamespace(namespaceName, true);
deleteNamespaceGraceFully(namespaceName, true);
try {
admin.schemas().getSchemaInfo(topic);
} catch (PulsarAdminException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ public void clusters() throws Exception {
Awaitility.await()
.untilAsserted(() -> assertEquals(admin.clusters().getClusters(), Lists.newArrayList("test")));

admin.namespaces().deleteNamespace("prop-xyz/ns1");
deleteNamespaceGraceFully("prop-xyz/ns1", false);
admin.clusters().deleteCluster("test");
assertEquals(admin.clusters().getClusters(), new ArrayList<>());

Expand Down Expand Up @@ -500,7 +500,7 @@ public void brokers() throws Exception {
String.format("%s:%d", parts[0], pulsar.getListenPortHTTPS().get()));
Assert.assertEquals(nsMap2.size(), 2);

admin.namespaces().deleteNamespace("prop-xyz/ns1");
deleteNamespaceGraceFully("prop-xyz/ns1", false);
admin.clusters().deleteCluster("test");
assertEquals(admin.clusters().getClusters(), new ArrayList<>());
}
Expand Down Expand Up @@ -702,7 +702,7 @@ public void testGetDynamicLocalConfiguration() throws Exception {
}

@Test
public void properties() throws PulsarAdminException {
public void properties() throws Exception {
try {
admin.tenants().getTenantInfo("does-not-exist");
fail("should have failed");
Expand Down Expand Up @@ -731,7 +731,7 @@ public void properties() throws PulsarAdminException {
assertEquals(e.getStatusCode(), 409);
assertEquals(e.getMessage(), "The tenant still has active namespaces");
}
admin.namespaces().deleteNamespace("prop-xyz/ns1");
deleteNamespaceGraceFully("prop-xyz/ns1", false);
admin.tenants().deleteTenant("prop-xyz");
assertEquals(admin.tenants().getTenants(), new ArrayList<>());

Expand Down Expand Up @@ -760,7 +760,7 @@ public void namespaces() throws Exception {
assertEquals(admin.namespaces().getPolicies("prop-xyz/ns3").bundles.getNumBundles(), 4);
assertEquals(admin.namespaces().getPolicies("prop-xyz/ns3").bundles.getBoundaries().size(), 5);

admin.namespaces().deleteNamespace("prop-xyz/ns3");
deleteNamespaceGraceFully("prop-xyz/ns3", false);

try {
admin.namespaces().createNamespace("non-existing/ns1");
Expand Down Expand Up @@ -834,7 +834,7 @@ public void namespaces() throws Exception {
}
assertTrue(i < 10);

admin.namespaces().deleteNamespace("prop-xyz/ns1");
deleteNamespaceGraceFully("prop-xyz/ns1", false);
assertEquals(admin.namespaces().getNamespaces("prop-xyz"), Lists.newArrayList("prop-xyz/ns2"));

try {
Expand Down Expand Up @@ -1260,7 +1260,7 @@ public void testGetPartitionedStatsInternal() throws Exception {

@Test(dataProvider = "numBundles")
public void testDeleteNamespaceBundle(Integer numBundles) throws Exception {
admin.namespaces().deleteNamespace("prop-xyz/ns1");
deleteNamespaceGraceFully("prop-xyz/ns1", false);
admin.namespaces().createNamespace("prop-xyz/ns1-bundles", numBundles);
admin.namespaces().setNamespaceReplicationClusters("prop-xyz/ns1-bundles", Set.of("test"));

Expand All @@ -1272,7 +1272,7 @@ public void testDeleteNamespaceBundle(Integer numBundles) throws Exception {

assertEquals(admin.namespaces().getTopics("prop-xyz/ns1-bundles"), new ArrayList<>());

admin.namespaces().deleteNamespace("prop-xyz/ns1-bundles");
deleteNamespaceGraceFully("prop-xyz/ns1-bundles", false);
assertEquals(admin.namespaces().getNamespaces("prop-xyz", "test"), new ArrayList<>());
}

Expand Down Expand Up @@ -1359,14 +1359,14 @@ public void testDeleteNamespaceForcefully() throws Exception {
assertFalse(admin.topics().getList(namespace).isEmpty());

try {
admin.namespaces().deleteNamespace(namespace, false);
deleteNamespaceGraceFully(namespace, false);
fail("should have failed due to namespace not empty");
} catch (PulsarAdminException e) {
// Expected: cannot delete non-empty tenant
}

// delete namespace forcefully
admin.namespaces().deleteNamespace(namespace, true);
deleteNamespaceGraceFully(namespace, true);
assertFalse(admin.namespaces().getNamespaces(tenant).contains(namespace));
assertTrue(admin.namespaces().getNamespaces(tenant).isEmpty());

Expand Down Expand Up @@ -2233,7 +2233,7 @@ public void testBackwardCompatibility() throws Exception {
assertEquals(result.someNewIntField, 0);
assertNull(result.someNewString);

admin.namespaces().deleteNamespace("prop-xyz/ns1");
deleteNamespaceGraceFully("prop-xyz/ns1", false);
admin.tenants().deleteTenant("prop-xyz");
assertEquals(admin.tenants().getTenants(), new ArrayList<>());
}
Expand Down Expand Up @@ -3144,9 +3144,9 @@ public void testSubscriptionExpiry() throws Exception {
admin.topics().delete(topic1);
admin.topics().delete(topic2);
admin.topics().delete(topic3);
admin.namespaces().deleteNamespace(namespace1);
admin.namespaces().deleteNamespace(namespace2);
admin.namespaces().deleteNamespace(namespace3);
deleteNamespaceGraceFully(namespace1, false);
deleteNamespaceGraceFully(namespace2, false);
deleteNamespaceGraceFully(namespace3, false);
}

@Test
Expand All @@ -3159,11 +3159,11 @@ public void testCreateAndDeleteNamespaceWithBundles() throws Exception {
String ns = BrokerTestUtil.newUniqueName("prop-xyz/ns");

admin.namespaces().createNamespace(ns, 24);
admin.namespaces().deleteNamespace(ns);
deleteNamespaceGraceFully(ns, false);

// Re-create and re-delete
admin.namespaces().createNamespace(ns, 32);
admin.namespaces().deleteNamespace(ns);
deleteNamespaceGraceFully(ns, false);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ public void testDeleteNamespace() throws Exception {
admin.topics().delete("tenant1/ns1/foobar", true);

log.info("Deleting namespace");
admin.namespaces().deleteNamespace("tenant1/ns1");
deleteNamespaceGraceFully("tenant1/ns1", false, admin);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ protected void setup() throws Exception {
protected void cleanup() throws Exception {
// cleanup.
admin.topics().delete(topicName);
admin.namespaces().deleteNamespace(namespaceName);
deleteNamespaceGraceFully(namespaceName, false);
admin.tenants().deleteTenant(tenantName);
admin.clusters().deleteCluster(clusterName);
// super cleanup.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.intercept.CounterBrokerInterceptor;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.PulsarMetadataEventSynchronizer;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
Expand Down Expand Up @@ -652,5 +653,21 @@ public Map<Class<?>, Collection<Class<?>>> register(Object callback, Object... c

}

/**
* see {@link BrokerTestBase#deleteNamespaceGraceFully(String, boolean, PulsarService, PulsarAdmin)}
*/
protected void deleteNamespaceGraceFully(String ns, boolean force)
throws Exception {
BrokerTestBase.deleteNamespaceGraceFully(ns, force, pulsar, admin);
}

/**
* see {@link BrokerTestBase#deleteNamespaceGraceFully(String, boolean, PulsarService, PulsarAdmin)}
*/
protected void deleteNamespaceGraceFully(String ns, boolean force, PulsarAdmin admin)
throws Exception {
BrokerTestBase.deleteNamespaceGraceFully(ns, force, pulsar, admin);
}

private static final Logger log = LoggerFactory.getLogger(MockedPulsarServiceBaseTest.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.google.common.collect.Sets;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.naming.NamespaceBundle;
Expand All @@ -34,7 +33,6 @@
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.testng.Assert.assertTrue;
Expand All @@ -55,7 +53,7 @@ protected void cleanup() throws Exception {
}

@Test
public void testNamespaceBundleOwnershipListener() throws PulsarAdminException, InterruptedException, PulsarClientException {
public void testNamespaceBundleOwnershipListener() throws Exception {

final CountDownLatch countDownLatch = new CountDownLatch(2);
final AtomicBoolean onLoad = new AtomicBoolean(false);
Expand Down Expand Up @@ -101,11 +99,11 @@ public void unLoad(NamespaceBundle bundle) {
Assert.assertTrue(onLoad.get());
Assert.assertTrue(unLoad.get());
admin.topics().delete(topic);
admin.namespaces().deleteNamespace(namespace);
deleteNamespaceGraceFully(namespace, false);
}

@Test
public void testGetAllPartitions() throws PulsarAdminException, ExecutionException, InterruptedException {
public void testGetAllPartitions() throws Exception {
final String namespace = "prop/" + UUID.randomUUID().toString();
admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
assertTrue(admin.namespaces().getNamespaces("prop").contains(namespace));
Expand All @@ -122,11 +120,11 @@ public void testGetAllPartitions() throws PulsarAdminException, ExecutionExcepti
}

admin.topics().deletePartitionedTopic(topicName);
admin.namespaces().deleteNamespace(namespace);
deleteNamespaceGraceFully(namespace, false);
}

@Test
public void testNamespaceBundleLookupOnwershipListener() throws PulsarAdminException, InterruptedException,
public void testNamespaceBundleLookupOnwershipListener() throws Exception,
PulsarClientException {
final CountDownLatch countDownLatch = new CountDownLatch(2);
final AtomicInteger onLoad = new AtomicInteger(0);
Expand Down Expand Up @@ -172,6 +170,6 @@ public boolean test(NamespaceBundle namespaceBundle) {
Assert.assertEquals(onLoad.get(), 1);
Assert.assertEquals(unLoad.get(), 1);
admin.topics().delete(topic);
admin.namespaces().deleteNamespace(namespace);
deleteNamespaceGraceFully(namespace, false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ public void testResourceGroupAttachToNamespace() throws Exception {
assertNull(pulsar.getResourceGroupServiceManager()
.getNamespaceResourceGroup(NamespaceName.get(namespaceName))));

admin.namespaces().deleteNamespace(namespaceName);
deleteNamespaceGraceFully(namespaceName, false);
deleteResourceGroup(rgName);
}

Expand Down
Loading