From f91475dfa5e35947c10d380be4b056b0ee461512 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Tue, 24 Jun 2025 11:53:26 +0300 Subject: [PATCH 01/41] Revert "[improve] [broker] Improve CPU resources usege of TopicName Cache (#23052)" This reverts commit 81aed6c75eba99fb62172b986b0c59e693e6f4b9. --- conf/broker.conf | 8 ----- .../pulsar/broker/ServiceConfiguration.java | 15 --------- .../pulsar/broker/service/BrokerService.java | 10 ------ .../pulsar/broker/PulsarServiceTest.java | 33 ------------------- .../pulsar/broker/service/StandaloneTest.java | 2 -- .../naming/ServiceConfigurationTest.java | 14 -------- .../configurations/pulsar_broker_test.conf | 2 -- .../pulsar_broker_test_standalone.conf | 2 -- .../pulsar/common/naming/TopicName.java | 33 ++++++++++--------- 9 files changed, 17 insertions(+), 102 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index 13cbb9528f462..423e61243e2dc 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -159,14 +159,6 @@ skipBrokerShutdownOnOOM=false # Factory class-name to create topic with custom workflow topicFactoryClassName= -# Max capacity of the topic name cache. -1 means unlimited cache; 0 means broker will clear all cache -# per "maxSecondsToClearTopicNameCache", it does not mean broker will not cache TopicName. -topicNameCacheMaxCapacity=100000 - -# A Specifies the minimum number of seconds that the topic name stays in memory, to avoid clear cache frequently when -# there are too many topics are in use. -maxSecondsToClearTopicNameCache=7200 - # Enable backlog quota check. Enforces action on topic when the quota is reached backlogQuotaCheckEnabled=true diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 4ba326f31f6e9..829f094beb52f 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -603,21 +603,6 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece ) private boolean backlogQuotaCheckEnabled = true; - @FieldContext( - dynamic = true, - category = CATEGORY_POLICIES, - doc = "Max capacity of the topic name cache. -1 means unlimited cache; 0 means broker will clear all cache" - + " per maxSecondsToClearTopicNameCache, it does not mean broker will not cache TopicName." - ) - private int topicNameCacheMaxCapacity = 100_000; - - @FieldContext( - category = CATEGORY_POLICIES, - doc = "A Specifies the minimum number of seconds that the topic name stays in memory, to avoid clear cache" - + " frequently when there are too many topics are in use." - ) - private int maxSecondsToClearTopicNameCache = 3600 * 2; - @FieldContext( category = CATEGORY_POLICIES, doc = "Whether to enable precise time based backlog quota check. " diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 359c0daf5b8ea..f5dc5344cf5b7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -641,16 +641,6 @@ public void start() throws Exception { this.updateBrokerDispatchThrottlingMaxRate(); this.startCheckReplicationPolicies(); this.startDeduplicationSnapshotMonitor(); - this.startClearInvalidateTopicNameCacheTask(); - } - - protected void startClearInvalidateTopicNameCacheTask() { - final int maxSecondsToClearTopicNameCache = pulsar.getConfiguration().getMaxSecondsToClearTopicNameCache(); - inactivityMonitor.scheduleAtFixedRate( - () -> TopicName.clearIfReachedMaxCapacity(pulsar.getConfiguration().getTopicNameCacheMaxCapacity()), - maxSecondsToClearTopicNameCache, - maxSecondsToClearTopicNameCache, - TimeUnit.SECONDS); } protected void startStatsUpdater(int statsUpdateInitialDelayInSecs, int statsUpdateFrequencyInSecs) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java index dd2f9288071a5..5e44fe408112a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java @@ -24,14 +24,11 @@ import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; import static org.testng.AssertJUnit.assertSame; -import java.util.ArrayList; -import java.util.List; import java.util.Optional; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.client.admin.PulsarAdminException; -import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.functions.worker.WorkerConfig; import org.apache.pulsar.functions.worker.WorkerService; import org.testng.annotations.AfterMethod; @@ -59,8 +56,6 @@ protected void doInitConf() throws Exception { super.doInitConf(); conf.setBrokerServicePortTls(Optional.of(0)); conf.setWebServicePortTls(Optional.of(0)); - conf.setTopicNameCacheMaxCapacity(5000); - conf.setMaxSecondsToClearTopicNameCache(5); if (useStaticPorts) { conf.setBrokerServicePortTls(Optional.of(6651)); conf.setBrokerServicePort(Optional.of(6660)); @@ -195,34 +190,6 @@ public void testDynamicBrokerPort() throws Exception { assertEquals(pulsar.getWebServiceAddressTls(), "https://localhost:" + pulsar.getWebService().getListenPortHTTPS().get()); } - @Test - public void testTopicCacheConfiguration() throws Exception { - cleanup(); - setup(); - assertEquals(conf.getTopicNameCacheMaxCapacity(), 5000); - assertEquals(conf.getMaxSecondsToClearTopicNameCache(), 5); - - List topicNameCached = new ArrayList<>(); - for (int i = 0; i < 20; i++) { - topicNameCached.add(TopicName.get("public/default/tp_" + i)); - } - - // Verify: the cache does not clear since it is not larger than max capacity. - Thread.sleep(10 * 1000); - for (int i = 0; i < 20; i++) { - assertTrue(topicNameCached.get(i) == TopicName.get("public/default/tp_" + i)); - } - - // Update max capacity. - admin.brokers().updateDynamicConfiguration("topicNameCacheMaxCapacity", "10"); - - // Verify: the cache were cleared. - Thread.sleep(10 * 1000); - for (int i = 0; i < 20; i++) { - assertFalse(topicNameCached.get(i) == TopicName.get("public/default/tp_" + i)); - } - } - @Test public void testBacklogAndRetentionCheck() throws PulsarServerException { ServiceConfiguration config = new ServiceConfiguration(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java index 541408b781be2..7089aa12d2cad 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java @@ -63,8 +63,6 @@ public void testInitialize() throws Exception { assertEquals(standalone.getConfig().getAdvertisedListeners(), "internal:pulsar://192.168.1.11:6660,internal:pulsar+ssl://192.168.1.11:6651"); assertEquals(standalone.getConfig().isDispatcherPauseOnAckStatePersistentEnabled(), true); - assertEquals(standalone.getConfig().getMaxSecondsToClearTopicNameCache(), 1); - assertEquals(standalone.getConfig().getTopicNameCacheMaxCapacity(), 200); assertEquals(standalone.getConfig().isCreateTopicToRemoteClusterForReplication(), true); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java index 5972c6f724d8c..5f513938858ed 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java @@ -23,7 +23,6 @@ import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; - import java.beans.Introspector; import java.beans.PropertyDescriptor; import java.io.ByteArrayInputStream; @@ -75,8 +74,6 @@ public void testInit() throws Exception { assertEquals(config.getBacklogQuotaDefaultLimitGB(), 0.05); assertEquals(config.getHttpMaxRequestHeaderSize(), 1234); assertEquals(config.isDispatcherPauseOnAckStatePersistentEnabled(), true); - assertEquals(config.getMaxSecondsToClearTopicNameCache(), 1); - assertEquals(config.getTopicNameCacheMaxCapacity(), 200); assertEquals(config.isCreateTopicToRemoteClusterForReplication(), false); OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create(config.getProperties()); assertEquals(offloadPolicies.getManagedLedgerOffloadedReadPriority().getValue(), "bookkeeper-first"); @@ -384,17 +381,6 @@ public void testAllowAutoTopicCreationType() throws Exception { assertEquals(conf.getAllowAutoTopicCreationType(), TopicType.NON_PARTITIONED); } - @Test - public void testTopicNameCacheConfiguration() throws Exception { - ServiceConfiguration conf; - final Properties properties = new Properties(); - properties.setProperty("maxSecondsToClearTopicNameCache", "2"); - properties.setProperty("topicNameCacheMaxCapacity", "100"); - conf = PulsarConfigurationLoader.create(properties, ServiceConfiguration.class); - assertEquals(conf.getMaxSecondsToClearTopicNameCache(), 2); - assertEquals(conf.getTopicNameCacheMaxCapacity(), 100); - } - @Test public void testLookupProperties() throws Exception { var confFile = "lookup.key1=value1\nkey=value\nlookup.key2=value2"; diff --git a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf index 0fdb29e06866f..6bd949e4c25c5 100644 --- a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf +++ b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf @@ -104,6 +104,4 @@ transactionPendingAckBatchedWriteEnabled=true transactionPendingAckBatchedWriteMaxRecords=44 transactionPendingAckBatchedWriteMaxSize=55 transactionPendingAckBatchedWriteMaxDelayInMillis=66 -topicNameCacheMaxCapacity=200 -maxSecondsToClearTopicNameCache=1 createTopicToRemoteClusterForReplication=false diff --git a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf index d3f9430f29b48..085457067163b 100644 --- a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf +++ b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf @@ -95,6 +95,4 @@ supportedNamespaceBundleSplitAlgorithms=[range_equally_divide] defaultNamespaceBundleSplitAlgorithm=topic_count_equally_divide maxMessagePublishBufferSizeInMB=-1 dispatcherPauseOnAckStatePersistentEnabled=true -topicNameCacheMaxCapacity=200 -maxSecondsToClearTopicNameCache=1 createTopicToRemoteClusterForReplication=true diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java index b2f96bfe6e259..0c85aaa207007 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java @@ -19,11 +19,16 @@ package org.apache.pulsar.common.naming; import com.google.common.base.Splitter; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.util.concurrent.UncheckedExecutionException; import com.google.re2j.Pattern; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.List; import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.common.util.Codec; @@ -49,17 +54,13 @@ public class TopicName implements ServiceUnitId { private final int partitionIndex; - private static final ConcurrentHashMap cache = new ConcurrentHashMap<>(); - - public static void clearIfReachedMaxCapacity(int maxCapacity) { - if (maxCapacity < 0) { - // Unlimited cache. - return; - } - if (cache.size() > maxCapacity) { - cache.clear(); - } - } + private static final LoadingCache cache = CacheBuilder.newBuilder().maximumSize(100000) + .expireAfterAccess(30, TimeUnit.MINUTES).build(new CacheLoader() { + @Override + public TopicName load(String name) throws Exception { + return new TopicName(name); + } + }); public static TopicName get(String domain, NamespaceName namespaceName, String topic) { String name = domain + "://" + namespaceName.toString() + '/' + topic; @@ -78,11 +79,11 @@ public static TopicName get(String domain, String tenant, String cluster, String } public static TopicName get(String topic) { - TopicName tp = cache.get(topic); - if (tp != null) { - return tp; + try { + return cache.get(topic); + } catch (ExecutionException | UncheckedExecutionException e) { + throw (RuntimeException) e.getCause(); } - return cache.computeIfAbsent(topic, k -> new TopicName(k)); } public static TopicName getPartitionedTopicName(String topic) { From 61f46d7b367fe561e09204e6d01f237eb650c536 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Tue, 24 Jun 2025 12:09:37 +0300 Subject: [PATCH 02/41] Replace Guava Cache with Caffeine cache to fix original performance issue --- .../pulsar/common/naming/NamespaceName.java | 26 +++++-------------- .../pulsar/common/naming/TopicName.java | 24 +++++------------ 2 files changed, 14 insertions(+), 36 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java index a804e7b6506ad..4b4b2a9191116 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java @@ -18,14 +18,11 @@ */ package org.apache.pulsar.common.naming; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.common.util.concurrent.UncheckedExecutionException; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.Objects; import java.util.Optional; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; /** @@ -39,13 +36,10 @@ public class NamespaceName implements ServiceUnitId { private final String cluster; private final String localName; - private static final LoadingCache cache = CacheBuilder.newBuilder().maximumSize(100000) - .expireAfterAccess(30, TimeUnit.MINUTES).build(new CacheLoader() { - @Override - public NamespaceName load(String name) throws Exception { - return new NamespaceName(name); - } - }); + private static final LoadingCache cache = Caffeine.newBuilder() + .maximumSize(100000) + .expireAfterAccess(30, TimeUnit.MINUTES) + .build(name -> new NamespaceName(name)); public static final NamespaceName SYSTEM_NAMESPACE = NamespaceName.get("pulsar/system"); @@ -63,13 +57,7 @@ public static NamespaceName get(String namespace) { if (namespace == null || namespace.isEmpty()) { throw new IllegalArgumentException("Invalid null namespace: " + namespace); } - try { - return cache.get(namespace); - } catch (ExecutionException e) { - throw (RuntimeException) e.getCause(); - } catch (UncheckedExecutionException e) { - throw (RuntimeException) e.getCause(); - } + return cache.get(namespace); } public static Optional getIfValid(String namespace) { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java index 0c85aaa207007..cfbf2eae9463e 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java @@ -18,16 +18,13 @@ */ package org.apache.pulsar.common.naming; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; import com.google.common.base.Splitter; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.common.util.concurrent.UncheckedExecutionException; import com.google.re2j.Pattern; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.List; import java.util.Objects; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.common.util.Codec; @@ -54,13 +51,10 @@ public class TopicName implements ServiceUnitId { private final int partitionIndex; - private static final LoadingCache cache = CacheBuilder.newBuilder().maximumSize(100000) - .expireAfterAccess(30, TimeUnit.MINUTES).build(new CacheLoader() { - @Override - public TopicName load(String name) throws Exception { - return new TopicName(name); - } - }); + private static final LoadingCache cache = Caffeine.newBuilder() + .maximumSize(100000) + .expireAfterAccess(30, TimeUnit.MINUTES) + .build(name -> new TopicName(name)); public static TopicName get(String domain, NamespaceName namespaceName, String topic) { String name = domain + "://" + namespaceName.toString() + '/' + topic; @@ -79,11 +73,7 @@ public static TopicName get(String domain, String tenant, String cluster, String } public static TopicName get(String topic) { - try { - return cache.get(topic); - } catch (ExecutionException | UncheckedExecutionException e) { - throw (RuntimeException) e.getCause(); - } + return cache.get(topic); } public static TopicName getPartitionedTopicName(String topic) { From e9228c0488473e9a93afcd7bad248ebf0ce5f6e1 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Tue, 24 Jun 2025 12:11:45 +0300 Subject: [PATCH 03/41] Deduplicate String instances to reduce heap usage --- .../pulsar/common/naming/NamespaceName.java | 13 ++++++----- .../pulsar/common/naming/TopicName.java | 23 ++++++++++--------- 2 files changed, 19 insertions(+), 17 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java index 4b4b2a9191116..c2a474252e28e 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java @@ -24,6 +24,7 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.TimeUnit; +import org.apache.pulsar.common.util.StringInterner; /** * Parser of a value from the namespace field provided in configuration. @@ -91,16 +92,16 @@ private NamespaceName(String namespace) { // New style namespace : / validateNamespaceName(parts[0], parts[1]); - tenant = parts[0]; + tenant = StringInterner.intern(parts[0]); cluster = null; - localName = parts[1]; + localName = StringInterner.intern(parts[1]); } else if (parts.length == 3) { // Old style namespace: // validateNamespaceName(parts[0], parts[1], parts[2]); - tenant = parts[0]; - cluster = parts[1]; - localName = parts[2]; + tenant = StringInterner.intern(parts[0]); + cluster = StringInterner.intern(parts[1]); + localName = StringInterner.intern(parts[2]); } else { throw new IllegalArgumentException("Invalid namespace format. namespace: " + namespace); } @@ -109,7 +110,7 @@ private NamespaceName(String namespace) { + " expected / or // " + "but got: " + namespace, e); } - this.namespace = namespace; + this.namespace = StringInterner.intern(namespace); } public String getTenant() { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java index cfbf2eae9463e..4b6fea42b3cf6 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java @@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.common.util.Codec; +import org.apache.pulsar.common.util.StringInterner; /** * Encapsulate the parsing of the completeTopicName name. @@ -143,18 +144,18 @@ private TopicName(String completeTopicName) { parts = Splitter.on("/").limit(4).splitToList(rest); if (parts.size() == 3) { // New topic name without cluster name - this.tenant = parts.get(0); + this.tenant = StringInterner.intern(parts.get(0)); this.cluster = null; - this.namespacePortion = parts.get(1); - this.localName = parts.get(2); + this.namespacePortion = StringInterner.intern(parts.get(1)); + this.localName = StringInterner.intern(parts.get(2)); this.partitionIndex = getPartitionIndex(completeTopicName); this.namespaceName = NamespaceName.get(tenant, namespacePortion); } else if (parts.size() == 4) { // Legacy topic name that includes cluster name - this.tenant = parts.get(0); - this.cluster = parts.get(1); - this.namespacePortion = parts.get(2); - this.localName = parts.get(3); + this.tenant = StringInterner.intern(parts.get(0)); + this.cluster = StringInterner.intern(parts.get(1)); + this.namespacePortion = StringInterner.intern(parts.get(2)); + this.localName = StringInterner.intern(parts.get(3)); this.partitionIndex = getPartitionIndex(completeTopicName); this.namespaceName = NamespaceName.get(tenant, cluster, namespacePortion); } else { @@ -170,12 +171,12 @@ private TopicName(String completeTopicName) { throw new IllegalArgumentException("Invalid topic name: " + completeTopicName, e); } if (isV2()) { - this.completeTopicName = String.format("%s://%s/%s/%s", - domain, tenant, namespacePortion, localName); + this.completeTopicName = StringInterner.intern(String.format("%s://%s/%s/%s", + domain, tenant, namespacePortion, localName)); } else { - this.completeTopicName = String.format("%s://%s/%s/%s/%s", + this.completeTopicName = StringInterner.intern(String.format("%s://%s/%s/%s/%s", domain, tenant, cluster, - namespacePortion, localName); + namespacePortion, localName)); } } From 67b3c0df7d975805adb8872ed66f8b3e3f55614a Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Tue, 24 Jun 2025 12:23:49 +0300 Subject: [PATCH 04/41] Use interner to deduplicate TopicName and NamespaceName instances --- .../java/org/apache/pulsar/common/naming/NamespaceName.java | 5 ++++- .../main/java/org/apache/pulsar/common/naming/TopicName.java | 5 ++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java index c2a474252e28e..a0b1decb72f6f 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java @@ -20,6 +20,8 @@ import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.LoadingCache; +import com.google.common.collect.Interner; +import com.google.common.collect.Interners; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.Objects; import java.util.Optional; @@ -37,10 +39,11 @@ public class NamespaceName implements ServiceUnitId { private final String cluster; private final String localName; + private static final Interner namespaceNameInterner = Interners.newWeakInterner(); private static final LoadingCache cache = Caffeine.newBuilder() .maximumSize(100000) .expireAfterAccess(30, TimeUnit.MINUTES) - .build(name -> new NamespaceName(name)); + .build(name -> namespaceNameInterner.intern(new NamespaceName(name))); public static final NamespaceName SYSTEM_NAMESPACE = NamespaceName.get("pulsar/system"); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java index 4b6fea42b3cf6..28376aa9dcf4b 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java @@ -21,6 +21,8 @@ import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.LoadingCache; import com.google.common.base.Splitter; +import com.google.common.collect.Interner; +import com.google.common.collect.Interners; import com.google.re2j.Pattern; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.List; @@ -52,10 +54,11 @@ public class TopicName implements ServiceUnitId { private final int partitionIndex; + private static final Interner topicNameInterner = Interners.newWeakInterner(); private static final LoadingCache cache = Caffeine.newBuilder() .maximumSize(100000) .expireAfterAccess(30, TimeUnit.MINUTES) - .build(name -> new TopicName(name)); + .build(name -> topicNameInterner.intern(new TopicName(name))); public static TopicName get(String domain, NamespaceName namespaceName, String topic) { String name = domain + "://" + namespaceName.toString() + '/' + topic; From 6085a66f284019e4d98701ec21fb10a1cf1b5534 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Tue, 24 Jun 2025 12:24:34 +0300 Subject: [PATCH 05/41] Use soft values in the cache to ensure that caches will be dropped on heavy memory pressur --- .../main/java/org/apache/pulsar/common/naming/NamespaceName.java | 1 + .../src/main/java/org/apache/pulsar/common/naming/TopicName.java | 1 + 2 files changed, 2 insertions(+) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java index a0b1decb72f6f..0dc44a81acae5 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java @@ -41,6 +41,7 @@ public class NamespaceName implements ServiceUnitId { private static final Interner namespaceNameInterner = Interners.newWeakInterner(); private static final LoadingCache cache = Caffeine.newBuilder() + .softValues() .maximumSize(100000) .expireAfterAccess(30, TimeUnit.MINUTES) .build(name -> namespaceNameInterner.intern(new NamespaceName(name))); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java index 28376aa9dcf4b..f53e2ed8d946a 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java @@ -56,6 +56,7 @@ public class TopicName implements ServiceUnitId { private static final Interner topicNameInterner = Interners.newWeakInterner(); private static final LoadingCache cache = Caffeine.newBuilder() + .softValues() .maximumSize(100000) .expireAfterAccess(30, TimeUnit.MINUTES) .build(name -> topicNameInterner.intern(new TopicName(name))); From 1ad049bf0e15b83a49603d55ce29d7dd2bbcb313 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Tue, 24 Jun 2025 12:59:28 +0300 Subject: [PATCH 06/41] Switch to use expireAfterWrite and configure scheduler to remove expired entries --- .../java/org/apache/pulsar/common/naming/NamespaceName.java | 4 +++- .../main/java/org/apache/pulsar/common/naming/TopicName.java | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java index 0dc44a81acae5..67ca883b4d450 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java @@ -20,6 +20,7 @@ import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.LoadingCache; +import com.github.benmanes.caffeine.cache.Scheduler; import com.google.common.collect.Interner; import com.google.common.collect.Interners; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; @@ -43,7 +44,8 @@ public class NamespaceName implements ServiceUnitId { private static final LoadingCache cache = Caffeine.newBuilder() .softValues() .maximumSize(100000) - .expireAfterAccess(30, TimeUnit.MINUTES) + .expireAfterWrite(30, TimeUnit.MINUTES) + .scheduler(Scheduler.systemScheduler()) .build(name -> namespaceNameInterner.intern(new NamespaceName(name))); public static final NamespaceName SYSTEM_NAMESPACE = NamespaceName.get("pulsar/system"); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java index f53e2ed8d946a..d804379c74a5a 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java @@ -20,6 +20,7 @@ import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.LoadingCache; +import com.github.benmanes.caffeine.cache.Scheduler; import com.google.common.base.Splitter; import com.google.common.collect.Interner; import com.google.common.collect.Interners; @@ -58,7 +59,8 @@ public class TopicName implements ServiceUnitId { private static final LoadingCache cache = Caffeine.newBuilder() .softValues() .maximumSize(100000) - .expireAfterAccess(30, TimeUnit.MINUTES) + .expireAfterWrite(30, TimeUnit.MINUTES) + .scheduler(Scheduler.systemScheduler()) .build(name -> topicNameInterner.intern(new TopicName(name))); public static TopicName get(String domain, NamespaceName namespaceName, String topic) { From 99c683f62c93dda021d933d7f736cd0f1e3d29be Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Tue, 24 Jun 2025 13:23:00 +0300 Subject: [PATCH 07/41] Add comments --- .../java/org/apache/pulsar/common/naming/NamespaceName.java | 4 ++++ .../main/java/org/apache/pulsar/common/naming/TopicName.java | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java index 67ca883b4d450..f79c4bca75ef9 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java @@ -40,7 +40,11 @@ public class NamespaceName implements ServiceUnitId { private final String cluster; private final String localName; + // Deduplicates NamespaceName instances when the cached entry isn't in the actual cache. + // Holds weak references to NamespaceName so it won't prevent garbage collection. private static final Interner namespaceNameInterner = Interners.newWeakInterner(); + // Cache for NamespaceName instances that uses Caffeine to provide fast access and expiration. + // Soft references allow the garbage collector to reclaim memory when needed. private static final LoadingCache cache = Caffeine.newBuilder() .softValues() .maximumSize(100000) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java index d804379c74a5a..6f403e16eeee6 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java @@ -55,7 +55,11 @@ public class TopicName implements ServiceUnitId { private final int partitionIndex; + // Deduplicates TopicName instances when the cached entry isn't in the actual cache. + // Holds weak references to TopicName so it won't prevent garbage collection. private static final Interner topicNameInterner = Interners.newWeakInterner(); + // Cache for TopicName instances that uses Caffeine to provide fast access and expiration. + // Soft references allow the garbage collector to reclaim memory when needed. private static final LoadingCache cache = Caffeine.newBuilder() .softValues() .maximumSize(100000) From 3ff6954beaa23ed14e2d910906df061fd997c9eb Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Tue, 24 Jun 2025 13:30:36 +0300 Subject: [PATCH 08/41] Add simple microbenchmark --- .../common/naming/TopicNameBenchmark.java | 77 +++++++++++++++++++ .../pulsar/common/naming/package-info.java | 19 +++++ 2 files changed, 96 insertions(+) create mode 100644 microbench/src/main/java/org/apache/pulsar/common/naming/TopicNameBenchmark.java create mode 100644 microbench/src/main/java/org/apache/pulsar/common/naming/package-info.java diff --git a/microbench/src/main/java/org/apache/pulsar/common/naming/TopicNameBenchmark.java b/microbench/src/main/java/org/apache/pulsar/common/naming/TopicNameBenchmark.java new file mode 100644 index 0000000000000..a06c54a3a5e5e --- /dev/null +++ b/microbench/src/main/java/org/apache/pulsar/common/naming/TopicNameBenchmark.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.common.naming; + +import java.util.concurrent.TimeUnit; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +/** + * Running benchmarks on Linux is recommended due timer issues on MacOS. + */ +@Fork(3) +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@State(Scope.Thread) +public class TopicNameBenchmark { + @Threads(1) + @Benchmark + @Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1) + @Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1) + public void topicCacheLookups001Threads(Blackhole blackhole) { + topicCacheLookups(blackhole); + } + + @Threads(10) + @Benchmark + @Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1) + @Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1) + public void topicCacheLookups010Threads(Blackhole blackhole) { + topicCacheLookups(blackhole); + } + + @Threads(100) + @Benchmark + @Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1) + @Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1) + public void topicCacheLookups100Threads(Blackhole blackhole) { + topicCacheLookups(blackhole); + } + + private void topicCacheLookups(Blackhole blackhole) { + for (int i = 0; i < 100_000; i++) { + blackhole.consume( + TopicName.get("persistent", "tenant-" + (i % 1000), "ns-" + (i % 10000), "my-topic-" + i)); + } + for (int i = 0; i < 100_000; i++) { + blackhole.consume( + TopicName.get("persistent", "tenant-" + (i % 1000), "ns-" + (i % 10000), "my-topic-" + i)); + } + } +} diff --git a/microbench/src/main/java/org/apache/pulsar/common/naming/package-info.java b/microbench/src/main/java/org/apache/pulsar/common/naming/package-info.java new file mode 100644 index 0000000000000..f9843c1f09934 --- /dev/null +++ b/microbench/src/main/java/org/apache/pulsar/common/naming/package-info.java @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.naming; \ No newline at end of file From aea3e1d4541d4be984510335bab94481433d8e13 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Tue, 24 Jun 2025 14:08:56 +0300 Subject: [PATCH 09/41] Add invalidateCache methods to TopicName and NamespaceName --- .../java/org/apache/pulsar/common/naming/NamespaceName.java | 4 ++++ .../main/java/org/apache/pulsar/common/naming/TopicName.java | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java index f79c4bca75ef9..fc808ec320dfc 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java @@ -52,6 +52,10 @@ public class NamespaceName implements ServiceUnitId { .scheduler(Scheduler.systemScheduler()) .build(name -> namespaceNameInterner.intern(new NamespaceName(name))); + public static void invalidateCache() { + cache.invalidateAll(); + } + public static final NamespaceName SYSTEM_NAMESPACE = NamespaceName.get("pulsar/system"); public static NamespaceName get(String tenant, String namespace) { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java index 6f403e16eeee6..dc9282d26c18a 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java @@ -67,6 +67,10 @@ public class TopicName implements ServiceUnitId { .scheduler(Scheduler.systemScheduler()) .build(name -> topicNameInterner.intern(new TopicName(name))); + public static void invalidateCache() { + cache.invalidateAll(); + } + public static TopicName get(String domain, NamespaceName namespaceName, String topic) { String name = domain + "://" + namespaceName.toString() + '/' + topic; return TopicName.get(name); From db7c0f901c19cd081fae2273669cb93d90a66e0c Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Tue, 24 Jun 2025 14:09:22 +0300 Subject: [PATCH 10/41] Improve benchmark --- .../common/naming/TopicNameBenchmark.java | 75 ++++++++++++------- 1 file changed, 48 insertions(+), 27 deletions(-) diff --git a/microbench/src/main/java/org/apache/pulsar/common/naming/TopicNameBenchmark.java b/microbench/src/main/java/org/apache/pulsar/common/naming/TopicNameBenchmark.java index a06c54a3a5e5e..b699c9477d112 100644 --- a/microbench/src/main/java/org/apache/pulsar/common/naming/TopicNameBenchmark.java +++ b/microbench/src/main/java/org/apache/pulsar/common/naming/TopicNameBenchmark.java @@ -23,55 +23,76 @@ import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; import org.openjdk.jmh.annotations.Measurement; import org.openjdk.jmh.annotations.Mode; import org.openjdk.jmh.annotations.OutputTimeUnit; import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; import org.openjdk.jmh.annotations.Threads; import org.openjdk.jmh.annotations.Warmup; -import org.openjdk.jmh.infra.Blackhole; /** - * Running benchmarks on Linux is recommended due timer issues on MacOS. + * Benchmark TopicName.get performance. */ @Fork(3) @BenchmarkMode(Mode.Throughput) @OutputTimeUnit(TimeUnit.SECONDS) @State(Scope.Thread) public class TopicNameBenchmark { - @Threads(1) - @Benchmark - @Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1) - @Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1) - public void topicCacheLookups001Threads(Blackhole blackhole) { - topicCacheLookups(blackhole); + @State(Scope.Thread) + public static class ColdLookupState { + private int counter = 0; + private String[] topicNames; + + @Setup(Level.Trial) + public void setup() { + topicNames = new String[100000]; + for (int i = 0; i < topicNames.length; i++) { + topicNames[i] = String.format("persistent://tenant-%d/ns-%d/topic-%d", + i % 100, i % 1000, i); + } + } + + @TearDown(Level.Iteration) + public void tearDown() { + TopicName.invalidateCache(); + NamespaceName.invalidateCache(); + } } - @Threads(10) @Benchmark - @Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1) - @Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1) - public void topicCacheLookups010Threads(Blackhole blackhole) { - topicCacheLookups(blackhole); + @BenchmarkMode(Mode.Throughput) + @OutputTimeUnit(TimeUnit.SECONDS) + @Measurement(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS) + @Warmup(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS) + @Threads(1) + public TopicName coldTopicLookup001(ColdLookupState state) { + String topicName = state.topicNames[state.counter++ % state.topicNames.length]; + return TopicName.get(topicName); } - @Threads(100) @Benchmark - @Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1) - @Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1) - public void topicCacheLookups100Threads(Blackhole blackhole) { - topicCacheLookups(blackhole); + @BenchmarkMode(Mode.Throughput) + @OutputTimeUnit(TimeUnit.SECONDS) + @Measurement(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS) + @Warmup(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS) + @Threads(10) + public TopicName coldTopicLookup010(ColdLookupState state) { + String topicName = state.topicNames[state.counter++ % state.topicNames.length]; + return TopicName.get(topicName); } - private void topicCacheLookups(Blackhole blackhole) { - for (int i = 0; i < 100_000; i++) { - blackhole.consume( - TopicName.get("persistent", "tenant-" + (i % 1000), "ns-" + (i % 10000), "my-topic-" + i)); - } - for (int i = 0; i < 100_000; i++) { - blackhole.consume( - TopicName.get("persistent", "tenant-" + (i % 1000), "ns-" + (i % 10000), "my-topic-" + i)); - } + @Benchmark + @BenchmarkMode(Mode.Throughput) + @OutputTimeUnit(TimeUnit.SECONDS) + @Measurement(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS) + @Warmup(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS) + @Threads(100) + public TopicName coldTopicLookup100(ColdLookupState state) { + String topicName = state.topicNames[state.counter++ % state.topicNames.length]; + return TopicName.get(topicName); } } From a83960dd4e50fdb4e8875b5dbd0bf720d1dcbc75 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Tue, 24 Jun 2025 14:42:55 +0300 Subject: [PATCH 11/41] Improve benchmark --- .../common/naming/TopicNameBenchmark.java | 28 ++++++++++--------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/microbench/src/main/java/org/apache/pulsar/common/naming/TopicNameBenchmark.java b/microbench/src/main/java/org/apache/pulsar/common/naming/TopicNameBenchmark.java index b699c9477d112..c4f2bfbf7e2b9 100644 --- a/microbench/src/main/java/org/apache/pulsar/common/naming/TopicNameBenchmark.java +++ b/microbench/src/main/java/org/apache/pulsar/common/naming/TopicNameBenchmark.java @@ -43,16 +43,15 @@ @State(Scope.Thread) public class TopicNameBenchmark { @State(Scope.Thread) - public static class ColdLookupState { - private int counter = 0; + public static class TestState { + private long counter = 0; private String[] topicNames; @Setup(Level.Trial) public void setup() { topicNames = new String[100000]; for (int i = 0; i < topicNames.length; i++) { - topicNames[i] = String.format("persistent://tenant-%d/ns-%d/topic-%d", - i % 100, i % 1000, i); + topicNames[i] = String.format("persistent://tenant-%d/ns-%d/topic-%d", i % 100, i % 1000, i); } } @@ -60,6 +59,12 @@ public void setup() { public void tearDown() { TopicName.invalidateCache(); NamespaceName.invalidateCache(); + counter = 0; + } + + public String getNextTopicName() { + String topicName = topicNames[(int) (counter++ % topicNames.length)]; + return topicName; } } @@ -69,9 +74,8 @@ public void tearDown() { @Measurement(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS) @Warmup(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS) @Threads(1) - public TopicName coldTopicLookup001(ColdLookupState state) { - String topicName = state.topicNames[state.counter++ % state.topicNames.length]; - return TopicName.get(topicName); + public TopicName coldTopicLookup001(TestState state) { + return TopicName.get(state.getNextTopicName()); } @Benchmark @@ -80,9 +84,8 @@ public TopicName coldTopicLookup001(ColdLookupState state) { @Measurement(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS) @Warmup(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS) @Threads(10) - public TopicName coldTopicLookup010(ColdLookupState state) { - String topicName = state.topicNames[state.counter++ % state.topicNames.length]; - return TopicName.get(topicName); + public TopicName coldTopicLookup010(TestState state) { + return TopicName.get(state.getNextTopicName()); } @Benchmark @@ -91,8 +94,7 @@ public TopicName coldTopicLookup010(ColdLookupState state) { @Measurement(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS) @Warmup(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS) @Threads(100) - public TopicName coldTopicLookup100(ColdLookupState state) { - String topicName = state.topicNames[state.counter++ % state.topicNames.length]; - return TopicName.get(topicName); + public TopicName coldTopicLookup100(TestState state) { + return TopicName.get(state.getNextTopicName()); } } From 59725fed024814937c29d0077659bff94e893343 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Tue, 24 Jun 2025 14:54:42 +0300 Subject: [PATCH 12/41] Test warm lookups to cache --- .../pulsar/common/naming/TopicNameBenchmark.java | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/microbench/src/main/java/org/apache/pulsar/common/naming/TopicNameBenchmark.java b/microbench/src/main/java/org/apache/pulsar/common/naming/TopicNameBenchmark.java index c4f2bfbf7e2b9..761ab6c632d0f 100644 --- a/microbench/src/main/java/org/apache/pulsar/common/naming/TopicNameBenchmark.java +++ b/microbench/src/main/java/org/apache/pulsar/common/naming/TopicNameBenchmark.java @@ -27,6 +27,7 @@ import org.openjdk.jmh.annotations.Measurement; import org.openjdk.jmh.annotations.Mode; import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; import org.openjdk.jmh.annotations.Scope; import org.openjdk.jmh.annotations.Setup; import org.openjdk.jmh.annotations.State; @@ -44,6 +45,8 @@ public class TopicNameBenchmark { @State(Scope.Thread) public static class TestState { + @Param({"false", "true"}) + private boolean invalidateCache; private long counter = 0; private String[] topicNames; @@ -57,8 +60,10 @@ public void setup() { @TearDown(Level.Iteration) public void tearDown() { - TopicName.invalidateCache(); - NamespaceName.invalidateCache(); + if (invalidateCache) { + TopicName.invalidateCache(); + NamespaceName.invalidateCache(); + } counter = 0; } @@ -74,7 +79,7 @@ public String getNextTopicName() { @Measurement(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS) @Warmup(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS) @Threads(1) - public TopicName coldTopicLookup001(TestState state) { + public TopicName topicLookup001(TestState state) { return TopicName.get(state.getNextTopicName()); } @@ -84,7 +89,7 @@ public TopicName coldTopicLookup001(TestState state) { @Measurement(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS) @Warmup(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS) @Threads(10) - public TopicName coldTopicLookup010(TestState state) { + public TopicName topicLookup010(TestState state) { return TopicName.get(state.getNextTopicName()); } @@ -94,7 +99,7 @@ public TopicName coldTopicLookup010(TestState state) { @Measurement(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS) @Warmup(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS) @Threads(100) - public TopicName coldTopicLookup100(TestState state) { + public TopicName topicLookup100(TestState state) { return TopicName.get(state.getNextTopicName()); } } From 7f372a97dc4223cb729c88f8e5995b13017abfa7 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Tue, 24 Jun 2025 16:55:09 +0300 Subject: [PATCH 13/41] Refactor --- .../pulsar/common/naming/TopicName.java | 25 +-- .../pulsar/common/naming/TopicNameCache.java | 152 ++++++++++++++++++ 2 files changed, 155 insertions(+), 22 deletions(-) create mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameCache.java diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java index dc9282d26c18a..3c71f7106afd4 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java @@ -18,17 +18,11 @@ */ package org.apache.pulsar.common.naming; -import com.github.benmanes.caffeine.cache.Caffeine; -import com.github.benmanes.caffeine.cache.LoadingCache; -import com.github.benmanes.caffeine.cache.Scheduler; import com.google.common.base.Splitter; -import com.google.common.collect.Interner; -import com.google.common.collect.Interners; import com.google.re2j.Pattern; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.List; import java.util.Objects; -import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.common.util.StringInterner; @@ -37,7 +31,6 @@ * Encapsulate the parsing of the completeTopicName name. */ public class TopicName implements ServiceUnitId { - public static final String PUBLIC_TENANT = "public"; public static final String DEFAULT_NAMESPACE = "default"; @@ -55,20 +48,8 @@ public class TopicName implements ServiceUnitId { private final int partitionIndex; - // Deduplicates TopicName instances when the cached entry isn't in the actual cache. - // Holds weak references to TopicName so it won't prevent garbage collection. - private static final Interner topicNameInterner = Interners.newWeakInterner(); - // Cache for TopicName instances that uses Caffeine to provide fast access and expiration. - // Soft references allow the garbage collector to reclaim memory when needed. - private static final LoadingCache cache = Caffeine.newBuilder() - .softValues() - .maximumSize(100000) - .expireAfterWrite(30, TimeUnit.MINUTES) - .scheduler(Scheduler.systemScheduler()) - .build(name -> topicNameInterner.intern(new TopicName(name))); - public static void invalidateCache() { - cache.invalidateAll(); + TopicNameCache.INSTANCE.invalidateCache(); } public static TopicName get(String domain, NamespaceName namespaceName, String topic) { @@ -88,7 +69,7 @@ public static TopicName get(String domain, String tenant, String cluster, String } public static TopicName get(String topic) { - return cache.get(topic); + return TopicNameCache.INSTANCE.get(topic); } public static TopicName getPartitionedTopicName(String topic) { @@ -117,7 +98,7 @@ public static String getPattern(String topic) { } @SuppressFBWarnings("DCN_NULLPOINTER_EXCEPTION") - private TopicName(String completeTopicName) { + TopicName(String completeTopicName) { try { // The topic name can be in two different forms, one is fully qualified topic name, // the other one is short topic name diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameCache.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameCache.java new file mode 100644 index 0000000000000..3c67abcca37d2 --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameCache.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.naming; + +import com.google.common.collect.Interner; +import com.google.common.collect.Interners; +import java.lang.ref.ReferenceQueue; +import java.lang.ref.SoftReference; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +/** + * A cache for TopicName instances that allows deduplication and efficient memory usage. + * It uses soft references to allow garbage collection of unused TopicName instances under heavy memory pressure. + * This cache uses ConcurrentHashMap for lookups for performance over Guava Cache and Caffeine Cache. + */ +class TopicNameCache { + static final TopicNameCache INSTANCE = new TopicNameCache(); + private static int cacheMaxSize = 100000; + private static int reduceSizeByPercentage = 25; + + // Deduplicates TopicName instances when the cached entry isn't in the actual cache. + // Holds weak references to TopicName so it won't prevent garbage collection. + private final Interner topicNameInterner = Interners.newWeakInterner(); + // Cache for TopicName instances using ConcurrentHashMap and SoftReference to allow + private final ConcurrentMap cache = new ConcurrentHashMap<>(); + private final LinkedHashSet keysInAddedOrder = new LinkedHashSet<>(); + private final ReferenceQueue referenceQueue = new ReferenceQueue<>(); + private final AtomicBoolean cacheShrinkNeeded = new AtomicBoolean(false); + private final AtomicLong nextReferenceQueuePurge = new AtomicLong(); + private static final long REFERENCE_QUEUE_PURGE_INTERVAL_NANOS = TimeUnit.SECONDS.toNanos(10); + + private static final class SoftReferenceTopicName extends SoftReference { + private final String topic; + + public SoftReferenceTopicName(String topic, TopicName referent, ReferenceQueue q) { + super(referent, q); + this.topic = topic; + } + + public String getTopic() { + return topic; + } + } + + public void invalidateCache() { + cache.clear(); + synchronized (keysInAddedOrder) { + keysInAddedOrder.clear(); + } + } + + public TopicName get(String topic) { + TopicName topicName = cache.computeIfAbsent(topic, __ -> { + return createSoftReferenceTopicName(topic); + }).get(); + // There has been a garbage collection and the soft reference has been cleared. + if (topicName == null) { + // remove the possible stale entry from the cache + topicName = cache.compute(topic, (key, existingRef) -> { + if (existingRef == null || existingRef.get() == null) { + return createSoftReferenceTopicName(key); + } + return existingRef; + }).get(); + } + // retry if the topicName is still null + if (topicName == null) { + return get(topic); + } + if (cacheShrinkNeeded.compareAndSet(true, false)) { + shrinkCacheSize(); + } + long localNextReferenceQueuePurge = nextReferenceQueuePurge.get(); + if (localNextReferenceQueuePurge == 0 || System.nanoTime() > localNextReferenceQueuePurge) { + if (nextReferenceQueuePurge.compareAndSet(localNextReferenceQueuePurge, + System.nanoTime() + REFERENCE_QUEUE_PURGE_INTERVAL_NANOS)) { + purgeReferenceQueue(); + } + } + return topicName; + } + + private SoftReferenceTopicName createSoftReferenceTopicName(String topic) { + TopicName topicName = topicNameInterner.intern(new TopicName(topic)); + synchronized (keysInAddedOrder) { + keysInAddedOrder.add(topic); + if (keysInAddedOrder.size() >= cacheMaxSize) { + cacheShrinkNeeded.compareAndSet(false, true); + } + } + return new SoftReferenceTopicName(topic, topicName, referenceQueue); + } + + private void shrinkCacheSize() { + synchronized (keysInAddedOrder) { + if (keysInAddedOrder.size() >= cacheMaxSize) { + // Reduce the cache size after reaching the maximum size + int reduceSizeBy = + keysInAddedOrder.size() - (int) (cacheMaxSize * ((100 - reduceSizeByPercentage) / 100.0)); + for (Iterator iterator = keysInAddedOrder.iterator(); iterator.hasNext(); ) { + if (reduceSizeBy == 0) { + break; + } + String oldestKey = iterator.next(); + SoftReferenceTopicName ref = cache.remove(oldestKey); + if (ref != null) { + ref.clear(); + } + iterator.remove(); + reduceSizeBy--; + } + } + } + } + + private void purgeReferenceQueue() { + // Clean up the reference queue to remove any cleared references + synchronized (keysInAddedOrder) { + while (true) { + SoftReferenceTopicName ref = (SoftReferenceTopicName) referenceQueue.poll(); + if (ref == null) { + break; + } + String topic = ref.getTopic(); + cache.remove(topic); + keysInAddedOrder.remove(topic); + } + } + } +} From 88fb5b632130723909f33ef53d2fca8bd1badd92 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Tue, 24 Jun 2025 17:18:42 +0300 Subject: [PATCH 14/41] Remove keysInAddedOrder --- .../pulsar/common/naming/TopicNameCache.java | 57 +++++++------------ 1 file changed, 22 insertions(+), 35 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameCache.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameCache.java index 3c67abcca37d2..155b852992d9e 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameCache.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameCache.java @@ -23,7 +23,6 @@ import java.lang.ref.ReferenceQueue; import java.lang.ref.SoftReference; import java.util.Iterator; -import java.util.LinkedHashSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; @@ -45,7 +44,6 @@ class TopicNameCache { private final Interner topicNameInterner = Interners.newWeakInterner(); // Cache for TopicName instances using ConcurrentHashMap and SoftReference to allow private final ConcurrentMap cache = new ConcurrentHashMap<>(); - private final LinkedHashSet keysInAddedOrder = new LinkedHashSet<>(); private final ReferenceQueue referenceQueue = new ReferenceQueue<>(); private final AtomicBoolean cacheShrinkNeeded = new AtomicBoolean(false); private final AtomicLong nextReferenceQueuePurge = new AtomicLong(); @@ -66,9 +64,6 @@ public String getTopic() { public void invalidateCache() { cache.clear(); - synchronized (keysInAddedOrder) { - keysInAddedOrder.clear(); - } } public TopicName get(String topic) { @@ -104,49 +99,41 @@ public TopicName get(String topic) { private SoftReferenceTopicName createSoftReferenceTopicName(String topic) { TopicName topicName = topicNameInterner.intern(new TopicName(topic)); - synchronized (keysInAddedOrder) { - keysInAddedOrder.add(topic); - if (keysInAddedOrder.size() >= cacheMaxSize) { - cacheShrinkNeeded.compareAndSet(false, true); - } + if (cache.size() >= cacheMaxSize) { + cacheShrinkNeeded.compareAndSet(false, true); } return new SoftReferenceTopicName(topic, topicName, referenceQueue); } private void shrinkCacheSize() { - synchronized (keysInAddedOrder) { - if (keysInAddedOrder.size() >= cacheMaxSize) { - // Reduce the cache size after reaching the maximum size - int reduceSizeBy = - keysInAddedOrder.size() - (int) (cacheMaxSize * ((100 - reduceSizeByPercentage) / 100.0)); - for (Iterator iterator = keysInAddedOrder.iterator(); iterator.hasNext(); ) { - if (reduceSizeBy == 0) { - break; - } - String oldestKey = iterator.next(); - SoftReferenceTopicName ref = cache.remove(oldestKey); - if (ref != null) { - ref.clear(); - } - iterator.remove(); - reduceSizeBy--; + if (cache.size() >= cacheMaxSize) { + // Reduce the cache size after reaching the maximum size + int reduceSizeBy = + cache.size() - (int) (cacheMaxSize * ((100 - reduceSizeByPercentage) / 100.0)); + for (Iterator iterator = cache.keySet().iterator(); iterator.hasNext(); ) { + if (reduceSizeBy == 0) { + break; } + String oldestKey = iterator.next(); + SoftReferenceTopicName ref = cache.remove(oldestKey); + if (ref != null) { + ref.clear(); + } + iterator.remove(); + reduceSizeBy--; } } } private void purgeReferenceQueue() { // Clean up the reference queue to remove any cleared references - synchronized (keysInAddedOrder) { - while (true) { - SoftReferenceTopicName ref = (SoftReferenceTopicName) referenceQueue.poll(); - if (ref == null) { - break; - } - String topic = ref.getTopic(); - cache.remove(topic); - keysInAddedOrder.remove(topic); + while (true) { + SoftReferenceTopicName ref = (SoftReferenceTopicName) referenceQueue.poll(); + if (ref == null) { + break; } + String topic = ref.getTopic(); + cache.remove(topic); } } } From cf2a0d14b5861d296363cda9925036a899c961e0 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Tue, 24 Jun 2025 17:38:07 +0300 Subject: [PATCH 15/41] Add comment --- .../java/org/apache/pulsar/common/naming/TopicNameCache.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameCache.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameCache.java index 155b852992d9e..ca944ef9ec43a 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameCache.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameCache.java @@ -32,7 +32,8 @@ /** * A cache for TopicName instances that allows deduplication and efficient memory usage. * It uses soft references to allow garbage collection of unused TopicName instances under heavy memory pressure. - * This cache uses ConcurrentHashMap for lookups for performance over Guava Cache and Caffeine Cache. + * This cache uses ConcurrentHashMap for lookups for performance over Guava Cache and Caffeine Cache + * since there was a concern in https://github.com/apache/pulsar/pull/23052 about high CPU usage for TopicName lookups. */ class TopicNameCache { static final TopicNameCache INSTANCE = new TopicNameCache(); From 07451c710188dcd0f5d2e09ec095af7fbe908cc2 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Tue, 24 Jun 2025 17:41:17 +0300 Subject: [PATCH 16/41] Improve logic --- .../org/apache/pulsar/common/naming/TopicNameCache.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameCache.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameCache.java index ca944ef9ec43a..61d23a48e66b8 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameCache.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameCache.java @@ -81,10 +81,6 @@ public TopicName get(String topic) { return existingRef; }).get(); } - // retry if the topicName is still null - if (topicName == null) { - return get(topic); - } if (cacheShrinkNeeded.compareAndSet(true, false)) { shrinkCacheSize(); } @@ -111,8 +107,10 @@ private void shrinkCacheSize() { // Reduce the cache size after reaching the maximum size int reduceSizeBy = cache.size() - (int) (cacheMaxSize * ((100 - reduceSizeByPercentage) / 100.0)); + // removes entries from the cache until the size is reduced + // this doesn't remove the oldest entries, but rather reduces the size by a percentage for (Iterator iterator = cache.keySet().iterator(); iterator.hasNext(); ) { - if (reduceSizeBy == 0) { + if (reduceSizeBy <= 0) { break; } String oldestKey = iterator.next(); From f1801ae896fb5a36a9f720650a67490844562b60 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Tue, 24 Jun 2025 17:53:14 +0300 Subject: [PATCH 17/41] Improve logic --- .../pulsar/common/naming/TopicNameCache.java | 34 ++++++++++++++----- 1 file changed, 25 insertions(+), 9 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameCache.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameCache.java index 61d23a48e66b8..b8005beb4c8fa 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameCache.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameCache.java @@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import org.apache.pulsar.common.util.StringInterner; /** * A cache for TopicName instances that allows deduplication and efficient memory usage. @@ -50,6 +51,7 @@ class TopicNameCache { private final AtomicLong nextReferenceQueuePurge = new AtomicLong(); private static final long REFERENCE_QUEUE_PURGE_INTERVAL_NANOS = TimeUnit.SECONDS.toNanos(10); + // Values are held as soft references to allow garbage collection when memory is low. private static final class SoftReferenceTopicName extends SoftReference { private final String topic; @@ -68,9 +70,18 @@ public void invalidateCache() { } public TopicName get(String topic) { - TopicName topicName = cache.computeIfAbsent(topic, __ -> { - return createSoftReferenceTopicName(topic); - }).get(); + // first do a quick lookup in the cache + TopicName topicName = cache.get(topic).get(); + if (topicName == null) { + // intern the topic name to deduplicate topic names used as keys + topic = StringInterner.intern(topic); + topicName = cache.computeIfAbsent(topic, key -> { + return createSoftReferenceTopicName(key); + }).get(); + if (cache.size() >= cacheMaxSize) { + cacheShrinkNeeded.compareAndSet(false, true); + } + } // There has been a garbage collection and the soft reference has been cleared. if (topicName == null) { // remove the possible stale entry from the cache @@ -80,7 +91,15 @@ public TopicName get(String topic) { } return existingRef; }).get(); + if (cache.size() >= cacheMaxSize) { + cacheShrinkNeeded.compareAndSet(false, true); + } } + doCacheMaintenance(); + return topicName; + } + + private void doCacheMaintenance() { if (cacheShrinkNeeded.compareAndSet(true, false)) { shrinkCacheSize(); } @@ -91,14 +110,10 @@ public TopicName get(String topic) { purgeReferenceQueue(); } } - return topicName; } private SoftReferenceTopicName createSoftReferenceTopicName(String topic) { TopicName topicName = topicNameInterner.intern(new TopicName(topic)); - if (cache.size() >= cacheMaxSize) { - cacheShrinkNeeded.compareAndSet(false, true); - } return new SoftReferenceTopicName(topic, topicName, referenceQueue); } @@ -107,8 +122,9 @@ private void shrinkCacheSize() { // Reduce the cache size after reaching the maximum size int reduceSizeBy = cache.size() - (int) (cacheMaxSize * ((100 - reduceSizeByPercentage) / 100.0)); - // removes entries from the cache until the size is reduced // this doesn't remove the oldest entries, but rather reduces the size by a percentage + // keeping the order of added entries would add more overhead and Caffeine Cache would be a better fit + // in that case. for (Iterator iterator = cache.keySet().iterator(); iterator.hasNext(); ) { if (reduceSizeBy <= 0) { break; @@ -125,7 +141,7 @@ private void shrinkCacheSize() { } private void purgeReferenceQueue() { - // Clean up the reference queue to remove any cleared references + // Clean up the reference queue to remove any references cleared by the garbage collector. while (true) { SoftReferenceTopicName ref = (SoftReferenceTopicName) referenceQueue.poll(); if (ref == null) { From d75d21369d22feb5f48392ee3a6e92ae162a2011 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Tue, 24 Jun 2025 17:59:05 +0300 Subject: [PATCH 18/41] cache maxsize check --- .../org/apache/pulsar/common/naming/TopicNameCache.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameCache.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameCache.java index b8005beb4c8fa..ac0210085188c 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameCache.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameCache.java @@ -78,7 +78,7 @@ public TopicName get(String topic) { topicName = cache.computeIfAbsent(topic, key -> { return createSoftReferenceTopicName(key); }).get(); - if (cache.size() >= cacheMaxSize) { + if (cache.size() > cacheMaxSize) { cacheShrinkNeeded.compareAndSet(false, true); } } @@ -91,7 +91,7 @@ public TopicName get(String topic) { } return existingRef; }).get(); - if (cache.size() >= cacheMaxSize) { + if (cache.size() > cacheMaxSize) { cacheShrinkNeeded.compareAndSet(false, true); } } @@ -118,7 +118,7 @@ private SoftReferenceTopicName createSoftReferenceTopicName(String topic) { } private void shrinkCacheSize() { - if (cache.size() >= cacheMaxSize) { + if (cache.size() > cacheMaxSize) { // Reduce the cache size after reaching the maximum size int reduceSizeBy = cache.size() - (int) (cacheMaxSize * ((100 - reduceSizeByPercentage) / 100.0)); From 82c816756fbc116540a1a37dade02aacc170a61d Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Tue, 24 Jun 2025 18:02:52 +0300 Subject: [PATCH 19/41] Improve logic --- .../pulsar/common/naming/TopicNameCache.java | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameCache.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameCache.java index ac0210085188c..fb393b4d32712 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameCache.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameCache.java @@ -71,20 +71,12 @@ public void invalidateCache() { public TopicName get(String topic) { // first do a quick lookup in the cache - TopicName topicName = cache.get(topic).get(); + SoftReferenceTopicName softReferenceTopicName = cache.get(topic); + TopicName topicName = softReferenceTopicName != null ? softReferenceTopicName.get() : null; if (topicName == null) { - // intern the topic name to deduplicate topic names used as keys + // intern the topic name to deduplicate topic names used as keys, since this will reduce heap memory usage topic = StringInterner.intern(topic); - topicName = cache.computeIfAbsent(topic, key -> { - return createSoftReferenceTopicName(key); - }).get(); - if (cache.size() > cacheMaxSize) { - cacheShrinkNeeded.compareAndSet(false, true); - } - } - // There has been a garbage collection and the soft reference has been cleared. - if (topicName == null) { - // remove the possible stale entry from the cache + // add new entry or replace the possible stale entry topicName = cache.compute(topic, (key, existingRef) -> { if (existingRef == null || existingRef.get() == null) { return createSoftReferenceTopicName(key); From 895e17617b0a217099a71dc5fb86fd72818bf339 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Tue, 24 Jun 2025 18:27:20 +0300 Subject: [PATCH 20/41] Refactor both TopicName and NamespaceName to use the same generic cache implementation --- .../pulsar/common/naming/NameCache.java | 157 ++++++++++++++++++ .../pulsar/common/naming/NamespaceName.java | 26 +-- .../common/naming/NamespaceNameCache.java | 35 ++++ .../pulsar/common/naming/TopicNameCache.java | 123 +------------- 4 files changed, 202 insertions(+), 139 deletions(-) create mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/naming/NameCache.java create mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceNameCache.java diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NameCache.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NameCache.java new file mode 100644 index 0000000000000..7d4634c5b4b45 --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NameCache.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.naming; + +import com.google.common.collect.Interner; +import com.google.common.collect.Interners; +import java.lang.ref.ReferenceQueue; +import java.lang.ref.SoftReference; +import java.util.Iterator; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; +import java.util.function.IntSupplier; +import org.apache.pulsar.common.util.StringInterner; + +/** + * A cache for TopicName and NamespaceName instances that allows deduplication and efficient memory usage. + * It uses soft references to allow garbage collection of unused instances under heavy memory pressure. + * This cache uses ConcurrentHashMap for lookups for performance over Guava Cache and Caffeine Cache + * since there was a concern in https://github.com/apache/pulsar/pull/23052 about high CPU usage for TopicName lookups. + */ +abstract class NameCache { + private final IntSupplier cacheMaxSize; + private final IntSupplier reduceSizeByPercentage; + private final Function valueFactory; + + // Deduplicates TopicName instances when the cached entry isn't in the actual cache. + // Holds weak references to TopicName so it won't prevent garbage collection. + private final Interner valueInterner = Interners.newWeakInterner(); + // Cache for TopicName instances using ConcurrentHashMap and SoftReference to allow + private final ConcurrentMap cache = new ConcurrentHashMap<>(); + private final ReferenceQueue referenceQueue = new ReferenceQueue<>(); + private final AtomicBoolean cacheShrinkNeeded = new AtomicBoolean(false); + private final AtomicLong nextReferenceQueuePurge = new AtomicLong(); + private static final long REFERENCE_QUEUE_PURGE_INTERVAL_NANOS = TimeUnit.SECONDS.toNanos(10); + + // Values are held as soft references to allow garbage collection when memory is low. + private final class SoftReferenceValue extends SoftReference { + private final String key; + + public SoftReferenceValue(String key, V referent, ReferenceQueue q) { + super(referent, q); + this.key = key; + } + + public String getKey() { + return key; + } + } + + NameCache(IntSupplier cacheMaxSize, IntSupplier reduceSizeByPercentage, Function valueFactory) { + this.cacheMaxSize = cacheMaxSize; + this.reduceSizeByPercentage = reduceSizeByPercentage; + this.valueFactory = valueFactory; + } + + public void invalidateCache() { + cache.clear(); + } + + public V getIfPresent(String keyParam) { + SoftReferenceValue softReferenceValue = cache.get(keyParam); + return softReferenceValue != null ? softReferenceValue.get() : null; + } + public V get(String keyParam) { + // first do a quick lookup in the cache + V valueInstance = getIfPresent(keyParam); + if (valueInstance == null) { + // intern the topic name to deduplicate topic names used as keys, since this will reduce heap memory usage + keyParam = StringInterner.intern(keyParam); + // add new entry or replace the possible stale entry + valueInstance = cache.compute(keyParam, (key, existingRef) -> { + if (existingRef == null || existingRef.get() == null) { + return createSoftReferenceValue(key); + } + return existingRef; + }).get(); + if (cache.size() > cacheMaxSize.getAsInt()) { + cacheShrinkNeeded.compareAndSet(false, true); + } + } + doCacheMaintenance(); + return valueInstance; + } + + private void doCacheMaintenance() { + if (cacheShrinkNeeded.compareAndSet(true, false)) { + shrinkCacheSize(); + } + long localNextReferenceQueuePurge = nextReferenceQueuePurge.get(); + if (localNextReferenceQueuePurge == 0 || System.nanoTime() > localNextReferenceQueuePurge) { + if (nextReferenceQueuePurge.compareAndSet(localNextReferenceQueuePurge, + System.nanoTime() + REFERENCE_QUEUE_PURGE_INTERVAL_NANOS)) { + purgeReferenceQueue(); + } + } + } + + private SoftReferenceValue createSoftReferenceValue(String key) { + V valueInstance = valueInterner.intern(valueFactory.apply(key)); + return new SoftReferenceValue(key, valueInstance, referenceQueue); + } + + private void shrinkCacheSize() { + int cacheMaxSizeAsInt = cacheMaxSize.getAsInt(); + if (cache.size() > cacheMaxSizeAsInt) { + // Reduce the cache size after reaching the maximum size + int reduceSizeBy = + cache.size() - (int) (cacheMaxSizeAsInt * ((100 - reduceSizeByPercentage.getAsInt()) / 100.0)); + // this doesn't remove the oldest entries, but rather reduces the size by a percentage + // keeping the order of added entries would add more overhead and Caffeine Cache would be a better fit + // in that case. + for (Iterator iterator = cache.keySet().iterator(); iterator.hasNext(); ) { + if (reduceSizeBy <= 0) { + break; + } + String oldestKey = iterator.next(); + SoftReferenceValue ref = cache.remove(oldestKey); + if (ref != null) { + ref.clear(); + } + iterator.remove(); + reduceSizeBy--; + } + } + } + + private void purgeReferenceQueue() { + // Clean up the reference queue to remove any references cleared by the garbage collector. + while (true) { + SoftReferenceValue ref = (SoftReferenceValue) referenceQueue.poll(); + if (ref == null) { + break; + } + cache.remove(ref.getKey()); + } + } +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java index fc808ec320dfc..dccc4c4742fc5 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java @@ -18,15 +18,9 @@ */ package org.apache.pulsar.common.naming; -import com.github.benmanes.caffeine.cache.Caffeine; -import com.github.benmanes.caffeine.cache.LoadingCache; -import com.github.benmanes.caffeine.cache.Scheduler; -import com.google.common.collect.Interner; -import com.google.common.collect.Interners; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.Objects; import java.util.Optional; -import java.util.concurrent.TimeUnit; import org.apache.pulsar.common.util.StringInterner; /** @@ -40,20 +34,8 @@ public class NamespaceName implements ServiceUnitId { private final String cluster; private final String localName; - // Deduplicates NamespaceName instances when the cached entry isn't in the actual cache. - // Holds weak references to NamespaceName so it won't prevent garbage collection. - private static final Interner namespaceNameInterner = Interners.newWeakInterner(); - // Cache for NamespaceName instances that uses Caffeine to provide fast access and expiration. - // Soft references allow the garbage collector to reclaim memory when needed. - private static final LoadingCache cache = Caffeine.newBuilder() - .softValues() - .maximumSize(100000) - .expireAfterWrite(30, TimeUnit.MINUTES) - .scheduler(Scheduler.systemScheduler()) - .build(name -> namespaceNameInterner.intern(new NamespaceName(name))); - public static void invalidateCache() { - cache.invalidateAll(); + NamespaceNameCache.INSTANCE.invalidateCache(); } public static final NamespaceName SYSTEM_NAMESPACE = NamespaceName.get("pulsar/system"); @@ -72,11 +54,11 @@ public static NamespaceName get(String namespace) { if (namespace == null || namespace.isEmpty()) { throw new IllegalArgumentException("Invalid null namespace: " + namespace); } - return cache.get(namespace); + return NamespaceNameCache.INSTANCE.get(namespace); } public static Optional getIfValid(String namespace) { - NamespaceName ns = cache.getIfPresent(namespace); + NamespaceName ns = NamespaceNameCache.INSTANCE.getIfPresent(namespace); if (ns != null) { return Optional.of(ns); } @@ -94,7 +76,7 @@ public static Optional getIfValid(String namespace) { } @SuppressFBWarnings("DCN_NULLPOINTER_EXCEPTION") - private NamespaceName(String namespace) { + NamespaceName(String namespace) { // Verify it's a proper namespace // The namespace name is composed of / // or in the legacy format with the cluster name: diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceNameCache.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceNameCache.java new file mode 100644 index 0000000000000..71ab795b6e987 --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceNameCache.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.naming; + +/** + * An efficient cache for NamespaceName instances that allows deduplication and efficient memory usage. + * It uses soft references to allow garbage collection of unused NamespaceName instances under heavy memory pressure. + * This cache uses ConcurrentHashMap for lookups for performance over Guava Cache and Caffeine Cache + * since there was a concern in https://github.com/apache/pulsar/pull/23052 about high CPU usage for cache lookups. + */ +class NamespaceNameCache extends NameCache { + static final NamespaceNameCache INSTANCE = new NamespaceNameCache(); + static int cacheMaxSize = 100000; + static int reduceSizeByPercentage = 25; + + NamespaceNameCache() { + super(() -> cacheMaxSize, () -> reduceSizeByPercentage, namespace -> new NamespaceName(namespace)); + } +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameCache.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameCache.java index fb393b4d32712..cb236f5e0c19c 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameCache.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameCache.java @@ -18,129 +18,18 @@ */ package org.apache.pulsar.common.naming; -import com.google.common.collect.Interner; -import com.google.common.collect.Interners; -import java.lang.ref.ReferenceQueue; -import java.lang.ref.SoftReference; -import java.util.Iterator; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import org.apache.pulsar.common.util.StringInterner; - /** * A cache for TopicName instances that allows deduplication and efficient memory usage. * It uses soft references to allow garbage collection of unused TopicName instances under heavy memory pressure. * This cache uses ConcurrentHashMap for lookups for performance over Guava Cache and Caffeine Cache - * since there was a concern in https://github.com/apache/pulsar/pull/23052 about high CPU usage for TopicName lookups. + * since there was a concern in https://github.com/apache/pulsar/pull/23052 about high CPU usage for cache lookups. */ -class TopicNameCache { +class TopicNameCache extends NameCache { static final TopicNameCache INSTANCE = new TopicNameCache(); - private static int cacheMaxSize = 100000; - private static int reduceSizeByPercentage = 25; - - // Deduplicates TopicName instances when the cached entry isn't in the actual cache. - // Holds weak references to TopicName so it won't prevent garbage collection. - private final Interner topicNameInterner = Interners.newWeakInterner(); - // Cache for TopicName instances using ConcurrentHashMap and SoftReference to allow - private final ConcurrentMap cache = new ConcurrentHashMap<>(); - private final ReferenceQueue referenceQueue = new ReferenceQueue<>(); - private final AtomicBoolean cacheShrinkNeeded = new AtomicBoolean(false); - private final AtomicLong nextReferenceQueuePurge = new AtomicLong(); - private static final long REFERENCE_QUEUE_PURGE_INTERVAL_NANOS = TimeUnit.SECONDS.toNanos(10); - - // Values are held as soft references to allow garbage collection when memory is low. - private static final class SoftReferenceTopicName extends SoftReference { - private final String topic; - - public SoftReferenceTopicName(String topic, TopicName referent, ReferenceQueue q) { - super(referent, q); - this.topic = topic; - } - - public String getTopic() { - return topic; - } - } - - public void invalidateCache() { - cache.clear(); - } - - public TopicName get(String topic) { - // first do a quick lookup in the cache - SoftReferenceTopicName softReferenceTopicName = cache.get(topic); - TopicName topicName = softReferenceTopicName != null ? softReferenceTopicName.get() : null; - if (topicName == null) { - // intern the topic name to deduplicate topic names used as keys, since this will reduce heap memory usage - topic = StringInterner.intern(topic); - // add new entry or replace the possible stale entry - topicName = cache.compute(topic, (key, existingRef) -> { - if (existingRef == null || existingRef.get() == null) { - return createSoftReferenceTopicName(key); - } - return existingRef; - }).get(); - if (cache.size() > cacheMaxSize) { - cacheShrinkNeeded.compareAndSet(false, true); - } - } - doCacheMaintenance(); - return topicName; - } - - private void doCacheMaintenance() { - if (cacheShrinkNeeded.compareAndSet(true, false)) { - shrinkCacheSize(); - } - long localNextReferenceQueuePurge = nextReferenceQueuePurge.get(); - if (localNextReferenceQueuePurge == 0 || System.nanoTime() > localNextReferenceQueuePurge) { - if (nextReferenceQueuePurge.compareAndSet(localNextReferenceQueuePurge, - System.nanoTime() + REFERENCE_QUEUE_PURGE_INTERVAL_NANOS)) { - purgeReferenceQueue(); - } - } - } - - private SoftReferenceTopicName createSoftReferenceTopicName(String topic) { - TopicName topicName = topicNameInterner.intern(new TopicName(topic)); - return new SoftReferenceTopicName(topic, topicName, referenceQueue); - } - - private void shrinkCacheSize() { - if (cache.size() > cacheMaxSize) { - // Reduce the cache size after reaching the maximum size - int reduceSizeBy = - cache.size() - (int) (cacheMaxSize * ((100 - reduceSizeByPercentage) / 100.0)); - // this doesn't remove the oldest entries, but rather reduces the size by a percentage - // keeping the order of added entries would add more overhead and Caffeine Cache would be a better fit - // in that case. - for (Iterator iterator = cache.keySet().iterator(); iterator.hasNext(); ) { - if (reduceSizeBy <= 0) { - break; - } - String oldestKey = iterator.next(); - SoftReferenceTopicName ref = cache.remove(oldestKey); - if (ref != null) { - ref.clear(); - } - iterator.remove(); - reduceSizeBy--; - } - } - } + static int cacheMaxSize = 100000; + static int reduceSizeByPercentage = 25; - private void purgeReferenceQueue() { - // Clean up the reference queue to remove any references cleared by the garbage collector. - while (true) { - SoftReferenceTopicName ref = (SoftReferenceTopicName) referenceQueue.poll(); - if (ref == null) { - break; - } - String topic = ref.getTopic(); - cache.remove(topic); - } + TopicNameCache() { + super(() -> cacheMaxSize, () -> reduceSizeByPercentage, topic -> new TopicName(topic)); } } From 6ff60d3a092f0b02ae919314371f25b4780f07e5 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Tue, 24 Jun 2025 19:06:49 +0300 Subject: [PATCH 21/41] Remove redundant removal --- .../src/main/java/org/apache/pulsar/common/naming/NameCache.java | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NameCache.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NameCache.java index 7d4634c5b4b45..118e416a3761e 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NameCache.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NameCache.java @@ -138,7 +138,6 @@ private void shrinkCacheSize() { if (ref != null) { ref.clear(); } - iterator.remove(); reduceSizeBy--; } } From 7c3799947448b3766b23472597c5918f60fda250 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Tue, 24 Jun 2025 19:07:22 +0300 Subject: [PATCH 22/41] Rename oldestKey -> key since keys are in hash order --- .../main/java/org/apache/pulsar/common/naming/NameCache.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NameCache.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NameCache.java index 118e416a3761e..a0b410d528fb5 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NameCache.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NameCache.java @@ -133,8 +133,8 @@ private void shrinkCacheSize() { if (reduceSizeBy <= 0) { break; } - String oldestKey = iterator.next(); - SoftReferenceValue ref = cache.remove(oldestKey); + String key = iterator.next(); + SoftReferenceValue ref = cache.remove(key); if (ref != null) { ref.clear(); } From 41b06ddc59e1f2d959767daf61dfb308c5b6cf8a Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Tue, 24 Jun 2025 20:51:22 +0300 Subject: [PATCH 23/41] Revisit test that used reflection to access the previous cache implementation --- .../broker/service/OneWayReplicatorTest.java | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java index 1244300378a8c..fb838e7efeabe 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java @@ -50,7 +50,6 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -68,6 +67,8 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.commons.lang3.ClassUtils; +import org.apache.commons.lang3.reflect.MethodUtils; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.resources.ClusterResources; import org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator; @@ -89,8 +90,8 @@ import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; -import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; +import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.PublishRate; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; @@ -1296,10 +1297,10 @@ public void testReplicationCountMetrics() throws Exception { */ @Test public void testCloseTopicAfterStartReplicationFailed() throws Exception { - Field fieldTopicNameCache = TopicName.class.getDeclaredField("cache"); + Field fieldTopicNameCache = + ClassUtils.getClass("org.apache.pulsar.common.naming.TopicNameCache").getDeclaredField("INSTANCE"); fieldTopicNameCache.setAccessible(true); - ConcurrentHashMap topicNameCache = - (ConcurrentHashMap) fieldTopicNameCache.get(null); + Object topicNameCache = fieldTopicNameCache.get(null); final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_"); // 1.Create topic, does not enable replication now. admin1.topics().createNonPartitionedTopic(topicName); @@ -1324,9 +1325,9 @@ public void testCloseTopicAfterStartReplicationFailed() throws Exception { // - Since the topic should not be touched anymore, we use "TopicName" to confirm whether it be used by // Replication again. Thread.sleep(10 * 1000); - topicNameCache.remove(topicName); + MethodUtils.invokeMethod(topicNameCache, "invalidateCache"); Thread.sleep(60 * 1000); - assertTrue(!topicNameCache.containsKey(topicName)); + assertNull(MethodUtils.invokeMethod(topicNameCache, "getIfPresent", topicName)); // cleanup. admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1)); From 98a58b68591ffbcf75fcfe7d15429bca63425fd8 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Tue, 24 Jun 2025 22:51:42 +0300 Subject: [PATCH 24/41] Intern "public" and "default" for consistency --- .../main/java/org/apache/pulsar/common/naming/TopicName.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java index 3c71f7106afd4..bb1923760a71e 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java @@ -31,8 +31,8 @@ * Encapsulate the parsing of the completeTopicName name. */ public class TopicName implements ServiceUnitId { - public static final String PUBLIC_TENANT = "public"; - public static final String DEFAULT_NAMESPACE = "default"; + public static final String PUBLIC_TENANT = StringInterner.intern("public"); + public static final String DEFAULT_NAMESPACE = StringInterner.intern("default"); public static final String PARTITIONED_TOPIC_SUFFIX = "-partition-"; From 097bf11b09416ae08a951a5757d5f857dd8ca0a4 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Tue, 24 Jun 2025 22:51:56 +0300 Subject: [PATCH 25/41] Add size method to NameCache --- .../main/java/org/apache/pulsar/common/naming/NameCache.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NameCache.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NameCache.java index a0b410d528fb5..d79423910a425 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NameCache.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NameCache.java @@ -153,4 +153,8 @@ private void purgeReferenceQueue() { cache.remove(ref.getKey()); } } + + public int size() { + return cache.size(); + } } From 05bc206e51b1503c41b8e0286b5d3db3e9d1df78 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Tue, 24 Jun 2025 22:58:21 +0300 Subject: [PATCH 26/41] Test shrinking the cache --- .../common/naming/TopicNameCacheTest.java | 46 +++++++++++++++++++ 1 file changed, 46 insertions(+) create mode 100644 pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameCacheTest.java diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameCacheTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameCacheTest.java new file mode 100644 index 0000000000000..20d1b2e161314 --- /dev/null +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameCacheTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.naming; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.testng.Assert.assertEquals; +import org.testng.annotations.Test; + +public class TopicNameCacheTest { + + @Test + public void shrinkCache() { + // Test that the cache can shrink when the size exceeds the maximum limit + TopicNameCache cache = TopicNameCache.INSTANCE; + for (int i = 0; i < TopicNameCache.cacheMaxSize; i++) { + cache.get("persistent://tenant/namespace/topic" + i); + } + + // check that the cache size is at maximum + assertEquals(cache.size(), TopicNameCache.cacheMaxSize); + + // Add one more topic to trigger the cache shrink + cache.get("persistent://tenant/namespace/topic100101"); + + // The cache should have reduced its size by the configured percentage + assertThat(cache.size()).isEqualTo( + (int) (TopicNameCache.cacheMaxSize * ((100 - TopicNameCache.reduceSizeByPercentage) / 100.0))) + .as("Cache size should be reduced after adding an extra topic beyond the max size"); + } +} \ No newline at end of file From 74aa80b6d1be216d09e408a4c157928e8dbd6e0e Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Tue, 24 Jun 2025 23:11:53 +0300 Subject: [PATCH 27/41] Add test for soft reference handling --- .../apache/pulsar/common/naming/NameCache.java | 10 ++++++---- .../pulsar/common/naming/NamespaceNameCache.java | 6 +++++- .../pulsar/common/naming/TopicNameCache.java | 6 +++++- .../pulsar/common/naming/TopicNameCacheTest.java | 16 ++++++++++++++++ 4 files changed, 32 insertions(+), 6 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NameCache.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NameCache.java index d79423910a425..56f6c6c696d94 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NameCache.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NameCache.java @@ -25,11 +25,11 @@ import java.util.Iterator; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.function.IntSupplier; +import java.util.function.LongSupplier; import org.apache.pulsar.common.util.StringInterner; /** @@ -41,6 +41,7 @@ abstract class NameCache { private final IntSupplier cacheMaxSize; private final IntSupplier reduceSizeByPercentage; + private final LongSupplier referenceQueuePurgeIntervalNanos; private final Function valueFactory; // Deduplicates TopicName instances when the cached entry isn't in the actual cache. @@ -51,7 +52,6 @@ abstract class NameCache { private final ReferenceQueue referenceQueue = new ReferenceQueue<>(); private final AtomicBoolean cacheShrinkNeeded = new AtomicBoolean(false); private final AtomicLong nextReferenceQueuePurge = new AtomicLong(); - private static final long REFERENCE_QUEUE_PURGE_INTERVAL_NANOS = TimeUnit.SECONDS.toNanos(10); // Values are held as soft references to allow garbage collection when memory is low. private final class SoftReferenceValue extends SoftReference { @@ -67,9 +67,11 @@ public String getKey() { } } - NameCache(IntSupplier cacheMaxSize, IntSupplier reduceSizeByPercentage, Function valueFactory) { + NameCache(IntSupplier cacheMaxSize, IntSupplier reduceSizeByPercentage, + LongSupplier referenceQueuePurgeIntervalNanos, Function valueFactory) { this.cacheMaxSize = cacheMaxSize; this.reduceSizeByPercentage = reduceSizeByPercentage; + this.referenceQueuePurgeIntervalNanos = referenceQueuePurgeIntervalNanos; this.valueFactory = valueFactory; } @@ -109,7 +111,7 @@ private void doCacheMaintenance() { long localNextReferenceQueuePurge = nextReferenceQueuePurge.get(); if (localNextReferenceQueuePurge == 0 || System.nanoTime() > localNextReferenceQueuePurge) { if (nextReferenceQueuePurge.compareAndSet(localNextReferenceQueuePurge, - System.nanoTime() + REFERENCE_QUEUE_PURGE_INTERVAL_NANOS)) { + System.nanoTime() + referenceQueuePurgeIntervalNanos.getAsLong())) { purgeReferenceQueue(); } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceNameCache.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceNameCache.java index 71ab795b6e987..22a758c797360 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceNameCache.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceNameCache.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.common.naming; +import java.util.concurrent.TimeUnit; + /** * An efficient cache for NamespaceName instances that allows deduplication and efficient memory usage. * It uses soft references to allow garbage collection of unused NamespaceName instances under heavy memory pressure. @@ -28,8 +30,10 @@ class NamespaceNameCache extends NameCache { static final NamespaceNameCache INSTANCE = new NamespaceNameCache(); static int cacheMaxSize = 100000; static int reduceSizeByPercentage = 25; + static long referenceQueuePurgeIntervalNanos = TimeUnit.SECONDS.toNanos(10); NamespaceNameCache() { - super(() -> cacheMaxSize, () -> reduceSizeByPercentage, namespace -> new NamespaceName(namespace)); + super(() -> cacheMaxSize, () -> reduceSizeByPercentage, () -> referenceQueuePurgeIntervalNanos, + namespace -> new NamespaceName(namespace)); } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameCache.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameCache.java index cb236f5e0c19c..0d448eba05715 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameCache.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameCache.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.common.naming; +import java.util.concurrent.TimeUnit; + /** * A cache for TopicName instances that allows deduplication and efficient memory usage. * It uses soft references to allow garbage collection of unused TopicName instances under heavy memory pressure. @@ -28,8 +30,10 @@ class TopicNameCache extends NameCache { static final TopicNameCache INSTANCE = new TopicNameCache(); static int cacheMaxSize = 100000; static int reduceSizeByPercentage = 25; + static long referenceQueuePurgeIntervalNanos = TimeUnit.SECONDS.toNanos(10); TopicNameCache() { - super(() -> cacheMaxSize, () -> reduceSizeByPercentage, topic -> new TopicName(topic)); + super(() -> cacheMaxSize, () -> reduceSizeByPercentage, () -> referenceQueuePurgeIntervalNanos, + topic -> new TopicName(topic)); } } diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameCacheTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameCacheTest.java index 20d1b2e161314..a038d4e67b7c2 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameCacheTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameCacheTest.java @@ -20,6 +20,8 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.testng.Assert.assertEquals; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.RandomStringUtils; import org.testng.annotations.Test; public class TopicNameCacheTest { @@ -43,4 +45,18 @@ public void shrinkCache() { (int) (TopicNameCache.cacheMaxSize * ((100 - TopicNameCache.reduceSizeByPercentage) / 100.0))) .as("Cache size should be reduced after adding an extra topic beyond the max size"); } + + @Test + public void softReferenceHandling() { + TopicNameCache cache = TopicNameCache.INSTANCE; + TopicNameCache.cacheMaxSize = Integer.MAX_VALUE; + TopicNameCache.referenceQueuePurgeIntervalNanos = TimeUnit.MILLISECONDS.toNanos(10); + + for (int i = 0; i < 2_000_000; i++) { + cache.get("persistent://tenant/namespace/topic" + RandomStringUtils.randomAlphabetic(100)); + if (i % 100_000 == 0) { + System.out.println(i + " topics added to cache. Current size: " + cache.size()); + } + } + } } \ No newline at end of file From e9894b91bae451238344322e24953a7e1f33d96f Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Tue, 24 Jun 2025 23:15:34 +0300 Subject: [PATCH 28/41] Add comment about the cache configuration --- .../java/org/apache/pulsar/common/naming/NamespaceNameCache.java | 1 + .../java/org/apache/pulsar/common/naming/TopicNameCache.java | 1 + 2 files changed, 2 insertions(+) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceNameCache.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceNameCache.java index 22a758c797360..13c17b5b70628 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceNameCache.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceNameCache.java @@ -28,6 +28,7 @@ */ class NamespaceNameCache extends NameCache { static final NamespaceNameCache INSTANCE = new NamespaceNameCache(); + // Configuration for the cache. These settings aren't currently exposed to end users. static int cacheMaxSize = 100000; static int reduceSizeByPercentage = 25; static long referenceQueuePurgeIntervalNanos = TimeUnit.SECONDS.toNanos(10); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameCache.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameCache.java index 0d448eba05715..ccc6090a1bf0e 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameCache.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameCache.java @@ -28,6 +28,7 @@ */ class TopicNameCache extends NameCache { static final TopicNameCache INSTANCE = new TopicNameCache(); + // Configuration for the cache. These settings aren't currently exposed to end users. static int cacheMaxSize = 100000; static int reduceSizeByPercentage = 25; static long referenceQueuePurgeIntervalNanos = TimeUnit.SECONDS.toNanos(10); From e4493d2414e473619939c9d9996daa72d9676dda Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Tue, 24 Jun 2025 23:22:28 +0300 Subject: [PATCH 29/41] Improve comments --- .../org/apache/pulsar/common/naming/NameCache.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NameCache.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NameCache.java index 56f6c6c696d94..316269accec0e 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NameCache.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NameCache.java @@ -44,14 +44,18 @@ abstract class NameCache { private final LongSupplier referenceQueuePurgeIntervalNanos; private final Function valueFactory; - // Deduplicates TopicName instances when the cached entry isn't in the actual cache. - // Holds weak references to TopicName so it won't prevent garbage collection. - private final Interner valueInterner = Interners.newWeakInterner(); - // Cache for TopicName instances using ConcurrentHashMap and SoftReference to allow + // Cache instances using ConcurrentHashMap and SoftReference to allow garbage collection to clear unreferenced + // entries when heap memory is running low. private final ConcurrentMap cache = new ConcurrentHashMap<>(); + // Reference queue to hold cleared soft references, which will be used to remove entries from the cache. private final ReferenceQueue referenceQueue = new ReferenceQueue<>(); + // Flag to indicate if the cache size needs to be reduced. This is set when the cache exceeds the maximum size. private final AtomicBoolean cacheShrinkNeeded = new AtomicBoolean(false); + // Next timestamp to run reference queue purging to remove cleared references. Handled when cache is accessed. private final AtomicLong nextReferenceQueuePurge = new AtomicLong(); + // Deduplicates instances when the cached entry isn't in the actual cache. + // Holds weak references to the value so it won't prevent garbage collection. + private final Interner valueInterner = Interners.newWeakInterner(); // Values are held as soft references to allow garbage collection when memory is low. private final class SoftReferenceValue extends SoftReference { From 1aaf0a412e6a9ebf217be89a0d3d9b242f320f8f Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Tue, 24 Jun 2025 23:29:17 +0300 Subject: [PATCH 30/41] Refactor: replace functions with abstract methods in NameCache --- .../pulsar/common/naming/NameCache.java | 36 +++++++------------ .../common/naming/NamespaceNameCache.java | 21 +++++++++-- .../pulsar/common/naming/TopicNameCache.java | 21 +++++++++-- 3 files changed, 49 insertions(+), 29 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NameCache.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NameCache.java index 316269accec0e..d9dfc59cb4130 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NameCache.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NameCache.java @@ -22,14 +22,10 @@ import com.google.common.collect.Interners; import java.lang.ref.ReferenceQueue; import java.lang.ref.SoftReference; -import java.util.Iterator; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Function; -import java.util.function.IntSupplier; -import java.util.function.LongSupplier; import org.apache.pulsar.common.util.StringInterner; /** @@ -39,11 +35,6 @@ * since there was a concern in https://github.com/apache/pulsar/pull/23052 about high CPU usage for TopicName lookups. */ abstract class NameCache { - private final IntSupplier cacheMaxSize; - private final IntSupplier reduceSizeByPercentage; - private final LongSupplier referenceQueuePurgeIntervalNanos; - private final Function valueFactory; - // Cache instances using ConcurrentHashMap and SoftReference to allow garbage collection to clear unreferenced // entries when heap memory is running low. private final ConcurrentMap cache = new ConcurrentHashMap<>(); @@ -71,13 +62,13 @@ public String getKey() { } } - NameCache(IntSupplier cacheMaxSize, IntSupplier reduceSizeByPercentage, - LongSupplier referenceQueuePurgeIntervalNanos, Function valueFactory) { - this.cacheMaxSize = cacheMaxSize; - this.reduceSizeByPercentage = reduceSizeByPercentage; - this.referenceQueuePurgeIntervalNanos = referenceQueuePurgeIntervalNanos; - this.valueFactory = valueFactory; - } + protected abstract V createValue(String key); + + protected abstract int getCacheMaxSize(); + + protected abstract int getReduceSizeByPercentage(); + + protected abstract long getReferenceQueuePurgeIntervalNanos(); public void invalidateCache() { cache.clear(); @@ -100,7 +91,7 @@ public V get(String keyParam) { } return existingRef; }).get(); - if (cache.size() > cacheMaxSize.getAsInt()) { + if (cache.size() > getCacheMaxSize()) { cacheShrinkNeeded.compareAndSet(false, true); } } @@ -115,31 +106,30 @@ private void doCacheMaintenance() { long localNextReferenceQueuePurge = nextReferenceQueuePurge.get(); if (localNextReferenceQueuePurge == 0 || System.nanoTime() > localNextReferenceQueuePurge) { if (nextReferenceQueuePurge.compareAndSet(localNextReferenceQueuePurge, - System.nanoTime() + referenceQueuePurgeIntervalNanos.getAsLong())) { + System.nanoTime() + getReferenceQueuePurgeIntervalNanos())) { purgeReferenceQueue(); } } } private SoftReferenceValue createSoftReferenceValue(String key) { - V valueInstance = valueInterner.intern(valueFactory.apply(key)); + V valueInstance = valueInterner.intern(createValue(key)); return new SoftReferenceValue(key, valueInstance, referenceQueue); } private void shrinkCacheSize() { - int cacheMaxSizeAsInt = cacheMaxSize.getAsInt(); + int cacheMaxSizeAsInt = getCacheMaxSize(); if (cache.size() > cacheMaxSizeAsInt) { // Reduce the cache size after reaching the maximum size int reduceSizeBy = - cache.size() - (int) (cacheMaxSizeAsInt * ((100 - reduceSizeByPercentage.getAsInt()) / 100.0)); + cache.size() - (int) (cacheMaxSizeAsInt * ((100 - getReduceSizeByPercentage()) / 100.0)); // this doesn't remove the oldest entries, but rather reduces the size by a percentage // keeping the order of added entries would add more overhead and Caffeine Cache would be a better fit // in that case. - for (Iterator iterator = cache.keySet().iterator(); iterator.hasNext(); ) { + for (String key : cache.keySet()) { if (reduceSizeBy <= 0) { break; } - String key = iterator.next(); SoftReferenceValue ref = cache.remove(key); if (ref != null) { ref.clear(); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceNameCache.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceNameCache.java index 13c17b5b70628..c50331faba12f 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceNameCache.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceNameCache.java @@ -33,8 +33,23 @@ class NamespaceNameCache extends NameCache { static int reduceSizeByPercentage = 25; static long referenceQueuePurgeIntervalNanos = TimeUnit.SECONDS.toNanos(10); - NamespaceNameCache() { - super(() -> cacheMaxSize, () -> reduceSizeByPercentage, () -> referenceQueuePurgeIntervalNanos, - namespace -> new NamespaceName(namespace)); + @Override + protected NamespaceName createValue(String key) { + return new NamespaceName(key); + } + + @Override + protected int getCacheMaxSize() { + return cacheMaxSize; + } + + @Override + protected int getReduceSizeByPercentage() { + return reduceSizeByPercentage; + } + + @Override + protected long getReferenceQueuePurgeIntervalNanos() { + return referenceQueuePurgeIntervalNanos; } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameCache.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameCache.java index ccc6090a1bf0e..5973645677ecf 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameCache.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameCache.java @@ -33,8 +33,23 @@ class TopicNameCache extends NameCache { static int reduceSizeByPercentage = 25; static long referenceQueuePurgeIntervalNanos = TimeUnit.SECONDS.toNanos(10); - TopicNameCache() { - super(() -> cacheMaxSize, () -> reduceSizeByPercentage, () -> referenceQueuePurgeIntervalNanos, - topic -> new TopicName(topic)); + @Override + protected TopicName createValue(String key) { + return new TopicName(key); + } + + @Override + protected int getCacheMaxSize() { + return cacheMaxSize; + } + + @Override + protected int getReduceSizeByPercentage() { + return reduceSizeByPercentage; + } + + @Override + protected long getReferenceQueuePurgeIntervalNanos() { + return referenceQueuePurgeIntervalNanos; } } From 86bdc301d7a59d9594d933d504d4388921cc5d53 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Tue, 24 Jun 2025 23:40:12 +0300 Subject: [PATCH 31/41] Reset cache settings to defaults after the test --- .../common/naming/TopicNameCacheTest.java | 22 +++++++++++++------ 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameCacheTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameCacheTest.java index a038d4e67b7c2..32c2fe45964c2 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameCacheTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameCacheTest.java @@ -48,15 +48,23 @@ public void shrinkCache() { @Test public void softReferenceHandling() { - TopicNameCache cache = TopicNameCache.INSTANCE; - TopicNameCache.cacheMaxSize = Integer.MAX_VALUE; - TopicNameCache.referenceQueuePurgeIntervalNanos = TimeUnit.MILLISECONDS.toNanos(10); + int defaultCacheMaxSize = TopicNameCache.cacheMaxSize; + long defaultPurgeIntervalNanos = TopicNameCache.referenceQueuePurgeIntervalNanos; + try { + TopicNameCache.cacheMaxSize = Integer.MAX_VALUE; + TopicNameCache.referenceQueuePurgeIntervalNanos = TimeUnit.MILLISECONDS.toNanos(10); - for (int i = 0; i < 2_000_000; i++) { - cache.get("persistent://tenant/namespace/topic" + RandomStringUtils.randomAlphabetic(100)); - if (i % 100_000 == 0) { - System.out.println(i + " topics added to cache. Current size: " + cache.size()); + TopicNameCache cache = TopicNameCache.INSTANCE; + for (int i = 0; i < 2_000_000; i++) { + cache.get("persistent://tenant/namespace/topic" + RandomStringUtils.randomAlphabetic(100)); + if (i % 100_000 == 0) { + System.out.println(i + " topics added to cache. Current size: " + cache.size()); + } } + } finally { + // Reset the cache settings to default after the test + TopicNameCache.cacheMaxSize = defaultCacheMaxSize; + TopicNameCache.referenceQueuePurgeIntervalNanos = defaultPurgeIntervalNanos; } } } \ No newline at end of file From d3a002e6dcb6952355358bd11a9f0f7335938617 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Wed, 25 Jun 2025 00:29:41 +0300 Subject: [PATCH 32/41] Remove duplicate check calling NamedEntity.checkName --- .../apache/pulsar/common/naming/NamespaceName.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java index dccc4c4742fc5..2380162dcba10 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java @@ -41,12 +41,19 @@ public static void invalidateCache() { public static final NamespaceName SYSTEM_NAMESPACE = NamespaceName.get("pulsar/system"); public static NamespaceName get(String tenant, String namespace) { - validateNamespaceName(tenant, namespace); + if ((tenant == null || tenant.isEmpty()) || (namespace == null || namespace.isEmpty())) { + throw new IllegalArgumentException( + String.format("Invalid namespace format. namespace: %s/%s", tenant, namespace)); + } return get(tenant + '/' + namespace); } public static NamespaceName get(String tenant, String cluster, String namespace) { - validateNamespaceName(tenant, cluster, namespace); + if ((tenant == null || tenant.isEmpty()) || (cluster == null || cluster.isEmpty()) + || (namespace == null || namespace.isEmpty())) { + throw new IllegalArgumentException( + String.format("Invalid namespace format. namespace: %s/%s/%s", tenant, cluster, namespace)); + } return get(tenant + '/' + cluster + '/' + namespace); } From 73abc0b594f45277349246cb4faec0ae9ead96a1 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Wed, 25 Jun 2025 10:10:01 +0300 Subject: [PATCH 33/41] Improve benchmark, add test with strong references to avoid soft references being collected --- .../common/naming/TopicNameBenchmark.java | 43 ++++++++++++++++--- 1 file changed, 37 insertions(+), 6 deletions(-) diff --git a/microbench/src/main/java/org/apache/pulsar/common/naming/TopicNameBenchmark.java b/microbench/src/main/java/org/apache/pulsar/common/naming/TopicNameBenchmark.java index 761ab6c632d0f..37b7065836693 100644 --- a/microbench/src/main/java/org/apache/pulsar/common/naming/TopicNameBenchmark.java +++ b/microbench/src/main/java/org/apache/pulsar/common/naming/TopicNameBenchmark.java @@ -19,7 +19,10 @@ package org.apache.pulsar.common.naming; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Fork; @@ -34,30 +37,51 @@ import org.openjdk.jmh.annotations.TearDown; import org.openjdk.jmh.annotations.Threads; import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.IterationParams; +import org.openjdk.jmh.runner.IterationType; /** * Benchmark TopicName.get performance. */ -@Fork(3) +@Fork(value = 3, jvmArgs = {"-Xms200M", "-Xmx200M", "-XX:+UseG1GC"}) @BenchmarkMode(Mode.Throughput) @OutputTimeUnit(TimeUnit.SECONDS) @State(Scope.Thread) public class TopicNameBenchmark { @State(Scope.Thread) public static class TestState { + public static final int MAX_TOPICS = 100000; + public static final int PAUSE_MILLIS_BEFORE_MEASUREMENT = 5000; @Param({"false", "true"}) private boolean invalidateCache; + @Param({"false", "true"}) + private boolean strongReferences; + // Used to hold strong references to TopicName objects when strongReferences is true. + // This is to prevent them from being garbage collected during the benchmark since the cache holds soft refs. + private List strongTopicNameReferences = new ArrayList<>(); private long counter = 0; private String[] topicNames; @Setup(Level.Trial) public void setup() { - topicNames = new String[100000]; + topicNames = new String[MAX_TOPICS]; for (int i = 0; i < topicNames.length; i++) { topicNames[i] = String.format("persistent://tenant-%d/ns-%d/topic-%d", i % 100, i % 1000, i); } } + private static final AtomicBoolean paused = new AtomicBoolean(false); + + @Setup(Level.Iteration) + public void pauseBetweenWarmupAndMeasurement(IterationParams params) throws InterruptedException { + if (params.getType() == IterationType.MEASUREMENT && paused.compareAndSet(false, true)) { + System.out.println("Pausing before starting measurement iterations..."); + // pause to allow JIT compilation to happen before measurement starts + Thread.sleep(PAUSE_MILLIS_BEFORE_MEASUREMENT); + System.out.println("Starting measurement iterations..."); + } + } + @TearDown(Level.Iteration) public void tearDown() { if (invalidateCache) { @@ -68,7 +92,14 @@ public void tearDown() { } public String getNextTopicName() { - String topicName = topicNames[(int) (counter++ % topicNames.length)]; + return topicNames[(int) (counter++ % topicNames.length)]; + } + + public TopicName runTest() { + TopicName topicName = TopicName.get(getNextTopicName()); + if (strongReferences) { + strongTopicNameReferences.add(topicName); + } return topicName; } } @@ -80,7 +111,7 @@ public String getNextTopicName() { @Warmup(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS) @Threads(1) public TopicName topicLookup001(TestState state) { - return TopicName.get(state.getNextTopicName()); + return state.runTest(); } @Benchmark @@ -90,7 +121,7 @@ public TopicName topicLookup001(TestState state) { @Warmup(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS) @Threads(10) public TopicName topicLookup010(TestState state) { - return TopicName.get(state.getNextTopicName()); + return state.runTest(); } @Benchmark @@ -100,6 +131,6 @@ public TopicName topicLookup010(TestState state) { @Warmup(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS) @Threads(100) public TopicName topicLookup100(TestState state) { - return TopicName.get(state.getNextTopicName()); + return state.runTest(); } } From c4da22317dcc3c111c602e661ae184cd00746df8 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Wed, 25 Jun 2025 11:10:24 +0300 Subject: [PATCH 34/41] Fix benchmark --- .../common/naming/TopicNameBenchmark.java | 60 +++++++++++-------- 1 file changed, 34 insertions(+), 26 deletions(-) diff --git a/microbench/src/main/java/org/apache/pulsar/common/naming/TopicNameBenchmark.java b/microbench/src/main/java/org/apache/pulsar/common/naming/TopicNameBenchmark.java index 37b7065836693..6b96316074169 100644 --- a/microbench/src/main/java/org/apache/pulsar/common/naming/TopicNameBenchmark.java +++ b/microbench/src/main/java/org/apache/pulsar/common/naming/TopicNameBenchmark.java @@ -19,8 +19,6 @@ package org.apache.pulsar.common.naming; -import java.util.ArrayList; -import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.openjdk.jmh.annotations.Benchmark; @@ -43,24 +41,25 @@ /** * Benchmark TopicName.get performance. */ -@Fork(value = 3, jvmArgs = {"-Xms200M", "-Xmx200M", "-XX:+UseG1GC"}) +@Fork(value = 3, jvmArgs = {"-Xms2g", "-Xmx2g", "-XX:+UseG1GC"}) @BenchmarkMode(Mode.Throughput) @OutputTimeUnit(TimeUnit.SECONDS) @State(Scope.Thread) public class TopicNameBenchmark { - @State(Scope.Thread) - public static class TestState { - public static final int MAX_TOPICS = 100000; + public static final int MAX_TOPICS = 100000; + + @State(Scope.Benchmark) + public static class BenchmarkState { public static final int PAUSE_MILLIS_BEFORE_MEASUREMENT = 5000; + private static final AtomicBoolean paused = new AtomicBoolean(false); @Param({"false", "true"}) private boolean invalidateCache; + private String[] topicNames; @Param({"false", "true"}) private boolean strongReferences; // Used to hold strong references to TopicName objects when strongReferences is true. // This is to prevent them from being garbage collected during the benchmark since the cache holds soft refs. - private List strongTopicNameReferences = new ArrayList<>(); - private long counter = 0; - private String[] topicNames; + private TopicName[] strongTopicNameReferences; @Setup(Level.Trial) public void setup() { @@ -68,10 +67,14 @@ public void setup() { for (int i = 0; i < topicNames.length; i++) { topicNames[i] = String.format("persistent://tenant-%d/ns-%d/topic-%d", i % 100, i % 1000, i); } + if (strongReferences) { + strongTopicNameReferences = new TopicName[MAX_TOPICS]; + for (int i = 0; i < topicNames.length; i++) { + strongTopicNameReferences[i] = TopicName.get(topicNames[i]); + } + } } - private static final AtomicBoolean paused = new AtomicBoolean(false); - @Setup(Level.Iteration) public void pauseBetweenWarmupAndMeasurement(IterationParams params) throws InterruptedException { if (params.getType() == IterationType.MEASUREMENT && paused.compareAndSet(false, true)) { @@ -88,19 +91,24 @@ public void tearDown() { TopicName.invalidateCache(); NamespaceName.invalidateCache(); } - counter = 0; } - public String getNextTopicName() { - return topicNames[(int) (counter++ % topicNames.length)]; + public String getNextTopicName(long counter) { + return topicNames[(int) (counter % topicNames.length)]; } + } - public TopicName runTest() { - TopicName topicName = TopicName.get(getNextTopicName()); - if (strongReferences) { - strongTopicNameReferences.add(topicName); - } - return topicName; + @State(Scope.Thread) + public static class TestState { + private long counter = 0; + + @TearDown(Level.Iteration) + public void tearDown() { + counter = 0; + } + + public TopicName runTest(BenchmarkState benchmarkState) { + return TopicName.get(benchmarkState.getNextTopicName(counter++)); } } @@ -110,8 +118,8 @@ public TopicName runTest() { @Measurement(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS) @Warmup(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS) @Threads(1) - public TopicName topicLookup001(TestState state) { - return state.runTest(); + public TopicName topicLookup001(BenchmarkState benchmarkState, TestState state) { + return state.runTest(benchmarkState); } @Benchmark @@ -120,8 +128,8 @@ public TopicName topicLookup001(TestState state) { @Measurement(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS) @Warmup(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS) @Threads(10) - public TopicName topicLookup010(TestState state) { - return state.runTest(); + public TopicName topicLookup010(BenchmarkState benchmarkState, TestState state) { + return state.runTest(benchmarkState); } @Benchmark @@ -130,7 +138,7 @@ public TopicName topicLookup010(TestState state) { @Measurement(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS) @Warmup(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS) @Threads(100) - public TopicName topicLookup100(TestState state) { - return state.runTest(); + public TopicName topicLookup100(BenchmarkState benchmarkState, TestState state) { + return state.runTest(benchmarkState); } } From eb8510cda94537f2b895a997d7fb4aea28fca11f Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Wed, 25 Jun 2025 11:25:18 +0300 Subject: [PATCH 35/41] Improve async-profiler example --- microbench/README.md | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/microbench/README.md b/microbench/README.md index 5c69c3bba819a..d316162bef6fd 100644 --- a/microbench/README.md +++ b/microbench/README.md @@ -70,9 +70,12 @@ java -jar microbench/target/microbenchmarks.jar ".*BenchmarkName.*" -lp Profiling benchmarks with [async-profiler](https://github.com/async-profiler/async-profiler): ```shell -# example of profiling with async-profiler -# download async-profiler from https://github.com/async-profiler/async-profiler/releases +# example of profiling with async-profiler 4.0 +# download async-profiler 4.0 from https://github.com/async-profiler/async-profiler/releases +# macos LIBASYNCPROFILER_PATH=$HOME/async-profiler/lib/libasyncProfiler.dylib -java -jar microbench/target/microbenchmarks.jar -prof async:libPath=$LIBASYNCPROFILER_PATH\;output=flamegraph\;dir=profile-results ".*BenchmarkName.*" +# linux +LIBASYNCPROFILER_PATH=$HOME/async-profiler/lib/libasyncProfiler.so +java -jar microbench/target/microbenchmarks.jar -prof async:libPath=$LIBASYNCPROFILER_PATH\;output=flamegraph\;dir=profile-results\;rawCommand=cstack=vmx ".*BenchmarkName.*" ``` From 12cb96b0eca8a9a9a0154d6894e92ff076b8d9fc Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Wed, 25 Jun 2025 11:26:31 +0300 Subject: [PATCH 36/41] Reduce max heap for benchmark --- .../org/apache/pulsar/common/naming/TopicNameBenchmark.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/microbench/src/main/java/org/apache/pulsar/common/naming/TopicNameBenchmark.java b/microbench/src/main/java/org/apache/pulsar/common/naming/TopicNameBenchmark.java index 6b96316074169..810335e5bd878 100644 --- a/microbench/src/main/java/org/apache/pulsar/common/naming/TopicNameBenchmark.java +++ b/microbench/src/main/java/org/apache/pulsar/common/naming/TopicNameBenchmark.java @@ -41,7 +41,7 @@ /** * Benchmark TopicName.get performance. */ -@Fork(value = 3, jvmArgs = {"-Xms2g", "-Xmx2g", "-XX:+UseG1GC"}) +@Fork(value = 3, jvmArgs = {"-Xms500m", "-Xmx500m", "-XX:+UseG1GC"}) @BenchmarkMode(Mode.Throughput) @OutputTimeUnit(TimeUnit.SECONDS) @State(Scope.Thread) From d63212d82f449a1cba1ea3c559cec1991670451a Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Wed, 25 Jun 2025 11:28:37 +0300 Subject: [PATCH 37/41] Invalidate caches when after adding strong references --- .../org/apache/pulsar/common/naming/TopicNameBenchmark.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/microbench/src/main/java/org/apache/pulsar/common/naming/TopicNameBenchmark.java b/microbench/src/main/java/org/apache/pulsar/common/naming/TopicNameBenchmark.java index 810335e5bd878..5fb7ec4a4e3b3 100644 --- a/microbench/src/main/java/org/apache/pulsar/common/naming/TopicNameBenchmark.java +++ b/microbench/src/main/java/org/apache/pulsar/common/naming/TopicNameBenchmark.java @@ -72,6 +72,10 @@ public void setup() { for (int i = 0; i < topicNames.length; i++) { strongTopicNameReferences[i] = TopicName.get(topicNames[i]); } + if (invalidateCache) { + TopicName.invalidateCache(); + NamespaceName.invalidateCache(); + } } } From 3f24353c2c0dd3382f71d1109351c2f3ac58a489 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Wed, 25 Jun 2025 11:32:22 +0300 Subject: [PATCH 38/41] Replace invalidateCache and strongReferences with TestMode enum --- .../common/naming/TopicNameBenchmark.java | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/microbench/src/main/java/org/apache/pulsar/common/naming/TopicNameBenchmark.java b/microbench/src/main/java/org/apache/pulsar/common/naming/TopicNameBenchmark.java index 5fb7ec4a4e3b3..535fc39e5f931 100644 --- a/microbench/src/main/java/org/apache/pulsar/common/naming/TopicNameBenchmark.java +++ b/microbench/src/main/java/org/apache/pulsar/common/naming/TopicNameBenchmark.java @@ -48,15 +48,20 @@ public class TopicNameBenchmark { public static final int MAX_TOPICS = 100000; + public enum TestMode { + basic, + invalidateCache, + strongReferences + } + + @State(Scope.Benchmark) public static class BenchmarkState { public static final int PAUSE_MILLIS_BEFORE_MEASUREMENT = 5000; private static final AtomicBoolean paused = new AtomicBoolean(false); - @Param({"false", "true"}) - private boolean invalidateCache; + @Param + private TestMode testMode; private String[] topicNames; - @Param({"false", "true"}) - private boolean strongReferences; // Used to hold strong references to TopicName objects when strongReferences is true. // This is to prevent them from being garbage collected during the benchmark since the cache holds soft refs. private TopicName[] strongTopicNameReferences; @@ -67,15 +72,11 @@ public void setup() { for (int i = 0; i < topicNames.length; i++) { topicNames[i] = String.format("persistent://tenant-%d/ns-%d/topic-%d", i % 100, i % 1000, i); } - if (strongReferences) { + if (testMode == TestMode.strongReferences) { strongTopicNameReferences = new TopicName[MAX_TOPICS]; for (int i = 0; i < topicNames.length; i++) { strongTopicNameReferences[i] = TopicName.get(topicNames[i]); } - if (invalidateCache) { - TopicName.invalidateCache(); - NamespaceName.invalidateCache(); - } } } @@ -91,7 +92,7 @@ public void pauseBetweenWarmupAndMeasurement(IterationParams params) throws Inte @TearDown(Level.Iteration) public void tearDown() { - if (invalidateCache) { + if (testMode == TestMode.invalidateCache) { TopicName.invalidateCache(); NamespaceName.invalidateCache(); } From 12a6b7c312c9e55e1beebcf02ec204754becb7d9 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Wed, 25 Jun 2025 11:43:30 +0300 Subject: [PATCH 39/41] Address review comment --- .../main/java/org/apache/pulsar/common/naming/NameCache.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NameCache.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NameCache.java index d9dfc59cb4130..3b734cbc6dff6 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NameCache.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NameCache.java @@ -92,7 +92,7 @@ public V get(String keyParam) { return existingRef; }).get(); if (cache.size() > getCacheMaxSize()) { - cacheShrinkNeeded.compareAndSet(false, true); + cacheShrinkNeeded.set(true); } } doCacheMaintenance(); From 10e1d601ceb54c0442c73065ebe6dca72fa17c43 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Wed, 25 Jun 2025 12:52:18 +0300 Subject: [PATCH 40/41] Optimize performance of running maintenance tasks by using System.currentTimeMillis instead of nanoTime - postpone shrinking the cache until the maintenance task runs --- .../pulsar/common/naming/NameCache.java | 21 ++++++++++--------- .../common/naming/NamespaceNameCache.java | 6 +++--- .../pulsar/common/naming/TopicNameCache.java | 6 +++--- .../common/naming/TopicNameCacheTest.java | 7 +++---- 4 files changed, 20 insertions(+), 20 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NameCache.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NameCache.java index 3b734cbc6dff6..b801fe9f4564a 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NameCache.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NameCache.java @@ -42,8 +42,8 @@ abstract class NameCache { private final ReferenceQueue referenceQueue = new ReferenceQueue<>(); // Flag to indicate if the cache size needs to be reduced. This is set when the cache exceeds the maximum size. private final AtomicBoolean cacheShrinkNeeded = new AtomicBoolean(false); - // Next timestamp to run reference queue purging to remove cleared references. Handled when cache is accessed. - private final AtomicLong nextReferenceQueuePurge = new AtomicLong(); + // Next timestamp to run cache maintenance. Handled when cache is accessed. + private final AtomicLong nextCacheMaintenance = new AtomicLong(); // Deduplicates instances when the cached entry isn't in the actual cache. // Holds weak references to the value so it won't prevent garbage collection. private final Interner valueInterner = Interners.newWeakInterner(); @@ -68,7 +68,7 @@ public String getKey() { protected abstract int getReduceSizeByPercentage(); - protected abstract long getReferenceQueuePurgeIntervalNanos(); + protected abstract long getMaintenanceTaskIntervalMillis(); public void invalidateCache() { cache.clear(); @@ -100,13 +100,14 @@ public V get(String keyParam) { } private void doCacheMaintenance() { - if (cacheShrinkNeeded.compareAndSet(true, false)) { - shrinkCacheSize(); - } - long localNextReferenceQueuePurge = nextReferenceQueuePurge.get(); - if (localNextReferenceQueuePurge == 0 || System.nanoTime() > localNextReferenceQueuePurge) { - if (nextReferenceQueuePurge.compareAndSet(localNextReferenceQueuePurge, - System.nanoTime() + getReferenceQueuePurgeIntervalNanos())) { + long localNextCacheMaintenance = nextCacheMaintenance.get(); + long now = System.currentTimeMillis(); + if (now > localNextCacheMaintenance) { + if (cacheShrinkNeeded.compareAndSet(true, false)) { + shrinkCacheSize(); + } + if (nextCacheMaintenance.compareAndSet(localNextCacheMaintenance, + now + getMaintenanceTaskIntervalMillis())) { purgeReferenceQueue(); } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceNameCache.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceNameCache.java index c50331faba12f..ec21150b54a9b 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceNameCache.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceNameCache.java @@ -31,7 +31,7 @@ class NamespaceNameCache extends NameCache { // Configuration for the cache. These settings aren't currently exposed to end users. static int cacheMaxSize = 100000; static int reduceSizeByPercentage = 25; - static long referenceQueuePurgeIntervalNanos = TimeUnit.SECONDS.toNanos(10); + static long cacheMaintenanceTaskIntervalMillis = TimeUnit.SECONDS.toMillis(10); @Override protected NamespaceName createValue(String key) { @@ -49,7 +49,7 @@ protected int getReduceSizeByPercentage() { } @Override - protected long getReferenceQueuePurgeIntervalNanos() { - return referenceQueuePurgeIntervalNanos; + protected long getMaintenanceTaskIntervalMillis() { + return cacheMaintenanceTaskIntervalMillis; } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameCache.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameCache.java index 5973645677ecf..8d42a6c6cc114 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameCache.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameCache.java @@ -31,7 +31,7 @@ class TopicNameCache extends NameCache { // Configuration for the cache. These settings aren't currently exposed to end users. static int cacheMaxSize = 100000; static int reduceSizeByPercentage = 25; - static long referenceQueuePurgeIntervalNanos = TimeUnit.SECONDS.toNanos(10); + static long cacheMaintenanceTaskIntervalMillis = TimeUnit.SECONDS.toMillis(10); @Override protected TopicName createValue(String key) { @@ -49,7 +49,7 @@ protected int getReduceSizeByPercentage() { } @Override - protected long getReferenceQueuePurgeIntervalNanos() { - return referenceQueuePurgeIntervalNanos; + protected long getMaintenanceTaskIntervalMillis() { + return cacheMaintenanceTaskIntervalMillis; } } diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameCacheTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameCacheTest.java index 32c2fe45964c2..65c6d7003234f 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameCacheTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameCacheTest.java @@ -20,7 +20,6 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.testng.Assert.assertEquals; -import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.RandomStringUtils; import org.testng.annotations.Test; @@ -49,10 +48,10 @@ public void shrinkCache() { @Test public void softReferenceHandling() { int defaultCacheMaxSize = TopicNameCache.cacheMaxSize; - long defaultPurgeIntervalNanos = TopicNameCache.referenceQueuePurgeIntervalNanos; + long defaultCacheMaintenceTaskIntervalMillis = TopicNameCache.cacheMaintenanceTaskIntervalMillis; try { TopicNameCache.cacheMaxSize = Integer.MAX_VALUE; - TopicNameCache.referenceQueuePurgeIntervalNanos = TimeUnit.MILLISECONDS.toNanos(10); + TopicNameCache.cacheMaintenanceTaskIntervalMillis = 10L; TopicNameCache cache = TopicNameCache.INSTANCE; for (int i = 0; i < 2_000_000; i++) { @@ -64,7 +63,7 @@ public void softReferenceHandling() { } finally { // Reset the cache settings to default after the test TopicNameCache.cacheMaxSize = defaultCacheMaxSize; - TopicNameCache.referenceQueuePurgeIntervalNanos = defaultPurgeIntervalNanos; + TopicNameCache.cacheMaintenanceTaskIntervalMillis = defaultCacheMaintenceTaskIntervalMillis; } } } \ No newline at end of file From 75ce52794451fed4581c890df4b15e03e213dde6 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Wed, 25 Jun 2025 12:53:25 +0300 Subject: [PATCH 41/41] Reduce default maintenance task interval to 5 seconds --- .../org/apache/pulsar/common/naming/NamespaceNameCache.java | 2 +- .../java/org/apache/pulsar/common/naming/TopicNameCache.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceNameCache.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceNameCache.java index ec21150b54a9b..06552344a1622 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceNameCache.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceNameCache.java @@ -31,7 +31,7 @@ class NamespaceNameCache extends NameCache { // Configuration for the cache. These settings aren't currently exposed to end users. static int cacheMaxSize = 100000; static int reduceSizeByPercentage = 25; - static long cacheMaintenanceTaskIntervalMillis = TimeUnit.SECONDS.toMillis(10); + static long cacheMaintenanceTaskIntervalMillis = TimeUnit.SECONDS.toMillis(5); @Override protected NamespaceName createValue(String key) { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameCache.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameCache.java index 8d42a6c6cc114..cd076336721ae 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameCache.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameCache.java @@ -31,7 +31,7 @@ class TopicNameCache extends NameCache { // Configuration for the cache. These settings aren't currently exposed to end users. static int cacheMaxSize = 100000; static int reduceSizeByPercentage = 25; - static long cacheMaintenanceTaskIntervalMillis = TimeUnit.SECONDS.toMillis(10); + static long cacheMaintenanceTaskIntervalMillis = TimeUnit.SECONDS.toMillis(5); @Override protected TopicName createValue(String key) {