diff --git a/conf/broker.conf b/conf/broker.conf index b715c4e515bc8..3c956bdd86dab 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -159,6 +159,14 @@ skipBrokerShutdownOnOOM=false # Factory class-name to create topic with custom workflow topicFactoryClassName= +# Max capacity of the topic name cache. -1 means unlimited cache; 0 means broker will clear all cache +# per "maxSecondsToClearTopicNameCache", it does not mean broker will not cache TopicName. +topicNameCacheMaxCapacity=100000 + +# A Specifies the minimum number of seconds that the topic name stays in memory, to avoid clear cache frequently when +# there are too many topics are in use. +maxSecondsToClearTopicNameCache=7200 + # Enable backlog quota check. Enforces action on topic when the quota is reached backlogQuotaCheckEnabled=true diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index aba3ad3a669f5..2d2765287c0e0 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -594,6 +594,21 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece ) private boolean backlogQuotaCheckEnabled = true; + @FieldContext( + dynamic = true, + category = CATEGORY_POLICIES, + doc = "Max capacity of the topic name cache. -1 means unlimited cache; 0 means broker will clear all cache" + + " per maxSecondsToClearTopicNameCache, it does not mean broker will not cache TopicName." + ) + private int topicNameCacheMaxCapacity = 100_000; + + @FieldContext( + category = CATEGORY_POLICIES, + doc = "A Specifies the minimum number of seconds that the topic name stays in memory, to avoid clear cache" + + " frequently when there are too many topics are in use." + ) + private int maxSecondsToClearTopicNameCache = 3600 * 2; + @FieldContext( category = CATEGORY_POLICIES, doc = "Whether to enable precise time based backlog quota check. " diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 6ecd0a1ba6075..c0f44838ac680 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -625,6 +625,16 @@ public void start() throws Exception { this.updateBrokerDispatchThrottlingMaxRate(); this.startCheckReplicationPolicies(); this.startDeduplicationSnapshotMonitor(); + this.startClearInvalidateTopicNameCacheTask(); + } + + protected void startClearInvalidateTopicNameCacheTask() { + final int maxSecondsToClearTopicNameCache = pulsar.getConfiguration().getMaxSecondsToClearTopicNameCache(); + inactivityMonitor.scheduleAtFixedRate( + () -> TopicName.clearIfReachedMaxCapacity(pulsar.getConfiguration().getTopicNameCacheMaxCapacity()), + maxSecondsToClearTopicNameCache, + maxSecondsToClearTopicNameCache, + TimeUnit.SECONDS); } protected void startStatsUpdater(int statsUpdateInitialDelayInSecs, int statsUpdateFrequencyInSecs) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java index daa4393db55fd..3bbf423da6ef3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java @@ -24,11 +24,14 @@ import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; import static org.testng.AssertJUnit.assertSame; +import java.util.ArrayList; +import java.util.List; import java.util.Optional; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.functions.worker.WorkerConfig; import org.apache.pulsar.functions.worker.WorkerService; import org.testng.annotations.AfterMethod; @@ -56,6 +59,8 @@ protected void doInitConf() throws Exception { super.doInitConf(); conf.setBrokerServicePortTls(Optional.of(0)); conf.setWebServicePortTls(Optional.of(0)); + conf.setTopicNameCacheMaxCapacity(5000); + conf.setMaxSecondsToClearTopicNameCache(5); if (useStaticPorts) { conf.setBrokerServicePortTls(Optional.of(6651)); conf.setBrokerServicePort(Optional.of(6660)); @@ -187,6 +192,34 @@ public void testDynamicBrokerPort() throws Exception { assertEquals(pulsar.getWebServiceAddressTls(), "https://localhost:" + pulsar.getWebService().getListenPortHTTPS().get()); } + @Test + public void testTopicCacheConfiguration() throws Exception { + cleanup(); + setup(); + assertEquals(conf.getTopicNameCacheMaxCapacity(), 5000); + assertEquals(conf.getMaxSecondsToClearTopicNameCache(), 5); + + List topicNameCached = new ArrayList<>(); + for (int i = 0; i < 20; i++) { + topicNameCached.add(TopicName.get("public/default/tp_" + i)); + } + + // Verify: the cache does not clear since it is not larger than max capacity. + Thread.sleep(10 * 1000); + for (int i = 0; i < 20; i++) { + assertTrue(topicNameCached.get(i) == TopicName.get("public/default/tp_" + i)); + } + + // Update max capacity. + admin.brokers().updateDynamicConfiguration("topicNameCacheMaxCapacity", "10"); + + // Verify: the cache were cleared. + Thread.sleep(10 * 1000); + for (int i = 0; i < 20; i++) { + assertFalse(topicNameCached.get(i) == TopicName.get("public/default/tp_" + i)); + } + } + @Test public void testBacklogAndRetentionCheck() throws PulsarServerException { ServiceConfiguration config = new ServiceConfiguration(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java index b99f8d5338f60..e95b9410f4d12 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java @@ -63,5 +63,7 @@ public void testInitialize() throws Exception { assertEquals(standalone.getConfig().getAdvertisedListeners(), "internal:pulsar://192.168.1.11:6660,internal:pulsar+ssl://192.168.1.11:6651"); assertEquals(standalone.getConfig().isDispatcherPauseOnAckStatePersistentEnabled(), true); + assertEquals(standalone.getConfig().getMaxSecondsToClearTopicNameCache(), 1); + assertEquals(standalone.getConfig().getTopicNameCacheMaxCapacity(), 200); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java index ebeaffc48e4b9..c64c54d2d191c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java @@ -74,6 +74,8 @@ public void testInit() throws Exception { assertEquals(config.getBacklogQuotaDefaultLimitGB(), 0.05); assertEquals(config.getHttpMaxRequestHeaderSize(), 1234); assertEquals(config.isDispatcherPauseOnAckStatePersistentEnabled(), true); + assertEquals(config.getMaxSecondsToClearTopicNameCache(), 1); + assertEquals(config.getTopicNameCacheMaxCapacity(), 200); OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create(config.getProperties()); assertEquals(offloadPolicies.getManagedLedgerOffloadedReadPriority().getValue(), "bookkeeper-first"); } @@ -375,4 +377,15 @@ public void testAllowAutoTopicCreationType() throws Exception { conf = PulsarConfigurationLoader.create(properties, ServiceConfiguration.class); assertEquals(conf.getAllowAutoTopicCreationType(), TopicType.NON_PARTITIONED); } + + @Test + public void testTopicNameCacheConfiguration() throws Exception { + ServiceConfiguration conf; + final Properties properties = new Properties(); + properties.setProperty("maxSecondsToClearTopicNameCache", "2"); + properties.setProperty("topicNameCacheMaxCapacity", "100"); + conf = PulsarConfigurationLoader.create(properties, ServiceConfiguration.class); + assertEquals(conf.getMaxSecondsToClearTopicNameCache(), 2); + assertEquals(conf.getTopicNameCacheMaxCapacity(), 100); + } } diff --git a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf index ddda30d0a4bd9..f344a3e3f63da 100644 --- a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf +++ b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf @@ -104,3 +104,5 @@ transactionPendingAckBatchedWriteEnabled=true transactionPendingAckBatchedWriteMaxRecords=44 transactionPendingAckBatchedWriteMaxSize=55 transactionPendingAckBatchedWriteMaxDelayInMillis=66 +topicNameCacheMaxCapacity=200 +maxSecondsToClearTopicNameCache=1 diff --git a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf index 812c8dc9748f9..c520512e77bf9 100644 --- a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf +++ b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf @@ -95,3 +95,5 @@ supportedNamespaceBundleSplitAlgorithms=[range_equally_divide] defaultNamespaceBundleSplitAlgorithm=topic_count_equally_divide maxMessagePublishBufferSizeInMB=-1 dispatcherPauseOnAckStatePersistentEnabled=true +topicNameCacheMaxCapacity=200 +maxSecondsToClearTopicNameCache=1 diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java index d264eab9574ef..dd24c9a971210 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java @@ -19,16 +19,11 @@ package org.apache.pulsar.common.naming; import com.google.common.base.Splitter; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.common.util.concurrent.UncheckedExecutionException; import com.google.re2j.Pattern; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.List; import java.util.Objects; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.common.util.Codec; @@ -54,13 +49,17 @@ public class TopicName implements ServiceUnitId { private final int partitionIndex; - private static final LoadingCache cache = CacheBuilder.newBuilder().maximumSize(100000) - .expireAfterAccess(30, TimeUnit.MINUTES).build(new CacheLoader() { - @Override - public TopicName load(String name) throws Exception { - return new TopicName(name); - } - }); + private static final ConcurrentHashMap cache = new ConcurrentHashMap<>(); + + public static void clearIfReachedMaxCapacity(int maxCapacity) { + if (maxCapacity < 0) { + // Unlimited cache. + return; + } + if (cache.size() > maxCapacity) { + cache.clear(); + } + } public static TopicName get(String domain, NamespaceName namespaceName, String topic) { String name = domain + "://" + namespaceName.toString() + '/' + topic; @@ -79,11 +78,11 @@ public static TopicName get(String domain, String tenant, String cluster, String } public static TopicName get(String topic) { - try { - return cache.get(topic); - } catch (ExecutionException | UncheckedExecutionException e) { - throw (RuntimeException) e.getCause(); + TopicName tp = cache.get(topic); + if (tp != null) { + return tp; } + return cache.computeIfAbsent(topic, k -> new TopicName(k)); } public static TopicName getPartitionedTopicName(String topic) {