From d18fcd014472ef0d8158efbe3f785a435511dcd6 Mon Sep 17 00:00:00 2001 From: mattisonchao Date: Thu, 20 Jul 2023 17:23:46 +0800 Subject: [PATCH 1/3] [fix][broker] Inconsistent behavior for topic auto creation --- .../pulsar/broker/service/BrokerService.java | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 668195c4b8065..8dc94ca874094 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -3023,12 +3023,9 @@ public CompletableFuture fetchPartitionedTopicMetadata if (pulsar.getNamespaceService() == null) { return FutureUtil.failedFuture(new NamingException("namespace service is not ready")); } - Optional policies = - pulsar.getPulsarResources().getNamespaceResources() - .getPoliciesIfCached(topicName.getNamespaceObject()); - return pulsar.getNamespaceService().checkTopicExists(topicName) - .thenCompose(topicExists -> { - return fetchPartitionedTopicMetadataAsync(topicName) + return pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(topicName.getNamespaceObject()) + .thenCompose(policies -> pulsar.getNamespaceService().checkTopicExists(topicName) + .thenCompose(topicExists -> fetchPartitionedTopicMetadataAsync(topicName) .thenCompose(metadata -> { CompletableFuture future = new CompletableFuture<>(); @@ -3041,7 +3038,7 @@ public CompletableFuture fetchPartitionedTopicMetadata && !topicExists && !topicName.isPartitioned() && pulsar.getBrokerService() - .isDefaultTopicTypePartitioned(topicName, policies)) { + .isDefaultTopicTypePartitioned(topicName, policies)) { isAllowAutoTopicCreationAsync(topicName, policies).thenAccept(allowed -> { if (allowed) { pulsar.getBrokerService() @@ -3050,7 +3047,7 @@ public CompletableFuture fetchPartitionedTopicMetadata .exceptionally(ex -> { if (ex.getCause() instanceof MetadataStoreException - .AlreadyExistsException) { + .AlreadyExistsException) { // The partitioned topic might be created concurrently fetchPartitionedTopicMetadataAsync(topicName) .whenComplete((metadata2, ex2) -> { @@ -3078,8 +3075,7 @@ public CompletableFuture fetchPartitionedTopicMetadata }); return future; - }); - }); + }))); } @SuppressWarnings("deprecation") From 7990f01c83d194f0aee5ce1e585e793292cf3043 Mon Sep 17 00:00:00 2001 From: mattisonchao Date: Fri, 21 Jul 2023 01:25:43 +0800 Subject: [PATCH 2/3] Add Test --- .../admin/HierarchyTopicAutoCreationTest.java | 97 +++++++++++++++++++ 1 file changed, 97 insertions(+) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/HierarchyTopicAutoCreationTest.java diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/HierarchyTopicAutoCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/HierarchyTopicAutoCreationTest.java new file mode 100644 index 0000000000000..5421878722a96 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/HierarchyTopicAutoCreationTest.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.admin; + +import lombok.Cleanup; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; +import org.apache.pulsar.common.policies.data.Policies; +import org.apache.pulsar.metadata.api.MetadataCache; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.util.List; +import java.util.UUID; + +@Test(groups = "broker-admin") +@Slf4j +public class HierarchyTopicAutoCreationTest extends ProducerConsumerBase { + + @Override + @BeforeMethod + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @Override + @AfterMethod(alwaysRun = true) + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test(invocationCount = 3) + @SneakyThrows + public void testPartitionedTopicAutoCreation() { + // Create namespace + final String namespace = "public/testPartitionedTopicAutoCreation"; + admin.namespaces().createNamespace(namespace); + // Set policies + final AutoTopicCreationOverride expectedPolicies = AutoTopicCreationOverride.builder() + .allowAutoTopicCreation(true) + .topicType("partitioned") + .defaultNumPartitions(1) + .build(); + admin.namespaces().setAutoTopicCreation(namespace, expectedPolicies); + // Double-check the policies + final AutoTopicCreationOverride nsAutoTopicCreationOverride = admin.namespaces() + .getAutoTopicCreation(namespace); + Assert.assertEquals(nsAutoTopicCreationOverride, expectedPolicies); + // Background invalidate cache + final MetadataCache nsCache = pulsar.getPulsarResources().getNamespaceResources().getCache(); + final Thread t1 = new Thread(() -> { + while (true) { + nsCache.invalidate("/admin/policies/" + namespace); + } + }); + t1.start(); + + // trigger auto-creation + final String topicName = "persistent://" + namespace + "/test-" + UUID.randomUUID(); + @Cleanup final Producer producer = pulsarClient.newProducer() + .topic(topicName) + .create(); + final List topics = admin.topics().getList(namespace); + Assert.assertEquals(topics.size(), 1); // expect only one topic + Assert.assertEquals(topics.get(0), + TopicName.get(topicName).getPartition(0).toString()); // expect partitioned topic + + // double-check policies + final AutoTopicCreationOverride actualPolicies2 = admin.namespaces().getAutoTopicCreation(namespace); + Assert.assertEquals(actualPolicies2, expectedPolicies); + + t1.interrupt(); + } +} From 6d43e5644d38604d5508fb244ced7412389dc2d0 Mon Sep 17 00:00:00 2001 From: mattisonchao Date: Fri, 21 Jul 2023 01:27:32 +0800 Subject: [PATCH 3/3] Fix checkstyle --- .../impl}/HierarchyTopicAutoCreationTest.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) rename pulsar-broker/src/test/java/org/apache/pulsar/{broker/admin => client/impl}/HierarchyTopicAutoCreationTest.java (98%) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/HierarchyTopicAutoCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/HierarchyTopicAutoCreationTest.java similarity index 98% rename from pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/HierarchyTopicAutoCreationTest.java rename to pulsar-broker/src/test/java/org/apache/pulsar/client/impl/HierarchyTopicAutoCreationTest.java index 5421878722a96..8c93b293c41a4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/HierarchyTopicAutoCreationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/HierarchyTopicAutoCreationTest.java @@ -16,11 +16,13 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.broker.admin; +package org.apache.pulsar.client.impl; import lombok.Cleanup; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; +import java.util.List; +import java.util.UUID; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.common.naming.TopicName; @@ -32,10 +34,7 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import java.util.List; -import java.util.UUID; - -@Test(groups = "broker-admin") +@Test(groups = "broker-impl") @Slf4j public class HierarchyTopicAutoCreationTest extends ProducerConsumerBase {