diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java index af516fa75342c..0e9aae4603d25 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java @@ -55,11 +55,17 @@ public interface MessageProcessor { void process(RawMessage message) throws IOException; } + @Deprecated + public static void parseMessage(TopicName topicName, long ledgerId, long entryId, ByteBuf headersAndPayload, + MessageProcessor processor, int maxMessageSize) throws IOException { + parseMessage(topicName.toString(), ledgerId, entryId, headersAndPayload, processor, maxMessageSize); + } + /** * Parse a raw Pulsar entry payload and extract all the individual message that may be included in the batch. The * provided {@link MessageProcessor} will be invoked for each individual message. */ - public static void parseMessage(TopicName topicName, long ledgerId, long entryId, ByteBuf headersAndPayload, + public static void parseMessage(String topicName, long ledgerId, long entryId, ByteBuf headersAndPayload, MessageProcessor processor, int maxMessageSize) throws IOException { ByteBuf payload = headersAndPayload; ByteBuf uncompressedPayload = null; @@ -117,7 +123,7 @@ public static void parseMessage(TopicName topicName, long ledgerId, long entryId } } - public static boolean verifyChecksum(TopicName topic, ByteBuf headersAndPayload, long ledgerId, long entryId) { + public static boolean verifyChecksum(String topic, ByteBuf headersAndPayload, long ledgerId, long entryId) { if (hasChecksum(headersAndPayload)) { int checksum = readChecksum(headersAndPayload); int computedChecksum = computeChecksum(headersAndPayload); @@ -132,7 +138,14 @@ public static boolean verifyChecksum(TopicName topic, ByteBuf headersAndPayload, return true; } - public static ByteBuf uncompressPayloadIfNeeded(TopicName topic, MessageMetadata msgMetadata, + @Deprecated + public static ByteBuf uncompressPayloadIfNeeded(TopicName topicName, MessageMetadata msgMetadata, + ByteBuf payload, long ledgerId, long entryId, int maxMessageSize) { + return uncompressPayloadIfNeeded(topicName.toString(), msgMetadata, payload, ledgerId, entryId, + maxMessageSize); + } + + public static ByteBuf uncompressPayloadIfNeeded(String topic, MessageMetadata msgMetadata, ByteBuf payload, long ledgerId, long entryId, int maxMessageSize) { CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(msgMetadata.getCompression()); int uncompressedSize = msgMetadata.getUncompressedSize(); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java index b2f96bfe6e259..4d9b28df91be1 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java @@ -21,6 +21,7 @@ import com.google.common.base.Splitter; import com.google.re2j.Pattern; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.ArrayList; import java.util.List; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; @@ -442,4 +443,78 @@ public boolean includes(TopicName otherTopicName) { public boolean isV2() { return cluster == null; } + + /** + * Convert a topic name to a full topic name. + * In Pulsar, a full topic name is ":////" (v2) or + * "://///" (v1). However, for convenient, it's allowed for clients + * to pass a short topic name with v2 format: + * - "", which represents "persistent://public/default/" + * - "//, which represents "persistent:////" + * + * @param topic the topic name from client + * @return the full topic name. + */ + public static String toFullTopicName(String topic) { + final int index = topic.indexOf("://"); + if (index >= 0) { + TopicDomain.getEnum(topic.substring(0, index)); + final List parts = splitBySlash(topic.substring(index + "://".length()), 4); + if (parts.size() != 3 && parts.size() != 4) { + throw new IllegalArgumentException(topic + " is invalid"); + } + if (parts.size() == 3) { + NamespaceName.validateNamespaceName(parts.get(0), parts.get(1)); + if (StringUtils.isBlank(parts.get(2))) { + throw new IllegalArgumentException(topic + " has blank local topic"); + } + } else { + NamespaceName.validateNamespaceName(parts.get(0), parts.get(1), parts.get(2)); + if (StringUtils.isBlank(parts.get(3))) { + throw new IllegalArgumentException(topic + " has blank local topic"); + } + } + return topic; // it's a valid full topic name + } else { + List parts = splitBySlash(topic, 0); + if (parts.size() != 1 && parts.size() != 3) { + throw new IllegalArgumentException(topic + " is invalid"); + } + if (parts.size() == 1) { + if (StringUtils.isBlank(parts.get(0))) { + throw new IllegalArgumentException(topic + " has blank local topic"); + } + return "persistent://public/default/" + parts.get(0); + } else { + NamespaceName.validateNamespaceName(parts.get(0), parts.get(1)); + if (StringUtils.isBlank(parts.get(2))) { + throw new IllegalArgumentException(topic + " has blank local topic"); + } + return "persistent://" + topic; + } + } + } + + private static List splitBySlash(String topic, int limit) { + final List tokens = new ArrayList<>(3); + final int loopCount = (limit <= 0) ? Integer.MAX_VALUE : limit - 1; + int beginIndex = 0; + for (int i = 0; i < loopCount; i++) { + final int endIndex = topic.indexOf('/', beginIndex); + if (endIndex < 0) { + tokens.add(topic.substring(beginIndex)); + return tokens; + } else if (endIndex > beginIndex) { + tokens.add(topic.substring(beginIndex, endIndex)); + } else { + throw new IllegalArgumentException("Invalid topic name " + topic); + } + beginIndex = endIndex + 1; + } + if (beginIndex >= topic.length()) { + throw new IllegalArgumentException("Invalid topic name " + topic); + } + tokens.add(topic.substring(beginIndex)); + return tokens; + } } diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java index 27eb82d15af0d..bb4798fca4617 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java @@ -22,6 +22,7 @@ import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertThrows; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import org.apache.pulsar.common.util.Codec; @@ -52,9 +53,8 @@ public void topic() { assertEquals(TopicName.get("persistent://tenant/cluster/namespace/topic").toString(), "persistent://tenant/cluster/namespace/topic"); - - assertNotEquals(TopicName.get("persistent://tenant/cluster/namespace/topic"), - "persistent://tenant/cluster/namespace/topic"); + assertEquals(TopicName.toFullTopicName("persistent://tenant/cluster/namespace/topic"), + "persistent://tenant/cluster/namespace/topic"); assertEquals(TopicName.get("persistent://tenant/cluster/namespace/topic").getDomain(), TopicDomain.persistent); @@ -103,6 +103,7 @@ public void topic() { } catch (IllegalArgumentException e) { // Ok } + assertThrows(IllegalArgumentException.class, () -> TopicName.toFullTopicName("://tenant.namespace:my-topic")); try { TopicName.get("://tenant.namespace"); @@ -110,6 +111,7 @@ public void topic() { } catch (IllegalArgumentException e) { // Ok } + assertThrows(IllegalArgumentException.class, () -> TopicName.toFullTopicName("://tenant.namespace")); try { TopicName.get("invalid://tenant/cluster/namespace/topic"); @@ -117,6 +119,8 @@ public void topic() { } catch (IllegalArgumentException e) { // Ok } + assertThrows(IllegalArgumentException.class, + () -> TopicName.toFullTopicName("invalid://tenant/cluster/namespace/topic")); try { TopicName.get("tenant/cluster/namespace/topic"); @@ -124,6 +128,7 @@ public void topic() { } catch (IllegalArgumentException e) { // Ok } + assertThrows(IllegalArgumentException.class, () -> TopicName.toFullTopicName("tenant/cluster/namespace/topic")); try { TopicName.get("persistent:///cluster/namespace/mydest-1"); @@ -131,6 +136,8 @@ public void topic() { } catch (IllegalArgumentException e) { // Ok } + assertThrows(IllegalArgumentException.class, + () -> TopicName.toFullTopicName("persistent:///cluster/namespace/mydest-1")); try { TopicName.get("persistent://pulsar//namespace/mydest-1"); @@ -138,6 +145,8 @@ public void topic() { } catch (IllegalArgumentException e) { // Ok } + assertThrows(IllegalArgumentException.class, + () -> TopicName.toFullTopicName("persistent://pulsar//namespace/mydest-1")); try { TopicName.get("persistent://pulsar/cluster//mydest-1"); @@ -145,6 +154,8 @@ public void topic() { } catch (IllegalArgumentException e) { // Ok } + assertThrows(IllegalArgumentException.class, + () -> TopicName.toFullTopicName("persistent://pulsar/cluster//mydest-1")); try { TopicName.get("persistent://pulsar/cluster/namespace/"); @@ -152,6 +163,8 @@ public void topic() { } catch (IllegalArgumentException e) { // Ok } + assertThrows(IllegalArgumentException.class, + () -> TopicName.toFullTopicName("persistent://pulsar/cluster/namespace/")); try { TopicName.get("://pulsar/cluster/namespace/"); @@ -159,6 +172,7 @@ public void topic() { } catch (IllegalArgumentException e) { // Ok } + assertThrows(IllegalArgumentException.class, () -> TopicName.toFullTopicName("://pulsar/cluster/namespace/")); assertEquals(TopicName.get("persistent://tenant/cluster/namespace/topic") .getPersistenceNamingEncoding(), "tenant/cluster/namespace/persistent/topic"); @@ -169,6 +183,7 @@ public void topic() { } catch (IllegalArgumentException e) { // Ok } + assertThrows(IllegalArgumentException.class, () -> TopicName.toFullTopicName("://tenant.namespace")); try { TopicName.get("://tenant/cluster/namespace"); @@ -176,6 +191,7 @@ public void topic() { } catch (IllegalArgumentException e) { // Ok } + assertThrows(IllegalArgumentException.class, () -> TopicName.toFullTopicName("://tenant//cluster/namespace")); try { TopicName.get(" "); @@ -183,6 +199,7 @@ public void topic() { } catch (IllegalArgumentException e) { // Ok } + assertThrows(IllegalArgumentException.class, () -> TopicName.toFullTopicName(" ")); TopicName nameWithSlash = TopicName.get("persistent://tenant/cluster/namespace/ns-abc/table/1"); assertEquals(nameWithSlash.getEncodedLocalName(), Codec.encode("ns-abc/table/1")); @@ -344,4 +361,15 @@ public void testTwoKeyWordPartition(){ assertNotEquals(tp2.toString(), tp1.toString()); assertEquals(tp2.toString(), "persistent://tenant1/namespace1/tp1-partition-0-DLQ-partition-0"); } + + @Test + public void testToFullTopicName() { + // There is no constraint for local topic name + assertEquals("persistent://public/default/tp???xx=", TopicName.toFullTopicName("tp???xx=")); + assertEquals("persistent://tenant/ns/tp???xx=", TopicName.toFullTopicName("tenant/ns/tp???xx=")); + assertEquals("persistent://tenant/ns/test", TopicName.toFullTopicName("persistent://tenant/ns/test")); + assertThrows(IllegalArgumentException.class, () -> TopicName.toFullTopicName("ns/topic")); + // v1 format is not supported when the domain is not included + assertThrows(IllegalArgumentException.class, () -> TopicName.toFullTopicName("tenant/cluster/ns/topic")); + } } diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java index 75109883f9888..c38e2ba08c16c 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java @@ -221,7 +221,7 @@ public void handlePartitionMetadataResponse(CommandPartitionedTopicMetadata part **/ private void handlePartitionMetadataResponse(CommandPartitionedTopicMetadata partitionMetadata, long clientRequestId) { - TopicName topicName = TopicName.get(partitionMetadata.getTopic()); + String topicName = TopicName.toFullTopicName(partitionMetadata.getTopic()); String serviceUrl = getBrokerServiceUrl(clientRequestId); if (serviceUrl == null) { @@ -235,7 +235,7 @@ private void handlePartitionMetadataResponse(CommandPartitionedTopicMetadata par if (log.isDebugEnabled()) { log.debug("Getting connections to '{}' for Looking up topic '{}' with clientReq Id '{}'", addr, - topicName.getPartitionedTopicName(), clientRequestId); + topicName, clientRequestId); } proxyConnection.getConnectionPool().getConnection(addr).thenAccept(clientCnx -> { // Connected to backend broker @@ -245,7 +245,7 @@ private void handlePartitionMetadataResponse(CommandPartitionedTopicMetadata par partitionMetadata.isMetadataAutoCreationEnabled()); clientCnx.newLookup(command, requestId).whenComplete((r, t) -> { if (t != null) { - log.warn("[{}] failed to get Partitioned metadata : {}", topicName.toString(), + log.warn("[{}] failed to get Partitioned metadata : {}", topicName, t.getMessage(), t); PulsarClientException pce = PulsarClientException.unwrap(t); writeAndFlush(Commands.newLookupErrorResponse(clientCnx.revertClientExToErrorCode(pce), diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java index 22957c9599f18..3a98311eb15e7 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java @@ -101,7 +101,8 @@ private void logging(Channel conn, BaseCommand.Type cmdtype, String info, List messages = new ArrayList<>(); ByteBuf buffer = (ByteBuf) (msg); @@ -130,8 +131,8 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { logging(ctx.channel(), cmd.getType(), "", null); break; } - topicName = TopicName.get(ParserProxyHandler.producerHashMap.get(cmd.getSend().getProducerId() + "," - + ctx.channel().id())); + topicName = TopicName.toFullTopicName(ParserProxyHandler.producerHashMap.get( + cmd.getSend().getProducerId() + "," + ctx.channel().id())); MutableLong msgBytes = new MutableLong(0); MessageParser.parseMessage(topicName, -1L, -1L, buffer, (message) -> { @@ -139,7 +140,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { msgBytes.add(message.getData().readableBytes()); }, maxMessageSize); // update topic stats - TopicStats topicStats = this.service.getTopicStats().computeIfAbsent(topicName.toString(), + TopicStats topicStats = this.service.getTopicStats().computeIfAbsent(topicName, topic -> new TopicStats()); topicStats.getMsgInRate().recordMultipleEvents(messages.size(), msgBytes.longValue()); logging(ctx.channel(), cmd.getType(), "", messages); @@ -158,8 +159,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { logging(ctx.channel(), cmd.getType(), "", null); break; } - topicName = TopicName.get(ParserProxyHandler.consumerHashMap.get(cmd.getMessage().getConsumerId() - + "," + peerChannelId)); + topicName = TopicName.toFullTopicName(ParserProxyHandler.consumerHashMap.get( + cmd.getMessage().getConsumerId() + "," + peerChannelId)); + msgBytes = new MutableLong(0); MessageParser.parseMessage(topicName, -1L, -1L, buffer, (message) -> {