From d31b4a0b266239004c6059dcecc95aec1ae9ff52 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 18 Jul 2024 13:41:37 +0800 Subject: [PATCH 01/10] [improve] [broker] high CPU usage caused by list topics under namespace --- .../broker/namespace/NamespaceService.java | 30 +++++++++++++++++++ .../pulsar/broker/service/ServerCnx.java | 4 +-- 2 files changed, 32 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 2a1584df961f7..d9cd3fa81a623 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -40,6 +40,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -104,6 +105,7 @@ import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl; import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies; import org.apache.pulsar.common.stats.MetricsUtil; +import org.apache.pulsar.common.topics.TopicList; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.metadata.api.MetadataCache; @@ -187,6 +189,9 @@ public class NamespaceService implements AutoCloseable { .register(); private final DoubleHistogram lookupLatencyHistogram; + private ConcurrentHashMap>> inProgressQueryUserTopics = + new ConcurrentHashMap<>(); + /** * Default constructor. */ @@ -1509,6 +1514,31 @@ public CompletableFuture> getListOfTopics(NamespaceName namespaceNa } } + public CompletableFuture> getListOfUserTopics(NamespaceName namespaceName, Mode mode) { + String key = String.format("%s://%s", mode, namespaceName) + CompletableFuture> queryRes = + inProgressQueryUserTopics.computeIfAbsent(key, k -> { + CompletableFuture> res = new CompletableFuture<>(); + // Switch thread to avoid blocking other threads who are calling the current method. + pulsar.getExecutor().execute(() -> { + getListOfTopics(namespaceName, mode).thenApply(list -> { + return TopicList.filterSystemTopic(list); + }).whenComplete((topics, ex) -> { + if (ex != null) { + res.completeExceptionally(ex); + } else { + res.complete(topics); + } + }); + }); + return res; + }); + queryRes.whenComplete((ignore, ex) -> { + inProgressQueryUserTopics.remove(key); + }); + return queryRes; + } + public CompletableFuture> getAllPartitions(NamespaceName namespaceName) { return getPartitions(namespaceName, TopicDomain.persistent) .thenCombine(getPartitions(namespaceName, TopicDomain.non_persistent), diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 4933aee974d08..9bca80c41bb49 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -2459,11 +2459,11 @@ protected void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGet if (lookupSemaphore.tryAcquire()) { isNamespaceOperationAllowed(namespaceName, NamespaceOperation.GET_TOPICS).thenApply(isAuthorized -> { if (isAuthorized) { - getBrokerService().pulsar().getNamespaceService().getListOfTopics(namespaceName, mode) + getBrokerService().pulsar().getNamespaceService().getListOfUserTopics(namespaceName, mode) .thenAccept(topics -> { boolean filterTopics = false; // filter system topic - List filteredTopics = TopicList.filterSystemTopic(topics); + List filteredTopics = topics; if (enableSubscriptionPatternEvaluation && topicsPattern.isPresent()) { if (topicsPattern.get().length() <= maxSubscriptionPatternLength) { From e70409ff5bd9deea8b04a75c09b575be50699ce0 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 18 Jul 2024 13:46:41 +0800 Subject: [PATCH 02/10] improve code --- .../org/apache/pulsar/broker/namespace/NamespaceService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index d9cd3fa81a623..eb1205298aebc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -1534,7 +1534,7 @@ public CompletableFuture> getListOfUserTopics(NamespaceName namespa return res; }); queryRes.whenComplete((ignore, ex) -> { - inProgressQueryUserTopics.remove(key); + inProgressQueryUserTopics.remove(key, queryRes); }); return queryRes; } From 9b975a1163c089cabfaabee6752ce579fe06e2bc Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 18 Jul 2024 13:51:41 +0800 Subject: [PATCH 03/10] improve code --- .../org/apache/pulsar/broker/namespace/NamespaceService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index eb1205298aebc..efb0582f2d665 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -1515,7 +1515,7 @@ public CompletableFuture> getListOfTopics(NamespaceName namespaceNa } public CompletableFuture> getListOfUserTopics(NamespaceName namespaceName, Mode mode) { - String key = String.format("%s://%s", mode, namespaceName) + String key = String.format("%s://%s", mode, namespaceName); CompletableFuture> queryRes = inProgressQueryUserTopics.computeIfAbsent(key, k -> { CompletableFuture> res = new CompletableFuture<>(); From 65786f60100751c4e2fed0cffd6dababa7fb035f Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 18 Jul 2024 15:35:19 +0800 Subject: [PATCH 04/10] improve method isSystemTopic --- .../common/naming/SystemTopicNames.java | 38 ++++++++++++++++--- .../common/naming/SystemTopicNamesTest.java | 26 +++++++++++++ 2 files changed, 59 insertions(+), 5 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java index 716d9bc31facb..d44f421bbacc1 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java @@ -18,13 +18,16 @@ */ package org.apache.pulsar.common.naming; +import static org.apache.pulsar.common.naming.TopicName.PARTITIONED_TOPIC_SUFFIX; import com.google.common.collect.Sets; import java.util.Collections; import java.util.Set; +import lombok.extern.slf4j.Slf4j; /** * Encapsulate the parsing of the completeTopicName name. */ +@Slf4j public class SystemTopicNames { /** @@ -51,6 +54,12 @@ public class SystemTopicNames { public static final String PENDING_ACK_STORE_CURSOR_NAME = "__pending_ack_state"; + public static final String TRANSACTION_COORDINATOR_ASSIGN_LOCAL_NAME = "transaction_coordinator_assign"; + + public static final String TRANSACTION_COORDINATOR_LOG_LOCAL_NAME = "__transaction_log_"; + + public static final String RESOURCE_USAGE_TOPIC_LOCAL_NAME = "resource-usage"; + /** * The set of all local topic names declared above. */ @@ -60,18 +69,22 @@ public class SystemTopicNames { public static final TopicName TRANSACTION_COORDINATOR_ASSIGN = TopicName.get(TopicDomain.persistent.value(), - NamespaceName.SYSTEM_NAMESPACE, "transaction_coordinator_assign"); + NamespaceName.SYSTEM_NAMESPACE, TRANSACTION_COORDINATOR_ASSIGN_LOCAL_NAME); public static final TopicName TRANSACTION_COORDINATOR_LOG = TopicName.get(TopicDomain.persistent.value(), - NamespaceName.SYSTEM_NAMESPACE, "__transaction_log_"); + NamespaceName.SYSTEM_NAMESPACE, TRANSACTION_COORDINATOR_LOG_LOCAL_NAME); public static final TopicName RESOURCE_USAGE_TOPIC = TopicName.get(TopicDomain.non_persistent.value(), - NamespaceName.SYSTEM_NAMESPACE, "resource-usage"); + NamespaceName.SYSTEM_NAMESPACE, RESOURCE_USAGE_TOPIC_LOCAL_NAME); public static boolean isEventSystemTopic(TopicName topicName) { return EVENTS_TOPIC_NAMES.contains(TopicName.get(topicName.getPartitionedTopicName()).getLocalName()); } + public static boolean isEventSystemTopicLocalName(String localTopicName) { + return EVENTS_TOPIC_NAMES.contains(localTopicName); + } + public static boolean isTransactionCoordinatorAssign(TopicName topicName) { return topicName != null && topicName.toString() .startsWith(TRANSACTION_COORDINATOR_ASSIGN.toString()); @@ -91,8 +104,23 @@ public static boolean isTransactionInternalName(TopicName topicName) { || topic.endsWith(PENDING_ACK_STORE_SUFFIX); } + public static boolean isTransactionInternalLocalName(String localTopicName) { + return TRANSACTION_COORDINATOR_ASSIGN_LOCAL_NAME.equals(localTopicName) + || TRANSACTION_COORDINATOR_LOG_LOCAL_NAME.equals(localTopicName) + || PENDING_ACK_STORE_SUFFIX.equals(localTopicName); + } + public static boolean isSystemTopic(TopicName topicName) { - TopicName nonPartitionedTopicName = TopicName.get(topicName.getPartitionedTopicName()); - return isEventSystemTopic(nonPartitionedTopicName) || isTransactionInternalName(nonPartitionedTopicName); + int partition = topicName.getPartitionIndex(); + String localTopicName = topicName.getLocalName(); + if (partition >= 0) { + int suffixLen = PARTITIONED_TOPIC_SUFFIX.length() + Integer.valueOf(partition).toString().length(); + if (localTopicName.length() <= suffixLen) { + log.error("Found an error topic name: {}", topicName); + throw new IllegalArgumentException("Found an error topic name: " + topicName.toString()); + } + localTopicName = localTopicName.substring(0, localTopicName.length() - suffixLen); + } + return isEventSystemTopicLocalName(localTopicName) || isTransactionInternalLocalName(localTopicName); } } diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/SystemTopicNamesTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/SystemTopicNamesTest.java index 92d93021973b1..93e6041bf55d3 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/SystemTopicNamesTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/SystemTopicNamesTest.java @@ -19,6 +19,8 @@ package org.apache.pulsar.common.naming; import static org.testng.AssertJUnit.assertEquals; +import static org.testng.AssertJUnit.assertFalse; +import static org.testng.AssertJUnit.assertTrue; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -44,4 +46,28 @@ public void testIsTopicPoliciesSystemTopic(String topicName, boolean expectedRes assertEquals(expectedResult, SystemTopicNames.isSystemTopic(TopicName.get(topicName))); assertEquals(expectedResult, SystemTopicNames.isEventSystemTopic(TopicName.get(topicName))); } + + @Test + public void testIsSystemTopic() { + String tp1 = "public/default/tp1"; + assertFalse(SystemTopicNames.isSystemTopic(TopicName.get(tp1))); + String tp2 = "public/default/tp1-partition-0"; + assertFalse(SystemTopicNames.isSystemTopic(TopicName.get(tp2))); + String tp3 = "public/default/__change_events"; + assertTrue(SystemTopicNames.isSystemTopic(TopicName.get(tp3))); + String tp4 = "public/default/__change_events-partition-0"; + assertTrue(SystemTopicNames.isSystemTopic(TopicName.get(tp4))); + String tp5 = "public/default/transaction_coordinator_assign"; + assertTrue(SystemTopicNames.isSystemTopic(TopicName.get(tp5))); + String tp6 = "public/default/transaction_coordinator_assign-partition-0"; + assertTrue(SystemTopicNames.isSystemTopic(TopicName.get(tp6))); + String tp7 = "persistent://public/default/__change_events"; + assertTrue(SystemTopicNames.isSystemTopic(TopicName.get(tp7))); + String tp8 = "persistent://public/default/__change_events-partition-0"; + assertTrue(SystemTopicNames.isSystemTopic(TopicName.get(tp8))); + String tp9 = "persistent://public/default/transaction_coordinator_assign"; + assertTrue(SystemTopicNames.isSystemTopic(TopicName.get(tp9))); + String tp10 = "persistent://public/default/transaction_coordinator_assign-partition-0"; + assertTrue(SystemTopicNames.isSystemTopic(TopicName.get(tp10))); + } } From d9eda8e2c41da43b5672010db5e3541e4d6e17c0 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 18 Jul 2024 15:47:34 +0800 Subject: [PATCH 05/10] checkstyle --- .../java/org/apache/pulsar/common/naming/SystemTopicNames.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java index d44f421bbacc1..3e9c5d9d559a5 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java @@ -114,7 +114,7 @@ public static boolean isSystemTopic(TopicName topicName) { int partition = topicName.getPartitionIndex(); String localTopicName = topicName.getLocalName(); if (partition >= 0) { - int suffixLen = PARTITIONED_TOPIC_SUFFIX.length() + Integer.valueOf(partition).toString().length(); + int suffixLen = PARTITIONED_TOPIC_SUFFIX.length() + String.valueOf(partition).length(); if (localTopicName.length() <= suffixLen) { log.error("Found an error topic name: {}", topicName); throw new IllegalArgumentException("Found an error topic name: " + topicName.toString()); From 865505e92c854777dc5bba30900a0d42a7633c55 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 18 Jul 2024 15:54:19 +0800 Subject: [PATCH 06/10] address comments --- .../pulsar/broker/namespace/NamespaceService.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index efb0582f2d665..26fd085ad9327 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -56,6 +56,7 @@ import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.ListUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.mutable.MutableBoolean; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; @@ -1516,8 +1517,10 @@ public CompletableFuture> getListOfTopics(NamespaceName namespaceNa public CompletableFuture> getListOfUserTopics(NamespaceName namespaceName, Mode mode) { String key = String.format("%s://%s", mode, namespaceName); + final MutableBoolean initializedByCurrentThread = new MutableBoolean(); CompletableFuture> queryRes = inProgressQueryUserTopics.computeIfAbsent(key, k -> { + initializedByCurrentThread.setTrue(); CompletableFuture> res = new CompletableFuture<>(); // Switch thread to avoid blocking other threads who are calling the current method. pulsar.getExecutor().execute(() -> { @@ -1533,9 +1536,11 @@ public CompletableFuture> getListOfUserTopics(NamespaceName namespa }); return res; }); - queryRes.whenComplete((ignore, ex) -> { - inProgressQueryUserTopics.remove(key, queryRes); - }); + if (initializedByCurrentThread.getValue()) { + queryRes.whenComplete((ignore, ex) -> { + inProgressQueryUserTopics.remove(key, queryRes); + }); + } return queryRes; } From 0ebe34651bbdac7259d9584c636c324617f2ead0 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 18 Jul 2024 16:33:41 +0800 Subject: [PATCH 07/10] address comments --- .../main/java/org/apache/pulsar/common/naming/TopicName.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java index d264eab9574ef..419a8bece4d9e 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java @@ -55,7 +55,7 @@ public class TopicName implements ServiceUnitId { private final int partitionIndex; private static final LoadingCache cache = CacheBuilder.newBuilder().maximumSize(100000) - .expireAfterAccess(30, TimeUnit.MINUTES).build(new CacheLoader() { + .expireAfterWrite(30, TimeUnit.MINUTES).build(new CacheLoader() { @Override public TopicName load(String name) throws Exception { return new TopicName(name); From 434a12ab3a80c91421f9112090e1ab98e8ef3727 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 18 Jul 2024 22:29:24 +0800 Subject: [PATCH 08/10] address comments --- .../broker/namespace/NamespaceService.java | 20 ++++--------------- 1 file changed, 4 insertions(+), 16 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 26fd085ad9327..ec4c907234ab6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -1518,23 +1518,11 @@ public CompletableFuture> getListOfTopics(NamespaceName namespaceNa public CompletableFuture> getListOfUserTopics(NamespaceName namespaceName, Mode mode) { String key = String.format("%s://%s", mode, namespaceName); final MutableBoolean initializedByCurrentThread = new MutableBoolean(); - CompletableFuture> queryRes = - inProgressQueryUserTopics.computeIfAbsent(key, k -> { + CompletableFuture> queryRes = inProgressQueryUserTopics.computeIfAbsent(key, k -> { initializedByCurrentThread.setTrue(); - CompletableFuture> res = new CompletableFuture<>(); - // Switch thread to avoid blocking other threads who are calling the current method. - pulsar.getExecutor().execute(() -> { - getListOfTopics(namespaceName, mode).thenApply(list -> { - return TopicList.filterSystemTopic(list); - }).whenComplete((topics, ex) -> { - if (ex != null) { - res.completeExceptionally(ex); - } else { - res.complete(topics); - } - }); - }); - return res; + return getListOfTopics(namespaceName, mode).thenApplyAsync(list -> { + return TopicList.filterSystemTopic(list); + }, pulsar.getExecutor()); }); if (initializedByCurrentThread.getValue()) { queryRes.whenComplete((ignore, ex) -> { From 43e0e4ad411f18455a0ec285d85a1d77d4eba435 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 19 Jul 2024 01:43:34 +0800 Subject: [PATCH 09/10] remove the improvement for the method isSystemTopic --- .../common/naming/SystemTopicNames.java | 38 +++---------------- .../pulsar/common/naming/TopicName.java | 2 +- .../common/naming/SystemTopicNamesTest.java | 26 ------------- 3 files changed, 6 insertions(+), 60 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java index 3e9c5d9d559a5..716d9bc31facb 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java @@ -18,16 +18,13 @@ */ package org.apache.pulsar.common.naming; -import static org.apache.pulsar.common.naming.TopicName.PARTITIONED_TOPIC_SUFFIX; import com.google.common.collect.Sets; import java.util.Collections; import java.util.Set; -import lombok.extern.slf4j.Slf4j; /** * Encapsulate the parsing of the completeTopicName name. */ -@Slf4j public class SystemTopicNames { /** @@ -54,12 +51,6 @@ public class SystemTopicNames { public static final String PENDING_ACK_STORE_CURSOR_NAME = "__pending_ack_state"; - public static final String TRANSACTION_COORDINATOR_ASSIGN_LOCAL_NAME = "transaction_coordinator_assign"; - - public static final String TRANSACTION_COORDINATOR_LOG_LOCAL_NAME = "__transaction_log_"; - - public static final String RESOURCE_USAGE_TOPIC_LOCAL_NAME = "resource-usage"; - /** * The set of all local topic names declared above. */ @@ -69,22 +60,18 @@ public class SystemTopicNames { public static final TopicName TRANSACTION_COORDINATOR_ASSIGN = TopicName.get(TopicDomain.persistent.value(), - NamespaceName.SYSTEM_NAMESPACE, TRANSACTION_COORDINATOR_ASSIGN_LOCAL_NAME); + NamespaceName.SYSTEM_NAMESPACE, "transaction_coordinator_assign"); public static final TopicName TRANSACTION_COORDINATOR_LOG = TopicName.get(TopicDomain.persistent.value(), - NamespaceName.SYSTEM_NAMESPACE, TRANSACTION_COORDINATOR_LOG_LOCAL_NAME); + NamespaceName.SYSTEM_NAMESPACE, "__transaction_log_"); public static final TopicName RESOURCE_USAGE_TOPIC = TopicName.get(TopicDomain.non_persistent.value(), - NamespaceName.SYSTEM_NAMESPACE, RESOURCE_USAGE_TOPIC_LOCAL_NAME); + NamespaceName.SYSTEM_NAMESPACE, "resource-usage"); public static boolean isEventSystemTopic(TopicName topicName) { return EVENTS_TOPIC_NAMES.contains(TopicName.get(topicName.getPartitionedTopicName()).getLocalName()); } - public static boolean isEventSystemTopicLocalName(String localTopicName) { - return EVENTS_TOPIC_NAMES.contains(localTopicName); - } - public static boolean isTransactionCoordinatorAssign(TopicName topicName) { return topicName != null && topicName.toString() .startsWith(TRANSACTION_COORDINATOR_ASSIGN.toString()); @@ -104,23 +91,8 @@ public static boolean isTransactionInternalName(TopicName topicName) { || topic.endsWith(PENDING_ACK_STORE_SUFFIX); } - public static boolean isTransactionInternalLocalName(String localTopicName) { - return TRANSACTION_COORDINATOR_ASSIGN_LOCAL_NAME.equals(localTopicName) - || TRANSACTION_COORDINATOR_LOG_LOCAL_NAME.equals(localTopicName) - || PENDING_ACK_STORE_SUFFIX.equals(localTopicName); - } - public static boolean isSystemTopic(TopicName topicName) { - int partition = topicName.getPartitionIndex(); - String localTopicName = topicName.getLocalName(); - if (partition >= 0) { - int suffixLen = PARTITIONED_TOPIC_SUFFIX.length() + String.valueOf(partition).length(); - if (localTopicName.length() <= suffixLen) { - log.error("Found an error topic name: {}", topicName); - throw new IllegalArgumentException("Found an error topic name: " + topicName.toString()); - } - localTopicName = localTopicName.substring(0, localTopicName.length() - suffixLen); - } - return isEventSystemTopicLocalName(localTopicName) || isTransactionInternalLocalName(localTopicName); + TopicName nonPartitionedTopicName = TopicName.get(topicName.getPartitionedTopicName()); + return isEventSystemTopic(nonPartitionedTopicName) || isTransactionInternalName(nonPartitionedTopicName); } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java index 419a8bece4d9e..d264eab9574ef 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java @@ -55,7 +55,7 @@ public class TopicName implements ServiceUnitId { private final int partitionIndex; private static final LoadingCache cache = CacheBuilder.newBuilder().maximumSize(100000) - .expireAfterWrite(30, TimeUnit.MINUTES).build(new CacheLoader() { + .expireAfterAccess(30, TimeUnit.MINUTES).build(new CacheLoader() { @Override public TopicName load(String name) throws Exception { return new TopicName(name); diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/SystemTopicNamesTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/SystemTopicNamesTest.java index 93e6041bf55d3..92d93021973b1 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/SystemTopicNamesTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/SystemTopicNamesTest.java @@ -19,8 +19,6 @@ package org.apache.pulsar.common.naming; import static org.testng.AssertJUnit.assertEquals; -import static org.testng.AssertJUnit.assertFalse; -import static org.testng.AssertJUnit.assertTrue; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -46,28 +44,4 @@ public void testIsTopicPoliciesSystemTopic(String topicName, boolean expectedRes assertEquals(expectedResult, SystemTopicNames.isSystemTopic(TopicName.get(topicName))); assertEquals(expectedResult, SystemTopicNames.isEventSystemTopic(TopicName.get(topicName))); } - - @Test - public void testIsSystemTopic() { - String tp1 = "public/default/tp1"; - assertFalse(SystemTopicNames.isSystemTopic(TopicName.get(tp1))); - String tp2 = "public/default/tp1-partition-0"; - assertFalse(SystemTopicNames.isSystemTopic(TopicName.get(tp2))); - String tp3 = "public/default/__change_events"; - assertTrue(SystemTopicNames.isSystemTopic(TopicName.get(tp3))); - String tp4 = "public/default/__change_events-partition-0"; - assertTrue(SystemTopicNames.isSystemTopic(TopicName.get(tp4))); - String tp5 = "public/default/transaction_coordinator_assign"; - assertTrue(SystemTopicNames.isSystemTopic(TopicName.get(tp5))); - String tp6 = "public/default/transaction_coordinator_assign-partition-0"; - assertTrue(SystemTopicNames.isSystemTopic(TopicName.get(tp6))); - String tp7 = "persistent://public/default/__change_events"; - assertTrue(SystemTopicNames.isSystemTopic(TopicName.get(tp7))); - String tp8 = "persistent://public/default/__change_events-partition-0"; - assertTrue(SystemTopicNames.isSystemTopic(TopicName.get(tp8))); - String tp9 = "persistent://public/default/transaction_coordinator_assign"; - assertTrue(SystemTopicNames.isSystemTopic(TopicName.get(tp9))); - String tp10 = "persistent://public/default/transaction_coordinator_assign-partition-0"; - assertTrue(SystemTopicNames.isSystemTopic(TopicName.get(tp10))); - } } From aedefd33b9bb74f37b706ff31152a28382cb797a Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Mon, 22 Jul 2024 02:54:13 +0800 Subject: [PATCH 10/10] fix tests --- .../java/org/apache/pulsar/broker/service/ServerCnxTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index 27afedd6b101e..58c6b96a0f346 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -229,6 +229,8 @@ public void setup() throws Exception { doReturn(CompletableFuture.completedFuture(true)).when(namespaceService).checkTopicOwnership(any()); doReturn(CompletableFuture.completedFuture(topics)).when(namespaceService).getListOfTopics( NamespaceName.get("use", "ns-abc"), CommandGetTopicsOfNamespace.Mode.ALL); + doReturn(CompletableFuture.completedFuture(topics)).when(namespaceService).getListOfUserTopics( + NamespaceName.get("use", "ns-abc"), CommandGetTopicsOfNamespace.Mode.ALL); doReturn(CompletableFuture.completedFuture(topics)).when(namespaceService).getListOfPersistentTopics( NamespaceName.get("use", "ns-abc"));