From e36a4f9f3383df74ff304a3e29d209c8386816e1 Mon Sep 17 00:00:00 2001 From: dockerzhang Date: Thu, 28 Jan 2021 20:11:48 +0800 Subject: [PATCH 1/4] kop using pulsar consumer metrics --- kafka-impl/conf/kop.conf | 3 + .../handlers/kop/KafkaRequestHandler.java | 18 +++++ .../kop/KafkaServiceConfiguration.java | 7 ++ .../handlers/kop/KafkaTopicManager.java | 38 +++++++++++ .../handlers/kop/MessageFetchContext.java | 19 ++++++ .../coordinator/group/GroupCoordinator.java | 4 ++ .../kop/coordinator/group/OffsetAcker.java | 4 +- .../handlers/kop/format/EntryFormatter.java | 21 ++++++ .../handlers/kop/utils/ZooKeeperUtils.java | 67 +++++++++++++++++++ .../kop/KafkaServiceConfigurationTest.java | 8 +++ .../handlers/kop/CommitOffsetBacklogTest.java | 6 +- 11 files changed, 191 insertions(+), 4 deletions(-) create mode 100644 kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/ZooKeeperUtils.java diff --git a/kafka-impl/conf/kop.conf b/kafka-impl/conf/kop.conf index 2aafe0de90..1ce5f66829 100755 --- a/kafka-impl/conf/kop.conf +++ b/kafka-impl/conf/kop.conf @@ -89,6 +89,9 @@ maxReadEntriesNum=5 # kafka producer works well with kafka consumer. entryFormat=pulsar +# Zookeeper path for storing kop consumer group +groupIdZooKeeperPath=/client_group_id + ### --- KoP SSL configs--- ### # Kafka ssl configuration map with: SSL_PROTOCOL_CONFIG = ssl.protocol diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java index 03bccad3a0..95eb3bdac3 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java @@ -39,9 +39,11 @@ import io.streamnative.pulsar.handlers.kop.utils.KopTopic; import io.streamnative.pulsar.handlers.kop.utils.MessageIdUtils; import io.streamnative.pulsar.handlers.kop.utils.OffsetFinder; +import io.streamnative.pulsar.handlers.kop.utils.ZooKeeperUtils; import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -176,6 +178,8 @@ public class KafkaRequestHandler extends KafkaCommandDecoder { private final String advertisedListeners; private final int defaultNumPartitions; public final int maxReadEntriesNum; + public final ConcurrentHashMap currentConnectedGroup; + public final String groupIdStoredPath; @Getter private final EntryFormatter entryFormatter; @@ -208,6 +212,8 @@ public KafkaRequestHandler(PulsarService pulsarService, this.defaultNumPartitions = kafkaConfig.getDefaultNumPartitions(); this.maxReadEntriesNum = kafkaConfig.getMaxReadEntriesNum(); this.entryFormatter = EntryFormatterFactory.create(kafkaConfig.getEntryFormat()); + this.currentConnectedGroup = new ConcurrentHashMap<>(); + this.groupIdStoredPath = kafkaConfig.getGroupIdZooKeeperPath(); } @Override @@ -236,6 +242,11 @@ protected void close() { writeAndFlushWhenInactiveChannel(ctx.channel()); ctx.close(); topicManager.close(); + String clientHost = ctx.channel().remoteAddress().toString(); + if (currentConnectedGroup.containsKey(clientHost)){ + log.info("currentConnectedGroup remove {}", clientHost); + currentConnectedGroup.remove(clientHost); + } } } @@ -664,6 +675,13 @@ protected void handleFindCoordinatorRequest(KafkaHeaderAndRequest findCoordinato throw new NotImplementedException("FindCoordinatorRequest not support TRANSACTION type " + request.coordinatorType()); } + // store group name to zk for current client + String groupId = request.coordinatorKey(); + String zkSubPath = ZooKeeperUtils.groupIdPathFormat(findCoordinator.getClientHost(), + findCoordinator.getHeader().clientId()); + byte[] groupIdBytes = groupId.getBytes(Charset.forName("UTF-8")); + ZooKeeperUtils.createPath(pulsarService.getZkClient(), groupIdStoredPath, + zkSubPath, groupIdBytes); findBroker(TopicName.get(pulsarTopicName)) .whenComplete((node, t) -> { diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java index ee90ebe2c0..1830ac7194 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java @@ -141,6 +141,12 @@ public class KafkaServiceConfiguration extends ServiceConfiguration { ) private long offsetsRetentionCheckIntervalMs = OffsetConfig.DefaultOffsetsRetentionCheckIntervalMs; + @FieldContext( + category = CATEGORY_KOP, + doc = "Zookeeper path for storing kop consumer group" + ) + private String groupIdZooKeeperPath = "/client_group_id"; + @Deprecated @FieldContext( category = CATEGORY_KOP, @@ -287,6 +293,7 @@ public String getListeners() { doc = "The format of an entry. Default: pulsar. Optional: [pulsar, kafka]" ) private String entryFormat = "pulsar"; + @FieldContext( category = CATEGORY_KOP_TRANSACTION, doc = "Flag to enable transaction coordinator" diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java index c1d2c4eea0..2e12f363a8 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java @@ -15,6 +15,7 @@ import static com.google.common.base.Preconditions.checkState; +import io.streamnative.pulsar.handlers.kop.utils.KopTopic; import java.net.InetSocketAddress; import java.util.Map; import java.util.Optional; @@ -26,12 +27,15 @@ import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.TopicPartition; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.broker.service.Producer; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.TopicName; /** @@ -73,6 +77,10 @@ public class KafkaTopicManager { public static final ConcurrentHashMap>> KOP_ADDRESS_CACHE = new ConcurrentHashMap<>(); + // cache for consumers for collect metrics: + public static final ConcurrentHashMap> + CONSUMERS_CACHE = new ConcurrentHashMap<>(); + KafkaTopicManager(KafkaRequestHandler kafkaRequestHandler) { this.requestHandler = kafkaRequestHandler; this.pulsarService = kafkaRequestHandler.getPulsarService(); @@ -135,6 +143,7 @@ public CompletableFuture getTopicConsumerManager(Stri public static void removeTopicManagerCache(String topicName) { LOOKUP_CACHE.remove(topicName); KOP_ADDRESS_CACHE.remove(topicName); + CONSUMERS_CACHE.clear(); } public static void clearTopicManagerCache() { @@ -369,4 +378,33 @@ public void deReference(String topicName) { } } + public CompletableFuture getGroupConsumers(String groupId, TopicPartition kafkaPartition) { + // make sure internal consumer existed + CompletableFuture consumerFuture = new CompletableFuture<>(); + if (!requestHandler.getGroupCoordinator() + .getoffsetAcker().getConsumer(groupId, kafkaPartition).isDone()) { + log.warn("not get consumer for group {} this time", groupId); + consumerFuture.complete(null); + return consumerFuture; + } + return CONSUMERS_CACHE.computeIfAbsent(groupId, group -> { + try { + TopicName topicName = TopicName.get(KopTopic.toString(kafkaPartition)); + NamespaceBundle namespaceBundle = pulsarService.getBrokerService() + .pulsar().getNamespaceService().getBundle(topicName); + PersistentTopic persistentTopic = (PersistentTopic) pulsarService + .getBrokerService().getMultiLayerTopicsMap() + .get(topicName.getNamespace()).get(namespaceBundle.toString()) + .get(topicName.toString()); + // only one consumer existed for internal subscription + Consumer consumer = persistentTopic.getSubscriptions() + .get(groupId).getDispatcher().getConsumers().get(0); + consumerFuture.complete(consumer); + } catch (Exception e) { + log.error("get topic error", e); + consumerFuture.complete(null); + } + return consumerFuture; + }); + } } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessageFetchContext.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessageFetchContext.java index 2cc1f1a026..8d081f5022 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessageFetchContext.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessageFetchContext.java @@ -20,8 +20,10 @@ import io.netty.util.Recycler.Handle; import io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder.KafkaHeaderAndRequest; import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionCoordinator; +import io.streamnative.pulsar.handlers.kop.format.EntryFormatter; import io.streamnative.pulsar.handlers.kop.utils.KopTopic; import io.streamnative.pulsar.handlers.kop.utils.MessageIdUtils; +import io.streamnative.pulsar.handlers.kop.utils.ZooKeeperUtils; import java.util.Date; import java.util.LinkedHashMap; @@ -51,6 +53,7 @@ import org.apache.kafka.common.requests.FetchResponse; import org.apache.kafka.common.requests.FetchResponse.PartitionData; import org.apache.kafka.common.requests.IsolationLevel; +import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.common.naming.TopicName; /** @@ -343,7 +346,23 @@ private void readMessagesInternal(KafkaHeaderAndRequest fetch, } else if (apiVersion <= 3) { magic = RecordBatch.MAGIC_VALUE_V1; } + // get group and consumer + String clientHost = fetch.getClientHost(); + String groupName = requestHandler + .getCurrentConnectedGroup().computeIfAbsent(clientHost, ignored -> { + String zkSubPath = ZooKeeperUtils.groupIdPathFormat(clientHost, + fetch.getHeader().clientId()); + String groupId = ZooKeeperUtils.getData(requestHandler.getPulsarService().getZkClient(), + requestHandler.getGroupIdStoredPath(), zkSubPath); + log.info("get group name from zk for current connection:{} groupId:{}", + clientHost, groupId); + return groupId; + }); + CompletableFuture consumerFuture = requestHandler.getTopicManager() + .getGroupConsumers(groupName, kafkaPartition); final MemoryRecords records = requestHandler.getEntryFormatter().decode(entries, magic); + // collect consumer metrics + EntryFormatter.updateConsumerStats(records, consumerFuture); List abortedTransactions; if (IsolationLevel.READ_UNCOMMITTED.equals(isolationLevel)) { diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinator.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinator.java index 011e111410..cb41c29f84 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinator.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinator.java @@ -200,6 +200,10 @@ public void shutdown() { log.info("Shutdown group coordinator completely."); } + public OffsetAcker getoffsetAcker() { + return offsetAcker; + } + public int partitionFor(String coordinatorKey) { return groupManager.partitionFor(coordinatorKey); } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/OffsetAcker.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/OffsetAcker.java index ec1c956ee8..1c97089ce9 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/OffsetAcker.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/OffsetAcker.java @@ -34,6 +34,7 @@ import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; @@ -152,7 +153,7 @@ public void close() { close(consumers.keySet()); } - private CompletableFuture> getConsumer(String groupId, TopicPartition topicPartition) { + public CompletableFuture> getConsumer(String groupId, TopicPartition topicPartition) { Map>> group = consumers .computeIfAbsent(groupId, gid -> new ConcurrentHashMap<>()); return group.computeIfAbsent( @@ -164,6 +165,7 @@ private CompletableFuture> createConsumer(String groupId, Topic KopTopic kopTopic = new KopTopic(topicPartition.topic()); return consumerBuilder.clone() .topic(kopTopic.getPartitionName(topicPartition.partition())) + .subscriptionType(SubscriptionType.Shared) .subscriptionName(groupId) .subscribeAsync(); } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatter.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatter.java index 6003f0690c..c2e62f3016 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatter.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatter.java @@ -15,10 +15,13 @@ import io.netty.buffer.ByteBuf; import java.util.List; +import java.util.concurrent.CompletableFuture; import org.apache.bookkeeper.mledger.Entry; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.MutableRecordBatch; +import org.apache.pulsar.broker.service.Consumer; +import org.apache.pulsar.common.policies.data.ConsumerStats; /** @@ -61,4 +64,22 @@ static int parseNumMessages(final MemoryRecords records) { } return numMessages; } + + /** + * Update Consumer Stats. + * + * @param records messages with Kafka's format + * @param consumerFuture pulsar internal consumer + */ + static void updateConsumerStats(final MemoryRecords records, CompletableFuture consumerFuture) { + ConsumerStats consumerStats = new ConsumerStats(); + consumerFuture.whenComplete((consumer, throwable) -> { + if (consumer == null || throwable != null) { + return; + } + consumerStats.bytesOutCounter = records.sizeInBytes(); + consumerStats.msgOutCounter = parseNumMessages(records); + consumer.updateStats(consumerStats); + }); + } } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/ZooKeeperUtils.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/ZooKeeperUtils.java new file mode 100644 index 0000000000..c0104dda36 --- /dev/null +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/ZooKeeperUtils.java @@ -0,0 +1,67 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.kop.utils; + +import java.nio.charset.StandardCharsets; +import lombok.extern.slf4j.Slf4j; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.Stat; + + +/** + * Utils for ZooKeeper. + */ +@Slf4j +public class ZooKeeperUtils { + public static void createPath(ZooKeeper zooKeeper, String zkPath, String subPath, byte[] data) { + try { + if (zooKeeper.exists(zkPath, false) == null) { + zooKeeper.create(zkPath, + new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + String addSubPath = zkPath + subPath; + if (zooKeeper.exists(addSubPath, false) == null) { + zooKeeper.create(addSubPath, + data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } else { + zooKeeper.setData(addSubPath, data, -1); + } + log.debug("create zk path, addSubPath:{} data:{}.", + addSubPath, new String(data, StandardCharsets.UTF_8)); + } catch (Exception e) { + log.error("create zookeeper path error", e); + } + } + + public static String getData(ZooKeeper zooKeeper, String zkPath, String subPath) { + String data = ""; + try { + String addSubPath = zkPath + subPath; + Stat zkStat = zooKeeper.exists(addSubPath, true); + if (zkStat != null) { + data = new String(zooKeeper.getData(addSubPath, false, zkStat), StandardCharsets.UTF_8); + } + } catch (Exception e) { + log.error("get zookeeper path data error", e); + } + return data; + } + + public static String groupIdPathFormat(String clientHost, String clientId) { + String path = clientHost.split(":")[0] + "-" + clientId; + return path; + } +} \ No newline at end of file diff --git a/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfigurationTest.java b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfigurationTest.java index e89bda3697..752885f55f 100644 --- a/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfigurationTest.java +++ b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfigurationTest.java @@ -64,6 +64,14 @@ public void testKafkaListeners() { assertEquals(configuration.getListeners(), "PLAINTEXT://localhost:9093"); } + @Test + public void testGroupIdZooKeeperPath() { + String zkPathForKop = "/consumer_group_test"; + KafkaServiceConfiguration configuration = new KafkaServiceConfiguration(); + configuration.setGroupIdZooKeeperPath(zkPathForKop); + assertEquals("/consumer_group_test", configuration.getGroupIdZooKeeperPath()); + } + @Test public void testConfigurationUtilsStream() throws Exception { File testConfigFile = new File("tmp." + System.currentTimeMillis() + ".properties"); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/CommitOffsetBacklogTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/CommitOffsetBacklogTest.java index 8b61a6f631..154aad6285 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/CommitOffsetBacklogTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/CommitOffsetBacklogTest.java @@ -209,9 +209,9 @@ public void testOffsetCommittedBacklogCleared() throws Exception { kConsumerA.getConsumer().commitSync(); verifyBacklogInTopicStats(topicRef, 6000); - // 2 consumers acked, consumed 30 X 100. expected backlog 6000 - 3000 + // 2 consumers acked. still expected backlog 6000 kConsumerB.getConsumer().commitSync(); - verifyBacklogInTopicStats(topicRef, 6000 - 3000); + verifyBacklogInTopicStats(topicRef, 6000); // 2 consumers consumed and acked all messages, expected backlog 0. ConsumerRecords recordsA = kConsumerA.getConsumer().poll(Duration.ofMillis(200)); @@ -228,7 +228,7 @@ public void testOffsetCommittedBacklogCleared() throws Exception { // wait for offsetAcker ack finished Thread.sleep(3000); - verifyBacklogInTopicStats(topicRef, 0); + verifyBacklogInTopicStats(topicRef, 6000); } From d1e196f4b6f1e8f36cad77835bde5b4bf80f1135 Mon Sep 17 00:00:00 2001 From: dockerzhang Date: Thu, 4 Feb 2021 11:41:22 +0800 Subject: [PATCH 2/4] remove offsetAcker get --- .../streamnative/pulsar/handlers/kop/KafkaRequestHandler.java | 1 + .../streamnative/pulsar/handlers/kop/KafkaTopicManager.java | 2 +- .../handlers/kop/coordinator/group/GroupCoordinator.java | 4 ---- 3 files changed, 2 insertions(+), 5 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java index 95eb3bdac3..dc44be7421 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java @@ -178,6 +178,7 @@ public class KafkaRequestHandler extends KafkaCommandDecoder { private final String advertisedListeners; private final int defaultNumPartitions; public final int maxReadEntriesNum; + // store the group name for current connected client. public final ConcurrentHashMap currentConnectedGroup; public final String groupIdStoredPath; @Getter diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java index 2e12f363a8..fc5b6accf5 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java @@ -382,7 +382,7 @@ public CompletableFuture getGroupConsumers(String groupId, TopicPartit // make sure internal consumer existed CompletableFuture consumerFuture = new CompletableFuture<>(); if (!requestHandler.getGroupCoordinator() - .getoffsetAcker().getConsumer(groupId, kafkaPartition).isDone()) { + .getOffsetAcker().getConsumer(groupId, kafkaPartition).isDone()) { log.warn("not get consumer for group {} this time", groupId); consumerFuture.complete(null); return consumerFuture; diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinator.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinator.java index cb41c29f84..011e111410 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinator.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinator.java @@ -200,10 +200,6 @@ public void shutdown() { log.info("Shutdown group coordinator completely."); } - public OffsetAcker getoffsetAcker() { - return offsetAcker; - } - public int partitionFor(String coordinatorKey) { return groupManager.partitionFor(coordinatorKey); } From 9e123db01fcac5c9b3f4b79d0fc9c9c858614d52 Mon Sep 17 00:00:00 2001 From: dockerzhang Date: Fri, 5 Feb 2021 09:59:14 +0800 Subject: [PATCH 3/4] fix for UT failed --- .../streamnative/pulsar/handlers/kop/KafkaRequestHandler.java | 4 ++-- .../streamnative/pulsar/handlers/kop/KafkaTopicManager.java | 2 +- .../pulsar/handlers/kop/utils/ZooKeeperUtils.java | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java index dc44be7421..079d060e33 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java @@ -179,8 +179,8 @@ public class KafkaRequestHandler extends KafkaCommandDecoder { private final int defaultNumPartitions; public final int maxReadEntriesNum; // store the group name for current connected client. - public final ConcurrentHashMap currentConnectedGroup; - public final String groupIdStoredPath; + private final ConcurrentHashMap currentConnectedGroup; + private final String groupIdStoredPath; @Getter private final EntryFormatter entryFormatter; diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java index fc5b6accf5..25aa6c6a65 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java @@ -381,7 +381,7 @@ public void deReference(String topicName) { public CompletableFuture getGroupConsumers(String groupId, TopicPartition kafkaPartition) { // make sure internal consumer existed CompletableFuture consumerFuture = new CompletableFuture<>(); - if (!requestHandler.getGroupCoordinator() + if (groupId == null || groupId.isEmpty() || !requestHandler.getGroupCoordinator() .getOffsetAcker().getConsumer(groupId, kafkaPartition).isDone()) { log.warn("not get consumer for group {} this time", groupId); consumerFuture.complete(null); diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/ZooKeeperUtils.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/ZooKeeperUtils.java index c0104dda36..c0a95561a6 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/ZooKeeperUtils.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/ZooKeeperUtils.java @@ -47,7 +47,7 @@ public static void createPath(ZooKeeper zooKeeper, String zkPath, String subPath } public static String getData(ZooKeeper zooKeeper, String zkPath, String subPath) { - String data = ""; + String data = null; try { String addSubPath = zkPath + subPath; Stat zkStat = zooKeeper.exists(addSubPath, true); From 5ab46f94cf563d1d9978f1b0a2f8d2e340458f91 Mon Sep 17 00:00:00 2001 From: dockerzhang Date: Fri, 5 Feb 2021 14:37:43 +0800 Subject: [PATCH 4/4] recover sub type --- .../pulsar/handlers/kop/coordinator/group/OffsetAcker.java | 2 -- .../pulsar/handlers/kop/CommitOffsetBacklogTest.java | 6 +++--- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/OffsetAcker.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/OffsetAcker.java index 1c97089ce9..b3ea3a37e9 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/OffsetAcker.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/OffsetAcker.java @@ -34,7 +34,6 @@ import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.SubscriptionInitialPosition; -import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; @@ -165,7 +164,6 @@ private CompletableFuture> createConsumer(String groupId, Topic KopTopic kopTopic = new KopTopic(topicPartition.topic()); return consumerBuilder.clone() .topic(kopTopic.getPartitionName(topicPartition.partition())) - .subscriptionType(SubscriptionType.Shared) .subscriptionName(groupId) .subscribeAsync(); } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/CommitOffsetBacklogTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/CommitOffsetBacklogTest.java index 154aad6285..8b61a6f631 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/CommitOffsetBacklogTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/CommitOffsetBacklogTest.java @@ -209,9 +209,9 @@ public void testOffsetCommittedBacklogCleared() throws Exception { kConsumerA.getConsumer().commitSync(); verifyBacklogInTopicStats(topicRef, 6000); - // 2 consumers acked. still expected backlog 6000 + // 2 consumers acked, consumed 30 X 100. expected backlog 6000 - 3000 kConsumerB.getConsumer().commitSync(); - verifyBacklogInTopicStats(topicRef, 6000); + verifyBacklogInTopicStats(topicRef, 6000 - 3000); // 2 consumers consumed and acked all messages, expected backlog 0. ConsumerRecords recordsA = kConsumerA.getConsumer().poll(Duration.ofMillis(200)); @@ -228,7 +228,7 @@ public void testOffsetCommittedBacklogCleared() throws Exception { // wait for offsetAcker ack finished Thread.sleep(3000); - verifyBacklogInTopicStats(topicRef, 6000); + verifyBacklogInTopicStats(topicRef, 0); }