diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/EndPoint.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/EndPoint.java index 865c799339..dfd978ce36 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/EndPoint.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/EndPoint.java @@ -31,26 +31,43 @@ public class EndPoint { private static final String END_POINT_SEPARATOR = ","; + private static final String PROTOCOL_MAP_SEPARATOR = ","; + private static final String PROTOCOL_SEPARATOR = ":"; private static final String REGEX = "^(.*)://\\[?([0-9a-zA-Z\\-%._:]*)\\]?:(-?[0-9]+)"; private static final Pattern PATTERN = Pattern.compile(REGEX); @Getter private final String originalListener; @Getter + private final String listenerName; + @Getter private final SecurityProtocol securityProtocol; @Getter private final String hostname; @Getter private final int port; + @Getter + private final boolean multiListener; - public EndPoint(final String listener) { + public EndPoint(final String listener, final Map protocolMap) { this.originalListener = listener; final String errorMessage = "listener '" + listener + "' is invalid"; final Matcher matcher = PATTERN.matcher(listener); checkState(matcher.find(), errorMessage); checkState(matcher.groupCount() == 3, errorMessage); - this.securityProtocol = SecurityProtocol.forName(matcher.group(1)); + this.listenerName = matcher.group(1); + if (protocolMap == null || protocolMap.isEmpty()) { + multiListener = false; + this.securityProtocol = SecurityProtocol.forName(matcher.group(1)); + } else { + multiListener = true; + this.securityProtocol = protocolMap.get(this.listenerName); + if (this.securityProtocol == null) { + throw new IllegalStateException(this.listenerName + " is not set in kafkaProtocolMap"); + } + } + final String originalHostname = matcher.group(2); if (originalHostname.isEmpty()) { try { @@ -65,19 +82,26 @@ public EndPoint(final String listener) { checkState(port >= 0 && port <= 65535, errorMessage + ": port " + port + " is invalid"); } + + public InetSocketAddress getInetAddress() { return new InetSocketAddress(hostname, port); } - public static Map parseListeners(final String listeners) { - final Map endPointMap = new HashMap<>(); + public static Map parseListeners(final String listeners) { + return parseListeners(listeners, null); + } + + public static Map parseListeners(final String listeners, final String kafkaProtocolMap) { + final Map endPointMap = new HashMap<>(); + final Map protocolMap = parseProtocolMap(kafkaProtocolMap); for (String listener : listeners.split(END_POINT_SEPARATOR)) { - final EndPoint endPoint = new EndPoint(listener); - if (endPointMap.containsKey(endPoint.securityProtocol)) { + final EndPoint endPoint = new EndPoint(listener, protocolMap); + if (endPointMap.containsKey(endPoint.listenerName)) { throw new IllegalStateException( - listeners + " has multiple listeners whose protocol is " + endPoint.securityProtocol); + listeners + " has multiple listeners whose listenerName is " + endPoint.listenerName); } else { - endPointMap.put(endPoint.securityProtocol, endPoint); + endPointMap.put(endPoint.listenerName, endPoint); } } return endPointMap; @@ -87,7 +111,7 @@ public static EndPoint getPlainTextEndPoint(final String listeners) { for (String listener : listeners.split(END_POINT_SEPARATOR)) { if (listener.startsWith(SecurityProtocol.PLAINTEXT.name()) || listener.startsWith(SecurityProtocol.SASL_PLAINTEXT.name())) { - return new EndPoint(listener); + return new EndPoint(listener, null); } } throw new IllegalStateException(listeners + " has no plain text endpoint"); @@ -97,9 +121,31 @@ public static EndPoint getSslEndPoint(final String listeners) { for (String listener : listeners.split(END_POINT_SEPARATOR)) { if (listener.startsWith(SecurityProtocol.SSL.name()) || listener.startsWith(SecurityProtocol.SASL_SSL.name())) { - return new EndPoint(listener); + return new EndPoint(listener, null); } } throw new IllegalStateException(listeners + " has no ssl endpoint"); } + + public static Map parseProtocolMap(final String kafkaProtocolMap) { + + final Map protocolMap = new HashMap<>(); + if (kafkaProtocolMap == null) { + return protocolMap; + } + + for (String protocolSet : kafkaProtocolMap.split(PROTOCOL_MAP_SEPARATOR)) { + String[] protocol = protocolSet.split(PROTOCOL_SEPARATOR); + if (protocol.length != 2) { + throw new IllegalStateException( + "wrong format for kafkaProtocolMap " + kafkaProtocolMap); + } + if (protocolMap.containsKey(protocol[0])) { + throw new IllegalStateException( + kafkaProtocolMap + " has multiple listeners whose listenerName is " + protocol[0]); + } + protocolMap.put(protocol[0], SecurityProtocol.forName(protocol[1])); + } + return protocolMap; + } } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java index a9ccc47471..527bd81da8 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java @@ -51,7 +51,6 @@ import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.kafka.common.internals.Topic; import org.apache.kafka.common.record.CompressionType; -import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.utils.Time; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; @@ -312,7 +311,8 @@ public String getProtocolDataToAdvertise() { public void start(BrokerService service) { brokerService = service; kopBrokerLookupManager = new KopBrokerLookupManager( - brokerService.getPulsar(), false, kafkaConfig.getKafkaAdvertisedListeners()); + brokerService.getPulsar(), false, + kafkaConfig.getKafkaAdvertisedListeners()); log.info("Starting KafkaProtocolHandler, kop version is: '{}'", KopVersion.getVersion()); log.info("Git Revision {}", KopVersion.getGitSha()); @@ -440,29 +440,24 @@ public Map> newChannelIniti try { ImmutableMap.Builder> builder = - ImmutableMap.>builder(); - - final Map advertisedEndpointMap = - EndPoint.parseListeners(kafkaConfig.getKafkaAdvertisedListeners()); - EndPoint.parseListeners(kafkaConfig.getListeners()).forEach((protocol, endPoint) -> { - EndPoint advertisedEndPoint = advertisedEndpointMap.get(protocol); - if (advertisedEndPoint == null) { - // Use the bind endpoint as the advertised endpoint. - advertisedEndPoint = endPoint; - } - switch (protocol) { + ImmutableMap.>builder(); + + EndPoint.parseListeners(kafkaConfig.getListeners(), kafkaConfig.getKafkaProtocolMap()). + forEach((listener, endPoint) -> { + switch (endPoint.getSecurityProtocol()) { case PLAINTEXT: case SASL_PLAINTEXT: builder.put(endPoint.getInetAddress(), new KafkaChannelInitializer(brokerService.getPulsar(), kafkaConfig, this, adminManager, false, - advertisedEndPoint, scopeStatsLogger, localBrokerDataCache)); + endPoint, scopeStatsLogger, localBrokerDataCache)); break; case SSL: case SASL_SSL: builder.put(endPoint.getInetAddress(), new KafkaChannelInitializer(brokerService.getPulsar(), kafkaConfig, this, adminManager, true, - advertisedEndPoint, scopeStatsLogger, localBrokerDataCache)); + endPoint, scopeStatsLogger, localBrokerDataCache)); break; + default: } }); return builder.build(); diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java index 7b9bd22a38..8d36e7dbcb 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java @@ -157,7 +157,6 @@ import org.apache.kafka.common.requests.TxnOffsetCommitResponse; import org.apache.kafka.common.requests.WriteTxnMarkersRequest; import org.apache.kafka.common.requests.WriteTxnMarkersResponse; -import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; @@ -179,7 +178,6 @@ import org.apache.pulsar.metadata.api.MetadataCache; import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData; import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData; -import org.eclipse.jetty.util.StringUtil; /** * This class contains all the request handling methods. @@ -2188,11 +2186,9 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { return future; } - // if kafkaListenerName is set, the lookup result is the advertised address - if (!StringUtil.isBlank(kafkaConfig.getKafkaListenerName())) { - // TODO: should add SecurityProtocol according to which endpoint is handling the request. - // firstly we only support PLAINTEXT when lookup with kafkaListenerName - String kafkaAdvertisedAddress = String.format("%s://%s:%s", SecurityProtocol.PLAINTEXT.name(), + if (advertisedEndPoint.isMultiListener()) { + // if kafkaProtocolMap is set, the lookup result is the advertised address + String kafkaAdvertisedAddress = String.format("%s://%s:%s", advertisedEndPoint.getSecurityProtocol().name, pulsarAddress.getHostName(), pulsarAddress.getPort()); KafkaTopicManager.KOP_ADDRESS_CACHE.put(topic.toString(), returnFuture); returnFuture.complete(Optional.ofNullable(kafkaAdvertisedAddress)); @@ -2285,7 +2281,8 @@ public CompletableFuture findBroker(TopicName topic) { } CompletableFuture returnFuture = new CompletableFuture<>(); - topicManager.getTopicBroker(topic.toString()) + topicManager.getTopicBroker(topic.toString(), + advertisedEndPoint.isMultiListener() ? advertisedEndPoint.getListenerName() : null) .thenApply(address -> getProtocolDataToAdvertise(address, topic)) .thenAccept(kopAddressFuture -> kopAddressFuture.thenAccept(listenersOptional -> { if (!listenersOptional.isPresent()) { @@ -2298,8 +2295,8 @@ public CompletableFuture findBroker(TopicName topic) { // It's the `kafkaAdvertisedListeners` config that's written to ZK final String listeners = listenersOptional.get(); final EndPoint endPoint = - (tlsEnabled ? EndPoint.getSslEndPoint(listeners) - : EndPoint.getPlainTextEndPoint(listeners)); + (tlsEnabled ? EndPoint.getSslEndPoint(listeners) : + EndPoint.getPlainTextEndPoint(listeners)); final Node node = newNode(endPoint.getInetAddress()); if (log.isDebugEnabled()) { diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java index 2b05872993..174ba15bd2 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java @@ -189,17 +189,23 @@ public class KafkaServiceConfiguration extends ServiceConfiguration { private String kafkaListeners; @FieldContext( - category = CATEGORY_KOP, - doc = "Listeners to publish to ZooKeeper for clients to use.\n" - + "The format is the same as `kafkaListeners`.\n" + category = CATEGORY_KOP, + doc = "Comma-separated map of listener name and protocol.\n" + + "e.g. PRIVATE:PLAINTEXT,PRIVATE_SSL:SSL,PUBLIC:PLAINTEXT,PUBLIC_SSL:SSL.\n" + ) + private String kafkaProtocolMap; + + @Deprecated + @FieldContext( + category = CATEGORY_KOP, + doc = "Use kafkaProtocolMap, kafkaListeners and advertisedAddress instead." ) private String kafkaAdvertisedListeners; + @Deprecated @FieldContext( category = CATEGORY_KOP, - doc = "Specify the internal listener name for the broker.\n" - + "The listener name must be contained in the advertisedListeners.\n" - + "This config is used as the listener name in topic lookup." + doc = "Use kafkaProtocolMap, kafkaListeners and advertisedAddress instead." ) private String kafkaListenerName; diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java index 8f88556b19..df8c4db836 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java @@ -67,7 +67,7 @@ public class KafkaTopicManager { private final AtomicBoolean closed = new AtomicBoolean(false); - public static final ConcurrentHashMap> + public static final ConcurrentHashMap>> LOOKUP_CACHE = new ConcurrentHashMap<>(); public static final ConcurrentHashMap>> @@ -178,7 +178,7 @@ private Producer registerInPersistentTopic(PersistentTopic persistentTopic) { // call pulsarclient.lookup.getbroker to get and // own a topic. // // when error happens, the returned future will complete with null. - public CompletableFuture getTopicBroker(String topicName) { + public CompletableFuture getTopicBroker(String topicName, String listenerName) { if (closed.get()) { if (log.isDebugEnabled()) { log.debug("[{}] Return null for getTopicBroker({}) since channel closing", @@ -186,16 +186,28 @@ public CompletableFuture getTopicBroker(String topicName) { } return CompletableFuture.completedFuture(null); } - return LOOKUP_CACHE.computeIfAbsent(topicName, t -> { + + ConcurrentHashMap> topicLookupCache = + LOOKUP_CACHE.computeIfAbsent(topicName, t-> { + if (log.isDebugEnabled()) { + log.debug("[{}] topic {} not in Lookup_cache, call lookupBroker", + requestHandler.ctx.channel(), topicName); + } + ConcurrentHashMap> cache = new ConcurrentHashMap<>(); + cache.put(listenerName == null ? "" : listenerName, lookupBroker(topicName, listenerName)); + return cache; + }); + + return topicLookupCache.computeIfAbsent(listenerName == null ? "" : listenerName, t-> { if (log.isDebugEnabled()) { log.debug("[{}] topic {} not in Lookup_cache, call lookupBroker", - requestHandler.ctx.channel(), topicName); + requestHandler.ctx.channel(), topicName); } - return lookupBroker(topicName); + return lookupBroker(topicName, listenerName); }); } - private CompletableFuture lookupBroker(final String topic) { + private CompletableFuture lookupBroker(final String topic, String listenerName) { if (closed.get()) { if (log.isDebugEnabled()) { log.debug("[{}] Return null for getTopic({}) since channel closing", @@ -203,7 +215,7 @@ private CompletableFuture lookupBroker(final String topic) { } return CompletableFuture.completedFuture(null); } - return lookupClient.getBrokerAddress(TopicName.get(topic)); + return lookupClient.getBrokerAddress(TopicName.get(topic), listenerName); } // A wrapper of `BrokerService#getTopic` that is to find the topic's associated `PersistentTopic` instance diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KopBrokerLookupManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KopBrokerLookupManager.java index 0717050bcc..4440249499 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KopBrokerLookupManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KopBrokerLookupManager.java @@ -52,7 +52,8 @@ public class KopBrokerLookupManager { public static final ConcurrentHashMap>> KOP_ADDRESS_CACHE = new ConcurrentHashMap<>(); - public KopBrokerLookupManager(PulsarService pulsarService, Boolean tlsEnabled, String advertisedListeners) { + public KopBrokerLookupManager(PulsarService pulsarService, Boolean tlsEnabled, + String advertisedListeners) { this.pulsarService = pulsarService; this.tlsEnabled = tlsEnabled; this.advertisedListeners = advertisedListeners; @@ -82,8 +83,8 @@ public CompletableFuture findBroker(String topic) { // It's the `kafkaAdvertisedListeners` config that's written to ZK final EndPoint endPoint = - tlsEnabled ? EndPoint.getSslEndPoint(listeners.get()) - : EndPoint.getPlainTextEndPoint(listeners.get()); + (tlsEnabled ? EndPoint.getSslEndPoint(listeners.get()) : + EndPoint.getPlainTextEndPoint(listeners.get())); // here we found topic broker: broker2, but this is in broker1, // how to clean the lookup cache? diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/LookupClient.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/LookupClient.java index 3ece7c9239..358d1a0195 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/LookupClient.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/LookupClient.java @@ -19,10 +19,13 @@ import java.net.InetSocketAddress; import java.net.URI; import java.net.URISyntaxException; +import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.Pair; +import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.lookup.LookupResult; @@ -46,10 +49,13 @@ public class LookupClient implements Closeable { @Getter private final PulsarClientImpl pulsarClient; + private ConcurrentHashMap pulsarClientMap; + public LookupClient(final PulsarService pulsarService, final KafkaServiceConfiguration kafkaConfig) { namespaceService = pulsarService.getNamespaceService(); try { - pulsarClient = createPulsarClient(pulsarService, kafkaConfig); + pulsarClient = createPulsarClient(pulsarService, kafkaConfig, null); + pulsarClientMap = createPulsarClientMap(pulsarService, kafkaConfig); } catch (PulsarClientException e) { log.error("Failed to create PulsarClient", e); throw new IllegalStateException(e); @@ -69,10 +75,14 @@ public LookupClient(final PulsarService pulsarService) { } public CompletableFuture getBrokerAddress(final TopicName topicName) { + return getBrokerAddress(topicName, null); + } + + public CompletableFuture getBrokerAddress(final TopicName topicName, String listenerName) { // First try to use NamespaceService to find the broker directly. final LookupOptions options = LookupOptions.builder() .authoritative(false) - .advertisedListenerName(pulsarClient.getConfiguration().getListenerName()) + .advertisedListenerName(listenerName) .loadTopicsInBundle(true) .build(); return namespaceService.getBrokerServiceUrlAsync(topicName, options).thenCompose(optLookupResult -> { @@ -87,7 +97,8 @@ public CompletableFuture getBrokerAddress(final TopicName top final LookupResult lookupResult = optLookupResult.get(); if (lookupResult.isRedirect()) { // Kafka client can't process redirect field, so here we fallback to PulsarClient - return pulsarClient.getLookup().getBroker(topicName).thenApply(Pair::getLeft); + return pulsarClientMap.getOrDefault(listenerName == null ? "" : listenerName, pulsarClient). + getLookup().getBroker(topicName).thenApply(Pair::getLeft); } else { return getAddressFutureFromBrokerUrl(lookupResult.getLookupData().getBrokerUrl()); } @@ -103,9 +114,23 @@ public void close() { } } - private static PulsarClientImpl createPulsarClient(final PulsarService pulsarService, - final KafkaServiceConfiguration kafkaConfig) - throws PulsarClientException { + private ConcurrentHashMap createPulsarClientMap( + PulsarService pulsarService, KafkaServiceConfiguration kafkaConfig) throws PulsarClientException { + ConcurrentHashMap pulsarClientMap = new ConcurrentHashMap<>(); + final Map protocolMap = EndPoint.parseProtocolMap(kafkaConfig.getKafkaProtocolMap()); + if (protocolMap.isEmpty()) { + pulsarClientMap.put("", pulsarClient); + } else { + for (Map.Entry entry : protocolMap.entrySet()) { + pulsarClientMap.put(entry.getKey(), createPulsarClient(pulsarService, kafkaConfig, entry.getKey())); + } + } + return pulsarClientMap; + } + + private static PulsarClientImpl createPulsarClient( + final PulsarService pulsarService, final KafkaServiceConfiguration kafkaConfig, + final String listenerName) throws PulsarClientException { // It's migrated from PulsarService#getClient() but it can configure listener name final ClientConfigurationData conf = new ClientConfigurationData(); conf.setServiceUrl(kafkaConfig.isTlsEnabled() @@ -137,7 +162,7 @@ private static PulsarClientImpl createPulsarClient(final PulsarService pulsarSer kafkaConfig.getBrokerClientAuthenticationParameters())); } - conf.setListenerName(kafkaConfig.getKafkaListenerName()); + conf.setListenerName(listenerName); return new PulsarClientImpl(conf, pulsarService.getIoEventLoopGroup()); } diff --git a/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/EndPointTest.java b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/EndPointTest.java index 4e4034e1d8..fd95e7f4a7 100644 --- a/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/EndPointTest.java +++ b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/EndPointTest.java @@ -30,13 +30,13 @@ public class EndPointTest { @Test public void testValidListener() throws Exception { - EndPoint endPoint = new EndPoint("PLAINTEXT://192.168.0.1:9092"); + EndPoint endPoint = new EndPoint("PLAINTEXT://192.168.0.1:9092", null); assertEquals(endPoint.getSecurityProtocol(), SecurityProtocol.PLAINTEXT); assertEquals(endPoint.getHostname(), "192.168.0.1"); assertEquals(endPoint.getPort(), 9092); final String localhost = InetAddress.getLocalHost().getCanonicalHostName(); - endPoint = new EndPoint("SSL://:9094"); + endPoint = new EndPoint("SSL://:9094", null); assertEquals(endPoint.getSecurityProtocol(), SecurityProtocol.SSL); assertEquals(endPoint.getHostname(), localhost); assertEquals(endPoint.getPort(), 9094); @@ -45,24 +45,24 @@ public void testValidListener() throws Exception { @Test public void testInvalidListener() throws Exception { try { - new EndPoint("hello world"); + new EndPoint("hello world", null); } catch (IllegalStateException e) { assertTrue(e.getMessage().contains("listener 'hello world' is invalid")); } try { - new EndPoint("pulsar://localhost:6650"); + new EndPoint("pulsar://localhost:6650", null); fail(); } catch (IllegalArgumentException e) { assertTrue(e.getMessage().contains("No enum constant")); } try { - new EndPoint("PLAINTEXT://localhost:65536"); + new EndPoint("PLAINTEXT://localhost:65536", null); fail(); } catch (IllegalStateException e) { assertTrue(e.getMessage().contains("port 65536 is invalid")); } try { - new EndPoint("PLAINTEXT://localhost:-1"); + new EndPoint("PLAINTEXT://localhost:-1", null); fail(); } catch (IllegalStateException e) { assertTrue(e.getMessage().contains("port -1 is invalid")); @@ -71,17 +71,17 @@ public void testInvalidListener() throws Exception { @Test public void testValidListeners() throws Exception { - final Map endPointMap = + final Map endPointMap = EndPoint.parseListeners("PLAINTEXT://localhost:9092,SSL://:9093"); assertEquals(endPointMap.size(), 2); - final EndPoint plainEndPoint = endPointMap.get(SecurityProtocol.PLAINTEXT); + final EndPoint plainEndPoint = endPointMap.get("PLAINTEXT"); assertNotNull(plainEndPoint); assertEquals(plainEndPoint.getSecurityProtocol(), SecurityProtocol.PLAINTEXT); assertEquals(plainEndPoint.getHostname(), "localhost"); assertEquals(plainEndPoint.getPort(), 9092); - final EndPoint sslEndPoint = endPointMap.get(SecurityProtocol.SSL); + final EndPoint sslEndPoint = endPointMap.get("SSL"); final String localhost = InetAddress.getLocalHost().getCanonicalHostName(); assertNotNull(sslEndPoint); assertEquals(sslEndPoint.getSecurityProtocol(), SecurityProtocol.SSL); @@ -89,13 +89,65 @@ public void testValidListeners() throws Exception { assertEquals(sslEndPoint.getPort(), 9093); } + @Test + public void testValidMultiListeners() throws Exception { + final String kafkaProtocolMap = "internal:PLAINTEXT,internal_ssl:SSL,external:PLAINTEXT,external_ssl:SSL"; + final String kafkaListeners = "internal://localhost:9092,internal_ssl://localhost:9093," + + "external://externalhost:9094,external_ssl://externalhost:9095"; + final Map endPointMap = EndPoint.parseListeners(kafkaListeners, kafkaProtocolMap); + assertEquals(endPointMap.size(), 4); + + final EndPoint internal = endPointMap.get("internal"); + assertNotNull(internal); + assertEquals(internal.getSecurityProtocol(), SecurityProtocol.PLAINTEXT); + assertEquals(internal.getHostname(), "localhost"); + assertEquals(internal.getPort(), 9092); + + final EndPoint internalSSL = endPointMap.get("internal_ssl"); + assertNotNull(internalSSL); + assertEquals(internalSSL.getSecurityProtocol(), SecurityProtocol.SSL); + assertEquals(internalSSL.getHostname(), "localhost"); + assertEquals(internalSSL.getPort(), 9093); + + final EndPoint external = endPointMap.get("external"); + assertNotNull(external); + assertEquals(external.getSecurityProtocol(), SecurityProtocol.PLAINTEXT); + assertEquals(external.getHostname(), "externalhost"); + assertEquals(external.getPort(), 9094); + + final EndPoint externalSSL = endPointMap.get("external_ssl"); + assertNotNull(externalSSL); + assertEquals(externalSSL.getSecurityProtocol(), SecurityProtocol.SSL); + assertEquals(externalSSL.getHostname(), "externalhost"); + assertEquals(externalSSL.getPort(), 9095); + } + + @Test + public void testInvalidMultiListeners() throws Exception { + try { + EndPoint.parseListeners("internal://localhost:9092,internal_ssl://localhost:9093", + "internal:PLAINTEXT,internal:SSL,internal_ssl:SSL"); + fail(); + } catch (IllegalStateException e) { + assertTrue(e.getMessage().contains(" has multiple listeners whose listenerName is internal")); + } + + try { + EndPoint.parseListeners("internal://localhost:9092,internal_ssl://localhost:9093", + "internal:PLAINTEXT,external:SSL"); + fail(); + } catch (IllegalStateException e) { + assertTrue(e.getMessage().contains("internal_ssl is not set in kafkaProtocolMap")); + } + } + @Test public void testRepeatedListeners() throws Exception { try { EndPoint.parseListeners("PLAINTEXT://localhost:9092,SSL://:9093,SSL://localhost:9094"); fail(); } catch (IllegalStateException e) { - assertTrue(e.getMessage().contains(" has multiple listeners whose protocol is SSL")); + assertTrue(e.getMessage().contains(" has multiple listeners whose listenerName is SSL")); } } @@ -124,7 +176,7 @@ public void testGetEndPoint() { @Test public void testOriginalUrl() throws Exception { - final EndPoint endPoint = new EndPoint("PLAINTEXT://:9092"); + final EndPoint endPoint = new EndPoint("PLAINTEXT://:9092", null); assertEquals(endPoint.getHostname(), InetAddress.getLocalHost().getCanonicalHostName()); assertEquals(endPoint.getOriginalListener(), "PLAINTEXT://:9092"); } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaListenerNameTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaListenerNameTest.java index 1456d742d6..834f22a41e 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaListenerNameTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaListenerNameTest.java @@ -13,12 +13,19 @@ */ package io.streamnative.pulsar.handlers.kop; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; import org.apache.pulsar.broker.ServiceConfigurationUtils; import org.testng.annotations.Test; + /** * Test for kafkaListenerName config. */ @@ -49,16 +56,51 @@ public void testListenerName() throws Exception { log.info("Set advertisedListeners to {}", advertisedListeners); super.internalSetup(); - final KafkaProducer producer = new KafkaProducer<>(newKafkaProducerProperties()); + kafkaProducerSend("localhost:" + kafkaBrokerPort); + + super.internalCleanup(); + } + + @Test(timeOut = 30000) + public void testMultipleListenerName() throws Exception { + super.resetConfig(); + conf.setAdvertisedAddress(null); + final String localAddress = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(null); + conf.setInternalListenerName("pulsar"); + final String kafkaProtocolMap = "kafka:PLAINTEXT,kafka_external:PLAINTEXT"; + conf.setKafkaProtocolMap(kafkaProtocolMap); + int externalPort = PortManager.nextFreePort(); + final String kafkaListeners = "kafka://0.0.0.0:" + kafkaBrokerPort + + ",kafka_external://0.0.0.0:" + externalPort; + conf.setKafkaListeners(kafkaListeners); + final String advertisedListeners = + "pulsar:pulsar://" + localAddress + ":" + brokerPort + + ",kafka:pulsar://" + "localhost:" + kafkaBrokerPort + + ",kafka_external:pulsar://" + "localhost:" + externalPort; + conf.setAdvertisedListeners(advertisedListeners); + log.info("Set advertisedListeners to {}", advertisedListeners); + super.internalSetup(); + + kafkaProducerSend("localhost:" + kafkaBrokerPort); + kafkaProducerSend("localhost:" + externalPort); + + super.internalCleanup(); + } + + private void kafkaProducerSend(String server) throws ExecutionException, InterruptedException, TimeoutException { + final Properties props = new Properties(); + props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server); + props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + + final KafkaProducer producer = new KafkaProducer<>(props); producer.send(new ProducerRecord<>("my-topic", "my-message"), (metadata, exception) -> { if (exception == null) { log.info("Send to {}", metadata); } else { log.error("Send failed: {}", exception.getMessage()); } - }); + }).get(30, TimeUnit.SECONDS); producer.close(); - - super.internalCleanup(); } } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java index 90aaee9218..aa98439e13 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java @@ -134,7 +134,7 @@ public KopProtocolHandlerTestBase(final String entryFormat) { } protected EndPoint getPlainEndPoint() { - return new EndPoint(PLAINTEXT_PREFIX + "127.0.0.1:" + kafkaBrokerPort); + return new EndPoint(PLAINTEXT_PREFIX + "127.0.0.1:" + kafkaBrokerPort, null); } protected void resetConfig() {