From c52b6f52ac7cf715aebf447acc868f69e1c76185 Mon Sep 17 00:00:00 2001 From: xiangying Date: Fri, 20 Mar 2026 11:52:25 +0800 Subject: [PATCH 1/6] [improve][common] Optimize TopicName.get() to reduce lock contention on cache lookup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### Motivation `TopicName.get()` previously used `ConcurrentHashMap.computeIfAbsent()` to populate the topic-name cache. Although `computeIfAbsent` is atomic, it holds the internal bin-lock for the entire duration of the mapping function, which includes the non-trivial `TopicName` construction (string splitting, validation, etc.). Under high-concurrency workloads where many threads simultaneously encounter the same uncached topic name, this causes unnecessary lock contention and can degrade throughput. ### Modifications Replace `computeIfAbsent` with an explicit two-step pattern: 1. **Fast path**: call `cache.get(topic)` first — a single volatile read with no locking — and return immediately on a cache hit (steady-state case). 2. **Slow path** (cache miss): construct `TopicName` *outside* the lock, then use `cache.putIfAbsent()` to insert. If two threads race on the same key, one wins the `putIfAbsent` and the other's instance is discarded; this is safe because `TopicName` is immutable. Add a Javadoc comment on `get()` explaining the rationale. --- .../pulsar/common/naming/TopicName.java | 22 ++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java index a1c7055a8972c..4728e00d29446 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java @@ -71,12 +71,32 @@ public static TopicName get(String domain, String tenant, String namespace, Stri return TopicName.get(name); } + /** + * Get or create a TopicName from the cache. + * + *

Optimization over {@code computeIfAbsent}: avoids holding the ConcurrentHashMap bin-lock + * while constructing a new TopicName object. The construction (string splitting / parsing) is + * pure CPU work and can be done outside the lock. In the typical steady-state (cache hit) this + * method does a single volatile read via {@code get()} and returns immediately with no + * synchronization overhead. + * + *

In the cache-miss case, two threads racing on the same key may both construct a + * {@code TopicName} instance, but only one wins the {@code putIfAbsent} and the loser's + * instance is simply discarded. This is safe because {@code TopicName} is immutable and + * construction is cheap compared to the lock-contention / context-switch cost of + * {@code computeIfAbsent}. + */ public static TopicName get(String topic) { + // Fast path: already cached — single volatile read, no lock. TopicName tp = cache.get(topic); if (tp != null) { return tp; } - return cache.computeIfAbsent(topic, k -> new TopicName(k)); + // Slow path: construct outside the bin-lock to avoid blocking other threads. + TopicName newTp = new TopicName(topic); + TopicName existing = cache.putIfAbsent(topic, newTp); + // If another thread raced us and already inserted, use its instance (keeps identity stable). + return existing != null ? existing : newTp; } public static TopicName getPartitionedTopicName(String topic) { From 7e4af0dee5f220e6e6d14896be77eb8655f0e4d2 Mon Sep 17 00:00:00 2001 From: xiangying Date: Mon, 13 Apr 2026 15:29:20 +0800 Subject: [PATCH 2/6] just call cache .put() instead of putIfAbsent() --- .../main/java/org/apache/pulsar/common/naming/TopicName.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java index fbb378383586f..2b7cb380bd873 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java @@ -100,7 +100,7 @@ public static TopicName get(String topic) { } // Slow path: construct outside the bin-lock to avoid blocking other threads. TopicName newTp = new TopicName(topic); - TopicName existing = cache.putIfAbsent(topic, newTp); + TopicName existing = cache.put(topic, newTp); // If another thread raced us and already inserted, use its instance (keeps identity stable). return existing != null ? existing : newTp; } From e923046d0de7645e50b424fe045d0f6970a9d528 Mon Sep 17 00:00:00 2001 From: xiangying Date: Mon, 20 Apr 2026 16:46:03 +0800 Subject: [PATCH 3/6] simply code comments --- .../pulsar/common/naming/TopicName.java | 19 ++----------------- 1 file changed, 2 insertions(+), 17 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java index 2b7cb380bd873..83d694add6762 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java @@ -77,31 +77,16 @@ public static TopicName get(String domain, String tenant, String namespace, Stri return TopicName.get(name); } - /** - * Get or create a TopicName from the cache. - * - *

