diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java index 1b5445f3e1..bcd66a53a4 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java @@ -16,6 +16,7 @@ import static io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationKey.TopicKey; import static org.apache.kafka.common.requests.CreateTopicsRequest.TopicDetails; +import com.google.common.collect.Maps; import io.streamnative.pulsar.handlers.kop.exceptions.KoPTopicException; import io.streamnative.pulsar.handlers.kop.utils.KopTopic; import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperation; @@ -23,7 +24,6 @@ import io.streamnative.pulsar.handlers.kop.utils.timer.SystemTimer; import java.util.Collection; import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -57,7 +57,7 @@ class AdminManager { private final PulsarAdmin admin; private final int defaultNumPartitions; - private volatile Set brokersCache = new HashSet<>(); + private volatile Map> brokersCache = Maps.newHashMap(); private final ReentrantReadWriteLock brokersCacheLock = new ReentrantReadWriteLock(); @@ -227,11 +227,20 @@ public void deleteTopic(String topicToDelete, } } - public Collection getBrokers() { + public Collection getBrokers(String listenerName) { + + if (brokersCache.containsKey(listenerName)) { + return brokersCache.get(listenerName); + } + + return Collections.emptyList(); + } + + public Map> getAllBrokers() { return brokersCache; } - public void setBrokers(Set newBrokers) { + public void setBrokers(Map> newBrokers) { brokersCacheLock.writeLock().lock(); try { this.brokersCache = newBrokers; diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/EndPoint.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/EndPoint.java index dfd978ce36..ef80d5b91c 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/EndPoint.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/EndPoint.java @@ -30,7 +30,7 @@ */ public class EndPoint { - private static final String END_POINT_SEPARATOR = ","; + public static final String END_POINT_SEPARATOR = ","; private static final String PROTOCOL_MAP_SEPARATOR = ","; private static final String PROTOCOL_SEPARATOR = ":"; private static final String REGEX = "^(.*)://\\[?([0-9a-zA-Z\\-%._:]*)\\]?:(-?[0-9]+)"; @@ -52,9 +52,7 @@ public class EndPoint { public EndPoint(final String listener, final Map protocolMap) { this.originalListener = listener; final String errorMessage = "listener '" + listener + "' is invalid"; - final Matcher matcher = PATTERN.matcher(listener); - checkState(matcher.find(), errorMessage); - checkState(matcher.groupCount() == 3, errorMessage); + final Matcher matcher = matcherListener(listener, errorMessage); this.listenerName = matcher.group(1); if (protocolMap == null || protocolMap.isEmpty()) { @@ -82,8 +80,6 @@ public EndPoint(final String listener, final Map proto checkState(port >= 0 && port <= 65535, errorMessage + ": port " + port + " is invalid"); } - - public InetSocketAddress getInetAddress() { return new InetSocketAddress(hostname, port); } @@ -148,4 +144,12 @@ public static Map parseProtocolMap(final String kafkaP } return protocolMap; } + + public static Matcher matcherListener(String listener, String errorMessage) { + final Matcher matcher = PATTERN.matcher(listener); + checkState(matcher.find(), errorMessage); + checkState(matcher.groupCount() == 3, errorMessage); + return matcher; + } + } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java index aa83281e89..bf3c394d8a 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java @@ -65,7 +65,8 @@ public abstract class KafkaCommandDecoder extends ChannelInboundHandlerAdapter { @Getter protected final KafkaServiceConfiguration kafkaConfig; - public KafkaCommandDecoder(StatsLogger statsLogger, KafkaServiceConfiguration kafkaConfig) { + public KafkaCommandDecoder(StatsLogger statsLogger, + KafkaServiceConfiguration kafkaConfig) { this.requestStats = new RequestStats(statsLogger); this.kafkaConfig = kafkaConfig; this.requestQueue = new LinkedBlockingQueue<>(kafkaConfig.getMaxQueuedRequests()); diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java index e00008724c..78192e6106 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java @@ -524,7 +524,7 @@ protected void handleTopicMetadataRequest(KafkaHeaderAndRequest metadataHar, List allTopicMetadata = Collections.synchronizedList(Lists.newArrayList()); List allNodes = Collections.synchronizedList(Lists.newArrayList()); // Get all kop brokers in local cache - allNodes.addAll(adminManager.getBrokers()); + allNodes.addAll(adminManager.getBrokers(advertisedEndPoint.getListenerName())); List topics = metadataRequest.topics(); // topics in format : persistent://%s/%s/abc-partition-x, will be grouped by as: diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java index 174ba15bd2..7486de2246 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java @@ -13,8 +13,6 @@ */ package io.streamnative.pulsar.handlers.kop; -import static com.google.common.base.Preconditions.checkState; - import com.google.common.collect.Sets; import io.streamnative.pulsar.handlers.kop.coordinator.group.OffsetConfig; import java.io.FileInputStream; @@ -28,7 +26,6 @@ import java.util.Properties; import java.util.Set; import java.util.regex.Matcher; -import java.util.regex.Pattern; import lombok.Getter; import lombok.NonNull; import lombok.Setter; @@ -64,9 +61,6 @@ public class KafkaServiceConfiguration extends ServiceConfiguration { @Category private static final String CATEGORY_KOP_TRANSACTION = "Kafka on Pulsar transaction"; - private static final String END_POINT_SEPARATOR = ","; - private static final String REGEX = "^(.*)://\\[?([0-9a-zA-Z\\-%._:]*)\\]?:(-?[0-9]+)"; - private static final Pattern PATTERN = Pattern.compile(REGEX); // // --- Kafka on Pulsar Broker configuration --- // @@ -397,11 +391,9 @@ public class KafkaServiceConfiguration extends ServiceConfiguration { private String checkAdvertisedListeners(String advertisedListeners) { StringBuilder listenersReBuilder = new StringBuilder(); - for (String listener : advertisedListeners.split(END_POINT_SEPARATOR)) { + for (String listener : advertisedListeners.split(EndPoint.END_POINT_SEPARATOR)) { final String errorMessage = "listener '" + listener + "' is invalid"; - final Matcher matcher = PATTERN.matcher(listener); - checkState(matcher.find(), errorMessage); - checkState(matcher.groupCount() == 3, errorMessage); + final Matcher matcher = EndPoint.matcherListener(listener, errorMessage); String hostname = matcher.group(2); if (hostname.isEmpty()) { try { @@ -417,9 +409,10 @@ private String checkAdvertisedListeners(String advertisedListeners) { } else { listenersReBuilder.append(listener); } - listenersReBuilder.append(END_POINT_SEPARATOR); + listenersReBuilder.append(EndPoint.END_POINT_SEPARATOR); } - return listenersReBuilder.deleteCharAt(listenersReBuilder.lastIndexOf(END_POINT_SEPARATOR)).toString(); + return listenersReBuilder.deleteCharAt( + listenersReBuilder.lastIndexOf(EndPoint.END_POINT_SEPARATOR)).toString(); } public @NonNull String getKafkaAdvertisedListeners() { diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KopEventManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KopEventManager.java index a0d8eb7a53..abef7742ea 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KopEventManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KopEventManager.java @@ -13,9 +13,8 @@ */ package io.streamnative.pulsar.handlers.kop; -import static com.google.common.base.Preconditions.checkState; - import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.gson.JsonElement; import com.google.gson.JsonObject; @@ -26,11 +25,13 @@ import io.streamnative.pulsar.handlers.kop.utils.KopTopic; import io.streamnative.pulsar.handlers.kop.utils.ShutdownableThread; import java.nio.charset.StandardCharsets; -import java.util.Collection; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -38,7 +39,6 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiConsumer; import java.util.regex.Matcher; -import java.util.regex.Pattern; import java.util.stream.Collectors; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -52,9 +52,6 @@ @Slf4j public class KopEventManager { - private static final String REGEX = "^(.*)://\\[?([0-9a-zA-Z\\-%._:]*)\\]?:(-?[0-9]+)"; - private static final Pattern PATTERN = Pattern.compile(REGEX); - private static final String kopEventThreadName = "kop-event-thread"; private final ReentrantLock putLock = new ReentrantLock(); private static final LinkedBlockingQueue queue = @@ -186,7 +183,7 @@ private void getBrokers(List pulsarBrokers, BiConsumer registerEventLatency, String name, long startProcessTime) { - final Set kopBrokers = Sets.newConcurrentHashSet(); + ConcurrentMap> kopBrokersMap = Maps.newConcurrentMap(); final AtomicInteger pendingBrokers = new AtomicInteger(pulsarBrokers.size()); pulsarBrokers.forEach(broker -> { @@ -207,9 +204,14 @@ private void getBrokers(List pulsarBrokers, JsonElement element = protocols.get("kafka"); if (element != null) { - String kopBrokerStr = element.getAsString(); - Node kopNode = getNode(kopBrokerStr); - kopBrokers.add(kopNode); + String kopBrokerStrs = element.getAsString(); + Map> kopNodesMap = getNodes(kopBrokerStrs); + kopNodesMap.forEach((listenerName, nodesSet) -> { + Set currentNodeSet = kopBrokersMap.computeIfAbsent(listenerName, + s -> Sets.newConcurrentHashSet()); + currentNodeSet.addAll(nodesSet); + kopBrokersMap.put(listenerName, currentNodeSet); + }); } else { if (log.isDebugEnabled()) { log.debug("Get broker {} path currently not a kop broker, skip it.", broker); @@ -222,13 +224,13 @@ private void getBrokers(List pulsarBrokers, } if (pendingBrokers.decrementAndGet() == 0) { - Collection oldKopBrokers = adminManager.getBrokers(); - adminManager.setBrokers(kopBrokers); + Map> oldKopBrokers = adminManager.getAllBrokers(); + adminManager.setBrokers(kopBrokersMap); if (registerEventLatency != null) { registerEventLatency.accept(name, startProcessTime); } log.info("Refresh kop brokers new cache {}, old brokers cache {}", - adminManager.getBrokers(), oldKopBrokers); + adminManager.getAllBrokers(), oldKopBrokers); } } ); @@ -242,18 +244,24 @@ private JsonObject parseJsonObject(String info) { } @VisibleForTesting - public static Node getNode(String kopBrokerStr) { - final String errorMessage = "kopBrokerStr " + kopBrokerStr + " is invalid"; - final Matcher matcher = PATTERN.matcher(kopBrokerStr); - checkState(matcher.find(), errorMessage); - checkState(matcher.groupCount() == 3, errorMessage); - String host = matcher.group(2); - String port = matcher.group(3); - - return new Node( - Murmur3_32Hash.getInstance().makeHash((host + port).getBytes(StandardCharsets.UTF_8)), - host, - Integer.parseInt(port)); + public static Map> getNodes(String kopBrokerStrs) { + HashMap> nodesMap = Maps.newHashMap(); + String[] kopBrokerArr = kopBrokerStrs.split(EndPoint.END_POINT_SEPARATOR); + for (String kopBrokerStr : kopBrokerArr) { + final String errorMessage = "kopBrokerStr " + kopBrokerStr + " is invalid"; + final Matcher matcher = EndPoint.matcherListener(kopBrokerStr, errorMessage); + String listenerName = matcher.group(1); + String host = matcher.group(2); + String port = matcher.group(3); + Set nodeSet = nodesMap.computeIfAbsent(listenerName, s -> new HashSet<>()); + nodeSet.add(new Node( + Murmur3_32Hash.getInstance().makeHash((host + port).getBytes(StandardCharsets.UTF_8)), + host, + Integer.parseInt(port))); + nodesMap.put(listenerName, nodeSet); + } + + return nodesMap; } @Getter diff --git a/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/KopEventManagerTest.java b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/KopEventManagerTest.java index 805fce2396..fa2fb33961 100644 --- a/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/KopEventManagerTest.java +++ b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/KopEventManagerTest.java @@ -13,6 +13,8 @@ */ package io.streamnative.pulsar.handlers.kop; +import java.util.Map; +import java.util.Set; import org.apache.kafka.common.Node; import org.testng.Assert; import org.testng.annotations.Test; @@ -20,14 +22,48 @@ public class KopEventManagerTest { @Test - public void testGetNode() { + public void testGetOneNode() { final String host = "localhost"; final int port = 9120; final String securityProtocol = "SASL_PLAINTEXT"; final String brokerStr = securityProtocol + "://" + host + ":" + port; - Node node = KopEventManager.getNode(brokerStr); - Assert.assertEquals(node.host(), host); - Assert.assertEquals(node.port(), port); + Map> nodes = KopEventManager.getNodes(brokerStr); + Assert.assertEquals(1, nodes.size()); + Assert.assertTrue(nodes.containsKey(securityProtocol)); + Set nodeSet = nodes.get(securityProtocol); + Assert.assertEquals(1, nodeSet.size()); + nodeSet.forEach(node -> { + Assert.assertEquals(node.host(), host); + Assert.assertEquals(node.port(), port); + }); + } + + @Test + public void testGetMultipleNodes() { + final String listenerName1 = "kafka_internal"; + final String host1 = "localhost"; + final int port1 = 9120; + final String listenerName2 = "kafka_external"; + final String host2 = "localhost"; + final int port2 = 9121; + final String brokersStr = listenerName1 + "://" + host1 + ":" + port1 + "," + + listenerName2 + "://" + host2 + ":" + port2; + Map> nodes = KopEventManager.getNodes(brokersStr); + Assert.assertEquals(2, nodes.size()); + Assert.assertTrue(nodes.containsKey(listenerName1)); + Set nodesSet1 = nodes.get(listenerName1); + Assert.assertEquals(1, nodesSet1.size()); + nodesSet1.forEach(node -> { + Assert.assertEquals(node.host(), host1); + Assert.assertEquals(node.port(), port1); + }); + Assert.assertTrue(nodes.containsKey(listenerName2)); + Set nodesSet2 = nodes.get(listenerName2); + Assert.assertEquals(1, nodesSet2.size()); + nodesSet2.forEach(node -> { + Assert.assertEquals(node.host(), host2); + Assert.assertEquals(node.port(), port2); + }); } } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaListenerNameTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaListenerNameTest.java index 834f22a41e..fefafb982f 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaListenerNameTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaListenerNameTest.java @@ -21,8 +21,10 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.pulsar.broker.ServiceConfigurationUtils; +import org.testng.Assert; import org.testng.annotations.Test; @@ -50,7 +52,7 @@ public void testListenerName() throws Exception { conf.setInternalListenerName("pulsar"); final String advertisedListeners = "pulsar:pulsar://" + localAddress + ":" + brokerPort - + ",kafka:pulsar://" + "localhost:" + kafkaBrokerPort; + + ",kafka:pulsar://localhost:" + kafkaBrokerPort; conf.setAdvertisedListeners(advertisedListeners); conf.setKafkaListenerName("kafka"); log.info("Set advertisedListeners to {}", advertisedListeners); @@ -75,8 +77,8 @@ public void testMultipleListenerName() throws Exception { conf.setKafkaListeners(kafkaListeners); final String advertisedListeners = "pulsar:pulsar://" + localAddress + ":" + brokerPort - + ",kafka:pulsar://" + "localhost:" + kafkaBrokerPort - + ",kafka_external:pulsar://" + "localhost:" + externalPort; + + ",kafka:pulsar://localhost:" + kafkaBrokerPort + + ",kafka_external:pulsar://localhost:" + externalPort; conf.setAdvertisedListeners(advertisedListeners); log.info("Set advertisedListeners to {}", advertisedListeners); super.internalSetup(); @@ -87,6 +89,28 @@ public void testMultipleListenerName() throws Exception { super.internalCleanup(); } + @Test(timeOut = 20000) + public void testConnectListenerNotExist() throws Exception { + final int externalPort = PortManager.nextFreePort(); + super.resetConfig(); + conf.setAdvertisedAddress(null); + conf.setKafkaListeners("kafka://0.0.0.0:" + kafkaBrokerPort + ",kafka_external://0.0.0.0:" + externalPort); + conf.setKafkaProtocolMap("kafka:PLAINTEXT,kafka_external:PLAINTEXT"); + conf.setAdvertisedListeners("pulsar:pulsar://localhost:" + brokerPort + + ",kafka:pulsar://localhost:" + kafkaBrokerPort + + ",kafka_external:pulsar://localhost:" + externalPort); + super.internalSetup(); + final Properties props = newKafkaProducerProperties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + kafkaBrokerPort); + final KafkaProducer producer = new KafkaProducer<>(props); + RecordMetadata recordMetadata = producer.send(new ProducerRecord<>("my-topic", "hello")).get(); + Assert.assertNotNull(recordMetadata); + Assert.assertEquals(0, recordMetadata.offset()); + producer.close(); + super.internalCleanup(); + } + + private void kafkaProducerSend(String server) throws ExecutionException, InterruptedException, TimeoutException { final Properties props = new Properties(); props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);