From 933bcac92a403b745642a5cb0a82730148443965 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 25 Jun 2025 22:19:32 +0800 Subject: [PATCH 1/5] [fix][proxy] Fix proxy OOM by using a topic cache per channel --- .../pulsar/common/api/raw/MessageParser.java | 19 +++- .../pulsar/common/naming/TopicNameUtils.java | 104 ++++++++++++++++++ .../pulsar/common/naming/TopicNameTest.java | 37 ++++++- .../proxy/server/LookupProxyHandler.java | 14 ++- .../proxy/server/ParserProxyHandler.java | 21 ++-- 5 files changed, 178 insertions(+), 17 deletions(-) create mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameUtils.java diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java index af516fa75342c..0e9aae4603d25 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java @@ -55,11 +55,17 @@ public interface MessageProcessor { void process(RawMessage message) throws IOException; } + @Deprecated + public static void parseMessage(TopicName topicName, long ledgerId, long entryId, ByteBuf headersAndPayload, + MessageProcessor processor, int maxMessageSize) throws IOException { + parseMessage(topicName.toString(), ledgerId, entryId, headersAndPayload, processor, maxMessageSize); + } + /** * Parse a raw Pulsar entry payload and extract all the individual message that may be included in the batch. The * provided {@link MessageProcessor} will be invoked for each individual message. */ - public static void parseMessage(TopicName topicName, long ledgerId, long entryId, ByteBuf headersAndPayload, + public static void parseMessage(String topicName, long ledgerId, long entryId, ByteBuf headersAndPayload, MessageProcessor processor, int maxMessageSize) throws IOException { ByteBuf payload = headersAndPayload; ByteBuf uncompressedPayload = null; @@ -117,7 +123,7 @@ public static void parseMessage(TopicName topicName, long ledgerId, long entryId } } - public static boolean verifyChecksum(TopicName topic, ByteBuf headersAndPayload, long ledgerId, long entryId) { + public static boolean verifyChecksum(String topic, ByteBuf headersAndPayload, long ledgerId, long entryId) { if (hasChecksum(headersAndPayload)) { int checksum = readChecksum(headersAndPayload); int computedChecksum = computeChecksum(headersAndPayload); @@ -132,7 +138,14 @@ public static boolean verifyChecksum(TopicName topic, ByteBuf headersAndPayload, return true; } - public static ByteBuf uncompressPayloadIfNeeded(TopicName topic, MessageMetadata msgMetadata, + @Deprecated + public static ByteBuf uncompressPayloadIfNeeded(TopicName topicName, MessageMetadata msgMetadata, + ByteBuf payload, long ledgerId, long entryId, int maxMessageSize) { + return uncompressPayloadIfNeeded(topicName.toString(), msgMetadata, payload, ledgerId, entryId, + maxMessageSize); + } + + public static ByteBuf uncompressPayloadIfNeeded(String topic, MessageMetadata msgMetadata, ByteBuf payload, long ledgerId, long entryId, int maxMessageSize) { CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(msgMetadata.getCompression()); int uncompressedSize = msgMetadata.getUncompressedSize(); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameUtils.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameUtils.java new file mode 100644 index 0000000000000..62ade0ee13e00 --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameUtils.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.naming; + +import java.util.ArrayList; +import java.util.List; +import org.apache.commons.lang3.StringUtils; + +/** + * {@link TopicName} is over heavy in many simple cases. This util class provides many methods to perform topic name + * conversions quickly without a global cache. + */ +public class TopicNameUtils { + + /** + * Convert a topic name to a full topic name. + * In Pulsar, a full topic name is ":////" (v2) or + * "://///" (v1). However, for convenient, it's allowed for clients + * to pass a short topic name with v2 format: + * - "", which represents "persistent://public/default/" + * - "//, which represents "persistent:////" + * + * @param topic the topic name from client + * @return the full topic name. + */ + public static String toFullTopicName(String topic) { + final int index = topic.indexOf("://"); + if (index > 0) { + TopicDomain.getEnum(topic.substring(0, index)); + final List parts = splitBySlash(topic, 4); + if (parts.size() != 3 && parts.size() != 4) { + throw new IllegalArgumentException(topic + " is invalid"); + } + if (parts.size() == 3) { + NamespaceName.validateNamespaceName(parts.get(0), parts.get(1)); + if (StringUtils.isBlank(parts.get(2))) { + throw new IllegalArgumentException(topic + " has blank local topic"); + } + } else { + NamespaceName.validateNamespaceName(parts.get(0), parts.get(1), parts.get(2)); + if (StringUtils.isBlank(parts.get(3))) { + throw new IllegalArgumentException(topic + " has blank local topic"); + } + } + return topic; // it's a valid full topic name + } else { + List parts = splitBySlash(topic, 0); + if (parts.size() != 1 && parts.size() != 3) { + throw new IllegalArgumentException(topic + " is invalid"); + } + if (parts.size() == 1) { + if (StringUtils.isBlank(parts.get(0))) { + throw new IllegalArgumentException(topic + " has blank local topic"); + } + return "persistent://public/default/" + parts.get(0); + } else { + NamespaceName.validateNamespaceName(parts.get(0), parts.get(1)); + if (StringUtils.isBlank(parts.get(2))) { + throw new IllegalArgumentException(topic + " has blank local topic"); + } + return "persistent://" + topic; + } + } + } + + private static List splitBySlash(String topic, int limit) { + final List tokens = new ArrayList<>(3); + final int loopCount = (limit <= 0) ? Integer.MAX_VALUE : limit - 1; + int beginIndex = 0; + for (int i = 0; i < loopCount; i++) { + final int endIndex = topic.indexOf('/', beginIndex); + if (endIndex < 0) { + tokens.add(topic.substring(beginIndex)); + return tokens; + } else if (endIndex > beginIndex) { + tokens.add(topic.substring(beginIndex, endIndex)); + } else { + throw new IllegalArgumentException("Invalid topic name " + topic); + } + beginIndex = endIndex + 1; + } + if (beginIndex >= topic.length()) { + throw new IllegalArgumentException("Invalid topic name " + topic); + } + tokens.add(topic.substring(beginIndex)); + return tokens; + } +} diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java index 27eb82d15af0d..2e7920b9f0ce2 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java @@ -22,6 +22,7 @@ import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertThrows; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import org.apache.pulsar.common.util.Codec; @@ -52,9 +53,8 @@ public void topic() { assertEquals(TopicName.get("persistent://tenant/cluster/namespace/topic").toString(), "persistent://tenant/cluster/namespace/topic"); - - assertNotEquals(TopicName.get("persistent://tenant/cluster/namespace/topic"), - "persistent://tenant/cluster/namespace/topic"); + assertEquals(TopicNameUtils.toFullTopicName("persistent://tenant/cluster/namespace/topic"), + "persistent://tenant/cluster/namespace/topic"); assertEquals(TopicName.get("persistent://tenant/cluster/namespace/topic").getDomain(), TopicDomain.persistent); @@ -103,6 +103,8 @@ public void topic() { } catch (IllegalArgumentException e) { // Ok } + assertThrows(IllegalArgumentException.class, + () -> TopicNameUtils.toFullTopicName("://tenant.namespace:my-topic")); try { TopicName.get("://tenant.namespace"); @@ -110,6 +112,7 @@ public void topic() { } catch (IllegalArgumentException e) { // Ok } + assertThrows(IllegalArgumentException.class, () -> TopicNameUtils.toFullTopicName("://tenant.namespace")); try { TopicName.get("invalid://tenant/cluster/namespace/topic"); @@ -117,6 +120,8 @@ public void topic() { } catch (IllegalArgumentException e) { // Ok } + assertThrows(IllegalArgumentException.class, + () -> TopicNameUtils.toFullTopicName("invalid://tenant/cluster/namespace/topic")); try { TopicName.get("tenant/cluster/namespace/topic"); @@ -124,6 +129,8 @@ public void topic() { } catch (IllegalArgumentException e) { // Ok } + assertThrows(IllegalArgumentException.class, + () -> TopicNameUtils.toFullTopicName("tenant/cluster/namespace/topic")); try { TopicName.get("persistent:///cluster/namespace/mydest-1"); @@ -131,6 +138,8 @@ public void topic() { } catch (IllegalArgumentException e) { // Ok } + assertThrows(IllegalArgumentException.class, + () -> TopicNameUtils.toFullTopicName("persistent:///cluster/namespace/mydest-1")); try { TopicName.get("persistent://pulsar//namespace/mydest-1"); @@ -138,6 +147,8 @@ public void topic() { } catch (IllegalArgumentException e) { // Ok } + assertThrows(IllegalArgumentException.class, + () -> TopicNameUtils.toFullTopicName("persistent://pulsar//namespace/mydest-1")); try { TopicName.get("persistent://pulsar/cluster//mydest-1"); @@ -145,6 +156,8 @@ public void topic() { } catch (IllegalArgumentException e) { // Ok } + assertThrows(IllegalArgumentException.class, + () -> TopicNameUtils.toFullTopicName("persistent://pulsar/cluster//mydest-1")); try { TopicName.get("persistent://pulsar/cluster/namespace/"); @@ -152,6 +165,8 @@ public void topic() { } catch (IllegalArgumentException e) { // Ok } + assertThrows(IllegalArgumentException.class, + () -> TopicNameUtils.toFullTopicName("persistent://pulsar/cluster/namespace/")); try { TopicName.get("://pulsar/cluster/namespace/"); @@ -159,6 +174,8 @@ public void topic() { } catch (IllegalArgumentException e) { // Ok } + assertThrows(IllegalArgumentException.class, + () -> TopicNameUtils.toFullTopicName("://pulsar/cluster/namespace/")); assertEquals(TopicName.get("persistent://tenant/cluster/namespace/topic") .getPersistenceNamingEncoding(), "tenant/cluster/namespace/persistent/topic"); @@ -169,6 +186,7 @@ public void topic() { } catch (IllegalArgumentException e) { // Ok } + assertThrows(IllegalArgumentException.class, () -> TopicNameUtils.toFullTopicName("://tenant.namespace")); try { TopicName.get("://tenant/cluster/namespace"); @@ -176,6 +194,8 @@ public void topic() { } catch (IllegalArgumentException e) { // Ok } + assertThrows(IllegalArgumentException.class, + () -> TopicNameUtils.toFullTopicName("://tenant//cluster/namespace")); try { TopicName.get(" "); @@ -183,6 +203,7 @@ public void topic() { } catch (IllegalArgumentException e) { // Ok } + assertThrows(IllegalArgumentException.class, () -> TopicNameUtils.toFullTopicName(" ")); TopicName nameWithSlash = TopicName.get("persistent://tenant/cluster/namespace/ns-abc/table/1"); assertEquals(nameWithSlash.getEncodedLocalName(), Codec.encode("ns-abc/table/1")); @@ -344,4 +365,14 @@ public void testTwoKeyWordPartition(){ assertNotEquals(tp2.toString(), tp1.toString()); assertEquals(tp2.toString(), "persistent://tenant1/namespace1/tp1-partition-0-DLQ-partition-0"); } + + @Test + public void testToFullTopicName() { + // There is no constraint for local topic name + assertEquals("persistent://public/default/tp???xx=", TopicNameUtils.toFullTopicName("tp???xx=")); + assertEquals("persistent://tenant/ns/tp???xx=", TopicNameUtils.toFullTopicName("tenant/ns/tp???xx=")); + assertThrows(IllegalArgumentException.class, () -> TopicNameUtils.toFullTopicName("ns/topic")); + // v1 format is not supported when the domain is not included + assertThrows(IllegalArgumentException.class, () -> TopicNameUtils.toFullTopicName("tenant/cluster/ns/topic")); + } } diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java index 75109883f9888..fe98f99be91f4 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java @@ -25,6 +25,8 @@ import java.net.SocketAddress; import java.net.URI; import java.net.URISyntaxException; +import java.util.HashMap; +import java.util.Map; import java.util.Optional; import java.util.concurrent.Semaphore; import org.apache.commons.lang3.StringUtils; @@ -36,7 +38,7 @@ import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse.LookupType; import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadata; import org.apache.pulsar.common.api.proto.ServerError; -import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.naming.TopicNameUtils; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion; import org.apache.pulsar.common.protocol.schema.SchemaVersion; @@ -85,6 +87,9 @@ public class LookupProxyHandler { .create().register(); private final Semaphore lookupRequestSemaphore; + // Maps the topic name from the request to the full topic name + private final Map topicNameCache = new HashMap<>(); + public LookupProxyHandler(ProxyService proxy, ProxyConnection proxyConnection) { this.discoveryProvider = proxy.getDiscoveryProvider(); this.lookupRequestSemaphore = proxy.getLookupRequestSemaphore(); @@ -221,7 +226,8 @@ public void handlePartitionMetadataResponse(CommandPartitionedTopicMetadata part **/ private void handlePartitionMetadataResponse(CommandPartitionedTopicMetadata partitionMetadata, long clientRequestId) { - TopicName topicName = TopicName.get(partitionMetadata.getTopic()); + String topicName = topicNameCache.computeIfAbsent(partitionMetadata.getTopic(), + TopicNameUtils::toFullTopicName); String serviceUrl = getBrokerServiceUrl(clientRequestId); if (serviceUrl == null) { @@ -235,7 +241,7 @@ private void handlePartitionMetadataResponse(CommandPartitionedTopicMetadata par if (log.isDebugEnabled()) { log.debug("Getting connections to '{}' for Looking up topic '{}' with clientReq Id '{}'", addr, - topicName.getPartitionedTopicName(), clientRequestId); + topicName, clientRequestId); } proxyConnection.getConnectionPool().getConnection(addr).thenAccept(clientCnx -> { // Connected to backend broker @@ -245,7 +251,7 @@ private void handlePartitionMetadataResponse(CommandPartitionedTopicMetadata par partitionMetadata.isMetadataAutoCreationEnabled()); clientCnx.newLookup(command, requestId).whenComplete((r, t) -> { if (t != null) { - log.warn("[{}] failed to get Partitioned metadata : {}", topicName.toString(), + log.warn("[{}] failed to get Partitioned metadata : {}", topicName, t.getMessage(), t); PulsarClientException pce = PulsarClientException.unwrap(t); writeAndFlush(Commands.newLookupErrorResponse(clientCnx.revertClientExToErrorCode(pce), diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java index 22957c9599f18..6b14c1971c93f 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java @@ -28,6 +28,7 @@ import io.netty.channel.ChannelInboundHandlerAdapter; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -35,7 +36,7 @@ import org.apache.pulsar.common.api.proto.BaseCommand; import org.apache.pulsar.common.api.raw.MessageParser; import org.apache.pulsar.common.api.raw.RawMessage; -import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.naming.TopicNameUtils; import org.apache.pulsar.proxy.stats.TopicStats; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,6 +55,8 @@ public class ParserProxyHandler extends ChannelInboundHandlerAdapter { private final int maxMessageSize; private final ChannelId peerChannelId; private final ProxyService service; + // Maps the topic name from the request to the full topic name + private final Map topicNameCache = new HashMap<>(); /** @@ -101,7 +104,8 @@ private void logging(Channel conn, BaseCommand.Type cmdtype, String info, List messages = new ArrayList<>(); ByteBuf buffer = (ByteBuf) (msg); @@ -130,8 +134,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { logging(ctx.channel(), cmd.getType(), "", null); break; } - topicName = TopicName.get(ParserProxyHandler.producerHashMap.get(cmd.getSend().getProducerId() + "," - + ctx.channel().id())); + key = ParserProxyHandler.producerHashMap.get(cmd.getSend().getProducerId() + "," + + ctx.channel().id()); + topicName = topicNameCache.computeIfAbsent(key, TopicNameUtils::toFullTopicName); MutableLong msgBytes = new MutableLong(0); MessageParser.parseMessage(topicName, -1L, -1L, buffer, (message) -> { @@ -139,7 +144,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { msgBytes.add(message.getData().readableBytes()); }, maxMessageSize); // update topic stats - TopicStats topicStats = this.service.getTopicStats().computeIfAbsent(topicName.toString(), + TopicStats topicStats = this.service.getTopicStats().computeIfAbsent(topicName, topic -> new TopicStats()); topicStats.getMsgInRate().recordMultipleEvents(messages.size(), msgBytes.longValue()); logging(ctx.channel(), cmd.getType(), "", messages); @@ -158,8 +163,10 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { logging(ctx.channel(), cmd.getType(), "", null); break; } - topicName = TopicName.get(ParserProxyHandler.consumerHashMap.get(cmd.getMessage().getConsumerId() - + "," + peerChannelId)); + key = ParserProxyHandler.consumerHashMap.get(cmd.getMessage().getConsumerId() + "," + + peerChannelId); + topicName = topicNameCache.computeIfAbsent(key, TopicNameUtils::toFullTopicName); + msgBytes = new MutableLong(0); MessageParser.parseMessage(topicName, -1L, -1L, buffer, (message) -> { From 2018066cf742383a7fa9e3c937f3a7a0c6806363 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 25 Jun 2025 22:58:14 +0800 Subject: [PATCH 2/5] Avoid splitting the topic when it starts with :// --- .../java/org/apache/pulsar/common/naming/TopicNameUtils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameUtils.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameUtils.java index 62ade0ee13e00..7ddc08e5662e7 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameUtils.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameUtils.java @@ -41,7 +41,7 @@ public class TopicNameUtils { */ public static String toFullTopicName(String topic) { final int index = topic.indexOf("://"); - if (index > 0) { + if (index >= 0) { TopicDomain.getEnum(topic.substring(0, index)); final List parts = splitBySlash(topic, 4); if (parts.size() != 3 && parts.size() != 4) { From 71ca6742c6f1dfd8ca10bfa4c1f5e1b01cf8a932 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 26 Jun 2025 10:53:19 +0800 Subject: [PATCH 3/5] Fix incorrect implementation --- .../java/org/apache/pulsar/common/naming/TopicNameUtils.java | 2 +- .../java/org/apache/pulsar/common/naming/TopicNameTest.java | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameUtils.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameUtils.java index 7ddc08e5662e7..37561c8ca771d 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameUtils.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameUtils.java @@ -43,7 +43,7 @@ public static String toFullTopicName(String topic) { final int index = topic.indexOf("://"); if (index >= 0) { TopicDomain.getEnum(topic.substring(0, index)); - final List parts = splitBySlash(topic, 4); + final List parts = splitBySlash(topic.substring(index + "://".length()), 4); if (parts.size() != 3 && parts.size() != 4) { throw new IllegalArgumentException(topic + " is invalid"); } diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java index 2e7920b9f0ce2..91c356531d39d 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java @@ -371,6 +371,8 @@ public void testToFullTopicName() { // There is no constraint for local topic name assertEquals("persistent://public/default/tp???xx=", TopicNameUtils.toFullTopicName("tp???xx=")); assertEquals("persistent://tenant/ns/tp???xx=", TopicNameUtils.toFullTopicName("tenant/ns/tp???xx=")); + assertEquals("persistent://tenant/ns/test", + TopicNameUtils.toFullTopicName("persistent://tenant/ns/test")); assertThrows(IllegalArgumentException.class, () -> TopicNameUtils.toFullTopicName("ns/topic")); // v1 format is not supported when the domain is not included assertThrows(IllegalArgumentException.class, () -> TopicNameUtils.toFullTopicName("tenant/cluster/ns/topic")); From 161f6f8a25edad2192be4a9eca2539a65e11d498 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 26 Jun 2025 11:07:09 +0800 Subject: [PATCH 4/5] Remove cache --- .../pulsar/proxy/server/LookupProxyHandler.java | 8 +------- .../pulsar/proxy/server/ParserProxyHandler.java | 13 ++++--------- 2 files changed, 5 insertions(+), 16 deletions(-) diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java index fe98f99be91f4..3b529b5eac5f4 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java @@ -25,8 +25,6 @@ import java.net.SocketAddress; import java.net.URI; import java.net.URISyntaxException; -import java.util.HashMap; -import java.util.Map; import java.util.Optional; import java.util.concurrent.Semaphore; import org.apache.commons.lang3.StringUtils; @@ -87,9 +85,6 @@ public class LookupProxyHandler { .create().register(); private final Semaphore lookupRequestSemaphore; - // Maps the topic name from the request to the full topic name - private final Map topicNameCache = new HashMap<>(); - public LookupProxyHandler(ProxyService proxy, ProxyConnection proxyConnection) { this.discoveryProvider = proxy.getDiscoveryProvider(); this.lookupRequestSemaphore = proxy.getLookupRequestSemaphore(); @@ -226,8 +221,7 @@ public void handlePartitionMetadataResponse(CommandPartitionedTopicMetadata part **/ private void handlePartitionMetadataResponse(CommandPartitionedTopicMetadata partitionMetadata, long clientRequestId) { - String topicName = topicNameCache.computeIfAbsent(partitionMetadata.getTopic(), - TopicNameUtils::toFullTopicName); + String topicName = TopicNameUtils.toFullTopicName(partitionMetadata.getTopic()); String serviceUrl = getBrokerServiceUrl(clientRequestId); if (serviceUrl == null) { diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java index 6b14c1971c93f..df05daf485932 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java @@ -28,7 +28,6 @@ import io.netty.channel.ChannelInboundHandlerAdapter; import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -55,8 +54,6 @@ public class ParserProxyHandler extends ChannelInboundHandlerAdapter { private final int maxMessageSize; private final ChannelId peerChannelId; private final ProxyService service; - // Maps the topic name from the request to the full topic name - private final Map topicNameCache = new HashMap<>(); /** @@ -134,9 +131,8 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { logging(ctx.channel(), cmd.getType(), "", null); break; } - key = ParserProxyHandler.producerHashMap.get(cmd.getSend().getProducerId() + "," - + ctx.channel().id()); - topicName = topicNameCache.computeIfAbsent(key, TopicNameUtils::toFullTopicName); + topicName = TopicNameUtils.toFullTopicName(ParserProxyHandler.producerHashMap.get( + cmd.getSend().getProducerId() + "," + ctx.channel().id())); MutableLong msgBytes = new MutableLong(0); MessageParser.parseMessage(topicName, -1L, -1L, buffer, (message) -> { @@ -163,9 +159,8 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { logging(ctx.channel(), cmd.getType(), "", null); break; } - key = ParserProxyHandler.consumerHashMap.get(cmd.getMessage().getConsumerId() + "," - + peerChannelId); - topicName = topicNameCache.computeIfAbsent(key, TopicNameUtils::toFullTopicName); + topicName = TopicNameUtils.toFullTopicName(ParserProxyHandler.consumerHashMap.get( + cmd.getMessage().getConsumerId() + "," + peerChannelId)); msgBytes = new MutableLong(0); MessageParser.parseMessage(topicName, -1L, From 83d9e829d84e325101a2e6ae0b8660438b436e61 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 26 Jun 2025 16:36:11 +0800 Subject: [PATCH 5/5] Move toFullTopicName to TopicName --- .../pulsar/common/naming/TopicName.java | 75 +++++++++++++ .../pulsar/common/naming/TopicNameUtils.java | 104 ------------------ .../pulsar/common/naming/TopicNameTest.java | 41 +++---- .../proxy/server/LookupProxyHandler.java | 4 +- .../proxy/server/ParserProxyHandler.java | 6 +- 5 files changed, 98 insertions(+), 132 deletions(-) delete mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameUtils.java diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java index b2f96bfe6e259..4d9b28df91be1 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java @@ -21,6 +21,7 @@ import com.google.common.base.Splitter; import com.google.re2j.Pattern; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.ArrayList; import java.util.List; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; @@ -442,4 +443,78 @@ public boolean includes(TopicName otherTopicName) { public boolean isV2() { return cluster == null; } + + /** + * Convert a topic name to a full topic name. + * In Pulsar, a full topic name is ":////" (v2) or + * "://///" (v1). However, for convenient, it's allowed for clients + * to pass a short topic name with v2 format: + * - "", which represents "persistent://public/default/" + * - "//, which represents "persistent:////" + * + * @param topic the topic name from client + * @return the full topic name. + */ + public static String toFullTopicName(String topic) { + final int index = topic.indexOf("://"); + if (index >= 0) { + TopicDomain.getEnum(topic.substring(0, index)); + final List parts = splitBySlash(topic.substring(index + "://".length()), 4); + if (parts.size() != 3 && parts.size() != 4) { + throw new IllegalArgumentException(topic + " is invalid"); + } + if (parts.size() == 3) { + NamespaceName.validateNamespaceName(parts.get(0), parts.get(1)); + if (StringUtils.isBlank(parts.get(2))) { + throw new IllegalArgumentException(topic + " has blank local topic"); + } + } else { + NamespaceName.validateNamespaceName(parts.get(0), parts.get(1), parts.get(2)); + if (StringUtils.isBlank(parts.get(3))) { + throw new IllegalArgumentException(topic + " has blank local topic"); + } + } + return topic; // it's a valid full topic name + } else { + List parts = splitBySlash(topic, 0); + if (parts.size() != 1 && parts.size() != 3) { + throw new IllegalArgumentException(topic + " is invalid"); + } + if (parts.size() == 1) { + if (StringUtils.isBlank(parts.get(0))) { + throw new IllegalArgumentException(topic + " has blank local topic"); + } + return "persistent://public/default/" + parts.get(0); + } else { + NamespaceName.validateNamespaceName(parts.get(0), parts.get(1)); + if (StringUtils.isBlank(parts.get(2))) { + throw new IllegalArgumentException(topic + " has blank local topic"); + } + return "persistent://" + topic; + } + } + } + + private static List splitBySlash(String topic, int limit) { + final List tokens = new ArrayList<>(3); + final int loopCount = (limit <= 0) ? Integer.MAX_VALUE : limit - 1; + int beginIndex = 0; + for (int i = 0; i < loopCount; i++) { + final int endIndex = topic.indexOf('/', beginIndex); + if (endIndex < 0) { + tokens.add(topic.substring(beginIndex)); + return tokens; + } else if (endIndex > beginIndex) { + tokens.add(topic.substring(beginIndex, endIndex)); + } else { + throw new IllegalArgumentException("Invalid topic name " + topic); + } + beginIndex = endIndex + 1; + } + if (beginIndex >= topic.length()) { + throw new IllegalArgumentException("Invalid topic name " + topic); + } + tokens.add(topic.substring(beginIndex)); + return tokens; + } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameUtils.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameUtils.java deleted file mode 100644 index 37561c8ca771d..0000000000000 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameUtils.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.common.naming; - -import java.util.ArrayList; -import java.util.List; -import org.apache.commons.lang3.StringUtils; - -/** - * {@link TopicName} is over heavy in many simple cases. This util class provides many methods to perform topic name - * conversions quickly without a global cache. - */ -public class TopicNameUtils { - - /** - * Convert a topic name to a full topic name. - * In Pulsar, a full topic name is ":////" (v2) or - * "://///" (v1). However, for convenient, it's allowed for clients - * to pass a short topic name with v2 format: - * - "", which represents "persistent://public/default/" - * - "//, which represents "persistent:////" - * - * @param topic the topic name from client - * @return the full topic name. - */ - public static String toFullTopicName(String topic) { - final int index = topic.indexOf("://"); - if (index >= 0) { - TopicDomain.getEnum(topic.substring(0, index)); - final List parts = splitBySlash(topic.substring(index + "://".length()), 4); - if (parts.size() != 3 && parts.size() != 4) { - throw new IllegalArgumentException(topic + " is invalid"); - } - if (parts.size() == 3) { - NamespaceName.validateNamespaceName(parts.get(0), parts.get(1)); - if (StringUtils.isBlank(parts.get(2))) { - throw new IllegalArgumentException(topic + " has blank local topic"); - } - } else { - NamespaceName.validateNamespaceName(parts.get(0), parts.get(1), parts.get(2)); - if (StringUtils.isBlank(parts.get(3))) { - throw new IllegalArgumentException(topic + " has blank local topic"); - } - } - return topic; // it's a valid full topic name - } else { - List parts = splitBySlash(topic, 0); - if (parts.size() != 1 && parts.size() != 3) { - throw new IllegalArgumentException(topic + " is invalid"); - } - if (parts.size() == 1) { - if (StringUtils.isBlank(parts.get(0))) { - throw new IllegalArgumentException(topic + " has blank local topic"); - } - return "persistent://public/default/" + parts.get(0); - } else { - NamespaceName.validateNamespaceName(parts.get(0), parts.get(1)); - if (StringUtils.isBlank(parts.get(2))) { - throw new IllegalArgumentException(topic + " has blank local topic"); - } - return "persistent://" + topic; - } - } - } - - private static List splitBySlash(String topic, int limit) { - final List tokens = new ArrayList<>(3); - final int loopCount = (limit <= 0) ? Integer.MAX_VALUE : limit - 1; - int beginIndex = 0; - for (int i = 0; i < loopCount; i++) { - final int endIndex = topic.indexOf('/', beginIndex); - if (endIndex < 0) { - tokens.add(topic.substring(beginIndex)); - return tokens; - } else if (endIndex > beginIndex) { - tokens.add(topic.substring(beginIndex, endIndex)); - } else { - throw new IllegalArgumentException("Invalid topic name " + topic); - } - beginIndex = endIndex + 1; - } - if (beginIndex >= topic.length()) { - throw new IllegalArgumentException("Invalid topic name " + topic); - } - tokens.add(topic.substring(beginIndex)); - return tokens; - } -} diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java index 91c356531d39d..bb4798fca4617 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java @@ -53,7 +53,7 @@ public void topic() { assertEquals(TopicName.get("persistent://tenant/cluster/namespace/topic").toString(), "persistent://tenant/cluster/namespace/topic"); - assertEquals(TopicNameUtils.toFullTopicName("persistent://tenant/cluster/namespace/topic"), + assertEquals(TopicName.toFullTopicName("persistent://tenant/cluster/namespace/topic"), "persistent://tenant/cluster/namespace/topic"); assertEquals(TopicName.get("persistent://tenant/cluster/namespace/topic").getDomain(), @@ -103,8 +103,7 @@ public void topic() { } catch (IllegalArgumentException e) { // Ok } - assertThrows(IllegalArgumentException.class, - () -> TopicNameUtils.toFullTopicName("://tenant.namespace:my-topic")); + assertThrows(IllegalArgumentException.class, () -> TopicName.toFullTopicName("://tenant.namespace:my-topic")); try { TopicName.get("://tenant.namespace"); @@ -112,7 +111,7 @@ public void topic() { } catch (IllegalArgumentException e) { // Ok } - assertThrows(IllegalArgumentException.class, () -> TopicNameUtils.toFullTopicName("://tenant.namespace")); + assertThrows(IllegalArgumentException.class, () -> TopicName.toFullTopicName("://tenant.namespace")); try { TopicName.get("invalid://tenant/cluster/namespace/topic"); @@ -121,7 +120,7 @@ public void topic() { // Ok } assertThrows(IllegalArgumentException.class, - () -> TopicNameUtils.toFullTopicName("invalid://tenant/cluster/namespace/topic")); + () -> TopicName.toFullTopicName("invalid://tenant/cluster/namespace/topic")); try { TopicName.get("tenant/cluster/namespace/topic"); @@ -129,8 +128,7 @@ public void topic() { } catch (IllegalArgumentException e) { // Ok } - assertThrows(IllegalArgumentException.class, - () -> TopicNameUtils.toFullTopicName("tenant/cluster/namespace/topic")); + assertThrows(IllegalArgumentException.class, () -> TopicName.toFullTopicName("tenant/cluster/namespace/topic")); try { TopicName.get("persistent:///cluster/namespace/mydest-1"); @@ -139,7 +137,7 @@ public void topic() { // Ok } assertThrows(IllegalArgumentException.class, - () -> TopicNameUtils.toFullTopicName("persistent:///cluster/namespace/mydest-1")); + () -> TopicName.toFullTopicName("persistent:///cluster/namespace/mydest-1")); try { TopicName.get("persistent://pulsar//namespace/mydest-1"); @@ -148,7 +146,7 @@ public void topic() { // Ok } assertThrows(IllegalArgumentException.class, - () -> TopicNameUtils.toFullTopicName("persistent://pulsar//namespace/mydest-1")); + () -> TopicName.toFullTopicName("persistent://pulsar//namespace/mydest-1")); try { TopicName.get("persistent://pulsar/cluster//mydest-1"); @@ -157,7 +155,7 @@ public void topic() { // Ok } assertThrows(IllegalArgumentException.class, - () -> TopicNameUtils.toFullTopicName("persistent://pulsar/cluster//mydest-1")); + () -> TopicName.toFullTopicName("persistent://pulsar/cluster//mydest-1")); try { TopicName.get("persistent://pulsar/cluster/namespace/"); @@ -166,7 +164,7 @@ public void topic() { // Ok } assertThrows(IllegalArgumentException.class, - () -> TopicNameUtils.toFullTopicName("persistent://pulsar/cluster/namespace/")); + () -> TopicName.toFullTopicName("persistent://pulsar/cluster/namespace/")); try { TopicName.get("://pulsar/cluster/namespace/"); @@ -174,8 +172,7 @@ public void topic() { } catch (IllegalArgumentException e) { // Ok } - assertThrows(IllegalArgumentException.class, - () -> TopicNameUtils.toFullTopicName("://pulsar/cluster/namespace/")); + assertThrows(IllegalArgumentException.class, () -> TopicName.toFullTopicName("://pulsar/cluster/namespace/")); assertEquals(TopicName.get("persistent://tenant/cluster/namespace/topic") .getPersistenceNamingEncoding(), "tenant/cluster/namespace/persistent/topic"); @@ -186,7 +183,7 @@ public void topic() { } catch (IllegalArgumentException e) { // Ok } - assertThrows(IllegalArgumentException.class, () -> TopicNameUtils.toFullTopicName("://tenant.namespace")); + assertThrows(IllegalArgumentException.class, () -> TopicName.toFullTopicName("://tenant.namespace")); try { TopicName.get("://tenant/cluster/namespace"); @@ -194,8 +191,7 @@ public void topic() { } catch (IllegalArgumentException e) { // Ok } - assertThrows(IllegalArgumentException.class, - () -> TopicNameUtils.toFullTopicName("://tenant//cluster/namespace")); + assertThrows(IllegalArgumentException.class, () -> TopicName.toFullTopicName("://tenant//cluster/namespace")); try { TopicName.get(" "); @@ -203,7 +199,7 @@ public void topic() { } catch (IllegalArgumentException e) { // Ok } - assertThrows(IllegalArgumentException.class, () -> TopicNameUtils.toFullTopicName(" ")); + assertThrows(IllegalArgumentException.class, () -> TopicName.toFullTopicName(" ")); TopicName nameWithSlash = TopicName.get("persistent://tenant/cluster/namespace/ns-abc/table/1"); assertEquals(nameWithSlash.getEncodedLocalName(), Codec.encode("ns-abc/table/1")); @@ -369,12 +365,11 @@ public void testTwoKeyWordPartition(){ @Test public void testToFullTopicName() { // There is no constraint for local topic name - assertEquals("persistent://public/default/tp???xx=", TopicNameUtils.toFullTopicName("tp???xx=")); - assertEquals("persistent://tenant/ns/tp???xx=", TopicNameUtils.toFullTopicName("tenant/ns/tp???xx=")); - assertEquals("persistent://tenant/ns/test", - TopicNameUtils.toFullTopicName("persistent://tenant/ns/test")); - assertThrows(IllegalArgumentException.class, () -> TopicNameUtils.toFullTopicName("ns/topic")); + assertEquals("persistent://public/default/tp???xx=", TopicName.toFullTopicName("tp???xx=")); + assertEquals("persistent://tenant/ns/tp???xx=", TopicName.toFullTopicName("tenant/ns/tp???xx=")); + assertEquals("persistent://tenant/ns/test", TopicName.toFullTopicName("persistent://tenant/ns/test")); + assertThrows(IllegalArgumentException.class, () -> TopicName.toFullTopicName("ns/topic")); // v1 format is not supported when the domain is not included - assertThrows(IllegalArgumentException.class, () -> TopicNameUtils.toFullTopicName("tenant/cluster/ns/topic")); + assertThrows(IllegalArgumentException.class, () -> TopicName.toFullTopicName("tenant/cluster/ns/topic")); } } diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java index 3b529b5eac5f4..c38e2ba08c16c 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java @@ -36,7 +36,7 @@ import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse.LookupType; import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadata; import org.apache.pulsar.common.api.proto.ServerError; -import org.apache.pulsar.common.naming.TopicNameUtils; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion; import org.apache.pulsar.common.protocol.schema.SchemaVersion; @@ -221,7 +221,7 @@ public void handlePartitionMetadataResponse(CommandPartitionedTopicMetadata part **/ private void handlePartitionMetadataResponse(CommandPartitionedTopicMetadata partitionMetadata, long clientRequestId) { - String topicName = TopicNameUtils.toFullTopicName(partitionMetadata.getTopic()); + String topicName = TopicName.toFullTopicName(partitionMetadata.getTopic()); String serviceUrl = getBrokerServiceUrl(clientRequestId); if (serviceUrl == null) { diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java index df05daf485932..3a98311eb15e7 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java @@ -35,7 +35,7 @@ import org.apache.pulsar.common.api.proto.BaseCommand; import org.apache.pulsar.common.api.raw.MessageParser; import org.apache.pulsar.common.api.raw.RawMessage; -import org.apache.pulsar.common.naming.TopicNameUtils; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.proxy.stats.TopicStats; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -131,7 +131,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { logging(ctx.channel(), cmd.getType(), "", null); break; } - topicName = TopicNameUtils.toFullTopicName(ParserProxyHandler.producerHashMap.get( + topicName = TopicName.toFullTopicName(ParserProxyHandler.producerHashMap.get( cmd.getSend().getProducerId() + "," + ctx.channel().id())); MutableLong msgBytes = new MutableLong(0); MessageParser.parseMessage(topicName, -1L, @@ -159,7 +159,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { logging(ctx.channel(), cmd.getType(), "", null); break; } - topicName = TopicNameUtils.toFullTopicName(ParserProxyHandler.consumerHashMap.get( + topicName = TopicName.toFullTopicName(ParserProxyHandler.consumerHashMap.get( cmd.getMessage().getConsumerId() + "," + peerChannelId)); msgBytes = new MutableLong(0);