From 0f5c031f805a50f777264b9acad44af820a96ed7 Mon Sep 17 00:00:00 2001 From: congbo Date: Thu, 11 Nov 2021 18:01:06 +0800 Subject: [PATCH 1/5] [Transaction] Fix transaction system topic create in loop. --- .../apache/pulsar/broker/PulsarService.java | 9 +++++- .../admin/impl/PersistentTopicsBase.java | 8 +++++ .../broker/admin/v2/PersistentTopics.java | 2 ++ .../pulsar/broker/service/BrokerService.java | 9 ++++++ .../broker/transaction/TransactionTest.java | 32 +++++++++++++++++++ 5 files changed, 59 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index bca21ae476206..d37194828e21e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -1649,7 +1649,7 @@ public void shutdownNow() { } - private static boolean isTransactionSystemTopic(TopicName topicName) { + public static boolean isTransactionSystemTopic(TopicName topicName) { String topic = topicName.toString(); return topic.startsWith(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString()) || topic.startsWith(TopicName.get(TopicDomain.persistent.value(), @@ -1657,6 +1657,13 @@ private static boolean isTransactionSystemTopic(TopicName topicName) { || topic.endsWith(MLPendingAckStore.PENDING_ACK_STORE_SUFFIX); } + public static boolean isNotAllowedToCreateTopic(TopicName topicName) { + String topic = topicName.toString(); + return topic.startsWith(TopicName.get(TopicDomain.persistent.value(), + NamespaceName.SYSTEM_NAMESPACE, TRANSACTION_LOG_PREFIX).toString()) + || topic.endsWith(MLPendingAckStore.PENDING_ACK_STORE_SUFFIX); + } + @VisibleForTesting protected BrokerService newBrokerService(PulsarService pulsar) throws Exception { return new BrokerService(pulsar, ioEventLoopGroup); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index ef4510cd6ef51..3fac9ffbb6ae4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.admin.impl; +import static org.apache.pulsar.broker.PulsarService.isNotAllowedToCreateTopic; import static org.apache.pulsar.broker.resources.PulsarResources.DEFAULT_OPERATION_TIMEOUT_SEC; import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsTransactionCoordinatorAssign; import com.fasterxml.jackson.core.JsonProcessingException; @@ -244,6 +245,13 @@ protected void validateAdminAndClientPermission() { } } + protected void validateTopicAllowdToCreate(TopicName topicName) { + if (isNotAllowedToCreateTopic(topicName)) { + log.warn("Try to create a topic in the system topic format! {}", topicName); + throw new RestException(Status.CONFLICT, "Cannot create topic in system topic format!"); + } + } + public void validateAdminOperationOnTopic(boolean authoritative) { validateAdminAccessForTenant(topicName.getTenant()); validateTopicOwnership(topicName, authoritative); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 1318057839955..7185ced0cc58b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -234,6 +234,7 @@ public void createPartitionedTopic( validateGlobalNamespaceOwnership(); validatePartitionedTopicName(tenant, namespace, encodedTopic); validateTopicPolicyOperation(topicName, PolicyName.PARTITION, PolicyOperation.WRITE); + validateTopicAllowToCreate(topicName); internalCreatePartitionedTopic(asyncResponse, numPartitions, createLocalTopicOnly); } catch (Exception e) { log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e); @@ -267,6 +268,7 @@ public void createNonPartitionedTopic( validateNamespaceName(tenant, namespace); validateGlobalNamespaceOwnership(); validateTopicName(tenant, namespace, encodedTopic); + validateTopicAllowToCreate(topicName); internalCreateNonPartitionedTopic(authoritative); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 51843bbf56c21..91b8be9e4d261 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -23,6 +23,7 @@ import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun; import static org.apache.commons.collections.CollectionUtils.isEmpty; import static org.apache.commons.lang3.StringUtils.isNotBlank; +import static org.apache.pulsar.broker.PulsarService.isTransactionSystemTopic; import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsEventsNames; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; @@ -1284,6 +1285,14 @@ private void createPersistentTopic(final String topic, boolean createIfMissing, return; } + if (isTransactionSystemTopic(topicName)) { + String msg = String.format("Can not create transaction system topic %s", topic); + log.warn(msg); + pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); + topicFuture.completeExceptionally(new NotAllowedException(msg)); + return; + } + CompletableFuture maxTopicsCheck = createIfMissing ? checkMaxTopicsPerNamespace(topicName, 1) : CompletableFuture.completedFuture(null); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index bc8bef96be1ba..d79cb196bb11d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -26,6 +26,9 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; + import com.google.common.collect.Sets; import io.netty.buffer.Unpooled; import java.lang.reflect.Field; @@ -101,6 +104,35 @@ protected void setup() throws Exception { setUpBase(NUM_BROKERS, NUM_PARTITIONS, NAMESPACE1 + "/test", 0); } + @Test + public void testCreateTransactionSystemTopic() throws Exception { + String subName = "test"; + String topicName = TopicName.get(NAMESPACE1 + "/" + "testCreateTransactionSystemTopic").toString(); + topicName = MLPendingAckStore.getTransactionPendingAckStoreSuffix(topicName, subName); + + try { + @Cleanup + Consumer consumer = getConsumer(topicName, subName); + fail(); + } catch (PulsarClientException.NotAllowedException e) { + assertTrue(e.getMessage().contains("Can not create transaction system topic")); + } + + try { + admin.topics().createPartitionedTopic(topicName, 3); + fail(); + } catch (PulsarAdminException.ConflictException e) { + assertEquals(e.getMessage(), "Cannot create topic in system topic format!"); + } + + try { + admin.topics().createNonPartitionedTopic(topicName); + fail(); + } catch (PulsarAdminException.ConflictException e) { + assertEquals(e.getMessage(), "Cannot create topic in system topic format!"); + } + } + @Test public void brokerNotInitTxnManagedLedgerTopic() throws Exception { String subName = "test"; From 97583477ee6840828b06f4967a48d642e6b407b3 Mon Sep 17 00:00:00 2001 From: congbo Date: Thu, 11 Nov 2021 18:19:22 +0800 Subject: [PATCH 2/5] Fix some code format --- .../org/apache/pulsar/broker/admin/v2/PersistentTopics.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 7185ced0cc58b..bffa13ddafe45 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -234,7 +234,7 @@ public void createPartitionedTopic( validateGlobalNamespaceOwnership(); validatePartitionedTopicName(tenant, namespace, encodedTopic); validateTopicPolicyOperation(topicName, PolicyName.PARTITION, PolicyOperation.WRITE); - validateTopicAllowToCreate(topicName); + validateTopicAllowdToCreate(topicName); internalCreatePartitionedTopic(asyncResponse, numPartitions, createLocalTopicOnly); } catch (Exception e) { log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e); @@ -268,7 +268,7 @@ public void createNonPartitionedTopic( validateNamespaceName(tenant, namespace); validateGlobalNamespaceOwnership(); validateTopicName(tenant, namespace, encodedTopic); - validateTopicAllowToCreate(topicName); + validateTopicAllowdToCreate(topicName); internalCreateNonPartitionedTopic(authoritative); } From f4cfa0a86a84d24eaa7071b8f70c7c6c6af7cbd1 Mon Sep 17 00:00:00 2001 From: congbo Date: Fri, 12 Nov 2021 18:06:35 +0800 Subject: [PATCH 3/5] Add some getTopicList check --- .../apache/pulsar/broker/PulsarService.java | 9 ++---- .../admin/impl/PersistentTopicsBase.java | 11 +++++-- .../broker/admin/v2/PersistentTopics.java | 4 +-- .../broker/transaction/TransactionTest.java | 32 +++++++++++++++++++ .../pulsar/common/naming/TopicName.java | 3 ++ 5 files changed, 48 insertions(+), 11 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index d37194828e21e..a6b2fc7d97f42 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -22,7 +22,7 @@ import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.pulsar.broker.resourcegroup.ResourceUsageTransportManager.DISABLE_RESOURCE_USAGE_TRANSPORT_MANAGER; -import static org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX; +import static org.apache.pulsar.common.naming.TopicName.TRANSACTION_COORDINATOR_LOG; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; @@ -132,7 +132,6 @@ import org.apache.pulsar.common.configuration.VipStatus; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceName; -import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterDataImpl; import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl; @@ -1652,15 +1651,13 @@ public void shutdownNow() { public static boolean isTransactionSystemTopic(TopicName topicName) { String topic = topicName.toString(); return topic.startsWith(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString()) - || topic.startsWith(TopicName.get(TopicDomain.persistent.value(), - NamespaceName.SYSTEM_NAMESPACE, TRANSACTION_LOG_PREFIX).toString()) + || topic.startsWith(TRANSACTION_COORDINATOR_LOG.toString()) || topic.endsWith(MLPendingAckStore.PENDING_ACK_STORE_SUFFIX); } public static boolean isNotAllowedToCreateTopic(TopicName topicName) { String topic = topicName.toString(); - return topic.startsWith(TopicName.get(TopicDomain.persistent.value(), - NamespaceName.SYSTEM_NAMESPACE, TRANSACTION_LOG_PREFIX).toString()) + return topic.startsWith(TRANSACTION_COORDINATOR_LOG.toString()) || topic.endsWith(MLPendingAckStore.PENDING_ACK_STORE_SUFFIX); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 3fac9ffbb6ae4..259f1c766cb6e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -162,7 +162,9 @@ protected List internalGetList() { } try { - return topicResources().listPersistentTopicsAsync(namespaceName).join(); + return topicResources().listPersistentTopicsAsync(namespaceName).thenApply(topics -> + topics.stream().filter(topic -> + !isNotAllowedToCreateTopic(TopicName.get(topic))).collect(Collectors.toList())).join(); } catch (Exception e) { log.error("[{}] Failed to get topics list for namespace {}", clientAppId(), namespaceName, e); throw new RestException(e); @@ -245,7 +247,7 @@ protected void validateAdminAndClientPermission() { } } - protected void validateTopicAllowdToCreate(TopicName topicName) { + protected void validateCreateTopic(TopicName topicName) { if (isNotAllowedToCreateTopic(topicName)) { log.warn("Try to create a topic in the system topic format! {}", topicName); throw new RestException(Status.CONFLICT, "Cannot create topic in system topic format!"); @@ -3691,7 +3693,10 @@ private Topic getTopicReference(TopicName topicName) { } catch (RestException e) { throw e; } catch (Exception e) { - throw new RestException(e); + if (e.getCause() instanceof NotAllowedException) { + throw new RestException(Status.CONFLICT, e.getCause()); + } + throw new RestException(e.getCause()); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index bffa13ddafe45..bf6c64712c851 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -234,7 +234,7 @@ public void createPartitionedTopic( validateGlobalNamespaceOwnership(); validatePartitionedTopicName(tenant, namespace, encodedTopic); validateTopicPolicyOperation(topicName, PolicyName.PARTITION, PolicyOperation.WRITE); - validateTopicAllowdToCreate(topicName); + validateCreateTopic(topicName); internalCreatePartitionedTopic(asyncResponse, numPartitions, createLocalTopicOnly); } catch (Exception e) { log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e); @@ -268,7 +268,7 @@ public void createNonPartitionedTopic( validateNamespaceName(tenant, namespace); validateGlobalNamespaceOwnership(); validateTopicName(tenant, namespace, encodedTopic); - validateTopicAllowdToCreate(topicName); + validateCreateTopic(topicName); internalCreateNonPartitionedTopic(authoritative); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index d79cb196bb11d..0676791285abb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.transaction; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore.PENDING_ACK_STORE_SUFFIX; import static org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; @@ -26,16 +27,19 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import com.google.common.collect.Sets; import io.netty.buffer.Unpooled; import java.lang.reflect.Field; +import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; @@ -108,9 +112,27 @@ protected void setup() throws Exception { public void testCreateTransactionSystemTopic() throws Exception { String subName = "test"; String topicName = TopicName.get(NAMESPACE1 + "/" + "testCreateTransactionSystemTopic").toString(); + + try { + // init pending ack + @Cleanup + Consumer consumer = getConsumer(topicName, subName); + Transaction transaction = pulsarClient.newTransaction() + .withTransactionTimeout(10, TimeUnit.SECONDS).build().get(); + + consumer.acknowledgeAsync(new MessageIdImpl(10, 10, 10), transaction).get(); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof PulsarClientException.TransactionConflictException); + } topicName = MLPendingAckStore.getTransactionPendingAckStoreSuffix(topicName, subName); + // getList does not include transaction system topic + List list = admin.topics().getList(NAMESPACE1); + assertEquals(list.size(), 4); + list.forEach(topic -> assertFalse(topic.contains(PENDING_ACK_STORE_SUFFIX))); + try { + // can't create transaction system topic @Cleanup Consumer consumer = getConsumer(topicName, subName); fail(); @@ -118,6 +140,15 @@ public void testCreateTransactionSystemTopic() throws Exception { assertTrue(e.getMessage().contains("Can not create transaction system topic")); } + // can't create transaction system topic + try { + admin.topics().getSubscriptions(topicName); + fail(); + } catch (PulsarAdminException.ConflictException e) { + assertEquals(e.getMessage(), "Can not create transaction system topic " + topicName); + } + + // can't create transaction system topic try { admin.topics().createPartitionedTopic(topicName, 3); fail(); @@ -125,6 +156,7 @@ public void testCreateTransactionSystemTopic() throws Exception { assertEquals(e.getMessage(), "Cannot create topic in system topic format!"); } + // can't create transaction system topic try { admin.topics().createNonPartitionedTopic(topicName); fail(); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java index 257d27c206943..efbd8c0f87359 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java @@ -63,6 +63,9 @@ public TopicName load(String name) throws Exception { public static final TopicName TRANSACTION_COORDINATOR_ASSIGN = TopicName.get(TopicDomain.persistent.value(), NamespaceName.SYSTEM_NAMESPACE, "transaction_coordinator_assign"); + public static final TopicName TRANSACTION_COORDINATOR_LOG = TopicName.get(TopicDomain.persistent.value(), + NamespaceName.SYSTEM_NAMESPACE, "__transaction_log_"); + public static TopicName get(String domain, NamespaceName namespaceName, String topic) { String name = domain + "://" + namespaceName.toString() + '/' + topic; return TopicName.get(name); From a46ab12fc11e8b58b7e11a60ea5dbcc8880cba3d Mon Sep 17 00:00:00 2001 From: congbo Date: Fri, 12 Nov 2021 22:15:50 +0800 Subject: [PATCH 4/5] Fix some comment --- .../main/java/org/apache/pulsar/broker/PulsarService.java | 2 +- .../pulsar/broker/admin/impl/PersistentTopicsBase.java | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index a6b2fc7d97f42..28ccbb1013426 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -1655,7 +1655,7 @@ public static boolean isTransactionSystemTopic(TopicName topicName) { || topic.endsWith(MLPendingAckStore.PENDING_ACK_STORE_SUFFIX); } - public static boolean isNotAllowedToCreateTopic(TopicName topicName) { + public static boolean isTransactionInternalName(TopicName topicName) { String topic = topicName.toString(); return topic.startsWith(TRANSACTION_COORDINATOR_LOG.toString()) || topic.endsWith(MLPendingAckStore.PENDING_ACK_STORE_SUFFIX); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 259f1c766cb6e..fc33ad4ea5673 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -18,7 +18,7 @@ */ package org.apache.pulsar.broker.admin.impl; -import static org.apache.pulsar.broker.PulsarService.isNotAllowedToCreateTopic; +import static org.apache.pulsar.broker.PulsarService.isTransactionInternalName; import static org.apache.pulsar.broker.resources.PulsarResources.DEFAULT_OPERATION_TIMEOUT_SEC; import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsTransactionCoordinatorAssign; import com.fasterxml.jackson.core.JsonProcessingException; @@ -164,7 +164,7 @@ protected List internalGetList() { try { return topicResources().listPersistentTopicsAsync(namespaceName).thenApply(topics -> topics.stream().filter(topic -> - !isNotAllowedToCreateTopic(TopicName.get(topic))).collect(Collectors.toList())).join(); + !isTransactionInternalName(TopicName.get(topic))).collect(Collectors.toList())).join(); } catch (Exception e) { log.error("[{}] Failed to get topics list for namespace {}", clientAppId(), namespaceName, e); throw new RestException(e); @@ -248,7 +248,7 @@ protected void validateAdminAndClientPermission() { } protected void validateCreateTopic(TopicName topicName) { - if (isNotAllowedToCreateTopic(topicName)) { + if (isTransactionInternalName(topicName)) { log.warn("Try to create a topic in the system topic format! {}", topicName); throw new RestException(Status.CONFLICT, "Cannot create topic in system topic format!"); } From 55fbbd669c9d822e41dd46cd10eb75a1583bd2e6 Mon Sep 17 00:00:00 2001 From: congbo Date: Fri, 12 Nov 2021 23:16:18 +0800 Subject: [PATCH 5/5] Fix some test --- .../java/org/apache/pulsar/common/events/EventsTopicNames.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventsTopicNames.java b/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventsTopicNames.java index 2aa9e122d63c5..f82c9ae8519a4 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventsTopicNames.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventsTopicNames.java @@ -49,6 +49,7 @@ public static boolean checkTopicIsEventsNames(TopicName topicName) { } public static boolean checkTopicIsTransactionCoordinatorAssign(TopicName topicName) { - return TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString().equals(topicName.toString()); + return topicName != null && topicName.toString() + .startsWith(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString()); } }