Optimization over {@code computeIfAbsent}: avoids holding the ConcurrentHashMap bin-lock - * while constructing a new TopicName object. The construction (string splitting / parsing) is - * pure CPU work and can be done outside the lock. In the typical steady-state (cache hit) this - * method does a single volatile read via {@code get()} and returns immediately with no - * synchronization overhead. - * - *

In the cache-miss case, two threads racing on the same key may both construct a - * {@code TopicName} instance, but only one wins the {@code putIfAbsent} and the loser's - * instance is simply discarded. This is safe because {@code TopicName} is immutable and - * construction is cheap compared to the lock-contention / context-switch cost of - * {@code computeIfAbsent}. - */ public static TopicName get(String topic) { // Fast path: already cached — single volatile read, no lock. TopicName tp = cache.get(topic); if (tp != null) { return tp; } - // Slow path: construct outside the bin-lock to avoid blocking other threads. + // Use get()+put() instead of computeIfAbsent() to keep construction outside the bin-lock. + // Duplicate instances from racing threads are safe to discard since TopicName is immutable. TopicName newTp = new TopicName(topic); TopicName existing = cache.put(topic, newTp); - // If another thread raced us and already inserted, use its instance (keeps identity stable). return existing != null ? existing : newTp; } From 010b8a89bccdfe9db3fbea5dbbe49355e5c6ea94 Mon Sep 17 00:00:00 2001 From: xiangying Date: Mon, 20 Apr 2026 17:13:49 +0800 Subject: [PATCH 4/6] JMH benchmark for {@link TopicName#get(String)} cold-start (cache-miss) performance --- .../broker/naming/TopicNameGetBenchmark.java | 77 +++++++++++++++++++ 1 file changed, 77 insertions(+) create mode 100644 microbench/src/main/java/org/apache/pulsar/broker/naming/TopicNameGetBenchmark.java diff --git a/microbench/src/main/java/org/apache/pulsar/broker/naming/TopicNameGetBenchmark.java b/microbench/src/main/java/org/apache/pulsar/broker/naming/TopicNameGetBenchmark.java new file mode 100644 index 0000000000000..3709c50ed23a2 --- /dev/null +++ b/microbench/src/main/java/org/apache/pulsar/broker/naming/TopicNameGetBenchmark.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.naming; + +import java.util.concurrent.TimeUnit; +import org.apache.pulsar.common.naming.TopicName; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +/** + * JMH benchmark for {@link TopicName#get(String)} cold-start (cache-miss) performance. + * + *

Each invocation calls {@code TopicName.get()} 1,000,000 times with distinct topic + * strings, forcing every call to go through the full parse-and-construct path. + * + *

Run with: + *

