From ccc20c1ea407b45895f6e4e59bade47775a2426d Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 19 Jul 2024 00:40:05 +0800 Subject: [PATCH 1/4] [improve] [broker] Improve CPU resources usege of TopicName Cache --- .../pulsar/broker/ServiceConfiguration.java | 14 +++++++++ .../pulsar/broker/service/BrokerService.java | 10 +++++++ .../pulsar/common/naming/NamespaceName.java | 2 +- .../pulsar/common/naming/TopicName.java | 29 +++++++------------ 4 files changed, 36 insertions(+), 19 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index aba3ad3a669f5..3e9a4c77ef270 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -594,6 +594,20 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece ) private boolean backlogQuotaCheckEnabled = true; + @FieldContext( + dynamic = true, + category = CATEGORY_POLICIES, + doc = "Max capacity of the topic name cache" + ) + private int topicNameCacheCaxCapacity = 100_000; + + @FieldContext( + category = CATEGORY_POLICIES, + doc = "A Specifies the minimum number of minutes that the system stays in the memory, to avoid clear cache" + + " frequently when there are too many topics are in use" + ) + private int maxMinutesToClearTopicNameCache = 120; + @FieldContext( category = CATEGORY_POLICIES, doc = "Whether to enable precise time based backlog quota check. " diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 6ecd0a1ba6075..2082f6b8f8627 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -625,6 +625,16 @@ public void start() throws Exception { this.updateBrokerDispatchThrottlingMaxRate(); this.startCheckReplicationPolicies(); this.startDeduplicationSnapshotMonitor(); + this.startClearInvalidateTopicNameCacheTask(); + } + + protected void startClearInvalidateTopicNameCacheTask() { + final int maxMinutesToClearTopicNameCache = pulsar.getConfiguration().getMaxMinutesToClearTopicNameCache(); + statsUpdater.scheduleAtFixedRate( + () -> TopicName.clearIfFull(pulsar.getConfiguration().getTopicNameCacheCaxCapacity()), + maxMinutesToClearTopicNameCache, + maxMinutesToClearTopicNameCache, + TimeUnit.MINUTES); } protected void startStatsUpdater(int statsUpdateInitialDelayInSecs, int statsUpdateFrequencyInSecs) { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java index a804e7b6506ad..8739ed1dd2993 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java @@ -40,7 +40,7 @@ public class NamespaceName implements ServiceUnitId { private final String localName; private static final LoadingCache cache = CacheBuilder.newBuilder().maximumSize(100000) - .expireAfterAccess(30, TimeUnit.MINUTES).build(new CacheLoader() { + .expireAfterWrite(30, TimeUnit.MINUTES).build(new CacheLoader() { @Override public NamespaceName load(String name) throws Exception { return new NamespaceName(name); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java index d264eab9574ef..59c9c236e6352 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java @@ -19,16 +19,11 @@ package org.apache.pulsar.common.naming; import com.google.common.base.Splitter; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.common.util.concurrent.UncheckedExecutionException; import com.google.re2j.Pattern; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.List; import java.util.Objects; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.common.util.Codec; @@ -54,13 +49,13 @@ public class TopicName implements ServiceUnitId { private final int partitionIndex; - private static final LoadingCache cache = CacheBuilder.newBuilder().maximumSize(100000) - .expireAfterAccess(30, TimeUnit.MINUTES).build(new CacheLoader() { - @Override - public TopicName load(String name) throws Exception { - return new TopicName(name); - } - }); + private static final ConcurrentHashMap cache = new ConcurrentHashMap<>(); + + public static void clearIfFull(int maxCapacity) { + if (cache.size() > maxCapacity) { + cache.clear(); + } + } public static TopicName get(String domain, NamespaceName namespaceName, String topic) { String name = domain + "://" + namespaceName.toString() + '/' + topic; @@ -79,11 +74,9 @@ public static TopicName get(String domain, String tenant, String cluster, String } public static TopicName get(String topic) { - try { - return cache.get(topic); - } catch (ExecutionException | UncheckedExecutionException e) { - throw (RuntimeException) e.getCause(); - } + return cache.computeIfAbsent(topic, k -> { + return new TopicName(k); + }); } public static TopicName getPartitionedTopicName(String topic) { From 61a7595a28ac2bcc621a6f19883df84bbde2b7c6 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Mon, 22 Jul 2024 01:52:23 +0800 Subject: [PATCH 2/4] rebase master --- conf/broker.conf | 8 +++++ .../pulsar/broker/ServiceConfiguration.java | 9 ++--- .../pulsar/broker/service/BrokerService.java | 12 +++---- .../pulsar/broker/PulsarServiceTest.java | 33 +++++++++++++++++++ .../pulsar/broker/service/StandaloneTest.java | 2 ++ .../naming/ServiceConfigurationTest.java | 13 ++++++++ .../configurations/pulsar_broker_test.conf | 2 ++ .../pulsar_broker_test_standalone.conf | 2 ++ .../pulsar/common/naming/NamespaceName.java | 2 +- .../pulsar/common/naming/TopicName.java | 14 +++++--- 10 files changed, 82 insertions(+), 15 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index b715c4e515bc8..2b934f8d3dd7f 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -159,6 +159,14 @@ skipBrokerShutdownOnOOM=false # Factory class-name to create topic with custom workflow topicFactoryClassName= +# Max capacity of the topic name cache. -1 means unlimited cache; 0 means broker will clear all cache +# per "maxSecondsToClearTopicNameCache", it does not mean broker will not cache TopicName. +topicNameCacheCaxCapacity=100000 + +# A Specifies the minimum number of seconds that the topic name stays in memory, to avoid clear cache frequently when +# there are too many topics are in use. +maxSecondsToClearTopicNameCache=7200 + # Enable backlog quota check. Enforces action on topic when the quota is reached backlogQuotaCheckEnabled=true diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 3e9a4c77ef270..93e324fc5f4f1 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -597,16 +597,17 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece @FieldContext( dynamic = true, category = CATEGORY_POLICIES, - doc = "Max capacity of the topic name cache" + doc = "Max capacity of the topic name cache. -1 means unlimited cache; 0 means broker will clear all cache" + + " per maxSecondsToClearTopicNameCache, it does not mean broker will not cache TopicName." ) private int topicNameCacheCaxCapacity = 100_000; @FieldContext( category = CATEGORY_POLICIES, - doc = "A Specifies the minimum number of minutes that the system stays in the memory, to avoid clear cache" - + " frequently when there are too many topics are in use" + doc = "A Specifies the minimum number of seconds that the topic name stays in memory, to avoid clear cache" + + " frequently when there are too many topics are in use." ) - private int maxMinutesToClearTopicNameCache = 120; + private int maxSecondsToClearTopicNameCache = 3600 * 2; @FieldContext( category = CATEGORY_POLICIES, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 2082f6b8f8627..9e5ca1e5927eb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -629,12 +629,12 @@ public void start() throws Exception { } protected void startClearInvalidateTopicNameCacheTask() { - final int maxMinutesToClearTopicNameCache = pulsar.getConfiguration().getMaxMinutesToClearTopicNameCache(); - statsUpdater.scheduleAtFixedRate( - () -> TopicName.clearIfFull(pulsar.getConfiguration().getTopicNameCacheCaxCapacity()), - maxMinutesToClearTopicNameCache, - maxMinutesToClearTopicNameCache, - TimeUnit.MINUTES); + final int maxSecondsToClearTopicNameCache = pulsar.getConfiguration().getMaxSecondsToClearTopicNameCache(); + inactivityMonitor.scheduleAtFixedRate( + () -> TopicName.clearIfReachedMaxCapacity(pulsar.getConfiguration().getTopicNameCacheCaxCapacity()), + maxSecondsToClearTopicNameCache, + maxSecondsToClearTopicNameCache, + TimeUnit.SECONDS); } protected void startStatsUpdater(int statsUpdateInitialDelayInSecs, int statsUpdateFrequencyInSecs) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java index daa4393db55fd..4b42b2c93a6b6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java @@ -24,11 +24,14 @@ import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; import static org.testng.AssertJUnit.assertSame; +import java.util.ArrayList; +import java.util.List; import java.util.Optional; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.functions.worker.WorkerConfig; import org.apache.pulsar.functions.worker.WorkerService; import org.testng.annotations.AfterMethod; @@ -56,6 +59,8 @@ protected void doInitConf() throws Exception { super.doInitConf(); conf.setBrokerServicePortTls(Optional.of(0)); conf.setWebServicePortTls(Optional.of(0)); + conf.setTopicNameCacheCaxCapacity(5000); + conf.setMaxSecondsToClearTopicNameCache(5); if (useStaticPorts) { conf.setBrokerServicePortTls(Optional.of(6651)); conf.setBrokerServicePort(Optional.of(6660)); @@ -187,6 +192,34 @@ public void testDynamicBrokerPort() throws Exception { assertEquals(pulsar.getWebServiceAddressTls(), "https://localhost:" + pulsar.getWebService().getListenPortHTTPS().get()); } + @Test + public void testTopicCacheConfiguration() throws Exception { + cleanup(); + setup(); + assertEquals(conf.getTopicNameCacheCaxCapacity(), 5000); + assertEquals(conf.getMaxSecondsToClearTopicNameCache(), 5); + + List topicNameCached = new ArrayList<>(); + for (int i = 0; i < 20; i++) { + topicNameCached.add(TopicName.get("public/default/tp_" + i)); + } + + // Verify: the cache does not clear since it is not larger than max capacity. + Thread.sleep(10 * 1000); + for (int i = 0; i < 20; i++) { + assertTrue(topicNameCached.get(i) == TopicName.get("public/default/tp_" + i)); + } + + // Update max capacity. + admin.brokers().updateDynamicConfiguration("topicNameCacheCaxCapacity", "10"); + + // Verify: the cache were cleared. + Thread.sleep(10 * 1000); + for (int i = 0; i < 20; i++) { + assertFalse(topicNameCached.get(i) == TopicName.get("public/default/tp_" + i)); + } + } + @Test public void testBacklogAndRetentionCheck() throws PulsarServerException { ServiceConfiguration config = new ServiceConfiguration(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java index b99f8d5338f60..0234501ebe4e0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java @@ -63,5 +63,7 @@ public void testInitialize() throws Exception { assertEquals(standalone.getConfig().getAdvertisedListeners(), "internal:pulsar://192.168.1.11:6660,internal:pulsar+ssl://192.168.1.11:6651"); assertEquals(standalone.getConfig().isDispatcherPauseOnAckStatePersistentEnabled(), true); + assertEquals(standalone.getConfig().getMaxSecondsToClearTopicNameCache(), 1); + assertEquals(standalone.getConfig().getTopicNameCacheCaxCapacity(), 200); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java index ebeaffc48e4b9..9aaa27949d448 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java @@ -74,6 +74,8 @@ public void testInit() throws Exception { assertEquals(config.getBacklogQuotaDefaultLimitGB(), 0.05); assertEquals(config.getHttpMaxRequestHeaderSize(), 1234); assertEquals(config.isDispatcherPauseOnAckStatePersistentEnabled(), true); + assertEquals(config.getMaxSecondsToClearTopicNameCache(), 1); + assertEquals(config.getTopicNameCacheCaxCapacity(), 200); OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create(config.getProperties()); assertEquals(offloadPolicies.getManagedLedgerOffloadedReadPriority().getValue(), "bookkeeper-first"); } @@ -375,4 +377,15 @@ public void testAllowAutoTopicCreationType() throws Exception { conf = PulsarConfigurationLoader.create(properties, ServiceConfiguration.class); assertEquals(conf.getAllowAutoTopicCreationType(), TopicType.NON_PARTITIONED); } + + @Test + public void testTopicNameCacheConfiguration() throws Exception { + ServiceConfiguration conf; + final Properties properties = new Properties(); + properties.setProperty("maxSecondsToClearTopicNameCache", "2"); + properties.setProperty("topicNameCacheCaxCapacity", "100"); + conf = PulsarConfigurationLoader.create(properties, ServiceConfiguration.class); + assertEquals(conf.getMaxSecondsToClearTopicNameCache(), 2); + assertEquals(conf.getTopicNameCacheCaxCapacity(), 100); + } } diff --git a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf index ddda30d0a4bd9..340c8143c6dd4 100644 --- a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf +++ b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf @@ -104,3 +104,5 @@ transactionPendingAckBatchedWriteEnabled=true transactionPendingAckBatchedWriteMaxRecords=44 transactionPendingAckBatchedWriteMaxSize=55 transactionPendingAckBatchedWriteMaxDelayInMillis=66 +topicNameCacheCaxCapacity=200 +maxSecondsToClearTopicNameCache=1 diff --git a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf index 812c8dc9748f9..4deef66d44b89 100644 --- a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf +++ b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf @@ -95,3 +95,5 @@ supportedNamespaceBundleSplitAlgorithms=[range_equally_divide] defaultNamespaceBundleSplitAlgorithm=topic_count_equally_divide maxMessagePublishBufferSizeInMB=-1 dispatcherPauseOnAckStatePersistentEnabled=true +topicNameCacheCaxCapacity=200 +maxSecondsToClearTopicNameCache=1 diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java index 8739ed1dd2993..a804e7b6506ad 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java @@ -40,7 +40,7 @@ public class NamespaceName implements ServiceUnitId { private final String localName; private static final LoadingCache cache = CacheBuilder.newBuilder().maximumSize(100000) - .expireAfterWrite(30, TimeUnit.MINUTES).build(new CacheLoader() { + .expireAfterAccess(30, TimeUnit.MINUTES).build(new CacheLoader() { @Override public NamespaceName load(String name) throws Exception { return new NamespaceName(name); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java index 59c9c236e6352..dd24c9a971210 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java @@ -51,7 +51,11 @@ public class TopicName implements ServiceUnitId { private static final ConcurrentHashMap cache = new ConcurrentHashMap<>(); - public static void clearIfFull(int maxCapacity) { + public static void clearIfReachedMaxCapacity(int maxCapacity) { + if (maxCapacity < 0) { + // Unlimited cache. + return; + } if (cache.size() > maxCapacity) { cache.clear(); } @@ -74,9 +78,11 @@ public static TopicName get(String domain, String tenant, String cluster, String } public static TopicName get(String topic) { - return cache.computeIfAbsent(topic, k -> { - return new TopicName(k); - }); + TopicName tp = cache.get(topic); + if (tp != null) { + return tp; + } + return cache.computeIfAbsent(topic, k -> new TopicName(k)); } public static TopicName getPartitionedTopicName(String topic) { From bd60644a9e2338c93a15ff4ca1115ecf91a874fc Mon Sep 17 00:00:00 2001 From: fengyubiao <9947090@qq.com> Date: Mon, 22 Jul 2024 11:31:07 +0800 Subject: [PATCH 3/4] Update conf/broker.conf Co-authored-by: Zixuan Liu --- conf/broker.conf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/conf/broker.conf b/conf/broker.conf index 2b934f8d3dd7f..3c956bdd86dab 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -161,7 +161,7 @@ topicFactoryClassName= # Max capacity of the topic name cache. -1 means unlimited cache; 0 means broker will clear all cache # per "maxSecondsToClearTopicNameCache", it does not mean broker will not cache TopicName. -topicNameCacheCaxCapacity=100000 +topicNameCacheMaxCapacity=100000 # A Specifies the minimum number of seconds that the topic name stays in memory, to avoid clear cache frequently when # there are too many topics are in use. From 9fe8dad35cc0cec4dde53d862026712f5c318f94 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Mon, 22 Jul 2024 11:35:32 +0800 Subject: [PATCH 4/4] fix typo --- .../java/org/apache/pulsar/broker/ServiceConfiguration.java | 2 +- .../org/apache/pulsar/broker/service/BrokerService.java | 2 +- .../java/org/apache/pulsar/broker/PulsarServiceTest.java | 6 +++--- .../org/apache/pulsar/broker/service/StandaloneTest.java | 2 +- .../pulsar/common/naming/ServiceConfigurationTest.java | 6 +++--- .../test/resources/configurations/pulsar_broker_test.conf | 2 +- .../configurations/pulsar_broker_test_standalone.conf | 2 +- 7 files changed, 11 insertions(+), 11 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 93e324fc5f4f1..2d2765287c0e0 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -600,7 +600,7 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece doc = "Max capacity of the topic name cache. -1 means unlimited cache; 0 means broker will clear all cache" + " per maxSecondsToClearTopicNameCache, it does not mean broker will not cache TopicName." ) - private int topicNameCacheCaxCapacity = 100_000; + private int topicNameCacheMaxCapacity = 100_000; @FieldContext( category = CATEGORY_POLICIES, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 9e5ca1e5927eb..c0f44838ac680 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -631,7 +631,7 @@ public void start() throws Exception { protected void startClearInvalidateTopicNameCacheTask() { final int maxSecondsToClearTopicNameCache = pulsar.getConfiguration().getMaxSecondsToClearTopicNameCache(); inactivityMonitor.scheduleAtFixedRate( - () -> TopicName.clearIfReachedMaxCapacity(pulsar.getConfiguration().getTopicNameCacheCaxCapacity()), + () -> TopicName.clearIfReachedMaxCapacity(pulsar.getConfiguration().getTopicNameCacheMaxCapacity()), maxSecondsToClearTopicNameCache, maxSecondsToClearTopicNameCache, TimeUnit.SECONDS); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java index 4b42b2c93a6b6..3bbf423da6ef3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java @@ -59,7 +59,7 @@ protected void doInitConf() throws Exception { super.doInitConf(); conf.setBrokerServicePortTls(Optional.of(0)); conf.setWebServicePortTls(Optional.of(0)); - conf.setTopicNameCacheCaxCapacity(5000); + conf.setTopicNameCacheMaxCapacity(5000); conf.setMaxSecondsToClearTopicNameCache(5); if (useStaticPorts) { conf.setBrokerServicePortTls(Optional.of(6651)); @@ -196,7 +196,7 @@ public void testDynamicBrokerPort() throws Exception { public void testTopicCacheConfiguration() throws Exception { cleanup(); setup(); - assertEquals(conf.getTopicNameCacheCaxCapacity(), 5000); + assertEquals(conf.getTopicNameCacheMaxCapacity(), 5000); assertEquals(conf.getMaxSecondsToClearTopicNameCache(), 5); List topicNameCached = new ArrayList<>(); @@ -211,7 +211,7 @@ public void testTopicCacheConfiguration() throws Exception { } // Update max capacity. - admin.brokers().updateDynamicConfiguration("topicNameCacheCaxCapacity", "10"); + admin.brokers().updateDynamicConfiguration("topicNameCacheMaxCapacity", "10"); // Verify: the cache were cleared. Thread.sleep(10 * 1000); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java index 0234501ebe4e0..e95b9410f4d12 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java @@ -64,6 +64,6 @@ public void testInitialize() throws Exception { "internal:pulsar://192.168.1.11:6660,internal:pulsar+ssl://192.168.1.11:6651"); assertEquals(standalone.getConfig().isDispatcherPauseOnAckStatePersistentEnabled(), true); assertEquals(standalone.getConfig().getMaxSecondsToClearTopicNameCache(), 1); - assertEquals(standalone.getConfig().getTopicNameCacheCaxCapacity(), 200); + assertEquals(standalone.getConfig().getTopicNameCacheMaxCapacity(), 200); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java index 9aaa27949d448..c64c54d2d191c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java @@ -75,7 +75,7 @@ public void testInit() throws Exception { assertEquals(config.getHttpMaxRequestHeaderSize(), 1234); assertEquals(config.isDispatcherPauseOnAckStatePersistentEnabled(), true); assertEquals(config.getMaxSecondsToClearTopicNameCache(), 1); - assertEquals(config.getTopicNameCacheCaxCapacity(), 200); + assertEquals(config.getTopicNameCacheMaxCapacity(), 200); OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create(config.getProperties()); assertEquals(offloadPolicies.getManagedLedgerOffloadedReadPriority().getValue(), "bookkeeper-first"); } @@ -383,9 +383,9 @@ public void testTopicNameCacheConfiguration() throws Exception { ServiceConfiguration conf; final Properties properties = new Properties(); properties.setProperty("maxSecondsToClearTopicNameCache", "2"); - properties.setProperty("topicNameCacheCaxCapacity", "100"); + properties.setProperty("topicNameCacheMaxCapacity", "100"); conf = PulsarConfigurationLoader.create(properties, ServiceConfiguration.class); assertEquals(conf.getMaxSecondsToClearTopicNameCache(), 2); - assertEquals(conf.getTopicNameCacheCaxCapacity(), 100); + assertEquals(conf.getTopicNameCacheMaxCapacity(), 100); } } diff --git a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf index 340c8143c6dd4..f344a3e3f63da 100644 --- a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf +++ b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf @@ -104,5 +104,5 @@ transactionPendingAckBatchedWriteEnabled=true transactionPendingAckBatchedWriteMaxRecords=44 transactionPendingAckBatchedWriteMaxSize=55 transactionPendingAckBatchedWriteMaxDelayInMillis=66 -topicNameCacheCaxCapacity=200 +topicNameCacheMaxCapacity=200 maxSecondsToClearTopicNameCache=1 diff --git a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf index 4deef66d44b89..c520512e77bf9 100644 --- a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf +++ b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf @@ -95,5 +95,5 @@ supportedNamespaceBundleSplitAlgorithms=[range_equally_divide] defaultNamespaceBundleSplitAlgorithm=topic_count_equally_divide maxMessagePublishBufferSizeInMB=-1 dispatcherPauseOnAckStatePersistentEnabled=true -topicNameCacheCaxCapacity=200 +topicNameCacheMaxCapacity=200 maxSecondsToClearTopicNameCache=1