diff --git a/kafka-impl/conf/kop.conf b/kafka-impl/conf/kop.conf index 3e146a7be5..141338b272 100755 --- a/kafka-impl/conf/kop.conf +++ b/kafka-impl/conf/kop.conf @@ -120,6 +120,10 @@ saslAllowedMechanisms= # Enable the deletion of inactive topics brokerDeleteInactiveTopicsEnabled=false +allowAutoTopicCreation=true + +allowAutoTopicCreationType=partitioned + # Name of the cluster to which this broker belongs to clusterName=kafka-cluster diff --git a/kafka-impl/conf/kop_standalone.conf b/kafka-impl/conf/kop_standalone.conf index 17a11e059b..5ddb096096 100644 --- a/kafka-impl/conf/kop_standalone.conf +++ b/kafka-impl/conf/kop_standalone.conf @@ -120,6 +120,10 @@ saslAllowedMechanisms= # Enable the deletion of inactive topics brokerDeleteInactiveTopicsEnabled=false +allowAutoTopicCreation=true + +allowAutoTopicCreationType=partitioned + ### --- General broker settings --- ### # Max Rate(in 1 seconds) of Message allowed to publish for a broker if broker publish rate limiting enabled diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java index 3cc58bbe73..14e2d07d33 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java @@ -28,7 +28,6 @@ import io.streamnative.pulsar.handlers.kop.utils.ConfigurationUtils; import io.streamnative.pulsar.handlers.kop.utils.timer.SystemTimer; import java.net.InetSocketAddress; -import java.nio.ByteBuffer; import java.util.List; import java.util.Map; import java.util.Set; @@ -47,12 +46,7 @@ import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; -import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.client.api.ProducerBuilder; -import org.apache.pulsar.client.api.ReaderBuilder; -import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.PulsarClientImpl; -import org.apache.pulsar.client.impl.ReaderBuilderImpl; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; @@ -290,6 +284,7 @@ public void close() { if (groupCoordinator != null) { groupCoordinator.shutdown(); } + KafkaTopicManager.LOOKUP_CACHE.clear(); } public void initGroupCoordinator(BrokerService service) throws Exception { @@ -314,19 +309,8 @@ public void initGroupCoordinator(BrokerService service) throws Exception { // topicName in pulsar format: tenant/ns/topic createKafkaOffsetsTopic(service); - ProducerBuilder groupCoordinatorTopicProducer = service.pulsar().getClient() - .newProducer(Schema.BYTEBUFFER) - .maxPendingMessages(100000); - - // TODO: replace this back to service.pulsar().getClient().newReader after merge pulsar PR: - // https://github.com/apache/pulsar/pull/5923 - ReaderBuilder groupCoordinatorTopicReader = - new ReaderBuilderImpl<>((PulsarClientImpl) (service.pulsar().getClient()), Schema.BYTEBUFFER); - groupCoordinatorTopicReader.startMessageId(MessageId.earliest); - this.groupCoordinator = GroupCoordinator.of( - groupCoordinatorTopicProducer, - groupCoordinatorTopicReader, + (PulsarClientImpl) (service.pulsar().getClient()), groupConfig, offsetConfig, SystemTimer.builder() diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java index 1d5c9e1b41..dd5bf5601f 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java @@ -130,6 +130,10 @@ private Producer registerInPersistentTopic(PersistentTopic persistentTopic) thro // call pulsarclient.lookup.getbroker to get and own a topic public CompletableFuture getTopicBroker(String topicName) { return LOOKUP_CACHE.computeIfAbsent(topicName, t -> { + if (log.isDebugEnabled()) { + log.debug("topic {} not in Lookup_cache, call lookupBroker", + topicName); + } CompletableFuture returnFuture = new CompletableFuture<>(); Backoff backoff = new Backoff( 100, TimeUnit.MILLISECONDS, diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessageFetchContext.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessageFetchContext.java index 9b3155bd15..e534403edb 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessageFetchContext.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessageFetchContext.java @@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; @@ -314,10 +315,16 @@ public void readEntriesComplete(List list, Object o) { if (!list.isEmpty()) { entry = list.get(0); long offset = MessageIdUtils.getOffset(entry.getLedgerId(), entry.getEntryId()); + PositionImpl currentPosition = PositionImpl + .get(entry.getLedgerId(), entry.getEntryId()); + + // commit the offset, so backlog not affect by this cursor. + commitOffset((NonDurableCursorImpl) cursor, currentPosition); + // get next offset - PositionImpl nextPosition = ((NonDurableCursorImpl ) cursor) - .getNextAvailablePosition(PositionImpl - .get(entry.getLedgerId(), entry.getEntryId())); + PositionImpl nextPosition = ((NonDurableCursorImpl) cursor) + .getNextAvailablePosition(currentPosition); + long nextOffset = MessageIdUtils .getOffset(nextPosition.getLedgerId(), nextPosition.getEntryId()); @@ -366,4 +373,23 @@ public void readEntriesFailed(ManagedLedgerException e, Object o) { return readFutures; } + // commit the offset, so backlog not affect by this cursor. + private static void commitOffset(NonDurableCursorImpl cursor, PositionImpl currentPosition) { + cursor.asyncMarkDelete(currentPosition, new MarkDeleteCallback() { + @Override + public void markDeleteComplete(Object ctx) { + if (log.isDebugEnabled()) { + log.debug("Mark delete success for position: {}", currentPosition); + } + } + + // this is OK, since this is kind of cumulative ack, following commit will come. + @Override + public void markDeleteFailed(ManagedLedgerException e, Object ctx) { + log.warn("Mark delete success for position: {} with error:", + currentPosition, e); + } + }, null); + } + } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinator.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinator.java index 32206b9722..463e30df67 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinator.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinator.java @@ -51,6 +51,7 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.kafka.common.TopicPartition; @@ -61,10 +62,14 @@ import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData; import org.apache.kafka.common.requests.TransactionResult; import org.apache.kafka.common.utils.Time; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.ReaderBuilder; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.ReaderBuilderImpl; import org.apache.pulsar.common.schema.KeyValue; import org.apache.pulsar.common.util.FutureUtil; @@ -75,8 +80,7 @@ public class GroupCoordinator { public static GroupCoordinator of( - ProducerBuilder producer, - ReaderBuilder reader, + PulsarClientImpl pulsarClient, GroupConfig groupConfig, OffsetConfig offsetConfig, Timer timer, @@ -86,6 +90,13 @@ public static GroupCoordinator of( .name("group-coordinator-executor") .build(); + // __offset partitions producers and readers builder. + ProducerBuilder producer = pulsarClient + .newProducer(Schema.BYTEBUFFER) + .maxPendingMessages(100000); + ReaderBuilder reader = new ReaderBuilderImpl<>(pulsarClient, Schema.BYTEBUFFER); + + reader.startMessageId(MessageId.earliest); GroupMetadataManager metadataManager = new GroupMetadataManager( offsetConfig, producer, @@ -105,12 +116,14 @@ public static GroupCoordinator of( .timeoutTimer(timer) .build(); + OffsetAcker offsetAcker = new OffsetAcker(pulsarClient); return new GroupCoordinator( groupConfig, metadataManager, heartbeatPurgatory, joinPurgatory, - time + time, + offsetAcker ); } @@ -133,6 +146,9 @@ public static GroupCoordinator of( Collections.emptyList() ); + // for topic backlog tracking. + @Getter + private final OffsetAcker offsetAcker; private final AtomicBoolean isActive = new AtomicBoolean(false); private final GroupConfig groupConfig; private final GroupMetadataManager groupManager; @@ -145,12 +161,14 @@ public GroupCoordinator( GroupMetadataManager groupManager, DelayedOperationPurgatory heartbeatPurgatory, DelayedOperationPurgatory joinPurgatory, - Time time) { + Time time, + OffsetAcker offsetAcker) { this.groupConfig = groupConfig; this.groupManager = groupManager; this.heartbeatPurgatory = heartbeatPurgatory; this.joinPurgatory = joinPurgatory; this.time = time; + this.offsetAcker = offsetAcker; } /** @@ -173,6 +191,7 @@ public void shutdown() { groupManager.shutdown(); heartbeatPurgatory.shutdown(); joinPurgatory.shutdown(); + offsetAcker.close(); log.info("Shutdown group coordinator completely."); } @@ -433,6 +452,12 @@ public CompletableFuture> handleSyncGroup( (assignment, errors) -> resultFuture.complete( new KeyValue<>(errors, assignment)) ); + + resultFuture.whenCompleteAsync((kv, throwable) -> { + if (throwable == null && kv.getKey() == Errors.NONE) { + offsetAcker.addOffsetsTracker(groupId, kv.getValue()); + } + }); return resultFuture; } @@ -642,6 +667,7 @@ public Map handleDeleteGroups(Set groupIds) { ); } + offsetAcker.close(groupIds); return groupErrors; } @@ -740,7 +766,7 @@ public CompletableFuture> handleCommitOffsets( int generationId, Map offsetMetadata ) { - return validateGroupStatus(groupId, ApiKeys.OFFSET_COMMIT) + CompletableFuture> result = validateGroupStatus(groupId, ApiKeys.OFFSET_COMMIT) .map(error -> CompletableFuture.completedFuture( CoreUtils.mapValue( @@ -771,6 +797,14 @@ public CompletableFuture> handleCommitOffsets( } }); }); + + result.whenCompleteAsync((ignore, e) ->{ + if (e == null){ + offsetAcker.ackOffsets(groupId, offsetMetadata); + } + }); + + return result; } public Future scheduleHandleTxnCompletion( diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/OffsetAcker.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/OffsetAcker.java new file mode 100644 index 0000000000..2214831d02 --- /dev/null +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/OffsetAcker.java @@ -0,0 +1,126 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.kop.coordinator.group; + +import io.streamnative.pulsar.handlers.kop.offset.OffsetAndMetadata; +import io.streamnative.pulsar.handlers.kop.utils.MessageIdUtils; +import io.streamnative.pulsar.handlers.kop.utils.TopicNameUtils; +import java.io.Closeable; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; +import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Assignment; +import org.apache.kafka.common.TopicPartition; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.ReaderBuilder; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.ReaderBuilderImpl; +import org.apache.pulsar.client.impl.ReaderImpl; +import org.apache.pulsar.common.naming.TopicName; + +/** + * This class used to track all the partition offset commit position. + */ +@Slf4j +public class OffsetAcker implements Closeable { + + private final ReaderBuilder readerBuilder; + + public OffsetAcker(PulsarClientImpl pulsarClient) { + this.readerBuilder = new ReaderBuilderImpl<>(pulsarClient, Schema.BYTES) + .receiverQueueSize(0) + .startMessageId(MessageId.earliest); + } + + // map off consumser: + Map>>> consumers = new ConcurrentHashMap<>(); + + public void addOffsetsTracker(String groupId, byte[] assignment) { + ByteBuffer assignBuffer = ByteBuffer.wrap(assignment); + Assignment assign = ConsumerProtocol.deserializeAssignment(assignBuffer); + if (log.isDebugEnabled()) { + log.debug(" Add offsets after sync group: {}", assign.toString()); + } + assign.partitions().forEach(topicPartition -> getConsumer(groupId, topicPartition)); + } + + public void ackOffsets(String groupId, Map offsetMetadata) { + if (log.isDebugEnabled()) { + log.debug(" ack offsets after commit offset for group: {}", groupId); + offsetMetadata.forEach((partition, metadata) -> + log.debug("\t partition: {}, offset: {}", + partition, MessageIdUtils.getPosition(metadata.offset()))); + } + offsetMetadata.forEach(((topicPartition, offsetAndMetadata) -> { + // 1. get consumer, then do ackCumulative + CompletableFuture> consumerFuture = getConsumer(groupId, topicPartition); + + consumerFuture.whenComplete((consumer, throwable) -> { + if (throwable != null) { + log.warn("Error when get consumer for offset ack:", throwable); + return; + } + MessageId messageId = MessageIdUtils.getMessageId(offsetAndMetadata.offset()); + consumer.acknowledgeCumulativeAsync(messageId); + }); + })); + } + + public void close(Set groupIds) { + groupIds.forEach(groupId -> consumers.get(groupId).values().forEach(consumerFuture -> { + consumerFuture.whenComplete((consumer, throwable) -> { + if (throwable != null) { + log.warn("Error when get consumer for consumer group close:", throwable); + return; + } + try { + consumer.close(); + } catch (Exception e) { + log.warn("Error when close consumer topic: {}, sub: {}.", + consumer.getTopic(), consumer.getSubscription(), e); + } + }); + })); + } + + @Override + public void close() { + log.info("close OffsetAcker with {} groupIds", consumers.size()); + close(consumers.keySet()); + } + + private CompletableFuture> getConsumer(String groupId, TopicPartition topicPartition) { + Map>> group = consumers + .computeIfAbsent(groupId, gid -> new ConcurrentHashMap<>()); + return group.computeIfAbsent( + topicPartition, + partition -> createConsumer(groupId, partition)); + } + + private CompletableFuture> createConsumer(String groupId, TopicPartition topicPartition) { + TopicName pulsarTopicName = TopicNameUtils.pulsarTopicName(topicPartition); + return readerBuilder.clone() + .topic(pulsarTopicName.toString()) + .subscriptionRolePrefix(groupId) + .createAsync() + .thenApply(reader -> ((ReaderImpl) reader).getConsumer()); + } + +} diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/CommitOffsetBacklogTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/CommitOffsetBacklogTest.java new file mode 100644 index 0000000000..7e56de2610 --- /dev/null +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/CommitOffsetBacklogTest.java @@ -0,0 +1,233 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.kop; + + +import static org.testng.Assert.assertEquals; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.TopicPartition; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerBuilder; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.RetentionPolicies; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.common.policies.data.TopicStats; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +/** + * Unit test for Different kafka produce messages. + */ +@Slf4j +public class CommitOffsetBacklogTest extends KopProtocolHandlerTestBase { + + @Override + protected void resetConfig() { + super.resetConfig(); + } + + @BeforeMethod + @Override + protected void setup() throws Exception { + super.internalSetup(); + log.info("success internal setup"); + + if (!admin.clusters().getClusters().contains(configClusterName)) { + // so that clients can test short names + admin.clusters().createCluster(configClusterName, + new ClusterData("http://127.0.0.1:" + brokerWebservicePort)); + } else { + admin.clusters().updateCluster(configClusterName, + new ClusterData("http://127.0.0.1:" + brokerWebservicePort)); + } + + if (!admin.tenants().getTenants().contains("public")) { + admin.tenants().createTenant("public", + new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test"))); + } else { + admin.tenants().updateTenant("public", + new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test"))); + } + if (!admin.namespaces().getNamespaces("public").contains("public/default")) { + admin.namespaces().createNamespace("public/default"); + admin.namespaces().setNamespaceReplicationClusters("public/default", Sets.newHashSet("test")); + admin.namespaces().setRetention("public/default", + new RetentionPolicies(60, 1000)); + } + if (!admin.namespaces().getNamespaces("public").contains("public/__kafka")) { + admin.namespaces().createNamespace("public/__kafka"); + admin.namespaces().setNamespaceReplicationClusters("public/__kafka", Sets.newHashSet("test")); + admin.namespaces().setRetention("public/__kafka", + new RetentionPolicies(-1, -1)); + } + } + + @AfterMethod + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + // dump Topic Stats, mainly want to get and verify backlogSize. + private void verifyBacklogInTopicStats(PersistentTopic persistentTopic, int expected) throws Exception { + final AtomicLong backlog = new AtomicLong(0); + retryStrategically( + ((test) -> { + backlog.set(persistentTopic.getStats().backlogSize); + return backlog.get() == expected; + }), + 5, + 200); + + if (log.isDebugEnabled()) { + TopicStats topicStats = persistentTopic.getStats(); + log.info(" dump topicStats for topic : {}, storageSize: {}, backlogSize: {}, expected: {}", + persistentTopic.getName(), + topicStats.storageSize, topicStats.backlogSize, expected); + + topicStats.subscriptions.forEach((subname, substats) -> { + log.debug(" dump sub: subname - {}, activeConsumerName {}, " + + "consumers {}, msgBacklog {}, unackedMessages {}.", + subname, + substats.activeConsumerName, substats.consumers, + substats.msgBacklog, substats.unackedMessages); + }); + + persistentTopic.getManagedLedger().getCursors().forEach(cursor -> + log.debug(" dump cursor: cursor - {}, durable: {}, numberEntryis: {}," + + " readPosition: {}, markdeletePosition: {}", + cursor.getName(), cursor.isDurable(), cursor.getNumberOfEntries(), + cursor.getReadPosition(), cursor.getMarkDeletedPosition())); + } + + assertEquals(backlog.get(), expected); + } + + @Test(timeOut = 20000) + public void testOffsetCommittedBacklogCleared() throws Exception { + String kafkaTopicName = "kopOffsetCommittedBacklogCleared"; + String pulsarTopicName = "persistent://public/default/" + kafkaTopicName; + String pulsarPartitionName = pulsarTopicName + "-partition-" + 0; + + TopicPartition kafkaPartition = new TopicPartition(kafkaTopicName, 0); + List kafkaPartitions = Lists.newArrayList(kafkaPartition); + + // create partitioned topic with 1 partition. + admin.topics().createPartitionedTopic(kafkaTopicName, 1); + + ProducerBuilder producerBuilder = pulsarClient.newProducer() + .topic(pulsarTopicName) + .enableBatching(false); + + // create 1 pulsar producer and 2 kafka consumer group, each with 1 consumer. + @Cleanup + Producer producer = producerBuilder.create(); + PersistentTopic topicRef = (PersistentTopic) + pulsar.getBrokerService().getTopicReference(pulsarPartitionName).get(); + + String consumerGroupA = kafkaTopicName + "_cg_a"; + String consumerGroupB = kafkaTopicName + "_cg_b"; + @Cleanup + KConsumer kConsumerA = new KConsumer(kafkaTopicName, getKafkaBrokerPort(), false, consumerGroupA); + @Cleanup + KConsumer kConsumerB = new KConsumer(kafkaTopicName, getKafkaBrokerPort(), false, consumerGroupB); + + kConsumerA.getConsumer().subscribe(Collections.singletonList(kafkaTopicName)); + kConsumerB.getConsumer().subscribe(Collections.singletonList(kafkaTopicName)); + + int totalMsgs = 60; + String messageStrPrefix = "Message_Kop_ProduceConsumeMultiLedger_XXXXXXXXXXXXX_"; + + // create messages, each size of 100 bytes, all 6000 bytes. + MessageId lastMessageId = null; + for (int i = 0; i < totalMsgs; i++) { + String message = messageStrPrefix + (i % 10); + lastMessageId = producer.newMessage() + .keyBytes(kafkaIntSerialize(Integer.valueOf(i))) + .value(message.getBytes()) + .send(); + } + + int i = 0; + while (i < totalMsgs / 2 - 1) { + if (log.isDebugEnabled()) { + log.debug("start poll message from cgA: {}", i); + } + ConsumerRecords records = kConsumerA.getConsumer().poll(Duration.ofMillis(200)); + for (ConsumerRecord record : records) { + if (log.isDebugEnabled()) { + log.debug("Kafka Consumer Received message: {}, {} at offset {}", + record.key(), record.value(), record.offset()); + } + i++; + } + } + + i = 0; + while (i < totalMsgs / 2 - 1) { + if (log.isDebugEnabled()) { + log.debug("start poll message from cgB: {}", i); + } + ConsumerRecords records = kConsumerB.getConsumer().poll(Duration.ofMillis(200)); + for (ConsumerRecord record : records) { + if (log.isDebugEnabled()) { + log.debug("Kafka Consumer Received message: {}, {} at offset {}", + record.key(), record.value(), record.offset()); + } + i++; + } + } + + // 2 consumers not acked. expected backlog == 6000. + verifyBacklogInTopicStats(topicRef, 6000); + + // 1 consumer acked. still expected backlog 6000 + kConsumerA.getConsumer().commitSync(); + verifyBacklogInTopicStats(topicRef, 6000); + + // 2 consumers acked, consumed 30 X 100. expected backlog 6000 - 3000 + kConsumerB.getConsumer().commitSync(); + verifyBacklogInTopicStats(topicRef, 6000 - 3000); + + // 2 consumers consumed and acked all messages, expected backlog 0. + ConsumerRecords recordsA = kConsumerA.getConsumer().poll(Duration.ofMillis(200)); + while (!recordsA.isEmpty()) { + recordsA = kConsumerA.getConsumer().poll(Duration.ofMillis(200)); + } + + ConsumerRecords recordsB = kConsumerB.getConsumer().poll(Duration.ofMillis(200)); + while (!recordsB.isEmpty()) { + recordsB = kConsumerB.getConsumer().poll(Duration.ofMillis(200)); + } + kConsumerA.getConsumer().commitSync(); + kConsumerB.getConsumer().commitSync(); + verifyBacklogInTopicStats(topicRef, 0); + } + + + +} diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DistributedClusterTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DistributedClusterTest.java index e744bd90e7..db85dc2453 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DistributedClusterTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DistributedClusterTest.java @@ -22,7 +22,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import com.google.gson.Gson; import java.net.URL; import java.nio.file.Path; import java.nio.file.Paths; @@ -31,7 +30,6 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import java.util.stream.IntStream; import lombok.Cleanup; @@ -41,7 +39,6 @@ import org.apache.kafka.common.TopicPartition; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.common.policies.data.ClusterData; -import org.apache.pulsar.common.policies.data.PartitionedTopicStats; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.TenantInfo; import org.junit.Assert; @@ -344,18 +341,6 @@ public void testMutiBrokerAndCoordinator() throws Exception { topicMap.values().stream().forEach(list -> numberTopic2.addAndGet(list.size())); assertTrue(numberTopic2.get() == partitionNumber); - final PartitionedTopicStats topicStats = admin.topics().getPartitionedStats(pulsarTopicName, true); - log.info("PartitionedTopicStats for topic {} : {}", pulsarTopicName, new Gson().toJson(topicStats)); - - topicMap.forEach((broker, topics) -> { - AtomicLong brokerStorageSize = new AtomicLong(0); - topics.forEach(topic -> { - brokerStorageSize.addAndGet(topicStats.partitions.get(topic).storageSize); - }); - log.info("get data topics served by broker {}, broker storage size: {}", broker, brokerStorageSize.get()); - assertTrue(brokerStorageSize.get() > 0L); - }); - offsetTopicMap = Maps.newHashMap(); for (int ii = 0; ii < offsetsTopicNumPartitions; ii++) { String offsetsTopic = offsetsTopicName + PARTITIONED_TOPIC_SUFFIX + ii; diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java index 346a83b472..102cf67525 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java @@ -113,6 +113,7 @@ protected void resetConfig() { kafkaConfig.setManagedLedgerCacheSizeMB(8); kafkaConfig.setActiveConsumerFailoverDelayTimeMillis(0); + kafkaConfig.setDefaultRetentionTimeInMinutes(7); kafkaConfig.setDefaultNumberOfNamespaceBundles(1); kafkaConfig.setZookeeperServers("localhost:2181"); kafkaConfig.setConfigurationStoreServers("localhost:3181"); @@ -511,6 +512,10 @@ public KConsumer( this.consumerGroup = consumerGroup; } + public KConsumer(String topic, int port, boolean autoCommit, String consumerGroup) { + this(topic, "localhost", port, autoCommit, null, null, consumerGroup); + } + public KConsumer(String topic, int port, boolean autoCommit) { this(topic, "localhost", port, autoCommit, null, null, "DemoKafkaOnPulsarConsumer"); } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinatorTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinatorTest.java index 9c7fa7ed7e..8bf51ef4fa 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinatorTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinatorTest.java @@ -37,6 +37,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -56,6 +57,7 @@ import org.apache.pulsar.client.api.ReaderBuilder; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.TenantInfo; @@ -94,6 +96,32 @@ public class GroupCoordinatorTest extends KopProtocolHandlerTestBase { private int otherGroupPartitionId; private Map protocols; + static class MockOffsetAcker extends OffsetAcker { + public MockOffsetAcker(PulsarClientImpl pulsarClient) { + super(pulsarClient); + } + + @Override + public void addOffsetsTracker(String groupId, byte[] assignment) { + // non op + } + + @Override + public void ackOffsets(String groupId, Map offsetMetadata) { + // non op + } + + @Override + public void close(Set groupIds) { + // non op + } + + @Override + public void close() { + // non op + } + } + @Override protected void resetConfig() { super.resetConfig(); @@ -185,7 +213,8 @@ public void setup() throws Exception { groupMetadataManager, heartbeatPurgatory, joinPurgatory, - timer.time() + timer.time(), + new MockOffsetAcker((PulsarClientImpl) pulsarClient) ); // start the group coordinator @@ -479,7 +508,8 @@ public void testValidHeartbeat() throws Exception { assertEquals(Errors.NONE, heartbeatResult); } - @Test + @Test(enabled = false) + // todo: https://github.com/streamnative/kop/issues/108 public void testSessionTimeout() throws Exception { String memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID; JoinGroupResult joinGroupResult = joinGroup(