diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java index d9df95154c..5920e55c02 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java @@ -13,16 +13,22 @@ */ package io.streamnative.pulsar.handlers.kop; +import static com.google.common.base.Preconditions.checkState; + import com.google.common.collect.Sets; import io.streamnative.pulsar.handlers.kop.coordinator.group.OffsetConfig; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.util.Collections; import java.util.HashSet; import java.util.Properties; import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import lombok.Getter; import lombok.NonNull; import lombok.Setter; @@ -57,6 +63,10 @@ public class KafkaServiceConfiguration extends ServiceConfiguration { @Category private static final String CATEGORY_KOP_TRANSACTION = "Kafka on Pulsar transaction"; + + private static final String END_POINT_SEPARATOR = ","; + private static final String REGEX = "^(.*)://\\[?([0-9a-zA-Z\\-%._:]*)\\]?:(-?[0-9]+)"; + private static final Pattern PATTERN = Pattern.compile(REGEX); // // --- Kafka on Pulsar Broker configuration --- // @@ -349,15 +359,45 @@ public class KafkaServiceConfiguration extends ServiceConfiguration { ) private Set kopAllowedNamespaces; + private String checkAdvertisedListeners(String advertisedListeners) { + StringBuilder listenersReBuilder = new StringBuilder(); + for (String listener : advertisedListeners.split(END_POINT_SEPARATOR)) { + final String errorMessage = "listener '" + listener + "' is invalid"; + final Matcher matcher = PATTERN.matcher(listener); + checkState(matcher.find(), errorMessage); + checkState(matcher.groupCount() == 3, errorMessage); + String hostname = matcher.group(2); + if (hostname.isEmpty()) { + try { + hostname = InetAddress.getLocalHost().getCanonicalHostName(); + listenersReBuilder.append(matcher.group(1)) + .append("://") + .append(hostname) + .append(":") + .append(matcher.group(3)); + } catch (UnknownHostException e) { + throw new IllegalStateException("hostname is empty and localhost is unknown: " + e.getMessage()); + } + } else { + listenersReBuilder.append(listener); + } + listenersReBuilder.append(END_POINT_SEPARATOR); + } + return listenersReBuilder.deleteCharAt(listenersReBuilder.lastIndexOf(END_POINT_SEPARATOR)).toString(); + } + public @NonNull String getKafkaAdvertisedListeners() { + String advertisedListeners = getListeners(); + if (kafkaAdvertisedListeners != null) { - return kafkaAdvertisedListeners; - } else { - if (getListeners() == null) { - throw new IllegalStateException("listeners or kafkaListeners is required"); - } - return getListeners(); + advertisedListeners = kafkaAdvertisedListeners; } + + if (advertisedListeners == null) { + throw new IllegalStateException("listeners or kafkaListeners is required"); + } + + return checkAdvertisedListeners(advertisedListeners); } public String getListeners() { diff --git a/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfigurationTest.java b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfigurationTest.java index 14b2131634..b9692844c1 100644 --- a/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfigurationTest.java +++ b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfigurationTest.java @@ -26,6 +26,8 @@ import java.io.InputStream; import java.io.OutputStreamWriter; import java.io.PrintWriter; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.util.Arrays; import java.util.Collections; import java.util.Properties; @@ -68,6 +70,16 @@ public void testKafkaListeners() { assertEquals(configuration.getListeners(), "PLAINTEXT://localhost:9093"); } + @Test + public void testKafkaListenersWithoutHostname() throws UnknownHostException { + KafkaServiceConfiguration configuration = new KafkaServiceConfiguration(); + configuration.setListeners("PLAINTEXT://:9092"); + assertEquals(configuration.getListeners(), "PLAINTEXT://:9092"); + String hostName = InetAddress.getLocalHost().getCanonicalHostName(); + String expectListeners = "PLAINTEXT://" + hostName + ":9092"; + assertEquals(configuration.getKafkaAdvertisedListeners(), expectListeners); + } + @Test public void testGroupIdZooKeeperPath() { String zkPathForKop = "/consumer_group_test";