diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 31dda5539a3e2..4d48f9c627e76 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1138,13 +1138,6 @@ private CompletableFuture delete(boolean failIfHasSubscriptions, .map(PersistentSubscription::getName).toList(); return FutureUtil.failedFuture( new TopicBusyException("Topic has subscriptions did not catch up: " + backlogSubs)); - } else if (TopicName.get(topic).isPartitioned() - && (getProducers().size() > 0 || getNumberOfConsumers() > 0) - && getBrokerService().isAllowAutoTopicCreation(topic)) { - // to avoid inconsistent metadata as a result - return FutureUtil.failedFuture( - new TopicBusyException("Partitioned topic has active consumers or producers and " - + "auto-creation of topic is allowed")); } fenceTopicToCloseOrDelete(); // Avoid clients reconnections while deleting diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiMultiBrokersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiMultiBrokersTest.java index 5a0bde6f91386..2cbff955ecff1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiMultiBrokersTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiMultiBrokersTest.java @@ -18,8 +18,6 @@ */ package org.apache.pulsar.broker.admin; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.fail; import static org.testng.AssertJUnit.assertEquals; import static org.testng.AssertJUnit.assertTrue; @@ -28,8 +26,6 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; - import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.RandomStringUtils; import org.apache.pulsar.broker.MultiBrokerBaseTest; @@ -37,16 +33,9 @@ import org.apache.pulsar.broker.loadbalance.LeaderBroker; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; -import org.apache.pulsar.client.admin.internal.TopicsImpl; -import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; -import org.apache.pulsar.common.policies.data.RetentionPolicies; -import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.testng.Assert; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -65,7 +54,6 @@ protected int numberOfAdditionalBrokers() { @Override protected void doInitConf() throws Exception { super.doInitConf(); - this.conf.setManagedLedgerMaxEntriesPerLedger(10); } @Override @@ -134,90 +122,4 @@ public void testTopicLookup(TopicDomain topicDomain, boolean isPartition) throws Assert.assertEquals(lookupResultSet.size(), 1); } - @Test - public void testForceDeletePartitionedTopicWithSub() throws Exception { - final int numPartitions = 10; - TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test")); - admin.tenants().createTenant("tenant-xyz", tenantInfo); - admin.namespaces().createNamespace("tenant-xyz/ns-abc", Set.of("test")); - - admin.namespaces().setAutoTopicCreation("tenant-xyz/ns-abc", - AutoTopicCreationOverride.builder() - .allowAutoTopicCreation(true) - .topicType("partitioned") - .defaultNumPartitions(5) - .build()); - - RetentionPolicies retention = new RetentionPolicies(10, 10); - admin.namespaces().setRetention("tenant-xyz/ns-abc", retention); - final String topic = "persistent://tenant-xyz/ns-abc/topic-" - + RandomStringUtils.randomAlphabetic(5) - + "-testDeletePartitionedTopicWithSub"; - final String subscriptionName = "sub"; - ((TopicsImpl) admin.topics()).createPartitionedTopicAsync(topic, numPartitions, true, null).get(); - - log.info("Creating producer and consumer"); - Consumer consumer = pulsarClient.newConsumer() - .topic(topic) - .subscriptionName(subscriptionName) - .subscribe(); - - Producer producer = pulsarClient.newProducer(Schema.STRING).enableBatching(false).topic(topic).create(); - - log.info("producing messages"); - for (int i = 0; i < numPartitions * 100; ++i) { - producer.newMessage() - .key("" + i) - .value("value-" + i) - .send(); - } - producer.flush(); - producer.close(); - - log.info("consuming some messages"); - for (int i = 0; i < numPartitions * 5; i++) { - Message m = consumer.receive(1, TimeUnit.MINUTES); - } - - log.info("trying to delete the topic"); - try { - admin.topics().deletePartitionedTopic(topic, true); - fail("expected PulsarAdminException.NotFoundException"); - } catch (PulsarAdminException e) { - assertTrue(e.getMessage().contains("Partitioned topic has active consumers or producers")); - } - - // check that metadata is still consistent - assertEquals(numPartitions, admin.topics().getList("tenant-xyz/ns-abc") - .stream().filter(t -> t.contains(topic)).count()); - assertEquals(numPartitions, - pulsar.getPulsarResources().getTopicResources() - .getExistingPartitions(TopicName.getPartitionedTopicName(topic)) - .get() - .stream().filter(t -> t.contains(topic)).count()); - assertTrue(admin.topics() - .getPartitionedTopicList("tenant-xyz/ns-abc") - .contains(topic)); - - log.info("closing producer and consumer"); - producer.close(); - consumer.close(); - - log.info("trying to delete the topic again"); - admin.topics().deletePartitionedTopic(topic, true); - - assertEquals(0, admin.topics().getList("tenant-xyz/ns-abc") - .stream().filter(t -> t.contains(topic)).count()); - assertEquals(0, - pulsar.getPulsarResources().getTopicResources() - .getExistingPartitions(TopicName.getPartitionedTopicName(topic)) - .get() - .stream().filter(t -> t.contains(topic)).count()); - assertFalse(admin.topics() - .getPartitionedTopicList("tenant-xyz/ns-abc") - .contains(topic)); - - log.info("trying to create the topic again"); - ((TopicsImpl) admin.topics()).createPartitionedTopicAsync(topic, numPartitions, true, null).get(); - } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java index 8caf3a47dd3d8..c23407bb447d8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java @@ -1262,7 +1262,6 @@ public void testSubscribeRate() throws Exception { pulsarClient.updateServiceUrl(lookupUrl.toString()); Awaitility.await().untilAsserted(() -> assertTrue(consumer.isConnected())); pulsar.getConfiguration().setAuthorizationEnabled(true); - consumer.close(); admin.topics().deletePartitionedTopic(topicName, true); admin.namespaces().deleteNamespace(namespace); admin.tenants().deleteTenant("my-tenants"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java index 338ffc0180702..604abd8d7095f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java @@ -30,7 +30,6 @@ import io.netty.util.HashedWheelTimer; import lombok.Cleanup; -import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerAccessMode; import org.apache.pulsar.client.api.PulsarClient; @@ -332,12 +331,7 @@ public void topicDeleted(String ignored, boolean partitioned) throws Exception { p1.send("msg-1"); if (partitioned) { - try { - admin.topics().deletePartitionedTopic(topic, true); - fail("expected error because partitioned topic has active producer"); - } catch (PulsarAdminException.ServerSideErrorException e) { - // expected - } + admin.topics().deletePartitionedTopic(topic, true); } else { admin.topics().delete(topic, true); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java index 35679711fde8d..8d068d6511426 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java @@ -1171,6 +1171,34 @@ public void testSubscriptionMustCompleteWhenOperationTimeoutOnMultipleTopics() t } } + @Test(timeOut = testTimeout) + public void testAutoDiscoverMultiTopicsPartitions() throws Exception { + final String topicName = "persistent://public/default/issue-9585"; + admin.topics().createPartitionedTopic(topicName, 3); + PatternMultiTopicsConsumerImpl consumer = (PatternMultiTopicsConsumerImpl) pulsarClient.newConsumer(Schema.STRING) + .topicsPattern(topicName) + .subscriptionName("sub-issue-9585") + .subscribe(); + + Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 3); + Assert.assertEquals(consumer.getConsumers().size(), 3); + + admin.topics().deletePartitionedTopic(topicName, true); + consumer.getPartitionsAutoUpdateTimeout().task().run(consumer.getPartitionsAutoUpdateTimeout()); + Awaitility.await().untilAsserted(() -> { + Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 0); + Assert.assertEquals(consumer.getConsumers().size(), 0); + }); + + admin.topics().createPartitionedTopic(topicName, 7); + consumer.getPartitionsAutoUpdateTimeout().task().run(consumer.getPartitionsAutoUpdateTimeout()); + Awaitility.await().untilAsserted(() -> { + Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 7); + Assert.assertEquals(consumer.getConsumers().size(), 7); + }); + } + + @Test(timeOut = testTimeout) public void testPartitionsUpdatesForMultipleTopics() throws Exception { final String topicName0 = "persistent://public/default/testPartitionsUpdatesForMultipleTopics-0"; diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topics/TestTopicDeletion.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topics/TestTopicDeletion.java deleted file mode 100644 index 0adb414e8f4f6..0000000000000 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topics/TestTopicDeletion.java +++ /dev/null @@ -1,183 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.tests.integration.topics; - -import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.tests.integration.docker.ContainerExecException; -import org.apache.pulsar.tests.integration.docker.ContainerExecResult; -import org.apache.pulsar.tests.integration.suites.PulsarTestSuite; -import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec; -import org.testng.annotations.Test; - -import java.util.concurrent.TimeUnit; -import java.util.function.Supplier; - -import static java.nio.charset.StandardCharsets.UTF_8; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNotEquals; -import static org.testng.Assert.fail; - -/** - * Test cases for compaction. - */ -@Slf4j -public class TestTopicDeletion extends PulsarTestSuite { - - final private boolean unload = false; - final private int numBrokers = 2; - - public void setupCluster() throws Exception { - brokerEnvs.put("managedLedgerMaxEntriesPerLedger", "10"); - brokerEnvs.put("brokerDeleteInactivePartitionedTopicMetadataEnabled", "false"); - brokerEnvs.put("brokerDeleteInactiveTopicsEnabled", "false"); - this.setupCluster(""); - } - - protected PulsarClusterSpec.PulsarClusterSpecBuilder beforeSetupCluster( - String clusterName, - PulsarClusterSpec.PulsarClusterSpecBuilder specBuilder) { - specBuilder.numBrokers(numBrokers); - specBuilder.enableContainerLog(true); - return specBuilder; - } - - @Test(dataProvider = "ServiceUrls", timeOut=300_000) - public void testPartitionedTopicForceDeletion(Supplier serviceUrl) throws Exception { - - log.info("Creating tenant and namespace"); - - final String tenant = "test-partitioned-topic-" + randomName(4); - final String namespace = tenant + "/ns1"; - final String topic = "persistent://" + namespace + "/partitioned-topic"; - final int numPartitions = numBrokers * 3; - final int numKeys = numPartitions * 50; - final String subscriptionName = "sub1"; - - this.createTenantName(tenant, pulsarCluster.getClusterName(), "admin"); - - this.createNamespace(namespace); - - pulsarCluster.runAdminCommandOnAnyBroker("namespaces", - "set-clusters", "--clusters", pulsarCluster.getClusterName(), namespace); - - pulsarCluster.runAdminCommandOnAnyBroker("namespaces", - "set-retention", "--size", "100M", "--time", "100m", namespace); - - this.createPartitionedTopic(topic, numPartitions); - - try (PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl.get()).build()) { - - log.info("Creating consumer"); - Consumer consumer = client.newConsumer() - .topic(topic) - .subscriptionName(subscriptionName) - .subscribe(); - - log.info("Producing messages"); - try(Producer producer = client.newProducer() - .topic(topic) - .create() - ) { - for (int i = 0; i < numKeys; i++) { - producer.newMessage() - .key("" + i) - .value(("value-" + i).getBytes(UTF_8)) - .sendAsync(); - } - producer.flush(); - log.info("Successfully wrote {} values", numKeys); - } - - log.info("Consuming half of the messages"); - for (int i = 0; i < numKeys / 2; i++) { - Message m = consumer.receive(1, TimeUnit.MINUTES); - log.info("Read value {}", m.getKey()); - } - - if (unload) { - log.info("Unloading topic"); - pulsarCluster.runAdminCommandOnAnyBroker("topics", - "unload", topic); - } - - ContainerExecResult res; - log.info("Deleting the topic"); - try { - res = pulsarCluster.runAdminCommandOnAnyBroker("topics", - "delete-partitioned-topic", "--force", topic); - assertNotEquals(0, res.getExitCode()); - } catch (ContainerExecException e) { - log.info("Second delete failed with ContainerExecException, could be ok", e); - if (!e.getMessage().contains("with error code 1")) { - fail("Expected different error code"); - } - } - - log.info("Close the consumer and delete the topic again"); - consumer.close(); - - res = pulsarCluster.runAdminCommandOnAnyBroker("topics", - "delete-partitioned-topic", "--force", topic); - assertNotEquals(0, res.getExitCode()); - - Thread.sleep(5000); - // should succeed - log.info("Creating the topic again"); - this.createPartitionedTopic(topic, numBrokers * 2); - } - } - - - private ContainerExecResult createTenantName(final String tenantName, - final String allowedClusterName, - final String adminRoleName) throws Exception { - ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker( - "tenants", "create", "--allowed-clusters", allowedClusterName, - "--admin-roles", adminRoleName, tenantName); - assertEquals(0, result.getExitCode()); - return result; - } - - private ContainerExecResult createNamespace(final String Ns) throws Exception { - ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker( - "namespaces", - "create", - "--clusters", - pulsarCluster.getClusterName(), Ns); - assertEquals(0, result.getExitCode()); - return result; - } - - private ContainerExecResult createPartitionedTopic(final String partitionedTopicName, int numPartitions) - throws Exception { - ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker( - "topics", - "create-partitioned-topic", - "--partitions", "" + numPartitions, - partitionedTopicName); - assertEquals(0, result.getExitCode()); - return result; - } - - -}