+ *   ./gradlew :microbench:shadowJar
+ *   java -jar microbench/build/libs/microbench-*-benchmarks.jar TopicNameGetBenchmark
+ * 
+ */ +@Fork(3) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@Warmup(iterations = 3, time = 5, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 5, time = 5, timeUnit = TimeUnit.SECONDS) +@State(Scope.Thread) +public class TopicNameGetBenchmark { + + private static final int TOPIC_COUNT = 1_000_000; + + private String[] topics; + + @Setup(Level.Invocation) + public void prepare() { + // Pre-generate 1M distinct topic strings (string-concat cost excluded from measurement). + topics = new String[TOPIC_COUNT]; + for (int i = 0; i < TOPIC_COUNT; i++) { + topics[i] = "persistent://public/default/topic-" + i; + } + // Clear cache to ensure a cold start. + TopicName.clearIfReachedMaxCapacity(0); + } + + @Benchmark + public void coldStartGet(Blackhole bh) { + for (int i = 0; i < TOPIC_COUNT; i++) { + bh.consume(TopicName.get(topics[i])); + } + } +} From 8accc71358232c9bcf95562eafa9627318847d39 Mon Sep 17 00:00:00 2001 From: xiangying Date: Tue, 21 Apr 2026 14:20:03 +0800 Subject: [PATCH 5/6] improve jmh with more threads --- .../broker/naming/TopicNameGetBenchmark.java | 43 ++++++++++++------- 1 file changed, 27 insertions(+), 16 deletions(-) diff --git a/microbench/src/main/java/org/apache/pulsar/broker/naming/TopicNameGetBenchmark.java b/microbench/src/main/java/org/apache/pulsar/broker/naming/TopicNameGetBenchmark.java index 3709c50ed23a2..74188e16d3aa9 100644 --- a/microbench/src/main/java/org/apache/pulsar/broker/naming/TopicNameGetBenchmark.java +++ b/microbench/src/main/java/org/apache/pulsar/broker/naming/TopicNameGetBenchmark.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.naming; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.pulsar.common.naming.TopicName; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; @@ -26,18 +27,23 @@ import org.openjdk.jmh.annotations.Level; import org.openjdk.jmh.annotations.Measurement; import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OperationsPerInvocation; import org.openjdk.jmh.annotations.OutputTimeUnit; import org.openjdk.jmh.annotations.Scope; import org.openjdk.jmh.annotations.Setup; import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; import org.openjdk.jmh.annotations.Warmup; import org.openjdk.jmh.infra.Blackhole; /** - * JMH benchmark for {@link TopicName#get(String)} cold-start (cache-miss) performance. + * JMH benchmark for {@link TopicName#get(String)} cold-start (cache-miss) performance + * under 50-thread contention. * - *

Each invocation calls {@code TopicName.get()} 1,000,000 times with distinct topic - * strings, forcing every call to go through the full parse-and-construct path. + *

Uses {@code Mode.SingleShotTime} with {@code @Fork(10)} to measure + * the total time of a fixed batch of cold-start calls. No cache clearing is + * needed — each fork is a fresh JVM with an empty cache, and the batch size + * is bounded to avoid OOM. * *

Run with: *

@@ -45,32 +51,37 @@
  *   java -jar microbench/build/libs/microbench-*-benchmarks.jar TopicNameGetBenchmark
  * 
*/ -@Fork(3) -@BenchmarkMode(Mode.AverageTime) -@OutputTimeUnit(TimeUnit.MILLISECONDS) -@Warmup(iterations = 3, time = 5, timeUnit = TimeUnit.SECONDS) -@Measurement(iterations = 5, time = 5, timeUnit = TimeUnit.SECONDS) +@Fork(10) +@BenchmarkMode(Mode.SingleShotTime) +@OutputTimeUnit(TimeUnit.MICROSECONDS) +@Warmup(iterations = 2) +@Measurement(iterations = 5) +@Threads(50) @State(Scope.Thread) public class TopicNameGetBenchmark { - private static final int TOPIC_COUNT = 1_000_000; + /** + * Each thread processes 10,000 unique topics per invocation. + * 50 threads × 10,000 = 500,000 total entries per invocation — well within memory. + */ + private static final int BATCH_SIZE = 10_000; + private static final AtomicInteger COUNTER = new AtomicInteger(); private String[] topics; @Setup(Level.Invocation) public void prepare() { - // Pre-generate 1M distinct topic strings (string-concat cost excluded from measurement). - topics = new String[TOPIC_COUNT]; - for (int i = 0; i < TOPIC_COUNT; i++) { - topics[i] = "persistent://public/default/topic-" + i; + int base = COUNTER.getAndAdd(BATCH_SIZE); + topics = new String[BATCH_SIZE]; + for (int i = 0; i < BATCH_SIZE; i++) { + topics[i] = "persistent://public/default/topic-" + (base + i); } - // Clear cache to ensure a cold start. - TopicName.clearIfReachedMaxCapacity(0); } @Benchmark + @OperationsPerInvocation(BATCH_SIZE) public void coldStartGet(Blackhole bh) { - for (int i = 0; i < TOPIC_COUNT; i++) { + for (int i = 0; i < BATCH_SIZE; i++) { bh.consume(TopicName.get(topics[i])); } } From c87108f63556d3518392c73a175caa9e1ca04114 Mon Sep 17 00:00:00 2001 From: xiangying Date: Tue, 21 Apr 2026 15:23:23 +0800 Subject: [PATCH 6/6] package-info.java file --- .../pulsar/broker/naming/package-info.java | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) create mode 100644 microbench/src/main/java/org/apache/pulsar/broker/naming/package-info.java diff --git a/microbench/src/main/java/org/apache/pulsar/broker/naming/package-info.java b/microbench/src/main/java/org/apache/pulsar/broker/naming/package-info.java new file mode 100644 index 0000000000000..f655bf0490db8 --- /dev/null +++ b/microbench/src/main/java/org/apache/pulsar/broker/naming/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Benchmarks for {@link org.apache.pulsar.common.naming.TopicName#get(String)} cold-start (cache-miss) performance + */ +package org.apache.pulsar.broker.naming; \ No newline at end of file