From 325cada6fad73797be968f7d59f63fb506ffcbea Mon Sep 17 00:00:00 2001 From: wuzhanpeng Date: Tue, 3 Aug 2021 20:43:35 +0800 Subject: [PATCH 1/5] add test for auto-created partitioned system topic --- .../systopic/PartitionedSystemTopicTest.java | 69 +++++++++++++++++++ .../common/events/EventsTopicNames.java | 8 +-- 2 files changed, 70 insertions(+), 7 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java new file mode 100644 index 0000000000000..bd4ef7870da9f --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.systopic; + +import org.apache.pulsar.broker.service.BrokerTestBase; +import org.apache.pulsar.common.events.EventsTopicNames; +import org.apache.pulsar.common.naming.NamespaceName; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +@Test(groups = "broker") +public class PartitionedSystemTopicTest extends BrokerTestBase { + + static final int PARTITIONS = 5; + + @BeforeMethod + @Override + protected void setup() throws Exception { + resetConfig(); + conf.setAllowAutoTopicCreation(false); + conf.setAllowAutoTopicCreationType("partitioned"); + conf.setDefaultNumPartitions(PARTITIONS); + + conf.setSystemTopicEnabled(true); + conf.setTopicLevelPoliciesEnabled(true); + + super.baseSetup(); + } + + @AfterMethod(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test + public void testAutoCreatedPartitionedSystemTopic() throws Exception { + final String ns = "prop/ns-test"; + admin.namespaces().createNamespace(ns, 2); + NamespaceEventsSystemTopicFactory systemTopicFactory = new NamespaceEventsSystemTopicFactory(pulsarClient); + TopicPoliciesSystemTopicClient systemTopicClientForNamespace = systemTopicFactory + .createTopicPoliciesSystemTopicClient(NamespaceName.get(ns)); + SystemTopicClient.Reader reader = systemTopicClientForNamespace.newReader(); + + int partitions = admin.topics().getPartitionedTopicMetadata( + String.format("persistent://%s/%s", ns, EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)).partitions; + Assert.assertEquals(admin.topics().getPartitionedTopicList(ns).size(), 1); + Assert.assertEquals(partitions, PARTITIONS); + Assert.assertEquals(admin.topics().getList(ns).size(), PARTITIONS); + } +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventsTopicNames.java b/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventsTopicNames.java index 3bbf64ca5c9ec..56564043aa91f 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventsTopicNames.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventsTopicNames.java @@ -45,12 +45,6 @@ public class EventsTopicNames { Collections.unmodifiableSet(Sets.newHashSet(NAMESPACE_EVENTS_LOCAL_NAME, TRANSACTION_BUFFER_SNAPSHOT)); public static boolean checkTopicIsEventsNames(TopicName topicName) { - String name; - if (topicName.isPartitioned()) { - name = TopicName.get(topicName.getPartitionedTopicName()).getLocalName(); - } else { - name = topicName.getLocalName(); - } - return EVENTS_TOPIC_NAMES.contains(name); + return EVENTS_TOPIC_NAMES.contains(TopicName.get(topicName.getPartitionedTopicName()).getLocalName()); } } From 18800c70e9b2c2e8d31538c4bec150f0f3551b84 Mon Sep 17 00:00:00 2001 From: wuzhanpeng Date: Wed, 11 Aug 2021 19:13:50 +0800 Subject: [PATCH 2/5] avoid duplicate deletion of the same schema --- .../pulsar/broker/admin/impl/NamespacesBase.java | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 4048b4ac77627..d000a35bef160 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -415,9 +415,23 @@ protected void internalDeleteNamespaceForcefully(AsyncResponse asyncResponse, bo try { // firstly remove all topics including system topics if (!topics.isEmpty()) { + Set partitionedTopics = new HashSet<>(); + for (String topic : topics) { try { - futures.add(pulsar().getAdminClient().topics().deleteAsync(topic, true, true)); + TopicName topicName = TopicName.get(topic); + if (topicName.isPartitioned()) { + String partitionedTopic = topicName.getPartitionedTopicName(); + if (!partitionedTopics.contains(partitionedTopic)) { + // Distinguish partitioned topic to avoid duplicate deletion of the same schema + futures.add(pulsar().getAdminClient().topics().deletePartitionedTopicAsync( + partitionedTopic, true, true)); + partitionedTopics.add(partitionedTopic); + } + } else { + futures.add(pulsar().getAdminClient().topics().deleteAsync( + topic, true, true)); + } } catch (Exception e) { log.error("[{}] Failed to force delete topic {}", clientAppId(), topic, e); asyncResponse.resume(new RestException(e)); From 8de0b3d185b080311b63404db8e5ba19cfb918df Mon Sep 17 00:00:00 2001 From: wuzhanpeng Date: Fri, 13 Aug 2021 11:50:18 +0800 Subject: [PATCH 3/5] supplement a test case for forced ns deletion --- .../broker/admin/impl/NamespacesBase.java | 15 ++++- .../pulsar/broker/admin/AdminApiTest2.java | 56 +++++++++++++++++++ 2 files changed, 69 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index d000a35bef160..91803e33e9685 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -416,6 +416,7 @@ protected void internalDeleteNamespaceForcefully(AsyncResponse asyncResponse, bo // firstly remove all topics including system topics if (!topics.isEmpty()) { Set partitionedTopics = new HashSet<>(); + Set nonPartitionedTopics = new HashSet<>(); for (String topic : topics) { try { @@ -431,12 +432,22 @@ protected void internalDeleteNamespaceForcefully(AsyncResponse asyncResponse, bo } else { futures.add(pulsar().getAdminClient().topics().deleteAsync( topic, true, true)); + nonPartitionedTopics.add(topic); } } catch (Exception e) { - log.error("[{}] Failed to force delete topic {}", clientAppId(), topic, e); - asyncResponse.resume(new RestException(e)); + String errorMessage = String.format("Failed to force delete topic %s, " + + "but the previous deletion command of partitioned-topics:%s " + + "and non-partitioned-topics:%s have been sent out asynchronously. Reason: %s", + topic, partitionedTopics, nonPartitionedTopics, e.getCause()); + log.error("[{}] {}", clientAppId(), errorMessage, e); + asyncResponse.resume(new RestException(Status.INTERNAL_SERVER_ERROR, errorMessage)); + return; } } + + log.info("Successfully send deletion command of partitioned-topics:{} " + + "and non-partitioned-topics:{} in namespace:{}.", + partitionedTopics, nonPartitionedTopics, namespaceName); } // forcefully delete namespace bundles NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory() diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java index ce6814cea3672..556305df71ca2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java @@ -36,6 +36,7 @@ import java.net.URL; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -44,6 +45,7 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.TimeUnit; import javax.ws.rs.core.Response.Status; import lombok.Cleanup; @@ -1472,6 +1474,60 @@ public void testForceDeleteNamespace() throws Exception { } } + @Test + public void testDistinguishTopicTypeWhenForceDeleteNamespace() throws Exception { + conf.setForceDeleteNamespaceAllowed(true); + final String ns = "prop-xyz/distinguish-topic-type-ns"; + final String exNs = "prop-xyz/ex-distinguish-topic-type-ns"; + admin.namespaces().createNamespace(ns, 2); + admin.namespaces().createNamespace(exNs, 2); + + final String p1 = "persistent://" + ns + "/p1"; + final String p5 = "persistent://" + ns + "/p5"; + final String np = "persistent://" + ns + "/np"; + + admin.topics().createPartitionedTopic(p1, 1); + admin.topics().createPartitionedTopic(p5, 5); + admin.topics().createNonPartitionedTopic(np); + + final String exNp = "persistent://" + exNs + "/np"; + admin.topics().createNonPartitionedTopic(exNp); + // insert an invalid topic name + pulsar.getLocalMetadataStore().put( + "/managed-ledgers/" + exNs + "/persistent/", "".getBytes(), Optional.empty()).join(); + + List topics = pulsar.getNamespaceService().getFullListOfTopics(NamespaceName.get(ns)).get(); + List exTopics = pulsar.getNamespaceService().getFullListOfTopics(NamespaceName.get(exNs)).get(); + + // ensure that the topic list contains all the topics + List allTopics = new ArrayList<>(Arrays.asList(np, TopicName.get(p1).getPartition(0).toString())); + for (int i = 0; i < 5; i++) { + allTopics.add(TopicName.get(p5).getPartition(i).toString()); + } + Assert.assertEquals(allTopics.stream().filter(t -> !topics.contains(t)).count(), 0); + Assert.assertTrue(exTopics.contains("persistent://" + exNs + "/")); + // partition num = p1 + p5 + np + Assert.assertEquals(topics.size(), 1 + 5 + 1); + Assert.assertEquals(exTopics.size(), 1 + 1); + + admin.namespaces().deleteNamespace(ns, true); + Arrays.asList(p1, p5, np).forEach(t -> { + try { + admin.schemas().getSchemaInfo(t); + } catch (PulsarAdminException e) { + // all the normal topics' schemas have been deleted + Assert.assertEquals(e.getStatusCode(), 404); + } + }); + + try { + admin.namespaces().deleteNamespace(exNs, true); + fail("Should fail due to invalid topic"); + } catch (Exception e) { + //ok + } + } + @Test public void testUpdateClusterWithProxyUrl() throws Exception { ClusterData cluster = ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build(); From 01af2130cb8d5d47599244cc7856073b855c0904 Mon Sep 17 00:00:00 2001 From: wuzhanpeng Date: Mon, 16 Aug 2021 16:03:01 +0800 Subject: [PATCH 4/5] fix checkstyle --- .../pulsar/broker/admin/impl/NamespacesBase.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 91803e33e9685..f523c925da848 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -435,9 +435,10 @@ protected void internalDeleteNamespaceForcefully(AsyncResponse asyncResponse, bo nonPartitionedTopics.add(topic); } } catch (Exception e) { - String errorMessage = String.format("Failed to force delete topic %s, " + - "but the previous deletion command of partitioned-topics:%s " + - "and non-partitioned-topics:%s have been sent out asynchronously. Reason: %s", + String errorMessage = String.format("Failed to force delete topic %s, " + + "but the previous deletion command of partitioned-topics:%s " + + "and non-partitioned-topics:%s have been sent out asynchronously. " + + "Reason: %s", topic, partitionedTopics, nonPartitionedTopics, e.getCause()); log.error("[{}] {}", clientAppId(), errorMessage, e); asyncResponse.resume(new RestException(Status.INTERNAL_SERVER_ERROR, errorMessage)); @@ -445,8 +446,8 @@ protected void internalDeleteNamespaceForcefully(AsyncResponse asyncResponse, bo } } - log.info("Successfully send deletion command of partitioned-topics:{} " + - "and non-partitioned-topics:{} in namespace:{}.", + log.info("Successfully send deletion command of partitioned-topics:{} " + + "and non-partitioned-topics:{} in namespace:{}.", partitionedTopics, nonPartitionedTopics, namespaceName); } // forcefully delete namespace bundles From 43af5c33cf6e3ab9d349952aa019f724fd307e07 Mon Sep 17 00:00:00 2001 From: wuzhanpeng Date: Fri, 20 Aug 2021 14:08:53 +0800 Subject: [PATCH 5/5] modify log level --- .../apache/pulsar/broker/admin/impl/NamespacesBase.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index f523c925da848..eb2752187cc6b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -446,9 +446,11 @@ protected void internalDeleteNamespaceForcefully(AsyncResponse asyncResponse, bo } } - log.info("Successfully send deletion command of partitioned-topics:{} " - + "and non-partitioned-topics:{} in namespace:{}.", - partitionedTopics, nonPartitionedTopics, namespaceName); + if (log.isDebugEnabled()) { + log.debug("Successfully send deletion command of partitioned-topics:{} " + + "and non-partitioned-topics:{} in namespace:{}.", + partitionedTopics, nonPartitionedTopics, namespaceName); + } } // forcefully delete namespace bundles NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory()