From bc0fdfd9722506728f48f92ac13fa3c995141aa1 Mon Sep 17 00:00:00 2001 From: wangjialing Date: Fri, 28 Jan 2022 14:48:35 +0800 Subject: [PATCH] fix create topic fail when using advertised listener --- .../pulsar/handlers/kop/KafkaRequestHandler.java | 10 +++++++++- .../pulsar/handlers/kop/KafkaListenerNameTest.java | 4 ++++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java index 08363ce92e..9aa495a922 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java @@ -78,6 +78,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiConsumer; import java.util.function.Consumer; +import java.util.regex.Matcher; import java.util.stream.Collectors; import java.util.stream.IntStream; import lombok.Getter; @@ -2592,7 +2593,14 @@ static Node newNode(InetSocketAddress address) { } Node newSelfNode() { - return newNode(advertisedEndPoint.getInetAddress()); + String advertisedListeners = kafkaConfig.getKafkaAdvertisedListeners(); + String listener = EndPoint.findListener(advertisedListeners, advertisedEndPoint.getListenerName()); + if (listener == null) { + return newNode(advertisedEndPoint.getInetAddress()); + } + final Matcher matcher = EndPoint.matcherListener(listener, + listener + " cannot be split into 3 parts"); + return newNode(new InetSocketAddress(matcher.group(2), Integer.parseInt(matcher.group(3)))); } static PartitionMetadata newPartitionMetadata(TopicName topicName, Node node) { diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaListenerNameTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaListenerNameTest.java index c1ff9631a1..702a243216 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaListenerNameTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaListenerNameTest.java @@ -108,6 +108,10 @@ public void testMetadataRequestForMultiListeners() throws Exception { Assert.assertEquals(brokers.get(0).host(), expectedAddress.getHostName()); Assert.assertEquals(brokers.get(0).port(), expectedAddress.getPort()); + Node controller = metadataResponse.controller(); + Assert.assertEquals(controller.host(), expectedAddress.getHostName()); + Assert.assertEquals(controller.port(), expectedAddress.getPort()); + final List topicMetadataList = new ArrayList<>(metadataResponse.topicMetadata()); Assert.assertEquals(topicMetadataList.size(), 1);