From 3d0a2adfb4d2e7749c0f16be0cfef7c9f8d930d3 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 16 Feb 2022 11:21:01 +0800 Subject: [PATCH 1/2] [branch-2.8.2] Fix CreateTopics failure when kafkaAdvertisedListeners is different with kafkaListeners --- .../pulsar/handlers/kop/EndPoint.java | 24 +++++ .../handlers/kop/KafkaRequestHandler.java | 10 +- .../kop/KafkaServiceConfiguration.java | 4 +- .../pulsar/handlers/kop/KafkaApisTest.java | 7 +- .../handlers/kop/KafkaListenerNameTest.java | 91 +++++++++++++++++++ 5 files changed, 132 insertions(+), 4 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/EndPoint.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/EndPoint.java index 3118f6d147..daa2952a7a 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/EndPoint.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/EndPoint.java @@ -110,11 +110,35 @@ private static Map parseListeners(final String listeners, return endPointMap; } + // listeners must be enable to be split into at least 1 token + private static String[] getListenerArray(final String listeners) { + if (StringUtils.isEmpty(listeners)) { + throw new IllegalStateException("listeners is empty"); + } + final String[] listenerArray = listeners.split(END_POINT_SEPARATOR); + if (listenerArray.length == 0) { + throw new IllegalStateException(listeners + " is split into 0 tokens by " + END_POINT_SEPARATOR); + } + return listenerArray; + } + @VisibleForTesting public static Map parseListeners(final String listeners, final String protocolMapString) { return parseListeners(listeners, parseProtocolMap(protocolMapString)); } + public static String findListener(final String listeners, final String name) { + if (name == null) { + return null; + } + for (String listener : getListenerArray(listeners)) { + if (listener.contains(":") && listener.substring(0, listener.indexOf(":")).equals(name)) { + return listener; + } + } + throw new IllegalStateException("listener \"" + name + "\" doesn't exist in " + listeners); + } + public static EndPoint getPlainTextEndPoint(final String listeners) { for (String listener : listeners.split(END_POINT_SEPARATOR)) { if (listener.startsWith(SecurityProtocol.PLAINTEXT.name()) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java index 58d948deb3..a96fb71f0c 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java @@ -78,6 +78,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiConsumer; import java.util.function.Consumer; +import java.util.regex.Matcher; import java.util.stream.Collectors; import java.util.stream.IntStream; import lombok.Getter; @@ -2547,7 +2548,14 @@ static Node newNode(InetSocketAddress address) { } Node newSelfNode() { - return newNode(advertisedEndPoint.getInetAddress()); + String advertisedListeners = kafkaConfig.getKafkaAdvertisedListeners(); + String listener = EndPoint.findListener(advertisedListeners, advertisedEndPoint.getListenerName()); + if (listener == null) { + return newNode(advertisedEndPoint.getInetAddress()); + } + final Matcher matcher = EndPoint.matcherListener(listener, + listener + " cannot be split into 3 parts"); + return newNode(new InetSocketAddress(matcher.group(2), Integer.parseInt(matcher.group(3)))); } static PartitionMetadata newPartitionMetadata(TopicName topicName, Node node) { diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java index d02ec52645..6cea1d7d4f 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java @@ -212,10 +212,10 @@ public class KafkaServiceConfiguration extends ServiceConfiguration { ) private String kafkaProtocolMap; - @Deprecated @FieldContext( category = CATEGORY_KOP, - doc = "Use kafkaProtocolMap, kafkaListeners and advertisedAddress instead." + doc = "Use kafkaProtocolMap, kafkaListeners and advertisedAddress if you want to use multiple listeners.\n" + + "Otherwise, it should be the listeners published to ZooKeeper for clients to use.\n" ) private String kafkaAdvertisedListeners; diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaApisTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaApisTest.java index 01d80ffd94..97c9282d43 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaApisTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaApisTest.java @@ -148,7 +148,12 @@ protected void cleanup() throws Exception { super.internalCleanup(); } - KafkaHeaderAndRequest buildRequest(AbstractRequest.Builder builder) { + private KafkaHeaderAndRequest buildRequest(AbstractRequest.Builder builder) { + return buildRequest(builder, serviceAddress); + } + + static KafkaHeaderAndRequest buildRequest(AbstractRequest.Builder builder, + SocketAddress serviceAddress) { AbstractRequest request = builder.build(); builder.apiKey(); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaListenerNameTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaListenerNameTest.java index 5d3b75fa66..d33d0ea18a 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaListenerNameTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaListenerNameTest.java @@ -13,7 +13,21 @@ */ package io.streamnative.pulsar.handlers.kop; +import static io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder.KafkaHeaderAndRequest; +import static org.apache.kafka.common.requests.MetadataResponse.PartitionMetadata; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Properties; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -22,6 +36,11 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractResponse; +import org.apache.kafka.common.requests.MetadataRequest; +import org.apache.kafka.common.requests.MetadataResponse; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.pulsar.broker.ServiceConfigurationUtils; import org.testng.Assert; @@ -44,6 +63,78 @@ protected void cleanup() throws Exception { // Clean up in the test method } + @Test(timeOut = 30000) + public void testMetadataRequestForMultiListeners() throws Exception { + final Map bindPortToAdvertisedAddress = new HashMap<>(); + final int anotherKafkaPort = PortManager.nextFreePort(); + bindPortToAdvertisedAddress.put(kafkaBrokerPort, + InetSocketAddress.createUnresolved("192.168.0.1", PortManager.nextFreePort())); + bindPortToAdvertisedAddress.put(anotherKafkaPort, + InetSocketAddress.createUnresolved("192.168.0.2", PortManager.nextFreePort())); + + super.resetConfig(); + conf.setKafkaListeners("PLAINTEXT://0.0.0.0:" + kafkaBrokerPort + ",GW://0.0.0.0:" + anotherKafkaPort); + conf.setKafkaProtocolMap("PLAINTEXT:PLAINTEXT,GW:PLAINTEXT"); + conf.setKafkaAdvertisedListeners(String.format("PLAINTEXT://%s,GW://%s", + bindPortToAdvertisedAddress.get(kafkaBrokerPort), + bindPortToAdvertisedAddress.get(anotherKafkaPort))); + conf.setAdvertisedAddress(null); + conf.setAdvertisedListeners(String.format("pulsar:pulsar://192.168.0.3:%d,PLAINTEXT:pulsar://%s,GW:pulsar://%s", + brokerPort, + bindPortToAdvertisedAddress.get(kafkaBrokerPort), + bindPortToAdvertisedAddress.get(anotherKafkaPort))); + super.internalSetup(); + + final String topic = "persistent://public/default/test-metadata-request-for-multi-listeners"; + final int numPartitions = 3; + admin.topics().createPartitionedTopic(topic, numPartitions); + + final KafkaProtocolHandler protocolHandler = (KafkaProtocolHandler) + pulsar.getProtocolHandlers().protocol(KafkaProtocolHandler.PROTOCOL_NAME); + protocolHandler.getChannelInitializerMap().forEach((inetSocketAddress, channelInitializer) -> { + try { + final KafkaRequestHandler requestHandler = ((KafkaChannelInitializer) channelInitializer).newCnx(); + ChannelHandlerContext mockCtx = mock(ChannelHandlerContext.class); + doReturn(mock(Channel.class)).when(mockCtx).channel(); + requestHandler.ctx = mockCtx; + + final InetSocketAddress expectedAddress = bindPortToAdvertisedAddress.get(inetSocketAddress.getPort()); + + final KafkaHeaderAndRequest metadataRequest = KafkaApisTest.buildRequest( + new MetadataRequest.Builder(Collections.singletonList(topic), true), + inetSocketAddress); + final CompletableFuture future = new CompletableFuture<>(); + requestHandler.handleTopicMetadataRequest(metadataRequest, future); + final MetadataResponse metadataResponse = (MetadataResponse) future.get(); + final List brokers = new ArrayList<>(metadataResponse.brokers()); + Assert.assertEquals(brokers.get(0).host(), expectedAddress.getHostName()); + Assert.assertEquals(brokers.get(0).port(), expectedAddress.getPort()); + + Node controller = metadataResponse.controller(); + Assert.assertEquals(controller.host(), expectedAddress.getHostName()); + Assert.assertEquals(controller.port(), expectedAddress.getPort()); + + final List topicMetadataList = + new ArrayList<>(metadataResponse.topicMetadata()); + Assert.assertEquals(topicMetadataList.size(), 1); + Assert.assertEquals(topicMetadataList.get(0).topic(), topic); + + final List partitionMetadataList = topicMetadataList.get(0).partitionMetadata(); + Assert.assertEquals(partitionMetadataList.size(), numPartitions); + for (int i = 0; i < numPartitions; i++) { + final PartitionMetadata partitionMetadata = partitionMetadataList.get(i); + Assert.assertEquals(partitionMetadata.error(), Errors.NONE); + Assert.assertEquals(partitionMetadata.leader().host(), expectedAddress.getHostName()); + Assert.assertEquals(partitionMetadata.leader().port(), expectedAddress.getPort()); + } + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + }); + + super.internalCleanup(); + } + @Test(timeOut = 30000) public void testListenerName() throws Exception { super.resetConfig(); From 47194e7b9aed28e72c5b278ed358da9eefb956f3 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 16 Feb 2022 15:11:13 +0800 Subject: [PATCH 2/2] Cache the self node --- .../pulsar/handlers/kop/KafkaRequestHandler.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java index a96fb71f0c..e47f7972fd 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java @@ -198,6 +198,7 @@ public class KafkaRequestHandler extends KafkaCommandDecoder { private final Boolean tlsEnabled; private final EndPoint advertisedEndPoint; private final String advertisedListeners; + private final Node selfNode; private final boolean skipMessagesWithoutIndex; private final int defaultNumPartitions; public final int maxReadEntriesNum; @@ -311,6 +312,7 @@ public KafkaRequestHandler(PulsarService pulsarService, this.tlsEnabled = tlsEnabled; this.advertisedEndPoint = advertisedEndPoint; this.advertisedListeners = kafkaConfig.getKafkaAdvertisedListeners(); + this.selfNode = newSelfNode(); this.skipMessagesWithoutIndex = skipMessagesWithoutIndex; this.topicManager = new KafkaTopicManager(this); this.defaultNumPartitions = kafkaConfig.getDefaultNumPartitions(); @@ -768,7 +770,7 @@ protected void handleTopicMetadataRequest(KafkaHeaderAndRequest metadataHar, AtomicInteger topicsCompleted = new AtomicInteger(0); // Each Pulsar broker can manage metadata like controller in Kafka, Kafka's AdminClient needs to find a // controller node for metadata management. So here we return the broker itself as a controller. - final int controllerId = newSelfNode().id(); + final int controllerId = selfNode.id(); pulsarTopicsFuture.whenComplete((pulsarTopics, e) -> { if (e != null) { log.warn("[{}] Request {}: Exception fetching metadata, will return null Response", @@ -2547,7 +2549,7 @@ static Node newNode(InetSocketAddress address) { address.getPort()); } - Node newSelfNode() { + private Node newSelfNode() { String advertisedListeners = kafkaConfig.getKafkaAdvertisedListeners(); String listener = EndPoint.findListener(advertisedListeners, advertisedEndPoint.getListenerName()); if (listener == null) {