Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1138,13 +1138,6 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
.map(PersistentSubscription::getName).toList();
return FutureUtil.failedFuture(
new TopicBusyException("Topic has subscriptions did not catch up: " + backlogSubs));
} else if (TopicName.get(topic).isPartitioned()
&& (getProducers().size() > 0 || getNumberOfConsumers() > 0)
&& getBrokerService().isAllowAutoTopicCreation(topic)) {
// to avoid inconsistent metadata as a result
return FutureUtil.failedFuture(
new TopicBusyException("Partitioned topic has active consumers or producers and "
+ "auto-creation of topic is allowed"));
}

fenceTopicToCloseOrDelete(); // Avoid clients reconnections while deleting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
*/
package org.apache.pulsar.broker.admin;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertTrue;

Expand All @@ -28,25 +26,16 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.pulsar.broker.MultiBrokerBaseTest;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.LeaderBroker;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.internal.TopicsImpl;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
Expand All @@ -65,7 +54,6 @@ protected int numberOfAdditionalBrokers() {
@Override
protected void doInitConf() throws Exception {
super.doInitConf();
this.conf.setManagedLedgerMaxEntriesPerLedger(10);
}

@Override
Expand Down Expand Up @@ -134,90 +122,4 @@ public void testTopicLookup(TopicDomain topicDomain, boolean isPartition) throws
Assert.assertEquals(lookupResultSet.size(), 1);
}

@Test
public void testForceDeletePartitionedTopicWithSub() throws Exception {
final int numPartitions = 10;
TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
admin.tenants().createTenant("tenant-xyz", tenantInfo);
admin.namespaces().createNamespace("tenant-xyz/ns-abc", Set.of("test"));

admin.namespaces().setAutoTopicCreation("tenant-xyz/ns-abc",
AutoTopicCreationOverride.builder()
.allowAutoTopicCreation(true)
.topicType("partitioned")
.defaultNumPartitions(5)
.build());

RetentionPolicies retention = new RetentionPolicies(10, 10);
admin.namespaces().setRetention("tenant-xyz/ns-abc", retention);
final String topic = "persistent://tenant-xyz/ns-abc/topic-"
+ RandomStringUtils.randomAlphabetic(5)
+ "-testDeletePartitionedTopicWithSub";
final String subscriptionName = "sub";
((TopicsImpl) admin.topics()).createPartitionedTopicAsync(topic, numPartitions, true, null).get();

log.info("Creating producer and consumer");
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName(subscriptionName)
.subscribe();

Producer<String> producer = pulsarClient.newProducer(Schema.STRING).enableBatching(false).topic(topic).create();

log.info("producing messages");
for (int i = 0; i < numPartitions * 100; ++i) {
producer.newMessage()
.key("" + i)
.value("value-" + i)
.send();
}
producer.flush();
producer.close();

log.info("consuming some messages");
for (int i = 0; i < numPartitions * 5; i++) {
Message<byte[]> m = consumer.receive(1, TimeUnit.MINUTES);
}

log.info("trying to delete the topic");
try {
admin.topics().deletePartitionedTopic(topic, true);
fail("expected PulsarAdminException.NotFoundException");
} catch (PulsarAdminException e) {
assertTrue(e.getMessage().contains("Partitioned topic has active consumers or producers"));
}

// check that metadata is still consistent
assertEquals(numPartitions, admin.topics().getList("tenant-xyz/ns-abc")
.stream().filter(t -> t.contains(topic)).count());
assertEquals(numPartitions,
pulsar.getPulsarResources().getTopicResources()
.getExistingPartitions(TopicName.getPartitionedTopicName(topic))
.get()
.stream().filter(t -> t.contains(topic)).count());
assertTrue(admin.topics()
.getPartitionedTopicList("tenant-xyz/ns-abc")
.contains(topic));

log.info("closing producer and consumer");
producer.close();
consumer.close();

log.info("trying to delete the topic again");
admin.topics().deletePartitionedTopic(topic, true);

assertEquals(0, admin.topics().getList("tenant-xyz/ns-abc")
.stream().filter(t -> t.contains(topic)).count());
assertEquals(0,
pulsar.getPulsarResources().getTopicResources()
.getExistingPartitions(TopicName.getPartitionedTopicName(topic))
.get()
.stream().filter(t -> t.contains(topic)).count());
assertFalse(admin.topics()
.getPartitionedTopicList("tenant-xyz/ns-abc")
.contains(topic));

log.info("trying to create the topic again");
((TopicsImpl) admin.topics()).createPartitionedTopicAsync(topic, numPartitions, true, null).get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1262,7 +1262,6 @@ public void testSubscribeRate() throws Exception {
pulsarClient.updateServiceUrl(lookupUrl.toString());
Awaitility.await().untilAsserted(() -> assertTrue(consumer.isConnected()));
pulsar.getConfiguration().setAuthorizationEnabled(true);
consumer.close();
admin.topics().deletePartitionedTopic(topicName, true);
admin.namespaces().deleteNamespace(namespace);
admin.tenants().deleteTenant("my-tenants");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import io.netty.util.HashedWheelTimer;
import lombok.Cleanup;

import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.PulsarClient;
Expand Down Expand Up @@ -332,12 +331,7 @@ public void topicDeleted(String ignored, boolean partitioned) throws Exception {
p1.send("msg-1");

if (partitioned) {
try {
admin.topics().deletePartitionedTopic(topic, true);
fail("expected error because partitioned topic has active producer");
} catch (PulsarAdminException.ServerSideErrorException e) {
// expected
}
admin.topics().deletePartitionedTopic(topic, true);
} else {
admin.topics().delete(topic, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1171,6 +1171,34 @@ public void testSubscriptionMustCompleteWhenOperationTimeoutOnMultipleTopics() t
}
}

@Test(timeOut = testTimeout)
public void testAutoDiscoverMultiTopicsPartitions() throws Exception {
final String topicName = "persistent://public/default/issue-9585";
admin.topics().createPartitionedTopic(topicName, 3);
PatternMultiTopicsConsumerImpl<String> consumer = (PatternMultiTopicsConsumerImpl<String>) pulsarClient.newConsumer(Schema.STRING)
.topicsPattern(topicName)
.subscriptionName("sub-issue-9585")
.subscribe();

Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 3);
Assert.assertEquals(consumer.getConsumers().size(), 3);

admin.topics().deletePartitionedTopic(topicName, true);
consumer.getPartitionsAutoUpdateTimeout().task().run(consumer.getPartitionsAutoUpdateTimeout());
Awaitility.await().untilAsserted(() -> {
Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 0);
Assert.assertEquals(consumer.getConsumers().size(), 0);
});

admin.topics().createPartitionedTopic(topicName, 7);
consumer.getPartitionsAutoUpdateTimeout().task().run(consumer.getPartitionsAutoUpdateTimeout());
Awaitility.await().untilAsserted(() -> {
Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 7);
Assert.assertEquals(consumer.getConsumers().size(), 7);
});
}


@Test(timeOut = testTimeout)
public void testPartitionsUpdatesForMultipleTopics() throws Exception {
final String topicName0 = "persistent://public/default/testPartitionsUpdatesForMultipleTopics-0";
Expand Down

This file was deleted.