From ce118bd96e21d8778c632fe2ddeeeb489c4177ff Mon Sep 17 00:00:00 2001 From: wenbing1 Date: Thu, 12 Aug 2021 14:50:39 +0800 Subject: [PATCH 1/2] Support pulsar to obtain the correct unconfigured hostname listeners of KafkaProtocolHandler. Pulsar will access the getProtocolDataToAdvertise of the protocol handler when constructing LocalBrokerData. If kafkaListeners is not configured with a host name, the protocols of Pulsar LocalBrokerData will not have the host name information of the kop protocol, which will cause kop to return the wrong partition leader. --- .../kop/KafkaServiceConfiguration.java | 49 ++++++++++++++++--- .../kop/KafkaServiceConfigurationTest.java | 12 +++++ 2 files changed, 55 insertions(+), 6 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java index d9df95154c..99ce7a71cf 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java @@ -19,18 +19,26 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.util.Collections; import java.util.HashSet; import java.util.Properties; import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + import lombok.Getter; import lombok.NonNull; import lombok.Setter; import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.common.configuration.Category; import org.apache.pulsar.common.configuration.FieldContext; +import static com.google.common.base.Preconditions.checkState; + /** * Kafka on Pulsar service configuration object. */ @@ -57,6 +65,10 @@ public class KafkaServiceConfiguration extends ServiceConfiguration { @Category private static final String CATEGORY_KOP_TRANSACTION = "Kafka on Pulsar transaction"; + + private static final String END_POINT_SEPARATOR = ","; + private static final String REGEX = "^(.*)://\\[?([0-9a-zA-Z\\-%._:]*)\\]?:(-?[0-9]+)"; + private static final Pattern PATTERN = Pattern.compile(REGEX); // // --- Kafka on Pulsar Broker configuration --- // @@ -349,15 +361,40 @@ public class KafkaServiceConfiguration extends ServiceConfiguration { ) private Set kopAllowedNamespaces; + private String checkAdvertisedListeners(String advertisedListeners) { + StringBuilder listenersReBuilder = new StringBuilder(); + for (String listener : advertisedListeners.split(END_POINT_SEPARATOR)) { + final String errorMessage = "listener '" + listener + "' is invalid"; + final Matcher matcher = PATTERN.matcher(listener); + checkState(matcher.find(), errorMessage); + checkState(matcher.groupCount() == 3, errorMessage); + String hostname = matcher.group(2); + if (hostname.isEmpty()) { + try { + hostname = InetAddress.getLocalHost().getCanonicalHostName(); + listenersReBuilder.append(matcher.group(1)).append("://").append(hostname).append(":").append(matcher.group(3)); + } catch (UnknownHostException e) { + throw new IllegalStateException("hostname is empty and localhost is unknown: " + e.getMessage()); + } + } else + listenersReBuilder.append(listener); + listenersReBuilder.append(END_POINT_SEPARATOR); + } + return listenersReBuilder.deleteCharAt(listenersReBuilder.lastIndexOf(END_POINT_SEPARATOR)).toString(); + } + public @NonNull String getKafkaAdvertisedListeners() { + String advertisedListeners = getListeners(); + if (kafkaAdvertisedListeners != null) { - return kafkaAdvertisedListeners; - } else { - if (getListeners() == null) { - throw new IllegalStateException("listeners or kafkaListeners is required"); - } - return getListeners(); + advertisedListeners = kafkaAdvertisedListeners; } + + if (advertisedListeners == null) { + throw new IllegalStateException("listeners or kafkaListeners is required"); + } + + return checkAdvertisedListeners(advertisedListeners); } public String getListeners() { diff --git a/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfigurationTest.java b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfigurationTest.java index 14b2131634..b9692844c1 100644 --- a/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfigurationTest.java +++ b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfigurationTest.java @@ -26,6 +26,8 @@ import java.io.InputStream; import java.io.OutputStreamWriter; import java.io.PrintWriter; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.util.Arrays; import java.util.Collections; import java.util.Properties; @@ -68,6 +70,16 @@ public void testKafkaListeners() { assertEquals(configuration.getListeners(), "PLAINTEXT://localhost:9093"); } + @Test + public void testKafkaListenersWithoutHostname() throws UnknownHostException { + KafkaServiceConfiguration configuration = new KafkaServiceConfiguration(); + configuration.setListeners("PLAINTEXT://:9092"); + assertEquals(configuration.getListeners(), "PLAINTEXT://:9092"); + String hostName = InetAddress.getLocalHost().getCanonicalHostName(); + String expectListeners = "PLAINTEXT://" + hostName + ":9092"; + assertEquals(configuration.getKafkaAdvertisedListeners(), expectListeners); + } + @Test public void testGroupIdZooKeeperPath() { String zkPathForKop = "/consumer_group_test"; From 3b73caf2c0012568ddc3ced42623a3e2556ea005 Mon Sep 17 00:00:00 2001 From: wenbing1 Date: Thu, 12 Aug 2021 16:30:28 +0800 Subject: [PATCH 2/2] fix code style --- .../handlers/kop/KafkaServiceConfiguration.java | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java index 99ce7a71cf..5920e55c02 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java @@ -13,6 +13,8 @@ */ package io.streamnative.pulsar.handlers.kop; +import static com.google.common.base.Preconditions.checkState; + import com.google.common.collect.Sets; import io.streamnative.pulsar.handlers.kop.coordinator.group.OffsetConfig; import java.io.FileInputStream; @@ -27,18 +29,14 @@ import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; - import lombok.Getter; import lombok.NonNull; import lombok.Setter; import org.apache.kafka.common.record.CompressionType; -import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.common.configuration.Category; import org.apache.pulsar.common.configuration.FieldContext; -import static com.google.common.base.Preconditions.checkState; - /** * Kafka on Pulsar service configuration object. */ @@ -372,12 +370,17 @@ private String checkAdvertisedListeners(String advertisedListeners) { if (hostname.isEmpty()) { try { hostname = InetAddress.getLocalHost().getCanonicalHostName(); - listenersReBuilder.append(matcher.group(1)).append("://").append(hostname).append(":").append(matcher.group(3)); + listenersReBuilder.append(matcher.group(1)) + .append("://") + .append(hostname) + .append(":") + .append(matcher.group(3)); } catch (UnknownHostException e) { throw new IllegalStateException("hostname is empty and localhost is unknown: " + e.getMessage()); } - } else + } else { listenersReBuilder.append(listener); + } listenersReBuilder.append(END_POINT_SEPARATOR); } return listenersReBuilder.deleteCharAt(listenersReBuilder.lastIndexOf(END_POINT_SEPARATOR)).toString();