From 1fe5884d4ee3da78039708c8308a7cd84e4b9b65 Mon Sep 17 00:00:00 2001 From: wenbingshen Date: Mon, 27 Sep 2021 01:56:57 +0800 Subject: [PATCH 1/8] fix wrong broker cache with multiple listeners --- .../pulsar/handlers/kop/AdminManager.java | 12 ++-- .../handlers/kop/KafkaCommandDecoder.java | 28 +++++++-- .../handlers/kop/KafkaRequestHandler.java | 4 +- .../pulsar/handlers/kop/KopEventManager.java | 57 ++++++++++++------- .../handlers/kop/KopEventManagerTest.java | 44 ++++++++++++-- .../kop/EntryPublishTimeKafkaFormatTest.java | 7 ++- .../pulsar/handlers/kop/KafkaApisTest.java | 3 +- .../handlers/kop/KafkaListenerNameTest.java | 28 +++++++++ .../handlers/kop/KafkaRequestHandlerTest.java | 35 ++++++++---- ...kaRequestHandlerWithAuthorizationTest.java | 28 +++++++-- .../handlers/kop/SaslPlainTestBase.java | 3 +- 11 files changed, 195 insertions(+), 54 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java index 8d58ae80fb..d698c44b35 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java @@ -16,6 +16,7 @@ import static io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationKey.TopicKey; import static org.apache.kafka.common.requests.CreateTopicsRequest.TopicDetails; +import com.google.common.collect.Maps; import io.streamnative.pulsar.handlers.kop.exceptions.KoPTopicException; import io.streamnative.pulsar.handlers.kop.utils.KopTopic; import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperation; @@ -23,7 +24,6 @@ import io.streamnative.pulsar.handlers.kop.utils.timer.SystemTimer; import java.util.Collection; import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -56,7 +56,7 @@ class AdminManager { private final PulsarAdmin admin; private final int defaultNumPartitions; - private volatile Set brokersCache = new HashSet<>(); + private volatile Map> brokersCache = Maps.newHashMap(); private final ReentrantReadWriteLock brokersCacheLock = new ReentrantReadWriteLock(); @@ -229,11 +229,15 @@ public Map deleteTopics(Set topicsToDelete) { return result; } - public Collection getBrokers() { + public Collection getBrokers(String listenerName) { + return brokersCache.get(listenerName); + } + + public Map> getAllBrokers() { return brokersCache; } - public void setBrokers(Set newBrokers) { + public void setBrokers(Map> newBrokers) { brokersCacheLock.writeLock().lock(); try { this.brokersCache = newBrokers; diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java index aa83281e89..4d9f742bd6 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java @@ -64,11 +64,16 @@ public abstract class KafkaCommandDecoder extends ChannelInboundHandlerAdapter { protected final RequestStats requestStats; @Getter protected final KafkaServiceConfiguration kafkaConfig; + @Getter + protected final EndPoint advertisedEndPoint; - public KafkaCommandDecoder(StatsLogger statsLogger, KafkaServiceConfiguration kafkaConfig) { + public KafkaCommandDecoder(StatsLogger statsLogger, + KafkaServiceConfiguration kafkaConfig, + EndPoint advertisedEndPoint) { this.requestStats = new RequestStats(statsLogger); this.kafkaConfig = kafkaConfig; this.requestQueue = new LinkedBlockingQueue<>(kafkaConfig.getMaxQueuedRequests()); + this.advertisedEndPoint = advertisedEndPoint; } @Override @@ -138,13 +143,21 @@ protected KafkaHeaderAndRequest byteBufToRequest(ByteBuf msg, RequestHeader header = RequestHeader.parse(nio); if (isUnsupportedApiVersionsRequest(header)) { ApiVersionsRequest apiVersionsRequest = new ApiVersionsRequest((short) 0, header.apiVersion()); - return new KafkaHeaderAndRequest(header, apiVersionsRequest, msg, remoteAddress); + return new KafkaHeaderAndRequest(header, + apiVersionsRequest, + msg, + remoteAddress, + advertisedEndPoint.getListenerName()); } else { ApiKeys apiKey = header.apiKey(); short apiVersion = header.apiVersion(); Struct struct = apiKey.parseRequest(apiVersion, nio); AbstractRequest body = AbstractRequest.parseRequest(apiKey, apiVersion, struct); - return new KafkaHeaderAndRequest(header, body, msg, remoteAddress); + return new KafkaHeaderAndRequest(header, + body, + msg, + remoteAddress, + advertisedEndPoint.getListenerName()); } } @@ -556,15 +569,18 @@ static class KafkaHeaderAndRequest implements Closeable { private final AbstractRequest request; private final ByteBuf buffer; private final SocketAddress remoteAddress; + private final String listenerName; KafkaHeaderAndRequest(RequestHeader header, AbstractRequest request, ByteBuf buffer, - SocketAddress remoteAddress) { + SocketAddress remoteAddress, + String listenerName) { this.header = header; this.request = request; this.buffer = buffer.retain(); this.remoteAddress = remoteAddress; + this.listenerName = listenerName; } public ByteBuf getBuffer() { @@ -583,6 +599,10 @@ public SocketAddress getRemoteAddress() { return this.remoteAddress; } + public String getListenerName() { + return this.listenerName; + } + public String getClientHost() { if (remoteAddress == null) { return DEFAULT_CLIENT_HOST; diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java index c4f7b5d80b..c0b39aea83 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java @@ -282,7 +282,7 @@ public KafkaRequestHandler(PulsarService pulsarService, Boolean tlsEnabled, EndPoint advertisedEndPoint, StatsLogger statsLogger) throws Exception { - super(statsLogger, kafkaConfig); + super(statsLogger, kafkaConfig, advertisedEndPoint); this.pulsarService = pulsarService; this.tenantContextManager = tenantContextManager; this.clusterName = kafkaConfig.getClusterName(); @@ -524,7 +524,7 @@ protected void handleTopicMetadataRequest(KafkaHeaderAndRequest metadataHar, List allTopicMetadata = Collections.synchronizedList(Lists.newArrayList()); List allNodes = Collections.synchronizedList(Lists.newArrayList()); // Get all kop brokers in local cache - allNodes.addAll(adminManager.getBrokers()); + allNodes.addAll(adminManager.getBrokers(metadataHar.getListenerName())); List topics = metadataRequest.topics(); // topics in format : persistent://%s/%s/abc-partition-x, will be grouped by as: diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KopEventManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KopEventManager.java index a0d8eb7a53..440532d171 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KopEventManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KopEventManager.java @@ -16,6 +16,7 @@ import static com.google.common.base.Preconditions.checkState; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.gson.JsonElement; import com.google.gson.JsonObject; @@ -26,11 +27,13 @@ import io.streamnative.pulsar.handlers.kop.utils.KopTopic; import io.streamnative.pulsar.handlers.kop.utils.ShutdownableThread; import java.nio.charset.StandardCharsets; -import java.util.Collection; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -52,6 +55,7 @@ @Slf4j public class KopEventManager { + private static final String END_POINT_SEPARATOR = ","; private static final String REGEX = "^(.*)://\\[?([0-9a-zA-Z\\-%._:]*)\\]?:(-?[0-9]+)"; private static final Pattern PATTERN = Pattern.compile(REGEX); @@ -186,7 +190,7 @@ private void getBrokers(List pulsarBrokers, BiConsumer registerEventLatency, String name, long startProcessTime) { - final Set kopBrokers = Sets.newConcurrentHashSet(); + ConcurrentMap> kopBrokersMap = Maps.newConcurrentMap(); final AtomicInteger pendingBrokers = new AtomicInteger(pulsarBrokers.size()); pulsarBrokers.forEach(broker -> { @@ -207,9 +211,14 @@ private void getBrokers(List pulsarBrokers, JsonElement element = protocols.get("kafka"); if (element != null) { - String kopBrokerStr = element.getAsString(); - Node kopNode = getNode(kopBrokerStr); - kopBrokers.add(kopNode); + String kopBrokerStrs = element.getAsString(); + Map> kopNodesMap = getNodes(kopBrokerStrs); + kopNodesMap.forEach((listenerName, nodesSet) -> { + Set currentNodeSet = kopBrokersMap.computeIfAbsent(listenerName, + s -> Sets.newConcurrentHashSet()); + currentNodeSet.addAll(nodesSet); + kopBrokersMap.put(listenerName, currentNodeSet); + }); } else { if (log.isDebugEnabled()) { log.debug("Get broker {} path currently not a kop broker, skip it.", broker); @@ -222,13 +231,13 @@ private void getBrokers(List pulsarBrokers, } if (pendingBrokers.decrementAndGet() == 0) { - Collection oldKopBrokers = adminManager.getBrokers(); - adminManager.setBrokers(kopBrokers); + Map> oldKopBrokers = adminManager.getAllBrokers(); + adminManager.setBrokers(kopBrokersMap); if (registerEventLatency != null) { registerEventLatency.accept(name, startProcessTime); } log.info("Refresh kop brokers new cache {}, old brokers cache {}", - adminManager.getBrokers(), oldKopBrokers); + adminManager.getAllBrokers(), oldKopBrokers); } } ); @@ -242,18 +251,26 @@ private JsonObject parseJsonObject(String info) { } @VisibleForTesting - public static Node getNode(String kopBrokerStr) { - final String errorMessage = "kopBrokerStr " + kopBrokerStr + " is invalid"; - final Matcher matcher = PATTERN.matcher(kopBrokerStr); - checkState(matcher.find(), errorMessage); - checkState(matcher.groupCount() == 3, errorMessage); - String host = matcher.group(2); - String port = matcher.group(3); - - return new Node( - Murmur3_32Hash.getInstance().makeHash((host + port).getBytes(StandardCharsets.UTF_8)), - host, - Integer.parseInt(port)); + public static Map> getNodes(String kopBrokerStrs) { + HashMap> nodesMap = Maps.newHashMap(); + String[] kopBrokerArr = kopBrokerStrs.split(END_POINT_SEPARATOR); + for (String kopBrokerStr : kopBrokerArr) { + final String errorMessage = "kopBrokerStr " + kopBrokerStr + " is invalid"; + final Matcher matcher = PATTERN.matcher(kopBrokerStr); + checkState(matcher.find(), errorMessage); + checkState(matcher.groupCount() == 3, errorMessage); + String listenerName = matcher.group(1); + String host = matcher.group(2); + String port = matcher.group(3); + Set nodeSet = nodesMap.computeIfAbsent(listenerName, s -> new HashSet<>()); + nodeSet.add(new Node( + Murmur3_32Hash.getInstance().makeHash((host + port).getBytes(StandardCharsets.UTF_8)), + host, + Integer.parseInt(port))); + nodesMap.put(listenerName, nodeSet); + } + + return nodesMap; } @Getter diff --git a/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/KopEventManagerTest.java b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/KopEventManagerTest.java index 805fce2396..fa2fb33961 100644 --- a/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/KopEventManagerTest.java +++ b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/KopEventManagerTest.java @@ -13,6 +13,8 @@ */ package io.streamnative.pulsar.handlers.kop; +import java.util.Map; +import java.util.Set; import org.apache.kafka.common.Node; import org.testng.Assert; import org.testng.annotations.Test; @@ -20,14 +22,48 @@ public class KopEventManagerTest { @Test - public void testGetNode() { + public void testGetOneNode() { final String host = "localhost"; final int port = 9120; final String securityProtocol = "SASL_PLAINTEXT"; final String brokerStr = securityProtocol + "://" + host + ":" + port; - Node node = KopEventManager.getNode(brokerStr); - Assert.assertEquals(node.host(), host); - Assert.assertEquals(node.port(), port); + Map> nodes = KopEventManager.getNodes(brokerStr); + Assert.assertEquals(1, nodes.size()); + Assert.assertTrue(nodes.containsKey(securityProtocol)); + Set nodeSet = nodes.get(securityProtocol); + Assert.assertEquals(1, nodeSet.size()); + nodeSet.forEach(node -> { + Assert.assertEquals(node.host(), host); + Assert.assertEquals(node.port(), port); + }); + } + + @Test + public void testGetMultipleNodes() { + final String listenerName1 = "kafka_internal"; + final String host1 = "localhost"; + final int port1 = 9120; + final String listenerName2 = "kafka_external"; + final String host2 = "localhost"; + final int port2 = 9121; + final String brokersStr = listenerName1 + "://" + host1 + ":" + port1 + "," + + listenerName2 + "://" + host2 + ":" + port2; + Map> nodes = KopEventManager.getNodes(brokersStr); + Assert.assertEquals(2, nodes.size()); + Assert.assertTrue(nodes.containsKey(listenerName1)); + Set nodesSet1 = nodes.get(listenerName1); + Assert.assertEquals(1, nodesSet1.size()); + nodesSet1.forEach(node -> { + Assert.assertEquals(node.host(), host1); + Assert.assertEquals(node.port(), port1); + }); + Assert.assertTrue(nodes.containsKey(listenerName2)); + Set nodesSet2 = nodes.get(listenerName2); + Assert.assertEquals(1, nodesSet2.size()); + nodesSet2.forEach(node -> { + Assert.assertEquals(node.host(), host2); + Assert.assertEquals(node.port(), port2); + }); } } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/EntryPublishTimeKafkaFormatTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/EntryPublishTimeKafkaFormatTest.java index 6e3dcd63cf..056ca1f6de 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/EntryPublishTimeKafkaFormatTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/EntryPublishTimeKafkaFormatTest.java @@ -39,6 +39,7 @@ import org.apache.kafka.common.requests.ListOffsetRequest; import org.apache.kafka.common.requests.ListOffsetResponse; import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.security.auth.SecurityProtocol; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.Test; @@ -131,7 +132,11 @@ KafkaCommandDecoder.KafkaHeaderAndRequest buildRequest(AbstractRequest.Builder b short apiVersion = header.apiVersion(); Struct struct = apiKey.parseRequest(apiVersion, serializedRequest); AbstractRequest body = AbstractRequest.parseRequest(apiKey, apiVersion, struct); - return new KafkaCommandDecoder.KafkaHeaderAndRequest(header, body, byteBuf, serviceAddress); + return new KafkaCommandDecoder.KafkaHeaderAndRequest(header, + body, + byteBuf, + serviceAddress, + SecurityProtocol.PLAINTEXT.name); } } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaApisTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaApisTest.java index d1fd326b0a..f8ca0f29ce 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaApisTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaApisTest.java @@ -76,6 +76,7 @@ import org.apache.kafka.common.requests.OffsetCommitRequest.PartitionData; import org.apache.kafka.common.requests.OffsetCommitResponse; import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.pulsar.broker.protocol.ProtocolHandler; @@ -180,7 +181,7 @@ KafkaHeaderAndRequest buildRequest(AbstractRequest.Builder builder) { short apiVersion = header.apiVersion(); Struct struct = apiKey.parseRequest(apiVersion, serializedRequest); AbstractRequest body = AbstractRequest.parseRequest(apiKey, apiVersion, struct); - return new KafkaHeaderAndRequest(header, body, byteBuf, serviceAddress); + return new KafkaHeaderAndRequest(header, body, byteBuf, serviceAddress, SecurityProtocol.PLAINTEXT.name); } void checkInvalidPartition(CompletableFuture future, diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaListenerNameTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaListenerNameTest.java index 834f22a41e..b3651fa2a0 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaListenerNameTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaListenerNameTest.java @@ -21,8 +21,10 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.pulsar.broker.ServiceConfigurationUtils; +import org.testng.Assert; import org.testng.annotations.Test; @@ -87,6 +89,32 @@ public void testMultipleListenerName() throws Exception { super.internalCleanup(); } + @Test(timeOut = 20000) + public void testConnectListenerNotExist() throws Exception { + final int externalPort = PortManager.nextFreePort(); + super.resetConfig(); + conf.setAdvertisedAddress(null); + conf.setKafkaListeners("kafka://0.0.0.0:" + kafkaBrokerPort + ",kafka_external://0.0.0.0:" + externalPort); + conf.setKafkaProtocolMap("kafka:PLAINTEXT,kafka_external:PLAINTEXT"); + conf.setAdvertisedListeners("pulsar:pulsar://localhost:" + brokerPort + + ",kafka_external:pulsar://0.0.0.0:" + externalPort); + super.internalSetup(); + final Properties props = newKafkaProducerProperties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + kafkaBrokerPort); + final KafkaProducer producer = new KafkaProducer<>(props); + RecordMetadata recordMetadata = null; + try { + recordMetadata = producer.send(new ProducerRecord<>("my-topic", "hello")).get(); + Assert.assertTrue(recordMetadata != null); + Assert.assertEquals(1, recordMetadata.offset()); + } catch (InterruptedException | ExecutionException e) { + log.info("Send failed: {}", e.getMessage()); + } + producer.close(); + super.internalCleanup(); + } + + private void kafkaProducerSend(String server) throws ExecutionException, InterruptedException, TimeoutException { final Properties props = new Properties(); props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java index c6e147e15d..4c6860bf19 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java @@ -84,6 +84,7 @@ import org.apache.kafka.common.requests.OffsetCommitRequest; import org.apache.kafka.common.requests.RequestHeader; import org.apache.kafka.common.requests.ResponseHeader; +import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Time; @@ -210,16 +211,17 @@ public void testResponseToByteBuf() throws Exception { ApiVersionsRequest apiVersionsRequest = new ApiVersionsRequest.Builder().build(); RequestHeader requestHeader = new RequestHeader( - ApiKeys.API_VERSIONS, - ApiKeys.API_VERSIONS.latestVersion(), - clientId, - correlationId); + ApiKeys.API_VERSIONS, + ApiKeys.API_VERSIONS.latestVersion(), + clientId, + correlationId); KafkaHeaderAndRequest kopRequest = new KafkaHeaderAndRequest( - requestHeader, - apiVersionsRequest, - Unpooled.buffer(20), - null); + requestHeader, + apiVersionsRequest, + Unpooled.buffer(20), + null, + SecurityProtocol.PLAINTEXT.name); ApiVersionsResponse apiVersionsResponse = ApiVersionsResponse.defaultApiVersionsResponse(); KafkaHeaderAndResponse kopResponse = KafkaHeaderAndResponse.responseForRequest( @@ -655,7 +657,10 @@ public void testOffsetCommitRequestRetentionMs() throws Exception { RequestHeader header = new RequestHeader(ApiKeys.OFFSET_COMMIT, offsetCommitRequest.version(), "", 0); KafkaHeaderAndRequest headerAndRequest = new KafkaHeaderAndRequest(header, - offsetCommitRequest, PulsarByteBufAllocator.DEFAULT.heapBuffer(), null); + offsetCommitRequest, + PulsarByteBufAllocator.DEFAULT.heapBuffer(), + null, + SecurityProtocol.PLAINTEXT.name); // handle request CompletableFuture future = new CompletableFuture<>(); @@ -699,7 +704,11 @@ public void testListOffsetsForNotExistedTopic() throws Exception { .setTargetTimes(Collections.singletonMap(topicPartition, ListOffsetRequest.EARLIEST_TIMESTAMP)) .build(ApiKeys.LIST_OFFSETS.latestVersion()); handler.handleListOffsetRequest( - new KafkaHeaderAndRequest(header, request, PulsarByteBufAllocator.DEFAULT.heapBuffer(), null), + new KafkaHeaderAndRequest(header, + request, + PulsarByteBufAllocator.DEFAULT.heapBuffer(), + null, + SecurityProtocol.PLAINTEXT.name), responseFuture); final ListOffsetResponse response = (ListOffsetResponse) responseFuture.get(); assertTrue(response.responseData().containsKey(topicPartition)); @@ -715,7 +724,11 @@ public void testMetadataForNonPartitionedTopic(short version) throws Exception { final MetadataRequest request = new MetadataRequest(Collections.singletonList(topic), false, version); final CompletableFuture responseFuture = new CompletableFuture<>(); handler.handleTopicMetadataRequest( - new KafkaHeaderAndRequest(header, request, PulsarByteBufAllocator.DEFAULT.heapBuffer(), null), + new KafkaHeaderAndRequest(header, + request, + PulsarByteBufAllocator.DEFAULT.heapBuffer(), + null, + SecurityProtocol.PLAINTEXT.name), responseFuture); final MetadataResponse response = (MetadataResponse) responseFuture.get(); assertEquals(response.topicMetadata().size(), 1); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerWithAuthorizationTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerWithAuthorizationTest.java index 94c81ce0ca..5f3ca4d4fc 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerWithAuthorizationTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerWithAuthorizationTest.java @@ -67,6 +67,7 @@ import org.apache.kafka.common.requests.OffsetFetchResponse; import org.apache.kafka.common.requests.ProduceRequest; import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationProviderToken; import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils; @@ -215,7 +216,11 @@ public void testMetadataForPartitionedTopicFailed(short version) throws Exceptio final CompletableFuture responseFuture = new CompletableFuture<>(); handler.handleTopicMetadataRequest( new KafkaCommandDecoder.KafkaHeaderAndRequest( - header, request, PulsarByteBufAllocator.DEFAULT.heapBuffer(), null), + header, + request, + PulsarByteBufAllocator.DEFAULT.heapBuffer(), + null, + SecurityProtocol.PLAINTEXT.name), responseFuture); final MetadataResponse response = (MetadataResponse) responseFuture.get(); assertEquals(response.topicMetadata().size(), 1); @@ -236,7 +241,11 @@ public void testMetadataForPartitionedTopicSuccess(short version) throws Excepti final CompletableFuture responseFuture = new CompletableFuture<>(); spyHandler.handleTopicMetadataRequest( new KafkaCommandDecoder.KafkaHeaderAndRequest( - header, request, PulsarByteBufAllocator.DEFAULT.heapBuffer(), null), + header, + request, + PulsarByteBufAllocator.DEFAULT.heapBuffer(), + null, + SecurityProtocol.PLAINTEXT.name), responseFuture); final MetadataResponse response = (MetadataResponse) responseFuture.get(); assertEquals(response.topicMetadata().size(), 1); @@ -261,7 +270,11 @@ public void testMetadataListTopic() throws Exception { final CompletableFuture responseFuture = new CompletableFuture<>(); spyHandler.handleTopicMetadataRequest( new KafkaCommandDecoder.KafkaHeaderAndRequest( - header, request, PulsarByteBufAllocator.DEFAULT.heapBuffer(), null), + header, + request, + PulsarByteBufAllocator.DEFAULT.heapBuffer(), + null, + SecurityProtocol.PLAINTEXT.name), responseFuture); final MetadataResponse response = (MetadataResponse) responseFuture.get(); String localName = TopicName.get(topic).getLocalName(); @@ -292,7 +305,8 @@ public void testHandleProduceRequest() throws ExecutionException, InterruptedExc header, request, PulsarByteBufAllocator.DEFAULT.heapBuffer(), - null), responseFuture); + null, + SecurityProtocol.PLAINTEXT.name), responseFuture); AbstractResponse response = responseFuture.get(); assertEquals((int) response.errorCounts().get(Errors.TOPIC_AUTHORIZATION_FAILED), 1); } @@ -463,7 +477,11 @@ KafkaCommandDecoder.KafkaHeaderAndRequest buildRequest(AbstractRequest.Builder b short apiVersion = header.apiVersion(); Struct struct = apiKey.parseRequest(apiVersion, serializedRequest); AbstractRequest body = AbstractRequest.parseRequest(apiKey, apiVersion, struct); - return new KafkaCommandDecoder.KafkaHeaderAndRequest(header, body, byteBuf, serviceAddress); + return new KafkaCommandDecoder.KafkaHeaderAndRequest(header, + body, + byteBuf, + serviceAddress, + SecurityProtocol.PLAINTEXT.name); } private ProduceRequest createProduceRequest(String topic) { diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/SaslPlainTestBase.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/SaslPlainTestBase.java index ebf5ad6935..b1a3631c0c 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/SaslPlainTestBase.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/SaslPlainTestBase.java @@ -113,8 +113,7 @@ protected void setup() throws Exception { .build()); admin.namespaces().createNamespace(TENANT + "/" + NAMESPACE); admin.topics().createPartitionedTopic(TOPIC, 1); - admin - .namespaces().grantPermissionOnNamespace(TENANT + "/" + NAMESPACE, SIMPLE_USER, + admin.namespaces().grantPermissionOnNamespace(TENANT + "/" + NAMESPACE, SIMPLE_USER, Sets.newHashSet(AuthAction.consume, AuthAction.produce)); } From 3eddb403aebd4855bbc4e0ce3139ce4c0570642c Mon Sep 17 00:00:00 2001 From: wenbing1 Date: Mon, 27 Sep 2021 08:50:06 +0800 Subject: [PATCH 2/8] fix NPE --- .../pulsar/handlers/kop/AdminManager.java | 6 +++++- .../pulsar/handlers/kop/KafkaListenerNameTest.java | 11 +++-------- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java index d698c44b35..5ffa409e5c 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java @@ -230,7 +230,11 @@ public Map deleteTopics(Set topicsToDelete) { } public Collection getBrokers(String listenerName) { - return brokersCache.get(listenerName); + if (brokersCache.containsKey(listenerName)) { + return brokersCache.get(listenerName); + } else { + return Collections.emptyList(); + } } public Map> getAllBrokers() { diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaListenerNameTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaListenerNameTest.java index b3651fa2a0..7052a3bbc6 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaListenerNameTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaListenerNameTest.java @@ -102,14 +102,9 @@ public void testConnectListenerNotExist() throws Exception { final Properties props = newKafkaProducerProperties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + kafkaBrokerPort); final KafkaProducer producer = new KafkaProducer<>(props); - RecordMetadata recordMetadata = null; - try { - recordMetadata = producer.send(new ProducerRecord<>("my-topic", "hello")).get(); - Assert.assertTrue(recordMetadata != null); - Assert.assertEquals(1, recordMetadata.offset()); - } catch (InterruptedException | ExecutionException e) { - log.info("Send failed: {}", e.getMessage()); - } + RecordMetadata recordMetadata = producer.send(new ProducerRecord<>("my-topic", "hello")).get(); + Assert.assertNotNull(recordMetadata); + Assert.assertEquals(1, recordMetadata.offset()); producer.close(); super.internalCleanup(); } From f99249eeb7f1e46fdeb7c1b0b76467a1dbfee4e2 Mon Sep 17 00:00:00 2001 From: wenbing1 Date: Mon, 27 Sep 2021 09:13:13 +0800 Subject: [PATCH 3/8] fix checkstyle error --- .../io/streamnative/pulsar/handlers/kop/AdminManager.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java index 5ffa409e5c..3782d3dff9 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java @@ -230,11 +230,12 @@ public Map deleteTopics(Set topicsToDelete) { } public Collection getBrokers(String listenerName) { + if (brokersCache.containsKey(listenerName)) { return brokersCache.get(listenerName); - } else { - return Collections.emptyList(); - } + } + + return Collections.emptyList(); } public Map> getAllBrokers() { From d6d7a4019280fa43a63249fc7f371c526f308122 Mon Sep 17 00:00:00 2001 From: wenbing1 Date: Mon, 27 Sep 2021 09:59:10 +0800 Subject: [PATCH 4/8] fix test failed --- .../pulsar/handlers/kop/KafkaListenerNameTest.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaListenerNameTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaListenerNameTest.java index 7052a3bbc6..e965aa3180 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaListenerNameTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaListenerNameTest.java @@ -52,7 +52,7 @@ public void testListenerName() throws Exception { conf.setInternalListenerName("pulsar"); final String advertisedListeners = "pulsar:pulsar://" + localAddress + ":" + brokerPort - + ",kafka:pulsar://" + "localhost:" + kafkaBrokerPort; + + ",kafka:pulsar://localhost:" + kafkaBrokerPort; conf.setAdvertisedListeners(advertisedListeners); conf.setKafkaListenerName("kafka"); log.info("Set advertisedListeners to {}", advertisedListeners); @@ -77,8 +77,8 @@ public void testMultipleListenerName() throws Exception { conf.setKafkaListeners(kafkaListeners); final String advertisedListeners = "pulsar:pulsar://" + localAddress + ":" + brokerPort - + ",kafka:pulsar://" + "localhost:" + kafkaBrokerPort - + ",kafka_external:pulsar://" + "localhost:" + externalPort; + + ",kafka:pulsar://localhost:" + kafkaBrokerPort + + ",kafka_external:pulsar://localhost:" + externalPort; conf.setAdvertisedListeners(advertisedListeners); log.info("Set advertisedListeners to {}", advertisedListeners); super.internalSetup(); @@ -97,7 +97,8 @@ public void testConnectListenerNotExist() throws Exception { conf.setKafkaListeners("kafka://0.0.0.0:" + kafkaBrokerPort + ",kafka_external://0.0.0.0:" + externalPort); conf.setKafkaProtocolMap("kafka:PLAINTEXT,kafka_external:PLAINTEXT"); conf.setAdvertisedListeners("pulsar:pulsar://localhost:" + brokerPort - + ",kafka_external:pulsar://0.0.0.0:" + externalPort); + + ",kafka:pulsar://localhost:" + kafkaBrokerPort + + ",kafka_external:pulsar://localhost:" + externalPort); super.internalSetup(); final Properties props = newKafkaProducerProperties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + kafkaBrokerPort); From 6a41d83f1c0e28425bf18fd79d1ef24cb43cdaf1 Mon Sep 17 00:00:00 2001 From: wenbing1 Date: Mon, 27 Sep 2021 10:28:33 +0800 Subject: [PATCH 5/8] fix test failed --- .../streamnative/pulsar/handlers/kop/KafkaListenerNameTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaListenerNameTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaListenerNameTest.java index e965aa3180..fefafb982f 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaListenerNameTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaListenerNameTest.java @@ -105,7 +105,7 @@ public void testConnectListenerNotExist() throws Exception { final KafkaProducer producer = new KafkaProducer<>(props); RecordMetadata recordMetadata = producer.send(new ProducerRecord<>("my-topic", "hello")).get(); Assert.assertNotNull(recordMetadata); - Assert.assertEquals(1, recordMetadata.offset()); + Assert.assertEquals(0, recordMetadata.offset()); producer.close(); super.internalCleanup(); } From 7cff20e4b3a5ad72c8232bc8658fa90769420075 Mon Sep 17 00:00:00 2001 From: wenbingshen Date: Tue, 28 Sep 2021 18:20:26 +0800 Subject: [PATCH 6/8] addressed reviewer's comments --- .../pulsar/handlers/kop/EndPoint.java | 15 ++++++---- .../handlers/kop/KafkaCommandDecoder.java | 27 +++--------------- .../handlers/kop/KafkaRequestHandler.java | 4 +-- .../kop/KafkaServiceConfiguration.java | 17 ++++------- .../pulsar/handlers/kop/KopEventManager.java | 13 ++------- .../kop/EntryPublishTimeKafkaFormatTest.java | 7 +---- .../pulsar/handlers/kop/KafkaApisTest.java | 3 +- .../handlers/kop/KafkaRequestHandlerTest.java | 20 +++---------- ...kaRequestHandlerWithAuthorizationTest.java | 28 ++++--------------- .../handlers/kop/SaslPlainTestBase.java | 3 +- 10 files changed, 36 insertions(+), 101 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/EndPoint.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/EndPoint.java index dfd978ce36..6dcd52e690 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/EndPoint.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/EndPoint.java @@ -30,6 +30,7 @@ */ public class EndPoint { + @Getter private static final String END_POINT_SEPARATOR = ","; private static final String PROTOCOL_MAP_SEPARATOR = ","; private static final String PROTOCOL_SEPARATOR = ":"; @@ -52,9 +53,7 @@ public class EndPoint { public EndPoint(final String listener, final Map protocolMap) { this.originalListener = listener; final String errorMessage = "listener '" + listener + "' is invalid"; - final Matcher matcher = PATTERN.matcher(listener); - checkState(matcher.find(), errorMessage); - checkState(matcher.groupCount() == 3, errorMessage); + final Matcher matcher = matcherListener(listener, errorMessage); this.listenerName = matcher.group(1); if (protocolMap == null || protocolMap.isEmpty()) { @@ -82,8 +81,6 @@ public EndPoint(final String listener, final Map proto checkState(port >= 0 && port <= 65535, errorMessage + ": port " + port + " is invalid"); } - - public InetSocketAddress getInetAddress() { return new InetSocketAddress(hostname, port); } @@ -148,4 +145,12 @@ public static Map parseProtocolMap(final String kafkaP } return protocolMap; } + + public static Matcher matcherListener(String listener, String errorMessage) { + final Matcher matcher = PATTERN.matcher(listener); + checkState(matcher.find(), errorMessage); + checkState(matcher.groupCount() == 3, errorMessage); + return matcher; + } + } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java index 4d9f742bd6..bf3c394d8a 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java @@ -64,16 +64,12 @@ public abstract class KafkaCommandDecoder extends ChannelInboundHandlerAdapter { protected final RequestStats requestStats; @Getter protected final KafkaServiceConfiguration kafkaConfig; - @Getter - protected final EndPoint advertisedEndPoint; public KafkaCommandDecoder(StatsLogger statsLogger, - KafkaServiceConfiguration kafkaConfig, - EndPoint advertisedEndPoint) { + KafkaServiceConfiguration kafkaConfig) { this.requestStats = new RequestStats(statsLogger); this.kafkaConfig = kafkaConfig; this.requestQueue = new LinkedBlockingQueue<>(kafkaConfig.getMaxQueuedRequests()); - this.advertisedEndPoint = advertisedEndPoint; } @Override @@ -143,21 +139,13 @@ protected KafkaHeaderAndRequest byteBufToRequest(ByteBuf msg, RequestHeader header = RequestHeader.parse(nio); if (isUnsupportedApiVersionsRequest(header)) { ApiVersionsRequest apiVersionsRequest = new ApiVersionsRequest((short) 0, header.apiVersion()); - return new KafkaHeaderAndRequest(header, - apiVersionsRequest, - msg, - remoteAddress, - advertisedEndPoint.getListenerName()); + return new KafkaHeaderAndRequest(header, apiVersionsRequest, msg, remoteAddress); } else { ApiKeys apiKey = header.apiKey(); short apiVersion = header.apiVersion(); Struct struct = apiKey.parseRequest(apiVersion, nio); AbstractRequest body = AbstractRequest.parseRequest(apiKey, apiVersion, struct); - return new KafkaHeaderAndRequest(header, - body, - msg, - remoteAddress, - advertisedEndPoint.getListenerName()); + return new KafkaHeaderAndRequest(header, body, msg, remoteAddress); } } @@ -569,18 +557,15 @@ static class KafkaHeaderAndRequest implements Closeable { private final AbstractRequest request; private final ByteBuf buffer; private final SocketAddress remoteAddress; - private final String listenerName; KafkaHeaderAndRequest(RequestHeader header, AbstractRequest request, ByteBuf buffer, - SocketAddress remoteAddress, - String listenerName) { + SocketAddress remoteAddress) { this.header = header; this.request = request; this.buffer = buffer.retain(); this.remoteAddress = remoteAddress; - this.listenerName = listenerName; } public ByteBuf getBuffer() { @@ -599,10 +584,6 @@ public SocketAddress getRemoteAddress() { return this.remoteAddress; } - public String getListenerName() { - return this.listenerName; - } - public String getClientHost() { if (remoteAddress == null) { return DEFAULT_CLIENT_HOST; diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java index c0b39aea83..3f20dbad0c 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java @@ -282,7 +282,7 @@ public KafkaRequestHandler(PulsarService pulsarService, Boolean tlsEnabled, EndPoint advertisedEndPoint, StatsLogger statsLogger) throws Exception { - super(statsLogger, kafkaConfig, advertisedEndPoint); + super(statsLogger, kafkaConfig); this.pulsarService = pulsarService; this.tenantContextManager = tenantContextManager; this.clusterName = kafkaConfig.getClusterName(); @@ -524,7 +524,7 @@ protected void handleTopicMetadataRequest(KafkaHeaderAndRequest metadataHar, List allTopicMetadata = Collections.synchronizedList(Lists.newArrayList()); List allNodes = Collections.synchronizedList(Lists.newArrayList()); // Get all kop brokers in local cache - allNodes.addAll(adminManager.getBrokers(metadataHar.getListenerName())); + allNodes.addAll(adminManager.getBrokers(advertisedEndPoint.getListenerName())); List topics = metadataRequest.topics(); // topics in format : persistent://%s/%s/abc-partition-x, will be grouped by as: diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java index 174ba15bd2..f145fc22cf 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java @@ -13,8 +13,6 @@ */ package io.streamnative.pulsar.handlers.kop; -import static com.google.common.base.Preconditions.checkState; - import com.google.common.collect.Sets; import io.streamnative.pulsar.handlers.kop.coordinator.group.OffsetConfig; import java.io.FileInputStream; @@ -28,7 +26,6 @@ import java.util.Properties; import java.util.Set; import java.util.regex.Matcher; -import java.util.regex.Pattern; import lombok.Getter; import lombok.NonNull; import lombok.Setter; @@ -64,9 +61,6 @@ public class KafkaServiceConfiguration extends ServiceConfiguration { @Category private static final String CATEGORY_KOP_TRANSACTION = "Kafka on Pulsar transaction"; - private static final String END_POINT_SEPARATOR = ","; - private static final String REGEX = "^(.*)://\\[?([0-9a-zA-Z\\-%._:]*)\\]?:(-?[0-9]+)"; - private static final Pattern PATTERN = Pattern.compile(REGEX); // // --- Kafka on Pulsar Broker configuration --- // @@ -397,11 +391,9 @@ public class KafkaServiceConfiguration extends ServiceConfiguration { private String checkAdvertisedListeners(String advertisedListeners) { StringBuilder listenersReBuilder = new StringBuilder(); - for (String listener : advertisedListeners.split(END_POINT_SEPARATOR)) { + for (String listener : advertisedListeners.split(EndPoint.getEND_POINT_SEPARATOR())) { final String errorMessage = "listener '" + listener + "' is invalid"; - final Matcher matcher = PATTERN.matcher(listener); - checkState(matcher.find(), errorMessage); - checkState(matcher.groupCount() == 3, errorMessage); + final Matcher matcher = EndPoint.matcherListener(listener, errorMessage); String hostname = matcher.group(2); if (hostname.isEmpty()) { try { @@ -417,9 +409,10 @@ private String checkAdvertisedListeners(String advertisedListeners) { } else { listenersReBuilder.append(listener); } - listenersReBuilder.append(END_POINT_SEPARATOR); + listenersReBuilder.append(EndPoint.getEND_POINT_SEPARATOR()); } - return listenersReBuilder.deleteCharAt(listenersReBuilder.lastIndexOf(END_POINT_SEPARATOR)).toString(); + return listenersReBuilder.deleteCharAt( + listenersReBuilder.lastIndexOf(EndPoint.getEND_POINT_SEPARATOR())).toString(); } public @NonNull String getKafkaAdvertisedListeners() { diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KopEventManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KopEventManager.java index 440532d171..cb8d0bc927 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KopEventManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KopEventManager.java @@ -13,8 +13,6 @@ */ package io.streamnative.pulsar.handlers.kop; -import static com.google.common.base.Preconditions.checkState; - import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -41,7 +39,6 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiConsumer; import java.util.regex.Matcher; -import java.util.regex.Pattern; import java.util.stream.Collectors; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -55,10 +52,6 @@ @Slf4j public class KopEventManager { - private static final String END_POINT_SEPARATOR = ","; - private static final String REGEX = "^(.*)://\\[?([0-9a-zA-Z\\-%._:]*)\\]?:(-?[0-9]+)"; - private static final Pattern PATTERN = Pattern.compile(REGEX); - private static final String kopEventThreadName = "kop-event-thread"; private final ReentrantLock putLock = new ReentrantLock(); private static final LinkedBlockingQueue queue = @@ -253,12 +246,10 @@ private JsonObject parseJsonObject(String info) { @VisibleForTesting public static Map> getNodes(String kopBrokerStrs) { HashMap> nodesMap = Maps.newHashMap(); - String[] kopBrokerArr = kopBrokerStrs.split(END_POINT_SEPARATOR); + String[] kopBrokerArr = kopBrokerStrs.split(EndPoint.getEND_POINT_SEPARATOR()); for (String kopBrokerStr : kopBrokerArr) { final String errorMessage = "kopBrokerStr " + kopBrokerStr + " is invalid"; - final Matcher matcher = PATTERN.matcher(kopBrokerStr); - checkState(matcher.find(), errorMessage); - checkState(matcher.groupCount() == 3, errorMessage); + final Matcher matcher = EndPoint.matcherListener(kopBrokerStr, errorMessage); String listenerName = matcher.group(1); String host = matcher.group(2); String port = matcher.group(3); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/EntryPublishTimeKafkaFormatTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/EntryPublishTimeKafkaFormatTest.java index 056ca1f6de..6e3dcd63cf 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/EntryPublishTimeKafkaFormatTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/EntryPublishTimeKafkaFormatTest.java @@ -39,7 +39,6 @@ import org.apache.kafka.common.requests.ListOffsetRequest; import org.apache.kafka.common.requests.ListOffsetResponse; import org.apache.kafka.common.requests.RequestHeader; -import org.apache.kafka.common.security.auth.SecurityProtocol; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.Test; @@ -132,11 +131,7 @@ KafkaCommandDecoder.KafkaHeaderAndRequest buildRequest(AbstractRequest.Builder b short apiVersion = header.apiVersion(); Struct struct = apiKey.parseRequest(apiVersion, serializedRequest); AbstractRequest body = AbstractRequest.parseRequest(apiKey, apiVersion, struct); - return new KafkaCommandDecoder.KafkaHeaderAndRequest(header, - body, - byteBuf, - serviceAddress, - SecurityProtocol.PLAINTEXT.name); + return new KafkaCommandDecoder.KafkaHeaderAndRequest(header, body, byteBuf, serviceAddress); } } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaApisTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaApisTest.java index f8ca0f29ce..d1fd326b0a 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaApisTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaApisTest.java @@ -76,7 +76,6 @@ import org.apache.kafka.common.requests.OffsetCommitRequest.PartitionData; import org.apache.kafka.common.requests.OffsetCommitResponse; import org.apache.kafka.common.requests.RequestHeader; -import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.pulsar.broker.protocol.ProtocolHandler; @@ -181,7 +180,7 @@ KafkaHeaderAndRequest buildRequest(AbstractRequest.Builder builder) { short apiVersion = header.apiVersion(); Struct struct = apiKey.parseRequest(apiVersion, serializedRequest); AbstractRequest body = AbstractRequest.parseRequest(apiKey, apiVersion, struct); - return new KafkaHeaderAndRequest(header, body, byteBuf, serviceAddress, SecurityProtocol.PLAINTEXT.name); + return new KafkaHeaderAndRequest(header, body, byteBuf, serviceAddress); } void checkInvalidPartition(CompletableFuture future, diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java index 4c6860bf19..ff008c3b73 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java @@ -84,7 +84,6 @@ import org.apache.kafka.common.requests.OffsetCommitRequest; import org.apache.kafka.common.requests.RequestHeader; import org.apache.kafka.common.requests.ResponseHeader; -import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Time; @@ -220,8 +219,7 @@ public void testResponseToByteBuf() throws Exception { requestHeader, apiVersionsRequest, Unpooled.buffer(20), - null, - SecurityProtocol.PLAINTEXT.name); + null); ApiVersionsResponse apiVersionsResponse = ApiVersionsResponse.defaultApiVersionsResponse(); KafkaHeaderAndResponse kopResponse = KafkaHeaderAndResponse.responseForRequest( @@ -657,10 +655,7 @@ public void testOffsetCommitRequestRetentionMs() throws Exception { RequestHeader header = new RequestHeader(ApiKeys.OFFSET_COMMIT, offsetCommitRequest.version(), "", 0); KafkaHeaderAndRequest headerAndRequest = new KafkaHeaderAndRequest(header, - offsetCommitRequest, - PulsarByteBufAllocator.DEFAULT.heapBuffer(), - null, - SecurityProtocol.PLAINTEXT.name); + offsetCommitRequest, PulsarByteBufAllocator.DEFAULT.heapBuffer(), null); // handle request CompletableFuture future = new CompletableFuture<>(); @@ -705,11 +700,7 @@ public void testListOffsetsForNotExistedTopic() throws Exception { .build(ApiKeys.LIST_OFFSETS.latestVersion()); handler.handleListOffsetRequest( new KafkaHeaderAndRequest(header, - request, - PulsarByteBufAllocator.DEFAULT.heapBuffer(), - null, - SecurityProtocol.PLAINTEXT.name), - responseFuture); + request, PulsarByteBufAllocator.DEFAULT.heapBuffer(), null), responseFuture); final ListOffsetResponse response = (ListOffsetResponse) responseFuture.get(); assertTrue(response.responseData().containsKey(topicPartition)); assertEquals(response.responseData().get(topicPartition).error, Errors.UNKNOWN_TOPIC_OR_PARTITION); @@ -725,10 +716,7 @@ public void testMetadataForNonPartitionedTopic(short version) throws Exception { final CompletableFuture responseFuture = new CompletableFuture<>(); handler.handleTopicMetadataRequest( new KafkaHeaderAndRequest(header, - request, - PulsarByteBufAllocator.DEFAULT.heapBuffer(), - null, - SecurityProtocol.PLAINTEXT.name), + request, PulsarByteBufAllocator.DEFAULT.heapBuffer(), null), responseFuture); final MetadataResponse response = (MetadataResponse) responseFuture.get(); assertEquals(response.topicMetadata().size(), 1); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerWithAuthorizationTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerWithAuthorizationTest.java index 5f3ca4d4fc..94c81ce0ca 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerWithAuthorizationTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerWithAuthorizationTest.java @@ -67,7 +67,6 @@ import org.apache.kafka.common.requests.OffsetFetchResponse; import org.apache.kafka.common.requests.ProduceRequest; import org.apache.kafka.common.requests.RequestHeader; -import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationProviderToken; import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils; @@ -216,11 +215,7 @@ public void testMetadataForPartitionedTopicFailed(short version) throws Exceptio final CompletableFuture responseFuture = new CompletableFuture<>(); handler.handleTopicMetadataRequest( new KafkaCommandDecoder.KafkaHeaderAndRequest( - header, - request, - PulsarByteBufAllocator.DEFAULT.heapBuffer(), - null, - SecurityProtocol.PLAINTEXT.name), + header, request, PulsarByteBufAllocator.DEFAULT.heapBuffer(), null), responseFuture); final MetadataResponse response = (MetadataResponse) responseFuture.get(); assertEquals(response.topicMetadata().size(), 1); @@ -241,11 +236,7 @@ public void testMetadataForPartitionedTopicSuccess(short version) throws Excepti final CompletableFuture responseFuture = new CompletableFuture<>(); spyHandler.handleTopicMetadataRequest( new KafkaCommandDecoder.KafkaHeaderAndRequest( - header, - request, - PulsarByteBufAllocator.DEFAULT.heapBuffer(), - null, - SecurityProtocol.PLAINTEXT.name), + header, request, PulsarByteBufAllocator.DEFAULT.heapBuffer(), null), responseFuture); final MetadataResponse response = (MetadataResponse) responseFuture.get(); assertEquals(response.topicMetadata().size(), 1); @@ -270,11 +261,7 @@ public void testMetadataListTopic() throws Exception { final CompletableFuture responseFuture = new CompletableFuture<>(); spyHandler.handleTopicMetadataRequest( new KafkaCommandDecoder.KafkaHeaderAndRequest( - header, - request, - PulsarByteBufAllocator.DEFAULT.heapBuffer(), - null, - SecurityProtocol.PLAINTEXT.name), + header, request, PulsarByteBufAllocator.DEFAULT.heapBuffer(), null), responseFuture); final MetadataResponse response = (MetadataResponse) responseFuture.get(); String localName = TopicName.get(topic).getLocalName(); @@ -305,8 +292,7 @@ public void testHandleProduceRequest() throws ExecutionException, InterruptedExc header, request, PulsarByteBufAllocator.DEFAULT.heapBuffer(), - null, - SecurityProtocol.PLAINTEXT.name), responseFuture); + null), responseFuture); AbstractResponse response = responseFuture.get(); assertEquals((int) response.errorCounts().get(Errors.TOPIC_AUTHORIZATION_FAILED), 1); } @@ -477,11 +463,7 @@ KafkaCommandDecoder.KafkaHeaderAndRequest buildRequest(AbstractRequest.Builder b short apiVersion = header.apiVersion(); Struct struct = apiKey.parseRequest(apiVersion, serializedRequest); AbstractRequest body = AbstractRequest.parseRequest(apiKey, apiVersion, struct); - return new KafkaCommandDecoder.KafkaHeaderAndRequest(header, - body, - byteBuf, - serviceAddress, - SecurityProtocol.PLAINTEXT.name); + return new KafkaCommandDecoder.KafkaHeaderAndRequest(header, body, byteBuf, serviceAddress); } private ProduceRequest createProduceRequest(String topic) { diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/SaslPlainTestBase.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/SaslPlainTestBase.java index b1a3631c0c..ebf5ad6935 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/SaslPlainTestBase.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/SaslPlainTestBase.java @@ -113,7 +113,8 @@ protected void setup() throws Exception { .build()); admin.namespaces().createNamespace(TENANT + "/" + NAMESPACE); admin.topics().createPartitionedTopic(TOPIC, 1); - admin.namespaces().grantPermissionOnNamespace(TENANT + "/" + NAMESPACE, SIMPLE_USER, + admin + .namespaces().grantPermissionOnNamespace(TENANT + "/" + NAMESPACE, SIMPLE_USER, Sets.newHashSet(AuthAction.consume, AuthAction.produce)); } From 296a73d2991cd400ca95127040bde8bddf550d17 Mon Sep 17 00:00:00 2001 From: wenbingshen Date: Wed, 29 Sep 2021 19:27:58 +0800 Subject: [PATCH 7/8] addressed reviewer's comments --- .../java/io/streamnative/pulsar/handlers/kop/EndPoint.java | 3 +-- .../pulsar/handlers/kop/KafkaServiceConfiguration.java | 6 +++--- .../streamnative/pulsar/handlers/kop/KopEventManager.java | 2 +- .../pulsar/handlers/kop/KafkaRequestHandlerTest.java | 7 +++---- 4 files changed, 8 insertions(+), 10 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/EndPoint.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/EndPoint.java index 6dcd52e690..ef80d5b91c 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/EndPoint.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/EndPoint.java @@ -30,8 +30,7 @@ */ public class EndPoint { - @Getter - private static final String END_POINT_SEPARATOR = ","; + public static final String END_POINT_SEPARATOR = ","; private static final String PROTOCOL_MAP_SEPARATOR = ","; private static final String PROTOCOL_SEPARATOR = ":"; private static final String REGEX = "^(.*)://\\[?([0-9a-zA-Z\\-%._:]*)\\]?:(-?[0-9]+)"; diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java index f145fc22cf..7486de2246 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java @@ -391,7 +391,7 @@ public class KafkaServiceConfiguration extends ServiceConfiguration { private String checkAdvertisedListeners(String advertisedListeners) { StringBuilder listenersReBuilder = new StringBuilder(); - for (String listener : advertisedListeners.split(EndPoint.getEND_POINT_SEPARATOR())) { + for (String listener : advertisedListeners.split(EndPoint.END_POINT_SEPARATOR)) { final String errorMessage = "listener '" + listener + "' is invalid"; final Matcher matcher = EndPoint.matcherListener(listener, errorMessage); String hostname = matcher.group(2); @@ -409,10 +409,10 @@ private String checkAdvertisedListeners(String advertisedListeners) { } else { listenersReBuilder.append(listener); } - listenersReBuilder.append(EndPoint.getEND_POINT_SEPARATOR()); + listenersReBuilder.append(EndPoint.END_POINT_SEPARATOR); } return listenersReBuilder.deleteCharAt( - listenersReBuilder.lastIndexOf(EndPoint.getEND_POINT_SEPARATOR())).toString(); + listenersReBuilder.lastIndexOf(EndPoint.END_POINT_SEPARATOR)).toString(); } public @NonNull String getKafkaAdvertisedListeners() { diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KopEventManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KopEventManager.java index cb8d0bc927..abef7742ea 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KopEventManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KopEventManager.java @@ -246,7 +246,7 @@ private JsonObject parseJsonObject(String info) { @VisibleForTesting public static Map> getNodes(String kopBrokerStrs) { HashMap> nodesMap = Maps.newHashMap(); - String[] kopBrokerArr = kopBrokerStrs.split(EndPoint.getEND_POINT_SEPARATOR()); + String[] kopBrokerArr = kopBrokerStrs.split(EndPoint.END_POINT_SEPARATOR); for (String kopBrokerStr : kopBrokerArr) { final String errorMessage = "kopBrokerStr " + kopBrokerStr + " is invalid"; final Matcher matcher = EndPoint.matcherListener(kopBrokerStr, errorMessage); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java index ff008c3b73..83d2076f00 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java @@ -699,8 +699,8 @@ public void testListOffsetsForNotExistedTopic() throws Exception { .setTargetTimes(Collections.singletonMap(topicPartition, ListOffsetRequest.EARLIEST_TIMESTAMP)) .build(ApiKeys.LIST_OFFSETS.latestVersion()); handler.handleListOffsetRequest( - new KafkaHeaderAndRequest(header, - request, PulsarByteBufAllocator.DEFAULT.heapBuffer(), null), responseFuture); + new KafkaHeaderAndRequest(header, request, PulsarByteBufAllocator.DEFAULT.heapBuffer(), null), + responseFuture); final ListOffsetResponse response = (ListOffsetResponse) responseFuture.get(); assertTrue(response.responseData().containsKey(topicPartition)); assertEquals(response.responseData().get(topicPartition).error, Errors.UNKNOWN_TOPIC_OR_PARTITION); @@ -715,8 +715,7 @@ public void testMetadataForNonPartitionedTopic(short version) throws Exception { final MetadataRequest request = new MetadataRequest(Collections.singletonList(topic), false, version); final CompletableFuture responseFuture = new CompletableFuture<>(); handler.handleTopicMetadataRequest( - new KafkaHeaderAndRequest(header, - request, PulsarByteBufAllocator.DEFAULT.heapBuffer(), null), + new KafkaHeaderAndRequest(header, request, PulsarByteBufAllocator.DEFAULT.heapBuffer(), null), responseFuture); final MetadataResponse response = (MetadataResponse) responseFuture.get(); assertEquals(response.topicMetadata().size(), 1); From 6ec341ce7d664962bbf74a081fcf85fc931e2f0d Mon Sep 17 00:00:00 2001 From: wenbingshen Date: Wed, 29 Sep 2021 19:29:56 +0800 Subject: [PATCH 8/8] addressed reviewer's comments --- .../handlers/kop/KafkaRequestHandlerTest.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java index 83d2076f00..c6e147e15d 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java @@ -210,16 +210,16 @@ public void testResponseToByteBuf() throws Exception { ApiVersionsRequest apiVersionsRequest = new ApiVersionsRequest.Builder().build(); RequestHeader requestHeader = new RequestHeader( - ApiKeys.API_VERSIONS, - ApiKeys.API_VERSIONS.latestVersion(), - clientId, - correlationId); + ApiKeys.API_VERSIONS, + ApiKeys.API_VERSIONS.latestVersion(), + clientId, + correlationId); KafkaHeaderAndRequest kopRequest = new KafkaHeaderAndRequest( - requestHeader, - apiVersionsRequest, - Unpooled.buffer(20), - null); + requestHeader, + apiVersionsRequest, + Unpooled.buffer(20), + null); ApiVersionsResponse apiVersionsResponse = ApiVersionsResponse.defaultApiVersionsResponse(); KafkaHeaderAndResponse kopResponse = KafkaHeaderAndResponse.responseForRequest(